# ETL Pipeline

Import libraries

In [34]:
import boto3
from botocore import UNSIGNED
from botocore.client import Config
import psycopg2
import snowflake.connector
import pandas as pd
from psycopg2.pool import ThreadedConnectionPool
import numpy as np
from psycopg2.extensions import register_adapter, AsIs
psycopg2.extensions.register_adapter(np.int64, psycopg2._psycopg.AsIs)
from datetime import datetime, timezone
import os
import glob

from sqlalchemy import create_engine
from psycopg2.extras import execute_values
import psycopg2.pool

from io import StringIO
import csv
import configparser

# Load the configuration file

In [35]:
config = configparser.ConfigParser()
config.read_file(open('cluster.config'))

Extract the credentials from the config file

In [36]:
HOST=config.get('DWH', 'HOST')
DWH_DB=config.get('DWH', 'DWH_DB')
DWH_DB_USER=config.get('DWH', 'DWH_DB_USER')
DWH_DB_PASSWORD=config.get('DWH', 'DWH_DB_PASSWORD')
DWH_PORT=config.get('DWH', 'DWH_PORT')
STAGING_SCHEMA=config.get('DWH', 'STAGING_SCHEMA')
ANALYTICS_SCHEMA=config.get('DWH', 'ANALYTICS_SCHEMA')
S3_BUCKET_NAME=config.get('DWH', 'S3_BUCKET_NAME')
ID=config.get('DWH', 'ID')
EXPORT=config.get('DWH', 'EXPORT')

View the credentials

In [37]:
pd.DataFrame({"Param": ['HOST', 'DWH_DB', 'DWH_DB_USER', 'DWH_DB_PASSWORD', 'DWH_PORT', 
                       'STAGING_SCHEMA', 'ANALYTICS_SCHEMA', 'S3_BUCKET_NAME', 'ID', 'EXPORT'],
              "Value": [HOST, DWH_DB, DWH_DB_USER, DWH_DB_PASSWORD, DWH_PORT, 
                       STAGING_SCHEMA, ANALYTICS_SCHEMA, S3_BUCKET_NAME, ID, EXPORT]
})

Unnamed: 0,Param,Value
0,HOST,d2b-internal-assessment-dwh.cxeuj0ektqdz.eu-ce...
1,DWH_DB,d2b_assessment
2,DWH_DB_USER,idrialug9071
3,DWH_DB_PASSWORD,fBSKguCTJs
4,DWH_PORT,5432
5,STAGING_SCHEMA,idrialug9071_staging
6,ANALYTICS_SCHEMA,idrialug9071_analytics
7,S3_BUCKET_NAME,d2b-internal-assessment-bucket
8,ID,idrialug9071
9,EXPORT,analytics_export


# Connect to S3 Bucket

In [38]:
s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))

In [39]:
objects = s3.list_objects_v2(Bucket=S3_BUCKET_NAME, Prefix="orders_data/")

## Fucntion to load all needed data from S3 bucket

In [40]:
def download_and_load_query_results(data):
    object_lists = []
    for obj in objects['Contents'][1:]:
        object_lists.append(obj['Key'].split('/')[1].split('.')[0])
        
    if data in object_lists:
        path = f'{data}.csv'
        name = obj['Key'].split('/')[1].split('.')[0]
    s3.download_file(
        S3_BUCKET_NAME,
        Key = f"orders_data/{path}",
        Filename  = path,
    )

    return pd.read_csv(path)

In [41]:
queries = ['orders', 'reviews', 'shipment_deliveries']
data = {}

for query in queries:
    result = download_and_load_query_results(query)
    data[query] = result

In [42]:
orders = data['orders']
orders.head()

Unnamed: 0,order_id,customer_id,order_date,product_id,unit_price,quantity,total_price
0,1,5,2022-07-13,24,139,10,1390
1,2,14,2021-04-06,2,273,4,1092
2,3,17,2022-07-29,20,253,9,2277
3,4,14,2022-08-27,8,334,1,334
4,5,25,2021-12-15,6,334,3,1002


In [43]:
reviews = data['reviews']
reviews.head()

Unnamed: 0,review,product_id
0,1,21
1,3,1
2,2,8
3,1,5
4,5,22


In [44]:
shipment_deliveries = data['shipment_deliveries']
shipment_deliveries.head()

Unnamed: 0,shipment_id,order_id,shipment_date,delivery_date
0,1,1,2022-07-14,
1,2,2,,
2,3,3,2022-07-31,2022-08-03
3,4,4,2022-09-02,2022-09-05
4,5,5,2021-12-19,2021-12-20


Transform shipment deliveries data

In [45]:
# Format the date 
shipment_deliveries['shipment_date'] = pd.to_datetime(shipment_deliveries['shipment_date'], format= '%Y-%m-%d').dt.date

shipment_deliveries['delivery_date'] = pd.to_datetime(shipment_deliveries['delivery_date'], format= '%Y-%m-%d').dt.date
shipment_deliveries = shipment_deliveries.replace({np.NaN: None})

In [46]:
shipment_deliveries.head()

Unnamed: 0,shipment_id,order_id,shipment_date,delivery_date
0,1,1,2022-07-14,
1,2,2,,
2,3,3,2022-07-31,2022-08-03
3,4,4,2022-09-02,2022-09-05
4,5,5,2021-12-19,2021-12-20


# View the Tables Schema

In [47]:
# Create schema from orders dataframe

print(''.join(pd.io.sql.get_schema(orders, 'orders')))

CREATE TABLE "orders" (
"order_id" INTEGER,
  "customer_id" INTEGER,
  "order_date" TEXT,
  "product_id" INTEGER,
  "unit_price" INTEGER,
  "quantity" INTEGER,
  "total_price" INTEGER
)


In [48]:
# Create schema from reviews dataframe

print(''.join(pd.io.sql.get_schema(reviews, 'reviews')))

CREATE TABLE "reviews" (
"review" INTEGER,
  "product_id" INTEGER
)


In [49]:
# Create schema from shipment deliveries dataframe

print(''.join(pd.io.sql.get_schema(shipment_deliveries, 'shipment_deliveries')))

CREATE TABLE "shipment_deliveries" (
"shipment_id" INTEGER,
  "order_id" INTEGER,
  "shipment_date" DATE,
  "delivery_date" DATE
)


# Connect to Datawarehouse

In [50]:
# Create a connection pool with a maximum of 10 connections
POOL = psycopg2.pool.SimpleConnectionPool(
    1, 10,
    host=HOST,
    dbname=DWH_DB,
    user=DWH_DB_USER,
    password=DWH_DB_PASSWORD,
    port=DWH_PORT,
    options=f"-c search_path=dbo,{STAGING_SCHEMA}"
)

def open_cursor():
    try:
        # Get a connection from the pool
        conn = POOL.getconn()

        # Set autocommit mode
        conn.set_session(autocommit=True)
        
        
        # Open a cursor on the connection
        cur = conn.cursor()
        
        
        # Return the cursor and connection
        return cur, conn

    except psycopg2.Error as e:
        # If an error occurs, log it and raise an exception
        print('Error: Could not get connection from the pool')
        print(e)
        raise
        

Open Connection

In [51]:
cur, conn = open_cursor()

# Load Data into Datawarehouse

### Orders Table

In [52]:
data_orders = orders.values.tolist()

try:
    cur.execute('''
                CREATE TABLE IF NOT EXISTS orders (
                order_id INTEGER NOT NULL PRIMARY KEY,
                customer_id INTEGER NOT NULL,
                order_date DATE NOT NULL,
                product_id INTEGER NOT NULL,
                unit_price INTEGER NOT NULL,
                quantity INTEGER NOT NULL,
                total_price INTEGER NOT NULL
                );''')


    execute_values(cur, '''
                INSERT INTO orders(order_id, customer_id, order_date, product_id, unit_price, quantity, total_price) 
                VALUES %s
                ON CONFLICT (order_id) DO UPDATE 
                SET 
                customer_id = excluded.customer_id,
                order_date = excluded.order_date,
                product_id = excluded.product_id,
                unit_price = excluded.unit_price,
                quantity = excluded.quantity,
                total_price = excluded.total_price
                ;''', data_orders, page_size=1000)


except psycopg2.Error as e:
    print('Error: Issue loading table')
    print(e)

View the loaded orders table - first row

In [53]:
cur.execute('''
    select * from orders
    ''')

# View Data
row = cur.fetchone()
print(row)
# conn.commit()

(65, 20, datetime.datetime(2021, 7, 28, 0, 0), 7, 753, 5, 3765)


### Shipment Deliveries Table

In [54]:
data_shipment = shipment_deliveries.values.tolist()

try:
    cur.execute('''
            CREATE TABLE IF NOT EXISTS shipment_deliveries(
            shipment_id INTEGER NOT NULL PRIMARY KEY,
            order_id INTEGER NOT NULL,
            shipment_date DATE NULL,
            delivery_date DATE NULL
            );''')

    execute_values(cur, '''
            INSERT INTO shipment_deliveries(shipment_id, order_id, shipment_date, delivery_date) 
            VALUES %s
            ON CONFLICT (shipment_id) DO UPDATE 
            SET 
            order_id = excluded.order_id,
            shipment_date = excluded.shipment_date,
            delivery_date = excluded.delivery_date;
            ''', data_shipment, page_size=1000)


except psycopg2.Error as e:
    print('Error: Issue loading table')
    print(e)

View the loaded shipments deliveries table - first row

In [55]:
cur.execute('''
    select * from shipment_deliveries
    ''')

# View Data
row = cur.fetchone()
print(row)

(2053, 2053, None, None)


### Revies Table

In [56]:
data_reviews = reviews.values.tolist()

try:
    cur.execute('''
        CREATE TABLE IF NOT EXISTS reviews(
            review INTEGER NOT NULL PRIMARY KEY,
            product_id INTEGER NOT NULL
        );
    ''')

    execute_values(cur, '''
        INSERT INTO reviews(review, product_id) 
        VALUES %s
        ''', data_reviews, page_size=1000)

except psycopg2.Error as e:
    print('Error: Issue loading table')
    print(e)

View the loaded reviews table - first row

In [57]:
cur.execute('''
    select * from reviews
    ''')

# View Data
row = cur.fetchone()
print(row)

(1, 21)


## Task 1

In [58]:
# Create agg_public_holiday table schema
try:
    cur.execute('''
    DROP TABLE IF EXISTS idrialug9071_analytics.agg_public_holiday;
    CREATE TABLE idrialug9071_analytics.agg_public_holiday(
    ingestion_date date not null primary key,
    tt_order_hol_jan integer not null,
    tt_order_hol_feb integer not null,
    tt_order_hol_mar integer not null,
    tt_order_hol_apr integer not null,
    tt_order_hol_may integer not null,
    tt_order_hol_jun integer not null,
    tt_order_hol_jul integer not null,
    tt_order_hol_aug integer not null,
    tt_order_hol_sep integer not null,
    tt_order_hol_oct integer not null,
    tt_order_hol_nov integer not null,
    tt_order_hol_dec integer not null
    );''')
    
    # Query to get the number of orders placed on public holiday every month for the past year 

    cur.execute('''
        INSERT INTO idrialug9071_analytics.agg_public_holiday (
        ingestion_date,
        tt_order_hol_jan,
        tt_order_hol_feb,
        tt_order_hol_mar,
        tt_order_hol_apr,
        tt_order_hol_may,
        tt_order_hol_jun,
        tt_order_hol_jul,
        tt_order_hol_aug,
        tt_order_hol_sep,
        tt_order_hol_oct,
        tt_order_hol_nov,
        tt_order_hol_dec
        )
        SELECT 
            now() as ingestion_date, 
            COUNT(case when EXTRACT(MONTH FROM order_date) = 1 AND day_of_the_week_num <= 5 AND working_day = False THEN order_date END) as tt_order_hol_jan,
            COUNT(case when EXTRACT(MONTH FROM order_date) = 2 AND day_of_the_week_num <= 5 AND working_day = False THEN order_date END) as tt_order_hol_feb,
            COUNT(case when EXTRACT(MONTH FROM order_date) = 3 AND day_of_the_week_num <= 5 AND working_day = False THEN order_date END) as tt_order_hol_mar,
            COUNT(case when EXTRACT(MONTH FROM order_date) = 4 AND day_of_the_week_num <= 5 AND working_day = False THEN order_date END) as tt_order_hol_apr,
            COUNT(case when EXTRACT(MONTH FROM order_date) = 5 AND day_of_the_week_num <= 5 AND working_day = False THEN order_date END) as tt_order_hol_may,
            COUNT(case when EXTRACT(MONTH FROM order_date) = 6 AND day_of_the_week_num <= 5 AND working_day = False THEN order_date END) as tt_order_hol_jun,
            COUNT(case when EXTRACT(MONTH FROM order_date) = 7 AND day_of_the_week_num <= 5 AND working_day = False THEN order_date END) as tt_order_hol_jul,
            COUNT(case when EXTRACT(MONTH FROM order_date) = 8 AND day_of_the_week_num <= 5 AND working_day = False THEN order_date END) as tt_order_hol_aug,
            COUNT(case when EXTRACT(MONTH FROM order_date) = 9 AND day_of_the_week_num <= 5 AND working_day = False THEN order_date END) as tt_order_hol_sep,
            COUNT(case when EXTRACT(MONTH FROM order_date) = 10 AND day_of_the_week_num <= 5 AND working_day = False THEN order_date END) as tt_order_hol_oct,
            COUNT(case when EXTRACT(MONTH FROM order_date) = 11 AND day_of_the_week_num <= 5 AND working_day = False THEN order_date END) as tt_order_hol_nov,
            COUNT(case when EXTRACT(MONTH FROM order_date) = 12 AND day_of_the_week_num <= 5 AND working_day = False THEN order_date END) as tt_order_hol_dec
        FROM (
            SELECT order_date, day_of_the_week_num, working_day
            FROM idrialug9071_staging.orders 
            JOIN if_common.dim_dates 
                ON orders.order_date = dim_dates.calendar_dt
            WHERE EXTRACT(YEAR FROM order_date) = 2022
        ) subquery

            ''')


    
except psycopg2.Error as e:
    print('Error: Issue Creating table')
    print(e)

In [59]:
with open('agg_public_holiday.csv', 'w') as f:
    cur.copy_expert(f"COPY {ANALYTICS_SCHEMA}.agg_public_holiday TO STDOUT WITH HEADER CSV", f)
    
s3.upload_file('agg_public_holiday.csv', S3_BUCKET_NAME, f'{EXPORT}/{ID}/agg_public_holiday.csv')

## Task 2 & 3

In [60]:
# Create agg_shipments table schema
try:
    cur.execute('''
    DROP TABLE IF EXISTS idrialug9071_analytics.agg_shipments;
    CREATE TABLE idrialug9071_analytics.agg_shipments(
    ingestion_date date not null primary key,
    tt_late_shipments integer not null,
    tt_undelivered_shipments integer not null
    );''')
    
    # Query to get the number of late and undelivered shipments 

    cur.execute('''
        INSERT INTO idrialug9071_analytics.agg_shipments (
        ingestion_date,
        tt_late_shipments,
        tt_undelivered_shipments
        )
        SELECT 
            now() as ingestion_date, 
            COUNT(case when (order_date + INTERVAL '6 day') <= shipment_date AND delivery_date IS NULL THEN 1 END) as tt_late_shipments,
            COUNT(case when delivery_date IS NULL AND shipment_date IS NULL AND '2022-09-05' > (order_date + INTERVAL '15 day') THEN 1 END) as tt_undelivered_shipments
        FROM (
            SELECT orders.order_id, order_date, shipment_date, delivery_date
            FROM idrialug9071_staging.orders 
            JOIN idrialug9071_staging.shipment_deliveries
            ON orders.order_id = shipment_deliveries.order_id
        ) subquery

            ''')

    
except psycopg2.Error as e:
    print('Error: Issue Creating table')
    print(e)

In [61]:
with open('agg_shipments.csv', 'w') as f:
    cur.copy_expert(f"COPY {ANALYTICS_SCHEMA}.agg_shipments TO STDOUT WITH HEADER CSV", f)
    
s3.upload_file('agg_shipments.csv', S3_BUCKET_NAME, f'{EXPORT}/{ID}/agg_shipments.csv')

## Task 4

In [62]:
# # Create best_performing_product table schema
try:
    cur.execute('''
    DROP TABLE IF EXISTS idrialug9071_analytics.best_performing_product;
    CREATE TABLE idrialug9071_analytics.best_performing_product(
    ingestion_date date not null primary key,
    product_name varchar not null,
    most_ordered_day date not null,
    is_public_holiday bool not null,
    tt_review_points integer not null,
    pct_one_star_review float not null,
    pct_two_star_review float not null,
    pct_three_star_review float not null,
    pct_four_star_review float not null,
    pct_five_star_review float not null,
    pct_early_shipments float not null,
    pct_late_shipments float not null
    );''')
    
#     # Query to get the best selling product

    cur.execute('''
        INSERT INTO idrialug9071_analytics.best_performing_product (
        ingestion_date,
        product_name,
        most_ordered_day,
        is_public_holiday,
        tt_review_points,
        pct_one_star_review,
        pct_two_star_review,
        pct_three_star_review,
        pct_four_star_review,
        pct_five_star_review,
        pct_early_shipments,
        pct_late_shipments
        )
         WITH 
              reviews_cte AS (
                SELECT 
                  product_id, 
                  COUNT(review) AS total_reviews, 
                  AVG(review) AS avg_review_points, 
                  (SUM(CASE WHEN review = 1 THEN 1 ELSE 0 END) / COUNT(review))  AS pct_one_star_review, 
                  (SUM(CASE WHEN review = 2 THEN 1 ELSE 0 END) / COUNT(review))  AS pct_two_star_review, 
                  (SUM(CASE WHEN review = 3 THEN 1 ELSE 0 END) / COUNT(review))  AS pct_three_star_review, 
                  (SUM(CASE WHEN review = 4 THEN 1 ELSE 0 END) / COUNT(review))  AS pct_four_star_review, 
                  (SUM(CASE WHEN review = 5 THEN 1 ELSE 0 END) / COUNT(review))  AS pct_five_star_review
                FROM 
                  idrialug9071_staging.reviews
                GROUP BY 
                  product_id
                ORDER BY avg_review_points DESC
              ),

                orders_cte as(
                SELECT 
                        product_id, 
                        order_date,
                        day_of_the_week_num, 
                        working_day,
                        RANK() OVER (PARTITION BY product_id ORDER BY COUNT(*) DESC) as rank
                    FROM orders
                    JOIN if_common.dim_dates ON orders.order_date = dim_dates.calendar_dt
                    GROUP BY product_id, order_date, day_of_the_week_num, working_day
                    ),


              shipments_cte AS (
                SELECT 
                  orders.product_id, 
                  COUNT(case when (order_date + INTERVAL '6 day') >= shipment_date AND delivery_date IS NULL THEN 1 END) as tt_early_shipments,
                  COUNT(case when (order_date + INTERVAL '6 day') <= shipment_date AND delivery_date IS NULL THEN 1 END) as tt_late_shipments
                FROM idrialug9071_staging.orders 
                    JOIN idrialug9071_staging.shipment_deliveries
                    ON orders.order_id = shipment_deliveries.order_id
                GROUP BY 
                  orders.product_id
              )

            SELECT 
              now(),
              dim_products.product_name, 
              orders_cte.order_date as most_ordered_day,
              CASE 
                WHEN day_of_the_week_num <= 5 AND working_day = False THEN TRUE ELSE FALSE 
                END AS is_public_holiday,
              reviews_cte.avg_review_points,
              pct_one_star_review,
              pct_two_star_review,
              pct_three_star_review,
              pct_four_star_review,
              pct_five_star_review,
              (shipments_cte.tt_early_shipments / (shipments_cte.tt_late_shipments + shipments_cte.tt_early_shipments)) AS pct_early_shipments,
              (shipments_cte.tt_late_shipments / (shipments_cte.tt_late_shipments + shipments_cte.tt_early_shipments)) AS pct_late_shipments 
            FROM 
              if_common.dim_products 
            JOIN 
              reviews_cte ON dim_products .product_id = reviews_cte.product_id 
            JOIN 
              orders_cte ON dim_products.product_id = orders_cte.product_id 
            JOIN 
              shipments_cte ON dim_products.product_id = shipments_cte.product_id 
            WHERE 
              reviews_cte.avg_review_points = (
                SELECT 
                  MAX(avg_review_points) 
                FROM 
                  reviews_cte) AND orders_cte.rank = 1

            ORDER BY orders_cte.rank
            LIMIT 1

            ''')

    
except psycopg2.Error as e:
    print('Error: Issue Creating table')
    print(e)

In [63]:
with open('best_performing_product.csv', 'w') as f:
    cur.copy_expert(f"COPY {ANALYTICS_SCHEMA}.best_performing_product TO STDOUT WITH HEADER CSV", f)
    
s3.upload_file('best_performing_product.csv', S3_BUCKET_NAME, f'{EXPORT}/{ID}/best_performing_product.csv')

In [64]:
conn.close()