# Procesamiento de Datos a Gran Escala: Polars y PySpark

## Introducción

En el mundo actual del Data Engineering, el procesamiento eficiente de grandes volúmenes de datos se ha convertido en una necesidad crítica. A medida que los conjuntos de datos crecen exponencialmente, las herramientas tradicionales como Pandas comienzan a mostrar limitaciones significativas. En este contexto, han surgido alternativas más potentes como Polars y PySpark, diseñadas específicamente para abordar los desafíos del procesamiento de datos a gran escala.

Este notebook educativo está diseñado para proporcionar una comprensión profunda de estas tecnologías, sus ventajas sobre Pandas, y cómo implementarlas efectivamente en flujos de trabajo de ETL (Extracción, Transformación y Carga) modernos.

Exploraremos:
1. Definiciones y conceptos fundamentales de Pandas, Polars y PySpark
2. Análisis comparativo detallado entre estas tecnologías
3. Sintaxis y patrones de uso comunes
4. Implementación de un ETL completo utilizando el dataset de taxis de Nueva York
5. Ejercicios prácticos para consolidar el aprendizaje

Comencemos con las definiciones básicas de cada tecnología.

## Definiciones

### Pandas

Pandas es una biblioteca de análisis de datos de código abierto para Python que proporciona estructuras de datos flexibles y herramientas para trabajar con datos estructurados. Creada por Wes McKinney en 2008, Pandas se ha convertido en una herramienta estándar en el ecosistema de ciencia de datos de Python.

**Características principales:**
- **DataFrame**: Estructura de datos bidimensional similar a una tabla de base de datos o una hoja de cálculo.
- **Series**: Estructura de datos unidimensional similar a un array o lista etiquetada.
- **Manipulación de datos**: Funciones para filtrar, transformar, agregar y visualizar datos.
- **Manejo de datos faltantes**: Herramientas para detectar y manejar valores nulos.
- **Integración con otras bibliotecas**: Funciona bien con NumPy, Matplotlib, Scikit-learn, etc.

**Limitaciones:**
- **Rendimiento en memoria**: Pandas carga todos los datos en memoria, lo que limita su capacidad para manejar conjuntos de datos muy grandes.
- **Procesamiento en un solo núcleo**: Por defecto, Pandas utiliza un solo núcleo para el procesamiento, lo que limita su velocidad en operaciones complejas.
- **Consumo de memoria**: Pandas puede consumir hasta 5-10 veces más memoria que el tamaño original de los datos debido a su estructura interna.

**Casos de uso ideales:**
- Análisis exploratorio de datos de tamaño pequeño a mediano (hasta unos pocos GB).
- Limpieza y transformación de datos para conjuntos de datos que caben en memoria.
- Visualización rápida y análisis estadístico de datos estructurados.
- Prototipado rápido de flujos de trabajo de datos.

### Polars

Polars es una biblioteca de manipulación de datos de alto rendimiento implementada en Rust con enlaces a Python. Creada por Ritchie Vink en 2020, Polars está diseñada para ser una alternativa más rápida y eficiente a Pandas, especialmente para conjuntos de datos grandes.

**Características principales:**
- **Basada en Apache Arrow**: Utiliza el formato de memoria columnar de Arrow para un procesamiento eficiente.
- **Ejecución perezosa (lazy)**: Permite optimizar consultas antes de ejecutarlas, similar a los motores de bases de datos modernos.
- **Paralelismo automático**: Aprovecha múltiples núcleos de CPU sin configuración adicional.
- **Operaciones vectorizadas**: Implementa operaciones optimizadas para procesamiento columnar.
- **API similar a Pandas**: Facilita la transición desde Pandas con una sintaxis familiar pero más consistente.
- **Gestión eficiente de memoria**: Consume significativamente menos memoria que Pandas para las mismas operaciones.

**Ventajas sobre Pandas:**
- **Rendimiento**: De 5 a 20 veces más rápido que Pandas en muchas operaciones comunes.
- **Eficiencia de memoria**: Utiliza menos memoria para las mismas operaciones.
- **Escalabilidad**: Puede manejar conjuntos de datos más grandes sin problemas.
- **Consistencia de API**: Diseño más coherente y predecible que Pandas.
- **Expresiones complejas**: Permite construir consultas complejas de manera más intuitiva.

**Casos de uso ideales:**
- Procesamiento de conjuntos de datos medianos a grandes (varios GB) que aún caben en la memoria de una sola máquina.
- ETL de alto rendimiento para datos estructurados.
- Análisis de datos que requieren operaciones complejas y rendimiento optimizado.
- Migración desde Pandas cuando se necesita mejor rendimiento sin cambiar a un framework distribuido.

### PySpark

PySpark es la API de Python para Apache Spark, un motor de procesamiento distribuido de código abierto diseñado para el procesamiento de datos a gran escala. Desarrollado originalmente en la Universidad de California, Berkeley, Spark se ha convertido en una herramienta estándar para el procesamiento de big data.

**Características principales:**
- **Procesamiento distribuido**: Permite distribuir el procesamiento de datos a través de múltiples nodos en un clúster.
- **Computación en memoria**: Mantiene los datos en memoria para un acceso más rápido durante iteraciones múltiples.
- **Tolerancia a fallos**: Reconstruye automáticamente los datos perdidos en caso de fallos de nodos.
- **Ecosistema completo**: Incluye módulos para SQL (Spark SQL), procesamiento de streaming (Structured Streaming), aprendizaje automático (MLlib) y análisis de grafos (GraphX).
- **DataFrame API**: Proporciona una abstracción similar a Pandas pero distribuida.
- **Optimizador Catalyst**: Optimiza automáticamente los planes de ejecución de consultas.

**Ventajas sobre Pandas:**
- **Escalabilidad horizontal**: Puede escalar a petabytes de datos distribuyendo el procesamiento en múltiples máquinas.
- **Procesamiento fuera de memoria**: No está limitado por la RAM disponible en una sola máquina.
- **Paralelismo masivo**: Aprovecha cientos o miles de núcleos en un clúster.
- **Integración con ecosistemas de big data**: Se integra fácilmente con HDFS, Hive, Kafka, etc.
- **Capacidades de streaming**: Permite procesar datos en tiempo real además de procesamiento por lotes.

**Casos de uso ideales:**
- Procesamiento de conjuntos de datos muy grandes (decenas de TB o más) que no caben en la memoria de una sola máquina.
- ETL a escala de big data.
- Análisis de datos distribuidos en tiempo real y por lotes.
- Aplicaciones de machine learning a gran escala.
- Procesamiento de datos en entornos de nube o clústeres on-premise.

## Arquitectura y Fundamentos Técnicos

Para entender mejor por qué Polars y PySpark superan a Pandas en ciertos escenarios, es importante comprender las diferencias fundamentales en su arquitectura y diseño técnico.

### Arquitectura de Pandas

Pandas está construido sobre NumPy y utiliza sus arrays como estructura de datos subyacente. Su arquitectura se caracteriza por:

- **Formato de almacenamiento en memoria**: Utiliza un formato de memoria basado en filas (aunque internamente usa arrays de NumPy que son columnares).
- **Modelo de ejecución**: Ejecución inmediata (eager execution) de operaciones.
- **Paralelismo**: Principalmente de un solo hilo, aunque algunas operaciones pueden usar múltiples núcleos con la opción `swifter` o `pandarallel`.
- **Gestión de memoria**: Carga completa de los datos en memoria RAM.
- **Optimización**: Limitada, principalmente a nivel de operaciones individuales.

### Arquitectura de Polars

Polars está implementado en Rust y utiliza Apache Arrow como formato de memoria subyacente. Su arquitectura se caracteriza por:

- **Formato de almacenamiento en memoria**: Formato columnar de Apache Arrow, que permite un acceso más eficiente a los datos y mejor utilización de la caché de CPU.
- **Modelo de ejecución**: Soporta tanto ejecución inmediata como ejecución perezosa (lazy execution) que permite optimizaciones globales.
- **Paralelismo**: Paralelismo automático a nivel de CPU, aprovechando todos los núcleos disponibles.
- **Gestión de memoria**: Gestión eficiente de memoria con menor overhead que Pandas.
- **Optimización**: Optimizador de consultas que puede reordenar y combinar operaciones para mejorar el rendimiento.

### Arquitectura de PySpark

PySpark es una interfaz de Python para Apache Spark, que está implementado en Scala y se ejecuta en la JVM. Su arquitectura se caracteriza por:

- **Arquitectura distribuida**: Modelo de computación distribuida con un nodo maestro (driver) y múltiples nodos trabajadores (executors).
- **RDD (Resilient Distributed Dataset)**: Abstracción fundamental que representa una colección inmutable de objetos distribuidos en un clúster.
- **DataFrame API**: Capa de abstracción sobre RDDs que proporciona una interfaz similar a SQL.
- **Modelo de ejecución**: Ejecución perezosa con optimización global de consultas mediante el optimizador Catalyst.
- **Gestión de memoria**: Combinación de almacenamiento en memoria y disco, con spilling automático a disco cuando la memoria es insuficiente.
- **Tolerancia a fallos**: Reconstrucción automática de datos perdidos mediante el seguimiento de linaje (lineage).
- **Paralelismo**: Paralelismo a nivel de clúster, con múltiples executors y múltiples núcleos por executor.

## Importando las Bibliotecas

Veamos cómo importar cada una de estas bibliotecas y verificar sus versiones:

In [3]:
import os
os.environ["JAVA_HOME"] = "/Library/Java/JavaVirtualMachines/openjdk-11.jdk/Contents/Home"


In [4]:
# Importar Pandas
import pandas as pd
print(f"Pandas version: {pd.__version__}")

# Importar Polars
import polars as pl
print(f"Polars version: {pl.__version__}")

# Importar PySpark
import pyspark
from pyspark.sql import SparkSession
print(f"PySpark version: {pyspark.__version__}")

# Crear una sesión de Spark
spark = SparkSession.builder \
    .appName("PySpark Introduction") \
    .config("spark.executor.memory", "2g") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()

print("Todas las bibliotecas importadas correctamente.")

Pandas version: 2.2.3
Polars version: 1.26.0
PySpark version: 3.5.5


25/04/07 19:18:54 WARN Utils: Your hostname, MacBook-Air-de-Isabel.local resolves to a loopback address: 127.0.0.1; using 10.16.10.138 instead (on interface en0)
25/04/07 19:18:54 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/07 19:18:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Todas las bibliotecas importadas correctamente.


----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 51442)
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/socketserver.py", line 317, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/socketserver.py", line 348, in process_request
    self.finish_request(request, client_address)
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/socketserver.py", line 361, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/socketserver.py", line 755, in __init__
    self.handle()
  File "/Users/isaromobru/Desktop/DS102024_/.venv/lib/python3.11/site-packages/pyspark/accumulators.py", line 295, in handle
    poll(accum_updates)
  File "/Users/isaromobru/Desktop/DS1020

## Creación de DataFrames Básicos

Veamos cómo crear un DataFrame simple en cada una de estas tecnologías:

In [5]:
# Datos de ejemplo
data = {
    'nombre': ['Ana', 'Carlos', 'María', 'Juan', 'Elena'],
    'edad': [25, 32, 28, 41, 37],
    'ciudad': ['Madrid', 'Barcelona', 'Sevilla', 'Valencia', 'Bilbao'],
    'salario': [35000, 42000, 38000, 45000, 51000]
}

# DataFrame en Pandas
df_pandas = pd.DataFrame(data)
print("DataFrame en Pandas:")
print(df_pandas)
print("\n")

# DataFrame en Polars
df_polars = pl.DataFrame(data)
print("DataFrame en Polars:")
print(df_polars)
print("\n")

# DataFrame en PySpark
df_spark = spark.createDataFrame(data=[(name, age, city, salary) 
                                      for name, age, city, salary in zip(data['nombre'], data['edad'], data['ciudad'], data['salario'])], # aqui se le asigna 
                                schema=['nombre', 'edad', 'ciudad', 'salario']) # aqui le dices que las columnas se llaman asi
print("DataFrame en PySpark:")
df_spark.show()

DataFrame en Pandas:
   nombre  edad     ciudad  salario
0     Ana    25     Madrid    35000
1  Carlos    32  Barcelona    42000
2   María    28    Sevilla    38000
3    Juan    41   Valencia    45000
4   Elena    37     Bilbao    51000


DataFrame en Polars:
shape: (5, 4)
┌────────┬──────┬───────────┬─────────┐
│ nombre ┆ edad ┆ ciudad    ┆ salario │
│ ---    ┆ ---  ┆ ---       ┆ ---     │
│ str    ┆ i64  ┆ str       ┆ i64     │
╞════════╪══════╪═══════════╪═════════╡
│ Ana    ┆ 25   ┆ Madrid    ┆ 35000   │
│ Carlos ┆ 32   ┆ Barcelona ┆ 42000   │
│ María  ┆ 28   ┆ Sevilla   ┆ 38000   │
│ Juan   ┆ 41   ┆ Valencia  ┆ 45000   │
│ Elena  ┆ 37   ┆ Bilbao    ┆ 51000   │
└────────┴──────┴───────────┴─────────┘


DataFrame en PySpark:


                                                                                

+------+----+---------+-------+
|nombre|edad|   ciudad|salario|
+------+----+---------+-------+
|   Ana|  25|   Madrid|  35000|
|Carlos|  32|Barcelona|  42000|
| María|  28|  Sevilla|  38000|
|  Juan|  41| Valencia|  45000|
| Elena|  37|   Bilbao|  51000|
+------+----+---------+-------+



25/04/08 04:37:59 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 3660427 ms exceeds timeout 120000 ms
25/04/08 04:37:59 WARN SparkContext: Killing executors is not supported by current scheduler.
25/04/08 04:38:00 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$

## Conclusiones sobre Definiciones

Hemos explorado las definiciones y características fundamentales de Pandas, Polars y PySpark. Cada tecnología tiene sus propias fortalezas y casos de uso ideales:

- **Pandas** es excelente para análisis exploratorio rápido y conjuntos de datos pequeños a medianos que caben en memoria.
- **Polars** ofrece un rendimiento significativamente mejor que Pandas para conjuntos de datos medianos a grandes, manteniendo una API familiar pero más consistente.
- **PySpark** es la solución para procesamiento de datos verdaderamente grandes que requieren distribución en múltiples máquinas.

En las siguientes secciones, profundizaremos en un análisis comparativo detallado de estas tecnologías, explorando sus diferencias en términos de rendimiento, sintaxis y funcionalidades.