In [1]:
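# Silver layer load: read the bronze brokers, campaigns, clients, leads, projects,
# properties and sales tables, clean them, and overwrite the matching silver.* Delta tables.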
import pyspark.sql.functions as F
import pandas as pd
from pyspark.sql.types import *
from notebookutils import mssparkutils as mu
from pyspark.sql import SparkSession

# Pragmatic e-mail shape check (not full RFC 5322): local part, "@", a domain,
# and an alphabetic top-level domain of at least two characters.
# NOTE(review): when evaluated by Spark (Java regex), `$` also matches just
# before a trailing line terminator — confirm inputs cannot end in "\n".
EMAIL_REGEX = (
    r'^[A-Za-z0-9._%+-]+'  # local part
    r'@'
    r'[A-Za-z0-9.-]+'      # domain, may contain sub-domains
    r'\.[A-Za-z]{2,}$'     # top-level domain
)

StatementMeta(, 09b7c178-183d-45b3-b1b2-1a1664deeb12, 3, Finished, Available, Finished)

In [2]:
spark = SparkSession.builder.getOrCreate()

# Ensure the target schema exists before any silver table is dropped or written.
# A failure here must stop the run: every later cell reads from or writes into
# `silver`, so swallowing the error would only surface a less clear failure later.
try:
    spark.sql("CREATE SCHEMA if NOT exists silver")
    # IF NOT EXISTS: the schema may already have existed, so don't claim it was created.
    print("Schema silver is available")

except Exception as ex:
    print(f"Error creating schema silver: {ex}")
    raise

StatementMeta(, 09b7c178-183d-45b3-b1b2-1a1664deeb12, 4, Finished, Available, Finished)

Schema created silver


In [3]:
# Drop the previous silver tables so the final cell's overwrite writes recreate
# them with the schemas produced by this run.
# NOTE(review): tables are dropped before the new data is computed; if a later
# cell fails, silver stays empty until the next successful run.
for _drop_stmt in (
    "DROP TABLE IF EXISTS silver.brokers",
    "DROP TABLE IF EXISTS silver.campaigns",
    "DROP TABLE IF EXISTS silver.clients",
    "DROP TABLE IF EXISTS silver.leads",
    "DROP TABLE IF EXISTS silver.projects",
    "DROP TABLE IF EXISTS silver.sales",
    "DROP TABLE IF EXISTS silver.properties",
):
    spark.sql(_drop_stmt)




StatementMeta(, 09b7c178-183d-45b3-b1b2-1a1664deeb12, 5, Finished, Available, Finished)

DataFrame[]

In [4]:
# Silver schemas applied to the bronze tables with `rdd.toDF(schema)`.
# `toDF` assigns bronze columns to these fields by position and, by default,
# verifies every row against the schema: a None in a field declared
# nullable=False raises and aborts the whole load. Fields that the silver
# transformations remove nulls from via `isNotNull()` are therefore declared
# nullable=True, so such rows are dropped by that filter instead of crashing
# the run. Keys that are never null-filtered keep nullable=False (fail fast).


def _audit_fields():
    """Return fresh StructFields for the audit columns that end every schema.

    All three are nullable; `_last_user_` is overwritten with a literal by the
    silver transformations anyway.
    """
    return [
        StructField("_created_", TimestampType(), True),
        StructField("_server_", StringType(), True),
        StructField("_last_user_", StringType(), True),
    ]


schema_brokers = StructType([
    StructField("BrokerId", IntegerType(), False),
    StructField("BrokerName", StringType(), False),
    StructField("Region", StringType(), False),
    StructField("Email", StringType(), True),  # null-filtered downstream
    # Audit fields were non-nullable here only; now consistent with other tables.
    *_audit_fields(),
])

schema_campaigns = StructType([
    StructField("CampaignID", IntegerType(), False),
    StructField("Channel", StringType(), True),
    StructField("CampaignName", StringType(), True),
    StructField("StartDate", DateType(), True),
    StructField("EndDate", DateType(), True),
    StructField("Budget", IntegerType(), True),
    *_audit_fields(),
])

schema_clients = StructType([
    StructField("ClientID", IntegerType(), True),  # null-filtered downstream
    StructField("FirstName", StringType(), True),
    StructField("LastName", StringType(), True),
    StructField("Email", StringType(), True),
    StructField("Region", StringType(), True),
    *_audit_fields(),
])

schema_leads = StructType([
    StructField("LeadID", IntegerType(), False),
    StructField("ClientID", IntegerType(), True),    # null-filtered downstream
    StructField("PropertyID", IntegerType(), True),  # null-filtered downstream
    StructField("CampaignID", IntegerType(), True),  # null-filtered downstream
    StructField("LeadDate", DateType(), True),
    StructField("LeadSource", StringType(), True),
    *_audit_fields(),
])

schema_projects = StructType([
    StructField("ProjectID", IntegerType(), False),
    StructField("ProjectName", StringType(), False),
    StructField("City", StringType(), False),
    StructField("Region", StringType(), False),
    StructField("LaunchYear", IntegerType(), False),
    StructField("Status", StringType(), False),
    *_audit_fields(),
])

schema_properties = StructType([
    StructField("PropertyID", IntegerType(), False),
    StructField("ProjectID", IntegerType(), True),  # null-filtered downstream
    StructField("PropertyType", StringType(), False),
    StructField("Size_m2", IntegerType(), False),
    StructField("Bedrooms", IntegerType(), False),
    StructField("Bathrooms", IntegerType(), False),
    StructField("ListPriceUSD", IntegerType(), False),
    StructField("AvailabilityStatus", StringType(), False),
    *_audit_fields(),
])

schema_sales = StructType([
    StructField("SaleID", IntegerType(), False),
    StructField("PropertyID", IntegerType(), True),  # null-filtered downstream
    StructField("ClientID", IntegerType(), True),    # null-filtered downstream
    StructField("BrokerID", IntegerType(), True),    # null-filtered downstream
    StructField("SaleDate", DateType(), True),       # null-filtered downstream
    StructField("SalePriceUSD", IntegerType(), False),
    *_audit_fields(),
])





StatementMeta(, 09b7c178-183d-45b3-b1b2-1a1664deeb12, 6, Finished, Available, Finished)

In [5]:
# Brokers: normalise text columns, keep only rows whose Email matches
# EMAIL_REGEX, then keep one row per BrokerId.
# Validation runs BEFORE de-duplication: otherwise dropDuplicates could keep an
# invalid duplicate, and the e-mail filter would then remove a broker that also
# had a valid row. Which of several valid duplicates survives is not guaranteed.
df_broker_silver = (
    spark.table("bronze.brokers")
    .rdd.toDF(schema_brokers)  # bronze columns are mapped to schema_brokers by position
    .withColumn("BrokerName", F.trim(F.col("BrokerName")))
    .withColumn("Region", F.trim(F.col("Region")))
    .withColumn("Email", F.lower(F.trim(F.col("Email"))))
    .filter(F.col("Email").isNotNull() & F.col("Email").rlike(EMAIL_REGEX))
    .dropDuplicates(["BrokerId"])
    .withColumn("_processed_timestamp_", F.current_timestamp())  # Audit column
    .withColumn("_last_user_", F.lit("notebookMC1Silver"))  # Audit column
)

# Clients: one row per ClientID with normalised name/contact fields and a derived
# "LastName, FirstName" display name. Which duplicate survives is not guaranteed.
df_clients_silver = (
    spark.table("bronze.clients")
    .rdd.toDF(schema_clients)  # bronze columns are mapped to schema_clients by position
    .filter(F.col("ClientID").isNotNull())
    .dropDuplicates(["ClientID"])
    .withColumn("FirstName", F.initcap(F.trim(F.col("FirstName"))))
    .withColumn("LastName", F.initcap(F.trim(F.col("LastName"))))
    # concat (not concat_ws) yields NULL when either name part is NULL.
    .withColumn("FullName", F.concat(F.col("LastName"), F.lit(", "), F.col("FirstName")))
    # Trim as well as lower-case, matching how broker e-mails are normalised.
    .withColumn("Email", F.lower(F.trim(F.col("Email"))))
    .withColumn("Region", F.initcap(F.trim(F.col("Region"))))
    .withColumn("_processed_timestamp_", F.current_timestamp())  # Audit column
    .withColumn("_last_user_", F.lit("notebookMC1Silver"))  # Audit column
)

# Campaigns: one row per CampaignID, projected straight into the silver column
# layout with the derived columns computed inline.
#  - Channel: initcap each word, then join the words without spaces
#    ("social media" -> "SocialMedia").
#    NOTE(review): concat_ws skips NULL inputs, so a NULL Channel likely becomes
#    an empty string rather than NULL — confirm that is intended.
#  - DaysBetweenCampaign: EndDate minus StartDate in days (negative if EndDate
#    precedes StartDate, NULL if either date is NULL).
df_campaigns_silver = (
    spark.table("bronze.campaigns")
    .rdd.toDF(schema_campaigns)
    .dropDuplicates(["CampaignID"])
    .select(
        "CampaignID",
        F.trim(F.col("CampaignName")).alias("CampaignName"),
        F.concat_ws("", F.split(F.initcap(F.col("Channel")), " ")).alias("Channel"),
        "Budget",
        "StartDate",
        "EndDate",
        F.datediff(F.col("EndDate"), F.col("StartDate")).alias("DaysBetweenCampaign"),
        "_created_",
        "_server_",
        F.lit("notebookMC1Silver").alias("_last_user_"),  # Audit column
        F.current_timestamp().alias("_processed_timestamp_"),  # Audit column
    )
)

# Leads: drop rows missing any foreign key, then keep one row per LeadID,
# consistent with the other keyed silver tables that are de-duplicated on their
# key. Which duplicate survives is not guaranteed.
# NOTE(review): the null filters only take effect if these FK fields are declared
# nullable in schema_leads; with nullable=False, toDF raises on a NULL instead.
df_leads_silver = (
    spark.table("bronze.leads")
    .rdd.toDF(schema_leads)  # bronze columns are mapped to schema_leads by position
    .filter(
        F.col("ClientID").isNotNull()
        & F.col("PropertyID").isNotNull()
        & F.col("CampaignID").isNotNull()
    )
    .dropDuplicates(["LeadID"])
    .withColumn("_processed_timestamp_", F.current_timestamp())  # Audit column
    .withColumn("_last_user_", F.lit("notebookMC1Silver"))  # Audit column
)


# Projects: one row per ProjectID (consistent with the other keyed silver tables;
# which duplicate survives is not guaranteed) with trimmed / title-cased text.
# LaunchYear and _created_ are passed through unchanged.
df_projects_silver = (
    spark.table("bronze.projects")
    .rdd.toDF(schema_projects)  # bronze columns are mapped to schema_projects by position
    .dropDuplicates(["ProjectID"])
    .withColumn("ProjectName", F.trim(F.col("ProjectName")))
    .withColumn("City", F.initcap(F.trim(F.col("City"))))
    .withColumn("Region", F.initcap(F.trim(F.col("Region"))))
    .withColumn("Status", F.initcap(F.trim(F.col("Status"))))
    .withColumn("_processed_timestamp_", F.current_timestamp())  # Audit column
    .withColumn("_last_user_", F.lit("notebookMC1Silver"))  # Audit column
)

# Properties: drop rows without a ProjectID, keep one row per PropertyID
# (consistent with the other keyed silver tables; which duplicate survives is
# not guaranteed), and normalise the text columns.
# NOTE(review): the ProjectID null filter only takes effect if ProjectID is
# nullable in schema_properties; with nullable=False, toDF raises on a NULL instead.
df_properties_silver = (
    spark.table("bronze.properties")
    .rdd.toDF(schema_properties)  # bronze columns are mapped to schema_properties by position
    .filter(F.col("ProjectID").isNotNull())
    .dropDuplicates(["PropertyID"])
    .withColumn("PropertyType", F.trim(F.col("PropertyType")))
    .withColumn("AvailabilityStatus", F.initcap(F.trim(F.col("AvailabilityStatus"))))
    .withColumn("_processed_timestamp_", F.current_timestamp())  # Audit column
    .withColumn("_last_user_", F.lit("notebookMC1Silver"))  # Audit column
)

# Sales: drop rows missing a foreign key or SaleDate, then keep one row per
# SaleID, consistent with the other keyed silver tables. Which duplicate
# survives is not guaranteed.
# NOTE(review): the null filters only take effect if these fields are declared
# nullable in schema_sales; with nullable=False, toDF raises on a NULL instead.
df_sales_silver = (
    spark.table("bronze.sales")
    .rdd.toDF(schema_sales)  # bronze columns are mapped to schema_sales by position
    .filter(
        F.col("PropertyID").isNotNull()
        & F.col("ClientID").isNotNull()
        & F.col("BrokerID").isNotNull()
        & F.col("SaleDate").isNotNull()
    )
    .dropDuplicates(["SaleID"])
    .withColumn("_processed_timestamp_", F.current_timestamp())  # Audit column
    .withColumn("_last_user_", F.lit("notebookMC1Silver"))  # Audit column
)



StatementMeta(, 09b7c178-183d-45b3-b1b2-1a1664deeb12, 7, Finished, Available, Finished)

In [6]:
# Persist every silver DataFrame as a Delta table, replacing existing contents.
# The dict's insertion order fixes the write order.
_silver_targets = {
    "silver.brokers": df_broker_silver,
    "silver.campaigns": df_campaigns_silver,
    "silver.clients": df_clients_silver,
    "silver.leads": df_leads_silver,
    "silver.projects": df_projects_silver,
    "silver.properties": df_properties_silver,
    "silver.sales": df_sales_silver,
}
for _table_name, _silver_df in _silver_targets.items():
    _silver_df.write.format("delta").mode("overwrite").saveAsTable(_table_name)


StatementMeta(, 09b7c178-183d-45b3-b1b2-1a1664deeb12, 8, Finished, Available, Finished)