# Streaming: Real-Time Statistics Computation

**Objective:** Process the Kafka event stream and compute statistics in real time.

**Statistics to compute:**
- Minimum, maximum, mean, and variance of:
  - Passes per team
  - Distance covered
  - Average positions
  - Possession

**Architecture:** Spark Structured Streaming + RAPIDS GPU
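
The four statistics listed above can each be maintained one event at a time, without storing the stream. The following plain-Python sketch shows the idea with Welford's online algorithm. It is an illustration only: the notebook itself relies on Spark's built-in aggregates, and the class name `RunningStats` and the sample values are hypothetical.

```python
import math


class RunningStats:
    """Running min, max, mean and sample variance (Welford's online algorithm)."""

    def __init__(self):
        self.count = 0
        self.mean = 0.0
        self._m2 = 0.0  # sum of squared deviations from the current mean
        self.minimum = math.inf
        self.maximum = -math.inf

    def update(self, value):
        """Fold one new observation into the running statistics."""
        self.count += 1
        delta = value - self.mean
        self.mean += delta / self.count
        self._m2 += delta * (value - self.mean)
        if value < self.minimum:
            self.minimum = value
        if value > self.maximum:
            self.maximum = value

    @property
    def variance(self):
        """Sample variance (n - 1 denominator); None with fewer than two values."""
        if self.count < 2:
            return None
        return self._m2 / (self.count - 1)


# Hypothetical pass distances, fed one at a time as a stream would deliver them
stats = RunningStats()
for distance in [5.0, 10.0, 15.0]:
    stats.update(distance)

print(stats.minimum, stats.maximum, stats.mean, stats.variance)
```

The sketch uses explicit comparisons rather than the built-in `min` and `max`, because the wildcard import in the setup cell rebinds those names to Spark column functions.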

## 1. Setup and Configuration

In [None]:
import os
import sys
import warnings
warnings.filterwarnings('ignore')

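from pyspark.sql import SparkSession
# Wildcard import: from here on, names such as sum, min, max and pow refer to
# Spark column functions, which shadow the Python built-ins of the same name.
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime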

print(f"Setup complete - {datetime.now()}")

## 2. Initialize the Spark Session with GPU and Kafka

In [None]:
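# Initialize Spark with RAPIDS GPU acceleration and Kafka support.
# The spark-sql-kafka package version (3.3.1, Scala 2.12) must match the cluster's Spark and Scala versions.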
spark = SparkSession.builder \
    .appName("StatsBomb-Streaming-Statistics-GPU") \
    .master("spark://spark-master:7077") \
    .config("spark.rapids.sql.enabled", "true") \
    .config("spark.plugins", "com.nvidia.spark.SQLPlugin") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1") \
    .config("spark.sql.streaming.checkpointLocation", "/tmp/checkpoint-stats") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

print(f"✓ Spark Version: {spark.version}")
print(f"✓ Spark Master: {spark.sparkContext.master}")
print("✓ Spark UI: http://localhost:4040")
print("\n📊 Monitor the Spark UI to capture performance metrics:")
print("   - Job Time")
print("   - Shuffle Read/Write")
print("   - I/O")
print("   - Scheduler Delay")
print("   - Spill (Memory/Disk)")

## 3. Define the StatsBomb Event Schema

In [None]:
# Define schema for StatsBomb events
event_schema = StructType([
    StructField("event", StructType([
        StructField("id", StringType(), True),
        StructField("index", IntegerType(), True),
        StructField("period", IntegerType(), True),
        StructField("timestamp", StringType(), True),
        StructField("minute", IntegerType(), True),
        StructField("second", IntegerType(), True),
        StructField("type", StructType([
            StructField("id", IntegerType(), True),
            StructField("name", StringType(), True)
        ]), True),
        StructField("team", StructType([
            StructField("id", IntegerType(), True),
            StructField("name", StringType(), True)
        ]), True),
        StructField("player", StructType([
            StructField("id", IntegerType(), True),
            StructField("name", StringType(), True)
        ]), True),
        StructField("location", ArrayType(DoubleType()), True),
        StructField("pass_end_location", ArrayType(DoubleType()), True),
        StructField("under_pressure", BooleanType(), True),
    ]), True),
    StructField("metadata", StructType([
        StructField("producer_timestamp", StringType(), True),
        StructField("producer_id", StringType(), True)
    ]), True)
])

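print("✓ Schema defined")
print("\nSchema structure:")
# printTreeString() belongs to the Scala StructType API; in PySpark, printing
# the schema of an empty DataFrame produces the same tree view.
spark.createDataFrame([], event_schema).printSchema()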
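
To make the nesting concrete, here is a hypothetical message shaped like the schema above, parsed with the standard `json` module. Every value is invented for illustration and is not taken from the StatsBomb data or from the actual Kafka producer.

```python
import json

# Hypothetical message with the two top-level fields of the schema: "event" and "metadata"
sample_message = """
{
  "event": {
    "id": "example-event-id",
    "index": 1,
    "period": 1,
    "timestamp": "00:00:05.000",
    "minute": 0,
    "second": 5,
    "type": {"id": 30, "name": "Pass"},
    "team": {"id": 1, "name": "Example FC"},
    "player": {"id": 10, "name": "Example Player"},
    "location": [60.0, 40.0],
    "pass_end_location": [63.0, 44.0],
    "under_pressure": false
  },
  "metadata": {
    "producer_timestamp": "2024-01-01T00:00:05Z",
    "producer_id": "example-producer"
  }
}
"""

parsed = json.loads(sample_message)
event_fields = parsed["event"]

# Nested structs map to nested dicts; ArrayType(DoubleType()) maps to a list of floats
print(event_fields["type"]["name"])
print(event_fields["location"])
print(parsed["metadata"]["producer_id"])
```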

## 4. Connect to the Kafka Stream

In [None]:
# Kafka configuration
KAFKA_BOOTSTRAP_SERVERS = "kafka:9092"
KAFKA_TOPIC = "statsbomb-360-events"

# Read from Kafka
kafka_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS) \
    .option("subscribe", KAFKA_TOPIC) \
    .option("startingOffsets", "latest") \
    .option("maxOffsetsPerTrigger", 10000) \
    .load()

# load() only defines the source; Spark contacts the broker once a query starts
print(f"✓ Kafka source configured: {KAFKA_BOOTSTRAP_SERVERS}")
print(f"✓ Topic: {KAFKA_TOPIC}")
print("✓ Stream created")

# Parse JSON from Kafka value
parsed_stream = kafka_stream.select(
    col("timestamp").alias("kafka_timestamp"),
    from_json(col("value").cast("string"), event_schema).alias("data")
).select(
    "kafka_timestamp",
    "data.event.*",
    "data.metadata.*"
)

print("✓ Stream parsed")
print("\nAvailable columns:")
print(parsed_stream.columns)
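
The parsing step casts the Kafka `value` bytes to a string and applies `from_json`, which by default yields null for a string it cannot parse, so a malformed message becomes a row of nulls rather than an error. The plain-Python sketch below mirrors that behaviour for a single message. The function name `parse_value` and the sample payloads are hypothetical.

```python
import json


def parse_value(raw_value):
    """Decode Kafka value bytes as UTF-8 JSON; return None if the payload cannot be parsed."""
    try:
        return json.loads(raw_value.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None


# A well-formed payload and a malformed one (both invented for this example)
print(parse_value(b'{"event": {"id": "example-event-id"}}'))
print(parse_value(b"not json"))
```

Counting rows where a required field such as `team_name` is null is one simple way to spot messages that failed to parse.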

## 5. Event Processing - Feature Extraction

In [None]:
# Extract relevant features from events
events_with_features = parsed_stream \
    .withColumn("event_time", col("kafka_timestamp")) \
    .withColumn("event_type", col("type.name")) \
    .withColumn("team_name", col("team.name")) \
    .withColumn("player_name", col("player.name")) \
    .withColumn("location_x", col("location").getItem(0)) \
    .withColumn("location_y", col("location").getItem(1)) \
    .withColumn("is_pass", when(col("event_type") == "Pass", 1).otherwise(0)) \
    .withColumn("is_shot", when(col("event_type") == "Shot", 1).otherwise(0)) \
    .withColumn("is_dribble", when(col("event_type") == "Dribble", 1).otherwise(0)) \
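    .withColumn(
        "pass_distance",
        # Euclidean distance from the pass origin to its end location.
        # No .otherwise(): events without an end location stay null, so the
        # min/avg/stddev aggregates cover passes only instead of being pulled to 0.
        when(
            col("pass_end_location").isNotNull(),
            sqrt(
                pow(col("pass_end_location").getItem(0) - col("location_x"), 2) +
                pow(col("pass_end_location").getItem(1) - col("location_y"), 2)
            )
        )
    )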

print("✓ Features extracted from the events")
print("\nComputed features:")
print("  - event_type: Event type")
print("  - team_name: Team")
print("  - location_x, location_y: Position on the pitch")
print("  - is_pass, is_shot, is_dribble: Event-type flags")
print("  - pass_distance: Pass distance")
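
The `pass_distance` feature is the Euclidean distance between a pass's start and end coordinates. As a minimal plain-Python sketch of the same formula for one event (the helper and the coordinates are hypothetical), returning `None` when there is no end location is a choice made for this example: unlike a sentinel of 0, `None` keeps non-pass events out of a minimum or an average.

```python
import math


def pass_distance(location, pass_end_location):
    """Euclidean distance from start to end of a pass, or None if either point is missing."""
    if location is None or pass_end_location is None:
        return None
    dx = pass_end_location[0] - location[0]
    dy = pass_end_location[1] - location[1]
    return math.hypot(dx, dy)


# Hypothetical coordinates forming a 3-4-5 right triangle
print(pass_distance([60.0, 40.0], [63.0, 44.0]))
print(pass_distance([60.0, 40.0], None))
```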

## 6. Time-Window Aggregations (1 minute)

In [None]:
# Define time windows (1 minute tumbling window)
windowed_stats = events_with_features \
    .withWatermark("event_time", "30 seconds") \
    .groupBy(
        window(col("event_time"), "1 minute"),
        col("team_name")
    ) \
    .agg(
        # Passes (is_pass is a 0/1 flag per event, so min/max are 0 or 1
        # and avg is the share of events in the window that are passes)
        sum("is_pass").alias("total_passes"),
        min("is_pass").alias("min_passes"),
        max("is_pass").alias("max_passes"),
        avg("is_pass").alias("avg_passes"),
        stddev("is_pass").alias("stddev_passes"),

        # Pass distance (stddev is the sample standard deviation, the square root of the variance)
        sum("pass_distance").alias("total_pass_distance"),
        min("pass_distance").alias("min_pass_distance"),
        max("pass_distance").alias("max_pass_distance"),
        avg("pass_distance").alias("avg_pass_distance"),
        stddev("pass_distance").alias("stddev_pass_distance"),

        # Average positions
        avg("location_x").alias("avg_position_x"),
        avg("location_y").alias("avg_position_y"),
        stddev("location_x").alias("stddev_position_x"),
        stddev("location_y").alias("stddev_position_y"),

        # Shots and dribbles
        sum("is_shot").alias("total_shots"),
        sum("is_dribble").alias("total_dribbles"),

        # Total events (proxy for possession)
        count("*").alias("total_events")
    ) \
    .select(
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        "team_name",
        "total_passes",
        "min_passes",
        "max_passes",
        "avg_passes",
        "stddev_passes",
        "total_pass_distance",
        "min_pass_distance",
        "max_pass_distance",
        "avg_pass_distance",
        "stddev_pass_distance",
        "avg_position_x",
        "avg_position_y",
        "stddev_position_x",
        "stddev_position_y",
        "total_shots",
        "total_dribbles",
        "total_events"
    ) \
    .orderBy("window_start", "team_name")  # sorting a streaming aggregate requires outputMode("complete")

print("✓ Aggregations configured")
print("✓ Windows: 1 minute (tumbling window)")
print("✓ Watermark: 30 seconds")
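
A 1-minute tumbling window assigns each event to exactly one fixed, non-overlapping interval, and the 30-second watermark trails the latest event time seen by that delay. The plain-Python sketch below shows only this arithmetic. Spark's actual handling is applied per window and advances the watermark between micro-batches, and the function names and timestamps here are hypothetical.

```python
from datetime import datetime, timedelta, timezone


def tumbling_window(event_time, size_seconds=60):
    """Return the [start, end) bounds of the fixed-size window containing event_time."""
    epoch_seconds = int(event_time.timestamp())
    start_seconds = epoch_seconds - (epoch_seconds % size_seconds)
    start = datetime.fromtimestamp(start_seconds, tz=timezone.utc)
    return start, start + timedelta(seconds=size_seconds)


def is_behind_watermark(event_time, max_event_time_seen, delay_seconds=30):
    """True if event_time is older than (latest event time seen - delay)."""
    watermark = max_event_time_seen - timedelta(seconds=delay_seconds)
    return event_time < watermark


# Hypothetical event at 12:00:42 UTC falls into the window [12:00:00, 12:01:00)
event_time = datetime(2024, 1, 1, 12, 0, 42, tzinfo=timezone.utc)
window_start, window_end = tumbling_window(event_time)
print(window_start.isoformat(), window_end.isoformat())

# Once an event at 12:01:30 has been seen, the watermark sits at 12:01:00
latest_seen = datetime(2024, 1, 1, 12, 1, 30, tzinfo=timezone.utc)
print(is_behind_watermark(event_time, latest_seen))
```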

## 7. Start the Stream - Console Output

In [None]:
# Start streaming query with console output
query = windowed_stats.writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", "false") \
    .option("numRows", 50) \
    .trigger(processingTime="10 seconds") \
    .start()

print("="*80)
print("STREAMING ACTIVE - REAL-TIME STATISTICS")
print("="*80)
print(f"Query ID: {query.id}")
print(f"Query Name: {query.name}")
print(f"Status: {query.status}")
print("\n📊 METRICS CAPTURE:")
print("   1. Open the Spark UI at http://localhost:4040")
print("   2. Go to the 'Structured Streaming' tab")
print("   3. Capture the following metrics:")
print("      - Input Rate (events/second)")
print("      - Process Rate (events/second)")
print("      - Batch Duration")
print("      - Scheduling Delay")
print("   4. Go to the 'Jobs' tab for detailed metrics:")
print("      - Job Time")
print("      - Shuffle Read/Write")
print("      - Spill (Memory/Disk)")
print("="*80)
print("\n⚠️  Press the STOP button on the cell to stop streaming")
print("="*80)

In [None]:
# Monitor query status
import time

try:
    while query.isActive:
        print(f"\n[{datetime.now().strftime('%H:%M:%S')}] Query Status: {query.status}")
        print(f"Recent Progress:")
        
        if query.recentProgress:
            latest = query.recentProgress[-1]
            print(f"  - Batch ID: {latest.get('batchId', 'N/A')}")
            print(f"  - Input Rows: {latest.get('numInputRows', 'N/A')}")
            rate = latest.get('processedRowsPerSecond')
            rate_text = f"{rate:.2f} rows/sec" if rate else "N/A"
            print(f"  - Process Rate: {rate_text}")
            print(f"  - Duration: {latest.get('durationMs', {}).get('triggerExecution', 'N/A')} ms")
        
        time.sleep(10)
        
except KeyboardInterrupt:
    print("\n⚠️  Stopping streaming...")
    query.stop()
    print("✓ Streaming stopped")
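
The monitoring loop reads a few keys from the latest progress record, which PySpark exposes as a dictionary. As a small sketch of the same lookup pattern that can be tried without a running query, the hypothetical helper `summarize_progress` formats one record and tolerates missing keys. The numbers in the example record are made up.

```python
def summarize_progress(progress):
    """Format one streaming progress record (a dict) as a single line, tolerating missing keys."""
    rate = progress.get("processedRowsPerSecond")
    rate_text = f"{rate:.2f} rows/sec" if rate is not None else "N/A"
    duration = progress.get("durationMs", {}).get("triggerExecution", "N/A")
    return (
        f"batch={progress.get('batchId', 'N/A')} "
        f"rows={progress.get('numInputRows', 'N/A')} "
        f"rate={rate_text} "
        f"trigger_ms={duration}"
    )


# Hypothetical progress record using the keys read by the monitoring loop
example_progress = {
    "batchId": 7,
    "numInputRows": 1200,
    "processedRowsPerSecond": 150.0,
    "durationMs": {"triggerExecution": 8000},
}
print(summarize_progress(example_progress))
print(summarize_progress({}))
```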

## 8. Stop the Stream (run when needed)

In [None]:
# Stop the streaming query
if query.isActive:
    print("Stopping streaming query...")
    query.stop()
    query.awaitTermination(timeout=30)
    print("✓ Query stopped")
else:
    print("Query is not active")

## 9. Summary of Metrics to Capture

In [None]:
print("="*80)
print("METRICS TO CAPTURE FROM THE SPARK UI (localhost:4040)")
print("="*80)
print("\n1. STRUCTURED STREAMING TAB:")
print("   ✓ Input Rate (events/second)")
print("   ✓ Process Rate (events/second)")
print("   ✓ Batch Duration (ms)")
print("   ✓ Operation Duration (ms)")
print("   ✓ Scheduling Delay (ms)")
print("\n2. JOBS TAB:")
print("   ✓ Job Duration")
print("   ✓ Shuffle Read Size")
print("   ✓ Shuffle Write Size")
print("   ✓ Spill (Memory)")
print("   ✓ Spill (Disk)")
print("\n3. STAGES TAB:")
print("   ✓ Stage Duration")
print("   ✓ Task Deserialization Time")
print("   ✓ GC Time")
print("   ✓ Input Size / Records")
print("   ✓ Output Size / Records")
print("\n4. EXECUTORS TAB:")
print("   ✓ Storage Memory Used")
print("   ✓ Disk Used")
print("   ✓ Total Tasks")
print("   ✓ Task Time (GC Time)")
print("="*80)
print("\n💡 ARCHITECTURE COMPARISON:")
print("   For architecture 2 (CPU), your teammate should:")
print("   1. Comment out the RAPIDS lines in spark-defaults.conf")
print("   2. Remove the GPU allocation in docker-compose.yml")
print("   3. Re-run this same notebook")
print("   4. Compare GPU vs CPU metrics")
print("="*80)
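
Once both runs have been captured, the GPU vs CPU comparison comes down to a ratio per metric. The sketch below shows that arithmetic for "lower is better" metrics such as durations. The helper `speedup`, the metric names, and all numbers are placeholders, not measurements from this notebook.

```python
def speedup(cpu_value, gpu_value):
    """Ratio cpu/gpu for a 'lower is better' metric; above 1.0 means the GPU run was faster."""
    if gpu_value == 0:
        return None
    return cpu_value / gpu_value


# Placeholder numbers only, to show the arithmetic; replace with values read from the Spark UI
cpu_metrics = {"batch_duration_ms": 1000.0, "job_duration_ms": 400.0}
gpu_metrics = {"batch_duration_ms": 500.0, "job_duration_ms": 400.0}

for metric_name in cpu_metrics:
    ratio = speedup(cpu_metrics[metric_name], gpu_metrics[metric_name])
    print(f"{metric_name}: {ratio:.2f}x")
```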

In [None]:
# Clean up
# spark.stop()
print("\nNote: the Spark session is still active.")
print("Run 'spark.stop()' when you have finished all tests.")