In [68]:
#import data
import os
import json
import numpy
import datetime
import pandas as pd

import pymongo
from sqlalchemy import create_engine

In [69]:
#connect to sql database
host_ip = "127.0.0.1"
user_id = "root"
pwd = "Surf4life!"

src_dbname = "classicmodels"
dst_dbname = "classicmodels"

In [80]:
#Connect To MongoDB

host_name = "localhost"
ports = {"mongo" : 27017, "mysql" : 3306}


atlas_cluster_name = "Cluster0"
atlas_user_name = "AustinC"
atlas_password = "DS2002"

conn_str = {"local" : f"mongodb://{host_name}:{ports['mongo']}/",
    "atlas" : f"mongodb+srv://{atlas_user_name}:{atlas_password}@{atlas_cluster_name}.mongodb.net"
}


In [81]:
#create functions
def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    conn = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, conn);
    conn.close()
    
    return dframe


def get_mongo_dataframe(connect_str, db_name, collection, query):
    '''Create a connection to MongoDB'''
    client = pymongo.MongoClient(connect_str)
    
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    client.close()
    return dframe


def get_mongo_dataframe_local(user_id, pwd, host_name, port, db_name, collection, query):
    '''Create a connection to MongoDB, with or without authentication credentials'''
    if user_id and pwd:
        mongo_uri = 'mongodb://%s:%s@%s:%s/%s' % (username, password, host, port, db_name)
        client = pymongo.MongoClient(mongo_uri)
    else:
        conn_str = f"mongodb://{host_name}:{port}/"
        client = pymongo.MongoClient(conn_str)
    
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    client.close()
    
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    

In [82]:
#Get JSON files
client = pymongo.MongoClient(conn_str["local"])
db = client[src_dbname]

data_dir = os.path.join(os.getcwd(), 'data')

json_files = {"dim_date" : 'project1_datedim.json'
             }

for file in json_files:
    db.drop_collection(file)
    json_file = os.path.join(data_dir, json_files[file])
    with open(json_file, 'r') as openfile:
        json_object = json.load(openfile)
        file = db[file]
        result = file.insert_many(json_object)
        #print(f"{file} was successfully loaded.")

        
client.close()  

In [73]:
conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)

sqlEngine.execute(f"DROP DATABASE IF EXISTS `{dst_dbname}`;")
sqlEngine.execute(f"CREATE DATABASE `{dst_dbname}`;")
sqlEngine.execute(f"USE {dst_dbname};")

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x7f99f83d58b0>

In [75]:
#Create Dimension Table 1

sql_customers = "SELECT * FROM classicmodels.customers;"
df_customers = get_dataframe(user_id, pwd, host_name, src_dbname, sql_customers)
df_customers.head(2)

Unnamed: 0,customerNumber,customerName,contactLastName,contactFirstName,phone,addressLine1,addressLine2,city,state,postalCode,country,salesRepEmployeeNumber,creditLimit
0,103,Atelier graphique,Schmitt,Carine,40.32.2555,"54, rue Royale",,Nantes,,44000,France,1370.0,21000.0
1,112,Signal Gift Stores,King,Jean,7025551838,8489 Strong St.,,Las Vegas,NV,83030,USA,1166.0,71800.0


In [96]:
#Create Dimension Table 2
data_dir = os.path.join(os.getcwd(), 'data')
data_file = os.path.join(data_dir, 'project1_employees.csv')

df_products = pd.read_csv(data_file)
df_products.head(2)



Unnamed: 0,employeeNumber,lastName,firstName,extension,email,officeCode,reportsTo,jobTitle
0,1002,Murphy,Diane,x5800,dmurphy@classicmodelcars.com,1,,President
1,1056,Patterson,Mary,x4611,mpatterso@classicmodelcars.com,1,1002.0,VP Sales


In [83]:
#Get Date Dimension From MongoDB JSON
query = {}
collection = "dim_date"

df_dimdate = get_mongo_dataframe(conn_str['local'], src_dbname, collection, query)
df_dimdate.head(2)

Unnamed: 0,date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,...,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
0,20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
1,20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


In [84]:
#Transform Dimension Table 1 Customers
drop_cols = ['creditLimit','phone']
df_customers.drop(drop_cols, axis=1, inplace=True)
df_customers.rename(columns={"customerNumber":"customer_key"}, inplace=True)

df_customers.head(2)

Unnamed: 0,customer_key,customerName,contactLastName,contactFirstName,addressLine1,addressLine2,city,state,postalCode,country,salesRepEmployeeNumber
0,103,Atelier graphique,Schmitt,Carine,"54, rue Royale",,Nantes,,44000,France,1370.0
1,112,Signal Gift Stores,King,Jean,8489 Strong St.,,Las Vegas,NV,83030,USA,1166.0


In [98]:
#Transform Dimension Table 2 Employees
df_employees.rename(columns={"employeeNumber":"employee_key"}, inplace=True)

df_employees.head(2)

Unnamed: 0,employee_key,lastName,firstName,extension,email,officeCode,jobTitle
0,1002,Murphy,Diane,x5800,dmurphy@classicmodelcars.com,1,President
1,1056,Patterson,Mary,x4611,mpatterso@classicmodelcars.com,1,VP Sales


In [100]:
#Load Transformed Dimension tables into Data Warehouse
db_operation = "insert"

tables = [('dim_customers', df_customers, 'customer_key'),
          ('dim_employees', df_employees, 'employee_key'),
          ('dim_date', df_dimdate, 'date_key')]

In [101]:
for table_name, dataframe, primary_key in tables:
    set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

In [103]:
#Get Data For Fact Table
sql_orders = "SELECT * FROM classicmodels.orders;"
df_orders = get_dataframe(user_id, pwd, host_name, src_dbname, sql_orders)
df_orders.rename(columns={"orderNumber":"order_id"}, inplace=True)
df_orders.head(2)

Unnamed: 0,order_id,orderDate,requiredDate,shippedDate,status,comments,customerNumber
0,10100,2003-01-06,2003-01-13,2003-01-10,Shipped,,363
1,10101,2003-01-09,2003-01-18,2003-01-11,Shipped,Check on availability.,128


In [104]:
sql_order_details = "SELECT * FROM classicmodels.orderdetails;"
df_order_details = get_dataframe(user_id, pwd, host_name, src_dbname, sql_order_details)
df_order_details.rename(columns={"orderNumber":"order_id"}, inplace=True)
df_order_details.head(2)

Unnamed: 0,order_id,productCode,quantityOrdered,priceEach,orderLineNumber
0,10100,S18_1749,30,136.0,3
1,10100,S18_2248,50,55.09,2


In [105]:
#Create Fact Table
df_fact_orders = pd.merge(df_orders, df_order_details, on='order_id', how='inner')
df_fact_orders.head(2)

Unnamed: 0,order_id,orderDate,requiredDate,shippedDate,status,comments,customerNumber,productCode,quantityOrdered,priceEach,orderLineNumber
0,10100,2003-01-06,2003-01-13,2003-01-10,Shipped,,363,S18_1749,30,136.0,3
1,10100,2003-01-06,2003-01-13,2003-01-10,Shipped,,363,S18_2248,50,55.09,2


In [106]:
#Perform additional transformations

# Reorder the columns

ordered_columns = ['order_id','orderDate','quantityOrdered','priceEach','orderLineNumber','status','shippedDate','requiredDate','customerNumber','productCode']
df_fact_orders = df_fact_orders[ordered_columns]

# Insert a new column, with an ever-incrementing numeric value, to serve as the primary key.
df_fact_orders.insert(0, "order_key", range(1, df_fact_orders.shape[0]+1))
df_fact_orders.head(2)

Unnamed: 0,order_key,order_id,orderDate,quantityOrdered,priceEach,orderLineNumber,status,shippedDate,requiredDate,customerNumber,productCode
0,1,10100,2003-01-06,30,136.0,3,Shipped,2003-01-10,2003-01-13,363,S18_1749
1,2,10100,2003-01-06,50,55.09,2,Shipped,2003-01-10,2003-01-13,363,S18_2248


In [107]:
#Write Dataframe back to database
table_name = "fact_orders"
primary_key = "order_key"
db_operation = "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname, df_fact_orders, table_name, primary_key, db_operation)

In [116]:
sql_test = """
    SELECT `customers`.`contactLastName` AS `customer_name`,
        SUM(orders.`quantityOrdered`) AS `total_quantity`,
        SUM(orders.`priceEach`) AS `total_unit_price`
    FROM `{0}`.`fact_orders` AS orders
    INNER JOIN `{0}`.`dim_customers` AS customers
    ON orders.customerNumber = customers.customer_key
    GROUP BY customers.`contactLastName`
    ORDER BY total_unit_price DESC;
""".format(dst_dbname)


df_test = get_dataframe(user_id, pwd, host_name, src_dbname, sql_test)

In [117]:
df_test.head()

Unnamed: 0,customer_name,total_quantity,total_unit_price
0,Freyre,9327.0,22680.0
1,Nelson,7161.0,18866.93
2,Young,4185.0,10845.51
3,Frick,3372.0,7884.86
4,Brown,3372.0,7795.13
