In [0]:
%pip install python-dotenv

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import col
from dotenv import load_dotenv
from datetime import datetime

import os
import requests
import json
import uuid

# Cargar el .env
load_dotenv("/Workspace/Users/renzo_hc@outlook.com/data_engineer/.env")

# URL del JSON público en S3
url = os.getenv("BASE_URL_OPENMETEO")
response = requests.get(url)
data = response.json()

# Fecha de ingesta actual
fecha_ingesta = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")

# Construcción de filas
rows = [
    (
        str(uuid.uuid4()),
        r["distrito"],
        r["lat"],
        r["lon"],
        r["fecha"],
        r["temp_max"],
        r["temp_min"],
        r["precipitacion"],
        fecha_ingesta
    )
    for r in data
]

# Definir esquema con nombres descriptivos
schema = StructType([
    StructField("id", StringType(), False),
    StructField("distrito", StringType(), True),
    StructField("latitud", DoubleType(), True),
    StructField("longitud", DoubleType(), True),
    StructField("fecha", StringType(), True),
    StructField("temperatura_maxima", DoubleType(), True),
    StructField("temperatura_minima", DoubleType(), True),
    StructField("precipitacion_total", DoubleType(), True),
    StructField("fecha_ingesta", StringType(), True)
])

# Crear DataFrame
df = spark.createDataFrame(rows, schema=schema)

# Tabla destino
tabla_bronze = "main.weather.bronze_distritos_diarios"

# Verificar existencia de la tabla y aplicar lógica de ingesta incremental
if not spark.catalog.tableExists(tabla_bronze):
    df.write.format("delta").mode("overwrite").saveAsTable(tabla_bronze)
    print("✅ Tabla creada e ingesta inicial completada.")
else:
    df_existente = spark.table(tabla_bronze)
    df_nuevos = df.join(df_existente, ["distrito", "fecha"], "left_anti")
    
    nuevos_registros = df_nuevos.count()
    if nuevos_registros > 0:
        df_nuevos.write.format("delta").mode("append").saveAsTable(tabla_bronze)
        print(f"✅ {nuevos_registros} nuevos registros insertados.")
    else:
        print("ℹ️ No se encontraron nuevos datos para insertar.")

