<a href="https://colab.research.google.com/github/fjgr/IA_BigData/blob/main/BIU/UT3/Actividad_An%C3%A1lisis_y_Limpieza_con_PySpark_y_exportaci%C3%B3n_a_Parquet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#  Taller: Limpieza y Validación de Datos Complejos en Big Data con PySpark
### Google Colab Notebook
---
**Objetivo:** Ejecutar un flujo completo de limpieza y validación de datos usando PySpark, detectando errores comunes y avanzados en datasets simulados, generando un archivo limpio en formato Parquet.

##  1. Instalación y Configuración del Entorno

In [4]:
# Instalación del JDK (Java Development Kit) versión 11, requerido para ejecutar Apache Spark
!apt-get install openjdk-11-jdk-headless -qq > /dev/null

# Descarga de Apache Spark versión 3.5.5 precompilado con soporte para Hadoop 3 desde el sitio oficial de Apache
!wget -q https://downloads.apache.org/spark/spark-3.5.5/spark-3.5.5-bin-hadoop3.tgz

# Descompresión del archivo descargado para disponer del entorno de Spark localmente
!tar xf spark-3.5.5-bin-hadoop3.tgz

# Instalación de bibliotecas necesarias para trabajar con Spark y análisis de datos
# - pyspark: interfaz de Apache Spark para Python
# - findspark: utilidad para encontrar e inicializar Spark en notebooks
# - pandas y numpy: bibliotecas para análisis y estructuras de datos eficientes
!pip install -q pyspark findspark pandas numpy

In [5]:
# Importación de módulos necesarios
import os              # Para manipular variables de entorno del sistema
import findspark       # Para inicializar el entorno de Apache Spark en notebooks

# Configuración de las variables de entorno requeridas por Apache Spark
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"  # Ruta al JDK 11
os.environ["SPARK_HOME"] = "/content/spark-3.5.5-bin-hadoop3"   # Ruta a la instalación local de Spark

# Inicialización del entorno de Spark con findspark
findspark.init()


##  2. Generación Automática del Dataset Complejo

In [6]:
import pandas as pd
import numpy as np

# Establecer semilla para reproducibilidad
np.random.seed(42)
n_registros = 500  # Tamaño del conjunto de datos

# Generación de columnas simuladas
order_ids = np.arange(1000, 1000 + n_registros)

# IDs de clientes con formato alfanumérico
customer_ids = np.random.choice(['CUST-' + str(np.random.randint(1000, 9999)) for _ in range(100)], n_registros)

# IDs de productos con formato SKU (incluye letras)
product_ids = np.random.choice(['PROD-' + str(np.random.randint(10, 99)) + chr(np.random.randint(65, 90)) for _ in range(50)], n_registros)

# Cantidades compradas, con un 10% de valores faltantes
quantities = np.random.choice([1, 2, 3, np.nan], n_registros, p=[0.3, 0.3, 0.3, 0.1])

# Precios unitarios realistas
unit_prices = np.round(np.random.uniform(10, 300, n_registros), 2)

# Introducción de valores atípicos (precios negativos o extremos)
outlier_indices = np.random.choice(n_registros, size=10, replace=False)
unit_prices[outlier_indices] = np.random.choice([-50, 1200], size=10)

# Fechas en formatos ISO y europeo mezclados
dates_iso = pd.date_range('2025-03-01', periods=n_registros//2).strftime('%Y-%m-%d').tolist()
dates_euro = pd.date_range('2025-03-01', periods=n_registros//2).strftime('%d/%m/%Y').tolist()
order_dates = np.random.choice(dates_iso + dates_euro, n_registros)

# Ubicaciones de tiendas con variaciones tipográficas y valores faltantes
locations = ['Madrid-Centro', 'Barcelona @Sants', 'Valencia  Puerto', 'sevilla-triana', 'BILBAO-Casco']
store_locations = np.random.choice(locations + [np.nan], n_registros, p=[0.2, 0.2, 0.2, 0.2, 0.1, 0.1])

# Construcción del DataFrame con todos los campos
data = pd.DataFrame({
    'order_id': order_ids,
    'customer_id': customer_ids,
    'product_id': product_ids,
    'quantity': quantities,
    'unit_price': unit_prices,
    'order_date': order_dates,
    'store_location': store_locations
})

# Inserción de duplicados (10%) y desorden aleatorio
duplicates = data.sample(frac=0.1, random_state=42)
data = pd.concat([data, duplicates], ignore_index=True)
data = data.sample(frac=1, random_state=42).reset_index(drop=True)

# Guardado del dataset en archivo CSV
data.to_csv('retail_sales_complex.csv', index=False)


In [8]:
from pyspark.sql import SparkSession

# Inicialización de la sesión de Spark para procesamiento distribuido
spark = SparkSession.builder .appName("DataQualityComplex") .getOrCreate() # Nombre de la aplicación (identificable en UI de Spark)

# Carga del dataset desde archivo CSV, infiriendo tipos de datos automáticamente
df = spark.read.csv("retail_sales_complex.csv", header=True, inferSchema=True)

# Visualización de las primeras 50 filas del DataFrame para inspección inicial
df.show(50)


+--------+-----------+----------+--------+----------+----------+----------------+
|order_id|customer_id|product_id|quantity|unit_price|order_date|  store_location|
+--------+-----------+----------+--------+----------+----------+----------------+
|    1195|  CUST-7863|  PROD-48A|     3.0|     17.44|11/09/2025|Barcelona @Sants|
|    1079|  CUST-6390|  PROD-59B|     3.0|    234.31|22/05/2025|   Madrid-Centro|
|    1480|  CUST-1769|  PROD-63V|     2.0|    188.04|25/03/2025|Valencia  Puerto|
|    1109|  CUST-6618|  PROD-14E|     1.0|     -50.0|2025-06-10|  sevilla-triana|
|    1280|  CUST-7873|  PROD-48A|     1.0|    127.25|22/10/2025|             nan|
|    1440|  CUST-8916|  PROD-15L|     1.0|     85.35|2025-05-12|             nan|
|    1084|  CUST-1860|  PROD-51I|     3.0|     31.31|2025-07-12|Valencia  Puerto|
|    1368|  CUST-4099|  PROD-79S|     2.0|     98.36|2025-10-25|   Madrid-Centro|
|    1132|  CUST-9792|  PROD-17P|     2.0|     205.3|28/08/2025|             nan|
|    1364|  CUST

##  3. Diagnóstico de Calidad de Datos

In [9]:
from pyspark.sql.functions import col, count, when

# Cómputo de valores nulos por columna
# Se utiliza 'when' para marcar valores nulos y 'count' para contarlos
df.select([
    count(when(col(c).isNull(), c)).alias(c) for c in df.columns
]).show()

# Cómputo de registros duplicados
# Se calcula la diferencia entre el total de registros y los únicos
print(f"Duplicados detectados: {df.count() - df.dropDuplicates().count()}")

+--------+-----------+----------+--------+----------+----------+--------------+
|order_id|customer_id|product_id|quantity|unit_price|order_date|store_location|
+--------+-----------+----------+--------+----------+----------+--------------+
|       0|          0|         0|      51|         0|         0|             0|
+--------+-----------+----------+--------+----------+----------+--------------+

Duplicados detectados: 50


In [10]:
# Conservación del dataset original antes de iniciar el proceso de limpieza
# Importante para trazabilidad y comparaciones posteriores
df_original = df

# Cómputo del total de registros originales (útil para análisis de impacto de limpieza)
total_registros = df_original.count()

In [11]:
# Preservación del dataset original antes de aplicar transformaciones
# Esto permite comparar resultados después de la limpieza o revertir cambios si es necesario
df_original = df

# Almacenamiento del número total de registros originales para evaluar impacto de la limpieza
total_registros = df_original.count()

##  4. Limpieza Avanzada

In [12]:
# --- Limpieza de valores nulos ---

# Cálculo de la media de la columna 'quantity' ignorando nulos
# Se redondea el valor para mantener consistencia con datos enteros
mean_quantity = df.agg({'quantity': 'mean'}).first()[0]

# Relleno de valores nulos:
# - 'quantity' se imputa con la media redondeada
# - 'store_location' se reemplaza con un valor genérico 'Desconocido'
df = df.fillna({
    'quantity': round(mean_quantity, 0),
    'store_location': 'Desconocido'
})

In [13]:
# --- Eliminación de duplicados ---

# Se eliminan registros duplicados considerando como clave compuesta:
# - order_id
# - customer_id
# - order_date
# Esta combinación representa una transacción única por cliente en una fecha determinada.
df = df.dropDuplicates(['order_id', 'customer_id', 'order_date'])

In [14]:
# --- Normalización de fechas ---

from pyspark.sql.functions import to_date, col, when

# Conversión de fechas en distintos formatos al tipo DateType
# - Si el formato es europeo (dd/MM/yyyy), se aplica dicho patrón
# - En caso contrario, se asume formato ISO (yyyy-MM-dd)
df = df.withColumn('order_date',
    when(col('order_date').rlike(r'\d{2}/\d{2}/\d{4}'),
         to_date(col('order_date'), 'dd/MM/yyyy'))
    .otherwise(to_date(col('order_date'), 'yyyy-MM-dd'))
)

In [15]:
# --- Limpieza y normalización de la columna 'store_location' ---

from pyspark.sql.functions import regexp_replace, lower, trim, col

# Reemplazo de caracteres inconsistentes o problemáticos
# - '@' → '-' para unificar separadores
# - Doble espacio → '-' para evitar errores en tokenización
df = df.withColumn('store_location', regexp_replace('store_location', '@', '-'))
df = df.withColumn('store_location', regexp_replace('store_location', '  ', '-'))

# Conversión a minúsculas y eliminación de espacios sobrantes
df = df.withColumn('store_location', lower(trim(col('store_location'))))


In [16]:
# --- Tratamiento de outliers en 'unit_price' usando el método del rango intercuartílico (IQR) ---

from pyspark.sql.functions import col

# Cálculo aproximado de los cuartiles primero (Q1) y tercero (Q3)
# La función approxQuantile es eficiente para grandes volúmenes de datos
Q1 = df.approxQuantile("unit_price", [0.25], 0.05)[0]
Q3 = df.approxQuantile("unit_price", [0.75], 0.05)[0]

# Cálculo del rango intercuartílico (IQR)
IQR = Q3 - Q1

# Filtrado de valores extremos: solo se conservan aquellos dentro de 1.5 × IQR del rango intercuartílico
df = df.filter(
    (col("unit_price") >= (Q1 - 1.5 * IQR)) &
    (col("unit_price") <= (Q3 + 1.5 * IQR))
)


##  5. Análisis de Métricas de Calidad Pre y Post Limpieza

###  Porcentaje de Nulos, Duplicados y Outliers

In [17]:
from pyspark.sql.functions import count, when, col

# --- 1. Análisis de valores nulos ---

# Cálculo del número de valores nulos por columna
nulos = df_original.select([
    count(when(col(c).isNull(), c)).alias(c) for c in df_original.columns
])

# Conversión de los conteos a porcentajes respecto al total de registros originales
nulos_percent = nulos.select([
    (col(c) / total_registros * 100).alias(c + '_pct') for c in df_original.columns
])

# Visualización del porcentaje de valores nulos por columna
nulos_percent.show()

# --- 2. Detección de duplicados ---

# Cómputo de registros duplicados en el dataset original
duplicados = total_registros - df_original.dropDuplicates().count()

# Presentación de resultados con porcentaje relativo
print(f'Duplicados detectados: {duplicados} ({round(duplicados / total_registros * 100, 2)}%)')


+------------+---------------+--------------+-----------------+--------------+--------------+------------------+
|order_id_pct|customer_id_pct|product_id_pct|     quantity_pct|unit_price_pct|order_date_pct|store_location_pct|
+------------+---------------+--------------+-----------------+--------------+--------------+------------------+
|         0.0|            0.0|           0.0|9.272727272727273|           0.0|           0.0|               0.0|
+------------+---------------+--------------+-----------------+--------------+--------------+------------------+

Duplicados detectados: 50 (9.09%)


###  Estadísticas Descriptivas

In [18]:
# --- Comparación estadística antes y después de la limpieza ---

# Estadísticas descriptivas del dataset original (antes de imputaciones y filtros)
print('Antes de la limpieza:')
df_original.describe(['quantity', 'unit_price']).show()

# Estadísticas descriptivas del dataset limpio (tras imputación de nulos y eliminación de outliers)
print('Después de la limpieza:')
df.describe(['quantity', 'unit_price']).show()

Antes de la limpieza:
+-------+------------------+------------------+
|summary|          quantity|        unit_price|
+-------+------------------+------------------+
|  count|               499|               550|
|   mean|1.9619238476953909|166.47359999999998|
| stddev|0.8410604239645704|144.73499241089607|
|    min|               1.0|             -50.0|
|    max|               3.0|            1200.0|
+-------+------------------+------------------+

Después de la limpieza:
+-------+------------------+------------------+
|summary|          quantity|        unit_price|
+-------+------------------+------------------+
|  count|               494|               494|
|   mean|1.9838056680161944|154.52921052631584|
| stddev|0.8004441532568164| 85.41247319250427|
|    min|               1.0|             -50.0|
|    max|               3.0|            299.83|
+-------+------------------+------------------+



###  Conteo de Valores Únicos

In [19]:
# --- Verificación de valores únicos en 'store_location' tras limpieza ---

print('Ubicaciones únicas tras limpieza:')
df.select('store_location').distinct().show()


Ubicaciones únicas tras limpieza:
+----------------+
|  store_location|
+----------------+
|  sevilla-triana|
|    bilbao-casco|
|             nan|
| valencia-puerto|
|   madrid-centro|
|barcelona -sants|
+----------------+



In [20]:
from pyspark.sql.functions import col, count, when

# --- Verificación final de valores nulos en el dataset limpio ---
# Se asegura que no persistan valores nulos tras el proceso de limpieza
df.select([
    count(when(col(c).isNull(), c)).alias(c) for c in df.columns
]).show()

# --- Persistencia del DataFrame limpio en formato Parquet ---
# El formato Parquet es eficiente, comprimido y óptimo para análisis posteriores
# Se sobrescribe el archivo si ya existe
df.write.mode('overwrite').parquet('retail_sales_cleaned.parquet')


+--------+-----------+----------+--------+----------+----------+--------------+
|order_id|customer_id|product_id|quantity|unit_price|order_date|store_location|
+--------+-----------+----------+--------+----------+----------+--------------+
|       0|          0|         0|       0|         0|         0|             0|
+--------+-----------+----------+--------+----------+----------+--------------+



## 6. Comprobaciones Finales y exportación de los datos

In [22]:
from pyspark.sql.functions import col, count, when

# --- Comprobación final de valores nulos ---
# Se confirma que tras el proceso de limpieza no quedan valores faltantes
df.select([
    count(when(col(c).isNull(), c)).alias(c) for c in df.columns
]).show()

# --- Validación del tamaño final del dataset limpio ---
# Se imprime el número total de registros tras eliminar duplicados y outliers
print(f"Número final de registros: {df.count()}")

# --- Exportación del dataset limpio en formato Parquet ---
# Formato columnar optimizado para almacenamiento y análisis distribuido
df.write.mode('overwrite').parquet('retail_sales_cleaned.parquet')

+--------+-----------+----------+--------+----------+----------+--------------+
|order_id|customer_id|product_id|quantity|unit_price|order_date|store_location|
+--------+-----------+----------+--------+----------+----------+--------------+
|       0|          0|         0|       0|         0|         0|             0|
+--------+-----------+----------+--------+----------+----------+--------------+

Número final de registros: 494


##  7. Comparativa: Ventajas de Convertir CSV a Parquet

En esta sección analizaremos las diferencias prácticas entre trabajar con un archivo CSV y un archivo Parquet en términos de **tamaño**, **velocidad de carga** y **eficiencia en la lectura de datos**.

###  Comparar Tamaño de Archivos

In [23]:
# --- Comparación de tamaño en disco entre el dataset original (CSV) y el limpio (Parquet) ---

# Tamaño del archivo CSV original
!ls -lh retail_sales_complex.csv

# Tamaño del dataset limpio exportado en formato Parquet
!du -h retail_sales_cleaned.parquet

-rw-r--r-- 1 root root 32K Apr 23 17:23 retail_sales_complex.csv
24K	retail_sales_cleaned.parquet


###  Comparar Tiempo de Carga en PySpark

In [24]:
import time

# --- Comparación de tiempos de carga entre CSV y Parquet ---

# Medición de tiempo para lectura del archivo CSV original
start_csv = time.time()
df_csv = spark.read.csv("retail_sales_complex.csv", header=True, inferSchema=True)
end_csv = time.time()
print(f"Tiempo de carga CSV: {end_csv - start_csv:.4f} segundos")

# Medición de tiempo para lectura del archivo Parquet (post-limpieza)
start_parquet = time.time()
df_parquet = spark.read.parquet("retail_sales_cleaned.parquet")
end_parquet = time.time()
print(f"Tiempo de carga Parquet: {end_parquet - start_parquet:.4f} segundos")

Tiempo de carga CSV: 0.5731 segundos
Tiempo de carga Parquet: 0.5137 segundos


###  Comparar Lectura de Columnas Específicas

In [25]:
import time

# --- Comparación de tiempos de lectura selectiva de columnas entre CSV y Parquet ---

# Lectura de columnas específicas desde archivo CSV (formato fila a fila)
start = time.time()
df_csv.select("order_id", "quantity").show(5)
print("CSV → lectura columnas:", time.time() - start)

# Lectura de columnas específicas desde archivo Parquet (formato columnar optimizado)
start = time.time()
df_parquet.select("order_id", "quantity").show(5)
print("Parquet → lectura columnas:", time.time() - start)

+--------+--------+
|order_id|quantity|
+--------+--------+
|    1195|     3.0|
|    1079|     3.0|
|    1480|     2.0|
|    1109|     1.0|
|    1280|     1.0|
+--------+--------+
only showing top 5 rows

CSV → lectura columnas: 0.2171480655670166
+--------+--------+
|order_id|quantity|
+--------+--------+
|    1000|     2.0|
|    1001|     2.0|
|    1002|     1.0|
|    1003|     2.0|
|    1004|     3.0|
+--------+--------+
only showing top 5 rows

Parquet → lectura columnas: 0.7865235805511475


###  Conclusión

Reflexiona sobre las diferencias observadas en términos de espacio, tiempos de carga y eficiencia.

- ¿Qué ventajas ofrece el formato Parquet frente al CSV?
- ¿En qué tipo de proyectos Big Data sería más adecuado usar Parquet?

Incluye esta reflexión en tu informe final.

##  8. Informe del Alumno
- **Errores detectados:**
- **Acciones aplicadas:**
- **Impacto en el negocio:**
- **Capturas de evidencia:**