In [1]:
import pandas as pd
import boto3 
from botocore import UNSIGNED 
from botocore.client import Config

In [2]:
s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED)) 
bucket_name = "d2b-internal-assessment-bucket" 
response = s3.list_objects(Bucket=bucket_name, Prefix="orders_data") 



In [3]:
for item in response.get('Contents'):
    print(item.get('Key'))

orders_data/
orders_data/orders.csv
orders_data/reviews.csv
orders_data/shipment_deliveries.csv


In [4]:
def download_files(response, filenames):
    for filename in filenames:
        try:
            s3.download_file(bucket_name, f"orders_data/{filename}", f"{filename}")
            print(f"{filename} downloaded!")
        except:
            print(f"Problem downloading {filename}")
    print("files download completed!")

In [5]:
filenames = ['orders.csv', 'reviews.csv', 'shipment_deliveries.csv']

download_files(response, filenames)

orders.csv downloaded!
reviews.csv downloaded!
shipment_deliveries.csv downloaded!
files download completed!


In [6]:
from psycopg2 import connect

In [7]:
# Create connection
def connection(params: dict):
    try:
        return connect(**params)
    except Exception as e:
        print(e)


In [13]:
# load credentials to environment variables
from dotenv import load_dotenv
from os import environ

if not load_dotenv(override=True):
    print("No evironment variables set")
    exit()


params = {
    "host": environ['DB_HOST'],
    "database": environ['DB_NAME'],
    "user": environ['DB_USER'],
    "password": environ['DB_PASSWD'],
    "port": environ['DB_PORT'],
}

In [9]:
print(params)

{'host': 'd2b-internal-assessment-dwh.cxeuj0ektqdz.eu-central-1.rds.amazonaws.com', 'database': 'd2b_assessment', 'user': 'okondivi4898', 'password': 'cwlo4unWbV', 'port': '5432'}


In [21]:
conn = connection(params)


In [11]:
from typing import Dict


def load_csv_todb(conn, schema_name, csvs: Dict):
    cursor = conn.cursor()
    cursor.execute(f"SET search_path TO {schema_name}")

    for item in csvs.items():
        try:
            table_name = item[0]
            filepath = item[1][0]
            columns_info = ",".join(item[1][-1])
            query = f'''
                DROP TABLE IF EXISTS {table_name};
                CREATE TABLE {table_name} ({columns_info});
            '''
            cursor.execute(query)

            with open(filepath) as f:
                data = f.read()
                data = data.strip('\n').split('\n')[1:]

            column_names = [line.split(' ')[0] for line in item[1][-1]]
            subs = ['%s' for _ in column_names]
            columns = ', '.join(column_names)
            subs = ', '.join(subs)
            for row in data:
                row = row.strip().split(',')
                row = tuple([_  if _ else None for _ in row ])
                print(row)
                query2 = f'''INSERT INTO {table_name} ({columns}) VALUES({subs});'''
                cursor.execute(query2, row)
            conn.commit()
            
        except Exception as e:
            raise Exception("".format(e))
    
    
    cursor.close()
    print("done")



In [None]:
csvs = {
    "orders":  [
        r'C:\Users\owner\Downloads\DE1\orders.csv',
        [
            'order_id INT NOT NULL',
            'customer_id INT NOT NULL',
            'order_date DATE NOT NULL',
            'product_id INT NOT NULL',
            'unit_price NUMERIC NOT NULL',
            'quantity NUMERIC NOT NULL',
            'total_price NUMERIC NOT NULL'
        ]
    ],

    'reviews': [
        r'C:\Users\owner\Downloads\DE1\reviews.csv',
        [
            'review INT NOT NULL',
            'product_id INT NOT NULL'
        ]
    ],

    'shipment_deliveries': [
        r'C:\Users\owner\Downloads\DE1\shipment_deliveries.csv',
        [
            'shipment_id INT NOT NULL',
            'order_id INT NOT NULL',
            'shipment_date DATE',
            'delivery_date DATE'
        ]
    ]
}

staging_schema = environ['STAGING_SCHEMA']

load_csv_todb(conn, staging_schema, csvs)

In [54]:
'''
1. The total number of orders placed on a public holiday every month, for the past year. 

'''

if conn:
    conn.close()

conn = connection(params)


analytics_schama = environ['ANALYTICS_SCHEMA']

query = f'''
DROP TABLE IF EXISTS {analytics_schama}.agg_public_holiday;
CREATE TABLE {analytics_schama}.agg_public_holiday AS
SELECT
    date_trunc('month', o.order_date) AS ingestion_date,
    COUNT(o.order_id) AS num_orders
FROM {staging_schema}.orders o
JOIN if_common.dim_dates d ON o.order_date::date = d.calendar_dt
WHERE
    d.day_of_the_week_num BETWEEN 1 AND 5
    AND d.working_day = false
    AND DATE_PART('year', o.order_date) = DATE_PART('year', CURRENT_DATE) - 1
GROUP BY ingestion_date
ORDER BY num_orders DESC;
'''

query1 = f'''
DROP TABLE IF EXISTS {analytics_schama}.agg_public_holiday;
CREATE TABLE {analytics_schama}.agg_public_holiday AS
SELECT
  date_trunc('day', o.order_date) AS ingestion_date,
  count(CASE WHEN d.day_of_the_week_num BETWEEN 1 AND 5 AND NOT d.working_day THEN 1 ELSE NULL END) AS tt_order_hol_jan,
  count(CASE WHEN d.day_of_the_week_num BETWEEN 1 AND 5 AND NOT d.working_day THEN 1 ELSE NULL END) AS tt_order_hol_feb,
  count(CASE WHEN d.day_of_the_week_num BETWEEN 1 AND 5 AND NOT d.working_day THEN 1 ELSE NULL END) AS tt_order_hol_mar,
  count(CASE WHEN d.day_of_the_week_num BETWEEN 1 AND 5 AND NOT d.working_day THEN 1 ELSE NULL END) AS tt_order_hol_apr,
  count(CASE WHEN d.day_of_the_week_num BETWEEN 1 AND 5 AND NOT d.working_day THEN 1 ELSE NULL END) AS tt_order_hol_may,
  count(CASE WHEN d.day_of_the_week_num BETWEEN 1 AND 5 AND NOT d.working_day THEN 1 ELSE NULL END) AS tt_order_hol_jun,
  count(CASE WHEN d.day_of_the_week_num BETWEEN 1 AND 5 AND NOT d.working_day THEN 1 ELSE NULL END) AS tt_order_hol_jul,
  count(CASE WHEN d.day_of_the_week_num BETWEEN 1 AND 5 AND NOT d.working_day THEN 1 ELSE NULL END) AS tt_order_hol_aug,
  count(CASE WHEN d.day_of_the_week_num BETWEEN 1 AND 5 AND NOT d.working_day THEN 1 ELSE NULL END) AS tt_order_hol_sep,
  count(CASE WHEN d.day_of_the_week_num BETWEEN 1 AND 5 AND NOT d.working_day THEN 1 ELSE NULL END) AS tt_order_hol_oct,
  count(CASE WHEN d.day_of_the_week_num BETWEEN 1 AND 5 AND NOT d.working_day THEN 1 ELSE NULL END) AS tt_order_hol_nov,
  count(CASE WHEN d.day_of_the_week_num BETWEEN 1 AND 5 AND NOT d.working_day THEN 1 ELSE NULL END) AS tt_order_hol_dec
FROM {staging_schema}.orders o
JOIN if_common.dim_dates d ON o.order_date = d.calendar_dt
WHERE o.order_date >= NOW() - INTERVAL '1 YEAR'
GROUP BY 1;
'''

query3 = f'''
DROP TABLE IF EXISTS {analytics_schama}.agg_public_holiday;
CREATE TABLE {analytics_schama}.agg_public_holiday (
    ingestion_date DATE NOT NULL DEFAULT CURRENT_DATE,
    tt_order_hol_jan INTEGER,
    tt_order_hol_feb INTEGER,
    tt_order_hol_mar INTEGER,
    tt_order_hol_apr INTEGER,
    tt_order_hol_may INTEGER,
    tt_order_hol_jun INTEGER,
    tt_order_hol_jul INTEGER,
    tt_order_hol_aug INTEGER,
    tt_order_hol_sep INTEGER,
    tt_order_hol_oct INTEGER,
    tt_order_hol_nov INTEGER,
    tt_order_hol_dec INTEGER
);

INSERT INTO {analytics_schama}.agg_public_holiday (
    tt_order_hol_jan, tt_order_hol_feb, tt_order_hol_mar, tt_order_hol_apr, tt_order_hol_may, tt_order_hol_jun, tt_order_hol_jul, tt_order_hol_aug, tt_order_hol_sep, tt_order_hol_oct, tt_order_hol_nov, tt_order_hol_dec
) (
    SELECT
        count(*) FILTER (WHERE EXTRACT(MONTH FROM o.order_date) = 1) AS tt_order_hol_jan,
        count(*) FILTER (WHERE EXTRACT(MONTH FROM o.order_date) = 2) AS tt_order_hol_feb,
        count(*) FILTER (WHERE EXTRACT(MONTH FROM o.order_date) = 3) AS tt_order_hol_mar,
        count(*) FILTER (WHERE EXTRACT(MONTH FROM o.order_date) = 4) AS tt_order_hol_apr,
        count(*) FILTER (WHERE EXTRACT(MONTH FROM o.order_date) = 5) AS tt_order_hol_may,
        count(*) FILTER (WHERE EXTRACT(MONTH FROM o.order_date) = 6) AS tt_order_hol_jun,
        count(*) FILTER (WHERE EXTRACT(MONTH FROM o.order_date) = 7) AS tt_order_hol_jul,
        count(*) FILTER (WHERE EXTRACT(MONTH FROM o.order_date) = 8) AS tt_order_hol_aug,
        count(*) FILTER (WHERE EXTRACT(MONTH FROM o.order_date) = 9) AS tt_order_hol_sep,
        count(*) FILTER (WHERE EXTRACT(MONTH FROM o.order_date) = 10) AS tt_order_hol_oct,
        count(*) FILTER (WHERE EXTRACT(MONTH FROM o.order_date) = 11) AS tt_order_hol_nov,
        count(*) FILTER (WHERE EXTRACT(MONTH FROM o.order_date) = 12) AS tt_order_hol_dec
    FROM {staging_schema}.orders o
    JOIN if_common.dim_dates d ON o.order_date = d.calendar_dt
    WHERE d.day_of_the_week_num BETWEEN 1 AND 5 AND NOT d.working_day
);

'''
cursor = conn.cursor()
cursor.execute(query3)
conn.commit()
cursor.close()

query2 = f"SELECT * FROM {analytics_schama}.agg_public_holiday;"

df = pd.read_sql(query2, conn)

df



Unnamed: 0,ingestion_date,tt_order_hol_jan,tt_order_hol_feb,tt_order_hol_mar,tt_order_hol_apr,tt_order_hol_may,tt_order_hol_jun,tt_order_hol_jul,tt_order_hol_aug,tt_order_hol_sep,tt_order_hol_oct,tt_order_hol_nov,tt_order_hol_dec
0,2023-02-22,20,17,29,32,0,0,13,35,0,16,12,0


In [59]:
'''
2. Total number of late shipments 

'''

if conn:
    conn.close()

conn = connection(params)

query1 = f'''
DROP TABLE IF EXISTS {analytics_schama}.agg_shipments;
CREATE TABLE {analytics_schama}.agg_shipments (
    ingestion_date DATE NOT NULL DEFAULT CURRENT_DATE,
    tt_late_shipments INTEGER,
    tt_undelivered_shipments INTEGER
);

INSERT INTO {analytics_schama}.agg_shipments (
    SELECT 
        COUNT(*) FILTER (WHERE shipment_date >= order_date + 6 AND delivery_date IS NULL) AS tt_late_shipments,
        COUNT(*) FILTER (WHERE shipment_date IS NULL AND delivery_date IS NULL AND current_date >= order_date + 15) AS tt_undelivered_shipments
    FROM 
        {staging_schema}.shipment_deliveries s
        JOIN {staging_schema}.orders o ON s.order_id = o.order_id
);

'''

query3 = f'''
DROP TABLE IF EXISTS {analytics_schama}.agg_shipments;
CREATE TABLE {analytics_schama}.agg_shipments AS
SELECT
    current_date AS ingestion_date,
    COUNT(CASE WHEN s.shipment_date >= o.order_date + INTERVAL '6 days' AND s.delivery_date IS NULL THEN 1 END) AS tt_late_shipments,
    COUNT(CASE WHEN s.delivery_date IS NULL AND s.shipment_date IS NULL AND '2022-09-05' >= o.order_date + INTERVAL '15 days' THEN 1 END) AS tt_undelivered_shipments
FROM {staging_schema}.shipment_deliveries s
JOIN {staging_schema}.orders o ON s.order_id = o.order_id;
'''

cursor = conn.cursor()
cursor.execute(query3)
conn.commit()
cursor.close()

query2 = f"SELECT * FROM {analytics_schama}.agg_shipments;"

df2 = pd.read_sql(query2, conn)

df2




Unnamed: 0,ingestion_date,tt_late_shipments,tt_undelivered_shipments
0,2023-02-23,175,6586


In [70]:
'''
3. The product with the highest reviews, the day it was ordered the most, either
that day was a public holiday, total review points,  percentage distribution of the review points,
and percentage distribution of early shipments to late shipments for that particular  product. 
'''

if conn:
    conn.close()

conn = connection(params)

query1 = f'''
DROP TABLE IF EXISTS {analytics_schama}.product_stats;
CREATE TABLE {analytics_schama}.product_stats (
  product_name VARCHAR(255),
  most_ordered_on_public_holiday DATE,
  total_review_points INTEGER,
  num_reviews INTEGER,
  percent_of_reviews NUMERIC,
  num_undelivered_shipments INTEGER,
  num_late_shipments INTEGER,
  num_early_shipments INTEGER,
  percent_of_early_shipments NUMERIC
);

INSERT INTO {analytics_schama}.product_stats (
    product_name, 
    most_ordered_on_public_holiday, 
    total_review_points, 
    num_reviews, 
    percent_of_reviews, 
    num_undelivered_shipments, 
    num_late_shipments, 
    num_early_shipments, 
    percent_of_early_shipments
)
SELECT 
  p.product_name, 
  (
    SELECT 
      c.calendar_dt
    FROM 
      if_common.dim_dates c
      JOIN {staging_schema}.orders o ON o.order_date = c.calendar_dt
    WHERE 
      o.product_id = p.product_id 
      AND c.working_day = false 
      AND c.day_of_the_week_num BETWEEN 1 AND 5 
    GROUP BY 
      c.calendar_dt
    ORDER BY 
      COUNT(*) DESC 
    LIMIT 1
  ) AS most_ordered_on_public_holiday,
  sum(r.review) AS total_review_points,
  count(r.review) AS num_reviews,
  count(r.review) * 100.0 / sum(count(*)) OVER () AS percent_of_reviews,
  count(*) FILTER (WHERE s.shipment_date IS NULL AND o.order_date + INTERVAL '15 days' <= current_date) AS num_undelivered_shipments,
  count(*) FILTER (WHERE s.shipment_date IS NOT NULL AND s.shipment_date >= o.order_date + INTERVAL '6 days') AS num_late_shipments,
  count(*) FILTER (WHERE s.shipment_date IS NOT NULL AND s.shipment_date < o.order_date + INTERVAL '6 days') AS num_early_shipments,
  count(*) FILTER (WHERE s.shipment_date IS NOT NULL AND s.shipment_date < o.order_date + INTERVAL '6 days') * 100.0 / count(*) OVER () AS percent_of_early_shipments
FROM 
  if_common.dim_products p 
  LEFT JOIN {staging_schema}.orders o ON p.product_id = o.product_id 
  LEFT JOIN {staging_schema}.shipment_deliveries s ON o.order_id = s.order_id 
  LEFT JOIN {staging_schema}.reviews r ON p.product_id = r.product_id 
GROUP BY 
  p.product_id, p.product_name;


'''

query3 = f'''
DROP TABLE IF EXISTS {analytics_schama}.agg_shipments;
CREATE TABLE {analytics_schama}.agg_shipments AS
SELECT
    current_date AS ingestion_date,
    COUNT(CASE WHEN s.shipment_date >= o.order_date + INTERVAL '6 days' AND s.delivery_date IS NULL THEN 1 END) AS tt_late_shipments,
    COUNT(CASE WHEN s.delivery_date IS NULL AND s.shipment_date IS NULL AND '2022-09-05' >= o.order_date + INTERVAL '15 days' THEN 1 END) AS tt_undelivered_shipments
FROM {staging_schema}.shipment_deliveries s
JOIN {staging_schema}.orders o ON s.order_id = o.order_id;
'''

cursor = conn.cursor()
cursor.execute(query1)
conn.commit()
cursor.close()

query2 = f"SELECT * FROM {analytics_schama}.product_stats;"

df3 = pd.read_sql(query2, conn)

df3




Unnamed: 0,product_name,most_ordered_on_public_holiday,total_review_points,num_reviews,percent_of_reviews,num_undelivered_shipments,num_late_shipments,num_early_shipments,percent_of_early_shipments
0,Jeans,2021-11-11,354521,115924,4.004466,78840,6132,30952,123808.0
1,Scooter,2021-04-01,429348,137196,4.739283,95172,5562,36462,145848.0
2,Socks,2022-08-30,353616,113632,3.925291,72092,6968,34572,138288.0
3,Screwdriver,2022-04-01,358140,120396,4.158946,80264,7268,32864,131456.0
4,Nails,2021-01-01,374625,125145,4.322995,84666,7416,33063,132252.0
5,Saucepan,2021-11-11,367316,119899,4.141778,80132,3887,35880,143520.0
6,Motorcycle,2021-01-01,329120,108086,3.733711,71094,7803,29189,116756.0
7,Shoes,2021-01-01,350336,117300,4.051998,84300,6900,26100,104400.0
8,Non-stick pan,2022-02-14,363690,120690,4.169102,78076,7450,35164,140656.0
9,Smartphone,2021-10-01,304580,103596,3.578609,68619,5073,29904,119616.0
