## Using Python to Integrate MongoDB Data into an ETL Process

### 0.0 - Setup

In [None]:
import datetime
import json
import os
from urllib.parse import quote_plus

import numpy
import pandas as pd
import pymongo
from sqlalchemy import create_engine, text

In [None]:
host_name = "localhost"
ports = {"mongo" : 27017, "mysql" : 3306}

# MySQL login. Secrets come from environment variables so they are never
# committed with the notebook; set MYSQL_PASSWORD / ATLAS_PASSWORD before running.
user_id = os.environ.get("MYSQL_USER", "root")
pwd = os.environ.get("MYSQL_PASSWORD", "")

# MongoDB Atlas
atlas_cluster_name = "cluster0"
atlas_default_dbname = "classicmodels"
atlas_user_name = os.environ.get("ATLAS_USER", "emrkraisinger")
atlas_password = os.environ.get("ATLAS_PASSWORD", "")

conn_str = {
    "local": f"mongodb://{host_name}:{ports['mongo']}/",
    # quote_plus percent-escapes the credentials; characters such as '@', ':'
    # or '/' in a password would otherwise corrupt the URI.
    "atlas": (
        f"mongodb+srv://{quote_plus(atlas_user_name)}:{quote_plus(atlas_password)}"
        f"@{atlas_cluster_name}.pfsh0cc.mongodb.net/{atlas_default_dbname}"
    ),
}

src_dbname = "classicmodels"
dst_dbname = "classicmodels_dw3"

# Show the connection strings with the Atlas password masked so it never
# ends up in the notebook's saved output.
print({**conn_str,
       "atlas": conn_str["atlas"].replace(f":{quote_plus(atlas_password)}@", ":****@", 1)})

In [None]:
def get_sql_dataframe(user_id, pwd, host_name, db_name, sql_query):
    """Run ``sql_query`` against a MySQL database and return the result as a DataFrame.

    Parameters
    ----------
    user_id, pwd : str
        MySQL credentials. They are percent-escaped before being placed in the
        connection URL, so passwords containing '@', ':' or '/' still work.
    host_name : str
        MySQL server host. No port is put in the URL, so the driver default applies.
    db_name : str
        Database the connection is opened against.
    sql_query : str
        SQL text handed to ``pandas.read_sql``.

    Returns
    -------
    pandas.DataFrame
        The rows produced by ``sql_query``.
    """
    conn_str = f"mysql+pymysql://{quote_plus(user_id)}:{quote_plus(pwd)}@{host_name}/{db_name}"
    sql_engine = create_engine(conn_str, pool_recycle=3600)
    try:
        # The context manager closes the connection even if the query raises.
        with sql_engine.connect() as conn:
            return pd.read_sql(sql_query, conn)
    finally:
        # A fresh engine is built on every call, so release its connection pool.
        sql_engine.dispose()


def get_mongo_dataframe(connect_str, db_name, collection, query):
    """Return the documents of ``collection`` that match ``query`` as a DataFrame.

    Parameters
    ----------
    connect_str : str
        MongoDB connection URI passed to ``pymongo.MongoClient``.
    db_name : str
        Database holding ``collection``.
    collection : str
        Collection to query.
    query : dict
        Filter passed to ``find`` (``{}`` selects every document).

    Returns
    -------
    pandas.DataFrame
        One row per matching document, without the ``_id`` column. An empty
        result gives an empty DataFrame instead of raising.
    """
    client = pymongo.MongoClient(connect_str)
    try:
        documents = list(client[db_name][collection].find(query))
    finally:
        # Close the client even if the query fails.
        client.close()

    dframe = pd.DataFrame(documents)
    # With no documents the frame has no columns; errors='ignore' avoids a KeyError on '_id'.
    return dframe.drop(columns=['_id'], errors='ignore')


def get_mongo_dataframe_local(user_id, pwd, host_name, port, db_name, collection, query):
    """Query a MongoDB server by host/port, optionally authenticating, and return a DataFrame.

    Parameters
    ----------
    user_id, pwd : str or None
        Credentials. They are used only when both are truthy, and are
        percent-escaped before being placed in the URI.
    host_name : str
        MongoDB server host.
    port : int or str
        MongoDB server port.
    db_name : str
        Database holding ``collection``. In the authenticated URI it is also the
        path component, which pymongo uses as the default auth database.
    collection : str
        Collection to query.
    query : dict
        Filter passed to ``find`` (``{}`` selects every document).

    Returns
    -------
    pandas.DataFrame
        One row per matching document, without the ``_id`` column.
    """
    if user_id and pwd:
        mongo_uri = f"mongodb://{quote_plus(user_id)}:{quote_plus(pwd)}@{host_name}:{port}/{db_name}"
    else:
        mongo_uri = f"mongodb://{host_name}:{port}/"

    client = pymongo.MongoClient(mongo_uri)
    try:
        documents = list(client[db_name][collection].find(query))
    finally:
        # Close the client even if the query fails.
        client.close()

    dframe = pd.DataFrame(documents)
    # With no documents the frame has no columns; errors='ignore' avoids a KeyError on '_id'.
    return dframe.drop(columns=['_id'], errors='ignore')

def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    """Write a DataFrame to a MySQL table.

    Parameters
    ----------
    user_id, pwd : str
        MySQL credentials; percent-escaped before being placed in the URL.
    host_name : str
        MySQL server host. No port is put in the URL, so the driver default applies.
    db_name : str
        Database that holds ``table_name``.
    df : pandas.DataFrame
        Rows to write. The index is not written.
    table_name : str
        Target table.
    pk_column : str
        Primary-key column(s); used only by ``"insert"``. It is interpolated into
        DDL unescaped, so it must come from trusted code, never from user input.
    db_operation : str
        ``"insert"`` replaces any existing table with ``df`` and then adds a
        primary key on ``pk_column``. ``"update"`` appends ``df``'s rows via
        ``to_sql(if_exists='append')``.

    Raises
    ------
    ValueError
        If ``db_operation`` is neither ``"insert"`` nor ``"update"``.
    """
    if db_operation not in ("insert", "update"):
        raise ValueError(f"db_operation must be 'insert' or 'update', got {db_operation!r}")

    conn_str = f"mysql+pymysql://{quote_plus(user_id)}:{quote_plus(pwd)}@{host_name}/{db_name}"
    sql_engine = create_engine(conn_str, pool_recycle=3600)
    try:
        # engine.begin() commits on success, rolls back on error, and always closes the connection.
        with sql_engine.begin() as connection:
            if db_operation == "insert":
                df.to_sql(table_name, con=connection, index=False, if_exists='replace')
                # Engine.execute() was removed in SQLAlchemy 2.0; run the DDL on the
                # same connection, wrapped in text() as 2.0 requires.
                connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
            else:
                df.to_sql(table_name, con=connection, index=False, if_exists='append')
    finally:
        # A fresh engine is built on every call, so release its connection pool.
        sql_engine.dispose()



In [None]:
# Seed the Atlas source database from the local JSON exports.
data_dir = os.path.join(os.getcwd(), 'classicdata')

# collection name -> JSON file (inside data_dir) holding that collection's documents
json_files = {"customers" : 'classicmodels_customers.json',
              "payments" : 'classicmodels_payments.json',
              "products" : 'classicmodels_products.json',
              "date" : 'classicmodels_date.json',
             }

client = pymongo.MongoClient(conn_str["atlas"])
try:
    db = client[src_dbname]
    for collection_name, file_name in json_files.items():
        # Drop first so re-running the cell replaces, rather than duplicates, the documents.
        db.drop_collection(collection_name)
        with open(os.path.join(data_dir, file_name), 'r', encoding='utf-8') as openfile:
            documents = json.load(openfile)
        # insert_many needs a list of documents; this assumes each file holds a JSON array.
        db[collection_name].insert_many(documents)
finally:
    # Close the client even if a file is missing or an insert fails.
    client.close()

### 1.0 - Create and Populate the New Dimension Tables

#### 1.1 - Extract Data from the Source MongoDB Collections Into DataFrames

In [None]:
query = {}
collection = "customers"

# Pull every document (empty filter) from the Atlas "customers" collection, minus _id.
df_customers = get_mongo_dataframe(conn_str['atlas'], src_dbname, collection, query)
df_customers.head(2)

In [None]:
query = {}
collection = "payments"

# Pull every document (empty filter) from the Atlas "payments" collection, minus _id.
df_payments = get_mongo_dataframe(conn_str['atlas'], src_dbname, collection, query)
df_payments.head(2)

In [None]:
query = {}
collection = "products"

# Pull every document (empty filter) from the Atlas "products" collection, minus _id.
df_products = get_mongo_dataframe(conn_str['atlas'], src_dbname, collection, query)
df_products.head(2)

In [None]:
query = {}
collection = "date"

# Pull every document (empty filter) from the Atlas "date" collection, minus _id.
df_date = get_mongo_dataframe(conn_str['atlas'], src_dbname, collection, query)
df_date.head(2)

#### 1.2 - Transform the DataFrames

In [None]:
# Customer dimension: remove the sales-rep and credit-limit columns, then
# prepend a 1-based surrogate key.
drop_cols = ['salesRepEmployeeNumber', 'creditLimit']
df_customers = df_customers.drop(columns=drop_cols)
df_customers.insert(loc=0, column="customer_key", value=range(1, len(df_customers) + 1))

df_customers.head(2)

In [None]:
# Prepend a 1-based surrogate key; it becomes the payments dimension's primary key.
df_payments.insert(loc=0, column="payments_key", value=range(1, len(df_payments) + 1))
df_payments.head(2)

In [None]:
# Product dimension: remove scale, vendor and description, then prepend a
# 1-based surrogate key.
drop_cols = ['productScale', 'productVendor', 'productDescription']
df_products = df_products.drop(columns=drop_cols)
df_products.insert(loc=0, column="products_key", value=range(1, len(df_products) + 1))

df_products.head(2)

In [None]:
# Date-dimension columns the warehouse does not need. Each column is listed once.
drop_cols = [
    'date_name', 'date_name_us', 'date_name_eu',
    'day_of_week', 'day_name_of_week', 'day_of_month', 'day_of_year',
    'weekday_weekend', 'is_last_day_of_month',
    'calendar_year_month', 'calendar_year_qtr', 'calendar_quarter', 'calendar_year',
    'fiscal_month_of_year', 'fiscal_year_month', 'fiscal_year_qtr',
    'month_of_year', 'month_name', 'week_of_year',
]
df_date.drop(drop_cols, axis=1, inplace=True)

df_date.head(2)

#### 1.3 - Load the Transformed DataFrames into the Data Warehouse as New Tables

In [None]:
# Create (or replace) dim_customers in the MySQL warehouse, keyed on customer_key.
dataframe, table_name, primary_key, db_operation = df_customers, 'dim_customers', 'customer_key', "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname,
              df=dataframe, table_name=table_name,
              pk_column=primary_key, db_operation=db_operation)

In [None]:
# Create (or replace) dim_payments in the MySQL warehouse, keyed on payments_key.
dataframe, table_name, primary_key, db_operation = df_payments, 'dim_payments', 'payments_key', "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname,
              df=dataframe, table_name=table_name,
              pk_column=primary_key, db_operation=db_operation)

In [None]:
# Create (or replace) dim_products in the MySQL warehouse, keyed on products_key.
dataframe, table_name, primary_key, db_operation = df_products, 'dim_products', 'products_key', "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname,
              df=dataframe, table_name=table_name,
              pk_column=primary_key, db_operation=db_operation)

In [None]:
# Create (or replace) dim_date in the MySQL warehouse, keyed on date_key.
dataframe, table_name, primary_key, db_operation = df_date, 'dim_date', 'date_key', "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname,
              df=dataframe, table_name=table_name,
              pk_column=primary_key, db_operation=db_operation)

#### 1.4 - Validate that new dimension tables were created

In [None]:
# Read dim_customers back from the warehouse to confirm it was written.
sql_customers = f"SELECT * FROM {dst_dbname}.dim_customers;"
df_dim_customers = get_sql_dataframe(user_id, pwd, host_name, dst_dbname, sql_customers)
df_dim_customers.head(2)

In [None]:
# Read dim_payments back from the warehouse to confirm it was written.
sql_payments = f"SELECT * FROM {dst_dbname}.dim_payments;"
df_dim_payments = get_sql_dataframe(user_id, pwd, host_name, dst_dbname, sql_payments)
df_dim_payments.head(2)

In [None]:
# Read dim_products back from the warehouse to confirm it was written.
sql_products = f"SELECT * FROM {dst_dbname}.dim_products;"
df_dim_products = get_sql_dataframe(user_id, pwd, host_name, dst_dbname, sql_products)
df_dim_products.head(2)

In [None]:
# Read dim_date back from the warehouse; later cells reuse df_dim_date for the date merge.
sql_date = f"SELECT * FROM {dst_dbname}.dim_date;"
df_dim_date = get_sql_dataframe(user_id, pwd, host_name, dst_dbname, sql_date)
df_dim_date.head(2)

### 2.0 - Create and Populate the New Fact Table

#### 2.1 - Extract orders/orderdetails tables from MySQL into pandas dataframes

In [None]:
# CREATE THE FACT ORDER TABLE FROM ORDERS
query = {} 

sql_fact_orders = "SELECT * FROM classicmodels.orders;"
df_fact_orders = get_sql_dataframe(user_id, pwd, host_name, dst_dbname, sql_fact_orders)
df_fact_orders.insert(0, "fact_order_key", range(1, df_fact_orders.shape[0]+1))

df_fact_orders.head(2)

In [None]:
# EXTRACT ORDERDETAILS
query = {} 

sql_order_details = "SELECT * FROM classicmodels.orderdetails;"
df_order_details = get_sql_dataframe(user_id, pwd, host_name, dst_dbname, sql_order_details)
df_order_details.head(2)

#### 2.1.2 - Convert date columns to datetime so dim_date can be merged into the fact orders table

In [None]:
# Give dim_date.full_date a datetime64[ns] dtype so it can be joined to the orders' order_date.
df_dim_date = df_dim_date.astype({'full_date': 'datetime64[ns]'})

df_dim_date.head()

#### 2.2 - Transform dataframes

##### 2.2.1 - Merge tables to add primary keys of the dimension tables to fact order table

In [None]:
# MERGE ORDERS & ORDER DETAILS
df_fact_orders_table = pd.merge(df_fact_orders, df_order_details, on='orderNumber', how='inner') # inner means only vals that match (not NaN)

df_fact_orders_table.rename(columns={"orderNumber":"order_key"}, inplace=True)
df_fact_orders_table.drop(['comments','orderLineNumber','requiredDate','shippedDate'], axis=1, inplace=True)

# change fact/orders data type to merge properly with date dimension table
df_fact_orders_table['order_date'] =  df_fact_orders_table['orderDate'].astype('datetime64[ns]')

df_fact_orders_table.head(2)

In [None]:
# MERGE FACT ORDERS & CUSTOMERS
df_fact_orders_table = pd.merge(df_fact_orders_table, df_customers, on='customerNumber', how='inner') # inner means only vals that match (not NaN)
df_fact_orders_table.drop(['customerName','contactLastName','contactFirstName','phone','addressLine1','addressLine2','city', 'state', 'postalCode', 'country'], axis=1, inplace=True)

df_fact_orders_table.head(2)


In [None]:
# MERGE FACT ORDERS & PAYMENTS
df_fact_orders_table = pd.merge(df_fact_orders_table, df_payments, on='customerNumber', how='inner') # inner means only vals that match (not NaN)
df_fact_orders_table.drop(['paymentDate', 'checkNumber', 'amount','quantityOrdered','priceEach'], axis=1, inplace=True)

df_fact_orders_table.head(2)

In [None]:
# MERGE FACT ORDERS & PRODUCTS
df_fact_orders_table = pd.merge(df_fact_orders_table, df_products, on='productCode', how='inner') # inner means only vals that match (not NaN)
df_fact_orders_table.rename(columns={"productCode":"product_key"}, inplace=True)
df_fact_orders_table.drop(['productName', 'productLine', 'buyPrice','MSRP'], axis=1, inplace=True)

df_fact_orders_table.head(2)

In [None]:
# MERGE FACT ORDERS AND DATE DIMENSION
df_fact_orders_table = pd.merge(df_fact_orders_table, df_dim_date, left_on='order_date', right_on='full_date', how='left') 
df_fact_orders_table = df_fact_orders_table.drop(labels=['orderDate', 'full_date'], axis=1)

df_fact_orders_table.head()

#### 2.2.2 - Clean up df_fact_orders by reordering columns and dropping duplicate rows, if any

In [None]:
# order columns 
ordered_columns = ['fact_order_key','order_key','product_key','customer_key','payments_key','products_key',
                   'date_key','status']
df_fact_orders = df_fact_orders_table[ordered_columns]
df_fact_orders.head(2)

In [None]:
# drop duplicates
df_fact_orders = df_fact_orders.sort_values(by=['fact_order_key'], axis=0, ascending=True)
df_fact_orders = df_fact_orders.drop_duplicates(subset=['fact_order_key'], keep='first', inplace=False, ignore_index=False)

df_fact_orders.head(10)

#### 2.3 - Load transformed fact orders table into classicmodels_dw3

In [None]:
# Create fact_orders_table from scratch (replacing any earlier copy) and make
# fact_order_key its primary key. The "update" (append) mode would add a second
# copy of every row on each re-run and would never create the primary key.
dataframe = df_fact_orders
table_name = 'fact_orders_table'
pk_column = 'fact_order_key'
db_operation = "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, pk_column, db_operation)

#### 2.4 - Validate that new fact table was loaded correctly into MySQL

In [None]:
# Read the fact table back from the warehouse to confirm it was written.
sql_fact_orders = f"SELECT * FROM {dst_dbname}.fact_orders_table;"
df_fact_orders_verification = get_sql_dataframe(user_id, pwd, host_name, dst_dbname, sql_fact_orders)
df_fact_orders_verification.head(2)

#### 2.5 - Run Three Validation SELECT Statements

In [None]:
# average payment made
sql_test1 = """ 
   SELECT AVG(`amount`) FROM `classicmodels_dw3`.`dim_payments`;""".format(dst_dbname)
df_test1 = get_sql_dataframe(user_id, pwd, host_name, dst_dbname, sql_test1)
df_test1.head()

In [None]:
# total number of unique primary fact order keys
sql_test2 = """ 
   SELECT COUNT(`fact_order_key`) FROM `classicmodels_dw3`.`fact_orders_table`;""".format(dst_dbname)
df_test2 = get_sql_dataframe(user_id, pwd, host_name, dst_dbname, sql_test2)
df_test2.head()

In [None]:
# maximum MSRP (manufacturer's suggested retail price) for a product
sql_test3 = """ 
   SELECT MAX(`MSRP`) FROM `classicmodels_dw3`.`dim_products`;""".format(dst_dbname)
df_test3 = get_sql_dataframe(user_id, pwd, host_name, dst_dbname, sql_test3)
df_test3.head()