el objetivo es hacer una etl con spark a un conjunto de datos de un archivo excel que también fue sometido a una transformación con pandas

In [1]:
#COMIENZO ETL CON SPARK

from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql.functions import expr, to_date, split, explode, posexplode, concat_ws
import time

spark = SparkSession.builder.appName("ETL").getOrCreate()

In [31]:
inicio = time.time()

In [32]:
#leer archivo .xlsx

df_pandas = pd.read_excel('/content/Datos.xlsx', sheet_name="Hoja2 RF04", skiprows=4)


In [33]:
#Auxiliares

meses = {"ene": "01", "feb": "02", "mar": "03", "abr": "04", "may": "05", "jun": "06",
         "jul": "07", "ago": "08", "sep": "09", "oct": "10", "nov": "11", "dic": "12"}

cols_base = ['CANAL', 'GRUPO', 'CODIGO']

cajas = [
    'abr-25', 'may-25', 'jun-25', 'jul-25', 'ago-25', 'sep-25',
    'oct-25', 'nov-25', 'dic-25', 'ene-26', 'feb-26', 'mar-26'
]

precios = [
    'abr_25-Pr Netos', 'may_25-Pr Netos', 'jun_25-Pr Netos', 'jul_25-Pr Netos',
    'ago_25-Pr Netos', 'sep_25-Pr Netos', 'oct_25-Pr Netos', 'nov_25-Pr Netos',
    'dic_25-Pr Netos', 'ene_26-Pr Netos', 'feb_26-Pr Netos', 'mar_26-Pr Netos'
]

fap = [
    'Abr_25 FAP', 'May_25 FAP', 'Jun_25 FAP', 'Jul_25 FAP',
    'Ago_25 FAP', 'Sep_25 FAP', 'Oct_25 FAP', 'Nov_25 FAP',
    'Dic_25 FAP', 'Ene_26 FAP', 'Feb_26 FAP', 'Mar_26 FAP'
]

ptp = [
    'Abr_25 PTP $$', 'May_25 PTP $$', 'Jun_25 PTP $$', 'Jul_25 PTP $$',
    'Ago_25 PTP $$', 'Sep_25 PTP $$', 'Oct_25 PTP $$', 'Nov_25 PTP $$',
    'Dic_25 PTP $$', 'Ene_26 PTP $$', 'Feb_26 PTP $$', 'Mar_26 PTP $$'
]

# Selección total
columnas_seleccionadas = cols_base + cajas + precios + fap + ptp
df = df_pandas[columnas_seleccionadas]

In [34]:
df_spark = spark.createDataFrame(df)
df_spark.cache()


DataFrame[CANAL: string, GRUPO: string, CODIGO: bigint, abr-25: double, may-25: double, jun-25: double, jul-25: double, ago-25: double, sep-25: double, oct-25: double, nov-25: double, dic-25: double, ene-26: double, feb-26: double, mar-26: double, abr_25-Pr Netos: double, may_25-Pr Netos: double, jun_25-Pr Netos: double, jul_25-Pr Netos: double, ago_25-Pr Netos: double, sep_25-Pr Netos: double, oct_25-Pr Netos: double, nov_25-Pr Netos: double, dic_25-Pr Netos: double, ene_26-Pr Netos: double, feb_26-Pr Netos: double, mar_26-Pr Netos: double, Abr_25 FAP: double, May_25 FAP: double, Jun_25 FAP: double, Jul_25 FAP: double, Ago_25 FAP: double, Sep_25 FAP: double, Oct_25 FAP: double, Nov_25 FAP: double, Dic_25 FAP: double, Ene_26 FAP: double, Feb_26 FAP: double, Mar_26 FAP: double, Abr_25 PTP $$: double, May_25 PTP $$: double, Jun_25 PTP $$: double, Jul_25 PTP $$: double, Ago_25 PTP $$: double, Sep_25 PTP $$: double, Oct_25 PTP $$: double, Nov_25 PTP $$: double, Dic_25 PTP $$: double, Ene_2

In [35]:
# df_spark.printSchema()

In [36]:
#Funcion para despivotar columnas y generar df distintos

def unpivot(df, id_cols, value_cols, var_name="Fecha", val_name="Valor"):

    stack_expr = f"stack({len(value_cols)}, " + \
                 ", ".join([f"'{col}', `{col}`" for col in value_cols]) + \
                 f") as ({var_name}, {val_name})"

    return df.select(*id_cols, expr(stack_expr))

In [37]:
df_cajas = unpivot(df_spark, id_cols=cols_base, value_cols=cajas)
df_fap = unpivot(df_spark, id_cols=cols_base, value_cols= fap)
df_ptp = unpivot(df_spark, id_cols=cols_base, value_cols=ptp)
df_precios = unpivot(df_spark, id_cols=cols_base, value_cols=precios)

In [38]:
#limpiar columna Fecha en df_cajas y cambiar a tipo date

df_cajas = df_cajas.withColumn(
    "Fecha", expr("substring(Fecha, 1, 6)")
      ).withColumn("Fecha", expr("lower(Fecha)")
    ).withColumn("mes", (split("Fecha", "-").getItem(0))
    ).withColumn("año", (split("Fecha", "-").getItem(1))
    ).replace(to_replace=meses, subset=["mes"]).drop("Fecha")

df_cajas = df_cajas.withColumn(
    "Fecha", concat_ws("-", "mes", "año", )
    ).withColumn("Fecha", to_date("Fecha", "MM-yy")
    ).withColumnRenamed("Valor", "cajas").drop("mes", "año")


In [39]:
def limpieza(df , nombre = "Valor"): #esta funcion es para fap, precios y ptp pues tienen misma estructura

  df = df.withColumn(
    "Fecha", expr("substring(Fecha, 1, 6)")
      ).withColumn("Fecha", expr("lower(Fecha)")
    ).withColumn("mes", (split("Fecha", "_").getItem(0))
    ).withColumn("año", (split("Fecha", "_").getItem(1))
    ).replace(to_replace=meses, subset=["mes"]).drop("Fecha")

  df = df.withColumn(
      "Fecha", concat_ws("-", "mes", "año", )
      ).withColumn("Fecha", to_date("Fecha", "MM-yy")
      ).drop("mes", "año")

  df = df.withColumnRenamed("Valor", nombre)
  return df

In [40]:
df_fap = limpieza(df_fap, "fap")
df_ptp = limpieza(df_ptp, "ptp")
df_precios = limpieza(df_precios, "precios")


In [41]:
# display(df_cajas)
# display(df_fap)
# display(df_ptp)
# display(df_precios)

In [42]:
# join desde cajas a cada uno de los df
df_fin = df_cajas.join(df_fap, on=["CANAL", "GRUPO", "CODIGO", "Fecha"], how="left").join(
    df_ptp, on=["CANAL", "GRUPO", "CODIGO", "Fecha"], how="left").join(
    df_precios, on=["CANAL", "GRUPO", "CODIGO", "Fecha"], how="left")


In [43]:
df_fin = df_fin.withColumn("fact_operativa", (df_fin.cajas * df_fin.precios)/1000 )
df_fin.show(5)

+-----+-------+-------+----------+-------+---+---------+-------+--------------+
|CANAL|  GRUPO| CODIGO|     Fecha|  cajas|fap|      ptp|precios|fact_operativa|
+-----+-------+-------+----------+-------+---+---------+-------+--------------+
| GGCC|WALMART|1060553|2025-05-01| 4500.0|0.0|      0.0|30178.0|      135801.0|
| GGCC|WALMART|1060553|2025-07-01|15000.0|0.0| 117694.2|30178.0|      452670.0|
| GGCC|WALMART|1060553|2025-09-01|14000.0|0.0|109847.92|30178.0|      422492.0|
| GGCC|WALMART|1060553|2025-06-01| 4500.0|0.0|      0.0|30178.0|      135801.0|
| GGCC|WALMART|1060553|2025-04-01| 2500.0|0.0|  19615.7|30178.0|       75445.0|
+-----+-------+-------+----------+-------+---+---------+-------+--------------+
only showing top 5 rows



In [44]:
fin = time.time()
print(f"tiempo de lectura: {fin-inicio:.2f} segundos ")

tiempo de lectura: 88.10 segundos 


In [45]:
df_fin.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [CANAL#1339, GRUPO#1340, CODIGO#1341L, Fecha#1780, cajas#1788, fap#1870, ptp#1945, precios#2020, ((cajas#1788 * precios#2020) / 1000.0) AS fact_operativa#2200]
   +- BroadcastHashJoin [CANAL#1339, GRUPO#1340, CODIGO#1341L, Fecha#1780], [CANAL#2141, GRUPO#2142, CODIGO#2143L, Fecha#2007], LeftOuter, BuildRight, false
      :- Project [CANAL#1339, GRUPO#1340, CODIGO#1341L, Fecha#1780, cajas#1788, fap#1870, ptp#1945]
      :  +- BroadcastHashJoin [CANAL#1339, GRUPO#1340, CODIGO#1341L, Fecha#1780], [CANAL#2083, GRUPO#2084, CODIGO#2085L, Fecha#1932], LeftOuter, BuildRight, false
      :     :- Project [CANAL#1339, GRUPO#1340, CODIGO#1341L, Fecha#1780, cajas#1788, fap#1870]
      :     :  +- BroadcastHashJoin [CANAL#1339, GRUPO#1340, CODIGO#1341L, Fecha#1780], [CANAL#2026, GRUPO#2027, CODIGO#2028L, Fecha#1857], LeftOuter, BuildRight, false
      :     :     :- Project [CANAL#1339, GRUPO#1340, CODIGO#1341L, Valor#1697 AS cajas#