In [1]:
import os
import tempfile
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, avg, max as spark_max, min as spark_min, stddev, when, desc, round as spark_round
from pyspark.conf import SparkConf
from pyspark import SparkContext
from minio import Minio
import warnings
warnings.filterwarnings("ignore")

In [2]:
# Inicializar SparkSession
try:
    sc = SparkContext.getOrCreate()
    sc.stop()
except:
    pass
import time
time.sleep(1)
conf = SparkConf().setAppName("GenerarKPIs").setMaster("local[*]").set("spark.driver.bindAddress", "127.0.0.1").set("spark.driver.host", "127.0.0.1")
try:
    sc = SparkContext(conf=conf)
    spark = SparkSession(sc)
except:
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("‚úÖ Spark iniciado")

‚úÖ Spark iniciado


In [3]:
MINIO_ENDPOINT = os.environ.get("MINIO_ENDPOINT", "localhost:9000")
MINIO_ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY", "minioadmin")
MINIO_SECRET_KEY = os.environ.get("MINIO_SECRET_KEY", "minioadmin")

minio_client = Minio(MINIO_ENDPOINT, access_key=MINIO_ACCESS_KEY, secret_key=MINIO_SECRET_KEY, secure=False)
print("‚úÖ MinIO conectado")

‚úÖ MinIO conectado


In [4]:
archivo_silver = None
try:
    print("üì• Buscando archivos Silver...")
    objects = minio_client.list_objects("meteo-silver", recursive=True)
    archivos_silver = [obj.object_name for obj in objects if obj.object_name.endswith(".csv")]
    if archivos_silver:
        archivo_silver = sorted(archivos_silver)[-1]
        print(f"‚úÖ Cargando: {archivo_silver}")
        temp_dir = tempfile.gettempdir()
        temp_file = os.path.join(temp_dir, archivo_silver.split("/")[-1])
        minio_client.fget_object("meteo-silver", archivo_silver, temp_file)
        df = spark.read.csv(temp_file, header=True, inferSchema=True)
        print(f"‚úÖ Cargados {df.count()} registros")
    else:
        print("‚ö†Ô∏è Sin archivos Silver")
        df = spark.createDataFrame([(1, 25.5, 60, "2024-01-01")], ["id", "temperature", "humidity", "timestamp"])
except Exception as e:
    print(f"‚ö†Ô∏è Error: {e}")
    df = spark.createDataFrame([(1, 25.5, 60, "2024-01-01")], ["id", "temperature", "humidity", "timestamp"])

print(f"\nüìä DataFrame: {df.count()} registros")
df.show(3)

üì• Buscando archivos Silver...
‚ö†Ô∏è Sin archivos Silver


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "C:\Users\Alumno_AI\Desktop\Estacion_Meteorologica\venv_meteo\Lib\site-packages\py4j\clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Alumno_AI\AppData\Local\Programs\Python\Python312\Lib\socket.py", line 708, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
ConnectionResetError: [WinError 10054] Se ha forzado la interrupci√≥n de una conexi√≥n existente por el host remoto

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\Alumno_AI\Desktop\Estacion_Meteorologica\venv_meteo\Lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Alumno_AI\Desktop\Estacion_


KeyboardInterrupt



In [None]:
print("\n" + "="*70)
print("üìä KPI 1: DISPONIBILIDAD Y CALIDAD DE DATOS")
print("="*70)

total_registros = df.count()
registros_temp_validos = df.filter(col("temperature").isNotNull()).count()
registros_humedad_validos = df.filter(col("humidity").isNotNull()).count()

disponibilidad = (registros_temp_validos / total_registros * 100) if total_registros > 0 else 0
calidad_datos = (registros_humedad_validos / total_registros * 100) if total_registros > 0 else 0

print(f"‚úÖ Disponibilidad de Sensores: {disponibilidad:.2f}%")
print(f"‚úÖ Calidad de Datos (Humedad): {calidad_datos:.2f}%")
print(f"‚úÖ Total de Registros: {total_registros}")
print(f"‚úÖ Registros V√°lidos (Temperatura): {registros_temp_validos}")
print(f"‚úÖ Registros V√°lidos (Humedad): {registros_humedad_validos}")

In [None]:
print("\n" + "="*70)
print("üìä KPI 2: ESTABILIDAD CLIM√ÅTICA")
print("="*70)

metricas_clima = df.agg(
    avg("temperature").alias("temp_promedio"),
    spark_max("temperature").alias("temp_maxima"),
    spark_min("temperature").alias("temp_minima"),
    stddev("temperature").alias("temp_desviacion"),
    avg("humidity").alias("humedad_promedio"),
    spark_max("humidity").alias("humedad_maxima"),
    spark_min("humidity").alias("humedad_minima")
).collect()[0]

temp_avg = round(float(metricas_clima.temp_promedio or 0), 2)
temp_max = round(float(metricas_clima.temp_maxima or 0), 2)
temp_min = round(float(metricas_clima.temp_minima or 0), 2)

temp_rango = round(temp_max - temp_min, 2)

hum_avg = round(float(metricas_clima.humedad_promedio or 0), 2)
hum_max = round(float(metricas_clima.humedad_maxima or 0), 2)
hum_min = round(float(metricas_clima.humedad_minima or 0), 2)
hum_rango = round(hum_max - hum_min, 2)

print(f"üå°Ô∏è Temperatura Promedio: {temp_avg}¬∞C")
print(f"üå°Ô∏è Temperatura M√°xima: {temp_max}¬∞C")
print(f"üå°Ô∏è Temperatura M√≠nima: {temp_min}¬∞C")
print(f"üå°Ô∏è Rango de Temperatura: {temp_rango}¬∞C")

print(f"üíß Humedad Promedio: {hum_avg}%")
print(f"üíß Humedad M√°xima: {hum_max}%")
print(f"üíß Humedad M√≠nima: {hum_min}%")
print(f"üíß Rango de Humedad: {hum_rango}%")

In [None]:
print("\n" + "="*70)
print("üìä KPI 3: DETECCI√ìN DE ANOMAL√çAS")
print("="*70)

# Anomal√≠as de temperatura
temp_fuera_rango = df.filter((col("temperature") < 0) | (col("temperature") > 50)).count()
# Anomal√≠as de humedad
hum_invalida = df.filter((col("humidity") < 0) | (col("humidity") > 100)).count()
# Riesgo de condensaci√≥n
riesgo_condensacion = df.filter((col("temperature") < 5) & (col("humidity") > 85)).count()

temp_anomalia_pct = (temp_fuera_rango / total_registros * 100) if total_registros > 0 else 0
hum_anomalia_pct = (hum_invalida / total_registros * 100) if total_registros > 0 else 0
riesgo_pct = (riesgo_condensacion / total_registros * 100) if total_registros > 0 else 0

print(f"üö® Temperaturas Fuera de Rango (0-50¬∞C): {temp_fuera_rango} ({temp_anomalia_pct:.2f}%)")
print(f"üö® Humedades Inv√°lidas (0-100%): {hum_invalida} ({hum_anomalia_pct:.2f}%)")
print(f"‚ö†Ô∏è Riesgo de Condensaci√≥n (T<5¬∞C + H>85%): {riesgo_condensacion} ({riesgo_pct:.2f}%)")

In [None]:
print("\n" + "="*70)
print("üìä KPI 4: CONDICIONES OPERATIVAS (ALERTAS)")
print("="*70)

# Condiciones √≥ptimas: 15-28¬∞C y 40-70% humedad
optimas = df.filter((col("temperature") >= 15) & (col("temperature") <= 28) & 
                    (col("humidity") >= 40) & (col("humidity") <= 70)).count()
# Alerta: 5-15¬∞C o 28-35¬∞C, O 30-40% o 70-80% humedad
alerta = df.filter(((col("temperature") >= 5) & (col("temperature") < 15)) |
                   ((col("temperature") > 28) & (col("temperature") <= 35)) |
                   ((col("humidity") >= 30) & (col("humidity") < 40)) |
                   ((col("humidity") > 70) & (col("humidity") <= 80))).count()
# Cr√≠tica: T < 5¬∞C o T > 35¬∞C, O H < 30% o H > 80%
critica = df.filter((col("temperature") < 5) | (col("temperature") > 35) |
                    (col("humidity") < 30) | (col("humidity") > 80)).count()

optimas_pct = (optimas / total_registros * 100) if total_registros > 0 else 0
alerta_pct = (alerta / total_registros * 100) if total_registros > 0 else 0
critica_pct = (critica / total_registros * 100) if total_registros > 0 else 0

print(f"‚úÖ Condiciones √ìptimas (15-28¬∞C, 40-70% H): {optimas} ({optimas_pct:.2f}%)")
print(f"‚ö†Ô∏è Condiciones de Alerta: {alerta} ({alerta_pct:.2f}%)")
print(f"üö® Condiciones Cr√≠ticas: {critica} ({critica_pct:.2f}%)")

In [None]:
print("\n" + "="*70)
print("üìä KPI 5: RENDIMIENTO POR SENSOR")
print("="*70)

kpi_sensor = df.groupBy("id").agg(
    count("*").alias("lecturas"),
    spark_round(avg("temperature"), 2).alias("temp_avg"),
    spark_round(spark_max("temperature"), 2).alias("temp_max"),
    spark_round(spark_min("temperature"), 2).alias("temp_min"),
    spark_round(avg("humidity"), 2).alias("hum_avg"),
    spark_round(spark_max("humidity"), 2).alias("hum_max"),
    spark_round(spark_min("humidity"), 2).alias("hum_min")
).orderBy(desc("lecturas"))

print("\nResumen por Sensor:")
kpi_sensor.show(truncate=False)

In [None]:
print("\n" + "="*70)
print("üíæ GUARDANDO KPIs EN MINIO (GOLD LAYER)")
print("="*70)

archivo_gold = 'metricas_kpi_gold.csv'
archivo_datos = 'datos_principales_silver.csv'

try:
    import io
    from pyspark.sql.functions import col
    from pyspark.sql.types import TimestampType
    
    # ========== GUARDAR KPIs ==========
    # Eliminar archivo anterior si existe
    try:
        minio_client.remove_object("meteo-gold", archivo_gold)
        print(f"üóëÔ∏è  KPI anterior eliminado")
    except:
        pass
    
    # --- CORRECCI√ìN DE FECHAS PARA KPIs ---
    kpi_export = kpi_sensor
    for field in kpi_sensor.schema.fields:
        if isinstance(field.dataType, TimestampType):
            kpi_export = kpi_export.withColumn(field.name, col(field.name).cast("string"))
    # --------------------------------------

    # Convertir KPIs a pandas (usando la versi√≥n corregida kpi_export)
    pdf_kpi = kpi_export.toPandas()
    csv_buffer_kpi = pdf_kpi.to_csv(index=False)
    csv_bytes_kpi = io.BytesIO(csv_buffer_kpi.encode('utf-8'))
    
    # Subir KPIs a MinIO
    minio_client.put_object(
        "meteo-gold", 
        archivo_gold, 
        csv_bytes_kpi,
        length=len(csv_buffer_kpi.encode('utf-8')),
        content_type="text/csv"
    )
    print(f"‚úÖ {archivo_gold} guardado en meteo-gold")
    print(f"   üìä Columnas: {list(pdf_kpi.columns)}")
    print(f"   üìà Registros: {len(pdf_kpi)}")
    
    # ========== GUARDAR DATOS PRINCIPALES ==========
    # Eliminar archivo anterior
    try:
        minio_client.remove_object("meteo-gold", archivo_datos)
        print(f"\nüóëÔ∏è  Datos anteriores eliminados")
    except:
        pass
    
    # --- CORRECCI√ìN DE FECHAS PARA DATOS PRINCIPALES (Aqu√≠ fallaba antes) ---
    df_export = df
    for field in df.schema.fields:
        if isinstance(field.dataType, TimestampType):
            df_export = df_export.withColumn(field.name, col(field.name).cast("string"))
    # ------------------------------------------------------------------------

    # Convertir datos principales a pandas (usando df_export)
    pdf_datos = df_export.toPandas()
    csv_buffer_datos = pdf_datos.to_csv(index=False)
    csv_bytes_datos = io.BytesIO(csv_buffer_datos.encode('utf-8'))
    
    # Subir datos a MinIO
    minio_client.put_object(
        "meteo-gold", 
        archivo_datos, 
        csv_bytes_datos,
        length=len(csv_buffer_datos.encode('utf-8')),
        content_type="text/csv"
    )
    print(f"‚úÖ {archivo_datos} guardado en meteo-gold")
    print(f"   üìä Columnas: {list(pdf_datos.columns)}")
    print(f"   üìà Registros: {len(pdf_datos)}")
    
    print("\n" + "="*70)
    print("‚úÖ DATOS DE NEGOCIO GENERADOS CORRECTAMENTE")
    print("="*70)
    print(f"üìç KPIs por Sensor: meteo-gold/{archivo_gold}")
    print(f"üìç Datos Principales: meteo-gold/{archivo_datos}")
    print("="*70)
    
except Exception as e:
    error_msg = str(e)
    # Ignorar errores de Hadoop (no cr√≠ticos)
    if "UnsatisfiedLinkError" in error_msg or "NativeIO" in error_msg:
        print(f"‚ö†Ô∏è  Warning Hadoop ignorado (no cr√≠tico)")
        print("‚úÖ DATOS GENERADOS CORRECTAMENTE")
    else:
        print(f"‚ùå Error fatal: {e}")
        import traceback
        traceback.print_exc()