In [1]:
'''
Author : Oluwaloseyi Sufianu Sekoni
Role: Data Engineer @ Data2Bots
Date of Creation: 22nd May, 2023
'''

#Initialise Local Spark Instance
# findspark.init() must run before any pyspark import so that pyspark is importable.
import findspark
findspark.init()

#Import Supporting ETL libraries
import os
from datetime import datetime, date

import boto3
import botocore
import pandas as pd
from pyspark.sql import Row
# import json

# Location of the PostgreSQL JDBC driver jar handed to Spark through "spark.jars"
PATH_TO_JAR_FILE = r"C:\SparkApp\spark-3.4.0-bin-hadoop3\jars\postgresql-42.6.0.jar"

# Build (or reuse) a single-threaded local Spark session for this ETL run
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .master('local[1]')
    .appName("Data2Bots_ETL")
    .config("spark.jars", PATH_TO_JAR_FILE)
    .getOrCreate()
)

# Keep a handle on the underlying SparkContext
sc = spark.sparkContext

# Not the last expression of the cell, so this evaluation displays nothing
spark

print('Spark Successfully Initialised')

Spark Successfully Initialised


In [2]:
# Bare last expression of the cell: the notebook renders the SparkSession created above.
spark

In [3]:
from botocore import UNSIGNED
from botocore.client import Config

# S3 client configured with signature_version=UNSIGNED, i.e. requests are not signed
unsigned_config = Config(signature_version=UNSIGNED)
s3 = boto3.client('s3', region_name='eu-central-1', config=unsigned_config)

print('Completed Sucessfully')

Completed Sucessfully


In [4]:
# Verify that the assessment bucket can be listed before going any further.
# list_objects_v2 raises ClientError (rather than returning a response) when the bucket
# is missing or not accessible, so the failure has to be reported from the except branch.
try:
    bucket = s3.list_objects_v2(Bucket='d2b-internal-assessment-bucket')
except botocore.exceptions.ClientError as error:
    error_code = error.response.get("Error", {}).get("Code")
    if error_code == "NoSuchBucket":
        print("The bucket does not exist")
    else:
        print(f"Connection to AWS S3 failed: {error_code}")
    # Re-raise so that the following cells do not run against a broken connection.
    raise
else:
    if bucket.get('Name') == 'd2b-internal-assessment-bucket':
        print("The bucket exists")
    else:
        print("The bucket does not exist")

    # Only claim success once the request has actually come back without an error.
    print('Connection to AWS S3 is successful')

The bucket exists
Connection to AWS S3 is successful


In [5]:
#Connect to Bucket containing the source csv files
response = s3.list_objects_v2(Bucket='d2b-internal-assessment-bucket')

# Get Bucket Contents
# "Contents" is missing from the response when no keys are returned; default to an empty
# list so that iterating over `files` later never hits None.
files = list(response.get("Contents", []))

# A single list_objects_v2 call returns one page of keys only. Keep following the
# continuation token while the service reports the listing as truncated.
page = response
while page.get("IsTruncated"):
    page = s3.list_objects_v2(
        Bucket='d2b-internal-assessment-bucket',
        ContinuationToken=page["NextContinuationToken"],
    )
    files.extend(page.get("Contents", []))

In [46]:
#Define Destination Connection to AWS PostgresDB and Load Operation as a Function
def jdbc_load_function(to_load, write_mode, tab_name):
    """Write a DataFrame to the ``oluwseko3351_staging`` schema over JDBC.

    Parameters
    ----------
    to_load
        The DataFrame to persist. It is used through its writer API
        (``.write.mode(...).format(...).option(...).save()``).
    write_mode : str
        Save mode passed to the writer's ``mode`` call, e.g. ``"overwrite"``.
    tab_name : str
        Target table name; the data lands in ``oluwseko3351_staging.<tab_name>``.

    Returns
    -------
    None
        Progress messages are printed; nothing is returned.

    Notes
    -----
    The database user and password are read from the ``D2B_DB_USER`` and
    ``D2B_DB_PASSWORD`` environment variables when they are set.
    """
    servername = "d2b-internal-assessment-dwh.cxeuj0ektqdz.eu-central-1.rds.amazonaws.com"
    portnumber = 5432
    dbname = "d2b_assessment"
    # FIXME(security): the literal fallbacks only keep existing runs working. Set the
    # environment variables, rotate the password and remove the literals from the notebook.
    username = os.environ.get("D2B_DB_USER", "oluwseko3351")
    password = os.environ.get("D2B_DB_PASSWORD", "QWjmGssQaY")

    URL = f"jdbc:postgresql://{servername}:{portnumber}/{dbname}"
    print('JDBC URL Created')

    # Write the DataFrame that was passed in. The previous implementation exec'd a code
    # string that referred to a global called `dataset` and ignored `to_load` entirely.
    (
        to_load.write.mode(write_mode)
        .format("jdbc")
        .option("url", URL)
        .option("dbtable", f"oluwseko3351_staging.{tab_name}")
        .option("user", username)
        .option("password", password)
        .option("driver", "org.postgresql.Driver")
        .save()
    )

    final_message = (
        f"Table Loading is Complete.\n\nPlease check PostgresDB for table:{tab_name}"
        f"\n\nTables was generated from:{tab_name}.csv\n    "
    )

    print(final_message)

print('Load Function Created Successfully')

Load Function Created Successfully


In [45]:
bucket_name = "d2b-internal-assessment-bucket"

# Local folder the csv files are downloaded into before Spark reads them.
# NOTE(review): absolute, user-specific path; consider moving it to a config constant.
staging_dir = r"/Users/Oluwaloseyi Sekoni/csvstaging"

# Only keys starting with one of these prefixes are loaded into the staging schema.
staging_prefixes = (
    'orders_data/orders',
    'orders_data/reviews',
    'orders_data/shipment_deliveries',
)

for file in files:
    #Store file name as a variable
    table_name = str(file['Key'])
    #check for .csv files that belong to one of the staged datasets only
    if not (table_name.endswith('.csv') and table_name.startswith(staging_prefixes)):
        continue

    print("File is a csv: '{file}'".format(file=table_name))
    # 'orders_data/orders.csv' -> 'orders'
    tab_name = str((table_name.split('/')[1]).split('.')[0])
    print(tab_name)

    # The key keeps its 'orders_data/' folder, so make sure that folder exists locally
    # before downloading into it.
    local_path = f"{staging_dir}/{table_name}"
    os.makedirs(os.path.dirname(local_path), exist_ok=True)

    # Call the APIs directly: building Python source from the S3 key and exec'ing it
    # breaks on keys that contain quotes and hides the data flow.
    s3.download_file(Bucket=bucket_name, Key=table_name, Filename=local_path)
    print('File downloaded')

    # Every column is read as a string because no schema is supplied or inferred.
    dataset = spark.read.format("csv").option("header", "true").load(local_path)

    jdbc_load_function(dataset, "overwrite", tab_name)
            

File is a csv: 'orders_data/orders.csv'
orders
orders
File downloaded
JDBC URL Created
Table Loading is Complete.

Please check onPrem DB for table:orders

Tables was generated from:orders.json
    
File is a csv: 'orders_data/reviews.csv'
reviews
reviews
File downloaded
JDBC URL Created
Table Loading is Complete.

Please check onPrem DB for table:reviews

Tables was generated from:reviews.json
    
File is a csv: 'orders_data/shipment_deliveries.csv'
shipment_deliveries
shipment_deliveries
File downloaded
JDBC URL Created
Table Loading is Complete.

Please check onPrem DB for table:shipment_deliveries

Tables was generated from:shipment_deliveries.json
    


In [63]:
# Question 1: number of orders per calendar month that were placed on a date which
# if_common.dim_dates marks as day_of_the_week_num 1-5 with working_day = false.
# The statement is wrapped in parentheses with an alias because it is handed to the JDBC
# reader through the "dbtable" option.
Q1_query = '''(SELECT CURRENT_DATE ingestion_date ,
SUM(CASE WHEN TO_CHAR(to_date(a.order_date, 'YYYY-MM-DD'), 'Mon') = 'Jan' THEN 1 else 0 end) tt_order_hol_jan,
SUM(CASE WHEN TO_CHAR(to_date(a.order_date, 'YYYY-MM-DD'), 'Mon') = 'Feb' THEN 1 else 0 end) tt_order_hol_feb,
SUM(CASE WHEN TO_CHAR(to_date(a.order_date, 'YYYY-MM-DD'), 'Mon') = 'Mar' THEN 1 else 0 end) tt_order_hol_mar,
SUM(CASE WHEN TO_CHAR(to_date(a.order_date, 'YYYY-MM-DD'), 'Mon') = 'Apr' THEN 1 else 0 end) tt_order_hol_apr,
SUM(CASE WHEN TO_CHAR(to_date(a.order_date, 'YYYY-MM-DD'), 'Mon') = 'May' THEN 1 else 0 end) tt_order_hol_may,
SUM(CASE WHEN TO_CHAR(to_date(a.order_date, 'YYYY-MM-DD'), 'Mon') = 'Jun' THEN 1 else 0 end) tt_order_hol_jun,
SUM(CASE WHEN TO_CHAR(to_date(a.order_date, 'YYYY-MM-DD'), 'Mon') = 'Jul' THEN 1 else 0 end) tt_order_hol_jul,
SUM(CASE WHEN TO_CHAR(to_date(a.order_date, 'YYYY-MM-DD'), 'Mon') = 'Aug' THEN 1 else 0 end) tt_order_hol_aug,
SUM(CASE WHEN TO_CHAR(to_date(a.order_date, 'YYYY-MM-DD'), 'Mon') = 'Sep' THEN 1 else 0 end) tt_order_hol_sep,
SUM(CASE WHEN TO_CHAR(to_date(a.order_date, 'YYYY-MM-DD'), 'Mon') = 'Oct' THEN 1 else 0 end) tt_order_hol_oct,
SUM(CASE WHEN TO_CHAR(to_date(a.order_date, 'YYYY-MM-DD'), 'Mon') = 'Nov' THEN 1 else 0 end) tt_order_hol_nov,
SUM(CASE WHEN TO_CHAR(to_date(a.order_date, 'YYYY-MM-DD'), 'Mon') = 'Dec' THEN 1 else 0 end) tt_order_hol_dec
FROM oluwseko3351_staging.orders a, if_common.dim_dates b
where to_date(a.order_date, 'YYYY-MM-DD') = b.calendar_dt and day_of_the_week_num between 1 and 5 and working_day is false
GROUP BY CURRENT_DATE) foo'''

servername = "d2b-internal-assessment-dwh.cxeuj0ektqdz.eu-central-1.rds.amazonaws.com"
portnumber = 5432
dbname = "d2b_assessment"
# FIXME(security): the literal fallbacks only keep existing runs working. Set D2B_DB_USER /
# D2B_DB_PASSWORD, rotate the password and remove the literals from the notebook.
username = os.environ.get("D2B_DB_USER", "oluwseko3351")
password = os.environ.get("D2B_DB_PASSWORD", "QWjmGssQaY")

URL = f"jdbc:postgresql://{servername}:{portnumber}/{dbname}"
print('JDBC URL Created')

# Run the aggregation inside PostgreSQL and pull the result back as a DataFrame.
# The reader is called directly; there is no reason to exec a constant code string.
q1_dataframe = (
    spark.read
    .format("jdbc")
    .option("url", URL)
    .option("dbtable", Q1_query)
    .option("user", username)
    .option("password", password)
    .option("driver", "org.postgresql.Driver")
    .load()
)

# Persist the result, replacing any previous content of the analytics table.
(
    q1_dataframe.write.mode("overwrite")
    .format("jdbc")
    .option("url", URL)
    .option("dbtable", "oluwseko3351_analytics.agg_public_holiday")
    .option("user", username)
    .option("password", password)
    .option("driver", "org.postgresql.Driver")
    .save()
)

print('Question 1 Complete')

JDBC URL Created
Question 1 Complete


In [65]:
# Question 2: two counts over orders joined to shipment_deliveries.
#   tt_late_shipments    - shipment_date is at least 6 days after order_date and
#                          delivery_date is NULL
#   tt_undelivered_items - CURRENT_DATE is at least 15 days after order_date and both
#                          delivery_date and shipment_date are NULL
# The statement is wrapped in parentheses with an alias because it is handed to the JDBC
# reader through the "dbtable" option.
Q2_query = '''(select CURRENT_DATE ingestion_date, 
(select COUNT(*)FROM oluwseko3351_staging.orders a, oluwseko3351_staging.shipment_deliveries b
where a.order_id = b.order_id and to_date(b.shipment_date, 'YYYY-MM-DD') >= to_date(a.order_date, 'YYYY-MM-DD') +6
and delivery_date is NULL) tt_late_shipments,
(select COUNT(*) FROM oluwseko3351_staging.orders a, oluwseko3351_staging.shipment_deliveries b
where a.order_id = b.order_id and CURRENT_DATE >= to_date(a.order_date, 'YYYY-MM-DD') +15
and delivery_date is NULL and b.shipment_date is NULL) tt_undelivered_items) query'''

servername = "d2b-internal-assessment-dwh.cxeuj0ektqdz.eu-central-1.rds.amazonaws.com"
portnumber = 5432
dbname = "d2b_assessment"
# FIXME(security): the literal fallbacks only keep existing runs working. Set D2B_DB_USER /
# D2B_DB_PASSWORD, rotate the password and remove the literals from the notebook.
username = os.environ.get("D2B_DB_USER", "oluwseko3351")
password = os.environ.get("D2B_DB_PASSWORD", "QWjmGssQaY")

URL = f"jdbc:postgresql://{servername}:{portnumber}/{dbname}"
print('JDBC URL Created')

# Run the counts inside PostgreSQL and pull the single result row back as a DataFrame.
# The reader is called directly; there is no reason to exec a constant code string.
q2_dataframe = (
    spark.read
    .format("jdbc")
    .option("url", URL)
    .option("dbtable", Q2_query)
    .option("user", username)
    .option("password", password)
    .option("driver", "org.postgresql.Driver")
    .load()
)

# Persist the result, replacing any previous content of the analytics table.
(
    q2_dataframe.write.mode("overwrite")
    .format("jdbc")
    .option("url", URL)
    .option("dbtable", "oluwseko3351_analytics.agg_shipments")
    .option("user", username)
    .option("password", password)
    .option("driver", "org.postgresql.Driver")
    .save()
)

print('Question 2 Complete')

JDBC URL Created
Question 2 Complete


In [67]:
# Question 3: one row describing the product with the most rows in the reviews table:
# its name, the date it was ordered most often, whether that date is a public holiday
# (day_of_the_week_num 1-5 and working_day = false), the sum of its review scores, the
# share of each star rating, and the share of early / late shipments among its orders.
# Every scalar sub-select re-derives the product through the same
# "group by product_id order by count desc limit 1" sub-query.
# Fix: pct_five_star_review now counts review = '5'; it used to count '3' and therefore
# repeated the three-star percentage.
# The statement is wrapped in parentheses with an alias because it is handed to the JDBC
# reader through the "dbtable" option.
Q3_query = '''(select CURRENT_DATE ingestion_date,
(select product_name from if_common.dim_products a where product_id =
(select product_id from (select product_id, count(*) FROM oluwseko3351_staging.reviews group by product_id order by 2 desc
LIMIT 1) foo)::INTEGER) product_name,
(select order_date from 
(select to_date(a.order_date, 'YYYY-MM-DD') order_date, count(*) count FROM oluwseko3351_staging.orders a where product_id =
(select product_id from (select product_id, count(*) FROM oluwseko3351_staging.reviews group by product_id order by 2 desc
LIMIT 1) foo) group by to_date(a.order_date, 'YYYY-MM-DD') order by 2 desc LIMIT 1)foo) most_ordered_day,
(select CASE WHEN day_of_the_week_num between 1 and 5 and working_day is false THEN true else false end as is_public_holiday 
from if_common.dim_dates where calendar_dt = (select order_date from 
(select to_date(a.order_date, 'YYYY-MM-DD') order_date, count(*) count FROM oluwseko3351_staging.orders a where product_id =
(select product_id from (select product_id, count(*) FROM oluwseko3351_staging.reviews group by product_id order by 2 desc
LIMIT 1) foo) group by to_date(a.order_date, 'YYYY-MM-DD') order by 2 desc LIMIT 1)foo)) is_public_holiday,
(select sum from (select product_id, count(*) COUNT, SUM(review::INTEGER) SUM FROM oluwseko3351_staging.reviews group by product_id order by 2 desc
LIMIT 1) foo) tt_review_points,
(select pct_one_star_review from( 
select product_id, count(*) count, 
(sum(case when review = '1' then 1 else 0 end)::decimal /count(*)::decimal )*100 pct_one_star_review
FROM oluwseko3351_staging.reviews group by product_id order by 2 desc
LIMIT 1) foo) pct_one_star_review,
(select pct_two_star_review from( 
select product_id, count(*) count, 
(sum(case when review = '2' then 1 else 0 end)::decimal /count(*)::decimal )*100 pct_two_star_review
FROM oluwseko3351_staging.reviews group by product_id order by 2 desc
LIMIT 1) foo) pct_two_star_review,
(select pct_three_star_review from( 
select product_id, count(*) count, 
(sum(case when review = '3' then 1 else 0 end)::decimal /count(*)::decimal )*100 pct_three_star_review
FROM oluwseko3351_staging.reviews group by product_id order by 2 desc
LIMIT 1) foo) pct_three_star_review,
(select pct_four_star_review from( 
select product_id, count(*) count, 
(sum(case when review = '4' then 1 else 0 end)::decimal /count(*)::decimal )*100 pct_four_star_review
FROM oluwseko3351_staging.reviews group by product_id order by 2 desc
LIMIT 1) foo) pct_four_star_review,
(select pct_five_star_review from( 
select product_id, count(*) count, 
(sum(case when review = '5' then 1 else 0 end)::decimal /count(*)::decimal )*100 pct_five_star_review
FROM oluwseko3351_staging.reviews group by product_id order by 2 desc
LIMIT 1) foo) pct_five_star_review,
(SELECT ((select COUNT(*)FROM oluwseko3351_staging.orders a, oluwseko3351_staging.shipment_deliveries b
where product_id =(select product_id from (select product_id, count(*) FROM oluwseko3351_staging.reviews group by product_id order by 2 desc
LIMIT 1) foo)  and a.order_id = b.order_id and to_date(b.shipment_date, 'YYYY-MM-DD') < to_date(a.order_date, 'YYYY-MM-DD') +6
and delivery_date is NOT NULL)::decimal/(select COUNT(*)FROM oluwseko3351_staging.orders where product_id = (select product_id from (select product_id, count(*) FROM oluwseko3351_staging.reviews group by product_id order by 2 desc
LIMIT 1) foo))::decimal) * 100) pct_early_shipments,
(SELECT ((select COUNT(*)FROM oluwseko3351_staging.orders a, oluwseko3351_staging.shipment_deliveries b
where product_id =(select product_id from (select product_id, count(*) FROM oluwseko3351_staging.reviews group by product_id order by 2 desc
LIMIT 1) foo)  and a.order_id = b.order_id and to_date(b.shipment_date, 'YYYY-MM-DD') >= to_date(a.order_date, 'YYYY-MM-DD') +6
and delivery_date is NULL)::decimal/(select COUNT(*)FROM oluwseko3351_staging.orders where product_id = (select product_id from (select product_id, count(*) FROM oluwseko3351_staging.reviews group by product_id order by 2 desc
LIMIT 1) foo))::decimal) * 100) pct_late_shipments) query'''

servername = "d2b-internal-assessment-dwh.cxeuj0ektqdz.eu-central-1.rds.amazonaws.com"
portnumber = 5432
dbname = "d2b_assessment"
# FIXME(security): the literal fallbacks only keep existing runs working. Set D2B_DB_USER /
# D2B_DB_PASSWORD, rotate the password and remove the literals from the notebook.
username = os.environ.get("D2B_DB_USER", "oluwseko3351")
password = os.environ.get("D2B_DB_PASSWORD", "QWjmGssQaY")

URL = f"jdbc:postgresql://{servername}:{portnumber}/{dbname}"
print('JDBC URL Created')

# Run the query inside PostgreSQL and pull the single result row back as a DataFrame.
# The reader is called directly; there is no reason to exec a constant code string.
q3_dataframe = (
    spark.read
    .format("jdbc")
    .option("url", URL)
    .option("dbtable", Q3_query)
    .option("user", username)
    .option("password", password)
    .option("driver", "org.postgresql.Driver")
    .load()
)

# Persist the result, replacing any previous content of the analytics table.
(
    q3_dataframe.write.mode("overwrite")
    .format("jdbc")
    .option("url", URL)
    .option("dbtable", "oluwseko3351_analytics.best_performing_product")
    .option("user", username)
    .option("password", password)
    .option("driver", "org.postgresql.Driver")
    .save()
)

print('Question 3 Complete')

JDBC URL Created
Question 3 Complete
