In [1]:
# ============================================================================
# Notebook: NB_01_Bronze_To_Silver
# Proyecto: QAPITAL Real Estate Analytics
# Descripción: Limpieza, validación y estandarización de datos raw
# Capa: Bronze → Silver
# ============================================================================

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime

# Path configuration: lakehouse root, the Bronze folder the raw files are
# read from, and the Tables folder the Silver outputs are saved under.
base_lakehouse_path = "abfss://WSP_QAPITAL_REALSTATE_PRD@onelake.dfs.fabric.microsoft.com/LKH_QAPITAL_REALSTATE.Lakehouse/"
bronze_path = base_lakehouse_path + "Files/bronze/"
silver_tables_path = base_lakehouse_path + "Tables/"

# Start-of-run banner with the current timestamp.
print(
    "=" * 70,
    "INICIO - Transformación Bronze → Silver",
    f"Timestamp: {datetime.now()}",
    "=" * 70,
    sep="\n",
)

# ============================================================================
# FUNCIÓN PARA LEER ARCHIVOS CSV DESDE BRONZE
# ============================================================================

def read_bronze_csv(filename):
    """Load one CSV file from the Bronze folder and log its row count.

    The file is read from ``bronze_path`` + ``filename`` with the first line
    treated as the header and column types inferred by the CSV reader.

    Args:
        filename: Name of the file inside the Bronze folder.

    Returns:
        The Spark DataFrame read from the file.
    """
    source_path = f"{bronze_path}{filename}"
    csv_reader = (
        spark.read.format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
    )
    bronze_df = csv_reader.load(source_path)

    # The row count is computed only for the run log.
    row_count = bronze_df.count()
    print(f"✅ {filename}: {row_count} registros")
    return bronze_df

# ============================================================================
# LEER TODOS LOS ARCHIVOS DESDE BRONZE
# ============================================================================

print("\n📂 Leyendo archivos desde Bronze...\n")

# Read every Bronze file in a fixed order; the generator is consumed left to
# right, so each DataFrame is bound to the name in the matching position.
(
    df_brokers,
    df_campaigns,
    df_clients,
    df_leads,
    df_projects,
    df_properties,
    df_sales,
) = (
    read_bronze_csv(f"{entity}.txt")
    for entity in (
        "brokers",
        "campaigns",
        "clients",
        "leads",
        "projects",
        "properties",
        "sales",
    )
)

print("\n✅ Todos los archivos leídos exitosamente")

# ============================================================================
# TRANSFORMACIONES BRONZE → SILVER
# ============================================================================

print("\n🔧 Aplicando transformaciones y limpieza...\n")

# --- BROKERS ---
# Rows missing a required field are removed *before* de-duplicating.
# dropDuplicates keeps an arbitrary row per BrokerID, so de-duplicating first
# could keep a row with a null BrokerName and then discard that broker
# entirely, even when a complete row for the same BrokerID exists.
df_brokers_clean = (
    df_brokers
    .na.drop(subset=["BrokerID", "BrokerName"])
    .dropDuplicates(["BrokerID"])
    .withColumn("BrokerName", trim(col("BrokerName")))
    .withColumn("Region", trim(col("Region")))
    # E-mail addresses are trimmed and lower-cased.
    .withColumn("Email", lower(trim(col("Email"))))
)

# Replace the contents of the Silver Delta location with the cleaned rows.
(
    df_brokers_clean.write
    .format("delta")
    .mode("overwrite")
    .save(f"{silver_tables_path}silver_brokers")
)
print(f"✅ silver_brokers: {df_brokers_clean.count()} registros")

# --- CAMPAIGNS ---
# Rows missing a required field are removed *before* de-duplicating.
# dropDuplicates keeps an arbitrary row per CampaignID, so de-duplicating
# first could keep a row with a null CampaignName and then discard that
# campaign entirely, even when a complete row for the same CampaignID exists.
df_campaigns_clean = (
    df_campaigns
    .na.drop(subset=["CampaignID", "CampaignName"])
    .dropDuplicates(["CampaignID"])
    .withColumn("CampaignName", trim(col("CampaignName")))
    .withColumn("Channel", trim(col("Channel")))
)

# Replace the contents of the Silver Delta location with the cleaned rows.
(
    df_campaigns_clean.write
    .format("delta")
    .mode("overwrite")
    .save(f"{silver_tables_path}silver_campaigns")
)
print(f"✅ silver_campaigns: {df_campaigns_clean.count()} registros")

# --- CLIENTS ---
# One row per ClientID with a non-null key; name and region text is trimmed,
# and e-mail addresses are trimmed and lower-cased.
df_clients_clean = (
    df_clients
    .dropDuplicates(["ClientID"])
    .dropna(subset=["ClientID"])
    .withColumn("FirstName", trim(col("FirstName")))
    .withColumn("LastName", trim(col("LastName")))
    .withColumn("Email", lower(trim(col("Email"))))
    .withColumn("Region", trim(col("Region")))
)

# Replace the contents of the Silver Delta location with the cleaned rows.
(
    df_clients_clean.write
    .format("delta")
    .mode("overwrite")
    .save(f"{silver_tables_path}silver_clients")
)
print(f"✅ silver_clients: {df_clients_clean.count()} registros")

# --- PROJECTS ---
# Rows missing a required field are removed *before* de-duplicating.
# dropDuplicates keeps an arbitrary row per ProjectID, so de-duplicating
# first could keep a row with a null ProjectName and then discard that
# project entirely, even when a complete row for the same ProjectID exists.
df_projects_clean = (
    df_projects
    .na.drop(subset=["ProjectID", "ProjectName"])
    .dropDuplicates(["ProjectID"])
    .withColumn("ProjectName", trim(col("ProjectName")))
    .withColumn("City", trim(col("City")))
    .withColumn("Region", trim(col("Region")))
    .withColumn("Status", trim(col("Status")))
)

# Replace the contents of the Silver Delta location with the cleaned rows.
(
    df_projects_clean.write
    .format("delta")
    .mode("overwrite")
    .save(f"{silver_tables_path}silver_projects")
)
print(f"✅ silver_projects: {df_projects_clean.count()} registros")

# --- PROPERTIES ---
# Rows missing a required field are removed *before* de-duplicating.
# dropDuplicates keeps an arbitrary row per PropertyID, so de-duplicating
# first could keep a row with a null ProjectID and then discard that
# property entirely, even when a complete row for the same PropertyID exists.
df_properties_clean = (
    df_properties
    .na.drop(subset=["PropertyID", "ProjectID"])
    .dropDuplicates(["PropertyID"])
    .withColumn("PropertyType", trim(col("PropertyType")))
    .withColumn("AvailabilityStatus", trim(col("AvailabilityStatus")))
)

# Replace the contents of the Silver Delta location with the cleaned rows.
(
    df_properties_clean.write
    .format("delta")
    .mode("overwrite")
    .save(f"{silver_tables_path}silver_properties")
)
print(f"✅ silver_properties: {df_properties_clean.count()} registros")

# --- LEADS ---
# Rows missing a required field are removed *before* de-duplicating.
# dropDuplicates keeps an arbitrary row per LeadID, so de-duplicating first
# could keep a row with a null ClientID or PropertyID and then discard that
# lead entirely, even when a complete row for the same LeadID exists.
df_leads_clean = (
    df_leads
    .na.drop(subset=["LeadID", "ClientID", "PropertyID"])
    .dropDuplicates(["LeadID"])
    .withColumn("LeadSource", trim(col("LeadSource")))
)

# Replace the contents of the Silver Delta location with the cleaned rows.
(
    df_leads_clean.write
    .format("delta")
    .mode("overwrite")
    .save(f"{silver_tables_path}silver_leads")
)
print(f"✅ silver_leads: {df_leads_clean.count()} registros")

# --- SALES ---
# Rows missing a required field are removed *before* de-duplicating.
# dropDuplicates keeps an arbitrary row per SaleID, so de-duplicating first
# could keep a row with a null PropertyID, ClientID or BrokerID and then
# discard that sale entirely, even when a complete row for the same SaleID
# exists.
df_sales_clean = (
    df_sales
    .na.drop(subset=["SaleID", "PropertyID", "ClientID", "BrokerID"])
    .dropDuplicates(["SaleID"])
)

# Replace the contents of the Silver Delta location with the cleaned rows.
(
    df_sales_clean.write
    .format("delta")
    .mode("overwrite")
    .save(f"{silver_tables_path}silver_sales")
)
print(f"✅ silver_sales: {df_sales_clean.count()} registros")

# ============================================================================
# RESUMEN FINAL
# ============================================================================

# Closing banner: completion message, the Silver outputs written by this
# cell, and the finish timestamp.
print(
    "\n" + "=" * 70,
    "✅ TRANSFORMACIÓN SILVER COMPLETADA",
    "=" * 70,
    "\nTablas Delta creadas en capa Silver:",
    sep="\n",
)
print(
    "\n".join(
        f"  • {table_name}"
        for table_name in (
            "silver_brokers",
            "silver_campaigns",
            "silver_clients",
            "silver_projects",
            "silver_properties",
            "silver_leads",
            "silver_sales",
        )
    )
)
print(f"\nFinalizado: {datetime.now()}", "=" * 70, sep="\n")

StatementMeta(, 532d1bc8-448d-483b-8e73-1ab81a5666d3, 3, Finished, Available, Finished)

INICIO - Transformación Bronze → Silver
Timestamp: 2025-10-04 18:16:14.491447

📂 Leyendo archivos desde Bronze...

✅ brokers.txt: 30 registros
✅ campaigns.txt: 30 registros
✅ clients.txt: 30 registros
✅ leads.txt: 30 registros
✅ projects.txt: 30 registros
✅ properties.txt: 30 registros
✅ sales.txt: 30 registros

✅ Todos los archivos leídos exitosamente

🔧 Aplicando transformaciones y limpieza...

✅ silver_brokers: 30 registros
✅ silver_campaigns: 30 registros
✅ silver_clients: 30 registros
✅ silver_projects: 30 registros
✅ silver_properties: 30 registros
✅ silver_leads: 30 registros
✅ silver_sales: 30 registros

✅ TRANSFORMACIÓN SILVER COMPLETADA

Tablas Delta creadas en capa Silver:
  • silver_brokers
  • silver_campaigns
  • silver_clients
  • silver_projects
  • silver_properties
  • silver_leads
  • silver_sales

Finalizado: 2025-10-04 18:16:50.077933
