In [None]:
from pyspark.sql.functions import *

##### Create an external location for reading input data from the ADLS storage container


In [None]:
# Register the ADLS container as an external location named `staging_area`.
# The container/account in the URL and the credential name appear to be
# placeholders — replace them with real values before running.
external_location_sql = """CREATE EXTERNAL LOCATION IF NOT EXISTS staging_area
             URL 'abfss://container_name@storage_account_name.dfs.core.windows.net'
             WITH (CREDENTIAL `your-access-credentials`);
         """
spark.sql(external_location_sql)

DataFrame[]

##### Create the catalog and the bronze, silver and gold databases

In [None]:
# One catalog that holds a separate database for each layer of the pipeline.
spark.sql("CREATE CATALOG IF NOT EXISTS projects_catalog")
for layer_database in ("bronze_db", "silver_db", "gold_db"):
    spark.sql(f"CREATE DATABASE IF NOT EXISTS projects_catalog.{layer_database}")

##### Create Spark DataFrames for the customers and invoices input data

In [None]:
input_path = 'abfss://container_name@storage_account_name.dfs.core.windows.net/input_data'


def _read_csv_folder(folder):
    """Load all CSV files under ``{input_path}/{folder}`` into a Spark DataFrame.

    The first line of each file is treated as a header, and Spark infers the
    column types from the data.
    """
    return (
        spark.read.format('csv')
        .option('inferSchema', 'true')
        .option('header', 'true')
        .load(f"{input_path}/{folder}")
    )


customers_df = _read_csv_folder('customers')
invoices_df = _read_csv_folder('invoices')

##### Create the bronze layer tables (customers_raw, invoices_raw) from the Spark DataFrames

In [None]:
# Bronze layer: append the raw CSV data to Delta tables, stamping every row with
# a load_time so later cells can select the batch loaded today.
customers_raw_df = customers_df.withColumn('load_time', current_timestamp())
(
    customers_raw_df.write
    .format('delta')
    .mode('append')
    .saveAsTable('projects_catalog.bronze_db.customers_raw')
)

# NOTE(review): current_timestamp() is fixed per query, so each load of
# invoices_raw lands in its own load_time partition.
invoices_raw_df = invoices_df.withColumn('load_time', current_timestamp())
(
    invoices_raw_df.write
    .format('delta')
    .partitionBy('load_time')
    .mode('append')
    .saveAsTable('projects_catalog.bronze_db.invoices_raw')
)

##### Check the number of records in the raw tables

In [None]:
# Sanity check: display the row count of each bronze table.
for raw_table in ('customers_raw', 'invoices_raw'):
    spark.sql(f"SELECT COUNT(*) FROM projects_catalog.bronze_db.{raw_table}").display()

count(1)
33


count(1)
512


##### Create the silver layer tables (customers_cleaned, invoices_cleaned) from the bronze layer tables

In [None]:
# Silver table for cleaned customer rows; load_time is carried over from bronze.
customers_cleaned_ddl = """
CREATE TABLE IF NOT EXISTS projects_catalog.silver_db.customers_cleaned (
    customer_id INTEGER,
    customer_name STRING,
    load_time TIMESTAMP
)
"""
spark.sql(customers_cleaned_ddl)

DataFrame[]

In [None]:
# Copy today's bronze customers into the silver table with snake_case column
# names, skipping rows that have no id or no name.
customers_cleaned_insert = """
INSERT INTO projects_catalog.silver_db.customers_cleaned (customer_id, customer_name, load_time)
SELECT 
    CustomerID AS customer_id,
    CustomerName AS customer_name,
    load_time
FROM 
projects_catalog.bronze_db.customers_raw
WHERE (DATE(load_time) = DATE(current_timestamp())) 
AND (CustomerID IS NOT NULL) AND (CustomerName IS NOT NULL)
"""
spark.sql(customers_cleaned_insert)

DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [None]:
# Silver table for cleaned invoice lines, partitioned by load_time like the
# bronze invoices_raw table it is filled from.
invoices_cleaned_ddl = """
CREATE TABLE IF NOT EXISTS projects_catalog.silver_db.invoices_cleaned (
    invoice_no INTEGER,
    stock_code STRING,
    description STRING,
    quantity INTEGER,
    invoice_date DATE,
    unit_price DOUBLE,
    customer_id INTEGER,
    country STRING,
    load_time TIMESTAMP
)
PARTITIONED BY (load_time)
"""
spark.sql(invoices_cleaned_ddl)

DataFrame[]

In [None]:
# Silver layer: copy today's bronze invoices into invoices_cleaned. This step:
#   - renames columns to snake_case,
#   - parses InvoiceDate into a DATE,
#   - drops rows missing fields that downstream tables rely on,
#   - drops rows with a non-positive quantity.
#
# The NOT NULL check runs on the *parsed* date. When spark.sql.ansi.enabled is
# false, to_date() returns NULL for strings that don't match the pattern
# instead of failing. Checking only the raw InvoiceDate column would therefore
# still let rows with a NULL invoice_date through. to_date(NULL) is NULL, so
# this check also covers a missing InvoiceDate.
invoice_date_format = "d-M-y H.m"

spark.sql(f"""
INSERT INTO projects_catalog.silver_db.invoices_cleaned
SELECT 
    InvoiceNo AS invoice_no,
    StockCode AS stock_code,
    Description AS description,
    Quantity AS quantity,
    to_date(InvoiceDate, "{invoice_date_format}") AS invoice_date,
    UnitPrice AS unit_price,
    CustomerID AS customer_id,
    Country AS country,
    load_time
FROM projects_catalog.bronze_db.invoices_raw
WHERE 
    (DATE(load_time) = DATE(current_timestamp()))
    AND (InvoiceNo IS NOT NULL) AND (StockCode IS NOT NULL) AND (Quantity > 0)
    AND (to_date(InvoiceDate, "{invoice_date_format}") IS NOT NULL)
    AND (CustomerID IS NOT NULL)
""");

##### Check the number of records in the cleaned tables

In [None]:
# Display the row count of each cleaned silver table, for comparison with bronze.
for cleaned_table in ('customers_cleaned', 'invoices_cleaned'):
    spark.sql(f"SELECT COUNT(*) FROM projects_catalog.silver_db.{cleaned_table}").display()

count(1)
31


count(1)
510


##### Create the remaining silver layer tables — customers (dimension table) and invoices (fact table) — and capture data changes from the cleaned silver tables (customers_cleaned, invoices_cleaned)

In [None]:
# Customers dimension table for SCD Type 2 history. A row is the current version
# while end_date IS NULL. Inserts that omit start_date/end_date fall back to the
# column defaults, which require the allowColumnDefaults Delta table feature.
customers_dimension_ddl = """
CREATE TABLE IF NOT EXISTS projects_catalog.silver_db.customers (
    customer_id INTEGER,
    customer_name STRING,
    start_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP(),
    end_date TIMESTAMP DEFAULT NULL
)
USING DELTA
TBLPROPERTIES ('delta.feature.allowColumnDefaults' = 'supported');
"""
spark.sql(customers_dimension_ddl)


DataFrame[]

##### Implement SCD Type 2 in the customers dimension table

In [None]:
# SCD Type 2, step 1: close the *current* version of every customer whose name
# changed in today's load by setting its end_date. The next cell inserts the
# new versions.
#
# - `main.end_date IS NULL` restricts the update to current rows, so closed
#   historical versions are never matched for update or rewritten.
# - The source is de-duplicated because MERGE fails when several source rows
#   match and try to update the same target row. This happens, for example,
#   when today's batch was loaded into customers_cleaned more than once.
# NOTE(review): if today's batch holds two *different* names for the same
# customer_id, the MERGE can still fail — decide upstream which name wins.
spark.sql("""
MERGE INTO projects_catalog.silver_db.customers AS main
USING (
    SELECT DISTINCT customer_id, customer_name
    FROM projects_catalog.silver_db.customers_cleaned
    WHERE DATE(load_time) = DATE(current_timestamp())
) AS source
ON main.customer_id = source.customer_id
WHEN MATCHED AND main.end_date IS NULL AND main.customer_name <> source.customer_name
THEN UPDATE SET main.end_date = current_timestamp()
""")

DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

In [None]:
# SCD Type 2, step 2: add a new current row for each customer in today's load
# that has no current row with the same name. start_date and end_date come from
# the column defaults (now / NULL). Rows are added for:
#   - brand-new customers,
#   - customers whose previous version the MERGE in the previous cell closed.
# Customers whose name is unchanged already have a matching current row and are
# skipped. Inserting them again would leave two current rows (end_date IS NULL)
# for the same customer_id.
spark.sql("""
INSERT INTO projects_catalog.silver_db.customers (customer_id, customer_name)
SELECT DISTINCT src.customer_id, src.customer_name
FROM projects_catalog.silver_db.customers_cleaned AS src
WHERE DATE(src.load_time) = DATE(current_timestamp())
  AND NOT EXISTS (
      SELECT 1
      FROM projects_catalog.silver_db.customers AS cur
      WHERE cur.customer_id = src.customer_id
        AND cur.customer_name = src.customer_name
        AND cur.end_date IS NULL
  )
""")

DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

##### Remove duplicate records from the invoices_cleaned table and create the silver layer invoices table containing only unique records

In [None]:
# Load the cleaned invoices table as a DataFrame for the de-duplication step.
invoices_cleaned_df = spark.table('projects_catalog.silver_db.invoices_cleaned')

In [None]:
# Build the silver `invoices` fact table from today's slice of invoices_cleaned,
# keeping a single copy of each invoice line.
#
# load_time is excluded from the duplicate check. Each load stamps its rows
# with its own current_timestamp(), so the same invoice line ingested by two
# runs on the same day differs only in load_time. Comparing all columns would
# keep both copies. Which copy's load_time survives is arbitrary.
# NOTE(review): rows already appended to `invoices` by an earlier run are not
# compared against, so re-running this cell still appends duplicates.
invoice_key_columns = [c for c in invoices_cleaned_df.columns if c != 'load_time']

(
    invoices_cleaned_df
    .filter(to_date(col('load_time')) == to_date(current_timestamp()))
    .dropDuplicates(invoice_key_columns)
    .write.format('delta')
    .partitionBy('country')
    .mode('append')
    .saveAsTable('projects_catalog.silver_db.invoices')
)

##### Check the number of records in the final dimension (customers) and fact (invoices) tables

In [None]:
# Display the row count of the dimension table and then the fact table.
for final_table in ('customers', 'invoices'):
    spark.sql(f"SELECT COUNT(*) FROM projects_catalog.silver_db.{final_table}").display()

count(1)
31


count(1)
502


##### Create a gold layer view to track country_yearly_sales

In [None]:
# Gold layer: total sales (quantity * unit_price) per country per calendar year.
#
# CREATE OR REPLACE (instead of IF NOT EXISTS) keeps the stored view in sync
# with this definition when the notebook is re-run after the query is edited.
# IF NOT EXISTS would silently keep the old definition.
#
# unit_price is a DOUBLE, so the summed amount picks up floating-point noise
# (e.g. 3532.7199999999984). It is rounded to 2 decimal places for reporting.
spark.sql("""
CREATE OR REPLACE VIEW projects_catalog.gold_db.country_yearly_sales
AS 
SELECT 
    year(invoice_date) AS sale_year,
    country,
    ROUND(SUM(quantity * unit_price), 2) AS yearly_sales
FROM projects_catalog.silver_db.invoices
GROUP BY year(invoice_date), country
""")

DataFrame[]

##### Query each country's yearly sales from the view

In [None]:
# Display every (sale_year, country, yearly_sales) row of the gold view.
spark.table('projects_catalog.gold_db.country_yearly_sales').display()

sale_year,country,yearly_sales
2022,United Kingdom,3532.7199999999984
2021,United Kingdom,11106.100000000006
2021,France,801.8600000000001
2021,Australia,358.25
