## DS2002 Project 1, Ben Harris - NRA2JE


#### Import Necessary Libraries

In [1]:
import os
import json
import csv
import numpy
import datetime
import certifi
import pandas as pd

import pymongo
import sqlalchemy
from sqlalchemy import create_engine

#### Connections for SQL and MondoDB Atlas

In [2]:
mysql_args = {
    "uid" : "root",
    "pwd" : "Passw0rd123",
    "hostname" : "localhost",
    "dbname" : "datamart"
}

mongodb_args = {
    "user_name" : "nra2je",
    "password" : "Passw0rd123",
    "cluster_name" : "sandbox",
    "cluster_subnet" : "4mvrkqg",
    "cluster_location" : "atlas",
    "db_name" : "Harris_Project_1"
}

#### Functions for Getting Data From and Setting Data Into Databases

In [3]:
def get_sql_dataframe(sql_query, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe
    

def set_dataframe(df, table_name, pk_column, db_operation, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()


def get_mongo_client(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the cluster_location parameter.")
    
    else:
        if args["cluster_location"] == "atlas":
            connect_str = f"mongodb+srv://{args['user_name']}:{args['password']}@"
            connect_str += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
            client = pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())
            
        elif args["cluster_location"] == "local":
            client = pymongo.MongoClient("mongodb://localhost:27017/")
        
    return client


def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = mongo_client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    mongo_client.close()
    
    return dframe


def set_mongo_collections(client, db_name, data_directory, csv_files):
    db = client[db_name]
    
    for collection_name, csv_file in csv_files.items():
        db[collection_name].drop()
        csv_path = os.path.join(data_directory, csv_file)
        df = pd.read_csv(csv_path)
        records = df.to_dict(orient='records')
        db[collection_name].insert_many(records)
    
    client.close()

#### Populate MobgoDB with data

In [4]:
client = get_mongo_client(**mongodb_args)


data_dir = os.path.join(os.getcwd(), 'Data')

csv_files = {"sales" : 'sales_data.csv',
             "stores" : 'stores_data.csv',
             "inspectors" : 'inspector_data.csv'
            }

set_mongo_collections(client, mongodb_args["db_name"], data_dir, csv_files)         

#### Extract MongoDB Collections into Pandas DataFrames

In [5]:
client = get_mongo_client(**mongodb_args)

query = {}
collection = "stores"

df_stores = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_stores.head(2)

Unnamed: 0,Store,Type,Size,State,City,Address
0,1,A,151315,Illinois,Bourbonnais,"1601 N State Route 50 Bourbonnais, IL 60914 US"
1,2,A,202307,Illinois,Chicago,"41 W. 87th Street Chicago, IL 60620 US"


In [6]:
client = get_mongo_client(**mongodb_args)

query = {}
collection = "inspectors"

df_inspectors = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_inspectors.head(2)

Unnamed: 0,Inspector_ID,First,Last,Email
0,1,Luke,Gentry,lukegentry@retailinc.com
1,2,Pam,Brown,pambrown@retailinc.com


In [7]:
client = get_mongo_client(**mongodb_args)

query = {}
collection = "sales"

df_fact_sales = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_fact_sales.head(2)

Unnamed: 0,Store,Dept,Inspector,Date,Weekly_Sales,IsHoliday
0,1,1,2,5/2/2010,24924.5,False
1,1,1,2,12/2/2010,46039.49,True


#### Get data from Dim Date Table

In [8]:
sql_dim_date = "SELECT date_key, full_date FROM datamart.dim_date;"
df_dim_date = get_sql_dataframe(sql_dim_date, **mysql_args)
df_dim_date.full_date = df_dim_date.full_date.astype('datetime64[ns]').dt.date
df_dim_date.head(2)

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02


In [9]:
df_dim_sales_date = df_dim_date.rename(columns={"date_key" : "sale_date_key", "full_date" : "Date"})
df_fact_sales.Date = df_fact_sales.Date.astype('datetime64[ns]').dt.date
df_fact_sales = pd.merge(df_fact_sales, df_dim_sales_date, on='Date', how='left')
df_fact_sales.drop(['Date'], axis=1, inplace=True)
df_fact_sales.head(2)

  df_fact_sales.Date = df_fact_sales.Date.astype('datetime64[ns]').dt.date


Unnamed: 0,Store,Dept,Inspector,Weekly_Sales,IsHoliday,sale_date_key
0,1,1,2,24924.5,False,20100502.0
1,1,1,2,46039.49,True,20101202.0


#### Transform DataFrames

In [10]:
df_stores.rename(columns={"Store":"store_id"}, inplace=True)
df_stores.insert(0, "store_key", range(1, df_stores.shape[0]+1))
df_stores.head(2)

Unnamed: 0,store_key,store_id,Type,Size,State,City,Address
0,1,1,A,151315,Illinois,Bourbonnais,"1601 N State Route 50 Bourbonnais, IL 60914 US"
1,2,2,A,202307,Illinois,Chicago,"41 W. 87th Street Chicago, IL 60620 US"


In [11]:
df_inspectors.rename(columns={"Inspector_ID":"inspector_id"}, inplace=True)
df_inspectors.insert(0, "inspector_key", range(1, df_inspectors.shape[0]+1))
df_inspectors.head(2)

Unnamed: 0,inspector_key,inspector_id,First,Last,Email
0,1,1,Luke,Gentry,lukegentry@retailinc.com
1,2,2,Pam,Brown,pambrown@retailinc.com


In [12]:
df_fact_sales.insert(0, "sale_key", range(1, df_fact_sales.shape[0]+1))
df_fact_sales.rename(columns={"Inspector":"inspector_id","Store":"store_id"}, inplace=True)
df_fact_sales.head(2)

Unnamed: 0,sale_key,store_id,Dept,inspector_id,Weekly_Sales,IsHoliday,sale_date_key
0,1,1,1,2,24924.5,False,20100502.0
1,2,1,1,2,46039.49,True,20101202.0


#### Load DataFrames into SQL

In [13]:
dataframe = df_fact_sales
table_name = 'fact_sales'
primary_key = 'sale_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

In [14]:
dataframe = df_stores
table_name = 'dim_stores'
primary_key = 'store_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

In [15]:
dataframe = df_inspectors
table_name = 'dim_inspectors'
primary_key = 'inspector_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

#### Validate added to SQL

In [16]:
sql_stores = "SELECT * FROM datamart.dim_stores;"
df_dim_stores = get_sql_dataframe(sql_stores, **mysql_args)
df_dim_stores.head(2)

Unnamed: 0,store_key,store_id,Type,Size,State,City,Address
0,1,1,A,151315,Illinois,Bourbonnais,"1601 N State Route 50 Bourbonnais, IL 60914 US"
1,2,2,A,202307,Illinois,Chicago,"41 W. 87th Street Chicago, IL 60620 US"


In [17]:
sql_inspectors = "SELECT * FROM datamart.dim_inspectors;"
df_dim_inspectors = get_sql_dataframe(sql_inspectors, **mysql_args)
df_dim_inspectors.head(2)

Unnamed: 0,inspector_key,inspector_id,First,Last,Email
0,1,1,Luke,Gentry,lukegentry@retailinc.com
1,2,2,Pam,Brown,pambrown@retailinc.com


In [18]:
sql_fact_sales = "SELECT * FROM datamart.fact_sales;"
df_fact_sales = get_sql_dataframe(sql_fact_sales, **mysql_args)
df_fact_sales.head(2)

Unnamed: 0,sale_key,store_id,Dept,inspector_id,Weekly_Sales,IsHoliday,sale_date_key
0,1,1,1,2,24924.5,0,20100502.0
1,2,1,1,2,46039.49,1,20101202.0


#### Get Primary Keys from Dimension Tables and Fact Table

In [19]:
sql_dim_stores = "SELECT store_key, store_id FROM datamart.dim_stores;"
df_dim_stores = get_sql_dataframe(sql_dim_stores, **mysql_args)
df_dim_stores.head(2)

Unnamed: 0,store_key,store_id
0,1,1
1,2,2


In [20]:
sql_dim_inspectors = "SELECT inspector_key, inspector_id FROM datamart.dim_inspectors;"
df_dim_inspectors = get_sql_dataframe(sql_dim_inspectors, **mysql_args)
df_dim_inspectors.head(2)

Unnamed: 0,inspector_key,inspector_id
0,1,1
1,2,2


In [21]:
df_fact_sales = pd.merge(df_fact_sales, df_dim_stores, on='store_id', how='inner')
df_fact_sales.drop(['store_id'], axis=1, inplace=True)
df_fact_sales.head(2)

Unnamed: 0,sale_key,Dept,inspector_id,Weekly_Sales,IsHoliday,sale_date_key,store_key
0,1,1,2,24924.5,0,20100502.0,1
1,2,1,2,46039.49,1,20101202.0,1


In [22]:
df_fact_sales = pd.merge(df_fact_sales, df_dim_inspectors, on='inspector_id', how='inner')
df_fact_sales.drop(['inspector_id'], axis=1, inplace=True)
df_fact_sales.head(2)

Unnamed: 0,sale_key,Dept,Weekly_Sales,IsHoliday,sale_date_key,store_key,inspector_key
0,1,1,24924.5,0,20100502.0,1,2
1,2,1,46039.49,1,20101202.0,1,2


#### Load MongoDB data into SQL database

In [23]:
dataframe = df_fact_sales
table_name = 'fact_sales'
primary_key = 'sale_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

#### Validate

In [24]:
sql_sales_validation = """
SELECT *
FROM datamart.fact_sales
"""
df_sales_validation = get_sql_dataframe(sql_sales_validation, **mysql_args)
df_sales_validation.head(2)

Unnamed: 0,sale_key,Dept,Weekly_Sales,IsHoliday,sale_date_key,store_key,inspector_key
0,1,1,24924.5,0,20100502.0,1,2
1,2,1,46039.49,1,20101202.0,1,2


### Demonstrate Proper Functionality
#### Displays location, average sales, and total sales for each store

In [25]:
sql_sales = """
SELECT 
    st.store_id AS Store_Number,
    st.Address AS Store_Location,
    CONCAT('$', FORMAT(AVG(s.Weekly_Sales), 0)) AS Average_Weekly_Sales,
    CONCAT('$', FORMAT(SUM(s.Weekly_Sales), 0)) AS Total_Sales
FROM 
    datamart.fact_sales s
JOIN 
    datamart.dim_stores st ON s.store_key = st.store_key
GROUP BY 
    st.store_id,
    st.Address;
"""

In [26]:
df_fact_sales = get_sql_dataframe(sql_sales, **mysql_args)
df_fact_sales

Unnamed: 0,Store_Number,Store_Location,Average_Weekly_Sales,Total_Sales
0,1,"1601 N State Route 50 Bourbonnais, IL 60914 US","$21,711","$222,402,809"
1,2,"41 W. 87th Street Chicago, IL 60620 US","$26,898","$275,382,441"
2,3,"601 King St Alexandria, VA 22314 US","$6,373","$57,586,735"
3,4,"235 S State St Ann Arbor, MI 48104 US","$29,161","$299,543,953"
4,5,"1081 Pine Plaza Dr Apex, NC 27523 US","$5,053","$45,475,689"
5,6,"48557 Morongo Trail Cabazon, CA 92230 US","$21,913","$223,756,131"
6,7,"5 Bel Air S Pkwy Bel Air, MD 21014 US","$8,359","$81,598,275"
7,8,"305 S 6th St Boise, ID 83702 US","$13,133","$129,951,181"
8,9,"1132 S Clinton St Clinton & Grenshaw Chicago, ...","$8,773","$77,789,219"
9,10,"4400 Sharon Rd Space G41 Charlotte, NC 28211 US","$26,332","$271,617,714"


#### Displays total inspections for each inspector

In [27]:
sql_inspectors = """
SELECT
    i.Last, COUNT(*) AS Total_Inspections
FROM
    datamart.fact_sales s
JOIN
    datamart.dim_inspectors i ON s.inspector_key = i.inspector_key
GROUP BY
    i.Last;
"""

In [28]:
df_inspectors = get_sql_dataframe(sql_inspectors, **mysql_args)
df_inspectors

Unnamed: 0,Last,Total_Inspections
0,Brown,140319
1,Gentry,140604
2,Anderson,140647
