In [1]:
import os
from time import time
import configparser
import json
import zipfile

import pandas as pd

import boto3
from botocore.exceptions import ClientError

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

import psycopg2


In [2]:
# Load warehouse credentials and connection settings from the local config file.
# The file is opened in a context manager so the handle is closed as soon as it
# has been parsed; a missing file still raises, exactly as before.
config = configparser.ConfigParser()
with open('./credentials/dwh.cfg') as cfg_file:
    config.read_file(cfg_file)

# AWS access key pair, read from the [AWS] section.
KEY = config.get('AWS', 'KEY_ID')
SECRET = config.get('AWS', 'SECRET_KEY')

# Database name, login and port come from [CLUSTER]; the host comes from [DB]
# and the IAM role ARN from [IAM_ROLE].
DWH_DB=config.get('CLUSTER', 'DWH_DB')
DWH_DB_USER=config.get('CLUSTER', 'DWH_DB_USER')
DWH_DB_PASSWORD=config.get('CLUSTER', 'DWH_DB_PASSWORD')
DWH_PORT=config.get('CLUSTER', 'DWH_PORT')
DWH_ENDPOINT=config.get('DB', 'HOST')
DWH_ROLE_ARN=config.get('IAM_ROLE', 'ARN')


In [3]:
# SQLAlchemy URL for the warehouse, using the psycopg2 driver:
# user:password@host:port/database, filled in from the config values.
db_url = 'postgresql+psycopg2://{}:{}@{}:{}/{}'.format(
    DWH_DB_USER,
    DWH_DB_PASSWORD,
    DWH_ENDPOINT,
    DWH_PORT,
    DWH_DB,
)
engine = create_engine(db_url)

# A single session bound to that engine; the cells below all run through it.
session_factory = sessionmaker(bind=engine)
session = session_factory()

***SQL QUERIES***

In [4]:
# Create Schema
# Registers the external schema PROD on top of the data-catalog database named
# by DWH_DB, accessed through the IAM role DWH_ROLE_ARN. The catalog database is
# created if it is missing. IF NOT EXISTS on the schema itself lets the notebook
# be re-run from a fresh kernel without failing because PROD already exists.
create_schema="""
CREATE EXTERNAL SCHEMA IF NOT EXISTS PROD
FROM DATA CATALOG DATABASE '{}'
iam_role '{}'
CREATE EXTERNAL DATABASE IF NOT EXISTS;
""".format(DWH_DB, DWH_ROLE_ARN)

# Drop tables
# One DROP statement per external table in the prod schema, all built from the
# same template so the statements cannot drift apart.
_drop_template = 'DROP TABLE prod.{}'

drop_transactions = _drop_template.format('transactions')
drop_campaign_desc = _drop_template.format('campaign_desc')
drop_campaign_table = _drop_template.format('campaign_table')
drop_hh_demographic = _drop_template.format('hh_demographic')
drop_product = _drop_template.format('product')

# Create tables
#
# DDL for the external table prod.transactions.
# Data is read as comma-delimited text from s3://dunnhumby-1/prod/transactions/
# and the first line of each file is skipped as a header.
# NOTE(review): the 14th column is declared "basket_with_disc" here, while the
# DataFrame built from this table later in the notebook labels it
# 'basket_with_discount' - confirm which name downstream code should use.
create_transactions= """
CREATE EXTERNAL TABLE prod.transactions (
    "household_key" character varying(45),
    "basket_id" character varying(45),
    "day" int,
    "product_id" character varying(45),
    "quantity" int,
    "sales_value" double precision,
    "store_id" character varying(45),
    "retail_disc" double precision,
    "trans_time" int,
    "week_no" int,
    "coupon_disc" double precision,
    "coupon_match_disc" double precision,
    "campaign" character varying(45),
    "basket_with_disc" boolean,
    "basket_with_camp" boolean)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE
LOCATION 's3://dunnhumby-1/prod/transactions/' 
TABLE PROPERTIES ('skip.header.line.count'='1');
"""

# DDL for the external table prod.campaign_desc (comma-delimited text, header
# line skipped).
# Delimited text is mapped to columns by position, so the columns must be
# declared in the file's field order: description, campaign, start_day, end_day.
# The first field holds the campaign type (e.g. 'TypeC') and the second the
# campaign number; declaring them the other way round makes queries that name
# `campaign` return the type and vice versa.
create_campaign_desc="""
CREATE EXTERNAL TABLE prod.campaign_desc(
    description character varying(45),
    campaign character varying(45),
    start_day int,
    end_day int)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE
LOCATION 's3://dunnhumby-1/prod/campaign_desc/campaign_desc.csv'
TABLE PROPERTIES ('skip.header.line.count'='1');
"""

# DDL for the external table prod.campaign_table (comma-delimited text, header
# line skipped).
# Delimited text is mapped to columns by position, so the columns must be
# declared in the file's field order: description, household_key, campaign.
# The first field holds the campaign type (e.g. 'TypeB'), the second the
# household key and the third the campaign number; any other declared order
# mislabels the data for queries that select columns by name.
create_campaign_table="""
CREATE EXTERNAL TABLE prod.campaign_table(
    description character varying(45),
    household_key character varying(45),
    campaign character varying(45)
    )
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE
LOCATION 's3://dunnhumby-1/prod/campaign_table/campaign_table.csv'
TABLE PROPERTIES ('skip.header.line.count'='1');
"""

# DDL for the external table prod.hh_demographic.
# Every column is declared as a 45-character string, with household_key as the
# last column. Data is read as comma-delimited text from the hh_demographic.csv
# location below, and the first line is skipped as a header.
create_hh_demographic="""
CREATE EXTERNAL TABLE prod.hh_demographic(
    age_desc character varying(45), 
    marital_status_code character varying(45), 
    income_desc character varying(45), 
    homeowner_desc character varying(45), 
    hh_comp_desc character varying(45), 
    household_size_desc character varying(45), 
    kid_category_desc character varying(45), 
    household_key character varying(45)
    )
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE
LOCATION 's3://dunnhumby-1/prod/hh_demographic/hh_demographic.csv' 
TABLE PROPERTIES ('skip.header.line.count'='1');
"""

# DDL for the external table prod.product.
# Every column is declared as a 45-character string. Data is read as
# comma-delimited text from the product.csv location below, and the first line
# is skipped as a header.
# NOTE(review): no quote handling is declared here, and the sample output in
# this notebook shows curr_size_of_product values made up only of quote and
# backslash characters - confirm how empty/quoted fields in the file should be
# parsed.
create_product="""
CREATE EXTERNAL TABLE prod.product(
    product_id character varying(45), 
    manufacturer character varying(45), 
    department character varying(45), 
    brand character varying(45), 
    commodity_desc character varying(45), 
    sub_commodity_desc character varying(45), 
    curr_size_of_product character varying(45)
    )
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE
LOCATION 's3://dunnhumby-1/prod/product/product.csv' 
TABLE PROPERTIES ('skip.header.line.count'='1');
"""


***CREATE SCHEMA***

In [5]:
# Create schema
# The DBAPI connection is switched to isolation level 0 (psycopg2's autocommit
# mode) while the DDL runs - presumably because the warehouse rejects external
# DDL inside a transaction block - and is put back to level 1 afterwards.
# The restore sits in `finally` so a failed CREATE does not leave the shared
# connection in autocommit mode for the rest of the notebook.
dbapi_conn = session.connection().connection
dbapi_conn.set_isolation_level(0)
try:
    session.execute(create_schema)
finally:
    dbapi_conn.set_isolation_level(1)

***CREATE TABLES***

In [6]:
# DDL statements for every external table, in the order they are created.
create_queries = [
    create_transactions,
    create_campaign_desc,
    create_campaign_table,
    create_hh_demographic,
    create_product,
]

# Switch the DBAPI connection to isolation level 0 (psycopg2's autocommit mode)
# once for the whole batch instead of toggling it around every statement -
# presumably the warehouse rejects CREATE EXTERNAL TABLE inside a transaction.
# `finally` guarantees level 1 is restored even if one of the CREATEs fails
# (for example because the table already exists).
dbapi_conn = session.connection().connection
dbapi_conn.set_isolation_level(0)
try:
    for create_query in create_queries:
        session.execute(create_query)
finally:
    dbapi_conn.set_isolation_level(1)

***campaign_desc table***

In [7]:
# Labels for the preview frame; they are applied positionally to the columns
# returned by SELECT *.
camp_desc_cols = [
    'description',
    'campaign',
    'start_day',
    'end_day',
]

# Run the preview query with the connection at isolation level 0, then put the
# connection back to level 1.
session.connection().connection.set_isolation_level(0)
camp_desc_query = 'SELECT * FROM prod.campaign_desc LIMIT 10'
camp_desc_results = session.execute(camp_desc_query).fetchall()
session.connection().connection.set_isolation_level(1)

campaign_desc = pd.DataFrame(data=camp_desc_results, columns=camp_desc_cols)
campaign_desc

Unnamed: 0,description,campaign,start_day,end_day
0,TypeC,15,547,708
1,TypeB,25,659,691
2,TypeC,20,615,685
3,TypeB,23,646,684
4,TypeB,21,624,656
5,TypeB,22,624,656
6,TypeA,18,587,642
7,TypeB,19,603,635
8,TypeB,17,575,607
9,TypeC,14,531,596


***campaign_table table***

In [8]:
# Preview the first 10 rows of prod.campaign_table as a DataFrame.
# The labels are applied positionally to the columns returned by SELECT *.
camp_table_cols = [
    'description',
    'household_key',
    'campaign',
]

# Run the preview query with the connection at isolation level 0, then put the
# connection back to level 1.
session.connection().connection.set_isolation_level(0)
camp_table_query = 'SELECT * FROM prod.campaign_table LIMIT 10'
camp_table_results = session.execute(camp_table_query).fetchall()
session.connection().connection.set_isolation_level(1)

campaign_table = pd.DataFrame(data=camp_table_results, columns=camp_table_cols)
campaign_table

Unnamed: 0,description,household_key,campaign
0,TypeB,895,29
1,TypeB,877,29
2,TypeB,876,29
3,TypeB,817,29
4,TypeB,771,29
5,TypeB,766,29
6,TypeB,601,29
7,TypeB,575,29
8,TypeB,569,29
9,TypeB,404,29


***transactions table***

In [9]:
# Inclusive range of days to pull from prod.transactions.
start_day = 500
end_day = 504

# Labels for the resulting frame; they are applied positionally to the columns
# returned by SELECT *.
trns_cols = [
    'household_key',
    'basket_id',
    'day',
    'product_id',
    'quantity',
    'sales_value',
    'store_id',
    'retail_disc',
    'trans_time',
    'week_no',
    'coupon_disc',
    'coupon_match_disc',
    'campaign',
    'basket_with_discount',
    'basket_with_camp',
]

# The backslash continues the string literal onto the next line, so that line's
# leading whitespace is part of the SQL text; it is kept exactly as it was.
trns_query = "SELECT * FROM prod.{} \
                               WHERE DAY >={} AND DAY <={}".format('transactions', start_day, end_day)
trns_results = session.execute(trns_query).fetchall()

transaction_data = pd.DataFrame(data=trns_results, columns=trns_cols)
transaction_data

Unnamed: 0,household_key,basket_id,day,product_id,quantity,sales_value,store_id,retail_disc,trans_time,week_no,coupon_disc,coupon_match_disc,campaign,basket_with_discount,basket_with_camp
0,2050,34577523043,500,824176,2,6.68,432,0.00,26,72,0.00,0.00,"""""",False,False
1,2050,34577523043,500,990797,1,2.00,432,-0.68,26,72,0.00,0.00,"""""",False,False
2,232,34577534849,500,995242,1,1.00,361,-0.85,110,72,0.00,0.00,"""""",False,False
3,1598,34577537604,500,908531,1,1.75,311,-0.10,3,72,0.00,0.00,"""""",False,False
4,1598,34577537604,500,1089066,1,2.25,311,0.00,3,72,0.00,0.00,"""""",False,False
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
22693,1989,1036470,501,34642840189,1,1.75,343,-0.88,710,72,-0.75,-0.25,"""""",True,False
22694,2387,1058543,501,34642771278,1,1.50,320,-1.99,1505,72,-0.50,-0.50,"""""",True,False
22695,1687,1077579,501,34642596340,2,2.93,439,0.00,1249,72,-1.75,-0.25,"""""",True,False
22696,88,1110224,503,34749212286,1,1.89,404,-1.60,1848,73,-0.50,-0.50,"""""",True,False


***hh_demographic table***

In [10]:
# Preview the first 10 rows of prod.hh_demographic as a DataFrame.
# The labels are applied positionally to the columns returned by SELECT *.
hh_cols = [
    'age_desc',
    'marital_status_code',
    'income_desc',
    'homeowner_desc',
    'hh_comp_desc',
    'household_size_desc',
    'kid_category_desc',
    'household_key',
]

# Run the preview query with the connection at isolation level 0, then put the
# connection back to level 1.
session.connection().connection.set_isolation_level(0)
hh_query = 'SELECT * FROM prod.hh_demographic LIMIT 10'
hh_results = session.execute(hh_query).fetchall()
session.connection().connection.set_isolation_level(1)

hh_demographic = pd.DataFrame(data=hh_results, columns=hh_cols)
hh_demographic

Unnamed: 0,age_desc,marital_status_code,income_desc,homeowner_desc,hh_comp_desc,household_size_desc,kid_category_desc,household_key
0,19-24,A,15-24K,Homeowner,2 Adults Kids,3.0,1,1219
1,45-54,U,15-24K,Homeowner,Single Female,1.0,None/Unknown,1222
2,25-34,B,35-49K,Homeowner,1 Adult Kids,3.0,2,1226
3,45-54,U,100-124K,Unknown,Single Female,1.0,None/Unknown,1228
4,55-64,A,150-174K,Homeowner,2 Adults No Kids,2.0,None/Unknown,1229
5,45-54,B,35-49K,Homeowner,2 Adults No Kids,2.0,None/Unknown,1234
6,35-44,U,50-74K,Unknown,2 Adults Kids,3.0,1,1236
7,45-54,U,Under 15K,Homeowner,2 Adults No Kids,2.0,None/Unknown,1240
8,35-44,A,35-49K,Homeowner,2 Adults No Kids,2.0,None/Unknown,1247
9,45-54,U,50-74K,Unknown,1 Adult Kids,2.0,1,1248


***product table***

In [11]:
# Preview the first 10 rows of prod.product as a DataFrame.
# The labels are applied positionally to the columns returned by SELECT *.
product_cols = [
    'product_id',
    'manufacturer',
    'department',
    'brand',
    'commodity_desc',
    'sub_commodity_desc',
    'curr_size_of_product',
]

# Run the preview query with the connection at isolation level 0, then put the
# connection back to level 1.
session.connection().connection.set_isolation_level(0)
product_query = 'SELECT * FROM prod.product LIMIT 10'
product_results = session.execute(product_query).fetchall()
session.connection().connection.set_isolation_level(1)

product = pd.DataFrame(data=product_results, columns=product_cols)
product

Unnamed: 0,product_id,manufacturer,department,brand,commodity_desc,sub_commodity_desc,curr_size_of_product
0,7409487,1136,DRUG GM,National,SHAVING CARE PRODUCTS,SHAVE CREAMS AND POWDERS,"""\\\""\\\"""""
1,7409488,1136,DRUG GM,National,SHAVING CARE PRODUCTS,SHAVE CREAMS AND POWDERS,"""\\\""\\\"""""
2,7409495,1922,DRUG GM,National,TOYS AND GAMES,GAMES,"""\\\""\\\"""""
3,7409501,165,GROCERY,National,FRZN VEGETABLE/VEG DSH,FRZN BAGGED VEGETABLES - PLAIN,16 OZ
4,7409506,1136,DRUG GM,National,SHAVING CARE PRODUCTS,SHAVE CREAMS AND POWDERS,"""\\\""\\\"""""
5,7409508,1136,DRUG GM,National,SHAVING CARE PRODUCTS,SHAVE CREAMS AND POWDERS,7 OZ
6,7409511,2211,COSMETICS,National,MAKEUP AND TREATMENT,WET N WILD,"""\\\""\\\"""""
7,7409519,6029,GROCERY,National,TEAS,TEA BAGS HERBAL & FLAVORED,20 CT
8,7409520,3213,FLORAL,National,FLORAL-FRESH CUT,CALLA LILY,10 STEM
9,7409521,6029,GROCERY,National,TEAS,TEA BAGS HERBAL & FLAVORED,20 CT
