# Refining and Cleaning Data in our Silver Layer


We need to remove some extraneous observations and perform calculations. First, we query our bronze table for the records we want.

In [0]:
%sql


SELECT 
  STORE_ID
, Year
, Quarter
, Total_Revenue
, COGS
, Profit
FROM delta.`dbfs:/FileStore/tables/TraderJoesRevenue/bronze_tj_fact_revenue`
WHERE Row_ID IS NOT NULL
  AND Quarter = 'Q1'



STORE_ID,Year,Quarter,Total_Revenue,COGS,Profit
733,2023,Q1,20658.73557159825,20025.0,633.7355716
733,2023,Q1,40337.69150410763,22357.0,17980.6915
733,2023,Q1,42998.04061057825,24952.0,18046.04061
734,2023,Q1,10868.457517749213,,
734,2023,Q1,25114.320032515105,21442.0,3672.320033
734,2023,Q1,22891.055266249914,21785.0,1106.055266
735,2023,Q1,33281.28811200044,29714.0,3567.288112
735,2023,Q1,10536.219710456582,,
735,2023,Q1,12209.012377525998,,
736,2023,Q1,29553.321916571396,21328.0,8225.321917



The result of the SQL cell above is available as _sqldf, which we can easily rename and use in the step below. This is much more readable than using df = spark.sql(our_query) in a Python-only notebook.

Below, we pass an explicit list of columns to dropDuplicates to drop duplicate records, keeping one row wherever more than one record shares the same values in all of those columns.


In [0]:
columns = ["STORE_ID","Year","Quarter","Total_Revenue","COGS","Profit"]

df = _sqldf.dropDuplicates(columns)

#df.display()

df.createOrReplaceTempView("silver_write")

STORE_ID,Year,Quarter,Total_Revenue,COGS,Profit
733,2023,Q1,20658.73557159825,20025.0,633.7355716
746,2023,Q1,16698.026233745455,,
779,2023,Q1,17862.183504272398,,
752,2023,Q1,19732.001376954882,,
752,2023,Q1,30435.33610053258,26790.0,3645.336101
762,2023,Q1,49113.23130254069,36626.0,12487.2313
774,2023,Q1,27814.73015050899,27698.0,116.7301505
742,2023,Q1,17170.135138271748,,
772,2023,Q1,40074.793396835026,30227.0,9847.793397
741,2023,Q1,16235.499707974266,,
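The deduplication step above can be sketched in plain Python to make its semantics concrete. This is only an illustration, not the Spark implementation: the helper `drop_duplicates` is a hypothetical name, and it keeps the first row it sees for each key, whereas Spark does not guarantee which of the duplicate rows survives. Because the column list here covers every selected column, the result should be the same as taking distinct rows. The last sample row is a deliberate repeat added for the illustration.

```python
def drop_duplicates(rows, columns):
    """Keep one row per distinct combination of values in `columns`."""
    seen = set()
    kept = []
    for row in rows:
        key = tuple(row[c] for c in columns)  # None values compare equal, like NULLs in Spark's dedup
        if key not in seen:
            seen.add(key)
            kept.append(row)
    return kept


columns = ["STORE_ID", "Year", "Quarter", "Total_Revenue", "COGS", "Profit"]

# Rows taken from the query output above; the last one is repeated on purpose.
rows = [
    dict(zip(columns, (733, 2023, "Q1", 20658.73557159825, 20025.0, 633.7355716))),
    dict(zip(columns, (734, 2023, "Q1", 10868.457517749213, None, None))),
    dict(zip(columns, (733, 2023, "Q1", 20658.73557159825, 20025.0, 633.7355716))),
]

deduped = drop_duplicates(rows, columns)
print(len(rows), "->", len(deduped))
```

Passing a shorter column list (for example only STORE_ID, Year, and Quarter) would collapse rows that differ in revenue, so the choice of columns determines what counts as a duplicate.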


We write into silver using an upsert that replaces rows sharing the same store, year, and quarter with data from later batches (assuming the later batches are more correct).

https://medium.com/@santosh_beora/distinct-and-dropduplicates-in-pyspark-fedb1e9e8738

Retyping columns in our write to be consistent with their eventual silver table

In [0]:
%sql
CREATE OR REPLACE TEMP VIEW silver_write2
AS
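SELECT
  -- Alias each cast explicitly so the view's column names match the silver table
  CAST(Store_ID AS STRING) AS STORE_ID
, CAST(Year AS STRING) AS Year
, CAST(Quarter AS STRING) AS Quarter
, CAST(Total_Revenue AS DOUBLE) AS Total_Revenue
, CAST(COGS AS DOUBLE) AS COGS
, CAST(Profit AS DOUBLE) AS Profit
FROM silver_write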


In [0]:
%sql
USE tj_db;
CREATE TABLE IF NOT EXISTS tj_db.silver_tj_fact_revenue
(
  STORE_ID STRING
, Year STRING
, Quarter STRING
, Total_Revenue DOUBLE
, COGS DOUBLE
, Profit DOUBLE
)

USING delta


In [0]:
%sql

MERGE INTO tj_db.silver_tj_fact_revenue AS silver 
USING silver_write2 as bronze
ON silver.Store_ID = bronze.Store_ID AND silver.Year = bronze.Year AND silver.Quarter = bronze.Quarter 
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *


num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
1680,0,0,1680
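To make the upsert logic easier to reason about, here is a minimal plain-Python sketch of what the MERGE above does with its two clauses. It is an assumption-laden model rather than Delta's implementation: `merge_upsert` is a hypothetical helper, tables are lists of dicts, and the revenue values are made up. One behavior it deliberately mirrors is that Delta raises an error when more than one source row matches the same target row, which matters when a batch holds several rows for the same store, year, and quarter.

```python
def merge_upsert(target, source, key_cols):
    """Emulate WHEN MATCHED THEN UPDATE SET * / WHEN NOT MATCHED THEN INSERT *.

    Returns (new_target, num_updated_rows, num_inserted_rows).
    """
    def key(row):
        return tuple(row[c] for c in key_cols)

    # Group source rows by merge key so matches can be looked up quickly.
    source_by_key = {}
    for row in source:
        source_by_key.setdefault(key(row), []).append(row)

    target_keys = {key(row) for row in target}
    result, updated = [], 0
    for row in target:
        matches = source_by_key.get(key(row), [])
        if len(matches) > 1:
            # An ambiguous match is treated as an error, as Delta does.
            raise ValueError(f"multiple source rows match target key {key(row)}")
        if matches:
            result.append(dict(matches[0]))  # UPDATE SET *
            updated += 1
        else:
            result.append(row)

    # Source rows whose key is absent from the target are inserted.
    new_rows = [dict(r) for r in source if key(r) not in target_keys]
    return result + new_rows, updated, len(new_rows)


KEYS = ("STORE_ID", "Year", "Quarter")

# Hypothetical batches; the revenue figures are illustrative only.
batch1 = [{"STORE_ID": "733", "Year": "2023", "Quarter": "Q1", "Total_Revenue": 100.0}]
batch2 = [
    {"STORE_ID": "733", "Year": "2023", "Quarter": "Q1", "Total_Revenue": 120.0},
    {"STORE_ID": "734", "Year": "2023", "Quarter": "Q1", "Total_Revenue": 80.0},
]

silver, updated1, inserted1 = merge_upsert([], batch1, KEYS)      # empty target: inserts only
silver, updated2, inserted2 = merge_upsert(silver, batch2, KEYS)  # later batch replaces store 733
print(updated2, inserted2)
```

Against an empty target every source row is unmatched, which is consistent with the 0 updated and 1680 inserted rows reported above.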


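Replacing the Profit values with a correct recalculation (Total_Revenue - COGS) and rounding to two decimals. BROUND rounds half to even, and rows with a NULL COGS end up with a NULL Profit.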

In [0]:
%sql
USE tj_db;
CREATE TABLE IF NOT EXISTS tj_db.silver_tj_fact_revenue_profit_recalc
(
  STORE_ID STRING
, Year STRING
, Quarter STRING
, Total_Revenue DOUBLE
, COGS DOUBLE
, Profit DOUBLE
)

USING delta

In [0]:
%sql
MERGE INTO tj_db.silver_tj_fact_revenue_profit_recalc AS recalc
USING tj_db.silver_tj_fact_revenue as orig
ON recalc.Store_ID = orig.Store_ID AND recalc.Year = orig.Year AND recalc.Quarter = orig.Quarter 
WHEN MATCHED THEN UPDATE SET 
  recalc.Store_ID = orig.Store_ID,
  recalc.Year = orig.Year,
  recalc.Quarter = orig.Quarter,
  recalc.Total_Revenue = BROUND(orig.Total_Revenue,2),
  recalc.COGS = BROUND(orig.COGS,2),
  recalc.Profit = BROUND((orig.Total_Revenue - orig.COGS),2)
WHEN NOT MATCHED THEN INSERT (Store_ID, Year, Quarter, Total_Revenue, COGS, Profit)
  VALUES (orig.Store_ID, 
  orig.Year, 
  orig.Quarter, 
  BROUND(orig.Total_Revenue,2), 
  BROUND(orig.COGS,2), 
  BROUND((orig.Total_Revenue - orig.COGS),2))

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
1680,0,0,1680
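The recalculation above can be checked by hand with a small Python sketch. It assumes that rounding half to even on the decimal string form of a float is a close enough stand-in for BROUND on a DOUBLE; `bround` and `recalc_profit` are hypothetical helper names, not Spark functions, and `None` plays the role of NULL.

```python
from decimal import Decimal, ROUND_HALF_EVEN


def bround(value, scale=2):
    """Round half to even at `scale` decimal places; None stays None."""
    if value is None:
        return None
    quantum = Decimal(1).scaleb(-scale)  # Decimal('0.01') when scale is 2
    return float(Decimal(str(value)).quantize(quantum, rounding=ROUND_HALF_EVEN))


def recalc_profit(total_revenue, cogs, scale=2):
    """Profit = Total_Revenue - COGS, with None propagating like SQL NULL."""
    if total_revenue is None or cogs is None:
        return None
    return bround(total_revenue - cogs, scale)


print(bround(0.125))  # 0.12, the tie goes to the even digit
print(bround(0.135))  # 0.14
print(recalc_profit(20658.73557159825, 20025.0))  # first row of the bronze output
print(recalc_profit(10868.457517749213, None))    # missing COGS gives no profit
```

Half-to-even rounding avoids the slight upward bias that rounding every tie up would introduce when many values are summed.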
