# Procesamiento de Datos a Gran Escala

## Pontificia Universidad Javeriana

*Autor:*
- Juan Felipe González Quintero 

*Fecha:* 29 - 10 - 2025

---

## **Temática**
Técnicas de preprocesamiento de datos con Pyspark

---

## **Contexto**

Preparar grandes volúmenes de datos es muy importante para su posterior uso en modelos de Machine Learning. 

En entornos reales (como empresas, bancos o sistemas de salud), los datos:

* Provienen de múltiples fuentes,

* Contienen valores nulos o inconsistentes,

* Y suelen requerir transformaciones y limpieza antes de poder analizarlos o entrenar modelos predictivos.

Por eso, PySpark —una librería distribuida sobre Apache Spark— resulta esencial, ya que permite manejar datasets de gran tamaño de manera eficiente, paralela y escalable.

---

## **Objetivo**
El objetivo de este cuaderno es aprender sentencias pyspark para el preprocesamiento de los datos a través de ejemplos básicos de diferentes técnicas utilizadas en el medio.

---

## **Metodología**
La metodología seguida en esta práctica se estructura en varias etapas orientadas al tratamiento de los datos mediante PySpark:

* **Creación de un dataframe de prueba:** Para entender el flujo básico del preprocesamiento de una forma sencilla.

* **Identificación y tratamiento de valores faltantes:** Se exploran distintas técnicas aplicables para el tratamiento de valores faltantes en los datos.

* **Limpieza de datos:** se identifican y eliminan valores nulos o duplicados con funciones como dropna() y dropDuplicates().

* **Pivoting y Explode:** Se hace uso de estas técnicas para reorganizar los datos de una forma más coherente.

* **Normalización de datos:** Para obtener una mayor comprensión sobre los datos, se aplica normalización en algunas de las variables del estudio.

## Identificación y tratamiento de valores faltantes

In [0]:
df = spark.createDataFrame(
[
('Store 1',1,448),
('Store 1',2,None),
('Store 1',3,499),
('Store 1',44,432),
(None,None,None),
('Store 2',1,355),
('Store 2',1,355),
('Store 2',None,345),
('Store 2',3,387),
('Store 2',4,312),
],
['Store','WeekInMonth','Revenue']
)

## **Indentificación**
A continuación se hace la identificación de valores nulos abarcando diferentes enfoques siendo estos:
- Filtrar los valores nulos de una sola columna
- Filtrar los valores nulos de todo el dataframe


In [0]:
#En este caso se hace el filtrado de valores nulos únicamente para la columna 'Revenue' y se imprimen por pantalla
display(df.filter(df.Revenue.isNull()))

Store,WeekInMonth,Revenue
Store 1,2.0,
,,


Como se pudo evidenciar, la función `filter()` en este caso retorna los registros del dataframe en los que el valor de la columna `Revenue`tiene por valor "null".

Ahora, si se quisiera contar la cantidad de resitros nulos por cada una de las columnas, tendría que ejecutarse algo como lo siguiente:

In [0]:
#En este caso, el filtrado se hace para todas las columnas a través de una consulta SQL, se cuenta la cantidad de registros nulos por cada una y posteriormente se imprimen por pantalla

from pyspark.sql.functions import count, when, isnull
display(df.select(
[count(when(isnull(c), c)).alias(c) for c in df.columns]
))

Store,WeekInMonth,Revenue
1,2,2


##Tratamiento

### 1. Eliminado registros con valores faltantes

Para aplicar la eliminación de registros con valores faltantes, al igual que en el apartado anterior, se puede hacer de varias formas dependiendo el enfoque necesario.

Un posible enfoque podría ser el que se presenta a continuación, donde a través de la función `dropna()` se hace la eliminación de **todos los registros que contengan, en cualquiera de las columnas, un valor nulo:**

In [0]:
#Se eliminan del dataframe todos los registros con algún valor nulo en cualquiera de las columnas que lo integran
df2 = df.dropna()
display(df2)

Store,WeekInMonth,Revenue
Store 1,1,448
Store 1,3,499
Store 1,44,432
Store 2,1,355
Store 2,1,355
Store 2,3,387
Store 2,4,312


Como se puede ver, al aplicar lo anterior ya no se muestran más aquellos registros que contenían uno o más valores nulos y en su lugar, se conservan en el dataframe todos los registros que si contenían información completa.

Otro posible enfoque (según lo requiera el caso) podría ser el de eliminar únicamente los registros que contengan valores nulos en **todas sus columnas** como se muestra a continuación:

In [0]:
#Se hace la eliminación de los registros que contienen valores nulos en todas sus columnas con el argumento 'all' en la función dropna()
df2 = df.dropna('all')
display(df2)

Store,WeekInMonth,Revenue
Store 1,1.0,448.0
Store 1,2.0,
Store 1,3.0,499.0
Store 1,44.0,432.0
Store 2,1.0,355.0
Store 2,1.0,355.0
Store 2,,345.0
Store 2,3.0,387.0
Store 2,4.0,312.0


### 2. Sustituyendo por un valor

Otra posible alternativa para el tratamiento de datos nulos aparte de eliminarlos direcatamente, es reemplazarlos con otros valores. Esto se puede aplicar únicamente **dependiendo** de si tiene sentido conceptual aplicar estas sustituciones, ya que existen muchos casos en los cuáles si reemplazamos los nulos con otro valor, los datos pierden sentido e integridad, dando lugar a confusiones

There is one important thing to note about fillna – it’ll only do the exchange
operation for matching column types. So if you use a numeric value for a string column
or the other way around, it won’t work.

Una posible forma de abordar este reemplazo es definiendo directamente un valor por el cuál serán reemplazados todos los valores nulos a través de la función `fillna()`. Algo importante a tener en cuenta es que esta función solo nos permite hacer el reemplazo **si el tipo de variable del valor que se le proporciona como argumento coincide con el de la columna**, es decir que por ejemplo, si se proporciona como argumento de `fillna()` un valor numérico, el reemplazo solo será efectuado en las columnas que sean de tipo numérico, de otra forma, no se aplicará.

Para demostrar lo anteriormente explicado, a continuación se indica de tres formas distintas como se puede realizar el reemplazo:

In [0]:
#Se intenta hacer el reemplazo de TODOS los valores nulos del dataframe por 0
display(df.fillna(0))

#Se hace el reemplazo de los valores nulos de la columna 'Revenue' por 0
display(df.fillna(0, ['Revenue']))

#Se hace el reemplazo de los valores nulos de la columna 'WeekInMonth' por el número 2 y de la columna 'Revenue' por el número 3
display(df.fillna({'WeekInMonth' : 2, 'Revenue' : 3}))

Store,WeekInMonth,Revenue
Store 1,1,448
Store 1,2,0
Store 1,3,499
Store 1,44,432
,0,0
Store 2,1,355
Store 2,1,355
Store 2,0,345
Store 2,3,387
Store 2,4,312


Store,WeekInMonth,Revenue
Store 1,1.0,448
Store 1,2.0,0
Store 1,3.0,499
Store 1,44.0,432
,,0
Store 2,1.0,355
Store 2,1.0,355
Store 2,,345
Store 2,3.0,387
Store 2,4.0,312


Store,WeekInMonth,Revenue
Store 1,1,448
Store 1,2,3
Store 1,3,499
Store 1,44,432
,2,3
Store 2,1,355
Store 2,1,355
Store 2,2,345
Store 2,3,387
Store 2,4,312


De los resultados obtenidos de la aplicación de la función `fillna()` sobre el dataframe se puede hacer mención de ciertos detalles:

- Para el primer uso (el intento de reemplazar todos los valores nulos de todo el dataframe por 0), se puede notar a simple vista que el cambio fue efectuado en las columnas `WeekInMonth` y `Revenue` que poseen valores numéricos, pero en el caso de la columna `Store`, el reemplazo no surte efecto debido a que el tipo de variable de esta columna no coincide con el del valor por el cual se intentó reemplazar.
- Para el segundo uso (el reemplazo de todos los valores nulos de la columna Revenue) se efectúa el reemplazo de todos los nulos por cero mientras las demás columnas mantienen sus valores nulos.
- Para el último uso (reemplazo de todos los valores nulos de las columnas `WeekInMonth` y `Revenue`) se efectúa el cambio correctamente, mientras la columna restante mantiene sus valores nulos.

###3. Sustituyendo con la media

Otra forma de abordar el tratamiento de valores nulos es hacer la sustitución por la media. A esta técnica se le llama **imputación por la media** y consiste en reemplazar los valores nulos de una variable numérica por el promedio de los valores existentes en esa misma columna. **Tiene sentido aplicarla cuando los datos faltantes son pocos**, se asumen como aleatorios (es decir, no siguen un patrón sistemático) y la variable no presenta una gran dispersión ni valores atípicos que distorsionen la media. Es una estrategia simple y útil para mantener el tamaño del conjunto de datos sin introducir sesgos significativos.

Para aplicar esta técnica, a continuación se hace el cálculo de la media para una de las columnas del dataframe y posteriormente se aplica el reemplazo con la función `fillna()`

In [0]:
#Se realiza el cálculo de la media para la columna Revenue a través de la funcón mean
from pyspark.sql.functions import mean
df.select(mean(df.Revenue)).show()

+------------+
|avg(Revenue)|
+------------+
|     391.625|
+------------+



Una vez calculado el valor promedio de la columna, se procede a realizar el reemplazo de los valores nulos por ese valor:

In [0]:
#Se realiza el reemplazo de los valores nulos de la columna 'Revenue' por la media de la columna
display(df.fillna(391.625, ['Revenue']))

Store,WeekInMonth,Revenue
Store 1,1.0,448
Store 1,2.0,391
Store 1,3.0,499
Store 1,44.0,432
,,391
Store 2,1.0,355
Store 2,1.0,355
Store 2,,345
Store 2,3.0,387
Store 2,4.0,312


## Eliminando duplicados

Otra técnica de preprocesamiento de datos es la eliminación de duplicados. En este caso es posible aplicar esta técnica a través de la función `dropDuplicates()` como se evidencia a continuación:

In [0]:
#Eliminación de registros duplicados
display(df.dropDuplicates())

Store,WeekInMonth,Revenue
Store 1,1.0,448.0
Store 1,2.0,
Store 1,3.0,499.0
Store 1,44.0,432.0
,,
Store 2,1.0,355.0
Store 2,,345.0
Store 2,3.0,387.0
Store 2,4.0,312.0


En este caso se evidencia que en efecto se eliminó el único registro duplicado, el cuál tenía asociados los valores 'Store 2', '1', '355'.

Otra forma de obtener el mismo resultado que se vió anteriormente es utilizar la función `dropDuplicates()` para eliminar los registros que tengan la misma combinación de valores en las columnas  `WeekInMonth` y `Revenue`, es decir, que existan 2 registros o más con los mismos valores exactamente en esas dos columnas:

In [0]:
display(df.dropDuplicates(['Store','WeekInMonth']))

Store,WeekInMonth,Revenue
Store 1,1.0,448.0
Store 1,2.0,
Store 1,3.0,499.0
Store 1,44.0,432.0
,,
Store 2,1.0,355.0
Store 2,,345.0
Store 2,3.0,387.0
Store 2,4.0,312.0


En este caso, también se eliminó el registro del cuál ya se había hecho mención antes ya que con la función se detectaron dos registros con exactamente el valor '1' en la columna  `WeekInMonth` y  el valor '355' en la columna `Revenue`.

## Eliminando columnas

Así como es posible eliminar registros, también existe otra técnica de preprocesamiento de datos que permite eliminar columnas (bien puede ser que no son relevantes para el estudio, poseen una cantidad considerable de registros nulos, etc).

A continuación se aplica esta técnica a través del uso de la función `drop()`:

In [0]:
#Se elimina la columna 'Revenue'
display(df.drop('Revenue'))

#Se eliminan las columnas 'Revenue' y 'Store'
display(df.drop('Revenue','Store'))

Store,WeekInMonth
Store 1,1.0
Store 1,2.0
Store 1,3.0
Store 1,44.0
,
Store 2,1.0
Store 2,1.0
Store 2,
Store 2,3.0
Store 2,4.0


WeekInMonth
1.0
2.0
3.0
44.0
""
1.0
1.0
""
3.0
4.0


En este caso se ejemplifica la eliminación de columnas con una sola eliminación y dos eliminaciones, las cuáles se efectuaron correctamente.

## Identificando y resolviendo valores inconsistentes

Una técnica más de preprocesamiento de datos es la detectacción y evaluación de valores inconsistentes o atípicos en los datos (por ejemplo, valores numéricos negativos o desproporcionadamente altos).
Una vez identificados, el siguiente paso suele ser corregirlos, eliminarlos o imputarlos según la naturaleza del análisis.

A continuación se ve un ejemplo de como podría aplicarse esta técnica:

In [0]:
#Se revisan los valores de los registros del dataframe
display(df)

Store,WeekInMonth,Revenue
Store 1,1.0,448.0
Store 1,2.0,
Store 1,3.0,499.0
Store 1,44.0,432.0
,,
Store 2,1.0,355.0
Store 2,1.0,355.0
Store 2,,345.0
Store 2,3.0,387.0
Store 2,4.0,312.0


Luego de revisar los datos contenidos en el dataframe, se puede hacer un análisis descriptivo de cierto grupo de registros específico en busca de valores atípicos. Para este ejemplo se hace sobre la columna Store y el grupo de registros cuyo valor en esa columna es Store 1

In [0]:
#Se filtra los registros que contienen el valor 'Store 1' en la columna 'Store' y se realiza un análisis descriptivo de los mismos
display(df.filter(df.Store == 'Store 1').describe())

summary,Store,WeekInMonth,Revenue
count,4,4.0,3.0
mean,,12.5,459.6666666666667
stddev,,21.01586702153082,34.99047489436709
min,Store 1,1.0,432.0
max,Store 1,44.0,499.0


Esto dará el valor en un cuantil dado, en el intervalo de 0 a 1. Por lo tanto, si establece el segundo argumento en 0.0, obtendrá el valor más bajo para la columna. Con 1.0 obtienes el valor más alto. En el medio tienes la mediana, que es lo que se está buscando:

In [0]:
print(df.approxQuantile('Revenue', [0.5], 0))

[355.0]


## Pivot

A veces, desea cambiar sus datos de filas a columnas. La función se llama pivotar y está disponible en Pyspark.

Básicamente, estás rotando los datos alrededor de un eje determinado, de ahí el nombre.

En este caso, ese eje son los datos en una de sus columnas.

Para ejemplificar esto, se realizará un pivotado sobre el DataFrame original para que las tiendas (Store) se conviertan en columnas y las semanas del mes (WeekInMonth) en filas.

De esta manera, los ingresos (Revenue) se agregan y reorganizan por tienda y semana, obteniendo una estructura más adecuada para el análisis y visualización de patrones de venta.

In [0]:
#Se realizan agregaciones por medio de la función groupBy y pivot para obtener el total de ventas por semana
df_pivoted = df.groupBy('WeekInMonth').pivot('Store').sum('Revenue').orderBy('WeekInMonth')
display(df_pivoted)

WeekInMonth,null,Store 1,Store 2
,,,345.0
1.0,,448.0,710.0
2.0,,,
3.0,,499.0,387.0
4.0,,,312.0
44.0,,432.0,


Posteriormente se realiza una consulta de agregación que tambien agrupa por pivote y WeekInMonth y luego suma el revenue. Esto se hace con el objetivo de comparar las agregaciones solas, con el pivote.

In [0]:
display(df
.groupBy('Store','WeekInMonth')
.sum('Revenue')
.orderBy('WeekInMonth'))

Store,WeekInMonth,sum(Revenue)
,,
Store 2,,345.0
Store 1,1.0,448.0
Store 2,1.0,710.0
Store 1,2.0,
Store 1,3.0,499.0
Store 2,3.0,387.0
Store 2,4.0,312.0
Store 1,44.0,432.0


Como se puede observar en las tablas resultantes, tanto la agregación como el pivotado se basan en el mismo principio de agrupar y resumir datos, pero sus objetivos son diferentes:

- Al utilizar únicamente la **agregación**, se obtiene el revenue total por tienda y semana, pero la estructura de la tabla permanece en formato vertical, lo que dificulta identificar patrones o realizar comparaciones directas entre tiendas.  
- En cambio, con el **pivotado**, los datos se reorganizan en un formato horizontal, permitiendo observar de forma más clara el comportamiento del revenue de cada tienda por semana, facilitando el análisis visual y la detección de tendencias.

Adicionalmente, se puede aplicar la operación inversa al pivotado, conocida como unpivot o stack.  
Este proceso convierte las columnas generadas por el pivote en filas, devolviendo los datos a un formato vertical.  

En este caso la función stack() permite reorganizar los valores de las columnas de cada tienda en un solo atributo común llamado Store, con su respectivo Revenue.

In [0]:
display(df_pivoted.withColumnRenamed('Store 1','Store1')
        .withColumnRenamed('Store 2','Store2')
        .selectExpr('WeekInMonth',"stack(2, 'Store 1', Store1, 'Store 2', Store2) as (Store,Revenue)"))

WeekInMonth,Store,Revenue
,Store 1,
,Store 2,345.0
1.0,Store 1,448.0
1.0,Store 2,710.0
2.0,Store 1,
2.0,Store 2,
3.0,Store 1,499.0
3.0,Store 2,387.0
4.0,Store 1,
4.0,Store 2,312.0


## Explode

Hay otra situación con la que te encontrarás de vez en cuando. A veces llegan varios puntos de datos juntos en una columna. Esto usual cuando JSON es el formato de origen.

Puede resolver este problema utilizando el comando de Explode. Tomará la cadena con varios valores y los colocará en una fila cada uno.

Para ejemplificar esto, se crea un dataframe en el que se tiene una columan que tiene como datos listas de relojes.

Al usar explode cada elemento de la lista watches se convierte en una fila independiente, lo que facilita análisis posteriores como conteos, agrupaciones o cálculos estadísticos por tipo de reloj.

In [0]:
from pyspark.sql.functions import explode
df = spark.createDataFrame([
(1, ['Rolex','Patek','Jaeger']),
(2, ['Omega','Heuer']),
(3, ['Swatch','Rolex'])],
('id','watches'))
display(df.withColumn('watches',explode(df.watches)))

id,watches
1,Rolex
1,Patek
1,Jaeger
2,Omega
2,Heuer
3,Swatch
3,Rolex


## Normalización

Antes de aplicar las técnicas de **normalización**, es necesario verificar la existencia del conjunto de datos que se va a utilizar.  
El comando `%fs ls` lista los archivos disponibles en la ruta especificada dentro del sistema de archivos de Databricks (DBFS), en este caso `/databricks-datasets/definitive-guide/data/simple-ml-scaling`.

In [0]:
%fs ls /databricks-datasets/definitive-guide/data/simple-ml-scaling

path,name,size,modificationTime
dbfs:/databricks-datasets/definitive-guide/data/simple-ml-scaling/_SUCCESS,_SUCCESS,0,1596560320000
dbfs:/databricks-datasets/definitive-guide/data/simple-ml-scaling/part-00000-cd03406a-cc9b-42b0-9299-1e259fdd9382-c000.gz.parquet,part-00000-cd03406a-cc9b-42b0-9299-1e259fdd9382-c000.gz.parquet,1663,1596560320000


Una vez verificada la disponibilidad del conjunto de datos, se procede con su carga utilizando el formato `parquet`.  
El comando `spark.read.parquet()` permite leer archivos almacenados en este formato optimizado, generando un DataFrame distribuido en Spark.

In [0]:
scaleDF = spark.read.parquet("/databricks-datasets/definitive-guide/data/simple-ml-scaling")
display(scaleDF)

id,features
0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""1.0"",""0.1"",""-1.0""]}"
1,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""2.0"",""1.1"",""1.0""]}"
0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""1.0"",""0.1"",""-1.0""]}"
1,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""2.0"",""1.1"",""1.0""]}"
1,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""3.0"",""10.1"",""3.0""]}"


Ahora se procede a realizar la normalización de escala Min–Max, una técnica que ajusta los valores numéricos de una característica a un nuevo rango definido.  
La función `MinMaxScaler` de MLlib calcula los valores mínimo y máximo del conjunto y reescala cada valor proporcionalmente dentro del intervalo definido mediante `setMin()` y `setMax()` (en este caso [5, 10]).

In [0]:
from pyspark.ml.feature import MinMaxScaler
minMax = MinMaxScaler().setMin(5).setMax(10).setInputCol("features")
fittedminMax = minMax.fit(scaleDF)
fittedminMax.transform(scaleDF).show()

+---+--------------+---------------------------------+
| id|      features|MinMaxScaler_e51f876d3730__output|
+---+--------------+---------------------------------+
|  0|[1.0,0.1,-1.0]|                    [5.0,5.0,5.0]|
|  1| [2.0,1.1,1.0]|                    [7.5,5.5,7.5]|
|  0|[1.0,0.1,-1.0]|                    [5.0,5.0,5.0]|
|  1| [2.0,1.1,1.0]|                    [7.5,5.5,7.5]|
|  1|[3.0,10.1,3.0]|                 [10.0,10.0,10.0]|
+---+--------------+---------------------------------+



Ahora, se cambia la normalización por **normalización estadística** o **estandarización Z-Score**, utilizando el transformador `StandardScaler` de MLlib.  

Esta técnica ajusta los datos restando la media y dividiendo por la desviación estándar de cada atributo, de forma que los valores finales tengan media igual a 0 y varianza unitaria (desviación estándar igual a 1).

A diferencia del escalado Min–Max, este método no define un rango fijo, sino que centra los datos y los escala estadísticamente para que todas las variables tengan la misma magnitud relativa.  

De esta manera, se reduce el efecto de las diferentes escalas y unidades de medida entre atributos, aunque no elimina completamente la influencia de los valores atípicos, ya que estos aún afectan el cálculo de la media y la desviación estándar.

In [0]:
from pyspark.ml.feature import StandardScaler
sScaler = StandardScaler().setInputCol("features")
sScaler.fit(scaleDF).transform(scaleDF).show()

+---+--------------+-----------------------------------+
| id|      features|StandardScaler_0cb5e52fc5c7__output|
+---+--------------+-----------------------------------+
|  0|[1.0,0.1,-1.0]|               [1.19522860933439...|
|  1| [2.0,1.1,1.0]|               [2.39045721866878...|
|  0|[1.0,0.1,-1.0]|               [1.19522860933439...|
|  1| [2.0,1.1,1.0]|               [2.39045721866878...|
|  1|[3.0,10.1,3.0]|               [3.58568582800318...|
+---+--------------+-----------------------------------+



##Conclusiones

- El preprocesamiento de datos es una fase crítica en cualquier proyecto de Machine Learning, ya que garantiza la calidad, consistencia y coherencia de la información antes de su análisis o modelado. Sin una limpieza adecuada, los resultados de los modelos pueden ser imprecisos o incluso erróneos.

- El uso de PySpark facilita el manejo de grandes volúmenes de datos gracias a su procesamiento distribuido, permitiendo realizar operaciones de limpieza, transformación y análisis de forma paralela, eficiente y escalable dentro de entornos como Databricks.

- Las funciones dropna() y dropDuplicates() resultan fundamentales para la eliminación de valores nulos y duplicados, mientras que métodos como filter() y approxQuantile() permiten identificar valores inconsistentes y analizar la distribución de los datos.

- El tratamiento de valores inconsistentes mediante estadísticas descriptivas y cuantiles contribuye a detectar posibles outliers o errores en los registros, lo que asegura una mayor confiabilidad en el conjunto final de datos utilizado para el modelado.

- La práctica permitió comprender el flujo completo de preparación de datos en PySpark, desde la importación y exploración inicial hasta la limpieza, transformación y validación final, consolidando así los fundamentos para aplicar técnicas de análisis y aprendizaje automático sobre datos reales a gran escala.