## Gold Layer (Delta) Using Dimensions and Facts

In [0]:
%run ./Utils

In [0]:
from pyspark.sql.functions import format_number
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import monotonically_increasing_id

In [0]:
# Notebook-level settings
Silver_path = "sales_case.silver_sales_table"

# Database (schema) in which the Gold tables are looked up and saved
database_name = 'sales_case'


def _gold_table_exists(table_name):
  """Return True when SHOW TABLES lists `table_name` inside `database_name`."""
  matches = spark.sql(f"SHOW TABLES IN {database_name} LIKE '{table_name}'")
  return matches.count() > 0


# One existence flag per Gold table; the cells below use them to pick a write path
Dim_product_exists = _gold_table_exists("gold_dim_product")
Dim_category_exists = _gold_table_exists("gold_dim_category")
Dim_segment_exists = _gold_table_exists("gold_dim_segment")
Dim_manufacturer_exists = _gold_table_exists("gold_dim_manufacturer")
Dim_region_exists = _gold_table_exists("gold_dim_region")
Dim_client_exists = _gold_table_exists("gold_dim_client")
Fact_sales_exists = _gold_table_exists("gold_fact_sales")

#### Reading data from Silver table

In [0]:
# Load the Silver table registered in the metastore as a batch DataFrame
df_silver = spark.table(Silver_path)

# Preview the first 10 rows
display(df_silver.head(10))

ProductID,Date,ClientID,Units,Product,Category,Segment,ManufacturerID,Manufacturer,UnitCost,UnitPrice,PostalCode,City,State,Region,District,Country,filename,Email,Name,SalesTotal,Year,Month
506,2011-01-02,159938,1,Maximus UM-11,Urban,Moderation,7,VanArsdel,90.83,124.42,55041,Lake City,MN,Central,District #28,USA,dados_2011.csv,pascale.ferguson@xyza.com,Pascale Ferguson,90.83,2011,1
506,2011-01-27,158876,1,Maximus UM-11,Urban,Moderation,7,VanArsdel,90.83,124.42,55316,Champlin,MN,Central,District #28,USA,dados_2011.csv,regina.villarreal@xyza.com,Regina Villarreal,90.83,2011,1
506,2011-01-28,100427,1,Maximus UM-11,Urban,Moderation,7,VanArsdel,90.83,124.42,56401,Brainerd,MN,Central,District #28,USA,dados_2011.csv,ciara.alvarado@xyza.com,Ciara Alvarado,90.83,2011,1
506,2011-01-30,219021,1,Maximus UM-11,Urban,Moderation,7,VanArsdel,90.83,124.42,55709,Bovey,MN,Central,District #28,USA,dados_2011.csv,yoko.english@xyza.com,Yoko English,90.83,2011,1
506,2011-01-29,280795,1,Maximus UM-11,Urban,Moderation,7,VanArsdel,90.83,124.42,55816,Duluth,MN,Central,District #28,USA,dados_2011.csv,eve.hamilton@xyza.com,Eve Hamilton,90.83,2011,1
506,2011-01-30,30565,1,Maximus UM-11,Urban,Moderation,7,VanArsdel,90.83,124.42,56001,Mankato,MN,Central,District #28,USA,dados_2011.csv,hoyt.ashley@xyza.com,Hoyt Ashley,90.83,2011,1
506,2011-01-27,168503,1,Maximus UM-11,Urban,Moderation,7,VanArsdel,90.83,124.42,55117,Saint Paul,MN,Central,District #28,USA,dados_2011.csv,morgan.smith@xyza.com,Morgan Smith,90.83,2011,1
506,2011-01-27,162455,1,Maximus UM-11,Urban,Moderation,7,VanArsdel,90.83,124.42,55127,Saint Paul,MN,Central,District #28,USA,dados_2011.csv,talon.hudson@xyza.com,Talon Hudson,90.83,2011,1
506,2011-01-22,119705,1,Maximus UM-11,Urban,Moderation,7,VanArsdel,90.83,124.42,55109,Saint Paul,MN,Central,District #28,USA,dados_2011.csv,aphrodite.simmons@xyza.com,Aphrodite Simmons,90.83,2011,1
506,2011-01-29,162456,1,Maximus UM-11,Urban,Moderation,7,VanArsdel,90.83,124.42,55127,Saint Paul,MN,Central,District #28,USA,dados_2011.csv,kibo.monroe@xyza.com,Kibo Monroe,90.83,2011,1


#### Creating Product Dimension

In [0]:
tb_target = "gold_dim_product"

# Extract the distinct products that feed the product dimension
dim_product_df = df_silver.select("ProductID", "Product", "Category").dropDuplicates()

# Add the surrogate key. monotonically_increasing_id() yields unique, increasing
# values, but they are not consecutive across Spark partitions.
dim_product_df = dim_product_df.withColumn("sk_product", monotonically_increasing_id()+1)

# Save DimProduct in Delta format.
# df_silver is read with spark.read.table(), i.e. it is a batch DataFrame, so the
# batch writer must be used whether or not the table already exists: calling
# .writeStream on a batch DataFrame raises an AnalysisException. The
# streaming-only "checkpointLocation" option is dropped for the same reason.
# NOTE(review): "append" keeps the original intent for an existing table, but it
# re-inserts every row derived from the full Silver table with freshly generated
# keys - confirm whether overwrite or a MERGE is the intended load strategy.
write_mode = "append" if Dim_product_exists else "overwrite"

dim_product_df.write.option("maxRecordsPerFile", 50000) \
    .mode(write_mode) \
    .format("delta") \
    .saveAsTable(f"{database_name}.{tb_target}")

display(dim_product_df.take(10))
#dim_product_df.count()

ProductID,Product,Category,sk_product
609,Maximus UC-74,Urban,1
686,Maximus UC-51,Urban,2
662,Maximus UC-27,Urban,3
577,Maximus UC-42,Urban,4
578,Maximus UC-43,Urban,5
690,Maximus UC-55,Urban,6
579,Maximus UC-44,Urban,7
689,Maximus UC-54,Urban,8
685,Maximus UC-50,Urban,9
636,Maximus UC-01,Urban,10


#### Creating Category Dimension

In [0]:
tb_target = "gold_dim_category"

# Extract the distinct categories
dim_category_df = df_silver.select("Category").dropDuplicates()

# Add the surrogate key. monotonically_increasing_id() yields unique, increasing
# values, but they are not consecutive across Spark partitions.
dim_category_df = dim_category_df.withColumn("sk_category", monotonically_increasing_id()+1)

# Save DimCategory in Delta format.
# df_silver is read with spark.read.table(), i.e. it is a batch DataFrame, so the
# batch writer must be used whether or not the table already exists: calling
# .writeStream on a batch DataFrame raises an AnalysisException. The
# streaming-only "checkpointLocation" option is dropped for the same reason.
# NOTE(review): "append" keeps the original intent for an existing table, but it
# re-inserts every row derived from the full Silver table with freshly generated
# keys - confirm whether overwrite or a MERGE is the intended load strategy.
write_mode = "append" if Dim_category_exists else "overwrite"

dim_category_df.write.option("maxRecordsPerFile", 50000) \
    .mode(write_mode) \
    .format("delta") \
    .saveAsTable(f"{database_name}.{tb_target}")

display(dim_category_df.take(10))
#dim_category_df.count()

Category,sk_category
Mix,1
Urban,2
Youth,3
Accessory,4
Rural,5


#### Creating Segment Dimension

In [0]:
tb_target = "gold_dim_segment"

# Extract the distinct segments
dim_segment_df = df_silver.select("Segment").dropDuplicates()

# Add the surrogate key. monotonically_increasing_id() yields unique, increasing
# values, but they are not consecutive across Spark partitions.
dim_segment_df = dim_segment_df.withColumn("sk_segment", monotonically_increasing_id()+1)

# Save DimSegment in Delta format.
# df_silver is read with spark.read.table(), i.e. it is a batch DataFrame, so the
# batch writer must be used whether or not the table already exists: calling
# .writeStream on a batch DataFrame raises an AnalysisException. The
# streaming-only "checkpointLocation" option is dropped for the same reason.
# NOTE(review): "append" keeps the original intent for an existing table, but it
# re-inserts every row derived from the full Silver table with freshly generated
# keys - confirm whether overwrite or a MERGE is the intended load strategy.
write_mode = "append" if Dim_segment_exists else "overwrite"

dim_segment_df.write.option("maxRecordsPerFile", 50000) \
    .mode(write_mode) \
    .format("delta") \
    .saveAsTable(f"{database_name}.{tb_target}")

display(dim_segment_df.take(10))
#dim_segment_df.count()


Segment,sk_segment
All Season,1
Extreme,2
Productivity,3
Regular,4
Convenience,5
Moderation,6
Youth,7
Accessory,8
Select,9


#### Creating Manufacturer Dimension

In [0]:
tb_target = "gold_dim_manufacturer"

# Extract the distinct manufacturers
dim_manufacturer_df = df_silver.select("ManufacturerID", "Manufacturer").dropDuplicates()

# Add the surrogate key. monotonically_increasing_id() yields unique, increasing
# values, but they are not consecutive across Spark partitions.
dim_manufacturer_df = dim_manufacturer_df.withColumn("sk_manufacturer", monotonically_increasing_id()+1)

# Save DimManufacturer in Delta format.
# df_silver is read with spark.read.table(), i.e. it is a batch DataFrame, so the
# batch writer must be used whether or not the table already exists: calling
# .writeStream on a batch DataFrame raises an AnalysisException. The
# streaming-only "checkpointLocation" option is dropped for the same reason.
# NOTE(review): "append" keeps the original intent for an existing table, but it
# re-inserts every row derived from the full Silver table with freshly generated
# keys - confirm whether overwrite or a MERGE is the intended load strategy.
write_mode = "append" if Dim_manufacturer_exists else "overwrite"

dim_manufacturer_df.write.option("maxRecordsPerFile", 50000) \
    .mode(write_mode) \
    .format("delta") \
    .saveAsTable(f"{database_name}.{tb_target}")

display(dim_manufacturer_df.take(10))
#dim_manufacturer_df.count()

ManufacturerID,Manufacturer,sk_manufacturer
7,VanArsdel,1


#### Creating Region Dimension

In [0]:
tb_target = "gold_dim_region"

# Extract the distinct regions (one row per unique combination of these columns)
dim_region_df = df_silver.select("City", "State", "Region", "District", "Country", "PostalCode").dropDuplicates()

# Add the surrogate key. monotonically_increasing_id() yields unique, increasing
# values, but they are not consecutive across Spark partitions.
dim_region_df = dim_region_df.withColumn("sk_region", monotonically_increasing_id()+1)

# Save DimRegion in Delta format.
# df_silver is read with spark.read.table(), i.e. it is a batch DataFrame, so the
# batch writer must be used whether or not the table already exists: calling
# .writeStream on a batch DataFrame raises an AnalysisException. The
# streaming-only "checkpointLocation" option is dropped for the same reason.
# NOTE(review): "append" keeps the original intent for an existing table, but it
# re-inserts every row derived from the full Silver table with freshly generated
# keys - confirm whether overwrite or a MERGE is the intended load strategy.
write_mode = "append" if Dim_region_exists else "overwrite"

dim_region_df.write.option("maxRecordsPerFile", 50000) \
    .mode(write_mode) \
    .format("delta") \
    .saveAsTable(f"{database_name}.{tb_target}")

display(dim_region_df.take(10))
#dim_region_df.count()

City,State,Region,District,Country,PostalCode,sk_region
Marseilles,IL,Central,District #27,USA,61341,1
Rhome,TX,Central,District #22,USA,76078,2
Quakertown,PA,East,District #04,USA,18951,3
Troy,OH,East,District #16,USA,45373,4
Lima,OH,East,District #16,USA,45806,5
Wildwood,GA,East,District #19,USA,30757,6
Tulsa,OK,Central,District #21,USA,74105,7
Rolla,MO,Central,District #21,USA,65401,8
Painesville,OH,East,District #14,USA,44077,9
Summerville,GA,East,District #09,USA,30747,10


#### Creating Client Dimension

In [0]:
tb_target = "gold_dim_client"

# Step 1 - Extract distinct clients together with their location attributes
dim_client_df = df_silver.select("ClientID", "Name", "Email", "City", "State", "Region", "District", "Country", "PostalCode").dropDuplicates()

# Step 2 - Look up sk_region by matching every location attribute.
# eqNullSafe is used instead of == so that a NULL attribute on the client side
# still matches the same NULL in dim_region_df; with == such rows never match
# and the left join would leave their sk_region NULL.
dim_client_with_sk_df = dim_client_df.alias("client") \
    .join(dim_region_df.alias("region"),
          col("client.City").eqNullSafe(col("region.City")) &
          col("client.State").eqNullSafe(col("region.State")) &
          col("client.Region").eqNullSafe(col("region.Region")) &
          col("client.District").eqNullSafe(col("region.District")) &
          col("client.Country").eqNullSafe(col("region.Country")) &
          col("client.PostalCode").eqNullSafe(col("region.PostalCode")),
          "left") \
    .select("client.ClientID", "client.Name", "client.Email", "region.sk_region")

# Step 3 - Add the surrogate key. monotonically_increasing_id() yields unique,
# increasing values, but they are not consecutive across Spark partitions.
# The resulting column order is ClientID, Name, Email, sk_region, sk_client.
dim_client_with_sk_df = dim_client_with_sk_df.withColumn("sk_client", monotonically_increasing_id()+1)

# Step 4 - Write DimClient in Delta format.
# df_silver is read with spark.read.table(), i.e. it is a batch DataFrame, so the
# batch writer must be used whether or not the table already exists: calling
# .writeStream on a batch DataFrame raises an AnalysisException. The
# streaming-only "checkpointLocation" option is dropped for the same reason.
# NOTE(review): "append" keeps the original intent for an existing table, but it
# re-inserts every row derived from the full Silver table with freshly generated
# keys - confirm whether overwrite or a MERGE is the intended load strategy.
write_mode = "append" if Dim_client_exists else "overwrite"

dim_client_with_sk_df.write.option("maxRecordsPerFile", 50000) \
    .mode(write_mode) \
    .format("delta") \
    .saveAsTable(f"{database_name}.{tb_target}")

display(dim_client_with_sk_df.take(10))
#dim_client_with_sk_df.count()

ClientID,Name,Email,sk_region,sk_client
93119,Amber Norton,amber.norton@xyza.com,2292,1
234644,Jermaine Lyons,jermaine.lyons@xyza.com,2970,2
200772,Maxwell Ruiz,maxwell.ruiz@xyza.com,8589934596,3
64317,Jacob Farmer,jacob.farmer@xyza.com,8589937789,4
244401,Whoopi Bowers,whoopi.bowers@xyza.com,17179870143,5
253093,Glenna Madden,glenna.madden@xyza.com,17179871075,6
50918,Brennan Larsen,brennan.larsen@xyza.com,17179873160,7
176404,Megan Mccoy,megan.mccoy@xyza.com,25769803884,8
205008,Hilel Long,hilel.long@xyza.com,25769806939,9
222641,Anne Beasley,anne.beasley@xyza.com,25769808520,10


#### Creating Sales Fact

In [0]:
# Broadcast joins ship a copy of each (small) dimension DataFrame to every node,
# which can be cheaper than a shuffle-based join against the large Silver table.
from pyspark.sql.functions import broadcast, year, month

tb_target = "gold_fact_sales"

# Join the Silver rows with each dimension (inner joins on the natural key
# columns) to pick up the surrogate keys, then keep only the fact columns.
# NOTE(review): each join assumes one dimension row per join key; if a key
# (e.g. ClientID) appears on several dimension rows the sales rows are
# multiplied - verify key uniqueness in the dimensions.
salesfact_df = df_silver.alias("s") \
    .join(broadcast(dim_product_df.select("ProductID", "sk_product").alias("dprod")), "ProductID") \
    .join(broadcast(dim_category_df.select("Category", "sk_category").alias("dcat")), "Category") \
    .join(broadcast(dim_segment_df.select("Segment", "sk_segment").alias("dseg")), "Segment") \
    .join(broadcast(dim_manufacturer_df.select("Manufacturer", "sk_manufacturer").alias("dfab")), "Manufacturer") \
    .join(broadcast(dim_client_with_sk_df.select("ClientID", "sk_client").alias("dcli")), "ClientID") \
    .select(
        col("s.Date").alias("SalesDate"),
        "sk_product",
        "sk_category",
        "sk_segment",
        "sk_manufacturer",
        "sk_client",
        "Units",
        col("s.UnitPrice"),
        col("s.UnitCost"),
        col("s.SalesTotal")
    )

# Derive the partition columns on the DataFrame itself; withColumn is a
# DataFrame method and cannot be chained onto a writer object.
salesfact_partitioned_df = salesfact_df \
    .withColumn("Year", year("SalesDate")) \
    .withColumn("Month", month("SalesDate"))

# Write in Delta format, partitioned by Year/Month.
# df_silver is read with spark.read.table(), i.e. it is a batch DataFrame, so the
# batch writer must be used whether or not the table already exists: calling
# .writeStream on a batch DataFrame raises an AnalysisException. The
# streaming-only "checkpointLocation" option is dropped for the same reason.
# NOTE(review): "append" keeps the original intent for an existing table, but it
# re-inserts every sales row derived from the full Silver table - confirm
# whether overwrite or a MERGE is the intended load strategy.
write_mode = "append" if Fact_sales_exists else "overwrite"

salesfact_partitioned_df.write.option("maxRecordsPerFile", 1000000) \
    .partitionBy("Year", "Month") \
    .mode(write_mode) \
    .format("delta") \
    .saveAsTable(f"{database_name}.{tb_target}")

display(salesfact_df.take(10))
#salesfact_df.count()

SalesDate,sk_product,sk_category,sk_segment,sk_manufacturer,sk_client,Units,UnitPrice,UnitCost,SalesTotal
2011-04-27,77,2,6,1,60129543011,1,124.42,90.83,90.83
2011-04-27,77,2,6,1,17179876051,1,124.42,90.83,90.83
2011-04-06,77,2,6,1,60129542145,1,124.42,90.83,90.83
2011-04-10,77,2,6,1,34359738369,1,124.42,90.83,90.83
2011-04-10,77,2,6,1,1,1,124.42,90.83,90.83
2011-04-22,77,2,6,1,34359744263,1,124.42,90.83,90.83
2011-04-20,77,2,6,1,6833,1,124.42,90.83,90.83
2011-04-17,77,2,6,1,17179871351,1,124.42,90.83,90.83
2011-04-19,77,2,6,1,25769813106,1,124.42,90.83,90.83
2011-04-08,77,2,6,1,60129553167,1,124.42,90.83,90.83


In [0]:
# Inspect the storage layout of the fact table: list its top-level entries
fact_table_root = "dbfs:/user/hive/warehouse/sales_case.db/gold_fact_sales"
fact_table_entries = dbutils.fs.ls(fact_table_root)
display(fact_table_entries)

path,name,size,modificationTime
dbfs:/user/hive/warehouse/sales_case.db/gold_fact_sales/Year=2011/,Year=2011/,0,0
dbfs:/user/hive/warehouse/sales_case.db/gold_fact_sales/Year=2012/,Year=2012/,0,0
dbfs:/user/hive/warehouse/sales_case.db/gold_fact_sales/Year=2013/,Year=2013/,0,0
dbfs:/user/hive/warehouse/sales_case.db/gold_fact_sales/_delta_log/,_delta_log/,0,0


In [0]:
# List the entries under the Year=2011 folder of the fact table
fact_year_2011_path = "dbfs:/user/hive/warehouse/sales_case.db/gold_fact_sales/Year=2011"
fact_year_2011_entries = dbutils.fs.ls(fact_year_2011_path)
display(fact_year_2011_entries)

path,name,size,modificationTime
dbfs:/user/hive/warehouse/sales_case.db/gold_fact_sales/Year=2011/Month=1/,Month=1/,0,0
dbfs:/user/hive/warehouse/sales_case.db/gold_fact_sales/Year=2011/Month=10/,Month=10/,0,0
dbfs:/user/hive/warehouse/sales_case.db/gold_fact_sales/Year=2011/Month=11/,Month=11/,0,0
dbfs:/user/hive/warehouse/sales_case.db/gold_fact_sales/Year=2011/Month=12/,Month=12/,0,0
dbfs:/user/hive/warehouse/sales_case.db/gold_fact_sales/Year=2011/Month=2/,Month=2/,0,0
dbfs:/user/hive/warehouse/sales_case.db/gold_fact_sales/Year=2011/Month=3/,Month=3/,0,0
dbfs:/user/hive/warehouse/sales_case.db/gold_fact_sales/Year=2011/Month=4/,Month=4/,0,0
dbfs:/user/hive/warehouse/sales_case.db/gold_fact_sales/Year=2011/Month=5/,Month=5/,0,0
dbfs:/user/hive/warehouse/sales_case.db/gold_fact_sales/Year=2011/Month=6/,Month=6/,0,0
dbfs:/user/hive/warehouse/sales_case.db/gold_fact_sales/Year=2011/Month=7/,Month=7/,0,0


In [0]:
# List every table registered in the sales_case schema
sales_case_tables_df = spark.sql("SHOW TABLES IN sales_case")
display(sales_case_tables_df)

database,tableName,isTemporary
sales_case,bronze_sales_table,False
sales_case,gold_dim_category,False
sales_case,gold_dim_client,False
sales_case,gold_dim_manufacturer,False
sales_case,gold_dim_product,False
sales_case,gold_dim_region,False
sales_case,gold_dim_segment,False
sales_case,gold_fact_sales,False
sales_case,silver_sales_table,False


### Cleaning DataFrames from Memory to Optimize

In [0]:
import gc

# Run a Python garbage-collection pass before releasing the DataFrames
gc.collect()

# Ask Spark to drop any cached data for the source and dimension DataFrames
for frame_to_release in (
    df_silver,
    dim_product_df,
    dim_category_df,
    dim_segment_df,
    dim_manufacturer_df,
    dim_region_df,
    dim_client_with_sk_df,
):
  frame_to_release.unpersist()

# The last call stays a bare expression so the notebook still echoes its result
salesfact_df.unpersist()