# 1.- Implementaci√≥n T√©cnica: Preparaci√≥n del Sistema

implementaci√≥n t√©cnica para preparar la infraestructura base.
(instala Java y Kafka)

In [2]:
#1: Configuraci√≥n del Sistema, Instalaci√≥n de Java y Descarga de Kafka
import os
import subprocess
import time

# Instalaci√≥n de Java 8 (Versi√≥n Headless para servidores)
# Se utiliza -qq para silenciar la salida y evitar llenar el notebook de logs de apt
print("Iniciando la instalaci√≥n de OpenJDK 8...")
!apt-get update > /dev/null
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# 2. Configuraci√≥n de Variables de Entorno
# Es crucial establecer JAVA_HOME antes de intentar ejecutar cualquier script de Spark o Kafka
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
print(f"JAVA_HOME configurado en: {os.environ['JAVA_HOME']}")

# 3. Descarga y Extracci√≥n de Binarios de Apache Kafka
# Se selecciona la versi√≥n 3.6.1 con Scala 2.12 para garantizar compatibilidad con Spark 3.x
# Referencia cruzada con  sobre instalaci√≥n manual
KAFKA_VERSION = "3.6.1"
SCALA_VERSION = "2.12"
KAFKA_TGZ = f"kafka_{SCALA_VERSION}-{KAFKA_VERSION}.tgz"
KAFKA_URL = f"https://archive.apache.org/dist/kafka/{KAFKA_VERSION}/{KAFKA_TGZ}"

print(f"Descargando Kafka {KAFKA_VERSION}...")
if not os.path.exists(KAFKA_TGZ):
    # Descargar el archivo .tgz de Kafka
    subprocess.run(['wget', '-q', KAFKA_URL], check=True)
    # Extraer el contenido del archivo .tgz
    subprocess.run(['tar', '-xzf', KAFKA_TGZ], check=True)
    print("Kafka descargado y extra√≠do correctamente.")
else:
    print("Archivo binario de Kafka ya existente. Omitiendo descarga.")

# Definir KAFKA_HOME para uso futuro en scripts
KAFKA_DIR = f"/content/kafka_{SCALA_VERSION}-{KAFKA_VERSION}"
os.environ["KAFKA_HOME"] = KAFKA_DIR

Iniciando la instalaci√≥n de OpenJDK 8...
W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)
JAVA_HOME configurado en: /usr/lib/jvm/java-8-openjdk-amd64
Descargando Kafka 3.6.1...
Kafka descargado y extra√≠do correctamente.


# Fuente de Datos: Open-Meteo API
La API de Open-Meteo proporciona datos meteorol√≥gicos de alta precisi√≥n utilizando modelos globales (como NOAA GFS) y locales. La estructura de respuesta es un objeto JSON (cruda)

In [3]:
{
  "latitude": 52.52,
  "longitude": 13.41,
  "generationtime_ms": 0.05,
  "utc_offset_seconds": 0,
  "timezone": "GMT",
  "timezone_abbreviation": "GMT",
  "elevation": 38.0,
  "current_units": {
    "time": "iso8601",
    "temperature_2m": "¬∞C",
    "relative_humidity_2m": "%",
    "wind_speed_10m": "km/h"
  },
  "current": {
    "time": "2024-01-01T12:00",
    "interval": 900,
    "temperature_2m": 15.4,
    "relative_humidity_2m": 62,
    "wind_speed_10m": 12.5
  }
}

{'latitude': 52.52,
 'longitude': 13.41,
 'generationtime_ms': 0.05,
 'utc_offset_seconds': 0,
 'timezone': 'GMT',
 'timezone_abbreviation': 'GMT',
 'elevation': 38.0,
 'current_units': {'time': 'iso8601',
  'temperature_2m': '¬∞C',
  'relative_humidity_2m': '%',
  'wind_speed_10m': 'km/h'},
 'current': {'time': '2024-01-01T12:00',
  'interval': 900,
  'temperature_2m': 15.4,
  'relative_humidity_2m': 62,
  'wind_speed_10m': 12.5}}

# 2.- Configuraci√≥n del T√≥pico
La unidad fundamental de organizaci√≥n en Kafka es el T√≥pico. Para este proyecto, se crea el t√≥pico weather_events.

In [4]:
# C√©lula 2: Inicio de Servicios (Zookeeper y Kafka) y Creaci√≥n de T√≥picos
import time

# 1. Iniciar Zookeeper en segundo plano
# Se redirige stderr a stdout (2>&1) para capturar todos los logs en un solo archivo
print("Iniciando servicio Zookeeper...")
!nohup $KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties > zookeeper.log 2>&1 &

# Espera prudencial para permitir la inicializaci√≥n de Zookeeper
time.sleep(20) # Aumentado de 10 a 20 segundos

# 2. Iniciar el Broker de Kafka
print("Iniciando servicio Kafka Broker...")
!nohup $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties > kafka.log 2>&1 &

# Espera para que el broker se registre en Zookeeper
time.sleep(30) # Aumentado de 15 a 30 segundos

# 3. Creaci√≥n del T√≥pico 'weather_events'
print("Creando t√≥pico 'weather_events'...")
!$KAFKA_HOME/bin/kafka-topics.sh --create --topic weather_events --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1

# Verificaci√≥n: Listar t√≥picos existentes
print("T√≥picos disponibles:")
!$KAFKA_HOME/bin/kafka-topics.sh --list --bootstrap-server localhost:9092


Iniciando servicio Zookeeper...
Iniciando servicio Kafka Broker...
Creando t√≥pico 'weather_events'...
Created topic weather_events.
T√≥picos disponibles:
weather_events


# 3.- Ingesta (Productor de Eventos)
Env√≠a los datos a Kafka.

In [5]:
# C√©lula 3: Script del Productor (Ingesta)
!pip install kafka-python -qq
import json
import time
import random
import requests
from kafka import KafkaProducer

# Configuraci√≥n
TOPIC = "weather_events"
SERVER = "localhost:9092"
# Coordenadas para Madrid (o cualquier ubicaci√≥n de inter√©s)
URL = "https://api.open-meteo.com/v1/forecast?latitude=40.41&longitude=-3.70&current=temperature_2m,relative_humidity_2m,wind_speed_10m&timezone=Europe%2FMadrid"

# Inicializaci√≥n del Productor
producer = KafkaProducer(
    bootstrap_servers=SERVER,
    value_serializer=lambda x: json.dumps(x).encode('utf-8') # Serializaci√≥n a bytes
)

print("Iniciando ciclo de producci√≥n de eventos...")

# Simulamos un ciclo de producci√≥n (en un caso real, esto ser√≠a un while True)
# Limitamos a 100 iteraciones para el ejemplo en el reporte, pero en ejecuci√≥n real puede ser infinito.
try:
    for _ in range(60): # Producir datos durante aprox 2-3 minutos
        response = requests.get(URL, timeout=5)
        if response.status_code == 200:
            raw_data = response.json()
            current = raw_data.get("current", {})

            # Construcci√≥n del payload enriquecido
            payload = {
                "sensor_id": "madrid_station_01",
                "timestamp": time.time(), # Unix timestamp actual
                "temperature": current.get("temperature_2m") + random.uniform(-0.2, 0.2), # Jitter simulado
                "humidity": current.get("relative_humidity_2m"),
                "wind_speed": current.get("wind_speed_10m"),
                "status": "active"
            }

            producer.send(TOPIC, value=payload)
            # No imprimimos cada env√≠o para no saturar la salida del notebook, solo un indicador
            if _ % 10 == 0:
                print(f"Enviado lote {_}: {payload}")

        time.sleep(2) # Frecuencia de muestreo: 2 segundos
except Exception as e:
    print(f"Error en el productor: {e}")
finally:
    producer.flush()
    print("Ciclo de producci√≥n finalizado.")

[?25l   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m0.0/326.3 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m[90m‚ï∫[0m[90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m174.1/326.3 kB[0m [31m5.3 MB/s[0m eta [36m0:00:01[0m[2K   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m326.3/326.3 kB[0m [31m5.8 MB/s[0m eta [36m0:00:00[0m
[?25hIniciando ciclo de producci√≥n de eventos...
Enviado lote 0: {'sensor_id': 'madrid_station_01', 'timestamp': 1764206844.849152, 'temperature': 2.0144369659329495, 'humidity': 73, 'wind_speed': 1.8, 'status': 'active'}
Enviado lote 10: {'sensor_id': 'madrid_station_01', 'timestamp': 1764206873.6211932, 'temperature': 2.028091940557849, 'humidity': 73, 'wind_speed': 1.8, 'status': '

#

# 4.- Inicializaci√≥n de la SparkSession

La configuraci√≥n de la sesi√≥n debe incluir expl√≠citamente la descarga del paquete JAR. Adem√°s, dado que estamos en un entorno de recursos limitados, se ajusta la configuraci√≥n spark.sql.shuffle.partitions.

In [6]:
# C√©lula 4: Inicializaci√≥n de Spark con Soporte Kafka
!pip install pyspark -qq

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, window, avg, current_timestamp
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# Definici√≥n de coordenadas Maven exactas
KAFKA_JAR_PACKAGE = "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0"

spark = SparkSession.builder \
   .appName("WeatherStreamingPipeline") \
   .master("local[*]") \
   .config("spark.jars.packages", KAFKA_JAR_PACKAGE) \
   .config("spark.sql.shuffle.partitions", "2") \
   .getOrCreate()

print(f"Spark Session iniciada. Versi√≥n: {spark.version}")

Spark Session iniciada. Versi√≥n: 3.5.1


In [7]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

weather_schema = StructType([
    StructField("sensor_id", StringType(), True),
    StructField("timestamp", DoubleType(), True),
    StructField("temperature", DoubleType(), True),
    StructField("humidity", DoubleType(), True),
    StructField("wind_speed", DoubleType(), True),
    StructField("status", StringType(), True)
])

# 5: L√≥gica de Streaming y Escritura en Data Lakes

El primer nivel, Lake 1, tiene como objetivo la persistencia fiel de los datos tal como llegan

El segundo nivel, Lake 2, contiene datos refinados. Aqu√≠ se aplican transformaciones anal√≠ticas.

In [8]:
# C√©lula 5: L√≥gica de Streaming y Escritura en Data Lakes

# 1. Lectura del Stream Kafka
raw_stream = spark.readStream \
   .format("kafka") \
   .option("kafka.bootstrap.servers", "localhost:9092") \
   .option("subscribe", "weather_events") \
   .option("startingOffsets", "earliest") \
   .load()

# 2. Parseo y Selecci√≥n (Preparaci√≥n)
parsed_stream = raw_stream.selectExpr("CAST(value AS STRING) as json_str") \
   .select(from_json(col("json_str"), weather_schema).alias("data")) \
   .select(
       col("data.sensor_id"),
       col("data.timestamp"),
       col("data.temperature"),
       col("data.humidity"),
       col("data.wind_speed"),
       col("data.status")
   )

# 3. Escritura Lake 1 (RAW) - Sin agregaciones, escritura directa
query_lake1 = parsed_stream.writeStream \
   .format("parquet") \
   .option("path", "/content/datalake/lake1_raw") \
   .option("checkpointLocation", "/content/datalake/checkpoints/lake1") \
   .outputMode("append") \
   .trigger(processingTime="10 seconds") \
   .start()

# 4. Transformaci√≥n para Lake 2 (Agregados)
# Convertir timestamp unix a TimestampType para usar funciones de ventana
windowed_stream = parsed_stream.withColumn("event_time", col("timestamp").cast(TimestampType()))

# Definir agregaci√≥n con WATERMARK (Cr√≠tico para escribir a Parquet)
# Ventana de 1 minuto, deslizante cada 30 segundos. Watermark de 2 minutos.
agg_stream = windowed_stream \
   .withWatermark("event_time", "2 minutes") \
   .groupBy(
        window(col("event_time"), "1 minute", "30 seconds"),
        col("sensor_id")
    ) \
   .agg(
        avg("temperature").alias("avg_temp"),
        avg("humidity").alias("avg_hum"),
        avg("wind_speed").alias("avg_wind")
    )

# 5. Escritura Lake 2 (TRANSFORMED)
query_lake2 = agg_stream.writeStream \
   .format("parquet") \
   .option("path", "/content/datalake/lake2_transformed") \
   .option("checkpointLocation", "/content/datalake/checkpoints/lake2") \
   .outputMode("append") \
   .trigger(processingTime="30 seconds") \
   .start()

print("Streams iniciados. Recopilando datos...")
# Dejar correr un tiempo para generar archivos
time.sleep(120)
# En un entorno real no se detienen, aqu√≠ lo hacemos para liberar recursos para el siguiente paso
query_lake1.stop()
query_lake2.stop()

Streams iniciados. Recopilando datos...


# 6. An√°lisis Batch con Polars y Pandas

Una vez que los datos aterrizan en el Data Lake (formato Parquet), el pipeline cambia de modalidad streaming a modalidad batch para an√°lisis exploratorio o reportes complejos. Este proyecto eval√∫a dos herramientas para esta tarea: Pandas y Polars.

In [9]:
# C√©lula 6: An√°lisis Comparativo Batch
import polars as pl
import pandas as pd
import glob
import os

LAKE_PATH = "/content/datalake/lake1_raw/*.parquet"

# Verificaci√≥n de existencia de archivos
files = glob.glob(LAKE_PATH)
if not files:
    print("Esperando a que Spark escriba los primeros archivos Parquet...")
    time.sleep(30)

# 1. Implementaci√≥n √ìptima con POLARS
print("--- An√°lisis con POLARS ---")
try:
    # scan_parquet crea un LazyFrame. No lee datos hasta llamar a.collect()
    q = pl.scan_parquet(LAKE_PATH)

    # Aplicamos transformaciones lazy
    analytics_pl = (
        q.filter(pl.col("temperature") > 10)
        .group_by("sensor_id")
        .agg([
             pl.col("temperature").mean().alias("temp_media"),
             pl.col("humidity").max().alias("humedad_max"),
             pl.count().alias("num_registros")
         ])
        .collect() # Materializaci√≥n
    )
    print(analytics_pl)
except Exception as e:
    print(f"Error en Polars: {e}")

# 2. Implementaci√≥n Tradicional con PANDAS
print("\n--- An√°lisis con PANDAS ---")
try:
    # Pandas requiere encontrar los archivos primero
    files = glob.glob(LAKE_PATH)
    if files:
        # Lectura secuencial y concatenaci√≥n (menos eficiente)
        df_pd = pd.concat([pd.read_parquet(f) for f in files])

        # Operaciones equivalentes
        analytics_pd = df_pd[df_pd["temperature"] > 10].groupby("sensor_id").agg({
            "temperature": "mean",
            "humidity": "max",
            "sensor_id": "count"
        }).rename(columns={"sensor_id": "num_registros"})

        print(analytics_pd)
    else:
        print("No se encontraron archivos para Pandas.")
except Exception as e:
    print(f"Error en Pandas: {e}")

--- An√°lisis con POLARS ---



`pl.count()` is deprecated. Please use `pl.len()` instead.
(Deprecated in version 0.20.5)



shape: (0, 4)
‚îå‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î¨‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î¨‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î¨‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îê
‚îÇ sensor_id ‚îÜ temp_media ‚îÜ humedad_max ‚îÜ num_registros ‚îÇ
‚îÇ ---       ‚îÜ ---        ‚îÜ ---         ‚îÜ ---           ‚îÇ
‚îÇ str       ‚îÜ f64        ‚îÜ f64         ‚îÜ u32           ‚îÇ
‚ïû‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ï™‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ï™‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ï™‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ï°
‚îî‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î¥‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î¥‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î¥‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îò

--- An√°lisis con PANDAS ---
Empty DataFrame
Columns: [temperature, humidity, num_registros]
Index: []


# 7.- Dashboarding: Streamlit con T√∫nel Seguro

La etapa final es la presentaci√≥n de los datos procesados a trav√©s de un Dashboard interactivo. Streamlit se ha consolidado como el est√°ndar para aplicaciones de datos r√°pidas en Python debido a su simplicidad.

In [10]:
# C√©lula 7: Creaci√≥n del archivo app.py
%%writefile app.py
import streamlit as st
import pandas as pd
import plotly.express as px
import glob
import time

st.set_page_config(page_title="Monitor Meteorol√≥gico Real-Time", layout="wide")

st.title("üì° Dashboard de Ingenier√≠a de Datos: Streaming Weather")
st.markdown("Visualizaci√≥n de datos ingestados v√≠a Kafka y procesados con Spark.")

# Configuraci√≥n de rutas
LAKE1_PATH = "/content/datalake/lake1_raw/*.parquet"
LAKE2_PATH = "/content/datalake/lake2_transformed/*.parquet"

def load_data(path, is_aggregated=False):
    files = glob.glob(path)
    if not files:
        return pd.DataFrame()

    # En un entorno real usar√≠amos Polars aqu√≠ tambi√©n para velocidad
    # Usamos Pandas para compatibilidad directa con st.dataframe
    dfs =
    for f in files:
        try:
            dfs.append(pd.read_parquet(f))
        except:
            continue # Ignorar archivos corruptos o en escritura

    if not dfs:
        return pd.DataFrame()

    df = pd.concat(dfs)

    if not is_aggregated and 'timestamp' in df.columns:
        df['datetime'] = pd.to_datetime(df['timestamp'], unit='s')
        df = df.sort_values('datetime')

    return df

# Contenedor principal para autorefresco visual
placeholder = st.empty()

# Bot√≥n de refresco manual (simula el loop de tiempo real bajo demanda)
if st.button('üîÑ Actualizar Datos'):
    st.rerun()

with placeholder.container():
    # Cargar datos RAW
    df_raw = load_data(LAKE1_PATH)

    if not df_raw.empty:
        # M√©tricas KPI (√öltimo valor recibido)
        latest = df_raw.iloc[-1]

        col1, col2, col3, col4 = st.columns(4)
        col1.metric("Temperatura Actual", f"{latest['temperature']:.1f} ¬∞C")
        col2.metric("Humedad", f"{latest['humidity']:.1f} %")
        col3.metric("Viento", f"{latest['wind_speed']:.1f} km/h")
        col4.metric("Registros Totales", len(df_raw))

        # Gr√°ficos
        st.subheader("Tendencias en Tiempo Real (Raw Data)")
        tab1, tab2 = st.tabs()

        with tab1:
            fig_temp = px.line(df_raw, x='datetime', y='temperature', title='Evoluci√≥n T√©rmica', markers=True)
            st.plotly_chart(fig_temp, use_container_width=True)

        with tab2:
            fig_multi = px.line(df_raw, x='datetime', y=['humidity', 'wind_speed'], title='Condiciones Atmosf√©ricas')
            st.plotly_chart(fig_multi, use_container_width=True)
    else:
        st.warning("‚è≥ Esperando datos en el Data Lake... Aseg√∫rate de que Spark est√© ejecut√°ndose.")

    # Secci√≥n de Datos Transformados (Lake 2)
    st.divider()
    st.subheader("üìä Datos Agregados (Ventanas de 1 Minuto - Spark Streaming)")

    df_agg = load_data(LAKE2_PATH, is_aggregated=True)
    if not df_agg.empty:
        # Las columnas de ventana son complejas en Parquet, simplificamos visualizaci√≥n
        st.dataframe(df_agg.tail(10), use_container_width=True)
    else:
        st.info("Las agregaciones requieren que pase el Watermark (2 minutos) para escribirse.")

Writing app.py


# 8.- Aviso

En caso de ejecutarcon "Ejecutar todas" de Colab y se detiene el entorno, solo debe ejecutar la celula 8 nuevamente, asi saldr√° graficando los datos

In [1]:
# C√©lula 8: Dashboard
!pip install dash plotly pandas -q

from dash import Dash, html, dcc, Input, Output
import plotly.express as px
import pandas as pd
import glob
import os
import threading
from google.colab import output
import time
import signal
import psutil

# ===== LIMPIEZA AGRESIVA DE PUERTOS =====
def kill_port(port):
    """Mata todos los procesos usando un puerto espec√≠fico"""
    try:
        for proc in psutil.process_iter(['pid', 'name', 'connections']):
            try:
                for conn in proc.connections():
                    if conn.laddr.port == port:
                        print(f"Matando proceso {proc.pid} en puerto {port}")
                        proc.kill()
            except:
                pass
    except:
        pass

print("=== LIMPIANDO PUERTOS ===")
kill_port(8051)
kill_port(8052)
!fuser -k 8051/tcp 2>/dev/null
!fuser -k 8052/tcp 2>/dev/null
time.sleep(3)
print("‚úì Puertos liberados\n")

# 1. Configuraci√≥n y Lectura de Datos
LAKE_PATH = "/content/datalake/lake2_transformed/*.parquet"

def read_data():
    files = glob.glob(LAKE_PATH)

    # Define default columns including 'event_time' for an empty DataFrame
    default_columns = ['event_time', 'sensor_id', 'avg_temp', 'avg_hum', 'avg_wind']

    if not files:
        return pd.DataFrame(columns=default_columns)

    try:
        latest_file = max(files, key=os.path.getctime)
        df = pd.read_parquet(latest_file)

        # Transform 'window' column into 'event_time'
        if 'window' in df.columns:
            # The 'window' column contains dictionaries like {'start': <timestamp_ns>, 'end': <timestamp_ns>}
            # Extract 'start' and convert to datetime, assuming epoch nanoseconds.
            df['event_time'] = df['window'].apply(lambda x: pd.to_datetime(x['start'], unit='ns'))
            df = df.drop(columns=['window']) # Remove the original 'window' column

        print(f"‚úì Datos cargados: {len(df)} registros")
        print(f"Columnas despu√©s de transformaci√≥n: {df.columns.tolist()}") # Debug print
        return df
    except Exception as e:
        print(f"‚úó Error leyendo datos: {e}")
        return pd.DataFrame(columns=default_columns)

# Verificar datos disponibles
print("=== VERIFICANDO DATOS ===")
test_df = read_data()
if not test_df.empty:
    print(f"Columnas: {test_df.columns.tolist()}")
    print(f"Rango temporal: {test_df['event_time'].min()} a {test_df['event_time'].max()}")
else:
    print("‚ö†Ô∏è No hay datos disponibles a√∫n")
print()

# 2. App Dash
app = Dash(__name__)

app.layout = html.Div([
    html.H1("üå¶Ô∏è Dashboard de Monitoreo Clim√°tico",
            style={'textAlign': 'center', 'color': '#2c3e50', 'marginTop': '20px',
                   'fontFamily': 'Arial, sans-serif'}),

    dcc.Interval(id='interval-component', interval=5000, n_intervals=0),

    html.Div(id='kpi-display', style={
        'display': 'flex',
        'justifyContent': 'space-around',
        'margin': '20px',
        'flexWrap': 'wrap'
    }),

    html.Div([
        dcc.Graph(id='temp-graph'),
        dcc.Graph(id='wind-graph')
    ], style={'padding': '0 20px'})
])

# 3. Callback
@app.callback(
    [Output('kpi-display', 'children'),
     Output('temp-graph', 'figure'),
     Output('wind-graph', 'figure')],
    [Input('interval-component', 'n_intervals')]
)
def update_metrics(n):
    df = read_data()

    if df.empty:
        msg = html.Div([
            html.H3("‚è≥ Esperando datos de Spark...", style={'color': '#ff9800'}),
            html.P("Aseg√∫rate de que el proceso de Spark est√© ejecut√°ndose (C√©lula 5)",
                   style={'color': '#666'})
        ], style={'textAlign': 'center', 'padding': '50px'})

        empty_fig = px.line(title="Sin datos disponibles")
        empty_fig.update_layout(
            annotations=[{
                'text': 'Esperando datos...',
                'xref': 'paper',
                'yref': 'paper',
                'showarrow': False,
                'font': {'size': 20, 'color': '#999'}
            }]
        )
        return [msg], empty_fig, empty_fig

    # The 'event_time' column is now created in read_data. Ensure it's datetime type.
    # This check might be redundant if read_data ensures correct type, but keeps robustness.
    if not pd.api.types.is_datetime64_any_dtype(df['event_time']):
        df['event_time'] = pd.to_datetime(df['event_time'])

    df = df.sort_values('event_time')
    latest = df.iloc[-1]

    # KPIs
    kpis = [
        html.Div([
            html.H4("üå°Ô∏è Temperatura", style={'margin': '0 0 10px 0', 'color': '#666', 'fontSize': '16px'}),
            html.H2(f"{latest['avg_temp']:.1f}", style={'margin': '0', 'color': '#d32f2f', 'fontSize': '36px'}),
            html.P("¬∞C", style={'margin': '5px 0 0 0', 'color': '#999', 'fontSize': '14px'})
        ], style={'textAlign': 'center', 'padding': '25px', 'backgroundColor': '#ffebee',
                  'borderRadius': '12px', 'flex': '1', 'margin': '10px', 'minWidth': '150px',
                  'boxShadow': '0 4px 6px rgba(0,0,0,0.1)', 'transition': 'transform 0.2s'}),

        html.Div([
            html.H4("üíß Humedad", style={'margin': '0 0 10px 0', 'color': '#666', 'fontSize': '16px'}),
            html.H2(f"{latest['avg_hum']:.1f}", style={'margin': '0', 'color': '#1976d2', 'fontSize': '36px'}),
            html.P("%", style={'margin': '5px 0 0 0', 'color': '#999', 'fontSize': '14px'})
        ], style={'textAlign': 'center', 'padding': '25px', 'backgroundColor': '#e3f2fd',
                  'borderRadius': '12px', 'flex': '1', 'margin': '10px', 'minWidth': '150px',
                  'boxShadow': '0 4px 6px rgba(0,0,0,0.1)', 'transition': 'transform 0.2s'}),

        html.Div([
            html.H4("üí® Viento", style={'margin': '0 0 10px 0', 'color': '#666', 'fontSize': '16px'}),
            html.H2(f"{latest['avg_wind']:.1f}", style={'margin': '0', 'color': '#388e3c', 'fontSize': '36px'}),
            html.P("km/h", style={'margin': '5px 0 0 0', 'color': '#999', 'fontSize': '14px'})
        ], style={'textAlign': 'center', 'padding': '25px', 'backgroundColor': '#f1f8e9',
                  'borderRadius': '12px', 'flex': '1', 'margin': '10px', 'minWidth': '150px',
                  'boxShadow': '0 4px 6px rgba(0,0,0,0.1)', 'transition': 'transform 0.2s'})
    ]

    # Gr√°ficas
    fig_temp = px.line(df, x='event_time', y='avg_temp',
                       title='üìà Evoluci√≥n de Temperatura',
                       markers=True,
                       labels={'event_time': 'Hora', 'avg_temp': 'Temperatura (¬∞C)'})
    fig_temp.update_layout(
        hovermode='x unified',
        height=400,
        plot_bgcolor='#fafafa',
        paper_bgcolor='white'
    )
    fig_temp.update_traces(line_color='#f44336', line_width=3, marker_size=8)

    fig_wind = px.bar(df, x='event_time', y='avg_wind',
                      title='üí® Intensidad del Viento',
                      labels={'event_time': 'Hora', 'avg_wind': 'Velocidad (km/h)'})
    fig_wind.update_layout(
        height=400,
        plot_bgcolor='#fafafa',
        paper_bgcolor='white'
    )
    fig_wind.update_traces(marker_color='#4caf50', marker_line_width=0)

    return kpis, fig_temp, fig_wind

# 4. Iniciar servidor en puerto 8051
print("=== INICIANDO DASHBOARD ===")
PORT = 8051

thread = threading.Thread(target=app.run, kwargs={'port': PORT, 'debug': False})
thread.daemon = True
thread.start()

time.sleep(4)  # Esperar m√°s tiempo para asegurar inicio

print(f"‚úì Dashboard activo en puerto {PORT}")
print("Renderizando interfaz...\n")
output.serve_kernel_port_as_iframe(port=PORT, height=650)

=== LIMPIANDO PUERTOS ===
‚úì Puertos liberados

=== VERIFICANDO DATOS ===
‚úì Datos cargados: 2 registros
Columnas despu√©s de transformaci√≥n: ['sensor_id', 'avg_temp', 'avg_hum', 'avg_wind', 'event_time']
Columnas: ['sensor_id', 'avg_temp', 'avg_hum', 'avg_wind', 'event_time']
Rango temporal: 2025-11-27 01:26:30 a 2025-11-27 01:27:00

=== INICIANDO DASHBOARD ===
Dash is running on http://127.0.0.1:8051/



INFO:dash.dash:Dash is running on http://127.0.0.1:8051/



 * Serving Flask app '__main__'
 * Debug mode: off


 * Running on http://127.0.0.1:8051
INFO:werkzeug:[33mPress CTRL+C to quit[0m


‚úì Dashboard activo en puerto 8051
Renderizando interfaz...



<IPython.core.display.Javascript object>