# 🚇 Módulo 8 – Streaming con PySpark (Google Colab)

## 1) Instalar y configurar Spark en Colab

In [1]:
!apt-get install -qq openjdk-11-jdk > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.5.7/spark-3.5.7-bin-hadoop3.tgz
!tar xf spark-3.5.7-bin-hadoop3.tgz
!pip -q install pyspark findspark

In [2]:
import os
os.environ['JAVA_HOME']='/usr/lib/jvm/java-11-openjdk-amd64'
os.environ['SPARK_HOME']='/content/spark-3.5.7-bin-hadoop3'
print('JAVA_HOME =', os.environ['JAVA_HOME'])
print('SPARK_HOME =', os.environ['SPARK_HOME'])

JAVA_HOME = /usr/lib/jvm/java-11-openjdk-amd64
SPARK_HOME = /content/spark-3.5.7-bin-hadoop3


In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Mod8-Streaming').getOrCreate()
print('✅ Spark version:', spark.version)

✅ Spark version: 3.5.7


## 2) Preparar carpeta de entrada y datos de prueba

In [4]:
!mkdir -p /content/stream_inputs
import pandas as pd
from datetime import datetime, timedelta
now = datetime.now()
df = pd.DataFrame({'timestamp':[now, now+timedelta(minutes=1), now+timedelta(minutes=2)],'linea':['L1','L1','L2'],'estacion':['Los Héroes','Baquedano','Tobalaba'],'afluencia':[120,300,180]})
df.to_csv('/content/stream_inputs/batch1.csv', index=False)
print('✅ CSV de prueba creado')

✅ CSV de prueba creado


## 3) Lectura con Structured Streaming + agregación por ventanas (5 min)

In [5]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
from pyspark.sql.functions import col, window, sum as _sum
schema = StructType([StructField('timestamp', TimestampType(), True),StructField('linea', StringType(), True),StructField('estacion', StringType(), True),StructField('afluencia', IntegerType(), True)])
df_stream = (spark.readStream.schema(schema).option('maxFilesPerTrigger', 1).csv('/content/stream_inputs'))
agg = (df_stream.withWatermark('timestamp','10 minutes').groupBy(window(col('timestamp'),'5 minutes'), col('estacion')).agg(_sum('afluencia').alias('pasajeros')))
query = (agg.writeStream.outputMode('update').format('console').option('truncate', False).start())
print('▶️ Streaming iniciado'); query.awaitTermination(20); query.stop(); print('⏹️ Streaming detenido')

▶️ Streaming iniciado
⏹️ Streaming detenido


## 4) Clasificación binaria (hora punta) – entrenamiento offline

In [6]:
hist = spark.createDataFrame([('Los Héroes',100,0),('Los Héroes',350,1),('Los Héroes',220,1),('Baquedano',90,0),('Baquedano',310,1),('Baquedano',180,0),('Tobalaba',80,0),('Tobalaba',190,0),('Tobalaba',400,1)], ['estacion','pasajeros','hora_punta'])
hist.show()

+----------+---------+----------+
|  estacion|pasajeros|hora_punta|
+----------+---------+----------+
|Los Héroes|      100|         0|
|Los Héroes|      350|         1|
|Los Héroes|      220|         1|
| Baquedano|       90|         0|
| Baquedano|      310|         1|
| Baquedano|      180|         0|
|  Tobalaba|       80|         0|
|  Tobalaba|      190|         0|
|  Tobalaba|      400|         1|
+----------+---------+----------+



In [7]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
label_col='hora_punta'
indexer=StringIndexer(inputCol='estacion', outputCol='estacion_idx', handleInvalid='keep')
encoder=OneHotEncoder(inputCols=['estacion_idx'], outputCols=['estacion_ohe'])
assembler=VectorAssembler(inputCols=['pasajeros','estacion_ohe'], outputCol='features_raw')
scaler=StandardScaler(inputCol='features_raw', outputCol='features')
lr=LogisticRegression(featuresCol='features', labelCol=label_col)
pipe=Pipeline(stages=[indexer,encoder,assembler,scaler,lr])
train,test=hist.randomSplit([0.8,0.2],seed=42)
model=pipe.fit(train)
pred=model.transform(test)
auc=BinaryClassificationEvaluator(labelCol=label_col, metricName='areaUnderROC').evaluate(pred)
acc=MulticlassClassificationEvaluator(labelCol=label_col, metricName='accuracy').evaluate(pred)
print('AUC:', round(auc,3),'| Accuracy:', round(acc,3))

AUC: 1.0 | Accuracy: 0.5


## 5) Guardar artefactos y modelo

In [8]:
import json, os
os.makedirs('/content/artifacts', exist_ok=True)
os.makedirs('/content/models', exist_ok=True)
with open('/content/artifacts/stream_cls_metrics.json','w') as f:
    json.dump({'auc': float(auc), 'accuracy': float(acc)}, f, indent=2)
model.write().overwrite().save('/content/models/stream_lr_model')
print('✅ Guardados artefactos y modelo')

✅ Guardados artefactos y modelo
