In [None]:
from pyspark.sql.functions import col, to_date, dayofweek, date_format, when, lit, unix_timestamp, round, current_timestamp, coalesce, hour, concat, year, month
from pyspark.sql.types import IntegerType, StringType, StructType, StructField
from delta.tables import DeltaTable
import requests
import os
import pandas as pd

print("Corriendo el modelado dimensional gold...")

# CARGA Y UNIÓN DE TABLAS SILVER
print("Buscando datos en capa silver...")
try:
    df_yellow = spark.table("silver_yellow_taxi")
    print(f"Yellow cargado: {df_yellow.count()} registros")

    if spark.catalog.tableExists("silver_green_taxi"):
        df_green = spark.table("silver_green_taxi")
        print(f"Green encontrado: {df_green.count()} registros")
        df_silver = df_yellow.unionByName(df_green, allowMissingColumns=True)
        print("Unión exitosa de Yellow y Green")
    else:
        df_silver = df_yellow
        print("No se encontró tabla Green, procesando solo Yellow.")

except Exception as e:
    print(f"Error cargando tablas: {e}")
    if spark.catalog.tableExists("silver_yellow_taxi"):
        df_silver = spark.table("silver_yellow_taxi")
    else:
        mssparkutils.notebook.exit("Error: No hay datos Silver para procesar.")

# DIMENSION: dim_date
print("Generando dim_date...")
df_date = df_silver.select(to_date(col("pickup_time")).alias("date_id")).distinct() \
    .withColumn("year", year(col("date_id"))) \
    .withColumn("month", month(col("date_id"))) \
    .withColumn("day", col("date_id").substr(9, 2).cast("int")) \
    .withColumn("quarter",  when(col("month").between(1, 3), 1)
                           .when(col("month").between(4, 6), 2)
                           .when(col("month").between(7, 9), 3)
                           .otherwise(4)) \
    .withColumn("day_of_week", dayofweek(col("date_id"))) \
    .withColumn("day_name", date_format(col("date_id"), "EEEE")) \
    .withColumn("month_name", date_format(col("date_id"), "MMMM")) \
    .withColumn("is_weekend", when(col("day_of_week").isin(1, 7), True).otherwise(False))

df_date.write.format("delta").mode("overwrite").saveAsTable("dim_date")
print("dim_date lista.")

# DIMENSION: dim_payment_type
print("Generando dim_payment_type...")
data_payment = [
    (1, "Credit Card"), (2, "Cash"), (3, "No Charge"), 
    (4, "Dispute"), (5, "Unknown"), (6, "Voided Trip")
]
schema_payment = StructType([
    StructField("payment_type_id", IntegerType(), False),
    StructField("payment_desc", StringType(), False)
])
df_payment = spark.createDataFrame(data_payment, schema=schema_payment)
df_payment.write.format("delta").mode("overwrite").saveAsTable("dim_payment_type")
print("dim_payment_type lista.")

# DIMENSION: dim_zone
print("Generando dim_zone...")
url_zones = "https://d37ci6vzurychx.cloudfront.net/misc/taxi+_zone_lookup.csv"
try:
    pdf_zone = pd.read_csv(url_zones)
    df_zone_raw = spark.createDataFrame(pdf_zone)
    df_zone = df_zone_raw.select(
        col("LocationID").cast("int").alias("zone_id"),
        col("Borough").alias("borough"),
        col("Zone").alias("zone_name"),
        col("service_zone")
    )
    df_zone = df_zone.withColumn("full_location", concat(col("zone_name"), lit(", New York, NY, USA")))
except Exception as e:
    print(f"Falló la descarga de zonas ({e}), creando dummy.")
    df_zone = df_silver.select(col("pickup_zone_id").alias("zone_id")).distinct() \
        .withColumn("borough", lit("Unknown")).withColumn("zone_name", lit("Unknown")) \
        .withColumn("service_zone", lit("Unknown")).withColumn("full_location", lit("New York, NY, USA"))

unknown_schema = df_zone.schema
unknown_row = spark.createDataFrame([(999, "Unknown", "Unknown", "Unknown", "New York, NY, USA")], unknown_schema)
df_zone_final = df_zone.union(unknown_row).distinct()
df_zone_final.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("dim_zone")
print("dim_zone lista.")

# FACT TABLE: fact_trips
print("Armando fact_trips...")

df_fact = df_silver.select(
    col("trip_id"), 
    col("pickup_time"),
    col("dropoff_time"),
    col("taxi_type"),
    to_date(col("pickup_time")).alias("date_id"),
    col("pickup_zone_id"),
    col("dropoff_zone_id"),
    col("payment_type").alias("payment_type_id"),
    hour(col("pickup_time")).alias("trip_hour"),
    col("passenger_count"),
    col("trip_distance"),
    col("tip_amount"),
    col("fare_amount"),
    col("total_amount"),
    round((unix_timestamp(col("dropoff_time")) - unix_timestamp(col("pickup_time"))) / 60, 2).alias("duration_minutes"),
    current_timestamp().alias("loaded_at")
)

df_fact = df_fact.fillna(-1, subset=["payment_type_id"])

# Logica de merge para evitar duplicados
if spark.catalog.tableExists("fact_trips"):
    print("Tabla fact_trips detectada. Aplicando MERGE...")
    dt_fact = DeltaTable.forName(spark, "fact_trips")
    dt_fact.alias("target").merge(
        df_fact.alias("source"),
        "target.trip_id = source.trip_id"
    ).whenMatchedUpdateAll() \
     .whenNotMatchedInsertAll() \
     .execute()
else:
    print("Creando fact_trips por primera vez...")
    df_fact.write.format("delta").mode("overwrite").option("overwriteSchema", "true").partitionBy("date_id").saveAsTable("fact_trips")

# Optimizacion z order
print("Optimizando tablas...")
spark.sql("OPTIMIZE fact_trips ZORDER BY (pickup_zone_id, dropoff_zone_id)")
spark.sql("VACUUM fact_trips RETAIN 168 HOURS") 

print("Modelo Gold finalizado")
mssparkutils.session.stop()