In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month, dayofmonth, to_date

# --- CONFIGURACIÓN CENTRALIZADA ---
DB_HOST = "sql_server"
DB_PORT = "1433"
DB_USER = "sa"
DB_PASS = "PasswordFuerte123!" 
DRIVER_PKG = "com.microsoft.sqlserver:mssql-jdbc:11.2.0.jre8"
DRIVER_CLASS = "com.microsoft.sqlserver.jdbc.SQLServerDriver"

# Bases de datos
DB_OLTP = "alquiler_habitacion"
DB_DW = "DW_AlquilerHabitacion"

print("--- INICIANDO PROCESO ETL (MODO FULL REFRESH) ---")

# Iniciar Spark
spark = SparkSession.builder \
    .appName("ETL-Alquileres") \
    .master("local[*]") \
    .config("spark.jars.packages", DRIVER_PKG) \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

# Funciones de Lectura y Escritura
def get_jdbc_url(db_name):
    return f"jdbc:sqlserver://{DB_HOST}:{DB_PORT};databaseName={db_name};encrypt=true;trustServerCertificate=true;"

def leer_origen(table_name):
    print(f"   -> Extrayendo {table_name}...")
    return spark.read.format("jdbc") \
        .option("url", get_jdbc_url(DB_OLTP)) \
        .option("dbtable", table_name) \
        .option("user", DB_USER) \
        .option("password", DB_PASS) \
        .option("driver", DRIVER_CLASS) \
        .load()

def escribir_destino(df, table_name):
    print(f"   -> Cargando {table_name}...")
    df.write.format("jdbc") \
        .option("url", get_jdbc_url(DB_DW)) \
        .option("dbtable", table_name) \
        .option("user", DB_USER) \
        .option("password", DB_PASS) \
        .option("driver", DRIVER_CLASS) \
        .mode("append") \
        .save()

# Función de Limpieza
def limpiar_dw():
    print("\n>>> 0. Limpiando Data Warehouse (Full Refresh)...")
    url = get_jdbc_url(DB_DW)
    try:
        # REGISTRO MANUAL DEL DRIVER (Vital para comandos SQL directos)
        # Accedemos a la clase del driver directamente desde el Gateway de Spark
        driver_instance = spark.sparkContext._gateway.jvm.com.microsoft.sqlserver.jdbc.SQLServerDriver()
        spark.sparkContext._gateway.jvm.java.sql.DriverManager.registerDriver(driver_instance)
        
        manager = spark.sparkContext._gateway.jvm.java.sql.DriverManager
        con = manager.getConnection(url, DB_USER, DB_PASS)
        stmt = con.createStatement()
        
        # Orden estricto: Primero Hechos (Hijos), luego Dimensiones (Padres)
        print("   -> Borrando FACT_PAGO...")
        stmt.execute("DELETE FROM FACT_PAGO")
        stmt.execute("DBCC CHECKIDENT ('FACT_PAGO', RESEED, 0)") # Reinicia IDs a 0
        
        print("   -> Borrando Dimensiones...")
        stmt.execute("DELETE FROM DIM_CLIENTE")
        stmt.execute("DBCC CHECKIDENT ('DIM_CLIENTE', RESEED, 0)")
        
        stmt.execute("DELETE FROM DIM_HABITACION")
        stmt.execute("DBCC CHECKIDENT ('DIM_HABITACION', RESEED, 0)")
        
        stmt.execute("DELETE FROM DIM_PROPIETARIO")
        stmt.execute("DBCC CHECKIDENT ('DIM_PROPIETARIO', RESEED, 0)")
        
        stmt.execute("DELETE FROM DIM_TIEMPO")
        stmt.execute("DBCC CHECKIDENT ('DIM_TIEMPO', RESEED, 0)")
        
        stmt.close()
        con.close()
        print("# DW Limpiado correctamente. Listo subir los datos.")
        
    except Exception as e:
        print(f"\n❌ ERROR CRÍTICO AL LIMPIAR: {e}")
        print("   ⛔ El proceso se detendrá para evitar duplicar datos.")
        raise e # Detenemos el script. Si no limpiamos, NO cargamos.

# ==========================================
# EJECUCIÓN DEL FLUJO
# ==========================================

# LIMPIEZA AUTOMÁTICA
limpiar_dw()

print("\n>>> 1. Procesando Dimensiones...")

# CLIENTE
df_cliente = leer_origen("CLIENTE")
df_cliente_dw = df_cliente.select("idCliente", "nombreCli", "dniCli", "telefonoCli", "correoCli", "direccionCli")
escribir_destino(df_cliente_dw, "DIM_CLIENTE")

# HABITACION
df_habitacion = leer_origen("HABITACION")
df_hab_dw = df_habitacion.select("idHabitacion", "numeroHabitacion", "tipoHabitacion", "precioMensual", "estado")
escribir_destino(df_hab_dw, "DIM_HABITACION")

# PROPIETARIO
df_propietario = leer_origen("PROPIETARIO")
df_prop_dw = df_propietario.select("idPropietario", "nombrePropietario", "correoPropietario")
escribir_destino(df_prop_dw, "DIM_PROPIETARIO")

# DIMENSIÓN TIEMPO
print("\n>>> 2. Procesando Dimensión Tiempo...")
df_pagos_src = leer_origen("PAGO")

# Extraer fechas únicas de los pagos
df_fechas = df_pagos_src.select(to_date(col("fechaPago")).alias("fecha")).distinct()

df_tiempo = df_fechas.select(
    col("fecha"),
    year(col("fecha")).alias("anio"),
    month(col("fecha")).alias("mes"),
    dayofmonth(col("fecha")).alias("dia")
)
escribir_destino(df_tiempo, "DIM_TIEMPO")

# FACT_PAGO
print("\n>>> 3. Procesando Hechos (FACT_PAGO)...")

# Se Lee las Dimensiones del DW
def leer_dw(table):
    return spark.read.format("jdbc") \
        .option("url", get_jdbc_url(DB_DW)).option("dbtable", table) \
        .option("user", DB_USER).option("password", DB_PASS).option("driver", DRIVER_CLASS).load()

dim_cliente = leer_dw("DIM_CLIENTE").select("idCliente", "idClienteDW")
dim_habitacion = leer_dw("DIM_HABITACION").select("idHabitacion", "idHabitacionDW")
dim_propietario = leer_dw("DIM_PROPIETARIO").select("idPropietario", "idPropietarioDW")
dim_tiempo = leer_dw("DIM_TIEMPO").select("fecha", "idTiempoDW")

# Se prepara la Transacción Completa
df_contrato = leer_origen("CONTRATO")

# Se unen los Pago con Contrato para saber quién y qué
df_transaccion = df_pagos_src.alias("p") \
    .join(df_contrato.alias("c"), col("p.idContrato") == col("c.idContrato")) \
    .select(
        col("p.fechaPago"),
        col("p.montoPago"),
        col("p.metodoPago"),
        col("c.idCliente"),
        col("c.idHabitacion"),
        col("c.idPropietario")
    )

# Sustitución de IDs viejos por IDs del DW
df_fact = df_transaccion \
    .join(dim_cliente, "idCliente") \
    .join(dim_habitacion, "idHabitacion") \
    .join(dim_propietario, "idPropietario") \
    .join(dim_tiempo, df_transaccion.fechaPago == dim_tiempo.fecha) \
    .select(
        col("idClienteDW"),
        col("idHabitacionDW"),
        col("idPropietarioDW"),
        col("idTiempoDW"),
        col("montoPago"),
        col("metodoPago")
    )

escribir_destino(df_fact, "FACT_PAGO")

print("\n# ETL Finalizado Correctamente -> DW Actualizado")

--- INICIANDO PROCESO ETL (MODO FULL REFRESH) ---

>>> 0. Limpiando Data Warehouse (Full Refresh)...
   -> Borrando FACT_PAGO...
   -> Borrando Dimensiones...
# DW Limpiado correctamente. Listo subir los datos.

>>> 1. Procesando Dimensiones...
   -> Extrayendo CLIENTE...
   -> Cargando DIM_CLIENTE...
   -> Extrayendo HABITACION...
   -> Cargando DIM_HABITACION...
   -> Extrayendo PROPIETARIO...
   -> Cargando DIM_PROPIETARIO...

>>> 2. Procesando Dimensión Tiempo...
   -> Extrayendo PAGO...
   -> Cargando DIM_TIEMPO...

>>> 3. Procesando Hechos (FACT_PAGO)...
   -> Extrayendo CONTRATO...
   -> Cargando FACT_PAGO...

# ETL Finalizado Correctamente -> DW Actualizado
