# Create and load staging tables



In [1]:
import csv

import json

import math
import numpy as np
import pandas as pd

import psycopg2

In [2]:
#
# function to run a select query and return rows in a pandas dataframe
# pandas converts numeric values from postgres to float
# if every value in a float column is a whole number, change the column to integer
#

def my_select_query_pandas(query, rollback_before_flag, rollback_after_flag):
    """Run a select query and return its rows as a pandas DataFrame.

    Float columns whose non-null values are all finite whole numbers are
    converted to the nullable ``Int64`` dtype, so integer data is not
    displayed as floats.

    Args:
        query: SQL text passed to ``pd.read_sql_query``.
        rollback_before_flag: If true, roll back the module-level
            ``connection`` before running the query.
        rollback_after_flag: If true, roll back the module-level
            ``connection`` after the query, whether or not it succeeded.

    Returns:
        The query result as a ``pandas.DataFrame``.
    """
    if rollback_before_flag:
        connection.rollback()

    try:
        df = pd.read_sql_query(query, connection)
    finally:
        # Roll back even when the query raises, so the requested cleanup is
        # not skipped on the error path.
        if rollback_after_flag:
            connection.rollback()

    # Fix the float columns that really should be integers.
    for column in df:
        if df[column].dtype != "float64":
            continue

        values = df[column].to_numpy()
        present = values[~np.isnan(values)]

        # Infinities are not whole numbers; checking them explicitly avoids
        # the OverflowError that math.floor(inf) raises.
        if np.isfinite(present).all() and (present == np.floor(present)).all():
            df[column] = df[column].astype("Int64")

    return df
    

In [3]:
# Settings for the PostgreSQL session shared by the rest of the notebook.
connection_settings = {
    "user": "postgres",
    "host": "postgres",
    "port": "5432",
    "database": "postgres",
}

connection = psycopg2.connect(**connection_settings)

In [4]:
# Cursor on the shared connection; the statement cells call cursor.execute().
cursor = connection.cursor()

# Drop the staging table stage_1_peak_sales if it exists


In [5]:
# Roll back whatever transaction is open on the connection before starting.
connection.rollback()

# "if exists" lets this statement run even when the table is not there yet.
query = """

drop table if exists stage_1_peak_sales

"""

cursor.execute(query)

# Commit so the drop is made permanent.
connection.commit()



# Drop the staging table stage_1_peak_stores if it exists


In [6]:
# Roll back whatever transaction is open on the connection before starting.
connection.rollback()

# "if exists" lets this statement run even when the table is not there yet.
query = """

drop table if exists stage_1_peak_stores

"""

cursor.execute(query)

# Commit so the drop is made permanent.
connection.commit()



# Drop the staging table stage_1_peak_customers if it exists
 

In [7]:
# Roll back whatever transaction is open on the connection before starting.
connection.rollback()

# "if exists" lets this statement run even when the table is not there yet.
query = """

drop table if exists stage_1_peak_customers

"""

cursor.execute(query)

# Commit so the drop is made permanent.
connection.commit()



# Drop the staging table stage_1_peak_line_items if it exists


In [8]:
# Roll back whatever transaction is open on the connection before starting.
connection.rollback()

# "if exists" lets this statement run even when the table is not there yet.
query = """

drop table if exists stage_1_peak_line_items

"""

cursor.execute(query)

# Commit so the drop is made permanent.
connection.commit()



# Create the staging table stage_1_peak_sales

The first column should be stage_id, with data type serial.


In [9]:
# Roll back whatever transaction is open on the connection before starting.
connection.rollback()

# stage_id is a serial; every other column is declared as varchar(100).
query = """

create table stage_1_peak_sales (
    stage_id serial,
    sale_id varchar(100),
    sale_date varchar(100),
    sub_total varchar(100),
    tax varchar(100),
    total_amount varchar(100)
)

"""

cursor.execute(query)

# Commit so the new table is made permanent.
connection.commit()



# Create the staging table stage_1_peak_stores

The first column should be stage_id, with data type serial.


In [10]:
# Roll back whatever transaction is open on the connection before starting.
connection.rollback()

# stage_id is a serial; every other column is declared as varchar(100).
query = """

create table stage_1_peak_stores (
stage_id serial,
sale_id varchar(100),
location_id varchar(100),
name varchar(100),
street varchar(100),
city varchar(100),
state varchar(100),
zip varchar(100)

)


"""

cursor.execute(query)

# Commit so the new table is made permanent.
connection.commit()



# Create the staging table stage_1_peak_customers


The first column should be stage_id, with data type serial.



In [11]:
# Roll back whatever transaction is open on the connection before starting.
connection.rollback()

# stage_id is a serial; every other column is declared as varchar(100).
query = """

create table stage_1_peak_customers (
stage_id serial,
sale_id varchar(100),
customer_id varchar(100),
first_name varchar(100),
last_name varchar(100),
street varchar(100),
city varchar(100),
state varchar(100),
zip varchar(100)

)

"""

cursor.execute(query)

# Commit so the new table is made permanent.
connection.commit()



# Create the staging table stage_1_peak_line_items


The first column should be stage_id, with data type serial.


In [12]:
# Roll back whatever transaction is open on the connection before starting.
connection.rollback()

# stage_id is a serial; every other column is declared as varchar(100).
query = """

create table stage_1_peak_line_items (
stage_id serial,
sale_id varchar(100),
line_item_id varchar(100),
product_id varchar(100),
price varchar(100),
quantity varchar(100),
taxable varchar(100)

)

"""

cursor.execute(query)

# Commit so the new table is made permanent.
connection.commit()



# Load the CSV file peak_sales.csv into the table stage_1_peak_sales

The stage_id column is a serial and auto-populates, so the load needs an explicit column list that leaves stage_id out.


In [13]:
# Roll back whatever transaction is open on the connection before starting.
connection.rollback()

# The column list omits stage_id. "csv header" skips the file's first line and
# NULL '' loads empty fields as NULL. Per PostgreSQL COPY semantics, the file
# path is read by the database server, not by this Python process.
query = """

copy stage_1_peak_sales (sale_id, sale_date, sub_total, tax, total_amount)
from '/user/projects/project-2-aryanafar/peak_sales.csv' delimiter ',' NULL '' csv header;

"""

cursor.execute(query)

# Commit so the loaded rows are made permanent.
connection.commit()

# Load the CSV file peak_stores.csv into the table stage_1_peak_stores

The stage_id column is a serial and auto-populates, so the load needs an explicit column list that leaves stage_id out.

In [14]:
# Roll back whatever transaction is open on the connection before starting.
connection.rollback()

# The column list omits stage_id. "csv header" skips the file's first line and
# NULL '' loads empty fields as NULL. Per PostgreSQL COPY semantics, the file
# path is read by the database server, not by this Python process.
query = """

copy stage_1_peak_stores (sale_id, location_id, name, street, city, state, zip)
from '/user/projects/project-2-aryanafar/peak_stores.csv' delimiter ',' NULL '' csv header;

"""

cursor.execute(query)

# Commit so the loaded rows are made permanent.
connection.commit()

# Load the CSV file peak_customers.csv into the table stage_1_peak_customers

The stage_id column is a serial and auto-populates, so the load needs an explicit column list that leaves stage_id out.

In [15]:
# Roll back whatever transaction is open on the connection before starting.
connection.rollback()

# The column list omits stage_id. "csv header" skips the file's first line and
# NULL '' loads empty fields as NULL. Per PostgreSQL COPY semantics, the file
# path is read by the database server, not by this Python process.
query = """

copy stage_1_peak_customers (sale_id, customer_id, first_name, last_name, street, city, state, zip)
from '/user/projects/project-2-aryanafar/peak_customers.csv' delimiter ',' NULL '' csv header;

"""

cursor.execute(query)

# Commit so the loaded rows are made permanent.
connection.commit()

# Load the CSV file peak_line_items.csv into the table stage_1_peak_line_items

The stage_id column is a serial and auto-populates, so the load needs an explicit column list that leaves stage_id out.

In [16]:
# Roll back whatever transaction is open on the connection before starting.
connection.rollback()

# The column list omits stage_id. "csv header" skips the file's first line and
# NULL '' loads empty fields as NULL. Per PostgreSQL COPY semantics, the file
# path is read by the database server, not by this Python process.
query = """

copy stage_1_peak_line_items (sale_id, line_item_id, product_id, price, quantity, taxable)
from '/user/projects/project-2-aryanafar/peak_line_items.csv' delimiter ',' NULL '' csv header;

"""

cursor.execute(query)

# Commit so the loaded rows are made permanent.
connection.commit()

# Verify the load of stage_1_peak_sales by querying the entire table


In [17]:
# Fetch every row of stage_1_peak_sales, ordered by stage_id.
query = """

select *
from stage_1_peak_sales
order by stage_id

"""

# Ask the helper to roll back both before and after the read.
rollback_before_flag = rollback_after_flag = True

my_select_query_pandas(query, rollback_before_flag, rollback_after_flag)

Unnamed: 0,stage_id,sale_id,sale_date,sub_total,tax,total_amount
0,1,5763728874,2020-10-03,12,0,12
1,2,5763729036,2020-10-03,72,0,72
2,3,5763728904,2020-10-03,24,0,24
3,4,5763728973,2020-10-03,96,0,96
4,5,5763728757,2020-10-03,108,0,108
...,...,...,...,...,...,...
92,93,5763728927,2020-10-03,72,0,72
93,94,5763729096,2020-10-03,48,0,48
94,95,5763729268,2020-10-03,84,0,84
95,96,5763729237,2020-10-03,60,0,60


# Verify the load of stage_1_peak_stores by querying the entire table



In [18]:
# Fetch every row of stage_1_peak_stores, ordered by stage_id.
query = """

select *
from stage_1_peak_stores
order by stage_id

"""

# Ask the helper to roll back both before and after the read.
rollback_before_flag = rollback_after_flag = True

my_select_query_pandas(query, rollback_before_flag, rollback_after_flag)

Unnamed: 0,stage_id,sale_id,location_id,name,street,city,state,zip
0,1,5763728874,12573,Acme Gourmet Meals,3000 Telegraph Ave,Berkeley,CA,94705
1,2,5763729036,12573,Acme Gourmet Meals,3000 Telegraph Ave,Berkeley,CA,94705
2,3,5763728904,12573,Acme Gourmet Meals,3000 Telegraph Ave,Berkeley,CA,94705
3,4,5763728973,12573,Acme Gourmet Meals,3000 Telegraph Ave,Berkeley,CA,94705
4,5,5763728757,12573,Acme Gourmet Meals,3000 Telegraph Ave,Berkeley,CA,94705
...,...,...,...,...,...,...,...,...
92,93,5763728927,12573,Acme Gourmet Meals,3000 Telegraph Ave,Berkeley,CA,94705
93,94,5763729096,12573,Acme Gourmet Meals,3000 Telegraph Ave,Berkeley,CA,94705
94,95,5763729268,12573,Acme Gourmet Meals,3000 Telegraph Ave,Berkeley,CA,94705
95,96,5763729237,12573,Acme Gourmet Meals,3000 Telegraph Ave,Berkeley,CA,94705


# Verify the load of stage_1_peak_customers by querying the entire table


In [19]:
# Fetch every row of stage_1_peak_customers, ordered by stage_id.
query = """

select *
from stage_1_peak_customers
order by stage_id

"""

# Ask the helper to roll back both before and after the read.
rollback_before_flag = rollback_after_flag = True

my_select_query_pandas(query, rollback_before_flag, rollback_after_flag)

Unnamed: 0,stage_id,sale_id,customer_id,first_name,last_name,street,city,state,zip
0,1,5763728874,3728404,Darrelle,Dohrmann,46 Farwell Terrace,Oakland,CA,94609
1,2,5763729036,3729309,Moria,Greenwood,8803 Delaware Crossing,Berkeley,CA,94705
2,3,5763728904,3728508,Josiah,Hulett,6755 Melby Plaza,Oakland,CA,94612
3,4,5763728973,3728534,Gayle,MacGarrity,286 Onsgard Center,Berkeley,CA,94703
4,5,5763728757,3729188,Courtenay,Shirrell,75 West Park,Emeryville,CA,94608
...,...,...,...,...,...,...,...,...,...
92,93,5763728927,3728568,Valina,Spring,119 Sachtjen Junction,Berkeley,CA,94702
93,94,5763729096,3728990,Claire,Mebes,358 Oxford Road,Albany,CA,94706
94,95,5763729268,3728901,Freddy,Mumford,6 Transport Lane,Orinda,CA,94563
95,96,5763729237,3729019,Arv,Doret,2120 Mesta Circle,Emeryville,CA,94608


# Verify the load of stage_1_peak_line_items by querying the entire table


In [20]:
# Fetch every row of stage_1_peak_line_items, ordered by stage_id.
query = """

select *
from stage_1_peak_line_items
order by stage_id

"""

# Ask the helper to roll back both before and after the read.
rollback_before_flag = rollback_after_flag = True

my_select_query_pandas(query, rollback_before_flag, rollback_after_flag)

Unnamed: 0,stage_id,sale_id,line_item_id,product_id,price,quantity,taxable
0,1,5763728874,1,42314680,12,1,N
1,2,5763729036,1,42314677,12,1,N
2,3,5763729036,2,42314682,12,3,N
3,4,5763729036,3,42314684,12,2,N
4,5,5763728904,1,42314680,12,1,N
...,...,...,...,...,...,...,...
347,348,5763729237,2,42314678,12,2,N
348,349,5763729237,3,42314682,12,2,N
349,350,5763728673,1,42314677,12,2,N
350,351,5763728673,2,42314678,12,1,N
