#  Taller: Limpieza y Validación de Datos Complejos en Big Data con PySpark
### Google Colab Notebook
---
**Objetivo:** Ejecutar un flujo completo de limpieza y validación de datos usando PySpark, detectando errores comunes y avanzados en datasets simulados, generando un archivo limpio en formato Parquet.

##  1. Instalación y Configuración del Entorno

In [None]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.5.5/spark-3.5.5-bin-hadoop3.tgz
!tar xf spark-3.5.5-bin-hadoop3.tgz
!pip install -q pyspark findspark pandas numpy

In [None]:
import os
import findspark
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.5-bin-hadoop3"
findspark.init()

##  2. Generación Automática del Dataset Complejo

In [None]:
import pandas as pd
import numpy as np
np.random.seed(42)
n_registros = 500
order_ids = np.arange(1000, 1000 + n_registros)
customer_ids = np.random.choice(['CUST-' + str(np.random.randint(1000, 9999)) for _ in range(100)], n_registros)
product_ids = np.random.choice(['PROD-' + str(np.random.randint(10, 99)) + chr(np.random.randint(65, 90)) for _ in range(50)], n_registros)
quantities = np.random.choice([1, 2, 3, np.nan], n_registros, p=[0.3, 0.3, 0.3, 0.1])
unit_prices = np.round(np.random.uniform(10, 300, n_registros), 2)
outlier_indices = np.random.choice(n_registros, size=10, replace=False)
unit_prices[outlier_indices] = np.random.choice([-50, 1200], size=10)
dates_iso = pd.date_range('2025-03-01', periods=n_registros//2).strftime('%Y-%m-%d').tolist()
dates_euro = pd.date_range('2025-03-01', periods=n_registros//2).strftime('%d/%m/%Y').tolist()
order_dates = np.random.choice(dates_iso + dates_euro, n_registros)
locations = ['Madrid-Centro', 'Barcelona @Sants', 'Valencia  Puerto', 'sevilla-triana', 'BILBAO-Casco']
store_locations = np.random.choice(locations + [np.nan], n_registros, p=[0.2,0.2,0.2,0.2,0.1,0.1])
data = pd.DataFrame({
    'order_id': order_ids,
    'customer_id': customer_ids,
    'product_id': product_ids,
    'quantity': quantities,
    'unit_price': unit_prices,
    'order_date': order_dates,
    'store_location': store_locations
})
duplicates = data.sample(frac=0.1, random_state=42)
data = pd.concat([data, duplicates], ignore_index=True)
data = data.sample(frac=1, random_state=42).reset_index(drop=True)
data.to_csv('retail_sales_complex.csv', index=False)

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("DataQualityComplex").getOrCreate()
df = spark.read.csv("retail_sales_complex.csv", header=True, inferSchema=True)
df.show(50)

##  3. Diagnóstico de Calidad de Datos

In [None]:
from pyspark.sql.functions import col, count, when
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()
print(f"Duplicados detectados: {df.count() - df.dropDuplicates().count()}")

In [27]:
# Guardamos copia del dataset original antes de limpiar
df_original = df
total_registros = df_original.count()

In [28]:
# Guardamos copia del dataset original antes de limpiar
df_original = df
total_registros = df_original.count()

##  4. Limpieza Avanzada

In [29]:
# Rellenar valores nulos
mean_quantity = df.agg({'quantity': 'mean'}).first()[0]
df = df.fillna({'quantity': round(mean_quantity, 0), 'store_location': 'Desconocido'})

In [30]:
# Eliminar duplicados
df = df.dropDuplicates(['order_id', 'customer_id', 'order_date'])

In [31]:
# Normalización de fechas
from pyspark.sql.functions import to_date
df = df.withColumn('order_date',
    when(col('order_date').rlike('\d{2}/\d{2}/\d{4}'),
         to_date(col('order_date'), 'dd/MM/yyyy'))
    .otherwise(to_date(col('order_date'), 'yyyy-MM-dd'))
)

In [32]:
# Limpieza de store_location
from pyspark.sql.functions import regexp_replace, lower, trim
df = df.withColumn('store_location', regexp_replace('store_location', '@', '-'))
df = df.withColumn('store_location', regexp_replace('store_location', '  ', '-'))
df = df.withColumn('store_location', lower(trim(col('store_location'))))

In [33]:
# Tratamiento de outliers
Q1 = df.approxQuantile("unit_price", [0.25], 0.05)[0]
Q3 = df.approxQuantile("unit_price", [0.75], 0.05)[0]
IQR = Q3 - Q1
df = df.filter((col("unit_price") >= (Q1 - 1.5 * IQR)) & (col("unit_price") <= (Q3 + 1.5 * IQR)))

##  5. Análisis de Métricas de Calidad Pre y Post Limpieza

###  Porcentaje de Nulos, Duplicados y Outliers

In [34]:
# 1. Porcentaje de valores nulos
nulos = df_original.select([count(when(col(c).isNull(), c)).alias(c) for c in df_original.columns])
nulos_percent = nulos.select([(col(c)/total_registros*100).alias(c + '_pct') for c in df_original.columns])
nulos_percent.show()

# 2. Duplicados
duplicados = total_registros - df_original.dropDuplicates().count()
print(f'Duplicados detectados: {duplicados} ({round(duplicados/total_registros*100, 2)}%)')

+------------+---------------+--------------+-----------------+--------------+--------------+------------------+
|order_id_pct|customer_id_pct|product_id_pct|     quantity_pct|unit_price_pct|order_date_pct|store_location_pct|
+------------+---------------+--------------+-----------------+--------------+--------------+------------------+
|         0.0|            0.0|           0.0|9.272727272727273|           0.0|           0.0|               0.0|
+------------+---------------+--------------+-----------------+--------------+--------------+------------------+

Duplicados detectados: 50 (9.09%)


###  Estadísticas Descriptivas

In [35]:
print('Antes de la limpieza:')
df_original.describe(['quantity', 'unit_price']).show()

print('Después de la limpieza:')
df.describe(['quantity', 'unit_price']).show()

Antes de la limpieza:
+-------+------------------+------------------+
|summary|          quantity|        unit_price|
+-------+------------------+------------------+
|  count|               499|               550|
|   mean|1.9619238476953909|166.47359999999998|
| stddev|0.8410604239645704|144.73499241089607|
|    min|               1.0|             -50.0|
|    max|               3.0|            1200.0|
+-------+------------------+------------------+

Después de la limpieza:
+-------+------------------+------------------+
|summary|          quantity|        unit_price|
+-------+------------------+------------------+
|  count|               494|               494|
|   mean|1.9838056680161944|154.52921052631584|
| stddev|0.8004441532568164| 85.41247319250427|
|    min|               1.0|             -50.0|
|    max|               3.0|            299.83|
+-------+------------------+------------------+



###  Conteo de Valores Únicos

In [36]:
print('Ubicaciones únicas tras limpieza:')
df.select('store_location').distinct().show()

Ubicaciones únicas tras limpieza:
+----------------+
|  store_location|
+----------------+
|  sevilla-triana|
|    bilbao-casco|
|             nan|
| valencia-puerto|
|   madrid-centro|
|barcelona -sants|
+----------------+



In [37]:
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()
df.write.mode('overwrite').parquet('retail_sales_cleaned.parquet')

+--------+-----------+----------+--------+----------+----------+--------------+
|order_id|customer_id|product_id|quantity|unit_price|order_date|store_location|
+--------+-----------+----------+--------+----------+----------+--------------+
|       0|          0|         0|       0|         0|         0|             0|
+--------+-----------+----------+--------+----------+----------+--------------+



## 6. Comprobaciones Finales y exportación de los datos

In [38]:
# Comprobación final de nulos
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

# Comprobación de tamaño final del dataset
print(f"Número final de registros: {df.count()}")

# Exportación a formato Parquet
df.write.mode('overwrite').parquet('retail_sales_cleaned.parquet')


+--------+-----------+----------+--------+----------+----------+--------------+
|order_id|customer_id|product_id|quantity|unit_price|order_date|store_location|
+--------+-----------+----------+--------+----------+----------+--------------+
|       0|          0|         0|       0|         0|         0|             0|
+--------+-----------+----------+--------+----------+----------+--------------+

Número final de registros: 494


##  7. Comparativa: Ventajas de Convertir CSV a Parquet

En esta sección analizaremos las diferencias prácticas entre trabajar con un archivo CSV y un archivo Parquet en términos de **tamaño**, **velocidad de carga** y **eficiencia en la lectura de datos**.

###  Comparar Tamaño de Archivos

In [39]:
!ls -lh retail_sales_complex.csv
!du -h retail_sales_cleaned.parquet

-rw-r--r-- 1 root root 32K Apr 29 16:40 retail_sales_complex.csv
24K	retail_sales_cleaned.parquet


###  Comparar Tiempo de Carga en PySpark

In [40]:
import time

start_csv = time.time()
df_csv = spark.read.csv("retail_sales_complex.csv", header=True, inferSchema=True)
end_csv = time.time()
print(f"Tiempo de carga CSV: {end_csv - start_csv:.4f} segundos")

start_parquet = time.time()
df_parquet = spark.read.parquet("retail_sales_cleaned.parquet")
end_parquet = time.time()
print(f"Tiempo de carga Parquet: {end_parquet - start_parquet:.4f} segundos")

Tiempo de carga CSV: 0.5689 segundos
Tiempo de carga Parquet: 0.2808 segundos


###  Comparar Lectura de Columnas Específicas

In [41]:
start = time.time()
df_csv.select("order_id", "quantity").show(5)
print("CSV → lectura columnas:", time.time() - start)

start = time.time()
df_parquet.select("order_id", "quantity").show(5)
print("Parquet → lectura columnas:", time.time() - start)

+--------+--------+
|order_id|quantity|
+--------+--------+
|    1195|     3.0|
|    1079|     3.0|
|    1480|     2.0|
|    1109|     1.0|
|    1280|     1.0|
+--------+--------+
only showing top 5 rows

CSV → lectura columnas: 0.5639824867248535
+--------+--------+
|order_id|quantity|
+--------+--------+
|    1000|     2.0|
|    1001|     2.0|
|    1002|     1.0|
|    1003|     2.0|
|    1004|     3.0|
+--------+--------+
only showing top 5 rows

Parquet → lectura columnas: 0.24532198905944824


###  Conclusión

Reflexiona sobre las diferencias observadas en términos de espacio, tiempos de carga y eficiencia.

- ¿Qué ventajas ofrece el formato Parquet frente al CSV?
- ¿En qué tipo de proyectos Big Data sería más adecuado usar Parquet?

Incluye esta reflexión en tu informe final.

##  8. Informe del Alumno
- **Errores detectados:**
- **Acciones aplicadas:**
- **Impacto en el negocio:**
- **Capturas de evidencia:**