#  Taller: Limpieza y Validación de Datos Complejos en Big Data con PySpark
### Google Colab Notebook
---
**Objetivo:** Ejecutar un flujo completo de limpieza y validación de datos usando PySpark, detectando errores comunes y avanzados en datasets simulados, generando un archivo limpio en formato Parquet.

##  1. Instalación y Configuración del Entorno

In [12]:
!wget https://downloads.apache.org/spark/spark-3.5.5/spark-3.5.5-bin-hadoop3.tgz


--2025-04-30 18:06:47--  https://downloads.apache.org/spark/spark-3.5.5/spark-3.5.5-bin-hadoop3.tgz
Resolving downloads.apache.org (downloads.apache.org)... 135.181.214.104, 88.99.208.237, 2a01:4f9:3a:2c57::2, ...
Connecting to downloads.apache.org (downloads.apache.org)|135.181.214.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 400724056 (382M) [application/x-gzip]
Saving to: ‘spark-3.5.5-bin-hadoop3.tgz’


2025-04-30 18:07:09 (17.5 MB/s) - ‘spark-3.5.5-bin-hadoop3.tgz’ saved [400724056/400724056]



In [13]:
!ls -lh spark-3.5.5-bin-hadoop3.tgz
!tar xf spark-3.5.5-bin-hadoop3.tgz
!ls /content/spark-3.5.5-bin-hadoop3/python/lib/

-rw-r--r-- 1 root root 383M Feb 23 20:47 spark-3.5.5-bin-hadoop3.tgz
py4j-0.10.9.7-src.zip  PY4J_LICENSE.txt  pyspark.zip


In [15]:
!pip install -q pyspark findspark pandas numpy

In [16]:
!ls /content/spark-3.5.5-bin-hadoop3/python/lib/

py4j-0.10.9.7-src.zip  PY4J_LICENSE.txt  pyspark.zip


In [17]:
import os
import findspark
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.5-bin-hadoop3"
findspark.init()

##  2. Generación Automática del Dataset Complejo

In [18]:
import pandas as pd
import numpy as np
np.random.seed(42)
n_registros = 500
order_ids = np.arange(1000, 1000 + n_registros)
customer_ids = np.random.choice(['CUST-' + str(np.random.randint(1000, 9999)) for _ in range(100)], n_registros)
product_ids = np.random.choice(['PROD-' + str(np.random.randint(10, 99)) + chr(np.random.randint(65, 90)) for _ in range(50)], n_registros)
quantities = np.random.choice([1, 2, 3, np.nan], n_registros, p=[0.3, 0.3, 0.3, 0.1])
unit_prices = np.round(np.random.uniform(10, 300, n_registros), 2)
outlier_indices = np.random.choice(n_registros, size=10, replace=False)
unit_prices[outlier_indices] = np.random.choice([-50, 1200], size=10)
dates_iso = pd.date_range('2025-03-01', periods=n_registros//2).strftime('%Y-%m-%d').tolist()
dates_euro = pd.date_range('2025-03-01', periods=n_registros//2).strftime('%d/%m/%Y').tolist()
order_dates = np.random.choice(dates_iso + dates_euro, n_registros)
locations = ['Madrid-Centro', 'Barcelona @Sants', 'Valencia  Puerto', 'sevilla-triana', 'BILBAO-Casco']
store_locations = np.random.choice(locations + [np.nan], n_registros, p=[0.2,0.2,0.2,0.2,0.1,0.1])
data = pd.DataFrame({
    'order_id': order_ids,
    'customer_id': customer_ids,
    'product_id': product_ids,
    'quantity': quantities,
    'unit_price': unit_prices,
    'order_date': order_dates,
    'store_location': store_locations
})
duplicates = data.sample(frac=0.1, random_state=42)
data = pd.concat([data, duplicates], ignore_index=True)
data = data.sample(frac=1, random_state=42).reset_index(drop=True)
data.to_csv('retail_sales_complex.csv', index=False)

In [19]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("DataQualityComplex").getOrCreate()
df = spark.read.csv("retail_sales_complex.csv", header=True, inferSchema=True)
df.show(50)

+--------+-----------+----------+--------+----------+----------+----------------+
|order_id|customer_id|product_id|quantity|unit_price|order_date|  store_location|
+--------+-----------+----------+--------+----------+----------+----------------+
|    1195|  CUST-7863|  PROD-48A|     3.0|     17.44|11/09/2025|Barcelona @Sants|
|    1079|  CUST-6390|  PROD-59B|     3.0|    234.31|22/05/2025|   Madrid-Centro|
|    1480|  CUST-1769|  PROD-63V|     2.0|    188.04|25/03/2025|Valencia  Puerto|
|    1109|  CUST-6618|  PROD-14E|     1.0|     -50.0|2025-06-10|  sevilla-triana|
|    1280|  CUST-7873|  PROD-48A|     1.0|    127.25|22/10/2025|             nan|
|    1440|  CUST-8916|  PROD-15L|     1.0|     85.35|2025-05-12|             nan|
|    1084|  CUST-1860|  PROD-51I|     3.0|     31.31|2025-07-12|Valencia  Puerto|
|    1368|  CUST-4099|  PROD-79S|     2.0|     98.36|2025-10-25|   Madrid-Centro|
|    1132|  CUST-9792|  PROD-17P|     2.0|     205.3|28/08/2025|             nan|
|    1364|  CUST

##  3. Diagnóstico de Calidad de Datos

In [None]:
from pyspark.sql.functions import col, count, when
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()
print(f"Duplicados detectados: {df.count() - df.dropDuplicates().count()}")

In [None]:
# Guardamos copia del dataset original antes de limpiar
df_original = df
total_registros = df_original.count()

In [None]:
# Guardamos copia del dataset original antes de limpiar
df_original = df
total_registros = df_original.count()

##  4. Limpieza Avanzada

In [None]:
# Rellenar valores nulos
mean_quantity = df.agg({'quantity': 'mean'}).first()[0]
df = df.fillna({'quantity': round(mean_quantity, 0), 'store_location': 'Desconocido'})

In [None]:
# Eliminar duplicados
df = df.dropDuplicates(['order_id', 'customer_id', 'order_date'])

In [None]:
# Normalización de fechas
from pyspark.sql.functions import to_date
df = df.withColumn('order_date',
    when(col('order_date').rlike('\d{2}/\d{2}/\d{4}'),
         to_date(col('order_date'), 'dd/MM/yyyy'))
    .otherwise(to_date(col('order_date'), 'yyyy-MM-dd'))
)

In [None]:
# Limpieza de store_location
from pyspark.sql.functions import regexp_replace, lower, trim
df = df.withColumn('store_location', regexp_replace('store_location', '@', '-'))
df = df.withColumn('store_location', regexp_replace('store_location', '  ', '-'))
df = df.withColumn('store_location', lower(trim(col('store_location'))))

In [None]:
# Tratamiento de outliers
Q1 = df.approxQuantile("unit_price", [0.25], 0.05)[0]
Q3 = df.approxQuantile("unit_price", [0.75], 0.05)[0]
IQR = Q3 - Q1
df = df.filter((col("unit_price") >= (Q1 - 1.5 * IQR)) & (col("unit_price") <= (Q3 + 1.5 * IQR)))

##  5. Análisis de Métricas de Calidad Pre y Post Limpieza

###  Porcentaje de Nulos, Duplicados y Outliers

In [None]:
# 1. Porcentaje de valores nulos
nulos = df_original.select([count(when(col(c).isNull(), c)).alias(c) for c in df_original.columns])
nulos_percent = nulos.select([(col(c)/total_registros*100).alias(c + '_pct') for c in df_original.columns])
nulos_percent.show()

# 2. Duplicados
duplicados = total_registros - df_original.dropDuplicates().count()
print(f'Duplicados detectados: {duplicados} ({round(duplicados/total_registros*100, 2)}%)')

###  Estadísticas Descriptivas

In [None]:
print('Antes de la limpieza:')
df_original.describe(['quantity', 'unit_price']).show()

print('Después de la limpieza:')
df.describe(['quantity', 'unit_price']).show()

###  Conteo de Valores Únicos

In [None]:
print('Ubicaciones únicas tras limpieza:')
df.select('store_location').distinct().show()

In [None]:
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()
df.write.mode('overwrite').parquet('retail_sales_cleaned.parquet')

## 6. Comprobaciones Finales y exportación de los datos

In [None]:
# Comprobación final de nulos
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

# Comprobación de tamaño final del dataset
print(f"Número final de registros: {df.count()}")

# Exportación a formato Parquet
df.write.mode('overwrite').parquet('retail_sales_cleaned.parquet')


##  7. Comparativa: Ventajas de Convertir CSV a Parquet

En esta sección analizaremos las diferencias prácticas entre trabajar con un archivo CSV y un archivo Parquet en términos de **tamaño**, **velocidad de carga** y **eficiencia en la lectura de datos**.

###  Comparar Tamaño de Archivos

In [None]:
!ls -lh retail_sales_complex.csv
!du -h retail_sales_cleaned.parquet

###  Comparar Tiempo de Carga en PySpark

In [None]:
import time

start_csv = time.time()
df_csv = spark.read.csv("retail_sales_complex.csv", header=True, inferSchema=True)
end_csv = time.time()
print(f"Tiempo de carga CSV: {end_csv - start_csv:.4f} segundos")

start_parquet = time.time()
df_parquet = spark.read.parquet("retail_sales_cleaned.parquet")
end_parquet = time.time()
print(f"Tiempo de carga Parquet: {end_parquet - start_parquet:.4f} segundos")

###  Comparar Lectura de Columnas Específicas

In [None]:
start = time.time()
df_csv.select("order_id", "quantity").show(5)
print("CSV → lectura columnas:", time.time() - start)

start = time.time()
df_parquet.select("order_id", "quantity").show(5)
print("Parquet → lectura columnas:", time.time() - start)

###  Conclusión

Reflexiona sobre las diferencias observadas en términos de espacio, tiempos de carga y eficiencia.

- ¿Qué ventajas ofrece el formato Parquet frente al CSV?
- ¿En qué tipo de proyectos Big Data sería más adecuado usar Parquet?

Incluye esta reflexión en tu informe final.

##  8. Informe del Alumno
- **Errores detectados:**
- **Acciones aplicadas:**
- **Impacto en el negocio:**
- **Capturas de evidencia:**