# Gourmet Meals Business -- SQL Project (Part 2.3 - Staging Tables)

Author: **Ethan Moody**

Date: **October 2022**

### Business Case

Assume you are a data engineer working closely with the data science team at Agile Gourmet Meals (AGM).

AGM executives are considering adding a delivery option in the hope of increasing sales, growing the customer base, and increasing profitability.

Management decided to run a proof of concept (POC): a three-month trial using one delivery service at the Berkeley store. They have called on the data science team to help with this effort, and the data science team, in turn, has asked for your help with the data engineering aspects of the POC.

Management chose Peak Deliveries primarily because it is a newer operation whose model takes a percentage cut of the product pricing instead of charging customers a delivery fee. Peak's cut is 18%, so for each $12 meal Peak keeps $2.16. Customers may tip the delivery driver if they wish, but AGM has no visibility into customer tips (Peak is protecting its data on good tippers). Peak has an outstanding reputation for great, fast, and efficient deliveries with excellent customer service. Peak will deliver only to zip codes within a 5-mile radius of the store.
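The fee arithmetic can be checked with a short sketch. It assumes Peak's 18% applies to each sale's `sub_total` (the product pricing before tax) and that the result is rounded half-up to the cent; the rounding rule and the helper names `peak_cut` and `agm_net` are assumptions for illustration, not something Peak has specified.

```python
from decimal import Decimal, ROUND_HALF_UP

PEAK_RATE = Decimal("0.18")  # Peak's cut of product pricing (from the business case)
CENT = Decimal("0.01")


def peak_cut(sub_total):
    """Peak's share of a sale; half-up rounding to cents is an assumption."""
    amount = Decimal(str(sub_total)) * PEAK_RATE
    return amount.quantize(CENT, rounding=ROUND_HALF_UP)


def agm_net(sub_total):
    """What AGM keeps after Peak's cut (tips never pass through AGM)."""
    return Decimal(str(sub_total)) - peak_cut(sub_total)


print(peak_cut(12), agm_net(12))  # one $12 meal
print(peak_cut(72), agm_net(72))  # a six-meal sale
```

`Decimal` is used instead of `float` because 0.18 has no exact binary floating-point representation, and money totals should not drift by fractions of a cent.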

Integration with any third-party sales channel always comes with challenges. For large companies like McDonald's, delivery companies are willing to integrate and modify their computer systems as needed to win the contract. For small companies like AGM, the main option is to use Peak's API to send and receive data. However, that would require writing a lot of code, which management does not want to spend money on until the POC has proven successful. As an alternative, Peak can provide a JSON file at the end of each day with detailed sales information for that day. Management has decided to go with the daily JSON option for the POC.

AGM will enter its products into Peak's system, and Peak will assign each product an ID in its own system. You will need to create a mapping table from Peak's product IDs to AGM's product IDs. All of AGM's products cost $12 and are exempt from sales tax, and AGM will mark them as tax exempt in Peak's system.
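A minimal sketch of how such a mapping table could be used to catch unmapped products, written in pandas. The Peak product IDs below are taken from the `stage_1_peak_line_items` output later in this notebook; the AGM IDs (101 to 103) and the `product_map` layout are hypothetical placeholders, and Peak ID 42314784 is deliberately left unmapped to show how a gap surfaces.

```python
import pandas as pd

# A few (sale, product) rows as they appear in stage_1_peak_line_items
line_items = pd.DataFrame({
    "sale_id": ["5763728874", "5763729036", "5763729036", "5763729036"],
    "product_id": ["42314780", "42314677", "42314782", "42314784"],
})

# Hypothetical mapping table: the AGM IDs are placeholders, and
# 42314784 is intentionally missing
product_map = pd.DataFrame({
    "peak_product_id": ["42314780", "42314677", "42314782"],
    "agm_product_id": [101, 102, 103],
})

# Left join with indicator=True adds a "_merge" column; "left_only"
# marks line items whose Peak product ID found no mapping row
merged = line_items.merge(
    product_map,
    how="left",
    left_on="product_id",
    right_on="peak_product_id",
    indicator=True,
)
unmapped = sorted(merged.loc[merged["_merge"] == "left_only", "product_id"].unique())
print(unmapped)
```

In SQL the same check is a left join from the line items to the mapping table, filtered to rows where the mapping side is null.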

AGM does not want to give its full customer list to third parties, so customers will have to sign up with Peak through the website, the app, or by telephone. AGM executives anticipate, and accept, that the trade-off of withholding the customer list is that you will probably have to validate and/or cleanse the customer data. Peak will assign its own customer ID to each customer.

In this POC, you will focus on only one store: the Berkeley store. Peak will create a pickup location for the store and assign its own location ID to it. Even though all of the data will reference the same store for now, you still want to receive and process it so you can help leadership plan for possible future expansion to other stores and/or pickup locations.

Assume today is October 4, 2020, and the first day of sales was October 3, 2020. The JSON file arrived very early this morning. As a data engineer, you need to start parsing, staging, and validating the file as soon as possible.
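The staging tables below are loaded from four CSV files, one per entity. A minimal sketch of how a daily JSON file could be flattened into rows of that shape follows. The JSON layout (a list of sales with nested `store`, `customer`, and `line_items` objects) and the helpers `flatten_sales` and `write_csv` are hypothetical assumptions chosen to match the staging columns; the sample values are copied from the staged data shown later, but Peak's real file layout is not shown in this section.

```python
import csv
import json

# Hypothetical layout of Peak's daily file (an assumption; the key names
# were chosen to match the staging-table columns)
SAMPLE_JSON = """
[
  {
    "sale_id": "5763728874", "sale_date": "2020-10-03",
    "sub_total": 12, "tax": 0, "total_amount": 12,
    "store": {"location_id": "12573", "name": "Acme Gourmet Meals",
              "street": "3000 Telegraph Ave", "city": "Berkeley",
              "state": "CA", "zip": "94705"},
    "customer": {"customer_id": "3728404", "first_name": "Darrelle",
                 "last_name": "Dohrmann", "street": "46 Farwell Terrace",
                 "city": "Oakland", "state": "CA", "zip": "94609"},
    "line_items": [{"line_item_id": 1, "product_id": "42314780",
                    "price": 12, "quantity": 1, "taxable": "N"}]
  }
]
"""

# Column order of each staging CSV; child tables get sale_id prepended
SALES_COLS = ["sale_id", "sale_date", "sub_total", "tax", "total_amount"]
STORE_COLS = ["location_id", "name", "street", "city", "state", "zip"]
CUSTOMER_COLS = ["customer_id", "first_name", "last_name",
                 "street", "city", "state", "zip"]
LINE_ITEM_COLS = ["line_item_id", "product_id", "price", "quantity", "taxable"]


def flatten_sales(sales):
    """Split nested sale records into one row list per staging table."""
    rows = {"sales": [], "stores": [], "customers": [], "line_items": []}
    for sale in sales:
        sale_id = sale.get("sale_id")
        store = sale.get("store") or {}
        customer = sale.get("customer") or {}
        # .get() leaves missing keys as None
        rows["sales"].append([sale.get(c) for c in SALES_COLS])
        rows["stores"].append([sale_id] + [store.get(c) for c in STORE_COLS])
        rows["customers"].append(
            [sale_id] + [customer.get(c) for c in CUSTOMER_COLS])
        for item in sale.get("line_items") or []:
            rows["line_items"].append(
                [sale_id] + [item.get(c) for c in LINE_ITEM_COLS])
    return rows


def write_csv(path, header, rows):
    """Write a header plus data rows; csv.writer turns None into ''."""
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(header)
        writer.writerows(rows)


rows = flatten_sales(json.loads(SAMPLE_JSON))
```

Writing missing values as empty fields pairs with the `NULL ''` option used in the `copy` statements, so absent JSON keys arrive in the staging tables as NULL.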

The executives are anxious to learn how good the data is, whether AGM can continue withholding customer data from Peak, and what the preliminary analytics show. Even though there is only one day's worth of data, the executives want as much information as soon as they can get it (which is very typical).

The data science team has met with you, and together you came up with a plan to load and validate the data, explore the customer data, and perform some preliminary analytics. The executives have asked the data science team for an assessment of the customer data and of whether to continue withholding customer data from Peak. Since you will be the first to look at the data in depth, the data science team wants and values your opinion on the customer data.

# Included Modules and Packages

```python
import csv
import json
import math
import numpy as np
import pandas as pd
import psycopg2
```

# Additional Setup Code

```python
# Function to run a select query and return the rows in a pandas dataframe
# Note: pandas may return Postgres numeric values (and integer columns that
# contain NULLs) as float64, so whole-number float columns are converted
# back to the nullable Int64 dtype below

def my_select_query_pandas(query, rollback_before_flag, rollback_after_flag):
    """Run a select query and return the rows in a pandas dataframe."""

    # Optionally clear any open or failed transaction before querying
    if rollback_before_flag:
        connection.rollback()

    df = pd.read_sql_query(query, connection)

    # Optionally end the transaction the select opened
    if rollback_after_flag:
        connection.rollback()

    # Fix any float columns that really should be integers:
    # a column qualifies if every non-null value is a whole number

    for column in df.columns:

        if df[column].dtype == "float64":

            non_null = df[column].dropna()

            if (non_null % 1 == 0).all():
                df[column] = df[column].astype("Int64")

    return df
```
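The float-to-integer cleanup can be exercised without a database by pulling the same rule into a standalone helper. The name `fix_integer_columns` and the sample frame are hypothetical; the rule matches the loop above: a `float64` column becomes `Int64` only when every non-null value is a whole number.

```python
import numpy as np
import pandas as pd


def fix_integer_columns(df):
    """Return a copy of df with whole-number float64 columns cast to Int64."""
    out = df.copy()
    for column in out.columns:
        if out[column].dtype == "float64":
            non_null = out[column].dropna()
            if (non_null % 1 == 0).all():
                # Int64 (capital I) is pandas' nullable integer dtype,
                # so NaN survives as <NA> instead of forcing float
                out[column] = out[column].astype("Int64")
    return out


sample = pd.DataFrame({
    "sub_total": [12.0, np.nan, 72.0],  # whole numbers plus a NULL
    "rate": [0.18, 0.18, 0.18],         # genuine fractions stay float
})
fixed = fix_integer_columns(sample)
print(fixed.dtypes)
```

The nullable `Int64` dtype is what makes this conversion safe; a plain `astype(int)` would raise on the missing value.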

```python
# Set up connection to postgres
# Note: All connection inputs below have been removed for protection
connection = psycopg2.connect(
    user = "",
    password = "",
    host = "",
    port = "",
    database = ""
)
```
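Instead of blanking the credentials by hand before sharing the notebook, one option is to read them from the standard libpq environment variables (`PGUSER`, `PGPASSWORD`, `PGHOST`, `PGPORT`, `PGDATABASE`). The helper `connection_params_from_env` below is a hypothetical sketch; it leaves out unset variables so libpq can fall back to its own defaults.

```python
import os

# Standard libpq environment variables mapped to the keyword names
# used in the psycopg2.connect() call above
ENV_TO_PARAM = {
    "PGUSER": "user",
    "PGPASSWORD": "password",
    "PGHOST": "host",
    "PGPORT": "port",
    "PGDATABASE": "database",
}


def connection_params_from_env(environ=None):
    """Build psycopg2.connect() keyword arguments from PG* variables."""
    environ = os.environ if environ is None else environ
    # Skip unset or empty variables rather than passing empty strings
    return {param: environ[var]
            for var, param in ENV_TO_PARAM.items()
            if environ.get(var)}


# Usage (needs a reachable server):
# connection = psycopg2.connect(**connection_params_from_env())
```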

```python
cursor = connection.cursor()
```

# 2.3.1 Drop the staging table stage_1_peak_sales if it exists

```python
# Query drops the staging table stage_1_peak_sales if it already exists

connection.rollback()

query = """

drop table if exists stage_1_peak_sales;

"""

cursor.execute(query)

connection.commit()
```

# 2.3.2 Drop the staging table stage_1_peak_stores if it exists

```python
# Query drops the staging table stage_1_peak_stores if it already exists

connection.rollback()

query = """

drop table if exists stage_1_peak_stores;

"""

cursor.execute(query)

connection.commit()
```

# 2.3.3 Drop the staging table stage_1_peak_customers if it exists 

```python
# Query drops the staging table stage_1_peak_customers if it already exists

connection.rollback()

query = """

drop table if exists stage_1_peak_customers;

"""

cursor.execute(query)

connection.commit()
```

# 2.3.4 Drop the staging table stage_1_peak_line_items if it exists

```python
# Query drops the staging table stage_1_peak_line_items if it already exists

connection.rollback()

query = """

drop table if exists stage_1_peak_line_items;

"""

cursor.execute(query)

connection.commit()
```
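The four drop cells differ only in the table name, so they could be collapsed into one loop. The sketch below uses a hypothetical `drop_staging_tables` helper and runs all four drops in a single transaction; because PostgreSQL DDL is transactional, either every table is dropped or none is. The table names come from a fixed tuple, not user input, so plain string formatting is acceptable here; for untrusted names, `psycopg2.sql.Identifier` would be the safer choice.

```python
# The four staging tables created in this section
STAGING_TABLES = (
    "stage_1_peak_sales",
    "stage_1_peak_stores",
    "stage_1_peak_customers",
    "stage_1_peak_line_items",
)


def drop_staging_tables(connection, cursor, tables=STAGING_TABLES):
    """Drop each staging table if it exists, committing once at the end."""
    connection.rollback()  # start from a clean transaction
    for table in tables:
        # Names come from the fixed tuple above, never from user input
        cursor.execute(f"drop table if exists {table};")
    connection.commit()


# Usage: drop_staging_tables(connection, cursor)
```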

# 2.3.5 Create the staging table stage_1_peak_sales

```python
# Query creates the staging table stage_1_peak_sales

connection.rollback()

query = """

create table stage_1_peak_sales (
  stage_id serial
, sale_id varchar(100)
, sale_date varchar(100)
, sub_total varchar(100)
, tax varchar(100)
, total_amount varchar(100)
)

;

"""

cursor.execute(query)

connection.commit()
```

# 2.3.6 Create the staging table stage_1_peak_stores

```python
# Query creates the staging table stage_1_peak_stores

connection.rollback()

query = """

create table stage_1_peak_stores (
  stage_id serial
, sale_id varchar(100)
, location_id varchar(100)
, name varchar(100)
, street varchar(100)
, city varchar(100)
, state varchar(100)
, zip varchar(100)
)

;

"""

cursor.execute(query)

connection.commit()
```

# 2.3.7 Create the staging table stage_1_peak_customers

```python
# Query creates the staging table stage_1_peak_customers

connection.rollback()

query = """

create table stage_1_peak_customers (
  stage_id serial
, sale_id varchar(100)
, customer_id varchar(100)
, first_name varchar(100)
, last_name varchar(100)
, street varchar(100)
, city varchar(100)
, state varchar(100)
, zip varchar(100)
)

;

"""

cursor.execute(query)

connection.commit()
```

# 2.3.8 Create the staging table stage_1_peak_line_items

```python
# Query creates the staging table stage_1_peak_line_items

connection.rollback()

query = """

create table stage_1_peak_line_items (
  stage_id serial
, sale_id varchar(100)
, line_item_id varchar(100)
, product_id varchar(100)
, price varchar(100)
, quantity varchar(100)
, taxable varchar(100)
)

;

"""

cursor.execute(query)

connection.commit()
```
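All four staging tables share one pattern: a `serial` surrogate key followed by business columns stored as `varchar(100)`. Keeping every column as text means `copy` accepts whatever values arrive (a stray `12.00`, a blank, a malformed date) instead of failing the whole load, and type conversion and validation can presumably happen in a later stage. Since the statements differ only in their column lists, they could be generated; `STAGING_COLUMNS` and `staging_ddl` below are hypothetical helpers sketching that idea.

```python
# Column lists copied from the four create table statements above
STAGING_COLUMNS = {
    "stage_1_peak_sales": ["sale_id", "sale_date", "sub_total", "tax",
                           "total_amount"],
    "stage_1_peak_stores": ["sale_id", "location_id", "name", "street",
                            "city", "state", "zip"],
    "stage_1_peak_customers": ["sale_id", "customer_id", "first_name",
                               "last_name", "street", "city", "state", "zip"],
    "stage_1_peak_line_items": ["sale_id", "line_item_id", "product_id",
                                "price", "quantity", "taxable"],
}


def staging_ddl(table, columns, width=100):
    """Build a create table statement: serial key plus text columns."""
    lines = [f"create table {table} (", "  stage_id serial"]
    lines += [f", {column} varchar({width})" for column in columns]
    lines.append(");")
    return "\n".join(lines)


# Usage:
# for table, columns in STAGING_COLUMNS.items():
#     cursor.execute(staging_ddl(table, columns))
# connection.commit()
```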

# 2.3.9 Load the CSV file peak_sales.csv into the table stage_1_peak_sales

```python
# Query loads the .csv file peak_sales.csv into the staging table stage_1_peak_sales

connection.rollback()

query = """

copy stage_1_peak_sales (
  sale_id
, sale_date
, sub_total
, tax
, total_amount
)

from '/user/projects/peak_sales.csv' delimiter ',' NULL '' csv header

;

"""

cursor.execute(query)

connection.commit()
```

# 2.3.10 Load the CSV file peak_stores.csv into the table stage_1_peak_stores

```python
# Query loads the .csv file peak_stores.csv into the staging table stage_1_peak_stores

connection.rollback()

query = """

copy stage_1_peak_stores (
  sale_id
, location_id
, name
, street
, city
, state
, zip
)

from '/user/projects/peak_stores.csv' delimiter ',' NULL '' csv header

;

"""

cursor.execute(query)

connection.commit()
```

# 2.3.11 Load the CSV file peak_customers.csv into the table stage_1_peak_customers

```python
# Query loads the .csv file peak_customers.csv into the staging table stage_1_peak_customers

connection.rollback()

query = """

copy stage_1_peak_customers (
  sale_id
, customer_id
, first_name
, last_name
, street
, city
, state
, zip
)

from '/user/projects/peak_customers.csv' delimiter ',' NULL '' csv header

;

"""

cursor.execute(query)

connection.commit()
```

# 2.3.12 Load the CSV file peak_line_items.csv into the table stage_1_peak_line_items

```python
# Query loads the .csv file peak_line_items.csv into the staging table stage_1_peak_line_items

connection.rollback()

query = """

copy stage_1_peak_line_items (
  sale_id
, line_item_id
, product_id
, price
, quantity
, taxable
)

from '/user/projects/peak_line_items.csv' delimiter ',' NULL '' csv header

;

"""

cursor.execute(query)

connection.commit()
```
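The `copy ... from '/user/projects/...'` statements above are server-side: PostgreSQL opens the file on the database server's own file system, which requires superuser rights or membership in the `pg_read_server_files` role (PostgreSQL 11 and later). If the CSV files live on the notebook's machine instead, psycopg2's `cursor.copy_expert()` can stream them through `copy ... from stdin`. The helper `copy_csv_from_client` below is a hypothetical sketch of that approach. In CSV format a comma delimiter is already the default, so only `header` and the null marker are spelled out.

```python
def copy_csv_from_client(cursor, table, columns, path):
    """Stream a local CSV file into table via COPY ... FROM STDIN.

    table and columns are trusted, hard-coded names (not user input).
    """
    column_list = ", ".join(columns)
    copy_sql = (
        f"copy {table} ({column_list}) "
        "from stdin with (format csv, header true, null '')"
    )
    # psycopg2 reads the open file object and sends it to the server
    with open(path, newline="", encoding="utf-8") as f:
        cursor.copy_expert(copy_sql, f)


# Usage (path is the notebook machine's copy of the file):
# copy_csv_from_client(cursor, "stage_1_peak_sales",
#                      ["sale_id", "sale_date", "sub_total", "tax", "total_amount"],
#                      "peak_sales.csv")
# connection.commit()
```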

# 2.3.13 Verify the load of stage_1_peak_sales by querying the entire table

```python
# Query verifies the load of the staging table stage_1_peak_sales

rollback_before_flag = True
rollback_after_flag = True

query = """

select * 

from stage_1_peak_sales

order by stage_id

;

"""

my_select_query_pandas(query, rollback_before_flag, rollback_after_flag)
```

|  | stage_id | sale_id | sale_date | sub_total | tax | total_amount |
|---|---|---|---|---|---|---|
| 0 | 1 | 5763728874 | 2020-10-03 | 12 | 0 | 12 |
| 1 | 2 | 5763729036 | 2020-10-03 | 72 | 0 | 72 |
| 2 | 3 | 5763728904 | 2020-10-03 | 24 | 0 | 24 |
| 3 | 4 | 5763728973 | 2020-10-03 | 96 | 0 | 96 |
| 4 | 5 | 5763728757 | 2020-10-03 | 108 | 0 | 108 |
| ... | ... | ... | ... | ... | ... | ... |
| 92 | 93 | 5763728927 | 2020-10-03 | 72 | 0 | 72 |
| 93 | 94 | 5763729096 | 2020-10-03 | 48 | 0 | 48 |
| 94 | 95 | 5763729268 | 2020-10-03 | 84 | 0 | 84 |
| 95 | 96 | 5763729237 | 2020-10-03 | 60 | 0 | 60 |

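Eyeballing the first and last rows confirms the columns landed correctly, but not that every record arrived. A complementary check is to compare `select count(*)` on each staging table with the number of data records in the matching CSV file. The hypothetical helper below counts records with `csv.reader` rather than counting lines, because a quoted field (a street address, for example) may legally contain a newline.

```python
import csv


def csv_data_row_count(path):
    """Count CSV records in path, excluding the header row."""
    with open(path, newline="", encoding="utf-8") as f:
        reader = csv.reader(f)
        next(reader, None)  # skip the header (no-op on an empty file)
        return sum(1 for _ in reader)


# Usage: compare with the staged row count, e.g.
# staged = my_select_query_pandas(
#     "select count(*) as n from stage_1_peak_sales;", True, True)["n"][0]
# assert staged == csv_data_row_count("peak_sales.csv")
```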

# 2.3.14 Verify the load of stage_1_peak_stores by querying the entire table

```python
# Query verifies the load of the staging table stage_1_peak_stores

rollback_before_flag = True
rollback_after_flag = True

query = """

select * 

from stage_1_peak_stores

order by stage_id

;

"""

my_select_query_pandas(query, rollback_before_flag, rollback_after_flag)
```

|  | stage_id | sale_id | location_id | name | street | city | state | zip |
|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 5763728874 | 12573 | Acme Gourmet Meals | 3000 Telegraph Ave | Berkeley | CA | 94705 |
| 1 | 2 | 5763729036 | 12573 | Acme Gourmet Meals | 3000 Telegraph Ave | Berkeley | CA | 94705 |
| 2 | 3 | 5763728904 | 12573 | Acme Gourmet Meals | 3000 Telegraph Ave | Berkeley | CA | 94705 |
| 3 | 4 | 5763728973 | 12573 | Acme Gourmet Meals | 3000 Telegraph Ave | Berkeley | CA | 94705 |
| 4 | 5 | 5763728757 | 12573 | Acme Gourmet Meals | 3000 Telegraph Ave | Berkeley | CA | 94705 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 92 | 93 | 5763728927 | 12573 | Acme Gourmet Meals | 3000 Telegraph Ave | Berkeley | CA | 94705 |
| 93 | 94 | 5763729096 | 12573 | Acme Gourmet Meals | 3000 Telegraph Ave | Berkeley | CA | 94705 |
| 94 | 95 | 5763729268 | 12573 | Acme Gourmet Meals | 3000 Telegraph Ave | Berkeley | CA | 94705 |
| 95 | 96 | 5763729237 | 12573 | Acme Gourmet Meals | 3000 Telegraph Ave | Berkeley | CA | 94705 |

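Because the POC uses a single pickup location, every row of `stage_1_peak_stores` should describe the same store, and each sale should appear exactly once. The sketch below runs both checks in pandas on a few rows copied from the output above; `store_checks` is a hypothetical helper, and in the notebook it would be applied to the full frame returned by `my_select_query_pandas`.

```python
import pandas as pd

# A few rows as displayed from stage_1_peak_stores
stores = pd.DataFrame({
    "sale_id": ["5763728874", "5763729036", "5763729237"],
    "location_id": ["12573"] * 3,
    "name": ["Acme Gourmet Meals"] * 3,
    "street": ["3000 Telegraph Ave"] * 3,
    "city": ["Berkeley"] * 3,
    "state": ["CA"] * 3,
    "zip": ["94705"] * 3,
})

STORE_COLUMNS = ["location_id", "name", "street", "city", "state", "zip"]


def store_checks(stores):
    """Count distinct store records and repeated sale IDs."""
    return {
        # Expect 1 while only the Berkeley pickup location exists
        "distinct_store_records": len(stores[STORE_COLUMNS].drop_duplicates()),
        # Expect 0: one store row per sale
        "duplicate_sale_ids": int(stores["sale_id"].duplicated().sum()),
    }


print(store_checks(stores))
```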

# 2.3.15 Verify the load of stage_1_peak_customers by querying the entire table

```python
# Query verifies the load of the staging table stage_1_peak_customers

rollback_before_flag = True
rollback_after_flag = True

query = """

select * 

from stage_1_peak_customers

order by stage_id

;

"""

my_select_query_pandas(query, rollback_before_flag, rollback_after_flag)
```

|  | stage_id | sale_id | customer_id | first_name | last_name | street | city | state | zip |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 5763728874 | 3728404 | Darrelle | Dohrmann | 46 Farwell Terrace | Oakland | CA | 94609 |
| 1 | 2 | 5763729036 | 3729309 | Moria | Greenwood | 8803 Delaware Crossing | Berkeley | CA | 94705 |
| 2 | 3 | 5763728904 | 3728508 | Josiah | Hulett | 6755 Melby Plaza | Oakland | CA | 94612 |
| 3 | 4 | 5763728973 | 3728534 | Gayle | MacGarrity | 286 Onsgard Center | Berkeley | CA | 94703 |
| 4 | 5 | 5763728757 | 3729188 | Courtenay | Shirrell | 75 West Park | Emeryville | CA | 94608 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 92 | 93 | 5763728927 | 3728568 | Valina | Spring | 119 Sachtjen Junction | Berkeley | CA | 94702 |
| 93 | 94 | 5763729096 | 3728990 | Claire | Mebes | 358 Oxford Road | Albany | CA | 94706 |
| 94 | 95 | 5763729268 | 3728901 | Freddy | Mumford | 6 Transport Lane | Orinda | CA | 94563 |
| 95 | 96 | 5763729237 | 3729019 | Arv | Doret | 2120 Mesta Circle | Emeryville | CA | 94608 |

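Since customers sign up with Peak directly, the customer rows are the ones most likely to need cleansing. A minimal per-row sketch follows, flagging missing or untrimmed fields, zip codes that are not in `12345` or `12345-6789` form, and states other than `CA`. The helper `customer_issues`, the issue labels, and the `CA`-only rule are assumptions (reasonable for a 5-mile radius around Berkeley); checking the radius itself would need a list of eligible zip codes, which is not shown here.

```python
import re

# Five-digit zip, optionally followed by a ZIP+4 suffix
ZIP_RE = re.compile(r"\d{5}(-\d{4})?")

CUSTOMER_FIELDS = ("customer_id", "first_name", "last_name",
                   "street", "city", "state", "zip")


def customer_issues(row):
    """Return a list of data-quality issues for one staged customer row."""
    issues = []
    for field in CUSTOMER_FIELDS:
        value = row.get(field)
        if value is None or value.strip() == "":
            issues.append(f"missing {field}")
        elif value != value.strip():
            issues.append(f"untrimmed {field}")
    zip_code = (row.get("zip") or "").strip()
    if zip_code and not ZIP_RE.fullmatch(zip_code):
        issues.append("malformed zip")
    state = (row.get("state") or "").strip()
    if state and state != "CA":  # assumption: all deliveries are in CA
        issues.append("unexpected state")
    return issues


# First row of the stage_1_peak_customers output above
good = {"customer_id": "3728404", "first_name": "Darrelle",
        "last_name": "Dohrmann", "street": "46 Farwell Terrace",
        "city": "Oakland", "state": "CA", "zip": "94609"}
print(customer_issues(good))
```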

# 2.3.16 Verify the load of stage_1_peak_line_items by querying the entire table

```python
# Query verifies the load of the staging table stage_1_peak_line_items

rollback_before_flag = True
rollback_after_flag = True

query = """

select * 

from stage_1_peak_line_items

order by stage_id

;

"""

my_select_query_pandas(query, rollback_before_flag, rollback_after_flag)
```

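|  | stage_id | sale_id | line_item_id | product_id | price | quantity | taxable |
|---|---|---|---|---|---|---|---|
| 0 | 1 | 5763728874 | 1 | 42314780 | 12 | 1 | N |
| 1 | 2 | 5763729036 | 1 | 42314677 | 12 | 1 | N |
| 2 | 3 | 5763729036 | 2 | 42314782 | 12 | 3 | N |
| 3 | 4 | 5763729036 | 3 | 42314784 | 12 | 2 | N |
| 4 | 5 | 5763728904 | 1 | 42314780 | 12 | 1 | N |
| ... | ... | ... | ... | ... | ... | ... | ... |
| 347 | 348 | 5763729237 | 2 | 42314678 | 12 | 2 | N |
| 348 | 349 | 5763729237 | 3 | 42314782 | 12 | 2 | N |
| 349 | 350 | 5763728673 | 1 | 42314677 | 12 | 2 | N |
| 350 | 351 | 5763728673 | 2 | 42314678 | 12 | 1 | N |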

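With both tables staged, the line items can be reconciled against the sale headers: for each sale, the sum of `price * quantity` should equal `sub_total`, and `sub_total + tax` should equal `total_amount`. The sketch below uses only the two sales whose line items are fully visible in the outputs above (5763728874 and 5763729036, where 12 × (1 + 3 + 2) = 72). The values stay as strings, as they are in the varchar staging tables, and are converted with `pd.to_numeric`; `reconcile` is a hypothetical helper name.

```python
import pandas as pd

# Sale headers as staged (all varchar, so strings)
sales = pd.DataFrame({
    "sale_id": ["5763728874", "5763729036"],
    "sub_total": ["12", "72"],
    "tax": ["0", "0"],
    "total_amount": ["12", "72"],
})

# Every line item for those two sales
line_items = pd.DataFrame({
    "sale_id": ["5763728874", "5763729036", "5763729036", "5763729036"],
    "price": ["12", "12", "12", "12"],
    "quantity": ["1", "1", "3", "2"],
})


def reconcile(sales, line_items):
    """Return sales whose line items or totals do not add up."""
    items = line_items.assign(
        extended=pd.to_numeric(line_items["price"])
        * pd.to_numeric(line_items["quantity"])
    )
    item_totals = items.groupby("sale_id", as_index=False)["extended"].sum()
    # Left join keeps sales with no line items (extended becomes NaN,
    # and NaN != sub_total, so they are flagged too)
    merged = sales.merge(item_totals, on="sale_id", how="left")
    sub_total = pd.to_numeric(merged["sub_total"])
    tax = pd.to_numeric(merged["tax"])
    total = pd.to_numeric(merged["total_amount"])
    mismatch = (merged["extended"] != sub_total) | (sub_total + tax != total)
    return merged.loc[mismatch, ["sale_id", "sub_total", "tax",
                                 "total_amount", "extended"]]


print(reconcile(sales, line_items))  # an empty frame means everything adds up
```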