# Midterm Project: ETL Pipeline 
By: Erin Moulton 

I first opened my source database, called "SOURCE_DATABASE.mysqlsampledatabase.sql", in mySQL and ran the code to get my database "classicmodels"

Part 1: Design a dimensional data mart through Jupyter

In [1]:
import os
import numpy
import pandas as pd
from sqlalchemy import create_engine

In [2]:
host_name = "localhost"
port = "3306"
user_id = "root"
pwd = "Passw0rd123"

src_dbname = "classicmodels"
dst_dbname = "classicmodels_dw2"

In [3]:
def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe

def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

In [4]:
conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)

sqlEngine.execute(f"DROP DATABASE IF EXISTS `{dst_dbname}`;")
sqlEngine.execute(f"CREATE DATABASE `{dst_dbname}`;")
sqlEngine.execute(f"USE {dst_dbname};")

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x26e3b55dc50>

1.1: Create and populate the dimension tables 

In [5]:
#customers
#extracting data from the customers collection
sql_customers = "SELECT * FROM classicmodels.customers;"
df_customers = get_dataframe(user_id, pwd, host_name, src_dbname, sql_customers)
df_customers.head(2)

Unnamed: 0,customerNumber,customerName,contactLastName,contactFirstName,phone,addressLine1,addressLine2,city,state,postalCode,country,salesRepEmployeeNumber,creditLimit
0,103,Atelier graphique,Schmitt,Carine,40.32.2555,"54, rue Royale",,Nantes,,44000,France,1370.0,21000.0
1,112,Signal Gift Stores,King,Jean,7025551838,8489 Strong St.,,Las Vegas,NV,83030,USA,1166.0,71800.0


In [6]:
#productlines 
#extracting data from the productlines collection
sql_productlines = "SELECT * FROM classicmodels.productlines;"
df_productlines = get_dataframe(user_id, pwd, host_name, src_dbname, sql_productlines)
df_productlines.head(2)

Unnamed: 0,productLine,textDescription,htmlDescription,image
0,Classic Cars,Attention car enthusiasts: Make your wildest c...,,
1,Motorcycles,Our motorcycles are state of the art replicas ...,,


1.2: Create the date dimension table in mySQL: I used the script located in my GitHub repo called "Create date dimension.sql"

 Perform any necessary transformations 

In [7]:
#customers
# A List that enumerates the names of each column to drop from the Pandas DataFrame

drop_cols = ['phone','addressLine2']
df_customers.drop(drop_cols, axis=1, inplace=True)

# Rename the "id" column to reflect the entity as it will serve as the business key for lookup operations
df_customers.rename(columns={"customerNumber":"customer_id"}, inplace=True)

# Insert a new column, with an ever-incrementing numeric value, to serve as the primary key.
df_customers.insert(0, "customer_key", range(1, df_customers.shape[0]+1))

# first 2 rows of the dataframe to validate your work
df_customers.head(2)

Unnamed: 0,customer_key,customer_id,customerName,contactLastName,contactFirstName,addressLine1,city,state,postalCode,country,salesRepEmployeeNumber,creditLimit
0,1,103,Atelier graphique,Schmitt,Carine,"54, rue Royale",Nantes,,44000,France,1370.0,21000.0
1,2,112,Signal Gift Stores,King,Jean,8489 Strong St.,Las Vegas,NV,83030,USA,1166.0,71800.0


In [8]:
#productlines (repeat the same process as the customers table above)

drop_cols = ['textDescription','htmlDescription','image']
df_productlines.drop(drop_cols, axis=1, inplace=True)

df_productlines.rename(columns={"productLine":"productlines_id"}, inplace=True)

df_productlines.insert(0, "productlines_key", range(1, df_productlines.shape[0]+1))

df_productlines.head(2)

Unnamed: 0,productlines_key,productlines_id
0,1,Classic Cars
1,2,Motorcycles


1.3: load transformed dataframes into the new datawarehouse by creating new tables 

In [9]:
db_operation = "insert"

tables = [('dim_customers', df_customers, 'customer_key'),
          ('dim_productlines', df_productlines, 'productlines_key')]

In [10]:
for table_name, dataframe, primary_key in tables:
    set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

Part 2: MongoDB integration for dim_products 
-I exported "products" as a json file into MongoDB  

In [11]:
import os
import json
import numpy
import datetime
import certifi
import pandas as pd

import pymongo
import sqlalchemy
from sqlalchemy import create_engine

In [12]:
print(f"Running SQL Alchemy Version: {sqlalchemy.__version__}")
print(f"Running PyMongo Version: {pymongo.__version__}")

Running SQL Alchemy Version: 1.4.39
Running PyMongo Version: 4.6.2


In [13]:
mysql_uid = "root"
mysql_pwd = "Passw0rd123"
mysql_hostname = "localhost"

atlas_cluster_name = "Cluster0.wfbnlq4"
atlas_user_name = "emoulton_admin"
atlas_password = "Passw0rd"

conn_str = {"local" : f"mongodb://localhost:27017/",
    "atlas" : f"mongodb+srv://{atlas_user_name}:{atlas_password}@{atlas_cluster_name}.mongodb.net"
}

src_dbname = "classicmodels_products"
dst_dbname = "classicmodels_dw2"

print(f"Local Connection String: {conn_str['local']}")
print(f"Atlas Connection String: {conn_str['atlas']}")

Local Connection String: mongodb://localhost:27017/
Atlas Connection String: mongodb+srv://emoulton_admin:Passw0rd@Cluster0.wfbnlq4.mongodb.net


In [14]:
def get_sql_dataframe(user_id, pwd, host_name, db_name, sql_query):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    conn = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, conn);
    conn.close()
    
    return dframe


def get_mongo_dataframe(connect_str, db_name, collection, query):
    '''Create a connection to MongoDB'''
    client = pymongo.MongoClient(connect_str)
    
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    client.close()
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

In [15]:
client = pymongo.MongoClient(conn_str["atlas"], tlsCAFile=certifi.where())
db = client[src_dbname]

data_dir = os.path.join(os.getcwd(), 'downloads')

json_files = {"products" : 'classicmodels_products.json'
             }

for file in json_files:
    db.drop_collection(file)
    json_file = os.path.join(data_dir, json_files[file])
    with open(json_file, 'r') as openfile:
        json_object = json.load(openfile)
        file = db[file]
        result = file.insert_many(json_object)
        #print(f"{file} was successfully loaded.")

        
client.close()    

2.1: create and populate dim_products 

In [16]:
#extracting data from the products collection
query = {} 
collection = "products"

df_products = get_mongo_dataframe(conn_str['atlas'], src_dbname, collection, query)
df_products.head(2)

Unnamed: 0,productCode,productName,productLine,productScale,productVendor,productDescription,quantityInStock,buyPrice,MSRP
0,S10_1678,1969 Harley Davidson Ultimate Chopper,Motorcycles,1:10,Min Lin Diecast,"This replica features working kickstand, front...",7933,48.81,95.7
1,S10_1949,1952 Alpine Renault 1300,Classic Cars,1:10,Classic Metal Creations,Turnable front wheels; steering function; deta...,7305,98.58,214.3


In [17]:
#Rename the "id" column to reflect the entity as it will serve as the business key for lookup operations
df_products.rename(columns={"productCode":"product_id"}, inplace=True)

#drop 'productScale' and 'productDescription' columns because text is too long in description and product scale isn't helpful
df_products.drop(['productScale','productDescription'], axis=1, inplace=True)

# 3. Insert a new column, with an ever-incrementing numeric value, to serve as the primary key.
df_products.insert(0, "product_key", range(1, df_products.shape[0]+1))
df_products.head(2)

Unnamed: 0,product_key,product_id,productName,productLine,productVendor,quantityInStock,buyPrice,MSRP
0,1,S10_1678,1969 Harley Davidson Ultimate Chopper,Motorcycles,Min Lin Diecast,7933,48.81,95.7
1,2,S10_1949,1952 Alpine Renault 1300,Classic Cars,Classic Metal Creations,7305,98.58,214.3


2.2: load transformed dataframe into the new datawarehouse by creating a table  

In [18]:
dataframe = df_products
table_name = 'dim_products'
primary_key = 'product_key'
db_operation = "insert"

set_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, dataframe, table_name, primary_key, db_operation)

In [19]:
sql_products = "SELECT * FROM classicmodels_dw2.dim_products;"
df_dim_products = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_products)
df_dim_products.head(2)

Unnamed: 0,product_key,product_id,productName,productLine,productVendor,quantityInStock,buyPrice,MSRP
0,1,S10_1678,1969 Harley Davidson Ultimate Chopper,Motorcycles,Min Lin Diecast,7933,48.81,95.7
1,2,S10_1949,1952 Alpine Renault 1300,Classic Cars,Classic Metal Creations,7305,98.58,214.3


Part 3: create fact orders by combining orders and orderdetails 

In [20]:
host_name = "localhost"
port = "3306"
user_id = "root"
pwd = "Passw0rd123"

src_dbname = "classicmodels"
dst_dbname = "classicmodels_dw2"

In [21]:
conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)

sqlEngine.execute(f"USE {dst_dbname};")

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x26e3c67aa90>

In [22]:
#combining orders and order details to create fact orders 
sql_fact_orders = """
    SELECT o.orderNumber AS order_id,
        o.customerNumber AS customer_id,
        od.productCode AS product_id,
        o.orderDate AS order_date,
        o.requiredDate AS required_date,
        o.shippedDate AS shipped_date,
        od.quantityOrdered AS quantity,
        od.priceEach AS price_each,
        o.status AS order_status
    FROM classicmodels.orders AS o
    INNER JOIN classicmodels.orderdetails AS od
    ON o.orderNumber = od.orderNumber;
    
"""


df_fact_orders = get_dataframe(user_id, pwd, host_name, src_dbname, sql_fact_orders)
df_fact_orders.head(2)



Unnamed: 0,order_id,customer_id,product_id,order_date,required_date,shipped_date,quantity,price_each,order_status
0,10100,363,S18_1749,2003-01-06,2003-01-13,2003-01-10,30,136.0,Shipped
1,10100,363,S18_2248,2003-01-06,2003-01-13,2003-01-10,50,55.09,Shipped


3.1: look up the primary keys from the dimension table: Surrogate Primary Key and the Business Key from each of the Dimension tables

In [23]:
#customers
sql_dim_customers = "SELECT customer_key, customer_id FROM classicmodels_dw2.dim_customers;"
df_dim_customers = get_dataframe(user_id, pwd, host_name, src_dbname, sql_dim_customers)
df_dim_customers.head(2)

Unnamed: 0,customer_key,customer_id
0,1,103
1,2,112


In [24]:
#productlines
sql_dim_productlines = "SELECT productlines_key, productlines_id FROM classicmodels_dw2.dim_productlines;"
df_dim_productlines = get_dataframe(user_id, pwd, host_name, src_dbname, sql_dim_productlines)
df_dim_productlines.head(2)

Unnamed: 0,productlines_key,productlines_id
0,1,Classic Cars
1,2,Motorcycles


In [25]:
# Products
sql_dim_products = "SELECT product_key, product_id FROM classicmodels_dw2.dim_products;"
df_dim_products = get_dataframe(user_id, pwd, host_name, src_dbname, sql_dim_products)
df_dim_products.head(2)

Unnamed: 0,product_key,product_id
0,1,S10_1678
1,2,S10_1949


3.2:  looking up the  Surrogate Primary Key values in the customer and products Dimension table 


In [26]:
#customers
# Modify 'df_fact_orders' by merging it with 'df_dim_customers' on the 'customer_id' column
df_fact_orders = pd.merge(df_fact_orders, df_dim_customers, on = 'customer_id', how='inner')
# Drop the 'customer_id' column
df_fact_orders.drop(['customer_id'], axis=1, inplace=True)
# Display the first 2 rows of the dataframe 
df_fact_orders.head(2)

Unnamed: 0,order_id,product_id,order_date,required_date,shipped_date,quantity,price_each,order_status,customer_key
0,10100,S18_1749,2003-01-06,2003-01-13,2003-01-10,30,136.0,Shipped,86
1,10100,S18_2248,2003-01-06,2003-01-13,2003-01-10,50,55.09,Shipped,86


In [27]:
#products (repeat customers process)
df_fact_orders = pd.merge(df_fact_orders, df_dim_products, on = 'product_id', how='inner')
df_fact_orders.drop(['product_id'], axis=1, inplace=True)
df_fact_orders.head(2)

Unnamed: 0,order_id,order_date,required_date,shipped_date,quantity,price_each,order_status,customer_key,product_key
0,10100,2003-01-06,2003-01-13,2003-01-10,30,136.0,Shipped,86,23
1,10379,2005-02-10,2005-02-18,2005-02-11,39,156.4,Shipped,11,23


3.3:  Lookup the DateKeys from the Date Dimension Table.

In [28]:
sql_dim_date = "SELECT date_key, full_date FROM classicmodels_dw.dim_date;"
df_dim_date = get_dataframe(user_id, pwd, host_name, src_dbname, sql_dim_date)
df_dim_date.full_date = df_dim_date.full_date.astype('datetime64[ns]').dt.date
df_dim_date.head(2)

Unnamed: 0,date_key,full_date
0,20030101,2003-01-01
1,20030102,2003-01-02


In [29]:
# Lookup the Surrogate Primary Key (date_key) that Corresponds to the "order_date" Column.
df_dim_order_date = df_dim_date.rename(columns={"date_key" : "order_date_key", "full_date" : "order_date"})
df_fact_orders.order_date = df_fact_orders.order_date.astype('datetime64[ns]').dt.date

df_fact_orders = pd.merge(df_fact_orders, df_dim_order_date, on='order_date', how='left')
df_fact_orders.drop(['order_date'], axis=1, inplace=True)
df_fact_orders.head(2)

Unnamed: 0,order_id,required_date,shipped_date,quantity,price_each,order_status,customer_key,product_key,order_date_key
0,10100,2003-01-13,2003-01-10,30,136.0,Shipped,86,23,20030106
1,10379,2005-02-18,2005-02-11,39,156.4,Shipped,11,23,20050210


In [30]:
# Lookup the Surrogate Primary Key (date_key) that Corresponds to the "required_date" Column.
df_dim_required_date = df_dim_date.rename(columns={"date_key" : "required_date_key", "full_date" : "required_date"})
df_fact_orders.required_date = df_fact_orders.required_date.astype('datetime64[ns]').dt.date

df_fact_orders = pd.merge(df_fact_orders, df_dim_required_date, on='required_date', how='left')
df_fact_orders.drop(['required_date'], axis=1, inplace=True)
df_fact_orders.head(2)

Unnamed: 0,order_id,shipped_date,quantity,price_each,order_status,customer_key,product_key,order_date_key,required_date_key
0,10100,2003-01-10,30,136.0,Shipped,86,23,20030106,20030113
1,10379,2005-02-11,39,156.4,Shipped,11,23,20050210,20050218


In [31]:
# Lookup the Surrogate Primary Key (date_key) that Corresponds to the "shipped_date" Column.
df_dim_shipped_date = df_dim_date.rename(columns={"date_key" : "shipped_date_key", "full_date" : "shipped_date"})
df_fact_orders.shipped_date = df_fact_orders.shipped_date.astype('datetime64[ns]').dt.date

df_fact_orders = pd.merge(df_fact_orders, df_dim_shipped_date, on='shipped_date', how='left')
df_fact_orders.drop(['shipped_date'], axis=1, inplace=True)
df_fact_orders.head(2)

Unnamed: 0,order_id,quantity,price_each,order_status,customer_key,product_key,order_date_key,required_date_key,shipped_date_key
0,10100,30,136.0,Shipped,86,23,20030106,20030113,20030110.0
1,10379,39,156.4,Shipped,11,23,20050210,20050218,20050211.0


3.4: perform any additional transactions 

In [32]:
# Reorder the fact orders columns
# Insert a new column, with an ever-incrementing numeric value, to serve as the primary key for fact orders.
ordered_columns = ['order_id','customer_key','product_key',
                  'order_date_key','shipped_date_key','required_date_key','quantity','price_each',
                  'order_status']
df_fact_orders = df_fact_orders[ordered_columns]
df_fact_orders.insert(0, "fact_order_key", range(1, df_fact_orders.shape[0]+1))
df_fact_orders.head(2)

Unnamed: 0,fact_order_key,order_id,customer_key,product_key,order_date_key,shipped_date_key,required_date_key,quantity,price_each,order_status
0,1,10100,86,23,20030106,20030110.0,20030113,30,136.0,Shipped
1,2,10379,11,23,20050210,20050211.0,20050218,39,156.4,Shipped


In [33]:
table_name = "fact_orders"
primary_key = "fact_order_key"
db_operation = "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname, df_fact_orders, table_name, primary_key, db_operation)

Part 4: SQL statements

4.1 SQL statement that returns: 
Each Customer’s  Name, The total amount of the order quantity associated with each customer, The total amount of the order unit price associated with each customer

In [34]:
sql_test = """
    SELECT customers.`customerName` AS `Customer Name`,
        SUM(orders.`quantity`) AS `Total Quantity`,
        SUM(orders.`price_each`) AS 'Total Unit Price'
    FROM `{0}`.`fact_orders` AS orders
    INNER JOIN`{0}`.`dim_customers` AS customers
    ON orders.customer_key = customers.customer_key
    GROUP BY customers.`customerName`
    ORDER BY 'Total Unit Price' DESC 
""".format(dst_dbname)

df_test = get_dataframe(user_id, pwd, host_name, src_dbname, sql_test)
df_test.head()

Unnamed: 0,Customer Name,Total Quantity,Total Unit Price
0,Online Diecast Creations Co.,1248.0,3188.15
1,Euro+ Shopping Channel,9327.0,22680.0
2,Rovelli Gifts,1650.0,3793.04
3,Motor Mint Distributors Inc.,730.0,2376.39
4,"AV Stores, Co.",1778.0,4297.81


4.2 SQL that returns:
The product Id where the product line from dim_products matches the product line from dim_productlines

In [49]:
sql_test = """
    SELECT products.`productLine` AS `Product Line`,
        products.`product_id` AS `Product ID`
    FROM `{0}`.`dim_products` AS products
    INNER JOIN`{0}`.`dim_productlines` AS productLines
    ON products.productLine = productLines.productlines_id
    GROUP BY productLines.`productlines_id`, products.`product_id`
    ORDER BY `Product Line` DESC
""".format(dst_dbname)

df_test = get_dataframe(user_id, pwd, host_name, src_dbname, sql_test)
df_test.head()

Unnamed: 0,Product Line,Product ID
0,Vintage Cars,S18_2248
1,Vintage Cars,S24_3151
2,Vintage Cars,S18_4522
3,Vintage Cars,S18_2949
4,Vintage Cars,S24_3420
