In [None]:
%sql
-- Create the schema that the raw, refined and mart objects of this notebook are written to.
-- IF NOT EXISTS keeps the cell re-runnable: a plain CREATE SCHEMA fails once the schema exists.
CREATE SCHEMA IF NOT EXISTS finance_data.db_data;

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, FloatType

# Read the product CSV into a DataFrame: the first row is the header, column types
# are inferred, and '"' is used as both the quote and the escape character.
df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("escape", "\"")
    .option("quote", "\"")
    .load("/Volumes/finance_data/db_data/data/Product.csv")
)

# Replace invalid characters in the column names: a space becomes '_', while
# , ; { } ( ) newline, tab and = are removed. All columns are renamed with one
# positional toDF() call instead of one withColumnRenamed() projection per column.
invalid_chars = str.maketrans(' ', '_', ',;{}()\n\t=')
df = df.toDF(*[column_name.translate(invalid_chars) for column_name in df.columns])

# Store the result in a table. mode("overwrite") replaces the table on a re-run;
# with the default save mode the write raises an error once the table exists.
df.write.mode("overwrite").saveAsTable("finance_data.db_data.raw_product")

In [None]:
from pyspark.sql.functions import col

# Read the order JSON into a DataFrame. multiLine lets a single record span several
# lines; PERMISSIVE mode keeps malformed records instead of failing the read.
df = (
    spark.read
    .option("multiLine", "true")
    .option("mode", "PERMISSIVE")
    .json("/Volumes/finance_data/db_data/data/Order_new.json")
)

# Replace invalid characters in the column names: a space becomes '_', while
# , ; { } ( ) newline, tab and = are removed. All columns are renamed with one
# positional toDF() call instead of one withColumnRenamed() projection per column.
invalid_chars = str.maketrans(' ', '_', ',;{}()\n\t=')
df = df.toDF(*[column_name.translate(invalid_chars) for column_name in df.columns])

# Store the result in a table. mode("overwrite") replaces the table on a re-run;
# with the default save mode the write raises an error once the table exists.
df.write.mode("overwrite").saveAsTable("finance_data.db_data.raw_order")

In [None]:
pip install openpyxl

Python interpreter will be restarted.
Collecting openpyxl
  Downloading openpyxl-3.1.3-py2.py3-none-any.whl (251 kB)
Collecting et-xmlfile
  Downloading et_xmlfile-1.1.0-py3-none-any.whl (4.7 kB)
Installing collected packages: et-xmlfile, openpyxl
Successfully installed et-xmlfile-1.1.0 openpyxl-3.1.3
Python interpreter will be restarted.


In [None]:

# Read the customer Excel workbook into a DataFrame through the
# com.crealytics.spark.excel data source, with a header row and inferred column types.
file_path = "/Volumes/finance_data/db_data/data/Customer.xlsx"
df = (
    spark.read.format("com.crealytics.spark.excel")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(file_path)
)

# Replace invalid characters in the column names: a space becomes '_', while
# , ; { } ( ) newline, tab and = are removed. All columns are renamed with one
# positional toDF() call instead of one withColumnRenamed() projection per column.
invalid_chars = str.maketrans(' ', '_', ',;{}()\n\t=')
df = df.toDF(*[column_name.translate(invalid_chars) for column_name in df.columns])

# Store the result in a table. mode("overwrite") replaces the table on a re-run;
# with the default save mode the write raises an error once the table exists.
df.write.mode("overwrite").saveAsTable("finance_data.db_data.raw_customer")

In [None]:
%sql
-- Build the refined customer table from raw_customer.
-- Customer_Name and Country are cleaned in three steps:
--   1. every character that is not an ASCII letter or a space is removed,
--   2. each run of two or more spaces is collapsed into a single space
--      (replacing the run with '' would glue the neighbouring words together),
--   3. leading and trailing spaces are trimmed.
CREATE OR REPLACE TABLE finance_data.db_data.refined_customer
AS
SELECT
    cust.Customer_ID,
    TRIM(REGEXP_REPLACE(REGEXP_REPLACE(cust.Customer_Name, '[^a-zA-Z ]', ''), ' {2,}', ' ')) AS Customer_Name,
    TRIM(REGEXP_REPLACE(REGEXP_REPLACE(cust.Country, '[^a-zA-Z ]', ''), ' {2,}', ' ')) AS Country
FROM
    finance_data.db_data.raw_customer AS cust;


num_affected_rows,num_inserted_rows


In [None]:
%sql
-- Build the refined product table from raw_product.
-- The GROUP BY on the raw key and attribute columns removes exact duplicate rows.
-- Category and Sub-Category are cleaned in three steps:
--   1. every character that is not an ASCII letter or a space is removed,
--   2. each run of two or more spaces is collapsed into a single space
--      (replacing the run with '' would glue the neighbouring words together),
--   3. leading and trailing spaces are trimmed.
CREATE OR REPLACE TABLE finance_data.db_data.refined_product
AS
SELECT
    prod.Product_ID,
    TRIM(REGEXP_REPLACE(REGEXP_REPLACE(prod.Category, '[^a-zA-Z ]', ''), ' {2,}', ' ')) AS Product_Category,
    TRIM(REGEXP_REPLACE(REGEXP_REPLACE(prod.`Sub-Category`, '[^a-zA-Z ]', ''), ' {2,}', ' ')) AS `Sub-Category`
FROM
    finance_data.db_data.raw_product AS prod
GROUP BY
    prod.Product_ID, prod.Category, prod.`Sub-Category`;


num_affected_rows,num_inserted_rows


In [None]:
%sql
-- Build the refined order table from raw_order.
CREATE OR REPLACE TABLE FINANCE_DATA.DB_DATA.REFINED_ORDER
AS
SELECT
  ord.Order_ID,
  ord.Product_ID,
  ord.Customer_ID,
  -- order_date is split on '/' and reassembled as part3-part2-part1 before the cast
  -- (presumably the raw value is day/month/year -- TODO confirm against the source file).
  -- A NULL result falls back to 2000-01-01. The fallback is written as a DATE literal so
  -- both IFNULL arguments are DATE and the column type does not rely on implicit
  -- DATE/STRING coercion.
  IFNULL(
    CAST(split_part(ord.order_date, '/', 3) || '-' || split_part(ord.order_date, '/', 2) || '-' || split_part(ord.order_date, '/', 1) AS DATE),
    DATE'2000-01-01'
  ) AS order_date,
  CAST(ord.Profit AS DECIMAL(10,2)) AS Profit -- fixed scale of 2 decimal places
FROM finance_data.db_data.raw_order AS ord

num_affected_rows,num_inserted_rows


In [None]:
%sql
-- Sales mart: refined orders enriched with product and customer attributes.
--   * refined_product is LEFT JOINed, so an order whose product is missing from the
--     product table keeps its row and gets 'NA' for Product_Category / Sub-Category.
--   * refined_customer is INNER JOINed, so an order without a matching customer is
--     dropped; the defaults on the customer columns therefore only cover NULL values
--     stored in refined_customer itself.
--     NOTE(review): confirm the inner join is intended -- the 'NA' defaults on the
--     customer columns suggest a left join may have been planned.
--   * NULL order dates map to Year 2099 and NULL profit maps to 0.00.
CREATE OR REPLACE TABLE FINANCE_DATA.DB_DATA.SALES_MART
AS
SELECT
    COALESCE(cst.Customer_ID, 'NA')         AS Customer_ID,
    COALESCE(cst.Customer_Name, 'NA')       AS Customer_Name,
    COALESCE(cst.Country, 'NA')             AS Country,
    COALESCE(YEAR(ord.Order_Date), 2099)    AS Year,
    COALESCE(prd.Product_Category, 'NA')    AS Product_Category,
    COALESCE(prd.`Sub-Category`, 'NA')      AS `Sub-Category`,
    COALESCE(ord.profit, 0.00)              AS Profit
FROM finance_data.db_data.refined_order AS ord
LEFT JOIN finance_data.db_data.refined_product AS prd
    ON ord.Product_ID = prd.Product_ID
INNER JOIN finance_data.db_data.refined_customer AS cst
    ON ord.Customer_ID = cst.Customer_ID

num_affected_rows,num_inserted_rows


In [None]:
%sql
-- View: total profit per Year, aggregated from the sales mart.
CREATE OR REPLACE VIEW FINANCE_DATA.DB_DATA.VW_SALES_BY_YEAR AS
SELECT
    mart.Year,
    SUM(mart.profit) AS Total_Profit
FROM finance_data.db_data.sales_mart AS mart
GROUP BY mart.Year


In [None]:
%sql
-- View: total profit per Product_Category, aggregated from the sales mart.
CREATE OR REPLACE VIEW FINANCE_DATA.DB_DATA.VW_SALES_BY_PRODUCT_CATEGORY AS
SELECT
    mart.Product_Category,
    SUM(mart.profit) AS Total_Profit
FROM finance_data.db_data.sales_mart AS mart
GROUP BY mart.Product_Category


In [None]:
%sql
-- View: total profit per Sub-Category, aggregated from the sales mart.
CREATE OR REPLACE VIEW FINANCE_DATA.DB_DATA.VW_SALES_BY_SUB_CATEGORY AS
SELECT
    mart.`Sub-Category`,
    SUM(mart.profit) AS Total_Profit
FROM finance_data.db_data.sales_mart AS mart
GROUP BY mart.`Sub-Category`


In [None]:
%sql
-- View: total profit per customer (id and name), aggregated from the sales mart.
CREATE OR REPLACE VIEW FINANCE_DATA.DB_DATA.VW_SALES_BY_CUSTOMER AS
SELECT
    mart.Customer_ID,
    mart.Customer_Name,
    SUM(mart.profit) AS Total_Profit
FROM finance_data.db_data.sales_mart AS mart
GROUP BY
    mart.Customer_ID,
    mart.Customer_Name


In [None]:
%sql
-- Total profit per Year, computed directly on the sales mart.
SELECT
    mart.Year,
    SUM(mart.profit) AS Total_Profit
FROM finance_data.db_data.sales_mart AS mart
GROUP BY mart.Year

Year,Total_Profit
2015,63073.3
2014,39185.89
2016,65073.58
2017,111085.31


In [None]:
%sql
-- Total profit per Year and Product_Category, computed directly on the sales mart.
SELECT
    mart.Year,
    mart.Product_Category,
    SUM(mart.profit) AS Total_Profit
FROM finance_data.db_data.sales_mart AS mart
GROUP BY
    mart.Year,
    mart.Product_Category

Year,Product_Category,Total_Profit
2017,Office Supplies,44273.35
2015,Office Supplies,24519.51
2016,Technology,23223.84
2014,Technology,21493.38
2015,Furniture,3027.2
2014,Furniture,-5331.05
2017,Technology,63281.91
2017,Furniture,3041.54
2014,,523.13
2015,Technology,34943.43


In [None]:
%sql
-- Total profit per customer (id and name), computed directly on the sales mart.
SELECT
    mart.Customer_ID,
    mart.Customer_Name,
    SUM(mart.profit) AS Total_Profit
FROM finance_data.db_data.sales_mart AS mart
GROUP BY
    mart.Customer_ID,
    mart.Customer_Name

Customer_ID,Customer_Name,Total_Profit
VP-21760,Victoria Pisteka,-1018.91
JK-15640,Jim Kriz,1172.9
PR-18880,Patrick Ryan,5596.2
RB-19435,Richard Bierner,477.63
CD-12280,Christina DeMoss,233.48
JM-15655,Jim Mitchum,117.2
CR-12730,Craig Reiter,306.92
GM-14680,Greg Matthias,35.47
CM-12115,Chad McGuire,408.58
PA-19060,Pete Armstrong,225.86


In [None]:
%sql
-- Total profit per customer (id and name) and Year, computed directly on the sales mart.
SELECT
    mart.Customer_ID,
    mart.Customer_Name,
    mart.Year,
    SUM(mart.profit) AS Total_Profit
FROM finance_data.db_data.sales_mart AS mart
GROUP BY
    mart.Customer_ID,
    mart.Customer_Name,
    mart.Year

Customer_ID,Customer_Name,Year,Total_Profit
MO-17800,Meg OConnel,2016,5.5
CC-12475,Cindy Chapman,2017,40.35
MP-18175,Mike Pelletier,2014,-125.29
TH-21115,Thea Hudgings,2017,-181.36
JL-15235,Janet Lee,2017,88.19
VW-21775,Victoria Wilson,2015,-1186.41
RE-19450,Richard Eichhorn,2015,211.06
ND-18370,Natalie DeCherney,2014,47.61
VF-21715,Vicky Freymann,2017,62.48
JG-15805,John Grady,2015,8.69


In [None]:
%sql

--TESTING : row count testing
-- One row per raw / refined table with its row count. Every branch carries a distinct
-- table_name label, so no two rows can be equal: UNION ALL returns the same rows as
-- UNION without the extra de-duplication step.

select count(*), 'raw_order_count' as table_name from finance_data.db_data.raw_order
union all
select count(*), 'raw_product_count' as table_name from finance_data.db_data.raw_product
union all
select count(*), 'raw_customer_count' as table_name from finance_data.db_data.raw_customer
union all
select count(*), 'refined_order_count' as table_name from finance_data.db_data.refined_order
union all
select count(*), 'refined_product_count' as table_name from finance_data.db_data.refined_product
union all
select count(*), 'refined_customer_count' as table_name from finance_data.db_data.refined_customer;

count(1),table_name
1851,raw_product_count
9994,refined_order_count
793,refined_customer_count
9994,raw_order_count
793,raw_customer_count
1818,refined_product_count


In [None]:
%sql
--TESTING : dups check
-- One result row per table with the number of key values that occur more than once.
-- The four checks are combined into a single statement because the recorded output of
-- the multi-statement version shows only the result of its last statement.
-- Results noted by the author: raw_customer 0, raw_product 33 (needs to be deduplicated
-- in the refined layer), refined_customer 0, refined_product 0.

select 'raw_customer' as table_name, count(*) as duplicated_keys
from (
    select Customer_ID from finance_data.db_data.raw_customer
    group by Customer_ID
    having count(*) > 1
) as dup
union all
select 'raw_product' as table_name, count(*) as duplicated_keys
from (
    select Product_id from finance_data.db_data.raw_product
    group by Product_id
    having count(*) > 1
) as dup
union all
select 'refined_customer' as table_name, count(*) as duplicated_keys
from (
    select Customer_ID from finance_data.db_data.refined_customer
    group by Customer_ID
    having count(*) > 1
) as dup
union all
select 'refined_product' as table_name, count(*) as duplicated_keys
from (
    select Product_id from finance_data.db_data.refined_product
    group by Product_id
    having count(*) > 1
) as dup;

Product_id


In [None]:
%sql

--TESTING : tally total across different aggregations
-- The grand total of Total_Profit must be the same in every aggregated view.
-- Every branch carries a distinct view_name label, so no two rows can be equal:
-- UNION ALL returns the same rows as UNION without the extra de-duplication step.

select sum(Total_Profit), 'vw_sales_by_customer' as view_name from finance_data.db_data.vw_sales_by_customer

union all

select sum(Total_Profit), 'vw_sales_by_product_category' as view_name from finance_data.db_data.vw_sales_by_product_category

union all

select sum(Total_Profit), 'vw_sales_by_sub_category' as view_name from finance_data.db_data.vw_sales_by_sub_category

union all

select sum(Total_Profit), 'vw_sales_by_year' as view_name from finance_data.db_data.vw_sales_by_year;

sum(Total_Profit),view_name
278418.08,vw_sales_by_customer
278418.08,vw_sales_by_sub_category
278418.08,vw_sales_by_product_category
278418.08,vw_sales_by_year


In [None]:
%sql

--TESTING : null check in primary key fields
-- Counts the rows whose key column is NULL in each refined table; every count should be 0.
-- Every branch carries a distinct table_name label, so no two rows can be equal:
-- UNION ALL returns the same rows as UNION without the extra de-duplication step.
select count(*), 'refined_customer' as table_name from finance_data.db_data.refined_customer where Customer_ID is null

union all

select count(*), 'refined_product' as table_name from finance_data.db_data.refined_product where Product_Id is null

union all

select count(*), 'refined_order' as table_name from finance_data.db_data.refined_order where Order_ID is null;



count(1),table_name
0,refined_product
0,refined_customer
0,refined_order


In [None]:
%sql

--TESTING : check for nulls across aggregated tables / views
-- Counts the rows whose grouping column is NULL in each aggregated view; every count should be 0.
-- Every branch carries a distinct view_name label, so no two rows can be equal:
-- UNION ALL returns the same rows as UNION without the extra de-duplication step.

select count(*), 'vw_sales_by_customer' as view_name from finance_data.db_data.vw_sales_by_customer where Customer_ID is null

union all

select count(*), 'vw_sales_by_product_category' as view_name from finance_data.db_data.vw_sales_by_product_category where Product_Category is null

union all

select count(*), 'vw_sales_by_sub_category' as view_name from finance_data.db_data.vw_sales_by_sub_category where `Sub-Category` is null

union all

select count(*), 'vw_sales_by_year' as view_name from finance_data.db_data.vw_sales_by_year where year is null;

count(1),view_name
0,vw_sales_by_customer
0,vw_sales_by_sub_category
0,vw_sales_by_product_category
0,vw_sales_by_year


In [None]:
%sql
--TESTING : Data Quality Check
-- Returns every mart row in which customer_name, Product_Category or Sub-Category
-- contains anything other than ASCII letters and spaces; the expected result is 0 rows.
-- The three checks are combined into one statement because the recorded output of the
-- multi-statement version shows a single result set only.
select * from finance_data.db_data.sales_mart
where
customer_name NOT REGEXP '^[a-zA-Z ]+$'
or Product_Category NOT REGEXP '^[a-zA-Z ]+$'
or `Sub-Category` NOT REGEXP '^[a-zA-Z ]+$'; --0

Customer_ID,Customer_Name,Country,Year,Product_Category,Sub-Category,Profit


In [None]:
%sql
--TESTING for non-numeric value
-- Lists the mart rows whose Profit, matched as text, starts with an ASCII letter or a
-- space; the expected result is 0 rows.
-- NOTE(review): Profit reaches the mart through CAST(... AS DECIMAL(10,2)) in
-- refined_order, so this pattern presumably can never match here -- consider running
-- the check against raw_order instead.
SELECT *
FROM finance_data.db_data.sales_mart
WHERE Profit RLIKE '^[a-zA-Z ]'; --0

Customer_ID,Customer_Name,Country,Year,Product_Category,Sub-Category,Profit


In [None]:
%sql

--TESTING for digits in decimal place
-- Takes the text after the last '.' of every Profit value and reports the longest
-- length found; the expected result is 2.
-- NOTE(review): Profit reaches the mart through CAST(... AS DECIMAL(10,2)) in
-- refined_order, so the result presumably always equals the declared scale.
SELECT
    MAX(LENGTH(SUBSTRING_INDEX(CAST(profit AS STRING), '.', -1))) AS decimal_precision
FROM finance_data.db_data.sales_mart; --2

decimal_precision
2
