# Caso 2 · Metro de Santiago con PySpark
**Muestra reducida + Streaming simulado + KMeans (MLlib)**

In [1]:
# Instalar PySpark en el Entorno de Colab
!pip install -q pyspark

### 1) Inicialización de Spark

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Caso2-Metro-PySpark").getOrCreate()
print("Spark version:", spark.version)

Spark version: 3.5.1


### 2) Carga del CSV

In [4]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

PATH_DATA = "metro_santiago_200.csv"
df = spark.read.csv(PATH_DATA, header=True, inferSchema=True)
print("Filas totales:", df.count())
df.printSchema()
df.show(10, truncate=False)

Filas totales: 200
root
 |-- id_evento: integer (nullable = true)
 |-- linea: string (nullable = true)
 |-- estacion: string (nullable = true)
 |-- tipo_incidente: string (nullable = true)
 |-- duracion_minutos: integer (nullable = true)
 |-- hora: timestamp (nullable = true)

+---------+-----+--------------+------------------------+----------------+-------------------+
|id_evento|linea|estacion      |tipo_incidente          |duracion_minutos|hora               |
+---------+-----+--------------+------------------------+----------------+-------------------+
|1        |L1   |Baquedano     |Falla eléctrica         |20              |2025-09-15 07:30:00|
|2        |L2   |Cal y Canto   |Mantenimiento           |15              |2025-09-15 22:15:00|
|3        |L3   |Plaza de Armas|Evacuación              |10              |2025-09-15 14:45:00|
|4        |L4   |Las Torres    |Interrupción de servicio|25              |2025-09-15 09:15:00|
|5        |L5   |Santa Ana     |Falla eléctrica         |

In [5]:
# 2.1 Tomar muestra 20%
df_sample = df.sample(False, 0.2, seed=42)
print("Filas muestra:", df_sample.count())

Filas muestra: 39


### 3) RDDs + DataFrames + SQL

In [6]:
rdd = df_sample.rdd
cols = df_sample.columns
print("Columns:", cols)
if "estacion" in cols:
    print(rdd.map(lambda r: (r['estacion'],1)).reduceByKey(lambda a,b:a+b).take(10))

df_sample.createOrReplaceTempView("metro")
if "linea" in cols and "estacion" in cols:
    spark.sql("SELECT linea, COUNT(*) AS total FROM metro GROUP BY linea ORDER BY total DESC").show()

Columns: ['id_evento', 'linea', 'estacion', 'tipo_incidente', 'duracion_minutos', 'hora']
[('Los Héroes', 5), ('Tobalaba', 2), ('Pajaritos', 3), ('Ñuñoa', 3), ('Franklin', 4), ('Lo Prado', 3), ('Vicente Valdés', 4), ('Plaza de Armas', 3), ('Cerrillos', 3), ('Plaza Egaña', 2)]
+-----+-----+
|linea|total|
+-----+-----+
|   L6|    7|
|   L1|    7|
|   L2|    7|
|   L3|    6|
|   L5|    5|
|   L4|    4|
|  L4A|    3|
+-----+-----+



### 4) MLlib – KMeans

In [7]:
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

num_cols = [c for (c,t) in df_sample.dtypes if t in ["int","bigint","double","float"]]
if len(num_cols) < 2 and "estacion" in df_sample.columns:
    feats = df_sample.groupBy("estacion").agg(count("*").alias("freq"))
else:
    feats = df_sample.select(*num_cols).dropna()

assembler = VectorAssembler(inputCols=[c for c in feats.columns if c != "estacion"], outputCol="features_raw")
feats_vec = assembler.transform(feats)
scaler = StandardScaler(inputCol="features_raw", outputCol="features")
feats_std = scaler.fit(feats_vec).transform(feats_vec)

kmeans = KMeans(featuresCol="features", k=3, seed=42)
model = kmeans.fit(feats_std)
pred = model.transform(feats_std)

evaluator = ClusteringEvaluator(featuresCol="features", predictionCol="prediction", metricName="silhouette", distanceMeasure="squaredEuclidean")
sil = evaluator.evaluate(pred)
print("Silhouette (k=3):", sil)
for i,c in enumerate(model.clusterCenters()): print(i,c)

Silhouette (k=3): 0.5206044606009212
0 [2.74958796 4.01356396]
1 [1.55186585 5.60226636]
2 [0.78588079 4.12505185]


### 5) Streaming simulado

In [23]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
schema = StructType([
    StructField("timestamp", TimestampType(), True),
    StructField("linea", StringType(), True),
    StructField("estacion", StringType(), True),
    StructField("afluencia", IntegerType(), True),
])
stream_path = "stream_inputs"
df_stream = spark.readStream.schema(schema).option("maxFilesPerTrigger", 1).csv(stream_path)
agg = (df_stream.withWatermark("timestamp","10 minutes")
       .groupBy(window(col("timestamp"), "5 minutes"), col("estacion"))
       .agg(sum("afluencia").alias("pasajeros"))
       )
query = (agg.writeStream.outputMode("update").format("console").option("truncate", False).start())
# Para detener la query: query.stop()

### 6) Analizar el flujo de datos entrantes

In [25]:
# Contar eventos por estación en ventanas de 5 minutos
from pyspark.sql.functions import window, count

# define una agregación de ventana de tiempo
stream_analysis = (df_stream
                   .withWatermark("timestamp", "10 minutes") # Define marca de agua para gestionar datos tardíos
                   .groupBy(window("timestamp", "5 minutes"), "estacion") # Agrupar por ventana de 5 minutos y estación
                   .agg(count("*").alias("event_count")) # Contar eventos en cada grupo
                  )

# Inicia consulta de streaming para mostrar los resultados en la consola
query_analysis = (stream_analysis
                  .writeStream
                  .outputMode("update") # mostrar los recuentos actualizados
                  .format("console")
                  .option("truncate", False)
                  .start())

# Para detener la query: query_analysis.stop()

### 7) Exportar artefactos

In [26]:
import os, json
os.makedirs("artifacts", exist_ok=True)
with open("artifacts/caso2_metrics.json","w") as f:
    json.dump({"silhouette_k3": float(sil)}, f, indent=2)
model.write().overwrite().save("models/caso2_kmeans")
print("Guardado silhouette y modelo en artifacts/ y models/")

Guardado silhouette y modelo en artifacts/ y models/
