# **ABP Modulo 7**



Lección 1 — Big Data
1) 5V’s aplicadas al proyecto

Volumen: múltiples miles de registros (y potencialmente millones si crece) provenientes de ventas, navegación, sensores/logs o eventos—requiere almacenamiento y cómputo escalables.

Velocidad: necesidad de procesar y responder rápido (batch + streaming) para reportes o detección de eventos casi en tiempo real.

Variedad: formatos heterogéneos (CSV/JSON/Parquet) y datos estructurados/no estructurados.

Veracidad: control de calidad (validez, duplicados, valores faltantes) para decisiones confiables. (Está implícito en el objetivo de integrar y documentar bien los datos.)

Valor: transformar los datos en insights y en un pipeline reutilizable (consulta, ML y streaming).

2) Beneficios del enfoque distribuido frente al local

Escalabilidad: divide y conquista; procesa datos masivos en varios nodos en lugar de un solo equipo.

Rendimiento: trabajo en memoria y ejecución paralela reducen tiempos en cargas iterativas y ML.

Versatilidad en un mismo ecosistema: batch, SQL, ML y streaming bajo una misma plataforma (Spark).

Tiempo real/cuasi tiempo real: capacidad de streaming para reaccionar a eventos.

3) Mapa de tecnologías clave

Ingesta / Orquestación: archivos (CSV/JSON) y/o cola de mensajes (Kafka) si simulas streaming.

Almacenamiento:

Data Lake en Parquet (carpetas por fecha/partición) para costo/velocidad.

Catálogo (metadatos) y scripts de carga/limpieza. (La guía exige integrar CSV/JSON/Parquet).

Procesamiento distribuido:

Spark SQL/DataFrames para consultas optimizadas.

RDDs para transformaciones de bajo nivel cuando aplique.

Streaming (Structured Streaming) para flujos en tiempo real/simulados.

ML escalable: Spark MLlib (VectorAssembler + modelo supervisado) para entrenamiento y evaluación.

Entregables/documentación: scripts, modelo guardado, capturas de streaming y README.


Lección 2: Apache Spark

**Cuándo y por qué usar Spark en un entorno Big Data**

Spark se usa cuando se necesita procesar grandes volúmenes de datos distribuidos que no caben en un solo computador.

Es ideal cuando se requiere alta velocidad, ya que usa procesamiento en memoria mucho más rápido que MapReduce tradicional.

Permite trabajar con distintos tipos de datos y fuentes (archivos, bases de datos, streams) dentro de un mismo ecosistema.

Se integra con múltiples herramientas del entorno Big Data (Hadoop, Kafka, Cassandra, etc.).

Se usa tanto para procesamiento batch como para streaming en tiempo real, además de consultas SQL y machine learning escalable.

**Arquitectura general de Spark**

Spark funciona bajo un modelo maestro–trabajadores distribuido, compuesto por:

Driver:
Es el proceso principal. Envía el código de la aplicación, crea el plan de ejecución (DAG) y coordina las tareas entre los ejecutores. Contiene el SparkContext.
Se ejecuta una sola vez por aplicación.

Executors:
Son los procesos que se ejecutan en los nodos del clúster.
Reciben tareas del Driver, procesan los datos en paralelo y devuelven los resultados parciales.
Cada Executor maneja múltiples tareas concurrentes y usa memoria propia.

Cluster Manager:
Asigna los recursos del clúster (CPU, RAM) a las aplicaciones de Spark.
Puede ser YARN, Mesos, Kubernetes o el Standalone manager de Spark.
Permite escalar horizontalmente agregando nodos.


Módulos de Spark necesarios para el proyecto **texto en negrita**

Para este proyecto usaremos principalmente:

Spark SQL / DataFrames → para procesar datos tabulares y ejecutar consultas optimizadas.

Spark Structured Streaming → para procesar flujos de datos en tiempo real.

Spark MLlib → para construir, entrenar y evaluar modelos de machine learning de forma distribuida.

In [6]:
#Lección 3 — RDDs en Spark
#1) Crear entorno local de Spark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Leccion3_RDDs").getOrCreate()
sc = spark.sparkContext


In [8]:
#2) Crear un RDD con datos simulados
# Datos simulados (nombre, edad, nota)
datos = [
    ("Ana", 23, 5.8),
    ("Luis", 30, 6.1),
    ("Carla", 21, 4.9),
    ("Pedro", 28, 5.5),
    ("Sofía", 26, 6.8)
]

# Crear RDD
rdd = sc.parallelize(datos)
rdd.collect()

[('Ana', 23, 5.8),
 ('Luis', 30, 6.1),
 ('Carla', 21, 4.9),
 ('Pedro', 28, 5.5),
 ('Sofía', 26, 6.8)]

In [9]:
#aplicar transformaciones
# Filtrar personas con nota >= 5.5
rdd_filtrado = rdd.filter(lambda x: x[2] >= 5.5)

# Map: convertir a (nombre, nota*10)
rdd_mapeado = rdd_filtrado.map(lambda x: (x[0], x[2]*10))

# Crear otro RDD pequeño para usar union
extra = sc.parallelize([("Nuevo", 25, 6.0)])
rdd_union = rdd.union(extra)

# Ordenar por edad
rdd_ordenado = rdd_union.sortBy(lambda x: x[1])

In [10]:
#Ejecutar acciones
# Contar registros
total = rdd_ordenado.count()

# Ver todos
todos = rdd_ordenado.collect()

# Calcular media de notas
mean_nota = rdd.map(lambda x: x[2]).mean()

print("Total personas:", total)
print("Media de notas:", mean_nota)
print("Datos ordenados:", todos)

Total personas: 6
Media de notas: 5.819999999999999
Datos ordenados: [('Carla', 21, 4.9), ('Ana', 23, 5.8), ('Nuevo', 25, 6.0), ('Sofía', 26, 6.8), ('Pedro', 28, 5.5), ('Luis', 30, 6.1)]


In [11]:
#Leccion 4
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Leccion4_SQL_DF").getOrCreate()

In [12]:
#Crear datos
from pyspark.sql import functions as F

# Datos de ejemplo (id, nombre, edad, nota, país)
data = [
    (1, "Ana", 23, 5.8, "CL"),
    (2, "Luis", 30, 6.1, "AR"),
    (3, "Carla", 21, 4.9, "CL"),
    (4, "Pedro", 28, 5.5, "PE"),
    (5, "Sofia", 26, 6.8, "CL"),
]
cols = ["id","nombre","edad","nota","pais"]
df = spark.createDataFrame(data, cols)

In [13]:
# Guardar en CSV, JSON, Parquet
df.write.mode("overwrite").option("header","true").csv("/content/df_csv")
df.write.mode("overwrite").json("/content/df_json")
df.write.mode("overwrite").parquet("/content/df_parquet")

# Cargar desde CSV / JSON / Parquet
df_csv = spark.read.option("header","true").option("inferSchema","true").csv("/content/df_csv")
df_json = spark.read.json("/content/df_json")
df_parq = spark.read.parquet("/content/df_parquet")

In [14]:
#Consultas
df_parq.createOrReplaceTempView("personas")

# 2.1 Selección y filtro
res1 = spark.sql("""
  SELECT id, nombre, edad, nota, pais
  FROM personas
  WHERE pais = 'CL' AND nota >= 5.5
  ORDER BY nota DESC
""")
res1.show()

# 2.2 Agregación por país
res2 = spark.sql("""
  SELECT pais, COUNT(*) AS n, ROUND(AVG(nota),2) AS nota_prom
  FROM personas
  GROUP BY pais
  ORDER BY n DESC
""")
res2.show()

+---+------+----+----+----+
| id|nombre|edad|nota|pais|
+---+------+----+----+----+
|  5| Sofia|  26| 6.8|  CL|
|  1|   Ana|  23| 5.8|  CL|
+---+------+----+----+----+

+----+---+---------+
|pais|  n|nota_prom|
+----+---+---------+
|  CL|  3|     5.83|
|  PE|  1|      5.5|
|  AR|  1|      6.1|
+----+---+---------+



In [15]:
#Funciones
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

def categoria_nota(x):
    if x is None: return "desconocido"
    return "alto" if x >= 6.0 else ("medio" if x >= 5.5 else "bajo")

udf_cat = udf(categoria_nota, StringType())

df_cat = df_parq.withColumn("categoria", udf_cat(F.col("nota")))
df_cat.show()

df_cat.createOrReplaceTempView("personas_cat")
spark.sql("""
  SELECT nombre, nota, categoria
  FROM personas_cat
  ORDER BY nota DESC
""").show()

+---+------+----+----+----+---------+
| id|nombre|edad|nota|pais|categoria|
+---+------+----+----+----+---------+
|  3| Carla|  21| 4.9|  CL|     bajo|
|  4| Pedro|  28| 5.5|  PE|    medio|
|  5| Sofia|  26| 6.8|  CL|     alto|
|  1|   Ana|  23| 5.8|  CL|    medio|
|  2|  Luis|  30| 6.1|  AR|     alto|
+---+------+----+----+----+---------+

+------+----+---------+
|nombre|nota|categoria|
+------+----+---------+
| Sofia| 6.8|     alto|
|  Luis| 6.1|     alto|
|   Ana| 5.8|    medio|
| Pedro| 5.5|    medio|
| Carla| 4.9|     bajo|
+------+----+---------+



In [16]:
#Comparar rendimiento
import time

# DataFrames (usa Catalyst + Tungsten, más optimizado)
t0 = time.time()
avg_df = df_parq.select(F.avg("nota")).first()[0]
t1 = time.time()

# RDD (más manual)
rdd = df_parq.rdd
t2 = time.time()
vals = rdd.map(lambda row: row["nota"]).filter(lambda x: x is not None).collect()
avg_rdd = sum(vals)/len(vals)
t3 = time.time()

print(f"AVG DataFrame: {avg_df:.3f} | tiempo: {(t1-t0)*1000:.2f} ms")
print(f"AVG RDD:       {avg_rdd:.3f} | tiempo: {(t3-t2)*1000:.2f} ms")

# (Opcional) ver plan físico optimizado de DataFrames
df_parq.select(F.avg("nota")).explain()

AVG DataFrame: 5.820 | tiempo: 421.72 ms
AVG RDD:       5.820 | tiempo: 590.46 ms
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[avg(nota#73)])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=262]
      +- HashAggregate(keys=[], functions=[partial_avg(nota#73)])
         +- FileScan parquet [nota#73] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/content/df_parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<nota:double>




In [17]:
#Leccion 5
#Creacion directorio y session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("L5_Streaming").getOrCreate()

import shutil, os, time
base_dir = "/content/stream_in"
shutil.rmtree(base_dir, ignore_errors=True)
os.makedirs(base_dir, exist_ok=True)

In [18]:
#Esquema de eventos
from pyspark.sql import functions as F, types as T

schema = T.StructType([
    T.StructField("event_time", T.TimestampType(), True),
    T.StructField("source", T.StringType(), True),
    T.StructField("value", T.DoubleType(), True),
])

# Lee archivos JSON que vayan apareciendo en la carpeta
events = (spark.readStream
          .schema(schema)
          .option("maxFilesPerTrigger", 1)   # 1 archivo por microbatch
          .json(base_dir))

In [20]:
#Transformaciones
agg = (events
       .withWatermark("event_time", "10 minutes")
       .groupBy(F.window("event_time", "5 minutes", "2 minutes"), "source")
       .agg(F.count("*").alias("n"), F.avg("value").alias("avg_value"))
       .orderBy(F.col("window").asc(), F.col("source").asc()))  # OK si usamos complete

query = (agg.writeStream
         .outputMode("complete")      # << antes estaba "update"
         .format("console")
         .option("truncate", "false")
         .start())

In [21]:
#Generar datos
# Generamos tres micro-lotes con datos sintéticos
import pandas as pd
from datetime import datetime, timedelta

t0 = datetime.now().replace(second=0, microsecond=0)

def write_batch(idx, n=200):
    rows = []
    for i in range(n):
        ts = t0 + timedelta(seconds=idx*30 + i)  # tiempos crecientes
        rows.append({"event_time": ts.isoformat(sep=" "),
                     "source": "sensor_A" if i%2==0 else "sensor_B",
                     "value": 50 + i%7})
    pd.DataFrame(rows).to_json(f"{base_dir}/batch_{idx:03d}.json",
                               orient="records", lines=True)

for i in range(3):      # tres microbatches
    write_batch(i, n=300)
    time.sleep(2)       # simula llegada gradual de archivos


In [22]:
# Último progreso del stream (muestra micro-lotes, filas procesadas, etc.)
print(query.lastProgress)        # dict o None si aún no hay lotes
print(query.status)              # estado general

{'id': '8babd850-e891-4a6f-b021-f5af9a18936f', 'runId': '6d943d1c-f61c-43ad-9b52-e6d42a96b6e5', 'name': None, 'timestamp': '2025-09-13T20:32:48.176Z', 'batchId': 3, 'numInputRows': 0, 'inputRowsPerSecond': 0.0, 'processedRowsPerSecond': 0.0, 'durationMs': {'latestOffset': 2, 'triggerExecution': 2}, 'eventTime': {'watermark': '2025-09-13T20:26:59.000Z'}, 'stateOperators': [{'operatorName': 'stateStoreSave', 'numRowsTotal': 20, 'numRowsUpdated': 0, 'allUpdatesTimeMs': 218, 'numRowsRemoved': 0, 'allRemovalsTimeMs': 0, 'commitTimeMs': 628, 'memoryUsedBytes': 7664, 'numRowsDroppedByWatermark': 0, 'numShufflePartitions': 4, 'numStateStoreInstances': 4, 'customMetrics': {'loadedMapCacheHitCount': 28, 'loadedMapCacheMissCount': 0, 'stateOnCurrentVersionSizeBytes': 5408}}], 'sources': [{'description': 'FileStreamSource[file:/content/stream_in]', 'startOffset': {'logOffset': 2}, 'endOffset': {'logOffset': 2}, 'latestOffset': None, 'numInputRows': 0, 'inputRowsPerSecond': 0.0, 'processedRowsPerSe

In [23]:
#parar datos

query.stop()

In [24]:
# Comparación simple con batch (mismo esquema y datos combinados)
df_batch = spark.read.schema(schema).json(base_dir)   # lee todo junto (batch)

import time
t0 = time.time()
res = (df_batch
       .groupBy(F.window("event_time","5 minutes","2 minutes"), "source")
       .agg(F.count("*").alias("n"), F.avg("value").alias("avg_value"))
       .orderBy("window","source")
       ).collect()
t1 = time.time()
print(f"Batch: {len(res)} filas agregadas en {1000*(t1-t0):.2f} ms")

Batch: 10 filas agregadas en 1167.65 ms


In [25]:
#Leccion 6
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Leccion6_MLlib").getOrCreate()


In [26]:
from google.colab import files
uploaded = files.upload()  # selecciona wdbc.data
#link https://archive.ics.uci.edu/ml/machine-learning-databases/breast-cancer-wisconsin/wdbc.data

Saving wdbc.data to wdbc.data
Saving wdbc.names to wdbc.names


In [29]:
#Cargar WDBC (desde archivo ya subido o vía wget)
df_wdbc = spark.read.csv("wdbc.data", header=False, inferSchema=True)

# 3) Verifica cuántas columnas tiene (debe ser 32)
print("n_cols:", len(df_wdbc.columns))
df_wdbc.show(3, truncate=False)

n_cols: 32
+--------+---+-----+-----+-----+------+-------+-------+------+-------+------+-------+------+------+-----+-----+--------+-------+-------+-------+-------+--------+-----+-----+-----+------+------+------+------+------+------+-------+
|_c0     |_c1|_c2  |_c3  |_c4  |_c5   |_c6    |_c7    |_c8   |_c9    |_c10  |_c11   |_c12  |_c13  |_c14 |_c15 |_c16    |_c17   |_c18   |_c19   |_c20   |_c21    |_c22 |_c23 |_c24 |_c25  |_c26  |_c27  |_c28  |_c29  |_c30  |_c31   |
+--------+---+-----+-----+-----+------+-------+-------+------+-------+------+-------+------+------+-----+-----+--------+-------+-------+-------+-------+--------+-----+-----+-----+------+------+------+------+------+------+-------+
|842302  |M  |17.99|10.38|122.8|1001.0|0.1184 |0.2776 |0.3001|0.1471 |0.2419|0.07871|1.095 |0.9053|8.589|153.4|0.006399|0.04904|0.05373|0.01587|0.03003|0.006193|25.38|17.33|184.6|2019.0|0.1622|0.6656|0.7119|0.2654|0.4601|0.1189 |
|842517  |M  |20.57|17.77|132.9|1326.0|0.08474|0.07864|0.0869|0.07017

In [30]:
#generar target
from pyspark.sql import functions as F

# Renombrar columnas (ID, Diagnosis, 30 features)
cols = ["ID","Diagnosis"] + [f"feat_{i}" for i in range(30)]
assert len(cols) == len(df_wdbc.columns), "El CSV no tiene 32 columnas"
df_wdbc = df_wdbc.toDF(*cols)

# label binaria: M=1, B=0
df_wdbc = df_wdbc.withColumn("label", F.when(F.col("Diagnosis")=="M", 1).otherwise(0))
df_wdbc.select("ID","Diagnosis","label").show(5, truncate=False)

+--------+---------+-----+
|ID      |Diagnosis|label|
+--------+---------+-----+
|842302  |M        |1    |
|842517  |M        |1    |
|84300903|M        |1    |
|84348301|M        |1    |
|84358402|M        |1    |
+--------+---------+-----+
only showing top 5 rows



In [31]:
#ensamblar
from pyspark.ml.feature import VectorAssembler, StandardScaler

feature_cols = [f"feat_{i}" for i in range(30)]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_vec")
scaler = StandardScaler(inputCol="features_vec", outputCol="features", withMean=True, withStd=True)

df_assembled = assembler.transform(df_wdbc)
df_scaled = scaler.fit(df_assembled).transform(df_assembled).select("label","features")
df_scaled.show(5, truncate=False)

+-----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|label|features                                                                                                                                                                                                                                                                                                                                                                                                   

In [32]:
#entrenar modelo
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline

# Split
train, test = df_scaled.randomSplit([0.8, 0.2], seed=42)

# Modelo
lr = LogisticRegression(featuresCol="features", labelCol="label")

# Grid de hiperparámetros
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.0, 0.01, 0.1])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .build())

# Evaluador
evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC", labelCol="label")

# Pipeline (solo el modelo porque ya escalamos antes)
pipeline = Pipeline(stages=[lr])

# Cross-Validation
cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator,
                    numFolds=3,
                    parallelism=2)

cv_model = cv.fit(train)
pred = cv_model.transform(test)

In [33]:
#Metricas del modelo
# AUC-ROC
auc = evaluator.evaluate(pred)
print(f"AUC-ROC: {auc:.4f}")

# Matriz de confusión y métricas
tp = pred.filter("label = 1 AND prediction = 1").count()
fp = pred.filter("label = 0 AND prediction = 1").count()
tn = pred.filter("label = 0 AND prediction = 0").count()
fn = pred.filter("label = 1 AND prediction = 0").count()

precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
recall    = tp / (tp + fn) if (tp + fn) > 0 else 0.0
accuracy  = (tp + tn) / (tp + tn + fp + fn)

print(f"Confusion Matrix -> TN:{tn} FP:{fp} FN:{fn} TP:{tp}")
print(f"Precisión: {precision:.4f} | Recall: {recall:.4f} | Exactitud: {accuracy:.4f}")

AUC-ROC: 0.9936
Confusion Matrix -> TN:54 FP:0 FN:2 TP:30
Precisión: 1.0000 | Recall: 0.9375 | Exactitud: 0.9767


In [34]:
#Guardar modelo
save_path = "/content/modelo_wdbc_lr_cv"
cv_model.bestModel.write().overwrite().save(save_path)
print("Modelo guardado en:", save_path)

Modelo guardado en: /content/modelo_wdbc_lr_cv


In [35]:
#cargar y aplicar modelo
from pyspark.ml import PipelineModel

modelo = PipelineModel.load(save_path)
pred2 = modelo.transform(test)
pred2.select("features","label","probability","prediction").show(5, truncate=False)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+------------------------------------------+----------+
|features                                                                                                                                                                                                                                                                                                                                            