<a href="https://colab.research.google.com/github/apchavezr/-Analisis_Grandes_Volumenes_Datos/blob/main/etl_kafka_simulado_pyspark_(1).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Transformación de datos con PySpark
Simulación de datos tipo Kafka y procesamiento distribuido con PySpark en Google Colab.

## Paso 1: Instalación de PySpark
Se instala la biblioteca `pyspark`, que es la interfaz de Apache Spark para Python. Este paso es necesario en Colab, ya que el entorno no la tiene instalada por defecto.

In [1]:
# Instalar PySpark en Colab
!pip install pyspark



## Paso 2: Inicialización de la sesión Spark
Aquí se crea una instancia de `SparkSession`, la puerta de entrada para utilizar Spark con DataFrames. Esta sesión nos permitirá realizar operaciones distribuidas sobre conjuntos de datos.

In [2]:
# Inicializar sesión de Spark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("ETL_simulado") \
    .getOrCreate()

## Paso 3: Simulación de datos provenientes de Kafka
En lugar de conectar directamente a Kafka (que no está disponible en Colab), se crea un archivo JSON local con una estructura similar a los mensajes que Kafka transmitiría.
El archivo contiene eventos con distintos tipos, como `compra` y `consulta`, con valores numéricos asociados.

In [3]:
# Simular datos tipo Kafka (JSON)
data_json = """
[
  {"user_id": "u001", "event_type": "compra", "timestamp": "2025-04-21T10:00:00", "value": 1500},
  {"user_id": "u002", "event_type": "consulta", "timestamp": "2025-04-21T10:02:00", "value": 0},
  {"user_id": "u003", "event_type": "compra", "timestamp": "2025-04-21T10:03:00", "value": 950},
  {"user_id": "u004", "event_type": "compra", "timestamp": "2025-04-21T10:05:00", "value": 3000}
]
"""

with open("eventos_kafka.json", "w") as f:
    f.write(data_json)

## Paso 4: Lectura del archivo JSON
Se usa `spark.read.json()` para cargar el archivo JSON en un DataFrame. Esto simula la lectura de un flujo de datos de Kafka.
### Salida esperada:
Una tabla con columnas `user_id`, `event_type`, `timestamp`, `value`, que representa el conjunto original de eventos.

In [4]:
# Leer JSON simulado
df_eventos = spark.read.json("eventos_kafka.json")
df_eventos.show()

+---------------+----------+-------------------+-------+-----+
|_corrupt_record|event_type|          timestamp|user_id|value|
+---------------+----------+-------------------+-------+-----+
|              [|      NULL|               NULL|   NULL| NULL|
|           NULL|    compra|2025-04-21T10:00:00|   u001| 1500|
|           NULL|  consulta|2025-04-21T10:02:00|   u002|    0|
|           NULL|    compra|2025-04-21T10:03:00|   u003|  950|
|           NULL|    compra|2025-04-21T10:05:00|   u004| 3000|
|              ]|      NULL|               NULL|   NULL| NULL|
+---------------+----------+-------------------+-------+-----+



## Paso 5: Filtrado de eventos relevantes
Se aplica un filtro para conservar únicamente los eventos de tipo `compra` cuyo valor sea superior a 1000. Esto es una forma básica de limpieza de datos.
### Salida esperada:
Un subconjunto del DataFrame original, que solo incluye las compras importantes.

In [5]:
# Filtrar eventos de tipo 'compra' con valor > 1000
df_filtrado = df_eventos.filter((df_eventos.event_type == "compra") & (df_eventos.value > 1000))
df_filtrado.show()

+---------------+----------+-------------------+-------+-----+
|_corrupt_record|event_type|          timestamp|user_id|value|
+---------------+----------+-------------------+-------+-----+
|           NULL|    compra|2025-04-21T10:00:00|   u001| 1500|
|           NULL|    compra|2025-04-21T10:05:00|   u004| 3000|
+---------------+----------+-------------------+-------+-----+



## Paso 6: Clasificación por categorías
Se agrega una nueva columna `categoria` que clasifica las compras según su valor:
- `alta`: compras mayores o iguales a 3000
- `media`: compras entre 1500 y 2999
- `baja`: compras de 1000 a 1499 (aunque no se genera en este ejemplo)
### Salida esperada:
Un DataFrame enriquecido con una columna adicional que indica la categoría de la compra.

In [6]:
# Clasificar por categoría
from pyspark.sql.functions import when

df_enriquecido = df_filtrado.withColumn(
    "categoria",
    when(df_filtrado.value >= 3000, "alta")
    .when(df_filtrado.value >= 1500, "media")
    .otherwise("baja")
)

df_enriquecido.show()

+---------------+----------+-------------------+-------+-----+---------+
|_corrupt_record|event_type|          timestamp|user_id|value|categoria|
+---------------+----------+-------------------+-------+-----+---------+
|           NULL|    compra|2025-04-21T10:00:00|   u001| 1500|    media|
|           NULL|    compra|2025-04-21T10:05:00|   u004| 3000|     alta|
+---------------+----------+-------------------+-------+-----+---------+



## Paso 7: Almacenamiento en formato Parquet
El DataFrame resultante se guarda en el sistema de archivos local en formato Parquet. Este formato es columnar y altamente eficiente, ideal para grandes volúmenes de datos.

In [9]:
# Guardar en formato Parquet
df_enriquecido.write.mode("overwrite").parquet("compras_filtradas")

## Paso 8: Lectura de verificación
Se lee el archivo Parquet guardado anteriormente para verificar que los datos fueron correctamente transformados y almacenados.
### Salida esperada:
Un DataFrame que contiene únicamente las compras filtradas, con su respectiva categoría.

In [10]:
# Leer los datos almacenados para verificación
df_verificacion = spark.read.parquet("/tmp/compras_filtradas")
df_verificacion.show()

+---------------+----------+-------------------+-------+-----+---------+
|_corrupt_record|event_type|          timestamp|user_id|value|categoria|
+---------------+----------+-------------------+-------+-----+---------+
|           NULL|    compra|2025-04-21T10:00:00|   u001| 1500|    media|
|           NULL|    compra|2025-04-21T10:05:00|   u004| 3000|     alta|
+---------------+----------+-------------------+-------+-----+---------+



## Paso 9: Finalización de la sesión
Se cierra la sesión de Spark para liberar los recursos utilizados en el procesamiento distribuido.

In [11]:
# Cierre de sesión
spark.stop()