In [1]:
# Welcome to your new notebook
# Type here in the cell editor to add code!
import pyspark.sql.functions as F
from pyspark.sql.types import *
from notebookutils import mssparkutils as mu
import pandas as pd
from datetime import datetime


StatementMeta(, 11903d28-a631-4362-9694-b83ca04949a8, 3, Finished, Available, Finished)

In [7]:
# spark.sql("DROP TABLE IF EXISTS silver.brokers")
# spark.sql("DROP TABLE IF EXISTS silver.campaigns")
# spark.sql("DROP TABLE IF EXISTS silver.clients")
# spark.sql("DROP TABLE IF EXISTS silver.leads")
# spark.sql("DROP TABLE IF EXISTS silver.projects")
# spark.sql("DROP TABLE IF EXISTS silver.properties")
# spark.sql("DROP TABLE IF EXISTS silver.sales")

StatementMeta(, 11903d28-a631-4362-9694-b83ca04949a8, 9, Finished, Available, Finished)

DataFrame[]

In [2]:
# Definimos los esquemas de nuestros dataframes para asegurar tipos de datos correctos

# Esquema para brokers
schema_brokers = StructType([
    StructField("broker_id", IntegerType(), False),
    StructField("broker_name", StringType(), True),
    StructField("region", StringType(), True),
    StructField("email", StringType(), True),
    StructField("_load_date", TimestampType(), True),
    StructField("_source", StringType(), True),
    StructField("_user", StringType(), True)
])

# Esquema para campaigns
schema_campaigns = StructType([
    StructField("campaign_id", IntegerType(), False),
    StructField("channel", StringType(), True),
    StructField("campaign_name", StringType(), True),
    StructField("start_date", DateType(), True),
    StructField("end_date", DateType(), True),
    StructField("budget", IntegerType(), True),
    StructField("_load_date", TimestampType(), True),
    StructField("_source", StringType(), True),
    StructField("_user", StringType(), True)
])

# Esquema para clients
schema_clients = StructType([
    StructField("client_id", IntegerType(), False),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("region", StringType(), True),
    StructField("_load_date", TimestampType(), True),
    StructField("_source", StringType(), True),
    StructField("_user", StringType(), True)
])

# Esquema para leads
schema_leads = StructType([
    StructField("lead_id", IntegerType(), False),
    StructField("client_id", IntegerType(), False),
    StructField("propertie_id", IntegerType(), False),
    StructField("campaign_id", IntegerType(), False),
    StructField("lead_date", DateType(), True),
    StructField("lead_source", StringType(), True),
    StructField("_load_date", TimestampType(), True),
    StructField("_source", StringType(), True),
    StructField("_user", StringType(), True)
])

# Esquema para projects
schema_projects = StructType([
    StructField("project_id", IntegerType(), False),
    StructField("project_name", StringType(), True),
    StructField("city", StringType(), True),
    StructField("region", StringType(), True),
    StructField("launch_year", IntegerType(), True),
    StructField("status", StringType(), True),
    StructField("_load_date", TimestampType(), True),
    StructField("_source", StringType(), True),
    StructField("_user", StringType(), True)
])

# Esquema para properties
schema_properties = StructType([
    StructField("propertie_id", IntegerType(), False),
    StructField("project_id", IntegerType(), False),
    StructField("propertie_type", StringType(), True),
    StructField("size_m2", IntegerType(), True),
    StructField("bedrooms", IntegerType(), True),
    StructField("bathrooms", IntegerType(), True),
    StructField("list_price_usd", IntegerType(), True),
    StructField("availability_status", StringType(), True),
    StructField("_load_date", TimestampType(), True),
    StructField("_source", StringType(), True),
    StructField("_user", StringType(), True)
])

# Esquema para sales
schema_sales = StructType([
    StructField("sale_id", IntegerType(), False),
    StructField("propertie_id", IntegerType(), False),
    StructField("client_id", IntegerType(), False),
    StructField("broker_id", IntegerType(), False),
    StructField("sale_date", DateType(), True),
    StructField("sale_price_usd", IntegerType(), True),
    StructField("_load_date", TimestampType(), True),
    StructField("_source", StringType(), True),
    StructField("_user", StringType(), True)
])

StatementMeta(, 11903d28-a631-4362-9694-b83ca04949a8, 4, Finished, Available, Finished)

In [3]:
# Leer las tablas delta 
try:
    # Leer archivo de brokers
    df_brokers = spark.table("bronze.brokers")
    
    # Leer archivo de campaigns
    df_campaigns = spark.table("bronze.campaigns")
    
    # Leer archivo de clients
    df_clients = spark.table("bronze.clients")
    
    # Leer archivo de leads
    df_leads = spark.table("bronze.leads")
    
    # Leer archivo de projects
    df_projects = spark.table("bronze.projects")
    
    # Leer archivo de properties
    df_properties = spark.table("bronze.properties")
    
    # Leer archivo de sales
    df_sales = spark.table("bronze.sales")
    
    print("tablas leídos correctamente de la capa Bronze")
except Exception as e:
    print(f"Error al leer tablas: {str(e)}")

StatementMeta(, 11903d28-a631-4362-9694-b83ca04949a8, 5, Finished, Available, Finished)

tablas leídos correctamente de la capa Bronze


In [4]:
# Exploración de los datos
# Mostramos una vista previa de los datos cargados

# print("Vista previa de los datos de brokers:")
# display(df_brokers.limit(5))

# print("Vista previa de los datos de campaigns:")
# display(df_campaigns.limit(5))

# print("Vista previa de los datos de clients:")
# display(df_clients.limit(5))

# print("Vista previa de los datos de leads:")
# display(df_leads.limit(5))

# print("Vista previa de los datos de projects:")
# display(df_projects.limit(5))

# print("Vista previa de los datos de properties:")
# display(df_properties.limit(5))

# print("Vista previa de los datos de sales:")
# display(df_sales.limit(5))

StatementMeta(, 11903d28-a631-4362-9694-b83ca04949a8, 6, Finished, Available, Finished)

In [5]:
# ======================================================
# Limpieza y transformación
# ======================================================

# tabla brokers
df_brokers_silver = (
    spark.table("bronze.brokers")
    .rdd.toDF(schema_brokers)
    .withColumn("broker_name", F.trim(F.col("broker_name")))
    .withColumn("email", F.when(F.col("email").isNotNull(), F.lower(F.trim(F.col("email")))))
    .withColumn("region", F.upper(F.trim(F.col("region"))))
    .withColumn("dominio",F.upper(F.regexp_extract("email",r"@([^.]+)",1)))
    .filter(F.col("broker_id").isNotNull())
    .filter(F.col("email").isNull() | F.col("email").rlike("^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$"))
    .dropDuplicates(["broker_id"])
    .withColumn("_silver_load_date", F.current_timestamp())
    .withColumn("_source_system", F.lit("bronze.brokers"))
)

# tabla campaigns
df_campaigns_silver = (
    spark.table("bronze.campaigns")
    .rdd.toDF(schema_campaigns)
    .withColumn("campaign_name", F.trim(F.col("campaign_name")))
    .withColumn("channel", F.upper(F.trim(F.col("channel"))))
    .withColumn("campaigns_duration_months",F.round(F.months_between(F.col("end_date"),F.col("start_date")),5))
    .withColumn("budget_per_month",F.when(F.col("campaigns_duration_months")==0,0).otherwise(F.round(F.col("budget")/F.col("campaigns_duration_months"),2)))
    .filter(F.col("start_date").isNotNull())
    .filter(F.col("end_date").isNull() | (F.col("end_date") >= F.col("start_date")))
    .filter(F.col("budget").isNull() | (F.col("budget") >= 0))
    .withColumn("_silver_load_date", F.current_timestamp())
    .withColumn("_source_system", F.lit("bronze.campaigns"))
)

# tabla clients
df_clients_silver = (
    spark.table("bronze.clients")
    .rdd.toDF(schema_clients)
    .withColumn("first_name", F.initcap(F.trim(F.col("first_name"))))
    .withColumn("last_name", F.initcap(F.trim(F.col("last_name"))))
    .withColumn("full_name",F.concat(F.col("first_name"), F.lit(" "), F.col("last_name")))
    .withColumn("email", F.lower(F.trim(F.col("email"))))
    .withColumn("region", F.upper(F.trim(F.col("region"))))
    .filter(F.col("client_id").isNotNull())
    .filter(F.col("email").isNull() | F.col("email").rlike("^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$"))
    .withColumn("_silver_load_date", F.current_timestamp())
    .withColumn("_source_system", F.lit("bronze.clients"))
)

# tabla leads
df_leads_silver = (
    spark.table("bronze.leads")
    .rdd.toDF(schema_leads)
    .filter(F.col("client_id").isNotNull())
    .filter(F.col("propertie_id").isNotNull())
    .filter(F.col("campaign_id").isNotNull())
    .filter(F.col("lead_date").isNotNull())
    .filter(F.col("lead_date") <= F.current_date())
    .withColumn("lead_source", F.upper(F.trim(F.col("lead_source"))))
    .withColumn("_silver_load_date", F.current_timestamp())
    .withColumn("_source_system", F.lit("bronze.leads"))
)

# tabla projects
df_projects_silver = (
    spark.table("bronze.projects")
    .rdd.toDF(schema_projects)
    .withColumn("project_name", F.trim(F.col("project_name")))
    .withColumn("city", F.initcap(F.trim(F.col("city"))))
    .withColumn("region", F.upper(F.trim(F.col("region"))))
    .withColumn("status", F.upper(F.trim(F.col("status"))))
    .filter(F.col("launch_year").isNull() | ((F.col("launch_year") >= 1980) & (F.col("launch_year") <= F.year(F.current_date()))))
    .withColumn("_silver_load_date", F.current_timestamp())
    .withColumn("_source_system", F.lit("bronze.projects"))
)

# tabla properties
df_properties_silver = (
    spark.table("bronze.properties")
    .rdd.toDF(schema_properties)
    .filter(F.col("project_id").isNotNull())
    .withColumn("propertie_type", F.trim(F.col("propertie_type")))
    .withColumn("availability_status", F.upper(F.trim(F.col("availability_status"))))
    .withColumn("size_m2", F.when(F.col("size_m2") <= 0, None).otherwise(F.col("size_m2")))
    .withColumn("bedrooms", F.when(F.col("bedrooms") < 0, None).otherwise(F.col("bedrooms")))
    .withColumn("list_price_usd", F.when(F.col("list_price_usd") <= 0, None).otherwise(F.col("list_price_usd")))
    .withColumn("price_per_m2",F.when(F.col("size_m2") == 0,0).otherwise(F.round(F.col("list_price_usd")/F.col("size_m2"),2)))
    .withColumn("_silver_load_date", F.current_timestamp())
    .withColumn("_source_system", F.lit("bronze.properties"))
)

# tabla sales
df_sales_silver = (
    spark.table("bronze.sales")
    .rdd.toDF(schema_sales)
    .filter(F.col("propertie_id").isNotNull())
    .filter(F.col("client_id").isNotNull())
    .filter(F.col("broker_id").isNotNull())
    .filter(F.col("sale_date").isNotNull())
    .filter(F.col("sale_date") <= F.current_date())
    .filter(F.col("sale_price_usd") > 0)
    .withColumn("_silver_load_date", F.current_timestamp())
    .withColumn("_source_system", F.lit("bronze.sales"))
)

StatementMeta(, 11903d28-a631-4362-9694-b83ca04949a8, 7, Finished, Available, Finished)

In [8]:
#======================================================
# Guardado en silver
# ======================================================
df_brokers_silver.write.format("delta").mode("overwrite").saveAsTable("silver.brokers")
df_campaigns_silver.write.format("delta").mode("overwrite").saveAsTable("silver.campaigns")
df_clients_silver.write.format("delta").mode("overwrite").saveAsTable("silver.clients")
df_leads_silver.write.format("delta").mode("overwrite").saveAsTable("silver.leads")
df_projects_silver.write.format("delta").mode("overwrite").saveAsTable("silver.projects")
df_properties_silver.write.format("delta").mode("overwrite").saveAsTable("silver.properties")
df_sales_silver.write.format("delta").mode("overwrite").saveAsTable("silver.sales")

StatementMeta(, 11903d28-a631-4362-9694-b83ca04949a8, 10, Finished, Available, Finished)