### Import Necessary Libraries

In [179]:
import os
import json 
import numpy 
import datetime
import numpy
import pandas as pd
import pymongo 
from sqlalchemy import create_engine

# Manipulation of SQL Data

#### Setting up connection variables 

In [180]:
host_name = "localhost"
host_ip = "127.0.0.1"
port = "3306"
user_id = "root"
pwd = "!Brapneet72"

src_dbname = "classicmodels"
dst_dbname = "classic_dw"

#### Define Functions for Getting Data From and Setting Data Into Databases

In [197]:
def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

#### Create the Data Warehouse and Switch the Connection Context

In [182]:
conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)

sqlEngine.execute(f"DROP DATABASE IF EXISTS `{dst_dbname}`;")
sqlEngine.execute(f"CREATE DATABASE `{dst_dbname}`;")
sqlEngine.execute(f"USE {dst_dbname};")

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x24239443820>

#### Extraction of Source Data from Employees and Offices Tables

In [183]:
sql_employees = "SELECT * FROM classicmodels.employees;"
df_employees = get_dataframe(user_id, pwd, host_name, src_dbname, sql_employees)
df_employees.head(5)

Unnamed: 0,employeeNumber,lastName,firstName,extension,email,officeCode,reportsTo,jobTitle
0,1002,Murphy,Diane,x5800,dmurphy@classicmodelcars.com,1,,President
1,1056,Patterson,Mary,x4611,mpatterso@classicmodelcars.com,1,1002.0,VP Sales
2,1076,Firrelli,Jeff,x9273,jfirrelli@classicmodelcars.com,1,1002.0,VP Marketing
3,1088,Patterson,William,x4871,wpatterson@classicmodelcars.com,6,1056.0,Sales Manager (APAC)
4,1102,Bondur,Gerard,x5408,gbondur@classicmodelcars.com,4,1056.0,Sale Manager (EMEA)


In [184]:
sql_offices = "SELECT * FROM classicmodels.offices;"
df_offices = get_dataframe(user_id, pwd, host_name, src_dbname, sql_offices)
df_offices.head(5)

Unnamed: 0,officeCode,city,phone,addressLine1,addressLine2,state,country,postalCode,territory
0,1,San Francisco,+1 650 219 4782,100 Market Street,Suite 300,CA,USA,94080,
1,2,Boston,+1 215 837 0825,1550 Court Place,Suite 102,MA,USA,02107,
2,3,NYC,+1 212 555 3000,523 East 53rd Street,apt. 5A,NY,USA,10022,
3,4,Paris,+33 14 723 4404,43 Rue Jouffroy D'abbans,,,France,75017,EMEA
4,5,Tokyo,+81 33 224 5000,4-1 Kioicho,,Chiyoda-Ku,Japan,102-8578,Japan


#### Cleaning and Merging of Employees and Offices Tables

In [185]:
drop_cols = ['extension', 'email']
df_employees.drop(drop_cols, axis=1, inplace=True)
df_employees.rename(columns={"employeeNumber":"empID"}, inplace=True)
df_employees.rename(columns={"officeCode":"officeID"}, inplace=True)
df_employees.head(2)

Unnamed: 0,empID,lastName,firstName,officeID,reportsTo,jobTitle
0,1002,Murphy,Diane,1,,President
1,1056,Patterson,Mary,1,1002.0,VP Sales


In [186]:
drop_cols = ['territory', 'phone', 'postalCode']
df_offices.drop(drop_cols, axis=1, inplace=True)
df_offices.rename(columns={"officeCode":"officeID"}, inplace=True)
df_offices.head(2)

Unnamed: 0,officeID,city,addressLine1,addressLine2,state,country
0,1,San Francisco,100 Market Street,Suite 300,CA,USA
1,2,Boston,1550 Court Place,Suite 102,MA,USA


In [187]:
df_workers = pd.merge(df_employees, df_offices, on="officeID", how="right")
df_workers.head(10)

Unnamed: 0,empID,lastName,firstName,officeID,reportsTo,jobTitle,city,addressLine1,addressLine2,state,country
0,1002,Murphy,Diane,1,,President,San Francisco,100 Market Street,Suite 300,CA,USA
1,1056,Patterson,Mary,1,1002.0,VP Sales,San Francisco,100 Market Street,Suite 300,CA,USA
2,1076,Firrelli,Jeff,1,1002.0,VP Marketing,San Francisco,100 Market Street,Suite 300,CA,USA
3,1143,Bow,Anthony,1,1056.0,Sales Manager (NA),San Francisco,100 Market Street,Suite 300,CA,USA
4,1165,Jennings,Leslie,1,1143.0,Sales Rep,San Francisco,100 Market Street,Suite 300,CA,USA
5,1166,Thompson,Leslie,1,1143.0,Sales Rep,San Francisco,100 Market Street,Suite 300,CA,USA
6,1188,Firrelli,Julie,2,1143.0,Sales Rep,Boston,1550 Court Place,Suite 102,MA,USA
7,1216,Patterson,Steve,2,1143.0,Sales Rep,Boston,1550 Court Place,Suite 102,MA,USA
8,1286,Tseng,Foon Yue,3,1143.0,Sales Rep,NYC,523 East 53rd Street,apt. 5A,NY,USA
9,1323,Vanauf,George,3,1143.0,Sales Rep,NYC,523 East 53rd Street,apt. 5A,NY,USA


# Manipulation of MongoDB Data 

#### Setting Up Connection Variables 

In [188]:
atlas_cluster_name = "cluster0"
atlas_user_name = "rsingh"
atlas_password = "ZvjUdS5eKaFGiGwS"

conn_str = {"local" : f"mongodb://localhost:27017/",
    "atlas" : f"mongodb+srv://{atlas_user_name}:{atlas_password}@{atlas_cluster_name}.yig585i.mongodb.net"
}


#### Definition of MongoDB Functions 

In [189]:
def get_sql_dataframe(user_id, pwd, db_name, sql_query):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@localhost/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    conn = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, conn);
    conn.close()
    
    return dframe


def get_mongo_dataframe(connect_str, db_name, collection, query):
    '''Create a connection to MongoDB'''
    client = pymongo.MongoClient(connect_str)
    
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    client.close()
    return dframe


def set_dataframe(user_id, pwd, db_name, df, table_name, pk_column, db_operation):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@localhost/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

#### Loading Source Data (run once)

In [105]:
client = pymongo.MongoClient(conn_str["atlas"])
db = client[src_dbname]

# Gets the path of the Current Working Directory for this Notebook, and then Appends the 'data' directory.
data_dir = os.path.join(os.getcwd(), 'data')

json_files = {"customers" : 'classic_customers.json'}
                
for file in json_files:
    db.drop_collection(file)
    json_file = os.path.join(data_dir, json_files[file])
    with open(json_file, 'r') as openfile:
        json_object = json.load(openfile)
        file = db[file]
        result = file.insert_many(json_object)
        # print(f"{file} was successfully loaded.")

        
client.close()        

Collection(Database(MongoClient(host=['ac-4z6jcg6-shard-00-00.yig585i.mongodb.net:27017', 'ac-4z6jcg6-shard-00-01.yig585i.mongodb.net:27017', 'ac-4z6jcg6-shard-00-02.yig585i.mongodb.net:27017'], document_class=dict, tz_aware=False, connect=True, authsource='admin', replicaset='atlas-vd62ml-shard-0', ssl=True), 'classicmodels'), 'customers') was successfully loaded.


#### Extraction of Source Data from Customers Table 

In [190]:
query = {}
collection = "customers"

df_customers = get_mongo_dataframe(conn_str['atlas'], src_dbname, collection, query)
df_customers.head(5)

Unnamed: 0,customerNumber,customerName,contactLastName,contactFirstName,phone,addressLine1,addressLine2,city,state,postalCode,country,salesRepEmployeeNumber,creditLimit
0,103,Atelier graphique,Schmitt,Carine,40.32.2555,"54, rue Royale",,Nantes,,44000,France,1370,21000.0
1,112,Signal Gift Stores,King,Jean,7025551838,8489 Strong St.,,Las Vegas,NV,83030,USA,1166,71800.0
2,114,"Australian Collectors, Co.",Ferguson,Peter,03 9520 4555,636 St Kilda Road,Level 3,Melbourne,Victoria,3004,Australia,1611,117300.0
3,119,La Rochelle Gifts,Labrune,Janine,40.67.8555,"67, rue des Cinquante Otages",,Nantes,,44000,France,1370,118200.0
4,121,Baane Mini Imports,Bergulfsen,Jonas,07-98 9555,Erling Skakkes gate 78,,Stavern,,4110,Norway,1504,81700.0


#### Cleaning of Customers Table 

In [191]:
drop_cols = ['contactLastName', 'contactFirstName', 'phone']
df_customers.drop(drop_cols, axis=1, inplace=True)
df_customers.rename(columns={"customerNumber":"customerID", "salesRepEmployeeNumber":"empID"}, inplace=True)
df_customers.head(5)

Unnamed: 0,customerID,customerName,addressLine1,addressLine2,city,state,postalCode,country,empID,creditLimit
0,103,Atelier graphique,"54, rue Royale",,Nantes,,44000,France,1370,21000.0
1,112,Signal Gift Stores,8489 Strong St.,,Las Vegas,NV,83030,USA,1166,71800.0
2,114,"Australian Collectors, Co.",636 St Kilda Road,Level 3,Melbourne,Victoria,3004,Australia,1611,117300.0
3,119,La Rochelle Gifts,"67, rue des Cinquante Otages",,Nantes,,44000,France,1370,118200.0
4,121,Baane Mini Imports,Erling Skakkes gate 78,,Stavern,,4110,Norway,1504,81700.0


# Manipulation of CSV Data 

#### Extraction of Source Data from Products Table 

In [192]:
data_dir = os.path.join(os.getcwd(), 'data')
data_file = os.path.join(data_dir, 'classic_products.csv')

df_products = pd.read_csv(data_file, header=0, encoding='latin1')
df_products.head(5)

Unnamed: 0,productCode,productName,productLine,productScale,productVendor,productDescription,quantityInStock,buyPrice,MSRP
0,S10_1678,1969 Harley Davidson Ultimate Chopper,Motorcycles,1:10,Min Lin Diecast,"This replica features working kickstand, front...",7933,48.81,95.7
1,S10_1949,1952 Alpine Renault 1300,Classic Cars,1:10,Classic Metal Creations,Turnable front wheels; steering function; deta...,7305,98.58,214.3
2,S10_2016,1996 Moto Guzzi 1100i,Motorcycles,1:10,Highway 66 Mini Classics,"Official Moto Guzzi logos and insignias, saddl...",6625,68.99,118.94
3,S10_4698,2003 Harley-Davidson Eagle Drag Bike,Motorcycles,1:10,Red Start Diecast,"Model features, official Harley Davidson logos...",5582,91.02,193.66
4,S10_4757,1972 Alfa Romeo GTA,Classic Cars,1:10,Motor City Art Classics,Features include: Turnable front wheels; steer...,3252,85.68,136.0


In [193]:
drop_cols = ['productScale', 'productDescription']
df_products.drop(drop_cols, axis=1, inplace=True)
df_products['productID'] = range(1, len(df_products) + 1)
df_products.head(5)

Unnamed: 0,productCode,productName,productLine,productVendor,quantityInStock,buyPrice,MSRP,productID
0,S10_1678,1969 Harley Davidson Ultimate Chopper,Motorcycles,Min Lin Diecast,7933,48.81,95.7,1
1,S10_1949,1952 Alpine Renault 1300,Classic Cars,Classic Metal Creations,7305,98.58,214.3,2
2,S10_2016,1996 Moto Guzzi 1100i,Motorcycles,Highway 66 Mini Classics,6625,68.99,118.94,3
3,S10_4698,2003 Harley-Davidson Eagle Drag Bike,Motorcycles,Red Start Diecast,5582,91.02,193.66,4
4,S10_4757,1972 Alfa Romeo GTA,Classic Cars,Motor City Art Classics,3252,85.68,136.0,5


## Loading the Dimension Tables into the Data Warehouse 

In [199]:
# Run the SQL cell for getting and setting dataframes (near the beginning of this notebook)

db_operation = "insert"

tables = [('dim_workers', df_workers, 'empID'),
          ('dim_customers', df_customers, 'customerID'),
          ('dim_products', df_products, 'productID'),
         ]

for table_name, dataframe, primary_key in tables:
    set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

#### Validating that the Dimension Tables Were Created

In [200]:
sql_workers = "SELECT * FROM classic_dw.dim_workers;"
df_dim_workers = get_sql_dataframe(user_id, pwd, dst_dbname, sql_workers)
df_dim_workers.head(5)

Unnamed: 0,empID,lastName,firstName,officeID,reportsTo,jobTitle,city,addressLine1,addressLine2,state,country
0,1002,Murphy,Diane,1,,President,San Francisco,100 Market Street,Suite 300,CA,USA
1,1056,Patterson,Mary,1,1002.0,VP Sales,San Francisco,100 Market Street,Suite 300,CA,USA
2,1076,Firrelli,Jeff,1,1002.0,VP Marketing,San Francisco,100 Market Street,Suite 300,CA,USA
3,1088,Patterson,William,6,1056.0,Sales Manager (APAC),Sydney,5-11 Wentworth Avenue,Floor #2,,Australia
4,1102,Bondur,Gerard,4,1056.0,Sale Manager (EMEA),Paris,43 Rue Jouffroy D'abbans,,,France


In [201]:
sql_customers = "SELECT * FROM classic_dw.dim_customers;"
df_dim_customers = get_sql_dataframe(user_id, pwd, dst_dbname, sql_customers)
df_dim_customers.head(5)

Unnamed: 0,customerID,customerName,addressLine1,addressLine2,city,state,postalCode,country,empID,creditLimit
0,103,Atelier graphique,"54, rue Royale",,Nantes,,44000,France,1370,21000.0
1,112,Signal Gift Stores,8489 Strong St.,,Las Vegas,NV,83030,USA,1166,71800.0
2,114,"Australian Collectors, Co.",636 St Kilda Road,Level 3,Melbourne,Victoria,3004,Australia,1611,117300.0
3,119,La Rochelle Gifts,"67, rue des Cinquante Otages",,Nantes,,44000,France,1370,118200.0
4,121,Baane Mini Imports,Erling Skakkes gate 78,,Stavern,,4110,Norway,1504,81700.0


In [202]:
sql_products = "SELECT * FROM classic_dw.dim_products;"
df_dim_products = get_sql_dataframe(user_id, pwd, dst_dbname, sql_products)
df_dim_products.head(5)

Unnamed: 0,productCode,productName,productLine,productVendor,quantityInStock,buyPrice,MSRP,productID
0,S10_1678,1969 Harley Davidson Ultimate Chopper,Motorcycles,Min Lin Diecast,7933,48.81,95.7,1
1,S10_1949,1952 Alpine Renault 1300,Classic Cars,Classic Metal Creations,7305,98.58,214.3,2
2,S10_2016,1996 Moto Guzzi 1100i,Motorcycles,Highway 66 Mini Classics,6625,68.99,118.94,3
3,S10_4698,2003 Harley-Davidson Eagle Drag Bike,Motorcycles,Red Start Diecast,5582,91.02,193.66,4
4,S10_4757,1972 Alfa Romeo GTA,Classic Cars,Motor City Art Classics,3252,85.68,136.0,5


## Creation and Population of the Fact Table 

#### Extraction of Source Data From Orders, Order Details, and Payments Tables 

In [203]:
sql_orders = "SELECT * FROM classicmodels.orders;"
df_orders = get_dataframe(user_id, pwd, host_name, src_dbname, sql_orders)
df_orders.rename(columns={"orderNumber":"order_id"}, inplace=True)
df_orders.head(5)

Unnamed: 0,order_id,orderDate,requiredDate,shippedDate,status,comments,customerNumber
0,10100,2003-01-06,2003-01-13,2003-01-10,Shipped,,363
1,10101,2003-01-09,2003-01-18,2003-01-11,Shipped,Check on availability.,128
2,10102,2003-01-10,2003-01-18,2003-01-14,Shipped,,181
3,10103,2003-01-29,2003-02-07,2003-02-02,Shipped,,121
4,10104,2003-01-31,2003-02-09,2003-02-01,Shipped,,141


In [204]:
sql_order_details = "SELECT * FROM classicmodels.orderdetails;"
df_order_details = get_dataframe(user_id, pwd, host_name, src_dbname, sql_order_details)
df_order_details.rename(columns={"orderNumber":"order_id"}, inplace=True)
df_order_details.head(5)

Unnamed: 0,order_id,productCode,quantityOrdered,priceEach,orderLineNumber
0,10100,S18_1749,30,136.0,3
1,10100,S18_2248,50,55.09,2
2,10100,S18_4409,22,75.46,4
3,10100,S24_3969,49,35.29,1
4,10101,S18_2325,25,108.06,4


In [205]:
drop_cols = ['comments']
df_orders.drop(drop_cols, axis=1, inplace=True)
df_orders.rename(columns={"customerNumber":"customerID"}, inplace=True)
df_orders.head(5)

Unnamed: 0,order_id,orderDate,requiredDate,shippedDate,status,customerID
0,10100,2003-01-06,2003-01-13,2003-01-10,Shipped,363
1,10101,2003-01-09,2003-01-18,2003-01-11,Shipped,128
2,10102,2003-01-10,2003-01-18,2003-01-14,Shipped,181
3,10103,2003-01-29,2003-02-07,2003-02-02,Shipped,121
4,10104,2003-01-31,2003-02-09,2003-02-01,Shipped,141


In [206]:
drop_cols = ['orderLineNumber']
df_order_details.drop(drop_cols, axis=1, inplace=True)
df_order_details.head(5)

Unnamed: 0,order_id,productCode,quantityOrdered,priceEach
0,10100,S18_1749,30,136.0
1,10100,S18_2248,50,55.09
2,10100,S18_4409,22,75.46
3,10100,S24_3969,49,35.29
4,10101,S18_2325,25,108.06


In [207]:
df_fact_transactions = pd.merge(df_orders, df_order_details, on="order_id")
df_fact_transactions.insert(0, "order_key", range(1, len(df_fact_transactions) + 1))
df_fact_transactions.head(10)

Unnamed: 0,order_key,order_id,orderDate,requiredDate,shippedDate,status,customerID,productCode,quantityOrdered,priceEach
0,1,10100,2003-01-06,2003-01-13,2003-01-10,Shipped,363,S18_1749,30,136.0
1,2,10100,2003-01-06,2003-01-13,2003-01-10,Shipped,363,S18_2248,50,55.09
2,3,10100,2003-01-06,2003-01-13,2003-01-10,Shipped,363,S18_4409,22,75.46
3,4,10100,2003-01-06,2003-01-13,2003-01-10,Shipped,363,S24_3969,49,35.29
4,5,10101,2003-01-09,2003-01-18,2003-01-11,Shipped,128,S18_2325,25,108.06
5,6,10101,2003-01-09,2003-01-18,2003-01-11,Shipped,128,S18_2795,26,167.06
6,7,10101,2003-01-09,2003-01-18,2003-01-11,Shipped,128,S24_1937,45,32.53
7,8,10101,2003-01-09,2003-01-18,2003-01-11,Shipped,128,S24_2022,46,44.35
8,9,10102,2003-01-10,2003-01-18,2003-01-14,Shipped,181,S18_1342,39,95.55
9,10,10102,2003-01-10,2003-01-18,2003-01-14,Shipped,181,S18_1367,41,43.13


##### 


#### Write the Fact Table Back to the Data Warehouse 

In [208]:
dataframe = df_fact_transactions
table_name = 'fact_transactions'
primary_key = 'order_key'
db_operation = "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname, df_fact_transactions, table_name, primary_key, db_operation)

### Integration of Date Dimension (done in MySQL)

### Verify that the Data Warehouse Exists and Contains the Correct Data 

In [209]:
sql_test = """
    SELECT customers.`customerName` AS `Name of Customer`, 
        SUM(transactions.`quantityOrdered`) AS `Order_Quantity`
    FROM `{0}`.`fact_transactions` AS transactions 
    INNER JOIN `{0}`.`dim_customers` AS customers
    ON transactions.customerID = customers.customerID
    GROUP BY customers.`customerName`
    ORDER BY Order_Quantity DESC;
""".format(dst_dbname)

df_test = get_dataframe(user_id, pwd, host_name, src_dbname, sql_test)

df_test.head(10)

Unnamed: 0,Name of Customer,Order_Quantity
0,Euro+ Shopping Channel,9327.0
1,Mini Gifts Distributors Ltd.,6366.0
2,"Australian Collectors, Co.",1926.0
3,La Rochelle Gifts,1832.0
4,"AV Stores, Co.",1778.0
5,Muscle Machine Inc,1775.0
6,"Down Under Souveniers, Inc",1691.0
7,The Sharp Gifts Warehouse,1656.0
8,Rovelli Gifts,1650.0
9,Kelly's Gift Shop,1647.0


In [210]:
sql_test2 = """
    SELECT products.`productLine` AS `Type of Product`,
        COUNT(transactions.`quantityOrdered`) AS `Number_Ordered`
    FROM `{0}`.`fact_transactions` AS transactions
    INNER JOIN `{0}`.`dim_products` AS products
    ON transactions.productCode = products.productCode
    GROUP BY products.`productLine`
    ORDER BY Number_Ordered DESC;
""".format(dst_dbname)


df_test2 = get_dataframe(user_id, pwd, host_name, src_dbname, sql_test2)

df_test2.head(10)

Unnamed: 0,Type of Product,Number_Ordered
0,Classic Cars,1010
1,Vintage Cars,657
2,Motorcycles,359
3,Planes,336
4,Trucks and Buses,308
5,Ships,245
6,Trains,81
