In [0]:
# Databricks notebook source
# MAGIC %md
# MAGIC # Capa Oro - Uber NYC Data Pipeline
# MAGIC 
# MAGIC ## Objetivo
# MAGIC Crear agregaciones de negocio, KPIs y métricas analíticas desde la capa Silver para facilitar el análisis y reporting.
# MAGIC 
# MAGIC ## Responsabilidades
# MAGIC - Leer datos curados de la capa Silver
# MAGIC - Crear agregaciones temporales (horarias, diarias, mensuales)
# MAGIC - Generar métricas de negocio y KPIs
# MAGIC - Crear tablas especializadas para diferentes casos de uso
# MAGIC 
# MAGIC ## Entrada
# MAGIC - Tabla Delta: `uber_silver` desde `/Volumes/workspace/default/uber_etl_azure/silver/`
# MAGIC 
# MAGIC ## Salida
# MAGIC - Múltiples tablas Delta especializadas en `/Volumes/workspace/default/uber_etl_azure/gold/`


In [0]:
# COMMAND ----------

# MAGIC %md
# MAGIC ## 1. Configuración e Imports

# COMMAND ----------

In [0]:
# Importar librerías necesarias
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
import os

In [0]:
# Storage locations used by this notebook: Silver is the input layer, Gold the output layer.
# Both keep their trailing slash because later cells append table names directly to them.
silver_path = "/Volumes/workspace/default/uber_etl_azure/silver/"
gold_path = "/Volumes/workspace/default/uber_etl_azure/gold/"

# Make sure the Gold output directory exists before any table is written.
dbutils.fs.mkdirs(gold_path)

# Echo the resolved configuration, one item per line.
print(
    "✅ Configuración de Capa Oro inicializada",
    f"🥈 Origen Silver: {silver_path}",
    f"🥇 Destino Gold: {gold_path}",
    sep="\n",
)

✅ Configuración de Capa Oro inicializada
🥈 Origen Silver: /Volumes/workspace/default/uber_etl_azure/silver/
🥇 Destino Gold: /Volumes/workspace/default/uber_etl_azure/gold/


In [0]:
# COMMAND ----------

# MAGIC %md
# MAGIC ## 2. Validación de Dependencias

# COMMAND ----------

In [0]:
# Dependency check: list the Silver table folder and abort the run if it cannot be listed.
try:
    silver_files = dbutils.fs.ls(f"{silver_path}uber_silver")
    preview_entries = silver_files[:5]        # only the first five entries are printed
    hidden_entries = len(silver_files) - 5    # how many entries are left out of the preview

    print("✅ Tabla Silver encontrada:")
    for entry in preview_entries:
        print(f"   📄 {entry.name}")

    if hidden_entries > 0:
        print(f"   ... y {hidden_entries} archivos más")

except Exception as listing_error:
    # Report the problem with a hint, then re-raise so the notebook stops here.
    print(f"❌ Error: Tabla Silver no encontrada - {str(listing_error)}")
    print("💡 Asegúrate de ejecutar primero el notebook '02_Uber_Silver_Layer'")
    raise


✅ Tabla Silver encontrada:
   📄 _delta_log/
   📄 part-00000-8aea1406-5f14-4215-89bf-809b333127eb.c000.snappy.parquet
   📄 part-00000-d442906f-da51-41b4-9c5e-c044c48e17b4.c000.snappy.parquet
   📄 part-00000-dc565cda-d614-4992-9e4f-c6bd53cb5656.c000.snappy.parquet
   📄 part-00000-fa85f50a-52a2-443e-a242-9891ec0a187d.c000.snappy.parquet


In [0]:
# COMMAND ----------

# MAGIC %md
# MAGIC ## 3. Lectura de Datos Silver

# COMMAND ----------

In [0]:
from pyspark import StorageLevel

# Load the Silver Delta table and stop the run if it is empty or unreadable.
print("📥 Leyendo datos de la capa Silver...")

try:
    silver_table_path = f"{silver_path}uber_silver"
    df_silver = spark.read.format("delta").load(silver_table_path)

    # Row count is both reported and used as the emptiness check below.
    silver_count = df_silver.count()
    print(f"📊 Registros leídos de Silver: {silver_count:,}")

    if not silver_count:
        raise Exception("❌ No se encontraron datos en la capa Silver")

    # The DataFrame is intentionally not persisted: an earlier note in this cell records
    # that persist(StorageLevel.MEMORY_ONLY) was removed to avoid a serverless compute error.
    print("✅ Datos Silver cargados exitosamente")

except Exception as read_error:
    # Log the failure (including the empty-table case raised above) and propagate it.
    print(f"❌ Error al leer capa Silver: {str(read_error)}")
    raise

📥 Leyendo datos de la capa Silver...
📊 Registros leídos de Silver: 4,502,417
✅ Datos Silver cargados exitosamente


In [0]:
# COMMAND ----------

# MAGIC %md
# MAGIC ## 4. Exploración de Datos Silver

# COMMAND ----------

In [0]:
print("🔍 EXPLORACIÓN DE DATOS SILVER:", "=" * 40, sep="\n")

# Print the Silver schema.
print("📋 Esquema de datos Silver:")
df_silver.printSchema()

# Time span covered by the data. min/max are applied to column names here, i.e. they are
# the Spark aggregate functions made available by the notebook's imports.
date_range_columns = [
    min("pickup_datetime").alias("fecha_inicio"),
    max("pickup_datetime").alias("fecha_fin"),
    countDistinct("pickup_date").alias("dias_unicos"),
]
# A global aggregation yields exactly one row; take it as a Row object.
date_stats = df_silver.agg(*date_range_columns).first()

print(
    "\n📅 Información temporal:",
    f"   Fecha inicio: {date_stats['fecha_inicio']}",
    f"   Fecha fin: {date_stats['fecha_fin']}",
    f"   Días únicos: {date_stats['dias_unicos']}",
    sep="\n",
)

🔍 EXPLORACIÓN DE DATOS SILVER:
📋 Esquema de datos Silver:
root
 |-- pickup_datetime: timestamp (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- base_code: string (nullable = true)
 |-- ingestion_timestamp: timestamp (nullable = true)
 |-- source_file: string (nullable = true)
 |-- bronze_layer_version: string (nullable = true)
 |-- pickup_hour: integer (nullable = true)
 |-- pickup_day_of_week: string (nullable = true)
 |-- pickup_day_of_week_num: integer (nullable = true)
 |-- pickup_month: integer (nullable = true)
 |-- pickup_year: integer (nullable = true)
 |-- pickup_date: string (nullable = true)
 |-- pickup_week_of_year: integer (nullable = true)
 |-- time_category: string (nullable = true)
 |-- day_type: string (nullable = true)
 |-- trip_id: string (nullable = true)
 |-- silver_processing_timestamp: timestamp (nullable = true)
 |-- silver_layer_version: string (nullable = true)


📅 Información temporal:
   Fecha inicio: 20

In [0]:
# Number of distinct dispatching bases present in Silver.
distinct_bases = countDistinct("base_code").alias("bases_unicas")
# A global aggregation yields exactly one row; take it as a Row object.
base_stats = df_silver.agg(distinct_bases).first()

print(
    "\n🏢 Información de bases:",
    f"   Bases únicas: {base_stats['bases_unicas']}",
    sep="\n",
)

print("✅ Exploración completada")


🏢 Información de bases:
   Bases únicas: 5
✅ Exploración completada


In [0]:
# COMMAND ----------

# MAGIC %md
# MAGIC ## 5. Tabla Gold 1: Agregaciones Temporales Diarias

# COMMAND ----------

In [0]:
# Section banner for Gold table 1 (title line followed by a 50-character rule).
print("🥇 CREANDO TABLA GOLD 1: Agregaciones Diarias", "=" * 50, sep="\n")

🥇 CREANDO TABLA GOLD 1: Agregaciones Diarias


In [0]:
from pyspark.sql.functions import (
    count, countDistinct, avg, stddev, round, sum, when, col, max, expr
)

# Daily aggregation: one row per calendar day. The extra grouping columns (month, year,
# day-of-week name/number, day type) are carried along as descriptive attributes.
df_daily_metrics = df_silver.groupBy(
    "pickup_date", 
    "pickup_month", 
    "pickup_year", 
    "pickup_day_of_week",
    "pickup_day_of_week_num",
    "day_type"
).agg(
    count("trip_id").alias("total_trips"),
    countDistinct("base_code").alias("active_bases"),
    countDistinct("pickup_hour").alias("active_hours"),
    
    # Coordinate statistics (mean and standard deviation, rounded to 6 decimals)
    round(avg("latitude"), 6).alias("avg_latitude"),
    round(avg("longitude"), 6).alias("avg_longitude"),
    round(stddev("latitude"), 6).alias("stddev_latitude"),
    round(stddev("longitude"), 6).alias("stddev_longitude"),
    
    # Trip counts per time-of-day category (conditional sums of 1/0 flags)
    sum(when(col("time_category") == "Morning Rush", 1).otherwise(0)).alias("morning_rush_trips"),
    sum(when(col("time_category") == "Evening Rush", 1).otherwise(0)).alias("evening_rush_trips"),
    sum(when(col("time_category") == "Midday", 1).otherwise(0)).alias("midday_trips"),
    sum(when(col("time_category") == "Night", 1).otherwise(0)).alias("night_trips"),
    sum(when(col("time_category") == "Late Night/Early Morning", 1).otherwise(0)).alias("late_night_trips"),
    
    # Earliest and latest pickup hour seen on the day. The earliest hour uses min():
    # first() returns whichever non-null value happens to come first after the shuffle,
    # so it is order-dependent and not guaranteed to be the smallest hour.
    expr("min(pickup_hour)").alias("first_trip_hour"),
    max("pickup_hour").alias("last_trip_hour")
)

display(df_daily_metrics)

pickup_date,pickup_month,pickup_year,pickup_day_of_week,pickup_day_of_week_num,day_type,total_trips,active_bases,active_hours,avg_latitude,avg_longitude,stddev_latitude,stddev_longitude,morning_rush_trips,evening_rush_trips,midday_trips,night_trips,late_night_trips,first_trip_hour,last_trip_hour
2014-07-22,7,2014,Tuesday,3,Weekday,28860,5,24,40.741402,-73.975229,0.034491,0.044091,5088,8308,9989,3760,1715,0,23
2014-04-06,4,2014,Sunday,1,Weekend,13392,5,24,40.732499,-73.971713,0.038106,0.055573,1151,2716,4705,1213,3607,0,23
2014-07-27,7,2014,Sunday,1,Weekend,22375,5,24,40.734194,-73.966844,0.040446,0.057394,2005,4832,8713,2507,4318,0,23
2014-07-26,7,2014,Saturday,7,Weekend,27354,5,24,40.735187,-73.97257,0.038371,0.047108,2297,6681,9005,5341,4030,0,23
2014-07-14,7,2014,Monday,2,Weekday,27189,5,24,40.739672,-73.973309,0.035703,0.051119,4621,8684,7913,3894,2077,0,23
2014-09-02,9,2014,Tuesday,3,Weekday,28612,5,24,40.738622,-73.967399,0.038873,0.058925,5816,7582,9840,3323,2051,0,23
2014-05-25,5,2014,Sunday,1,Weekend,10747,5,24,40.731542,-73.973881,0.040437,0.050443,878,2502,3993,1914,1460,0,23
2014-08-22,8,2014,Friday,6,Weekday,29229,5,24,40.736721,-73.972513,0.037615,0.050109,4750,6904,9608,5058,2909,0,23
2014-07-08,7,2014,Tuesday,3,Weekday,25596,5,24,40.740424,-73.974437,0.034685,0.046833,4566,7272,8702,3452,1604,0,23
2014-06-18,6,2014,Wednesday,4,Weekday,24497,5,24,40.741175,-73.976967,0.033689,0.045336,4171,6841,8793,3400,1292,0,23


In [0]:
# Derived daily metrics computed from the aggregated columns.
daily_rush_expr = col("morning_rush_trips") + col("evening_rush_trips")

df_daily_metrics = (
    df_daily_metrics
    # Average trips handled per active base that day.
    .withColumn("trips_per_base", round(col("total_trips") / col("active_bases"), 2))
    # Share of the day's trips that fall in either rush period, as a percentage.
    .withColumn("rush_hour_percentage", round((daily_rush_expr / col("total_trips")) * 100, 2))
    # 1 for weekend days, 0 otherwise.
    .withColumn("weekend_flag", when(col("day_type") == "Weekend", 1).otherwise(0))
    # Audit columns.
    .withColumn("processing_timestamp", current_timestamp())
    .withColumn("gold_layer_version", lit("1.0"))
)

daily_row_total = df_daily_metrics.count()
print(f"📊 Registros en agregación diaria: {daily_row_total}")

📊 Registros en agregación diaria: 183


In [0]:
# Persist the daily aggregation as a Delta table, replacing any previous data and schema.
try:
    daily_writer = (
        df_daily_metrics.write
        .format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
    )
    daily_writer.save(f"{gold_path}daily_metrics")

    print("✅ Tabla Gold 1 (daily_metrics) guardada exitosamente")

except Exception as write_error:
    # Report and re-raise so a failed write stops the notebook.
    print(f"❌ Error al guardar daily_metrics: {str(write_error)}")
    raise

✅ Tabla Gold 1 (daily_metrics) guardada exitosamente


In [0]:
# COMMAND ----------

# MAGIC %md
# MAGIC ## 6. Tabla Gold 2: Agregaciones por Base y Hora

# COMMAND ----------

In [0]:
# Section banner for Gold table 2 (title line followed by a 50-character rule).
print("🥇 CREANDO TABLA GOLD 2: Métricas por Base y Hora", "=" * 50, sep="\n")

🥇 CREANDO TABLA GOLD 2: Métricas por Base y Hora


In [0]:
# Aggregations computed for every (base, hour, time category) group.
base_hourly_aggs = [
    count("trip_id").alias("hourly_trips"),
    countDistinct("pickup_date").alias("active_days"),

    # Geographic statistics: mean position and bounding box, rounded to 6 decimals.
    round(avg("latitude"), 6).alias("avg_latitude"),
    round(avg("longitude"), 6).alias("avg_longitude"),
    round(min("latitude"), 6).alias("min_latitude"),
    round(max("latitude"), 6).alias("max_latitude"),
    round(min("longitude"), 6).alias("min_longitude"),
    round(max("longitude"), 6).alias("max_longitude"),

    # Split of trips by day type.
    sum(when(col("day_type") == "Weekday", 1).otherwise(0)).alias("weekday_trips"),
    sum(when(col("day_type") == "Weekend", 1).otherwise(0)).alias("weekend_trips"),
]

df_base_hourly = (
    df_silver
    .groupBy("base_code", "pickup_hour", "time_category")
    .agg(*base_hourly_aggs)
    # Derived metrics on top of the aggregates.
    .withColumn("avg_trips_per_day", round(col("hourly_trips") / col("active_days"), 2))
    .withColumn("weekend_percentage", round((col("weekend_trips") / col("hourly_trips")) * 100, 2))
    # Extent of the bounding box in degrees.
    .withColumn("coordinate_range_lat", round(col("max_latitude") - col("min_latitude"), 6))
    .withColumn("coordinate_range_lon", round(col("max_longitude") - col("min_longitude"), 6))
    # Audit columns.
    .withColumn("processing_timestamp", current_timestamp())
    .withColumn("gold_layer_version", lit("1.0"))
)

In [0]:
# Rank the hours of each base by trip volume (1 = busiest) and flag the top 3 as peak hours.
# row_number() assigns distinct ranks even when trip counts tie, so the ordering includes
# pickup_hour and time_category as tie-breakers; without them tied rows could swap ranks
# (and their is_peak_hour flag) from one run to the next.
window_spec = Window.partitionBy("base_code").orderBy(
    col("hourly_trips").desc(),
    col("pickup_hour").asc(),
    col("time_category").asc(),
)
df_base_hourly = df_base_hourly \
    .withColumn("hour_rank_by_trips", row_number().over(window_spec)) \
    .withColumn("is_peak_hour", when(col("hour_rank_by_trips") <= 3, 1).otherwise(0))

print(f"📊 Registros en agregación base-hora: {df_base_hourly.count()}")

📊 Registros en agregación base-hora: 120


In [0]:
# Persist the base/hour aggregation as a Delta table, replacing any previous data and schema.
try:
    base_hourly_writer = (
        df_base_hourly.write
        .format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
    )
    base_hourly_writer.save(f"{gold_path}base_hourly_metrics")

    print("✅ Tabla Gold 2 (base_hourly_metrics) guardada exitosamente")

except Exception as write_error:
    # Report and re-raise so a failed write stops the notebook.
    print(f"❌ Error al guardar base_hourly_metrics: {str(write_error)}")
    raise

✅ Tabla Gold 2 (base_hourly_metrics) guardada exitosamente


In [0]:
# COMMAND ----------

# MAGIC %md
# MAGIC ## 7. Tabla Gold 3: KPIs de Negocio Mensuales

# COMMAND ----------

In [0]:
# Section banner for Gold table 3 (title line followed by a 50-character rule).
print("🥇 CREANDO TABLA GOLD 3: KPIs Mensuales de Negocio", "=" * 50, sep="\n")

🥇 CREANDO TABLA GOLD 3: KPIs Mensuales de Negocio


In [0]:
# Monthly KPIs: one row per (month, year).

# Time-of-day categories and the output column that counts each of them.
monthly_category_aliases = [
    ("Morning Rush", "morning_rush_monthly"),
    ("Evening Rush", "evening_rush_monthly"),
    ("Midday", "midday_monthly"),
    ("Night", "night_monthly"),
    ("Late Night/Early Morning", "late_night_monthly"),
]

monthly_aggs = (
    [
        # Basic volume metrics.
        count("trip_id").alias("total_monthly_trips"),
        countDistinct("base_code").alias("active_bases"),
        countDistinct("pickup_date").alias("active_days"),
        # Temporal distribution.
        countDistinct("pickup_hour").alias("active_hours"),
        sum(when(col("day_type") == "Weekend", 1).otherwise(0)).alias("weekend_trips"),
        sum(when(col("day_type") == "Weekday", 1).otherwise(0)).alias("weekday_trips"),
    ]
    + [
        # Conditional counts, one per time-of-day category.
        sum(when(col("time_category") == category, 1).otherwise(0)).alias(alias_name)
        for category, alias_name in monthly_category_aliases
    ]
    + [
        # Geographic centre and spread (standard deviation), rounded to 6 decimals.
        round(avg("latitude"), 6).alias("avg_monthly_latitude"),
        round(avg("longitude"), 6).alias("avg_monthly_longitude"),
        round(stddev("latitude"), 6).alias("geographic_spread_lat"),
        round(stddev("longitude"), 6).alias("geographic_spread_lon"),
    ]
)

df_monthly_kpis = df_silver.groupBy("pickup_month", "pickup_year").agg(*monthly_aggs)

In [0]:
# Derived monthly KPIs computed from the aggregated columns.

# Divisor for weekend_ratio: NULL instead of 0 when a month has no weekday trips.
# A NULL divisor yields a NULL ratio, whereas dividing by 0 raises an error when
# ANSI SQL mode is enabled (and returns NULL only when it is disabled).
safe_weekday_trips = when(col("weekday_trips") != 0, col("weekday_trips"))

df_monthly_kpis = df_monthly_kpis \
    .withColumn("avg_trips_per_day", round(col("total_monthly_trips") / col("active_days"), 2)) \
    .withColumn("avg_trips_per_base", round(col("total_monthly_trips") / col("active_bases"), 2)) \
    .withColumn("weekend_ratio", round(col("weekend_trips") / safe_weekday_trips, 3)) \
    .withColumn("rush_hours_percentage", 
                round(((col("morning_rush_monthly") + col("evening_rush_monthly")) / 
                       col("total_monthly_trips")) * 100, 2)) \
    .withColumn("night_activity_percentage", 
                round(((col("night_monthly") + col("late_night_monthly")) / 
                       col("total_monthly_trips")) * 100, 2))

In [0]:
# Month-over-month comparison: months are ordered chronologically in a single window.
window_month = Window.orderBy("pickup_year", "pickup_month")

# Trip total of the preceding month (NULL for the first month in the data).
previous_month_total = lag("total_monthly_trips", 1).over(window_month)

df_monthly_kpis = (
    df_monthly_kpis
    .withColumn("prev_month_trips", previous_month_total)
    # Percentage change versus the previous month.
    .withColumn(
        "mom_growth",
        round(
            ((col("total_monthly_trips") - col("prev_month_trips")) / col("prev_month_trips")) * 100,
            2,
        ),
    )
    # Audit columns.
    .withColumn("processing_timestamp", current_timestamp())
    .withColumn("gold_layer_version", lit("1.0"))
)

monthly_row_total = df_monthly_kpis.count()
print(f"📊 Registros en KPIs mensuales: {monthly_row_total}")



📊 Registros en KPIs mensuales: 6




In [0]:
# Persist the monthly KPIs as a Delta table, replacing any previous data and schema.
try:
    monthly_writer = (
        df_monthly_kpis.write
        .format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
    )
    monthly_writer.save(f"{gold_path}monthly_kpis")

    print("✅ Tabla Gold 3 (monthly_kpis) guardada exitosamente")

except Exception as write_error:
    # Report and re-raise so a failed write stops the notebook.
    print(f"❌ Error al guardar monthly_kpis: {str(write_error)}")
    raise



✅ Tabla Gold 3 (monthly_kpis) guardada exitosamente


In [0]:
# COMMAND ----------

# MAGIC %md
# MAGIC ## 8. Tabla Gold 4: Análisis Geoespacial por Zona

# COMMAND ----------

In [0]:
# Section banner for Gold table 4 (title line followed by a 50-character rule).
print("🥇 CREANDO TABLA GOLD 4: Análisis Geoespacial", "=" * 50, sep="\n")

🥇 CREANDO TABLA GOLD 4: Análisis Geoespacial


In [0]:
# Assign every trip to a grid cell for spatial analysis: each coordinate is floored to
# two decimals (0.01-degree steps) and the cell id is "<lat_zone>_<lon_zone>".
lat_cell_expr = floor(col("latitude") * 100) / 100
lon_cell_expr = floor(col("longitude") * 100) / 100

df_geo_analysis = (
    df_silver
    .withColumn("lat_zone", lat_cell_expr)
    .withColumn("lon_zone", lon_cell_expr)
    .withColumn("geo_zone_id", concat(col("lat_zone"), lit("_"), col("lon_zone")))
)

In [0]:
# Aggregate trips per grid cell.

# Month number -> output column holding that month's trip count for the zone.
zone_month_aliases = [
    (4, "april_trips"),
    (5, "may_trips"),
    (6, "june_trips"),
    (7, "july_trips"),
    (8, "august_trips"),
    (9, "september_trips"),
]

zone_aggs = (
    [
        count("trip_id").alias("zone_total_trips"),
        countDistinct("base_code").alias("zone_bases"),
        countDistinct("pickup_date").alias("zone_active_days"),
        countDistinct("pickup_hour").alias("zone_active_hours"),
        # Temporal split per zone (conditional counts).
        sum(when(col("time_category") == "Morning Rush", 1).otherwise(0)).alias("zone_morning_rush"),
        sum(when(col("time_category") == "Evening Rush", 1).otherwise(0)).alias("zone_evening_rush"),
        sum(when(col("day_type") == "Weekend", 1).otherwise(0)).alias("zone_weekend_trips"),
        sum(when(col("day_type") == "Weekday", 1).otherwise(0)).alias("zone_weekday_trips"),
    ]
    + [
        # Per-month trip counts.
        sum(when(col("pickup_month") == month_number, 1).otherwise(0)).alias(alias_name)
        for month_number, alias_name in zone_month_aliases
    ]
)

df_spatial_metrics = (
    df_geo_analysis
    .groupBy("geo_zone_id", "lat_zone", "lon_zone")
    .agg(*zone_aggs)
)

In [0]:
# Density and activity metrics per zone, derived from the aggregated columns.
zone_rush_expr = col("zone_morning_rush") + col("zone_evening_rush")

df_spatial_metrics = (
    df_spatial_metrics
    # Average trips per day on which the zone had activity.
    .withColumn("zone_trips_per_day", round(col("zone_total_trips") / col("zone_active_days"), 2))
    # Distinct bases per 1,000 trips in the zone.
    .withColumn("zone_base_density", round(col("zone_bases") / col("zone_total_trips") * 1000, 2))
    # Percentage of the zone's trips in rush periods / on weekends.
    .withColumn("zone_rush_percentage", round((zone_rush_expr / col("zone_total_trips")) * 100, 2))
    .withColumn(
        "zone_weekend_percentage",
        round((col("zone_weekend_trips") / col("zone_total_trips")) * 100, 2),
    )
)

In [0]:
# Rank zones by total trips (1 = most active) and flag the top 10 as high-activity zones.
# geo_zone_id is added as a tie-breaker: row_number() hands out distinct ranks even when
# trip totals tie, so without it tied zones could swap ranks (and fall in or out of the
# top 10) between runs.
window_activity = Window.orderBy(col("zone_total_trips").desc(), col("geo_zone_id").asc())
df_spatial_metrics = df_spatial_metrics \
    .withColumn("zone_activity_rank", row_number().over(window_activity)) \
    .withColumn("is_high_activity_zone", when(col("zone_activity_rank") <= 10, 1).otherwise(0)) \
    .withColumn("processing_timestamp", current_timestamp()) \
    .withColumn("gold_layer_version", lit("1.0"))

print(f"📊 Registros en análisis geoespacial: {df_spatial_metrics.count()}")



📊 Registros en análisis geoespacial: 1684


In [0]:
# Persist the spatial analysis as a Delta table, replacing any previous data and schema.
try:
    spatial_writer = (
        df_spatial_metrics.write
        .format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
    )
    spatial_writer.save(f"{gold_path}spatial_metrics")

    print("✅ Tabla Gold 4 (spatial_metrics) guardada exitosamente")

except Exception as write_error:
    # Report and re-raise so a failed write stops the notebook.
    print(f"❌ Error al guardar spatial_metrics: {str(write_error)}")
    raise



✅ Tabla Gold 4 (spatial_metrics) guardada exitosamente


In [0]:
# COMMAND ----------

# MAGIC %md
# MAGIC ## 9. Tabla Gold 5: Dashboard Ejecutivo (Summary KPIs)

# COMMAND ----------

In [0]:
# Section banner for Gold table 5 (title line followed by a 50-character rule).
print("🥇 CREANDO TABLA GOLD 5: Dashboard Ejecutivo", "=" * 50, sep="\n")

🥇 CREANDO TABLA GOLD 5: Dashboard Ejecutivo


In [0]:
# Single-row summary over the whole Silver dataset for the executive dashboard.

# Time-of-day categories and the output column that counts each of them.
executive_category_aliases = [
    ("Morning Rush", "total_morning_rush"),
    ("Evening Rush", "total_evening_rush"),
    ("Midday", "total_midday"),
    ("Night", "total_night"),
    ("Late Night/Early Morning", "total_late_night"),
]

executive_aggs = (
    [
        # Headline KPIs.
        count("trip_id").alias("total_trips_period"),
        countDistinct("base_code").alias("total_active_bases"),
        countDistinct("pickup_date").alias("total_active_days"),
        countDistinct("pickup_hour").alias("total_active_hours"),
        # First and last pickup timestamps of the period.
        min("pickup_datetime").alias("period_start"),
        max("pickup_datetime").alias("period_end"),
        # Split by day type.
        sum(when(col("day_type") == "Weekend", 1).otherwise(0)).alias("total_weekend_trips"),
        sum(when(col("day_type") == "Weekday", 1).otherwise(0)).alias("total_weekday_trips"),
    ]
    + [
        # Conditional counts, one per time-of-day category.
        sum(when(col("time_category") == category, 1).otherwise(0)).alias(alias_name)
        for category, alias_name in executive_category_aliases
    ]
    + [
        # Overall geographic centre and bounding box, rounded to 6 decimals.
        round(avg("latitude"), 6).alias("center_latitude"),
        round(avg("longitude"), 6).alias("center_longitude"),
        round(min("latitude"), 6).alias("south_bound"),
        round(max("latitude"), 6).alias("north_bound"),
        round(min("longitude"), 6).alias("west_bound"),
        round(max("longitude"), 6).alias("east_bound"),
    ]
)

# report_id is a constant identifier for this one-row report.
df_executive_summary = df_silver.agg(*executive_aggs).withColumn("report_id", lit(1))

In [0]:
# Executive KPIs derived from the single-row summary.

# Divisor for the weekend/weekday ratio: NULL instead of 0 when there are no weekday trips.
# A NULL divisor yields a NULL ratio, whereas dividing by 0 raises an error when
# ANSI SQL mode is enabled (and returns NULL only when it is disabled).
safe_weekday_total = when(col("total_weekday_trips") != 0, col("total_weekday_trips"))

df_executive_summary = df_executive_summary \
    .withColumn("avg_daily_trips", round(col("total_trips_period") / col("total_active_days"), 0)) \
    .withColumn("avg_trips_per_base", round(col("total_trips_period") / col("total_active_bases"), 0)) \
    .withColumn("weekend_vs_weekday_ratio", round(col("total_weekend_trips") / safe_weekday_total, 2)) \
    .withColumn("rush_hours_share", 
                round(((col("total_morning_rush") + col("total_evening_rush")) / 
                       col("total_trips_period")) * 100, 1)) \
    .withColumn("night_activity_share", 
                round(((col("total_night") + col("total_late_night")) / 
                       col("total_trips_period")) * 100, 1)) \
    .withColumn("geographic_coverage_lat", round(col("north_bound") - col("south_bound"), 4)) \
    .withColumn("geographic_coverage_lon", round(col("east_bound") - col("west_bound"), 4)) \
    .withColumn("report_generated_at", current_timestamp()) \
    .withColumn("gold_layer_version", lit("1.0"))

print("📊 Registro de dashboard ejecutivo creado")

📊 Registro de dashboard ejecutivo creado


In [0]:
# Persist the executive summary as a Delta table, replacing any previous data and schema.
try:
    executive_writer = (
        df_executive_summary.write
        .format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
    )
    executive_writer.save(f"{gold_path}executive_dashboard")

    print("✅ Tabla Gold 5 (executive_dashboard) guardada exitosamente")

except Exception as write_error:
    # Report and re-raise so a failed write stops the notebook.
    print(f"❌ Error al guardar executive_dashboard: {str(write_error)}")
    raise

✅ Tabla Gold 5 (executive_dashboard) guardada exitosamente


In [0]:
# COMMAND ----------

# MAGIC %md
# MAGIC ## 10. Validación de Tablas Gold

# COMMAND ----------

In [0]:
# Section banner for the Gold validation step (title line followed by a 45-character rule).
print("🔍 VALIDACIÓN DE TABLAS GOLD CREADAS:", "=" * 45, sep="\n")

🔍 VALIDACIÓN DE TABLAS GOLD CREADAS:


In [0]:
# Gold tables to validate: (storage folder name, human-readable label).
gold_tables = [
    ("daily_metrics", "Agregaciones Diarias"),
    ("base_hourly_metrics", "Métricas Base-Hora"),
    ("monthly_kpis", "KPIs Mensuales"),
    ("spatial_metrics", "Análisis Geoespacial"),
    ("executive_dashboard", "Dashboard Ejecutivo")
]

# Names of the tables that could not be read back; reported together after the loop.
failed_tables = []

for table_name, description in gold_tables:
    try:
        # Read the table back from storage and report its size and location.
        table_path = f"{gold_path}{table_name}"
        df_validation = spark.read.format("delta").load(table_path)
        record_count = df_validation.count()
        column_count = len(df_validation.columns)
        
        print(f"✅ {description}:")
        print(f"   📊 Registros: {record_count:,}")
        print(f"   🏷️ Columnas: {column_count}")
        print(f"   📍 Ubicación: {table_path}")
        
        # Show a two-row sample restricted to the first five columns.
        print(f"   📋 Muestra de datos:")
        sample_cols = df_validation.columns[:5]
        df_validation.select(*sample_cols).show(2, truncate=True)
        
    except Exception as e:
        # Keep going so every table is checked, but remember the failure.
        print(f"❌ Error validando {table_name}: {str(e)}")
        failed_tables.append(table_name)

# Fail the run instead of reporting success when any table could not be validated.
if failed_tables:
    raise RuntimeError(f"Validación fallida para tablas Gold: {', '.join(failed_tables)}")

print("✅ Validación de tablas completada")

✅ Agregaciones Diarias:
   📊 Registros: 183
   🏷️ Columnas: 25
   📍 Ubicación: /Volumes/workspace/default/uber_etl_azure/gold/daily_metrics
   📋 Muestra de datos:
+-----------+------------+-----------+------------------+----------------------+
|pickup_date|pickup_month|pickup_year|pickup_day_of_week|pickup_day_of_week_num|
+-----------+------------+-----------+------------------+----------------------+
| 2014-07-22|           7|       2014|           Tuesday|                     3|
| 2014-04-06|           4|       2014|            Sunday|                     1|
+-----------+------------+-----------+------------------+----------------------+
only showing top 2 rows
✅ Métricas Base-Hora:
   📊 Registros: 120
   🏷️ Columnas: 21
   📍 Ubicación: /Volumes/workspace/default/uber_etl_azure/gold/base_hourly_metrics
   📋 Muestra de datos:
+---------+-----------+-------------+------------+-----------+
|base_code|pickup_hour|time_category|hourly_trips|active_days|
+---------+-----------+-----------

In [0]:
# COMMAND ----------

# MAGIC %md
# MAGIC ## 11. Análisis de Calidad y Rendimiento

# COMMAND ----------

In [0]:
# Section banner for the quality analysis step (title line followed by a 45-character rule).
print("📈 ANÁLISIS DE CALIDAD Y RENDIMIENTO:", "=" * 45, sep="\n")

📈 ANÁLISIS DE CALIDAD Y RENDIMIENTO:


In [0]:
# Row count of the Silver input.
silver_records = df_silver.count()

# Row counts of the Gold tables, read back from storage in a fixed order.
gold_row_counts = {
    gold_table: spark.read.format("delta").load(f"{gold_path}{gold_table}").count()
    for gold_table in ("daily_metrics", "base_hourly_metrics", "monthly_kpis", "spatial_metrics")
}
daily_count = gold_row_counts["daily_metrics"]
base_hourly_count = gold_row_counts["base_hourly_metrics"]
monthly_count = gold_row_counts["monthly_kpis"]
spatial_count = gold_row_counts["spatial_metrics"]

print(
    "📊 Resumen de transformaciones:",
    f"   🥈 Registros Silver de entrada: {silver_records:,}",
    f"   📅 Agregaciones diarias: {daily_count:,}",
    f"   🏢 Métricas base-hora: {base_hourly_count:,}",
    f"   📈 KPIs mensuales: {monthly_count:,}",
    f"   🗺️ Zonas geográficas: {spatial_count:,}",
    sep="\n",
)

📊 Resumen de transformaciones:
   🥈 Registros Silver de entrada: 4,502,417
   📅 Agregaciones diarias: 183
   🏢 Métricas base-hora: 120
   📈 KPIs mensuales: 6
   🗺️ Zonas geográficas: 1,684


In [0]:
# Aggregation factor = Silver rows per Gold row (integer division); 0 when the table is empty.
print("\n📉 Factores de agregación:")
for factor_label, gold_rows in (
    ("Daily", daily_count),
    ("Base-Hourly", base_hourly_count),
    ("Spatial", spatial_count),
):
    reduction_factor = silver_records // gold_rows if gold_rows > 0 else 0
    print(f"   {factor_label}: {reduction_factor}:1")


📉 Factores de agregación:
   Daily: 24603:1
   Base-Hourly: 37520:1
   Spatial: 2673:1


In [0]:
# Consistency check: the trip total stored in the executive dashboard should equal the
# number of Silver rows counted earlier.
print("\n🔍 Verificación de consistencia:")

executive_rows = spark.read.format("delta").load(f"{gold_path}executive_dashboard").collect()
executive_summary = executive_rows[0]
total_from_summary = executive_summary['total_trips_period']

if total_from_summary != silver_records:
    # A mismatch is only reported; it does not stop the notebook.
    print("   ⚠️ Posible inconsistencia detectada")
else:
    print(f"   ✅ Consistencia verificada: {total_from_summary:,} registros")

print("✅ Análisis de calidad completado")


🔍 Verificación de consistencia:
   ✅ Consistencia verificada: 4,502,417 registros
✅ Análisis de calidad completado


In [0]:
# COMMAND ----------

# MAGIC %md
# MAGIC ## 12. Creación de Vistas SQL

# COMMAND ----------

In [0]:
# Section banner for the SQL views step (title line followed by a 40-character rule).
print("📋 CREANDO VISTAS SQL PARA ANÁLISIS:", "=" * 40, sep="\n")

📋 CREANDO VISTAS SQL PARA ANÁLISIS:


In [0]:
# Register every Gold table as a temporary view: (storage folder name, view name).
tables_to_register = [
    ("daily_metrics", "uber_daily_gold"),
    ("base_hourly_metrics", "uber_base_hourly_gold"),
    ("monthly_kpis", "uber_monthly_gold"),
    ("spatial_metrics", "uber_spatial_gold"),
    ("executive_dashboard", "uber_executive_gold")
]

# Views that could not be created; reported together after the loop.
failed_views = []

for table_name, view_name in tables_to_register:
    try:
        df_temp = spark.read.format("delta").load(f"{gold_path}{table_name}")
        df_temp.createOrReplaceTempView(view_name)
        print(f"✅ Vista creada: {view_name}")
    except Exception as e:
        # Keep going so every view is attempted, but remember the failure.
        print(f"❌ Error creando vista {view_name}: {str(e)}")
        failed_views.append(view_name)

# Fail the run instead of reporting success when any view is missing.
if failed_views:
    raise RuntimeError(f"No se pudieron crear las vistas: {', '.join(failed_views)}")

# Example queries against the views registered above.
print(f"\n💡 Ejemplos de consultas SQL útiles:")
print(f"   -- Top 5 días con más viajes")
print(f"   SELECT pickup_date, total_trips FROM uber_daily_gold ORDER BY total_trips DESC LIMIT 5;")
print(f"   ")
print(f"   -- Comparación weekend vs weekday por mes")
print(f"   SELECT pickup_month, weekend_trips, weekday_trips FROM uber_monthly_gold;")
print(f"   ")
print(f"   -- Top 10 zonas más activas")
print(f"   SELECT geo_zone_id, zone_total_trips FROM uber_spatial_gold ORDER BY zone_total_trips DESC LIMIT 10;")
print(f"   ")
print(f"   -- KPIs ejecutivos principales")
print(f"   SELECT total_trips_period, avg_daily_trips, rush_hours_share FROM uber_executive_gold;")

print("✅ Vistas SQL creadas exitosamente")

✅ Vista creada: uber_daily_gold
✅ Vista creada: uber_base_hourly_gold
✅ Vista creada: uber_monthly_gold
✅ Vista creada: uber_spatial_gold
✅ Vista creada: uber_executive_gold

💡 Ejemplos de consultas SQL útiles:
   -- Top 5 días con más viajes
   SELECT pickup_date, total_trips FROM uber_daily_gold ORDER BY total_trips DESC LIMIT 5;
   
   -- Comparación weekend vs weekday por mes
   SELECT pickup_month, weekend_trips, weekday_trips FROM uber_monthly_gold;
   
   -- Top 10 zonas más activas
   SELECT geo_zone_id, zone_total_trips FROM uber_spatial_gold ORDER BY zone_total_trips DESC LIMIT 10;
   
   -- KPIs ejecutivos principales
   SELECT total_trips_period, avg_daily_trips, rush_hours_share FROM uber_executive_gold;
✅ Vistas SQL creadas exitosamente


In [0]:
# COMMAND ----------

# MAGIC %md
# MAGIC ## 13. Generación de Reportes de Estadísticas

# COMMAND ----------

In [0]:
# Section banner for the final reports (title line followed by a 50-character rule).
print("📊 GENERANDO REPORTES DE ESTADÍSTICAS FINALES:", "=" * 50, sep="\n")

📊 GENERANDO REPORTES DE ESTADÍSTICAS FINALES:


In [0]:
# Gold base path on Volumes (Unity Catalog), set again here before the reporting cells.
gold_path = "/Volumes/workspace/default/uber_etl_azure/gold/"

# Read the executive dashboard table back and keep its first row for the reports below.
executive_dashboard_rows = spark.read.format("delta").load(gold_path + "executive_dashboard").collect()
executive_data = executive_dashboard_rows[0]


In [0]:
# Report 1: headline figures taken from the executive dashboard row.
print("📈 REPORTE 1: Estadísticas Generales del Pipeline")
print("-" * 50)

print(f"📊 Métricas principales:")
print(f"   Total de viajes procesados: {executive_data['total_trips_period']:,}")
print(f"   Bases activas: {executive_data['total_active_bases']}")
print(f"   Días de operación: {executive_data['total_active_days']}")
# Both averages are stored as doubles rounded to 0 decimals, so they are formatted
# without a fractional part (otherwise they print with a trailing ".0").
print(f"   Promedio de viajes diarios: {executive_data['avg_daily_trips']:,.0f}")
print(f"   Promedio de viajes por base: {executive_data['avg_trips_per_base']:,.0f}")

print(f"\n🕐 Distribución temporal:")
print(f"   Viajes en horario rush: {executive_data['rush_hours_share']:.1f}%")
print(f"   Actividad nocturna: {executive_data['night_activity_share']:.1f}%")
print(f"   Ratio weekend/weekday: {executive_data['weekend_vs_weekday_ratio']:.2f}")

print(f"\n🗺️ Cobertura geográfica:")
print(f"   Centro geográfico: ({executive_data['center_latitude']:.4f}, {executive_data['center_longitude']:.4f})")
print(f"   Cobertura latitud: {executive_data['geographic_coverage_lat']:.4f}°")
print(f"   Cobertura longitud: {executive_data['geographic_coverage_lon']:.4f}°")


📈 REPORTE 1: Estadísticas Generales del Pipeline
--------------------------------------------------
📊 Métricas principales:
   Total de viajes procesados: 4,502,417
   Bases activas: 5
   Días de operación: 183
   Promedio de viajes diarios: 24,603.0
   Promedio de viajes por base: 900,483.0

🕐 Distribución temporal:
   Viajes en horario rush: 42.5%
   Actividad nocturna: 24.2%
   Ratio weekend/weekday: 0.33

🗺️ Cobertura geográfica:
   Centro geográfico: (40.7387, -73.9742)
   Cobertura latitud: 0.3918°
   Cobertura longitud: 0.5587°


In [0]:
# Report 2: the five busiest days according to the Gold daily_metrics table.
# The import stays with the cell so it runs on its own; F namespaces the Spark
# column functions used below.
from pyspark.sql import functions as F

print("\n📈 REPORTE 2: Top Performers")
print("-" * 30)

# Build the table location from the notebook-wide gold_path instead of repeating
# the absolute path, so a change to the Gold root is picked up here as well.
daily_metrics_path = f"{gold_path}daily_metrics"

# Load straight from the Delta path (no catalog registration required).
daily_metrics = spark.read.format("delta").load(daily_metrics_path)

# Top 5 most active days. pickup_date is a secondary sort key so that days with
# equal total_trips come back in a stable order; without it, limit(5) may pick
# different rows between runs when there are ties.
top_days = (
    daily_metrics.select("pickup_date", "pickup_day_of_week", "total_trips", "day_type")
    .orderBy(F.desc("total_trips"), F.asc("pickup_date"))
    .limit(5)
    .collect()
)

# Print the ranking, numbered from 1.
print("🏆 Top 5 días más activos:")
for i, day in enumerate(top_days, 1):
    print(f"   {i}. {day['pickup_date']} ({day['pickup_day_of_week']}) - {day['total_trips']:,} viajes [{day['day_type']}]")


📈 REPORTE 2: Top Performers
------------------------------
🏆 Top 5 días más activos:
   1. 2014-09-13 (Saturday) - 42,819 viajes [Weekend]
   2. 2014-09-05 (Friday) - 42,029 viajes [Weekday]
   3. 2014-09-19 (Friday) - 40,684 viajes [Weekday]
   4. 2014-09-06 (Saturday) - 40,135 viajes [Weekend]
   5. 2014-09-18 (Thursday) - 40,000 viajes [Weekday]


In [0]:
# COMMAND ----------

# MAGIC %md
# MAGIC ## 14. Resumen Final y Próximos Pasos

# COMMAND ----------

In [0]:
# Banner for the closing summary: title line over a 50-character rule.
print("🏆 RESUMEN FINAL - CAPA ORO COMPLETADA", "=" * 50, sep="\n")

🏆 RESUMEN FINAL - CAPA ORO COMPLETADA


In [0]:
# Closing summary of the Gold layer, printed one section at a time.
# Only the storage-location line interpolates a value (gold_path); everything
# else is fixed text, so plain strings are used.

# Gold tables the summary reports as created.
print(
    "📊 Tablas Gold creadas exitosamente:",
    "   🗓️ daily_metrics: Agregaciones diarias con métricas de actividad",
    "   🏢 base_hourly_metrics: Análisis detallado por base y hora",
    "   📈 monthly_kpis: KPIs de negocio con tendencias mensuales",
    "   🗺️ spatial_metrics: Análisis geoespacial por zonas",
    "   💼 executive_dashboard: Dashboard ejecutivo con KPIs principales",
    sep="\n",
)

# Where the Gold data lives.
print("\n📍 Ubicación de datos Gold:")
print(f"   {gold_path}")

# Recommended follow-up work.
print(
    "\n🚀 Próximos pasos recomendados:",
    "   📊 Conectar Power BI/Tableau a las vistas SQL",
    "   📧 Configurar alertas automáticas en KPIs críticos",
    "   🔄 Implementar pipeline de actualización incremental",
    "   📈 Crear dashboards interactivos usando las agregaciones",
    "   🤖 Implementar modelos de Machine Learning con datos Gold",
    sep="\n",
)

# Sample SQL queries shown to the reader; the "   " entries print as
# whitespace-only separator lines.
print(
    "\n💡 Comandos útiles para análisis:",
    "   -- Dashboard ejecutivo completo",
    "   SELECT * FROM uber_executive_gold;",
    "   ",
    "   -- Análisis de tendencias",
    "   SELECT pickup_month, total_monthly_trips, mom_growth FROM uber_monthly_gold ORDER BY pickup_month;",
    "   ",
    "   -- Top horas por base",
    "   SELECT base_code, pickup_hour, hourly_trips FROM uber_base_hourly_gold WHERE is_peak_hour = 1;",
    sep="\n",
)

# Sign-off lines.
print(
    "\n🎉 ARQUITECTURA MEDALLÓN COMPLETADA EXITOSAMENTE",
    "✨ Pipeline Bronce → Plata → Oro implementado y optimizado",
    "🏅 Datos listos para análisis avanzado y toma de decisiones",
    sep="\n",
)

📊 Tablas Gold creadas exitosamente:
   🗓️ daily_metrics: Agregaciones diarias con métricas de actividad
   🏢 base_hourly_metrics: Análisis detallado por base y hora
   📈 monthly_kpis: KPIs de negocio con tendencias mensuales
   🗺️ spatial_metrics: Análisis geoespacial por zonas
   💼 executive_dashboard: Dashboard ejecutivo con KPIs principales

📍 Ubicación de datos Gold:
   /Volumes/workspace/default/uber_etl_azure/gold/

🚀 Próximos pasos recomendados:
   📊 Conectar Power BI/Tableau a las vistas SQL
   📧 Configurar alertas automáticas en KPIs críticos
   🔄 Implementar pipeline de actualización incremental
   📈 Crear dashboards interactivos usando las agregaciones
   🤖 Implementar modelos de Machine Learning con datos Gold

💡 Comandos útiles para análisis:
   -- Dashboard ejecutivo completo
   SELECT * FROM uber_executive_gold;
   
   -- Análisis de tendencias
   SELECT pickup_month, total_monthly_trips, mom_growth FROM uber_monthly_gold ORDER BY pickup_month;
   
   -- Top horas por ba

In [0]:
%sql
-- Full executive dashboard: every column of uber_executive_gold.
SELECT
  *
FROM
  uber_executive_gold;

total_trips_period,total_active_bases,total_active_days,total_active_hours,period_start,period_end,total_weekend_trips,total_weekday_trips,total_morning_rush,total_evening_rush,total_midday,total_night,total_late_night,center_latitude,center_longitude,south_bound,north_bound,west_bound,east_bound,report_id,avg_daily_trips,avg_trips_per_base,weekend_vs_weekday_ratio,rush_hours_share,night_activity_share,geographic_coverage_lat,geographic_coverage_lon,report_generated_at,gold_layer_version
4502417,5,183,24,2014-04-01T00:00:00.000Z,2014-09-30T22:59:00.000Z,1125717,3376700,682876,1232653,1498592,687346,400950,40.738693,-73.974217,40.5258,40.9176,-74.2591,-73.7004,1,24603.0,900483.0,0.33,42.5,24.2,0.3918,0.5587,2025-07-28T17:29:15.387Z,1.0


In [0]:
%sql
-- Trend analysis: monthly trip totals alongside mom_growth, in ascending month order.
-- NOTE(review): mom_growth is presumably the month-over-month change in percent; confirm against the view definition.
SELECT
  pickup_month,
  total_monthly_trips,
  mom_growth
FROM
  uber_monthly_gold
ORDER BY
  pickup_month;

pickup_month,total_monthly_trips,mom_growth
4,562101,
5,648998,15.46
6,659436,1.61
7,789920,19.79
8,821662,4.02
9,1020300,24.18
