In [ ]:
# En este notebook aprenderemos a:
# Leer los datos con PySpark
# Realizar transformaciones
# Guardar los dataframes como tablas Delta
# Crear un modelo estrella para análisis

#Configuración inicial y verificación del entorno
# Este comando verifica que estamos en el entorno correcto de Fabric

import pyspark.sql.functions as F
from pyspark.sql.types import *
from notebookutils import mssparkutils
import pandas as pd

# Verificar versión de Spark
print(f"Versión de Spark: {spark.version}")

In [ ]:
# Crear estructura de carpetas en el lakehouse
# Este comando crea las carpetas necesarias en el lakehouse para almacenar nuestros archivos

try:
    # Crear carpetas para datos brutos y procesados       
    mssparkutils.fs.mkdirs("Files/Bronze")
    mssparkutils.fs.mkdirs("Files/silver")
    mssparkutils.fs.mkdirs("Files/Gold")
    print("✅ Estructura de carpetas creada correctamente")
except Exception as e:
    print(f"Error al crear carpetas: {str(e)}")

### Validación de datos

In [ ]:
df_ = spark.read.format("csv").option("header","true").load("Files/Bronze/properties.csv")
# df now is a Spark DataFrame containing CSV data from "Files/Bronze/brokers.csv".
display(df_)

In [ ]:
df.printSchema()

In [ ]:

# Definición de esquemas para mejor control de los datos
# Definimos los esquemas de nuestros dataframes para asegurar tipos de datos correctos

# Esquema para Brokers
schema_brokers = StructType([
    StructField("BrokerID", IntegerType(), True),
    StructField("BrokerName", StringType(), True),
    StructField("Region", StringType(), True),
    StructField("Email", StringType(), True)
])



# Esquema para campaigns
schema_campaigns = StructType([
    StructField("CampaignID", IntegerType(), True),
    StructField("Channel", StringType(), True),
    StructField("CampaignName", StringType(), True),
    StructField("StartDate", DateType(), True),
    StructField("EndDate", DateType(), True),
    StructField("BudgetUSD", DoubleType(), True)
])

# Esquema para clients
schema_clients = StructType([
    StructField("ClientID", IntegerType(), True),
    StructField("FirstName", StringType(), True),
    StructField("LastName", StringType(), True),
    StructField("Email", StringType(), True),
    StructField("Region", StringType(), True)
])


# Esquema para projects

schema_projects = StructType([
    StructField("ProjectID", IntegerType(), True),
    StructField("ProjectName", StringType(), True),
    StructField("City", StringType(), True),
    StructField("Region", StringType(), True),
    StructField("LaunchYear", IntegerType(), True),
    StructField("Status", StringType(), True)
])


# Esquema para properties

schema_properties = StructType([
    StructField("PropertyID", IntegerType(), True),
    StructField("ProjectID", IntegerType(), True),
    StructField("PropertyType", StringType(), True),
    StructField("Size_m2", IntegerType(), True),
    StructField("Bedrooms", IntegerType(), True),
    StructField("Bathrooms", IntegerType(), True),
    StructField("ListPriceUSD", DoubleType(), True),
    StructField("AvailabilityStatus", StringType(), True)
])


# Esquema para lead

schema_leads = StructType([
    StructField("LeadID", IntegerType(), True),
    StructField("ClientID", IntegerType(), True),
    StructField("PropertyID", IntegerType(), True),
    StructField("CampaignID", IntegerType(), True),
    StructField("LeadDate", DateType(), True),
    StructField("LeadSource", StringType(), True)
])




# Esquema para ventas
schema_sales = StructType([
    StructField("SaleID", IntegerType(), True),
    StructField("PropertyID", IntegerType(), True),
    StructField("ClientID", IntegerType(), True),
    StructField("BrokerID", IntegerType(), True),
    StructField("SaleDate", DateType(), True),
    StructField("SalePriceUSD", DoubleType(), True)
])

print("✅ Esquemas definidos correctamente")


In [ ]:
# Leer los archivos CSV con los esquemas definidos
# Este comando lee los archivos CSV utilizando los esquemas definidos anteriormente

try:
    # Leer archivo de Brokers
    df_brokers= spark.read.format("csv") \
        .option("header", "true") \
        .schema(schema_brokers) \
        .load("Files/Bronze/brokers.csv")
    
    # Leer archivo de campaigns
    df_campaigns = spark.read.format("csv") \
        .option("header", "true") \
        .schema(schema_campaigns) \
        .load("Files/Bronze/campaigns.csv")
    
    # Leer archivo de clients
    df_clients = spark.read.format("csv") \
        .option("header", "true") \
        .schema(schema_clients) \
        .load("Files/Bronze/clients.csv")
    
    # Leer archivo de projects
    df_projects = spark.read.format("csv") \
        .option("header", "true") \
        .schema(schema_projects) \
        .load("Files/Bronze/projects.csv")

        # Leer archivo de properties
    df_properties = spark.read.format("csv") \
        .option("header", "true") \
        .schema(schema_properties) \
        .load("Files/Bronze/properties.csv")    

    # Leer archivo de ventas
    df_leads = spark.read.format("csv") \
        .option("header", "true") \
        .schema(schema_leads) \
        .load("Files/Bronze/leads.csv")

    # Leer archivo de ventas
    df_sales = spark.read.format("csv") \
        .option("header", "true") \
        .schema(schema_sales) \
        .load("Files/Bronze/sales.csv")
    
    print("✅ Archivos CSV leídos correctamente")
except Exception as e:
    print(f"Error al leer archivos: {str(e)}")

In [ ]:

# Exploración de los datos
# Mostramos una vista previa de los datos cargados

print("Vista previa de los datos de brokers:")
display(df_brokers.limit(5))

print("Vista previa de los datos de campaigns:")
display(df_campaigns.filter(F.col("CampaignID").isin('4009','4025')))

print("Vista previa de los datos de projects:")
display(df_projects.filter(F.col("ProjectID")=='114'))

print("Vista previa de los datos de clients:")
display(df_clients.filter(F.col("ClientID").isin('3020','3003')))

print("Vista previa de los datos de properties:")
display(df_properties.filter(F.col("PropertyID")=='1007'))

print("Vista previa de los datos de leads:")
display(df_leads.filter(F.col("PropertyID")=='1007'))

print("Vista previa de los datos de sales:")
display(df_sales.filter(F.col("PropertyID")=='1007'))


In [ ]:
print("Cantidad de registros por tabla:\n")

print(f"Brokers: {df_brokers.count()}")
print(f"Campaigns: {df_campaigns.count()}")
print(f"Projects: {df_projects.count()}")
print(f"Clients: {df_clients.count()}")
print(f"Properties: {df_properties.count()}")
print(f"Leads: {df_leads.count()}")
print(f"Sales: {df_sales.count()}")

In [ ]:
print("Cantidad de registros y esquema por tabla:\n")

print(f"Brokers: {df_brokers.count()}")
df_brokers.printSchema()

print(f"\nCampaigns: {df_campaigns.count()}")
df_campaigns.printSchema()

print(f"\nProjects: {df_projects.count()}")
df_projects.printSchema()

print(f"\nClients: {df_clients.count()}")
df_clients.printSchema()

print(f"\nProperties: {df_properties.count()}")
df_properties.printSchema()

print(f"\nLeads: {df_leads.count()}")
df_leads.printSchema()

print(f"\nSales: {df_sales.count()}")
df_sales.printSchema()

In [ ]:

# Guardar los dataframes como tablas Delta
# Guardamos los dataframes procesados como tablas Delta en el lakehouse

try:
    # Guardar brokers
    df_brokers.write.format("delta").mode("overwrite").save("Files/Silver/brokers_delta")
    
    # Guardar clients
    df_clients.write.format("delta").mode("overwrite").save("Files/Silver/clients_delta")
   

    # Guardar campaigns
    df_campaigns.write.format("delta").mode("overwrite").save("Files/Silver/campaigns_delta")
    
    # Guardar projects
    df_projects.write.format("delta").mode("overwrite").save("Files/Silver/projects_delta")
    
    # Guardar properties
    df_properties.write.format("delta").mode("overwrite").save("Files/Silver/properties_delta")
    
    # Guardar leads
    df_leads.write.format("delta").mode("overwrite").save("Files/Silver/leads_delta")

    # Guardar Sales
    df_sales.write.format("delta").mode("overwrite").save("Files/Silver/Sales_delta")

    
    print("✅ Dataframes guardados como archivos Delta correctamente")
except Exception as e:
    print(f"Error al guardar archivos Delta: {str(e)}")
