#### Creating raw tables and using autoloader to load the data from the landing_zone

#### Reading the data from landing zone and saving it to the bronze layer table


In [0]:
%python
from pyspark.sql.functions import current_timestamp

# Batch-read the customer CSV files from the landing zone (header row, inferred
# column types) and stamp every row with the time of this load.
customer_raw_df = (
    spark.read.format('csv')
    .option('header', True)
    .option('inferSchema', True)
    .load("/Volumes/dev/demo_db/landing_zone/customers")
    .withColumn('load_time', current_timestamp())
)

# Preview the first 10 rows before persisting.
customer_raw_df.show(10)

# Append the batch to the bronze-layer Delta table.
# NOTE(review): the table is partitioned by the full load timestamp, so every
# run adds a new partition value -- consider a coarser key (e.g. a load date)
# when the table is next rebuilt; changing it here would break appends to an
# existing table.
(
    customer_raw_df.write
    .format("delta")
    .partitionBy("load_time")
    .mode("append")
    .saveAsTable("dev.bronze_db.customer_raw")
)




+----------+------------+--------------------+
|CustomerID|CustomerName|           load_time|
+----------+------------+--------------------+
|     16029|   Katherina|2024-02-27 05:37:...|
|     16098|    Lisandra|2024-02-27 05:37:...|
|     16244|      Lizeth|2024-02-27 05:37:...|
|     16250|       Neida|2024-02-27 05:37:...|
|     17420|     Nereida|2024-02-27 05:37:...|
|     17511|      Olinda|2024-02-27 05:37:...|
|     17809|     Pamella|2024-02-27 05:37:...|
|     17841|   Petronila|2024-02-27 05:37:...|
|     17850|     Sharika|2024-02-27 05:37:...|
|     18041|        NULL|2024-02-27 05:37:...|
+----------+------------+--------------------+
only showing top 10 rows



In [0]:
%python
from pyspark.sql.functions import current_timestamp

# Batch-read the invoice CSV files from the landing zone (header row, inferred
# column types) and stamp every row with the time of this load.
invoice_raw_df = (
    spark.read.format('csv')
    .option('header', True)
    .option('inferSchema', True)
    .load("/Volumes/dev/demo_db/landing_zone/invoices")
    .withColumn('load_time', current_timestamp())
)

# Preview the first 10 rows before persisting.
invoice_raw_df.show(10)

# Append the batch to the bronze-layer Delta table.
# NOTE(review): the table is partitioned by the full load timestamp, so every
# run adds a new partition value -- consider a coarser key (e.g. a load date)
# when the table is next rebuilt; changing it here would break appends to an
# existing table.
(
    invoice_raw_df.write
    .format("delta")
    .partitionBy("load_time")
    .mode("append")
    .saveAsTable("dev.bronze_db.invoice_raw")
)


+---------+---------+--------------------+--------+----------------+---------+----------+--------------+--------------------+
|InvoiceNo|StockCode|         Description|Quantity|     InvoiceDate|UnitPrice|CustomerID|       Country|           load_time|
+---------+---------+--------------------+--------+----------------+---------+----------+--------------+--------------------+
|   536627|    22699|ROSES REGENCY TEA...|      12|02-01-2022 10.53|     2.95|     15658|United Kingdom|2024-02-27 05:38:...|
|   536627|    21755|LOVE BUILDING BLO...|       4|02-01-2022 10.53|     5.95|     15658|United Kingdom|2024-02-27 05:38:...|
|   536628|   85123A|WHITE HANGING HEA...|       6|07-01-2022 10.54|     2.55|     17850|United Kingdom|2024-02-27 05:38:...|
|   536628|    71053| WHITE METAL LANTERN|       6|07-01-2022 10.54|     3.39|     17850|United Kingdom|2024-02-27 05:38:...|
|   536628|   84406B|CREAM CUPID HEART...|       6|07-01-2022 10.54|     2.75|     17850|United Kingdom|2024-02-27 05:

#### Reading the data from the bronze layer, cleaning it, and saving it to the silver layer "_cleaned" tables

In [0]:
%sql

-- Copy today's bronze customer rows into the silver "_cleaned" table,
-- renaming the columns to snake_case and dropping rows that lack an id or a name.
INSERT INTO dev.silver_db.customer_cleaned
SELECT
    src.CustomerId   AS customer_id,
    src.CustomerName AS customer_name,
    src.load_time
FROM dev.bronze_db.customer_raw AS src
WHERE CAST(src.load_time AS DATE) = current_date()   -- only rows loaded today
  AND src.CustomerID IS NOT NULL
  AND src.CustomerName IS NOT NULL


num_affected_rows,num_inserted_rows
12,12


In [0]:
%sql
-- Copy today's bronze invoice rows into the silver "_cleaned" table:
-- columns are renamed to snake_case, the invoice date text is parsed into a DATE,
-- and rows without an invoice number / stock code or with a non-positive
-- quantity are dropped.
INSERT INTO dev.silver_db.invoice_cleaned
SELECT
    src.InvoiceNo                         AS invoice_no,
    src.StockCode                         AS stock_code,
    src.Description                       AS description,
    src.Quantity                          AS quantity,
    to_date(src.InvoiceDate, "d-M-y H.m") AS invoice_date,   -- source text looks like 02-01-2022 10.53
    src.UnitPrice                         AS unit_price,
    src.CustomerID                        AS customer_id,
    src.Country                           AS country,
    src.load_time
FROM dev.bronze_db.invoice_raw AS src
WHERE CAST(src.load_time AS DATE) = current_date()   -- only rows loaded today
  AND src.InvoiceNo IS NOT NULL
  AND src.StockCode IS NOT NULL
  AND src.Quantity > 0


num_affected_rows,num_inserted_rows
258,258


#### Reading the data from the silver layer "_cleaned" tables, performing change data capture, and saving the result into the final silver layer tables

In [0]:
%sql
-- Close the open version of every customer whose name differs from the name
-- received in today's load: stamp its end_timestamp and clear isRecordLatest.
-- The new version of the customer is inserted by a separate statement.
MERGE INTO dev.silver_db.customers targ
USING (
  -- DISTINCT keeps a same-day re-load (identical rows appended twice) from
  -- matching one target row with several source rows, which makes MERGE fail.
  SELECT DISTINCT customer_id, customer_name
  FROM dev.silver_db.customer_cleaned
  WHERE date(load_time) = date(current_timestamp())
) source
ON targ.customer_id = source.customer_id
  AND targ.customer_name <> source.customer_name
  -- Only touch rows that are still open. Without this guard, versions closed by
  -- an earlier run also match, and the previous ELSE-less CASE expressions
  -- overwrote their end_timestamp and isRecordLatest with NULL.
  AND targ.end_timestamp IS NULL
WHEN MATCHED THEN
  UPDATE SET
    targ.end_timestamp = current_timestamp(),
    targ.isRecordLatest = FALSE


num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
2,2,0,0


In [0]:
%sql
-- Insert today's customers into the final customers table as new open versions.
-- Only brand-new customers and customers whose name changed are inserted:
-- a customer that already has an open row (end_timestamp IS NULL) with the same
-- id and name is skipped, so unchanged customers are not duplicated on every run.
-- NOTE(review): the remaining columns of dev.silver_db.customers are left to the
-- table's defaults -- confirm the DDL provides them.
INSERT INTO dev.silver_db.customers(customer_id, customer_name)
SELECT DISTINCT src.customer_id, src.customer_name
FROM dev.silver_db.customer_cleaned AS src
WHERE date(src.load_time) = date(current_timestamp())
  AND NOT EXISTS (
    SELECT 1
    FROM dev.silver_db.customers AS cur
    WHERE cur.customer_id = src.customer_id
      AND cur.customer_name = src.customer_name
      AND cur.end_timestamp IS NULL
  )

num_affected_rows,num_inserted_rows
12,12


In [0]:
%sql
-- Load today's cleaned invoices into the final invoices table.
-- Exact duplicate invoice lines are collapsed first, then every surviving row
-- is stamped with the time of this insert as its load_time.
INSERT INTO dev.silver_db.invoices
SELECT
    deduped.invoice_no,
    deduped.stock_code,
    deduped.description,
    deduped.quantity,
    deduped.invoice_date,
    deduped.unit_price,
    deduped.customer_id,
    deduped.country,
    current_timestamp() AS load_time
FROM (
    SELECT DISTINCT
        invoice_no, stock_code, description, quantity,
        invoice_date, unit_price, customer_id, country
    FROM dev.silver_db.invoice_cleaned
    WHERE CAST(load_time AS DATE) = current_date()   -- only rows cleaned today
) AS deduped



num_affected_rows,num_inserted_rows
250,250


#### Now, in the gold layer, we create our views

In [0]:
%sql
-- Gold view: total sales value (quantity * unit price) per invoice year and country.
-- IF NOT EXISTS means re-running this cell leaves an already existing view untouched.
CREATE VIEW IF NOT EXISTS dev.gold_db.country_yearly_sales
AS
SELECT
    line_sales.sale_year,
    line_sales.country,
    SUM(line_sales.line_total) AS yearly_sales
FROM (
    SELECT
        year(invoice_date)    AS sale_year,
        country,
        quantity * unit_price AS line_total
    FROM dev.silver_db.invoices
) AS line_sales
GROUP BY line_sales.sale_year, line_sales.country

In [0]:
%sql
-- Show the yearly sales per country exposed by the gold view.
SELECT *
FROM dev.gold_db.country_yearly_sales

sale_year,country,yearly_sales
2021,Australia,358.25
2021,France,801.86
2021,United Kingdom,11090.80000000001
2022,United Kingdom,3532.7200000000007
