In [2]:
# En este notebook aprenderemos a:
# Leer los datos con PySpark
# Realizar transformaciones
# Guardar los dataframes como tablas Delta
# Crear un modelo estrella para análisis

#Configuración inicial y verificación del entorno
# Este comando verifica que estamos en el entorno correcto de Fabric

import pyspark.sql.functions as F
from pyspark.sql.types import *
from notebookutils import mssparkutils
import pandas as pd

# Verificar versión de Spark
print(f"Versión de Spark: {spark.version}")

StatementMeta(, f2db9b89-5486-4a51-93cf-e2c1eb1955f0, 4, Finished, Available, Finished)

Versión de Spark: 3.5.1.5.4.20250416.1


In [3]:
# Crear estructura de carpetas en el lakehouse
# Este comando crea las carpetas necesarias en el lakehouse para almacenar nuestros archivos

try:
    # Crear carpetas para datos brutos y procesados
    mssparkutils.fs.mkdirs("Files/bronze")
    mssparkutils.fs.mkdirs("Files/silver")
    mssparkutils.fs.mkdirs("Files/gold")
    mssparkutils.fs.mkdirs("Files/processed")
    mssparkutils.fs.mkdirs("Files/catalogo")
    print("✅ Estructura de carpetas creada correctamente")
except Exception as e:
    print(f"Error al crear carpetas: {str(e)}")

StatementMeta(, f2db9b89-5486-4a51-93cf-e2c1eb1955f0, 5, Finished, Available, Finished)

✅ Estructura de carpetas creada correctamente


In [4]:
# Definición de esquemas para mejor control de los datos
# Definimos los esquemas de nuestros dataframes para asegurar tipos de datos correctos

# Esquema para clientes
schema_brokers = StructType([
    StructField("BrokerID", IntegerType(), False),
    StructField("BrokerName", StringType(), True),
    StructField("Region", StringType(), True),
    StructField("email", StringType(), True)
   
])

schema_campaigns = StructType([
    StructField("CampaignID", IntegerType(), False),
    StructField("Channel", StringType(), True),
    StructField("CampaignName", StringType(), True),
    StructField("StartDate", DateType(), True),
    StructField("BudgetUSD", IntegerType(), False)
])

schema_clients = StructType([
    StructField("ClientID", IntegerType(), False),
    StructField("FirstName", StringType(), True),
    StructField("LastName", StringType(), True),
    StructField("Email", DateType(), True),
    StructField("Region", IntegerType(), False)
])
schema_leads = StructType([
    StructField("LeadID", IntegerType(), False),
    StructField("ClientID", IntegerType(), False),
    StructField("PropertyID", IntegerType(), False),
    StructField("CampaignID", IntegerType(), False),
    StructField("LeadDate", DateType(), True),
    StructField("LeadSource", StringType(), True)
])
schema_projects = StructType([
    StructField("ProjectID", IntegerType(), False),
    StructField("ProjectName", StringType(), False),
    StructField("City", StringType(), False),
    StructField("Region", StringType(), False),
    StructField("LaunchYear", StringType(), True),
    StructField("Status", StringType(), True)
])
schema_properties = StructType([
    StructField("PropertyID", IntegerType(), False),
     StructField("ProjectID", IntegerType(), False),

    StructField("PropertyType", StringType(), False),

    StructField("Size_m2", IntegerType(), False),
    StructField("Bedrooms", IntegerType(), False),
    StructField("Bathrooms", IntegerType(), False),
    StructField("ListPriceUSD", IntegerType(), False),
    StructField("AvailabilityStatus", StringType(), False)
])
schema_sales = StructType([
    StructField("SaleID", IntegerType(), False),
    StructField("PropertyID", IntegerType(), False),
    StructField("ClientID", IntegerType(), False),
    StructField("BrokerID", IntegerType(), False),
    StructField("SaleDate", DateType(), False),
    StructField("SalePriceUSD", IntegerType(), False)
])




StatementMeta(, f2db9b89-5486-4a51-93cf-e2c1eb1955f0, 6, Finished, Available, Finished)

In [5]:
# Leer los archivos CSV con los esquemas definidos
# Este comando lee los archivos CSV utilizando los esquemas definidos anteriormente

try:
    # Leer archivo de brokers
    df_brokers = spark.read.format("csv") \
        .option("header", "true") \
        .schema(schema_brokers) \
        .load("Files/raw/brokers.csv")
    
    # Leer archivo de campaigns
    df_campaigns = spark.read.format("csv") \
        .option("header", "true") \
        .schema(schema_campaigns) \
        .load("Files/raw/campaigns.csv")
    
    # Leer archivo de clients
    df_clients = spark.read.format("csv") \
        .option("header", "true") \
        .schema(schema_clients) \
        .load("Files/raw/clients.csv")
    
    # Leer archivo de leads
    df_leads = spark.read.format("csv") \
        .option("header", "true") \
        .schema(schema_leads) \
        .load("Files/raw/leads.csv")
    
    # Leer archivo de projects
    df_projects = spark.read.format("csv") \
        .option("header", "true") \
        .schema(schema_projects) \
        .load("Files/raw/projects.csv")

      # Leer archivo de properties
    df_properties = spark.read.format("csv") \
        .option("header", "true") \
        .schema(schema_properties) \
        .load("Files/raw/properties.csv")
    
     # Leer archivo de sales
    df_sales = spark.read.format("csv") \
        .option("header", "true") \
        .schema(schema_sales) \
        .load("Files/raw/sales.csv")
    
    print("✅ Archivos CSV leídos correctamente")
except Exception as e:
    print(f"Error al leer archivos: {str(e)}")

StatementMeta(, f2db9b89-5486-4a51-93cf-e2c1eb1955f0, 7, Finished, Available, Finished)

✅ Archivos CSV leídos correctamente


In [6]:
display(df_sales.limit(5))

StatementMeta(, f2db9b89-5486-4a51-93cf-e2c1eb1955f0, 8, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 6327b584-aa54-4da6-aeb0-03b6c11eb399)

In [7]:
# Exploración de los datos
# Mostramos una vista previa de los datos cargados

print("Vista previa de los datos de brokers:")
display(df_brokers.limit(5))

print("Vista previa de los datos de campaigns:")
display(df_campaigns.limit(5))

print("Vista previa de los datos de clients:")
display(df_clients.limit(5))

print("Vista previa de los datos de leads:")
display(df_leads.limit(5))

print("Vista previa de los datos de projects:")
display(df_projects.limit(5))

print("Vista previa de los datos de properties:")
display(df_properties.limit(5))

print("Vista previa de los datos de sales:")
display(df_sales.limit(5))

StatementMeta(, f2db9b89-5486-4a51-93cf-e2c1eb1955f0, 9, Finished, Available, Finished)

Vista previa de los datos de brokers:


SynapseWidget(Synapse.DataFrame, 4ac34ce1-4077-47a4-a095-21b161192ada)

Vista previa de los datos de campaigns:


SynapseWidget(Synapse.DataFrame, 997f8902-d782-43c0-b1e5-1f099b00f069)

Vista previa de los datos de clients:


SynapseWidget(Synapse.DataFrame, 09c2e2c8-4210-43bf-8102-c24a71370ccf)

Vista previa de los datos de leads:


SynapseWidget(Synapse.DataFrame, 9e1203d4-e26f-4833-8d5a-109007af23b8)

Vista previa de los datos de projects:


SynapseWidget(Synapse.DataFrame, f496a6b5-9475-4a43-a668-7ba7da9d0048)

Vista previa de los datos de properties:


SynapseWidget(Synapse.DataFrame, 0ea07176-275e-4ffb-b855-be94b6616c80)

Vista previa de los datos de sales:


SynapseWidget(Synapse.DataFrame, bd731cff-0205-4061-98ed-111143ca3699)

In [10]:

# Realizar transformaciones en los dataframes
# Aplicamos transformaciones básicas para mejorar la calidad de los datos

# Transformación de clientes: Añadir columna nombre_completo
df_clientes_procesado = df_clients
df_brokers_procesado = df_brokers
df_campaigns_procesado = df_campaigns
df_leads_procesado = df_leads
df_projects_procesado = df_projects
df_properties_procesado = df_properties
df_sales_procesado = df_sales

StatementMeta(, f2db9b89-5486-4a51-93cf-e2c1eb1955f0, 12, Finished, Available, Finished)

In [None]:
df = spark.read.format("csv").option("header","true").load("Files/raw/brokers.csv")
# df now is a Spark DataFrame containing CSV data from "Files/raw/brokers.csv".
display(df)

In [None]:
df = spark.read.format("csv").option("header","true").load("Files/raw/campaigns.csv")
# df now is a Spark DataFrame containing CSV data from "Files/raw/campaigns.csv".
display(df)

In [None]:
df = spark.read.format("csv").option("header","true").load("Files/raw/clients.csv")
# df now is a Spark DataFrame containing CSV data from "Files/raw/clients.csv".
display(df)

In [None]:
df = spark.read.format("csv").option("header","true").load("Files/raw/leads.csv")
# df now is a Spark DataFrame containing CSV data from "Files/raw/leads.csv".
display(df)

In [None]:
df = spark.read.format("csv").option("header","true").load("Files/raw/properties.csv")
# df now is a Spark DataFrame containing CSV data from "Files/raw/properties.csv".
display(df)

In [None]:
df = spark.read.format("csv").option("header","true").load("Files/raw/sales.csv")
# df now is a Spark DataFrame containing CSV data from "Files/raw/sales.csv".
display(df)

In [None]:
df = spark.sql("SELECT * FROM LKH_Proyecto_Integrador_RCR_01.`brokers csv` LIMIT 1000")
display(df)

In [None]:
df = spark.sql("SELECT * FROM LKH_Proyecto_Integrador_RCR_01.`campaigns csv` LIMIT 1000")
display(df)

In [None]:
df = spark.sql("SELECT * FROM LKH_Proyecto_Integrador_RCR_01.`clients csv` LIMIT 1000")
display(df)

In [None]:
df = spark.sql("SELECT * FROM LKH_Proyecto_Integrador_RCR_01.`campaigns csv` LIMIT 1000")
display(df)

In [None]:
df = spark.sql("SELECT * FROM LKH_Proyecto_Integrador_RCR_01.`leads csv` LIMIT 1000")
display(df)

In [None]:
df = spark.sql("SELECT * FROM LKH_Proyecto_Integrador_RCR_01.`projects csv` LIMIT 1000")
display(df)

In [None]:
df = spark.sql("SELECT * FROM LKH_Proyecto_Integrador_RCR_01.`properties csv` LIMIT 1000")
display(df)

In [None]:
df = spark.sql("SELECT * FROM LKH_Proyecto_Integrador_RCR_01.`sales csv` LIMIT 1000")
display(df)

In [12]:
# Mostrar ejemplos de las transformaciones
print("Ejemplo de clientes procesados:")
display(df_clientes_procesado.limit(5))

print("Ejemplo de brokers procesados:")
display(df_brokers_procesado.limit(5))

print("Ejemplo de campaigns procesadas:")
display(df_campaigns_procesado.limit(5))

print("Ejemplo de leads procesado:")
display(df_leads_procesado.limit(5))

print("Ejemplo de projects procesadas:")
display(df_projects_procesado.limit(5))

print("Ejemplo de properties procesadas:")
display(df_properties_procesado.limit(5))


print("Ejemplo de sales procesadas:")
display(df_sales_procesado.limit(5))

StatementMeta(, f2db9b89-5486-4a51-93cf-e2c1eb1955f0, 14, Finished, Available, Finished)

Ejemplo de clientes procesados:


SynapseWidget(Synapse.DataFrame, 28fd7259-bd25-49fc-aead-5f3b6e0b424e)

Ejemplo de brokers procesados:


SynapseWidget(Synapse.DataFrame, 275c5393-83de-498e-a13d-21c00e067cef)

Ejemplo de campaigns procesadas:


SynapseWidget(Synapse.DataFrame, f3650c0f-bde9-4002-9ac5-698693832a64)

Ejemplo de leads procesado:


SynapseWidget(Synapse.DataFrame, bcb7313d-6981-42e1-8282-1261885432d1)

Ejemplo de projects procesadas:


SynapseWidget(Synapse.DataFrame, 9836a375-ddd8-46e6-8aef-b0753038589d)

Ejemplo de properties procesadas:


SynapseWidget(Synapse.DataFrame, a40d85d2-8f39-40f3-98e1-1b2a274d7338)

Ejemplo de sales procesadas:


SynapseWidget(Synapse.DataFrame, 2a924977-e6bc-4b47-9583-9f76e6fce541)

In [14]:
# Guardar los dataframes como tablas Delta
# Guardamos los dataframes procesados como tablas Delta en el lakehouse

try:
    # Guardar clientes
    df_clientes_procesado.write.format("delta").mode("overwrite").save("Files/gold/clients_delta")
    
    # Guardar brokers
    df_brokers_procesado.write.format("delta").mode("overwrite").save("Files/gold/brokers_delta")
    
    # Guardar campaigns
    df_campaigns_procesado.write.format("delta").mode("overwrite").save("Files/gold/campaigns_delta")
    
    # Guardar leads
    df_leads_procesado.write.format("delta").mode("overwrite").save("Files/gold/leads_delta")
    
    # Guardar projects
    df_projects_procesado.write.format("delta").mode("overwrite").save("Files/gold/projects_delta")

     # Guardar properties
    df_properties_procesado.write.format("delta").mode("overwrite").save("Files/gold/properties_delta")

     # Guardar sales
    df_sales_procesado.write.format("delta").mode("overwrite").save("Files/gold/sales_delta")
    
    print("✅ Dataframes guardados como archivos Delta correctamente")
except Exception as e:
    print(f"Error al guardar archivos Delta: {str(e)}")

StatementMeta(, f2db9b89-5486-4a51-93cf-e2c1eb1955f0, 16, Finished, Available, Finished)

✅ Dataframes guardados como archivos Delta correctamente


In [17]:
# Registrar tablas como vistas temporales en la sesión actual

try:
    # Registrar tablas como vistas temporales
    df_properties_procesado.createOrReplaceTempView("dim_propiedad")
    df_brokers_procesado.createOrReplaceTempView("dim_corredor")
    df_projects_procesado.createOrReplaceTempView("dim_proyecto")
    df_campaigns_procesado.createOrReplaceTempView("dim_campanha")
    df_clientes_procesado.createOrReplaceTempView("dim_clientes")
    df_sales_procesado.createOrReplaceTempView("fact_ventas")
    df_leads_procesado.createOrReplaceTempView("fact_leads")
   
    
    # Verificar que se han creado correctamente
    tables = spark.sql("SHOW TABLES").collect()
    print("Tablas disponibles en la sesión:")
    for table in tables:
        print(f" - {table.tableName}")
    
    print("✅ Vistas temporales creadas correctamente para la sesión actual")
    
    # Alternativa: Guardar como tablas directamente en el Lakehouse 
    # Esto registra las tablas en el catálogo del Lakehouse actual
    print("\nRegistrando tablas en el catálogo del Lakehouse...")
    
    # Registrar los dataframes como tablas en el Lakehouse actual parte Tables
    df_properties_procesado.write.format("delta").mode("overwrite").saveAsTable("dim_propiedad")
    df_brokers_procesado.write.format("delta").mode("overwrite").saveAsTable("dim_corredor")
    df_projects_procesado.write.format("delta").mode("overwrite").saveAsTable("dim_proyecto")
    df_campaigns_procesado.write.format("delta").mode("overwrite").saveAsTable("dim_campanha")
    df_clientes_procesado.write.format("delta").mode("overwrite").saveAsTable("dim_clientes")
    df_sales_procesado.write.format("delta").mode("overwrite").saveAsTable("fact_ventas")
    df_leads_procesado.write.format("delta").mode("overwrite").saveAsTable("fact_leads")
    
    print("✅ Tablas creadas correctamente en el catálogo del Lakehouse")
except Exception as e:
    print(f"Error al crear tablas: {str(e)}")

StatementMeta(, f2db9b89-5486-4a51-93cf-e2c1eb1955f0, 19, Finished, Available, Finished)

Tablas disponibles en la sesión:
 - projects csv
 - leads csv
 - brokers csv
 - clients csv
 - properties csv
 - campaigns csv
 - sales csv
 - dim_propiedad
 - dim_corredor
 - dim_proyecto
 - dim_campanha
 - dim_clientes
 - dim_corredor
 - dim_propiedad
 - dim_proyecto
 - fact_leads
 - fact_ventas
✅ Vistas temporales creadas correctamente para la sesión actual

Registrando tablas en el catálogo del Lakehouse...
✅ Tablas creadas correctamente en el catálogo del Lakehouse


In [4]:
# Consultar el modelo estrella completo
# Realizamos una consulta SQL que une todas las tablas del modelo estrella

query = """
SELECT 
    v.SaleID,
    p.PropertyType as TipoPropiedad,
    p.Size_m2,
    p.Bedrooms as Habitacion,
    p.Bathrooms as Banho,
    p.ListPriceUSD as ListaPrecio,
    p.AvailabilityStatus EstadoDisponible,
    c.FirstName as PrimerNombre,
    c.LastName as Apellido,
    c.Email,
    c.Region,
    co.BrokerName as NombreBroker,
    co.Region as RegionBroker,
    co.Email as EmailBroker,
    v.SaleDate as FechaVenta,
    v.SalePriceUSD  as PrecioVentaDolares
    
FROM 
    fact_ventas v
JOIN 
    dim_propiedad p ON v.PropertyID = p.PropertyID
JOIN 
    dim_clientes c ON v.ClientID = c.ClientID
JOIN 
    dim_corredor co ON v.BrokerID = co.BrokerID

ORDER BY 
    v.SaleID
"""

try:
    # Ejecutar la consulta
    resultado = spark.sql(query)
    
    # Mostrar los resultados
    print("Consulta del modelo estrella completo:")
    display(resultado)
    
    print("✅ Consulta ejecutada correctamente")
except Exception as e:
    print(f"Error al ejecutar la consulta: {str(e)}")

StatementMeta(, 2e6f8cc2-31d2-4376-9058-24097875e13c, 6, Finished, Available, Finished)

Consulta del modelo estrella completo:


SynapseWidget(Synapse.DataFrame, e512615c-175d-4af6-889d-0912303e46e4)

✅ Consulta ejecutada correctamente


In [10]:
# Análisis básico de ventas
# Realizamos un análisis básico de leads por campañas

query_analisis = """
SELECT 
    l.LeadID,
    c.Channel,
    c.CampaignName as NombreCampanha,
    c.StartDate as FechaInicial,
    c.BudgetUSD as BudgetDolar,
    l.LeadDate  as FechaLead,
    l.LeadSource  as OrigenLead

FROM 
    fact_leads l
JOIN  dim_campanha c ON l.CampaignID = c.CampaignID

ORDER BY 
    l.LeadID
"""

try:
    # Ejecutar la consulta de análisis
    resultado_analisis = spark.sql(query_analisis)
    
    # Mostrar los resultados
    print("Análisis de Leads por Campaña:")
    display(resultado_analisis)
    
    # Convertir a pandas para visualización
    df_analisis_pd = resultado_analisis.toPandas()
    
    # Visualizar con matplotlib (si está disponible)
    try:
        import matplotlib.pyplot as plt
        
        plt.figure(figsize=(10, 6))
        plt.bar(df_analisis_pd['campanha'], df_analisis_pd['leads_totales'])
        plt.title('Leads totales por Campaña')
        plt.xlabel('campanha')
        plt.ylabel('leads_totales ($)')
        plt.xticks(rotation=45)
        plt.tight_layout()
        display(plt.gcf())
    except ImportError:
        print("Matplotlib no está disponible para visualización. Mostrando datos en formato tabular.")
        
    print("✅ Análisis completado correctamente")
except Exception as e:
    print(f"Error al ejecutar análisis: {str(e)}")

StatementMeta(, c57f178f-3694-43be-b1bc-0b1fb2eeb737, 12, Finished, Available, Finished)

Análisis de Leads por Campaña:


SynapseWidget(Synapse.DataFrame, 353e98dc-c766-4a0a-b77c-3f473c962925)

Error al ejecutar análisis: 'campanha'


<Figure size 1000x600 with 0 Axes>

In [None]:
# Análisis básico de ventas
# Realizamos un análisis básico de las ventas por cliente

query_analisis = """
SELECT 
    v.SaleID,
    c.FirstName as PrimerNombre,
    c.LastName as Apellido,
    c.Email,
    c.Region,
    v.SaleDate as FechaVenta,
    v.SalePriceUSD  as PrecioVentaDolares
    
FROM 
    fact_ventas v
JOIN  dim_clientes c ON v.ClientID = c.ClientID

ORDER BY 
    v.SaleID
"""

try:
    # Ejecutar la consulta de análisis
    resultado_analisis = spark.sql(query_analisis)
    
    # Mostrar los resultados
    print("Análisis de ventas por cliente:")
    display(resultado_analisis)
    
    # Convertir a pandas para visualización
    df_analisis_pd = resultado_analisis.toPandas()
    
    # Visualizar con matplotlib (si está disponible)
    try:
        import matplotlib.pyplot as plt
        
        plt.figure(figsize=(10, 6))
        plt.bar(df_analisis_pd['cliente'], df_analisis_pd['ventas_totales'])
        plt.title('Ventas totales por cliente')
        plt.xlabel('cliente')
        plt.ylabel('Ventas totales ($)')
        plt.xticks(rotation=45)
        plt.tight_layout()
        display(plt.gcf())
    except ImportError:
        print("Matplotlib no está disponible para visualización. Mostrando datos en formato tabular.")
        
    print("✅ Análisis completado correctamente")
except Exception as e:
    print(f"Error al ejecutar análisis: {str(e)}")

In [11]:
# Guardar el resultado del análisis
# Guardamos el resultado del análisis como archivo CSV

try:
    # Convertir a pandas para guardarlo como CSV
    df_analisis_pd.to_csv("/tmp/analisis_ventas_cliente.csv", index=False)
    
    # Subir al lakehouse
    mssparkutils.fs.put("Files/processed/analisis_ventas_cliente.csv", 
                        "/tmp/analisis_ventas_cliente.csv", True)
    
    print("✅ Resultado del análisis guardado como CSV en 'Files/processed/analisis_ventas_cliente.csv'")
except Exception as e:
    print(f"Error al guardar resultado: {str(e)}")

StatementMeta(, c57f178f-3694-43be-b1bc-0b1fb2eeb737, 13, Finished, Available, Finished)

✅ Resultado del análisis guardado como CSV en 'Files/processed/analisis_ventas_cliente.csv'
