# Silver Layer ETL
With the new data model in place, it's time to load the original (bronze) data into the new silver tables.

In [0]:
# Imports
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month, dayofmonth, weekofyear, monotonically_increasing_id

# Cleaning
Before distributing the data from the bronze layer into the new silver-layer tables, we check whether it needs cleaning.

In [0]:
# Untouched handle on the raw bronze table, kept for later reference
# (the cleaning steps below reassign df_silver, never this variable).
df_bronze = spark.read.table("bronze.raw_data")

In [0]:
# Working DataFrame for the silver layer: the cleaning cells below reassign it,
# and only afterwards are the Delta tables written from it.
df_silver = spark.sql("SELECT * FROM bronze.raw_data")

In [0]:
# Check for duplicates: group on every column, so any group seen more than
# once is a set of fully identical rows.
all_columns = df_silver.columns
duplicates = df_silver.groupBy(all_columns).count().where(col("count") > 1)
display(duplicates)

# Remove duplicates (disabled for now)
#df_silver = df_silver.dropDuplicates()

In [0]:
# Check for NAs: number of NULL values per column.
# isNull() yields boolean columns, and GroupedData.sum() only aggregates numeric
# columns, so without the int cast every column would be skipped and the report
# would be empty. toDF() swaps the generated "sum(<col>)" names back to the
# original column names (sum() keeps the input column order).
nas = (
    df_silver
    .select([col(c).isNull().cast("int").alias(c) for c in df_silver.columns])
    .groupBy()
    .sum()
    .toDF(*df_silver.columns)
)
display(nas)

# Remove NAs
# Remove '#' when ready
#df_silver = df_silver.dropna()

In [0]:
# Baseline number of rows; the later cleaning steps should keep this stable.
row_count = df_silver.count()
display(row_count)

Next, we check the distinct values of the categorical columns.

In [0]:
education_values = df_silver.select("Education").distinct()
display(education_values)
# Seems fine.

In [0]:
# Distinct marital-status categories as they arrive from bronze.
marital_values = df_silver.select("Marital_Status").distinct()
display(marital_values)

Right away, I believe "Absurd" and "YOLO" do not belong in a cleaned dataset and should be grouped into an "Unknown" category for the time being. Meanwhile, "Alone" clearly belongs in the "Single" category.

In [0]:
# Map the nonsensical categories to a single "Unknown" bucket.
df_silver = df_silver.replace(["Absurd", "YOLO"], "Unknown", subset=["Marital_Status"])

In [0]:
# "Alone" is the same category as "Single".
df_silver = df_silver.replace("Alone", "Single", subset=["Marital_Status"])

In [0]:
# Re-check the categories after the two replacements above.
cleaned_marital_values = df_silver.select("Marital_Status").distinct()
display(cleaned_marital_values)

# Data Ingestion
Load the cleaned dataframe into the new tables.

In [0]:
%skip
# Branch 1 - the version that caused problems (WIP). Disabled with %skip and kept
# for reference only; the "Branch 2" cell below is the active version.
# NOTE(review): this version neither deduplicates on ID nor creates the
# id_customer surrogate key (both are commented out), so the silver.dim_customer
# table it writes has no id_customer column, even though the silver.fact_sales
# query selects c.id_customer.
df_customer = df_silver.select(
"ID",
"Year_Birth",
"Education",
"Marital_Status",
"Kidhome",
"Teenhome",
"Income",
"Complain"
)#.dropDuplicates()


# Creating surrogate key (disabled)
#df_customer = df_customer.withColumn(
#"id_customer",
#monotonically_increasing_id()
#)


# Saving DIM_Customer as a delta table (overwrites silver.dim_customer)
df_customer.write.format("delta") \
.mode("overwrite") \
.saveAsTable("silver.dim_customer") 

In [0]:
# Branch 2: DIM_Customer built from the cleaned df_silver.
df_customer = (
    df_silver
    .select(
        "ID",
        "Year_Birth",
        "Education",
        "Marital_Status",
        "Kidhome",
        "Teenhome",
        "Income",
        "Complain"
    )
    # Essential: keep one row per customer ID, so the dimension is unique on its
    # natural key and the fact-table join on ID cannot multiply rows.
    .dropDuplicates(["ID"])
    # Surrogate key; silver.fact_sales selects it as c.id_customer.
    .withColumn("id_customer", monotonically_increasing_id())
)

# Persist DIM_Customer as a Delta table. overwriteSchema lets the overwrite add
# the id_customer column even if an earlier run saved the table without it.
(
    df_customer.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("silver.dim_customer")
)


In [0]:
# Row count of the customer dimension (WIP).
customer_row_count = df_customer.count()
customer_row_count

In [0]:
n_customer_rows = df_customer.count()
display(n_customer_rows)

In [0]:
%skip
# DIM_Promo
# Branch 1 (WIP) - disabled with %skip and kept for reference only; the next
# cell is the active version.
# NOTE(review): without dropDuplicates this writes one row per bronze row rather
# than one row per distinct campaign/response combination, and no id_promo
# column is created, although silver.fact_sales selects p.id_promo.
df_promo = df_silver.select(
    "AcceptedCmp1",
    "AcceptedCmp2",
    "AcceptedCmp3",
    "AcceptedCmp4",
    "AcceptedCmp5",
    "Response"
)#.dropDuplicates()


# Creating surrogate key (disabled)
# NOTE(review): copied verbatim from the customer cell; for this dimension it
# would presumably need to target df_promo / "id_promo" instead.
#df_customer = df_customer.withColumn(
#"id_customer",
#monotonically_increasing_id()
#)


# Saving DIM_Promo as a delta table (overwrites silver.dim_promo)
df_promo.write.format("delta") \
.mode("overwrite") \
.saveAsTable("silver.dim_promo")

In [0]:
# DIM_Promo: one row per distinct combination of campaign acceptances and response.
df_promo = (
    df_silver
    .select(
        "AcceptedCmp1",
        "AcceptedCmp2",
        "AcceptedCmp3",
        "AcceptedCmp4",
        "AcceptedCmp5",
        "Response"
    )
    # Unique on all six flags, so the fact-table join matches at most one row.
    .dropDuplicates()
    # Surrogate key; silver.fact_sales selects it as p.id_promo.
    .withColumn("id_promo", monotonically_increasing_id())
)

# Persist DIM_Promo as a Delta table. overwriteSchema lets the overwrite add
# the id_promo column even if an earlier run saved the table without it.
(
    df_promo.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("silver.dim_promo")
)


In [0]:
n_promo_rows = df_promo.count()
display(n_promo_rows)

In [0]:
%skip
# DIM_Calendar
# Branch 1 (WIP) - disabled with %skip and kept for reference only; the next
# cell is the active version.
# NOTE(review): there is no dropDuplicates, so a Dt_Customer value shared by
# several rows yields several calendar rows (each with its own id_date), and a
# fact-table join on full_date would then match more than one row.
# NOTE(review): monotonically_increasing_id() is not guaranteed to produce
# consecutive or small values; the .cast("integer") below may overflow - confirm.
df_date = (
    df_silver
    .select(col("Dt_Customer").alias("full_date"))
    .withColumn("id_date", monotonically_increasing_id().cast("integer"))
    .withColumn("year", year(col("full_date")))
    .withColumn("month", month(col("full_date")))
    .withColumn("day", dayofmonth(col("full_date")))
    .withColumn("week_of_year", weekofyear(col("full_date")))
    .select(
        "id_date",
        "full_date",
        "year",
        "month",
        "day",
        "week_of_year"
    )
)

df_date.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("silver.dim_calendar")

In [0]:
# DIM_Calendar: one row per distinct customer-registration date.
df_date = (
    df_silver
    .select(col("Dt_Customer").alias("full_date"))
    # Essential: one row per date, otherwise the fact join on full_date fans out.
    .dropDuplicates()
    # Surrogate key; silver.fact_sales selects it as d.id_date. Kept as the
    # native long type (an integer cast could overflow).
    .withColumn("id_date", monotonically_increasing_id())
    .withColumn("year", year("full_date"))
    .withColumn("month", month("full_date"))
    .withColumn("day", dayofmonth("full_date"))
    .withColumn("week_of_year", weekofyear("full_date"))
)

# Persist DIM_Calendar as a Delta table. overwriteSchema lets the overwrite
# replace a schema saved by an earlier run.
(
    df_date.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("silver.dim_calendar")
)


In [0]:
n_date_rows = df_date.count()
display(n_date_rows)

The fact table is loaded last because it references all of the dimension tables.

In [0]:
%sql
-- FACT_Sales: one row per bronze record, with foreign keys into the dimensions.
-- All joins are LEFT JOINs; a dimension row that cannot be matched leaves the
-- corresponding fk_* column NULL. The dimension tables are expected to be unique
-- on their join keys, so no bronze record should be multiplied.
CREATE OR REPLACE TABLE silver.fact_sales
USING DELTA
AS
SELECT
    c.id_customer        AS fk_customer,
    p.id_promo           AS fk_promo,
    d.id_date            AS fk_date,

    b.MntWines,
    b.MntFruits,
    b.MntMeatProducts,
    b.MntFishProducts,
    b.MntSweetProducts,
    b.MntGoldProds,

    b.NumDealsPurchases,
    b.NumWebPurchases,
    b.NumCatalogPurchases,
    b.NumStorePurchases,
    b.NumWebVisitsMonth,
    b.Recency

FROM bronze.raw_data b

-- Join on the customer's natural key. Matching on descriptive attributes
-- (Income, Kidhome, Education, ...) paired each record with every customer
-- sharing that profile (multiplying rows), never matched a NULL Income, and
-- missed customers whose Marital_Status was cleaned in silver
-- (e.g. 'Alone' -> 'Single') but is still raw in bronze.
LEFT JOIN silver.dim_customer c
    ON b.ID = c.ID

-- <=> is NULL-safe equality, so flag combinations containing NULLs still match
-- their (deduplicated) row in the promo dimension.
LEFT JOIN silver.dim_promo p
    ON  b.AcceptedCmp1 <=> p.AcceptedCmp1
    AND b.AcceptedCmp2 <=> p.AcceptedCmp2
    AND b.AcceptedCmp3 <=> p.AcceptedCmp3
    AND b.AcceptedCmp4 <=> p.AcceptedCmp4
    AND b.AcceptedCmp5 <=> p.AcceptedCmp5
    AND b.Response     <=> p.Response

LEFT JOIN silver.dim_calendar d
    ON b.Dt_Customer = d.full_date;

In [0]:
fact_rows = spark.table("silver.fact_sales").count()
display(fact_rows)

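Something is clearly wrong if this count exceeds the number of rows in `bronze.raw_data`: every join is a LEFT JOIN, so a larger count means at least one dimension join is matching more than one row per record.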

In [0]:
# Validation of silver.fact_sales.
# Every join in the fact query is a LEFT JOIN, so when each dimension is unique on
# its join key the fact table has exactly one row per bronze row; a larger count
# means some dimension join is fanning out.
fact_sales = spark.table("silver.fact_sales")
bronze_row_count = spark.table("bronze.raw_data").count()
fact_row_count = fact_sales.count()
display(
    spark.createDataFrame(
        [(bronze_row_count, fact_row_count)], ["bronze_rows", "fact_rows"]
    )
)

# NULL foreign keys are records the LEFT JOINs could not match to a dimension.
# isNull() is cast to int because GroupedData.sum() only aggregates numeric columns.
fk_columns = ["fk_customer", "fk_promo", "fk_date"]
unmatched_fks = (
    fact_sales
    .select([col(c).isNull().cast("int").alias(c) for c in fk_columns])
    .groupBy()
    .sum()
    .toDF(*fk_columns)
)
display(unmatched_fks)

# Sample of the fact table.
display(fact_sales.limit(10))

assert fact_row_count == bronze_row_count, (
    f"silver.fact_sales has {fact_row_count} rows but bronze.raw_data has "
    f"{bronze_row_count}: a dimension join is matching more than one row"
)