In [1]:
from pyspark.sql import SparkSession

# Crear sesión de Spark si no la tienes ya
spark = SparkSession.builder.appName("AnalisisVentas").getOrCreate()

# Cargar el archivo CSV
df = spark.read.csv("work/FactVentas.csv", header=True, inferSchema=True)

In [2]:
from pyspark.sql.functions import col

ventas_por_producto = df.groupBy("ID_Producto") \
    .agg({"Cantidad": "sum", "Total_Venta": "sum"}) \
    .withColumnRenamed("sum(Cantidad)", "Cantidad_Total") \
    .withColumnRenamed("sum(Total_Venta)", "Venta_Total")

ventas_por_producto.show()

+-----------+--------------+------------------+
|ID_Producto|Cantidad_Total|       Venta_Total|
+-----------+--------------+------------------+
|        148|            60|439.59000000000003|
|         31|            70|            257.04|
|        137|            74|            749.54|
|         85|           100|1453.4500000000003|
|         65|            70|            381.94|
|         53|           101|           1423.86|
|        133|            73|1380.8100000000002|
|         78|           122|           1723.34|
|        108|            66| 364.7100000000001|
|         34|            61|            784.78|
|        115|            51| 474.6100000000001|
|        126|            66|            708.45|
|        101|            33|            477.61|
|         81|            69| 940.7700000000002|
|         28|            55| 788.6899999999999|
|         76|            74|1412.1000000000004|
|         26|           101| 723.5600000000001|
|         27|           103|2126.4100000

In [3]:
from pyspark.ml.feature import VectorAssembler

# Convertir columnas numéricas en vector de entrada
assembler = VectorAssembler(
    inputCols=["Cantidad_Total", "Venta_Total"],
    outputCol="features"
)

data_cluster = assembler.transform(ventas_por_producto)

In [4]:
from pyspark.ml.clustering import KMeans

kmeans = KMeans(k=3, seed=1, featuresCol="features", predictionCol="cluster")
modelo = kmeans.fit(data_cluster)
resultado = modelo.transform(data_cluster)

resultado.select("ID_Producto", "Cantidad_Total", "Venta_Total", "cluster").show()

+-----------+--------------+------------------+-------+
|ID_Producto|Cantidad_Total|       Venta_Total|cluster|
+-----------+--------------+------------------+-------+
|        148|            60|439.59000000000003|      1|
|         31|            70|            257.04|      1|
|        137|            74|            749.54|      0|
|         85|           100|1453.4500000000003|      2|
|         65|            70|            381.94|      1|
|         53|           101|           1423.86|      2|
|        133|            73|1380.8100000000002|      2|
|         78|           122|           1723.34|      2|
|        108|            66| 364.7100000000001|      1|
|         34|            61|            784.78|      0|
|        115|            51| 474.6100000000001|      1|
|        126|            66|            708.45|      0|
|        101|            33|            477.61|      1|
|         81|            69| 940.7700000000002|      0|
|         28|            55| 788.6899999999999| 

In [5]:
resultado.select("ID_Producto", "Cantidad_Total", "Venta_Total", "cluster") \
    .toPandas().to_csv("/home/jovyan/work/segmentacion_productos.csv", index=False)

In [6]:
from pyspark.sql.functions import col

ventas_sucursal = df.groupBy("ID_Sucursal") \
    .agg({"Total_Venta": "sum", "Cantidad": "sum"}) \
    .withColumnRenamed("sum(Total_Venta)", "TotalSucursal") \
    .withColumnRenamed("sum(Cantidad)", "CantidadSucursal") \
    .withColumn("TicketPromedio", col("TotalSucursal") / col("CantidadSucursal"))

ventas_sucursal.show()

+-----------+----------------+------------------+------------------+
|ID_Sucursal|CantidadSucursal|     TotalSucursal|    TicketPromedio|
+-----------+----------------+------------------+------------------+
|          1|            1801|          21962.65|12.194697390338701|
|          6|            1822| 22609.26999999999|12.409039517014264|
|          3|            1595|19364.449999999997|12.140721003134795|
|          5|            1830|22992.919999999987| 12.56443715846994|
|          4|            1644|          19106.99|11.622256690997569|
|          2|            1530|18231.369999999977|11.915928104575148|
+-----------+----------------+------------------+------------------+



In [7]:
from pyspark.sql.functions import month

ventas_por_mes = df.withColumn("Mes", month("Fecha")) \
    .groupBy("Mes") \
    .sum("Total_Venta") \
    .orderBy("Mes")

ventas_por_mes.show()

+---+------------------+
|Mes|  sum(Total_Venta)|
+---+------------------+
|  1| 21568.70999999999|
|  2| 19775.99999999999|
|  3|19870.000000000004|
|  4|15819.119999999997|
|  5|12099.439999999997|
|  6| 5112.180000000002|
|  7|5804.0199999999995|
|  8| 4684.899999999999|
|  9|           5438.56|
| 10| 3843.790000000001|
| 11| 4677.189999999999|
| 12| 5573.739999999999|
+---+------------------+



In [8]:
ventas_sucursal.toPandas().to_csv("/home/jovyan/work/ventas_por_sucursal.csv", index=False)

In [None]:

resultado.select("ID_Producto", "Cantidad_Total", "Venta_Total", "cluster").orderBy("cluster").show()

+-----------+--------------+-----------------+-------+
|ID_Producto|Cantidad_Total|      Venta_Total|cluster|
+-----------+--------------+-----------------+-------+
|         64|            31|           688.65|      0|
|         10|            67|           874.52|      0|
|        117|            67|           632.59|      0|
|         48|            70|772.3099999999998|      0|
|         43|            90|          1161.52|      0|
|         34|            61|           784.78|      0|
|         61|           106|          1060.28|      0|
|         81|            69|940.7700000000002|      0|
|        127|            73|730.0999999999999|      0|
|         26|           101|723.5600000000001|      0|
|        107|            69|884.9300000000003|      0|
|         12|           100|            714.9|      0|
|         17|            77|798.3000000000002|      0|
|         93|            72|          1152.16|      0|
|          9|            71|           768.36|      0|
|         

In [12]:
# Cargar otras tablas
productos = spark.read.csv("work/DimProducto.csv", header=True, inferSchema=True)
sucursales = spark.read.csv("work/DimSucursal.csv", header=True, inferSchema=True)

# Unir productos con ventas
ventas_enriquecidas = df.join(productos, df.ID_Producto == productos.ID_Producto) \
                        .join(sucursales, df.ID_Sucursal == sucursales.ID_Sucursal)

ventas_enriquecidas.select("ID_Venta", "NombreProducto", "Nombre", "Total_Venta").show(5)

+--------+----------------+--------------------+-----------+
|ID_Venta|  NombreProducto|              Nombre|Total_Venta|
+--------+----------------+--------------------+-----------+
|       1|Guantes de Látex|Sucursal La Libertad|      27.18|
|       2|  Enjuague Bucal| Sucursal San Miguel|      50.19|
|       3|Guantes de Látex|  Sucursal Santa Ana|      19.88|
|       4|Vendas Elásticas|    Sucursal Escalón|       9.17|
|       5|Mascarillas KN95|Sucursal La Libertad|      11.48|
+--------+----------------+--------------------+-----------+
only showing top 5 rows



In [None]:
inventario = spark.read.csv("work/DimInventario.csv", header=True, inferSchema=True)


# Join con producto
ventas_stock = ventas_por_producto.join(inventario, "ID_Producto")

# Calcular rotación = ventas / stock
ventas_stock = ventas_stock.withColumn("Rotacion", col("Cantidad_Total") / col("StockActual"))

ventas_stock.select("ID_Producto", "Cantidad_Total", "StockActual", "Rotacion").show()

+-----------+--------------+-----------+-------------------+
|ID_Producto|Cantidad_Total|StockActual|           Rotacion|
+-----------+--------------+-----------+-------------------+
|        148|            60|        272|0.22058823529411764|
|        148|            60|        120|                0.5|
|        148|            60|         45| 1.3333333333333333|
|         31|            70|        240| 0.2916666666666667|
|         31|            70|        325| 0.2153846153846154|
|         85|           100|        176| 0.5681818181818182|
|         85|           100|        115| 0.8695652173913043|
|         65|            70|        154|0.45454545454545453|
|         65|            70|        240| 0.2916666666666667|
|         53|           101|        111| 0.9099099099099099|
|         53|           101|        218|  0.463302752293578|
|        133|            73|         89| 0.8202247191011236|
|        133|            73|        443|0.16478555304740405|
|        108|           

In [None]:
from pyspark.sql.functions import col

# Paso 1: Cargar archivo de fechas 
fechas = spark.read.csv("work/DimFecha.csv", header=True, inferSchema=True)

# Paso 2: Hacer join entre ventas y fechas
ventas_fechadas = df.join(fechas, df.ID_Fecha == fechas.ID_Fecha)

# Paso 3: Agrupar por mes
ventas_mes = ventas_fechadas.groupBy("Mes") \
    .sum("Total_Venta") \
    .withColumnRenamed("sum(Total_Venta)", "Ventas_Mensuales") \
    .orderBy("Mes")

ventas_mes.show()

# Paso 4 : Agrupar por Año y Mes
ventas_anyo_mes = ventas_fechadas.groupBy("Año", "Mes") \
    .sum("Total_Venta") \
    .withColumnRenamed("sum(Total_Venta)", "Ventas_Por_Mes") \
    .orderBy("Año", "Mes")

ventas_anyo_mes.show()

# Paso 5 : Agrupar por Semana
ventas_semana = ventas_fechadas.groupBy("Semana") \
    .sum("Total_Venta") \
    .withColumnRenamed("sum(Total_Venta)", "Ventas_Semanales") \
    .orderBy("Semana")

ventas_semana.show()

# Paso 6 : Exportar ventas por mes 
ventas_anyo_mes.toPandas().to_csv("/home/jovyan/work/ventas_por_mes.csv", index=False)


+---+------------------+
|Mes|  Ventas_Mensuales|
+---+------------------+
|  1| 21568.70999999999|
|  2| 19775.99999999999|
|  3|19870.000000000004|
|  4|15819.119999999997|
|  5|12099.439999999997|
|  6| 5112.180000000002|
|  7|5804.0199999999995|
|  8| 4684.899999999999|
|  9|           5438.56|
| 10| 3843.790000000001|
| 11| 4677.189999999999|
| 12| 5573.739999999999|
+---+------------------+

+----+---+------------------+
| Año|Mes|    Ventas_Por_Mes|
+----+---+------------------+
|2023|  1| 5168.200000000001|
|2023|  2| 4528.550000000001|
|2023|  3|5582.5599999999995|
|2023|  4|           4156.23|
|2023|  5|            4786.5|
|2023|  6| 5112.180000000002|
|2023|  7|5804.0199999999995|
|2023|  8| 4684.899999999999|
|2023|  9|           5438.56|
|2023| 10| 3843.790000000001|
|2023| 11| 4677.189999999999|
|2023| 12| 5573.739999999999|
|2024|  1|          16400.51|
|2024|  2|          15247.45|
|2024|  3|14287.440000000008|
|2024|  4|11662.890000000001|
|2024|  5| 7312.939999999999|

In [20]:
ventas_mes.toPandas().to_csv("/home/jovyan/work/ventas_por_mes_simple.csv", index=False)
ventas_semana.toPandas().to_csv("/home/jovyan/work/ventas_por_semana.csv", index=False)

In [None]:
# --------------------------------------------
# Análisis por Categoría de Producto
# --------------------------------------------

# Cargar DimProducto 
productos = spark.read.csv("work/DimProducto.csv", header=True, inferSchema=True)

# Unir ventas con productos
ventas_categoria = df.join(productos, "ID_Producto")

# Agrupar por categoría
ventas_por_categoria = ventas_categoria.groupBy("Categoria") \
    .sum("Total_Venta") \
    .withColumnRenamed("sum(Total_Venta)", "VentasTotales") \
    .orderBy("VentasTotales", ascending=False)

ventas_por_categoria.show()

# Guardar CSV
ventas_por_categoria.toPandas().to_csv("/home/jovyan/work/ventas_por_categoria.csv", index=False)

+----------------+------------------+
|       Categoria|     VentasTotales|
+----------------+------------------+
|     Medicamento| 61454.97999999998|
|         Higiene|21914.890000000003|
|      Suplemento| 20733.24999999999|
|Cuidado Personal|          20164.53|
+----------------+------------------+



In [None]:
# --------------------------------------------
# Productos en Riesgo de Agotarse
# --------------------------------------------

from pyspark.sql.functions import col, when

# Unir ventas con inventario 
ventas_por_producto = df.groupBy("ID_Producto", "ID_Sucursal") \
    .agg({"Cantidad": "sum"}) \
    .withColumnRenamed("sum(Cantidad)", "CantidadVendida")

inventario = spark.read.csv("work/DimInventario.csv", header=True, inferSchema=True)

ventas_stock = ventas_por_producto.join(inventario, ["ID_Producto", "ID_Sucursal"])

# Calcular rotación
ventas_stock = ventas_stock.withColumn("Rotacion", col("CantidadVendida") / col("StockActual"))

# Clasificar nivel de riesgo
ventas_stock = ventas_stock.withColumn(
    "NivelDeRiesgo",
    when((col("Rotacion") >= 1) & (col("StockActual") <= 200), "ALTO")
    .when((col("Rotacion") >= 0.5) & (col("StockActual") <= 300), "MEDIO")
    .otherwise("BAJO")
)

# Mostrar tabla completa con riesgo
ventas_stock.select("ID_Producto", "ID_Sucursal", "CantidadVendida", "StockActual", "Rotacion", "NivelDeRiesgo", "FechaDeRestock").show()



+-----------+-----------+---------------+-----------+--------------------+-------------+--------------+
|ID_Producto|ID_Sucursal|CantidadVendida|StockActual|            Rotacion|NivelDeRiesgo|FechaDeRestock|
+-----------+-----------+---------------+-----------+--------------------+-------------+--------------+
|         48|          6|             18|        362|0.049723756906077346|         BAJO|    2025-06-27|
|         62|          1|              6|        105| 0.05714285714285714|         BAJO|    2025-06-16|
|        105|          1|             20|        435| 0.04597701149425287|         BAJO|    2025-06-20|
|         91|          6|             11|         30| 0.36666666666666664|         BAJO|    2025-08-02|
|         16|          3|              5|        353|0.014164305949008499|         BAJO|    2025-07-19|
|        112|          2|             13|        481| 0.02702702702702703|         BAJO|    2025-07-25|
|          6|          1|             21|         19|   1.105263

In [27]:
ventas_stock.select(
    "ID_Producto", "ID_Sucursal", "CantidadVendida",
    "StockActual", "Rotacion", "NivelDeRiesgo", "FechaDeRestock"
).toPandas().to_csv("/home/jovyan/work/productos_nivel_riesgo.csv", index=False)

In [None]:
#baja rotacion mensual
# Paso 1: Cargar DimFecha si no lo hiciste ya
fechas = spark.read.csv("work/DimFecha.csv", header=True, inferSchema=True)

# Paso 2: Unir ventas con fecha
ventas_fechadas = df.join(fechas, df.ID_Fecha == fechas.ID_Fecha)

# Paso 3: Agrupar por producto y mes
ventas_mensuales = ventas_fechadas.groupBy("ID_Producto", "Mes") \
    .sum("Cantidad") \
    .withColumnRenamed("sum(Cantidad)", "CantidadMensual")

# Paso 4: Filtrar productos con baja rotación
productos_baja_rotacion = ventas_mensuales.filter(col("CantidadMensual") < 5)

# Mostrar resultado
productos_baja_rotacion.orderBy("CantidadMensual").show()

# Exportar a CSV
productos_baja_rotacion.toPandas().to_csv("/home/jovyan/work/productos_baja_rotacion_mensual.csv", index=False)

+-----------+---+---------------+
|ID_Producto|Mes|CantidadMensual|
+-----------+---+---------------+
|        104|  2|              1|
|         67|  4|              1|
|         35| 10|              1|
|        141| 10|              1|
|        138|  9|              1|
|         68| 11|              1|
|         45|  6|              1|
|        139| 11|              1|
|         14|  6|              1|
|        120| 10|              1|
|         58|  9|              1|
|         45|  5|              1|
|         44|  6|              1|
|         43|  3|              1|
|         25|  6|              1|
|        115|  1|              1|
|         62|  2|              1|
|        130| 12|              1|
|         22|  6|              1|
|        125|  9|              1|
+-----------+---+---------------+
only showing top 20 rows

