# An√°lisis de Ventas con PySpark
Este notebook realiza un an√°lisis de datos de ventas usando Spark.

In [None]:
# Importar configuraciones
import sys
import os
sys.path.append(os.path.abspath('./src'))
os.makedirs("resultados", exist_ok=True)

from config.spark_config import SparkSession
from etl.transformaciones import TransformacionesVentas
from pyspark.sql import functions as F
from pyspark.sql import SparkSession

spark = SparkSession.getActiveSession()
if spark:
    spark.stop()

# Inicializar Spark
spark = SparkSession.builder \
    .appName("AnalisisVentas-Notebook") \
    .enableHiveSupport() \
    .getOrCreate()

transformaciones = TransformacionesVentas(spark)
spark.conf.get("spark.sql.catalogImplementation")

# Cargar datos
Se importan los DataFrames de ventas y productos para iniciar el an√°lisis.

In [None]:
# Cargar datos
ventas_df, productos_df = transformaciones.cargar_datos()

print("üìä Vista previa de ventas:")
ventas_df.show(5)

print("\nüì¶ Vista previa de productos:")
productos_df.show(5)

print(f"\nTotal ventas: {ventas_df.count()}")
print(f"Total productos: {productos_df.count()}")

# Esquema de datos
Verificaci√≥n de los tipos de datos de cada columna.

In [None]:
print("Esquema ventas:")
ventas_df.printSchema()

print("\nEsquema productos:")
productos_df.printSchema()

# Calcular m√©tricas y An√°lisis Temporal
C√°lculo de ingresos por categor√≠a, an√°lisis de ventas por fecha y identificaci√≥n de mejores clientes.

In [None]:
# Calcular m√©tricas
ventas_completas_df, metricas_df = transformaciones.calcular_metricas(ventas_df, productos_df)

print("üìà M√©tricas por categor√≠a:")
metricas_df.show()

# An√°lisis temporal
analisis_temporal_df = transformaciones.analisis_temporal(ventas_df)

print("üìÖ An√°lisis por fecha:")
analisis_temporal_df.show()

# Top clientes
top_clientes_df = transformaciones.top_clientes(ventas_df, top_n=5)

print("üëë Top 5 clientes:")
top_clientes_df.show()

In [None]:
# An√°lisis adicional: Ventas por producto
ventas_por_producto = ventas_completas_df.groupBy("nombre", "categoria") \
    .agg(
        F.sum("cantidad").alias("unidades_vendidas"),
        F.sum("venta_total").alias("ingresos"),
        F.round(F.avg("precio_unitario"), 2).alias("precio_promedio")
    ) \
    .orderBy(F.desc("ingresos"))

print("üèÜ Productos m√°s vendidos:")
ventas_por_producto.show()

# Visualizaci√≥n
Representaci√≥n gr√°fica de los ingresos por categor√≠a usando Matplotlib.

In [None]:
try:
    import matplotlib.pyplot as plt
    
    # Convertir a pandas para visualizaci√≥n
    metricas_pandas = metricas_df.toPandas()
    
    plt.figure(figsize=(10, 6))
    plt.bar(metricas_pandas['categoria'], metricas_pandas['ingresos_totales'])
    plt.title('Ingresos por Categor√≠a')
    plt.xlabel('Categor√≠a')
    plt.ylabel('Ingresos Totales')
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()
    
except ImportError:
    print("Matplotlib no disponible para visualizaci√≥n")

# Guardar resultados
Exportaci√≥n de los datos procesados a formato Parquet (optimizada para Big Data).

In [None]:
import os
print("üíæ Guardando resultados...")

# Obtener ruta base del proyecto din√°micamente
project_root = os.getcwd()
output_dir = os.path.join(project_root, "resultados")
os.makedirs(output_dir, exist_ok=True)

# Convertir a formato que Spark entiende
output_uri = f"file://{output_dir}"

# Guardar
metricas_df.write.mode("overwrite").parquet(f"{output_uri}/metricas_ventas.parquet")
ventas_completas_df.write.mode("overwrite").parquet(f"{output_uri}/ventas_completas.parquet")

print("‚úÖ Resultados guardados en formato Parquet")

# Consultas SQL con Hive
Uso de Spark SQL para realizar consultas sobre tablas temporales.

In [None]:
try:
    # Crear tabla Hive temporal
    ventas_completas_df.createOrReplaceTempView("ventas_completas")
    
    print("üîç Consulta SQL a trav√©s de Hive:")
    resultado_sql = spark.sql("""
        SELECT categoria, 
               SUM(venta_total) as ingresos_totales,
               AVG(precio_unitario) as precio_promedio
        FROM ventas_completas
        GROUP BY categoria
        ORDER BY ingresos_totales DESC
    """)
    resultado_sql.show()
    
except Exception as e:
    print(f"Hive no disponible: {e}")

# Detener Spark
from config.spark_config import detener_spark_session
detener_spark_session(spark)