### 🛠️ Instalación y configuración de PySpark
Instalamos la librería `pyspark` y creamos una `SparkSession` que nos permitirá trabajar con procesamiento distribuido en tiempo real.

In [1]:
# ✅ Instalación de PySpark en Google Colab
!pip install -q pyspark

# ✅ Crear SparkSession con soporte a Structured Streaming
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ProyectoMetroSantiago").config("spark.executor.memory", "1g").config("spark.driver.memory", "1g").config("spark.sql.shuffle.partitions","2").getOrCreate()

print("Spark version:", spark.version)

Spark version: 3.5.1


### 📁 Simulación de archivos en tiempo real
Creamos una carpeta `stream_data` y copiamos 3 versiones del archivo para simular eventos que llegan en tiempo real.

In [2]:
# ✅ Simulación de lectura de archivos tipo stream
# Usaremos un folder simulado con varias copias del CSV como si fueran eventos en tiempo real

import shutil
import os

# Crear carpetas simuladas
os.makedirs("stream_data", exist_ok=True)

# Subir el CSV base al entorno de Colab (ya cargado previamente)
uploaded_path = "/content/metro_santiago.csv"

# Simular datos que llegan en "tiempo real" (3 archivos)
for i in range(1, 4):
    shutil.copy(uploaded_path, f"stream_data/lote{i}.csv")

### 🧱 Definición del esquema y lectura de archivos como stream
Evitamos inferencia automática del esquema y configuramos la lectura continua desde la carpeta `stream_data`.


In [3]:
# ✅ Definir esquema para evitar inferencia en modo streaming
from pyspark.sql.types import StructType, StringType, IntegerType

schema = (StructType()
    .add("id_evento", IntegerType())
    .add("linea", StringType())
    .add("estacion", StringType())
    .add("tipo_incidente", StringType())
    .add("duracion_minutos", IntegerType())
    .add("hora", StringType())
)

df_stream = (
    spark.readStream
         .schema(schema)
         .option("sep", ",")
         .option("header", "true")          # tu CSV tiene encabezado
         .option("maxFilesPerTrigger", 1)   # ver micro-batches claros
         .csv("stream_data")
         .select("estacion", "linea", "tipo_incidente", "duracion_minutos")   # usa solo lo necesario
)

### 🔍 Verificación del esquema del CSV en lectura batch
Leemos los archivos en modo tradicional para confirmar que el esquema es correcto y los datos son válidos antes de procesarlos como streaming.

In [4]:
df_test = spark.read.schema(schema).option("header","true").csv("stream_data")
df_test.printSchema()
df_test.show(2)

root
 |-- id_evento: integer (nullable = true)
 |-- linea: string (nullable = true)
 |-- estacion: string (nullable = true)
 |-- tipo_incidente: string (nullable = true)
 |-- duracion_minutos: integer (nullable = true)
 |-- hora: string (nullable = true)

+---------+-----+-----------+---------------+----------------+-----+
|id_evento|linea|   estacion| tipo_incidente|duracion_minutos| hora|
+---------+-----+-----------+---------------+----------------+-----+
|        1|   L2|   Lo Prado|Falla eléctrica|              15|11:45|
|        2|   L5|Irarrázaval|     Evacuación|              15|20:00|
+---------+-----+-----------+---------------+----------------+-----+
only showing top 2 rows



### 📊 Transformación: conteo de incidentes por tipo
Agrupamos el DataFrame de streaming para contar cuántos eventos hay por cada tipo de incidente. Simplificamos la salida reduciendo particiones.

In [5]:
# 🔄 Transformación: total de incidentes por tipo
from pyspark.sql.functions import count

df_agrupado = (
    df_stream.groupBy("tipo_incidente")
             .agg(count("*").alias("total_incidentes"))
)

# (Opcional) Menos particiones para salida más limpia en consola
spark.conf.set("spark.sql.shuffle.partitions", "1")

### 🖨️ Impresión de resultados con foreachBatch
Mostramos los resultados de cada micro-batch en consola utilizando una función personalizada. Ideal para entornos como Google Colab.

In [6]:
# Arranca el foreachBatch y espera a que procese
import shutil
shutil.rmtree("/tmp/chk_metro_fb", ignore_errors=True)

def print_batch(df, epoch_id):
    df.orderBy("tipo_incidente").show(n=50, truncate=False)

query = (
    df_agrupado.writeStream
        .foreachBatch(print_batch)
        .outputMode("complete")
        .option("checkpointLocation", "/tmp/chk_metro_fb")
        .trigger(processingTime="2 seconds")
        .start()
)

query.awaitTermination(10)   # <-- SIN ESTO no verás nada
query.stop()

+------------------------+----------------+
|tipo_incidente          |total_incidentes|
+------------------------+----------------+
|Evacuación              |8               |
|Falla eléctrica         |14              |
|Interrupción de servicio|11              |
|Mantenimiento           |12              |
|Retraso                 |5               |
+------------------------+----------------+

+------------------------+----------------+
|tipo_incidente          |total_incidentes|
+------------------------+----------------+
|Evacuación              |16              |
|Falla eléctrica         |28              |
|Interrupción de servicio|22              |
|Mantenimiento           |24              |
|Retraso                 |10              |
+------------------------+----------------+

+------------------------+----------------+
|tipo_incidente          |total_incidentes|
+------------------------+----------------+
|Evacuación              |24              |
|Falla eléctrica         |42  

### 🔍 Verificación: ¿Estamos realmente en modo streaming?
Confirmamos que los DataFrames están siendo tratados como flujos continuos (`streaming=True`).

In [7]:
# A) ¿El DataFrame es realmente de streaming?
print("df_stream.isStreaming:", df_stream.isStreaming)
print("df_agrupado.isStreaming:", df_agrupado.isStreaming)

df_stream.isStreaming: True
df_agrupado.isStreaming: True


### 📂 Verificación: archivos disponibles para el stream
Listamos el contenido de la carpeta `stream_data` para ver si aún hay datos esperando ser procesados.

In [8]:
# B) ¿Hay archivos por procesar en stream_data?
import os
print(os.listdir("stream_data"))

['lote2.csv', 'lote1.csv', 'lote3.csv']


### 🧾 Confirmación del esquema de df_stream
Imprimimos la estructura del DataFrame en streaming para asegurar que el esquema definido fue correctamente aplicado.

In [9]:
# C) ¿El esquema de df_stream es el esperado?
df_stream.printSchema()

root
 |-- estacion: string (nullable = true)
 |-- linea: string (nullable = true)
 |-- tipo_incidente: string (nullable = true)
 |-- duracion_minutos: integer (nullable = true)

