## Paso 1: Cargar el Dataset

In [0]:
# Montar el contenedor de Azure Data Lake Storage (ADLS)
dbutils.fs.mount(
    source = "",
    mount_point = "/mnt/",
    extra_configs = {"": ""}
)



True

In [0]:

# Cargar el dataset desde Azure Data Lake Storage
df = spark.read.csv("/mnt/datalake/retail_dataset.csv", header=True, inferSchema=True)

# Mostrar las primeras filas del dataset
df.show(5)

+---+----------+-----------+--------------+------------+-----+--------+------------+
|_c0|      Date|Customer_ID|Transaction_ID|SKU_Category|  SKU|Quantity|Sales_Amount|
+---+----------+-----------+--------------+------------+-----+--------+------------+
|  1|2016-01-02|       2547|             1|         X52|0EM7L|     1.0|        3.13|
|  2|2016-01-02|        822|             2|         2ML|68BRQ|     1.0|        5.46|
|  3|2016-01-02|       3686|             3|         0H2|CZUZX|     1.0|        6.35|
|  4|2016-01-02|       3719|             4|         0H2|549KK|     1.0|        5.59|
|  5|2016-01-02|       9200|             5|         0H2|K8EHH|     1.0|        6.88|
+---+----------+-----------+--------------+------------+-----+--------+------------+
only showing top 5 rows



In [0]:
# Mostrar esquema inicial del dataset
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Customer_ID: integer (nullable = true)
 |-- Transaction_ID: integer (nullable = true)
 |-- SKU_Category: string (nullable = true)
 |-- SKU: string (nullable = true)
 |-- Quantity: double (nullable = true)
 |-- Sales_Amount: double (nullable = true)



## Paso 2: Limpieza de Datos
2.1. Identificar columnas con todos los valores nulos

Este paso identifica y elimina columnas que contienen únicamente valores nulos.

In [0]:
from pyspark.sql.functions import col, when, isnan, count

# Obtener el total de filas en el dataset
total_rows = df.count()

# Contar los valores nulos por columna en una sola ejecución
null_counts = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).collect()[0]

# Identificar columnas con todos los valores nulos
all_null_columns = [c for c, null_count in zip(df.columns, null_counts) if null_count == total_rows]

# Eliminar columnas completamente nulas
df_cleaned = df.drop(*all_null_columns)

# Mostrar las columnas restantes
df_cleaned.printSchema()


root
 |-- _c0: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Customer_ID: integer (nullable = true)
 |-- Transaction_ID: integer (nullable = true)
 |-- SKU_Category: string (nullable = true)
 |-- SKU: string (nullable = true)
 |-- Quantity: double (nullable = true)
 |-- Sales_Amount: double (nullable = true)



2.2. Conversión de tipos de datos erróneos

Verificamos y aseguramos que las columnas con tipos de datos numéricos estén correctamente casteadas. Según tu esquema, ya tenemos los tipos correctos (integer, double, date), pero este código verifica y ajusta los datos si es necesario.

In [0]:
# Aseguramos la conversión correcta de tipos en columnas numéricas y de fecha
df_cleaned = df_cleaned.withColumn("Customer_ID", col("Customer_ID").cast("integer")) \
                       .withColumn("Transaction_ID", col("Transaction_ID").cast("integer")) \
                       .withColumn("Quantity", col("Quantity").cast("double")) \
                       .withColumn("Sales_Amount", col("Sales_Amount").cast("double"))

# Mostrar el esquema actualizado
df_cleaned.printSchema()


root
 |-- _c0: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Customer_ID: integer (nullable = true)
 |-- Transaction_ID: integer (nullable = true)
 |-- SKU_Category: string (nullable = true)
 |-- SKU: string (nullable = true)
 |-- Quantity: double (nullable = true)
 |-- Sales_Amount: double (nullable = true)



2.3. Llenar o eliminar valores nulos según contexto

Llenamos valores nulos en columnas clave o eliminamos filas donde faltan datos críticos, por ejemplo, transacciones sin Sales_Amount o Quantity.

In [0]:
# Rellenar nulos con valores por defecto en columnas críticas si es necesario
df_cleaned = df_cleaned.fillna({'Quantity': 0, 'Sales_Amount': 0})

# Eliminar filas con demasiados nulos si es necesario
df_cleaned = df_cleaned.na.drop(subset=["Customer_ID", "Transaction_ID"])

# Mostrar un resumen después de la limpieza
df_cleaned.describe().show()


+-------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+
|summary|              _c0|       Customer_ID|    Transaction_ID|      SKU_Category|               SKU|          Quantity|      Sales_Amount|
+-------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+
|  count|           131706|            131706|            131706|            131706|            131706|            131706|            131706|
|   mean|          65853.5|12386.450366725889|32389.604186597422| 399.9587628865979| 52972.52046783626|1.4853114436699932|11.981524152278366|
| stddev|38020.39161423775| 6086.447551524349|18709.901237886686|220.36654679976633|31885.645096065182| 3.872667435765362|  19.3596994942029|
|    min|                1|                 1|                 1|               01F|             00GVC|              0.01|              0.02|
|    m

## Paso 3: Identificación y Aplicación de Particionamiento

3.1. Evaluar el tamaño del dataset y decidir particionamiento

Identificamos las columnas más adecuadas para particionar según la cardinalidad y el uso en consultas. En este caso, se podría particionar por SKU_Category, Customer_ID, y Date para optimizar las consultas.

In [0]:
# Evaluamos la cardinalidad de algunas columnas para decidir el particionamiento
df_cleaned.select("Customer_ID", "SKU_Category", "Date").groupBy("Customer_ID", "SKU_Category", "Date").count().show()

# Aplicar particionamiento basado en columnas de alta cardinalidad
df_partitioned = df_cleaned.repartition("Customer_ID", "SKU_Category", "Date")

# Guardar el dataset particionado en formato Parquet
df_partitioned.write.partitionBy("Customer_ID", "SKU_Category", "Date").mode("overwrite").parquet("/mnt/datalake/partitioned_data/")


+-----------+------------+----------+-----+
|Customer_ID|SKU_Category|      Date|count|
+-----------+------------+----------+-----+
|       2237|         LDZ|2016-01-04|    1|
|        754|         TW8|2016-01-04|    1|
|       1082|         1VL|2016-01-04|    1|
|        853|         BZU|2016-01-07|    1|
|       7223|         R6E|2016-01-07|    1|
|       3827|         JPI|2016-01-08|    1|
|       3957|         P42|2016-01-08|    1|
|       2436|         P42|2016-01-08|    1|
|       5253|         29A|2016-01-08|    1|
|       3719|         U5F|2016-01-11|    1|
|       3719|         0H2|2016-01-11|    1|
|       2811|         IEV|2016-01-11|    1|
|       2402|         IEV|2016-01-11|    1|
|        199|         U5F|2016-01-11|    1|
|       7060|         MOE|2016-01-11|    1|
|       1591|         1L6|2016-01-12|    1|
|         70|         A38|2016-01-13|    1|
|       3182|         LPF|2016-01-13|    1|
|       9095|         R6E|2016-01-13|    1|
|       5612|         RU6|2016-0

com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:136)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:136)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:728)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:446)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:446)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.can

3.2. Optimización con Adaptive Query Execution (AQE)

Habilitamos AQE y Dynamic Partition Pruning (DPP) para optimizar las consultas en las tablas particionadas.

In [0]:
# Activar Adaptive Query Execution (AQE)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

# Activar Dynamic Partition Pruning (DPP)
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")


## Paso 4: Monitoreo del Rendimiento dentro del Código

4.1. Medir tiempo de ejecución

Se mide el tiempo de ejecución de consultas o transformaciones clave dentro del código.

In [0]:
import time

# Medir el tiempo de ejecución de una operación de consulta
start_time = time.time()

# Realizar una consulta en el dataset particionado
df_partitioned.filter("Customer_ID = 12345").groupBy("SKU_Category").sum("Sales_Amount").show()

# Mostrar el tiempo de ejecución
print(f"Tiempo de ejecución: {time.time() - start_time} segundos")


+------------+------------------+
|SKU_Category| sum(Sales_Amount)|
+------------+------------------+
|         0H2|              7.29|
|         2ML|13.149999999999999|
+------------+------------------+

Tiempo de ejecución: 1.3380613327026367 segundos


4.2. Monitorear el uso de recursos: CPU y memoria

Monitoreamos el uso de recursos directamente en el código.

In [0]:
# Obtener el número de particiones procesadas y tareas completadas
executor_memory_status = sc._jsc.sc().getExecutorMemoryStatus().toString()
executor_memory_status = executor_memory_status.replace("Map(", "").replace(")", "")
executor_memory_status = dict(
    item.split(" -> ") for item in executor_memory_status.split(", ")
)

for executor_id, memory in executor_memory_status.items():
    memory_used, memory_total = map(int, memory.strip("()").split(","))
    print(f"ID del ejecutor: {executor_id}, Memoria usada: {memory_used}, Memoria total: {memory_total}")

ID del ejecutor: 10.139.64.4:45095, Memoria usada: 6558842880, Memoria total: 6558842880
ID del ejecutor: 10.139.64.5:33779, Memoria usada: 6558842880, Memoria total: 6558842880
ID del ejecutor: 10.139.64.6:35641, Memoria usada: 9431207116, Memoria total: 9431188837


4.3. Resumen de plan de ejecución

Para verificar cómo se optimiza el trabajo, utilizamos el método explain() para obtener el plan físico y lógico de las operaciones.

In [0]:
# Mostrar el plan de ejecución para una consulta compleja
df_partitioned.filter("Customer_ID = 12345").groupBy("SKU_Category").sum("Sales_Amount").explain(True)


== Parsed Logical Plan ==
'Aggregate ['SKU_Category], ['SKU_Category, unresolvedalias(sum(Sales_Amount#368))]
+- Filter (Customer_ID#323 = 12345)
   +- RepartitionByExpression [Customer_ID#323, SKU_Category#98, Date#95]
      +- Filter atleastnnonnulls(2, Customer_ID#323, Transaction_ID#332)
         +- Project [_c0#94, Date#95, Customer_ID#323, Transaction_ID#332, SKU_Category#98, SKU#99, coalesce(nanvl(Quantity#341, cast(null as double)), cast(0 as double)) AS Quantity#367, coalesce(nanvl(Sales_Amount#350, cast(null as double)), cast(0 as double)) AS Sales_Amount#368]
            +- Project [_c0#94, Date#95, Customer_ID#323, Transaction_ID#332, SKU_Category#98, SKU#99, Quantity#341, cast(Sales_Amount#101 as double) AS Sales_Amount#350]
               +- Project [_c0#94, Date#95, Customer_ID#323, Transaction_ID#332, SKU_Category#98, SKU#99, cast(Quantity#100 as double) AS Quantity#341, Sales_Amount#101]
                  +- Project [_c0#94, Date#95, Customer_ID#323, cast(Transaction_I

## Plan Lógico (Parsed Logical Plan)

**'Aggregate ['SKU_Category]:** La consulta está agregando los datos (sumando Sales_Amount) agrupados por la columna SKU_Category.

**'Filter (Customer_ID = 12345):** Un filtro se aplica a la columna Customer_ID para seleccionar las filas donde Customer_ID es igual a 12345.

**'RepartitionByExpression [Customer_ID, SKU_Category, Date]:** Los datos son repartidos por las columnas Customer_ID, SKU_Category, y Date para distribuir las filas a través de diferentes particiones, lo que ayuda a paralelizar el procesamiento.

**'Project y 'Coalesce:** Se proyectan y ajustan varias columnas, como Quantity y Sales_Amount, donde se están utilizando funciones como coalesce y nanvl para reemplazar valores nulos con 0.

**'Relation csv:** Los datos se están leyendo de un archivo CSV con las columnas especificadas: Date, Customer_ID, Transaction_ID, SKU_Category, SKU, Quantity, y Sales_Amount.

## Plan Físico (Physical Plan)

**PhotonGroupingAgg:** La agregación ocurre en dos etapas. Primero, Spark realiza una agregación parcial (partial_sum) para cada SKU_Category y luego hace un "merge" de los resultados parciales para obtener la suma total de Sales_Amount.

**PhotonShuffleExchange:** Spark usa un "shuffle" para redistribuir los datos por SKU_Category y Customer_ID en 200 particiones, lo que permite procesar los datos en paralelo a lo largo del clúster.

**PhotonProject y PhotonFilter:** Se proyectan las columnas necesarias (SKU_Category, Sales_Amount) y se aplica el filtro para Customer_ID = 12345 lo más temprano posible.

**FileScan csv:** Finalmente, los datos se leen del archivo CSV con las columnas especificadas, aplicando filtros de datos como isnotnull(Customer_ID) y Customer_ID = 12345 antes de procesar las filas.

## Optimizaciones a partir del Plan Lógico

Evitar particionamiento innecesario

In [0]:
df_partitioned = df_cleaned.filter("Customer_ID = 12345").groupBy("SKU_Category").sum("Sales_Amount")


Convertir CSV a Parquet o a Delta

In [0]:
df.write.parquet("/mnt/datalake/parquet_dataset/")

# Then, read from Parquet:
df = spark.read.parquet("/mnt/datalake/parquet_dataset/")


Uso de cache

In [0]:
df_cleaned.cache()
df_cleaned.filter("Customer_ID = 12345").groupBy("SKU_Category").sum("Sales_Amount").show()


+------------+------------------+
|SKU_Category| sum(Sales_Amount)|
+------------+------------------+
|         0H2|              7.29|
|         2ML|13.149999999999999|
+------------+------------------+



## Paso 5: Guardar y Finalizar el Dataset

In [0]:
# Guardar el dataset final en formato Parquet
df_cleaned.write.parquet("/mnt/datalake/final_cleaned_data/", mode="overwrite")
