### Prerequisites:
This notebook uses SQLAlchemy (with the PyMySQL driver) to connect to the MySQL databases and the PyMongo library to connect to MongoDB; therefore, you must first install these libraries into your Python environment by executing the following commands in a Terminal window.

- `python -m pip install "pymongo[srv]"`
- `python -m pip install sqlalchemy pymysql pandas certifi`

#### Import the Necessary Libraries

In [None]:
# Standard library
import os
import json

# Third-party: MongoDB (pymongo + certifi CA bundle for Atlas TLS),
# MySQL via SQLAlchemy, and pandas for the DataFrames.
import certifi
import pandas as pd
import pymongo
import sqlalchemy
from sqlalchemy import create_engine, text

In [2]:
# Report the versions of the two database libraries this notebook depends on.
for label, module in (("SQL Alchemy", sqlalchemy), ("PyMongo", pymongo)):
    print(f"Running {label} Version: {module.__version__}")

Running SQL Alchemy Version: 2.0.38
Running PyMongo Version: 4.11.2


#### Declare & Assign Connection Variables for the MongoDB Server, the MySQL Server & Databases

In [34]:
# The source OLTP database and the data-warehouse database live on the same
# MySQL server and use the same account, so the shared settings are defined once.
# The password is read from the MYSQL_PWD environment variable instead of being
# hard-coded in the notebook; set it before starting Jupyter (it defaults to an
# empty string, so connections fail with an authentication error if it is unset).
mysql_server_args = {
    "uid": "root",
    "pwd": os.environ.get("MYSQL_PWD", ""),
    "hostname": "localhost",
}

mysql_args_source = {**mysql_server_args, "dbname": "adventureworks"}
mysql_args_dw = {**mysql_server_args, "dbname": "adventureworks_dw"}

# The 'cluster_location' must either be "atlas" or "local".
# Atlas credentials may be supplied via MONGODB_USER / MONGODB_PASSWORD; the
# remaining values are placeholders. For a "local" cluster get_mongo_client()
# ignores the credential and cluster fields and connects to localhost.
mongodb_args = {
    "user_name": os.environ.get("MONGODB_USER", ""),
    "password": os.environ.get("MONGODB_PASSWORD", "password"),
    "cluster_name": "cluster_name",
    "cluster_subnet": "xxxxx",
    "cluster_location": "local",  # or "atlas"
    "db_name": "adventureworks_mdb",
}

### Create Data Warehouse

In [None]:
# (Re)create an empty data-warehouse database. The URL deliberately has no
# database component because the target database may not exist yet.
# URL.create() escapes special characters in the password, which a hand-built
# "user:pwd@host" string would not.
dw_name = mysql_args_dw['dbname']
server_url = sqlalchemy.engine.URL.create(
    drivername="mysql+pymysql",
    username=mysql_args_dw['uid'],
    password=mysql_args_dw['pwd'],
    host=mysql_args_dw['hostname'],
)
sqlEngine = create_engine(server_url, pool_recycle=3600)

try:
    with sqlEngine.begin() as connection:
        # Interpolate only the database *name* (not the whole settings dict),
        # backtick-quoted, so both statements target the same identifier.
        connection.execute(text(f"DROP DATABASE IF EXISTS `{dw_name}`;"))
        connection.execute(text(f"CREATE DATABASE IF NOT EXISTS `{dw_name}`;"))
finally:
    sqlEngine.dispose()

#### Functions for Getting Data From and Setting Data Into Databases

In [43]:
def get_sql_dataframe(sql_query, **args):
    '''Run a SQL query against a MySQL database and return the result as a DataFrame.

    Parameters
    ----------
    sql_query : str
        SQL text to execute; it is wrapped with sqlalchemy.text().
    **args
        Connection settings: 'uid', 'pwd', 'hostname' and 'dbname'.

    Returns
    -------
    pandas.DataFrame
        The rows returned by the query (via pandas.read_sql).
    '''
    # URL.create() escapes special characters (e.g. '@', '/', ':') in the
    # password, which an f-string connection URL would not.
    conn_url = sqlalchemy.engine.URL.create(
        drivername="mysql+pymysql",
        username=args['uid'],
        password=args['pwd'],
        host=args['hostname'],
        database=args['dbname'],
    )
    engine = create_engine(conn_url, pool_recycle=3600)
    try:
        # The context manager closes the connection even if the query fails.
        with engine.connect() as connection:
            dframe = pd.read_sql(text(sql_query), connection)
    finally:
        # A new engine (and pool) is created on every call, so release it here.
        engine.dispose()

    return dframe
    

def set_dataframe(df, table_name, pk_column, db_operation, **args):
    '''Write a DataFrame to a MySQL table.

    Parameters
    ----------
    df : pandas.DataFrame
        Rows to write; the DataFrame index is not written.
    table_name : str
        Target table name.
    pk_column : str
        Column(s) made the table's PRIMARY KEY when db_operation == "insert".
    db_operation : {"insert", "update"}
        "insert" replaces the table with df (to_sql if_exists='replace') and
        then adds the primary key; "update" appends df's rows to the existing
        table (to_sql if_exists='append').
    **args
        Connection settings: 'uid', 'pwd', 'hostname' and 'dbname'.

    Raises
    ------
    ValueError
        If db_operation is neither "insert" nor "update" (previously such a
        call silently wrote nothing).
    '''
    if db_operation not in ("insert", "update"):
        raise ValueError(
            f"db_operation must be 'insert' or 'update', got {db_operation!r}"
        )

    # URL.create() escapes special characters in the password.
    conn_url = sqlalchemy.engine.URL.create(
        drivername="mysql+pymysql",
        username=args['uid'],
        password=args['pwd'],
        host=args['hostname'],
        database=args['dbname'],
    )
    engine = create_engine(conn_url, pool_recycle=3600)
    try:
        # engine.begin() commits on success and rolls back on error; the
        # connection is closed either way.
        with engine.begin() as connection:
            if db_operation == "insert":
                df.to_sql(table_name, con=connection, index=False, if_exists='replace')
                connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
            else:
                df.to_sql(table_name, con=connection, index=False, if_exists='append')
    finally:
        # A new engine (and pool) is created on every call, so release it here.
        engine.dispose()


def get_mongo_client(**args):
    '''Create a PyMongo client for a MongoDB Atlas cluster or a local server.

    Parameters
    ----------
    **args
        Must contain 'cluster_location' ("atlas" or "local"). For "atlas",
        'user_name', 'password', 'cluster_name' and 'cluster_subnet' are also
        read; for "local" the client connects to mongodb://localhost:27017/.

    Returns
    -------
    pymongo.MongoClient

    Raises
    ------
    ValueError
        If 'cluster_location' is missing or is not "atlas"/"local".
    '''
    location = args.get("cluster_location")
    if location not in ('atlas', 'local'):
        raise ValueError("You must specify either 'atlas' or 'local' for the cluster_location parameter.")

    if location == "local":
        return pymongo.MongoClient("mongodb://localhost:27017/")

    # PyMongo requires the user name and password inside a URI to be
    # percent-escaped; quote_plus handles characters such as '@', ':' and '/'.
    from urllib.parse import quote_plus

    connect_str = f"mongodb+srv://{quote_plus(args['user_name'])}:{quote_plus(args['password'])}@"
    connect_str += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
    # certifi.where() gives the CA bundle used to verify the Atlas TLS certificate.
    return pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())


def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Run find(query) on a MongoDB collection and return the documents as a DataFrame.

    The MongoDB '_id' field is dropped. mongo_client is closed before this
    function returns, even if the query raises.

    Parameters
    ----------
    mongo_client : pymongo.MongoClient
        Client to query; closed by this function.
    db_name : str
        Database name.
    collection : str
        Collection name.
    query : dict
        Filter passed to find(); {} matches every document.

    Returns
    -------
    pandas.DataFrame
        One row per document; empty (no columns) when nothing matches.
    '''
    try:
        documents = list(mongo_client[db_name][collection].find(query))
    finally:
        mongo_client.close()

    # errors='ignore' keeps an empty result (which has no '_id' column) from
    # raising KeyError.
    return pd.DataFrame(documents).drop(columns=['_id'], errors='ignore')


def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    '''(Re)load MongoDB collections from JSON files.

    Each target collection is dropped first, so re-running replaces its data
    instead of duplicating it. mongo_client is closed before returning, even
    if loading fails.

    Parameters
    ----------
    mongo_client : pymongo.MongoClient
        Client to write with; closed by this function.
    db_name : str
        Database name.
    data_directory : str
        Directory containing the JSON files.
    json_files : dict
        Maps collection name -> JSON file name (relative to data_directory).
        Each file is expected to hold a JSON array of documents.
    '''
    try:
        db = mongo_client[db_name]
        for collection_name, file_name in json_files.items():
            db.drop_collection(collection_name)
            json_path = os.path.join(data_directory, file_name)
            with open(json_path, 'r', encoding='utf-8') as openfile:
                documents = json.load(openfile)
            # insert_many() rejects an empty list, so an empty file simply
            # leaves the freshly dropped collection empty.
            if documents:
                db[collection_name].insert_many(documents)
    finally:
        mongo_client.close()

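#### Creating dim_date: for this data warehouse to work correctly, you must run the `dim_date_creation.sql` script (located in the `scripts` directory) against the newly created `adventureworks_dw` database before running the validation query at the end of this notebook, which joins to `dim_date`.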

#### Extract source data: products (saved to CSV), customers (saved to JSON and loaded into MongoDB), and sales (used to build the fact table)

In [14]:
# Product data: saved to products.csv below, then read back to build dim_product.
# LEFT JOINs keep products that have no subcategory/category (those columns are NULL).

sql_products = """
SELECT 
    p.ProductID,
    p.Name,
    p.ProductNumber,
    p.Color,
    p.ListPrice,
    p.StandardCost,
    pc.Name AS ProductCategory,
    psc.Name AS ProductSubcategory
FROM 
    product AS p
LEFT JOIN 
    productsubcategory AS psc ON p.ProductSubcategoryID = psc.ProductSubcategoryID
LEFT JOIN 
    productcategory AS pc ON psc.ProductCategoryID = pc.ProductCategoryID
"""
df_products = get_sql_dataframe(sql_products, **mysql_args_source)

In [15]:
# Customer data: saved to customers.json below, loaded into MongoDB, and read
# back to build dim_customer.
# The WHERE clause keeps only rows that matched a contact (via the individual
# table), so customers without an individual/contact row are excluded.
# NOTE(review): the customeraddress join yields one row per address; a customer
# with several addresses would appear more than once — confirm uniqueness.

sql_customers = """
SELECT 
    c.CustomerID,
    c.AccountNumber,
    c.CustomerType,
    co.FirstName,
    co.LastName,
    co.EmailAddress,
    a.City,
    sp.Name AS StateProvince
FROM 
    customer AS c
LEFT JOIN 
    individual AS i ON c.CustomerID = i.CustomerID
LEFT JOIN 
    contact AS co ON i.ContactID = co.ContactID
LEFT JOIN 
    customeraddress AS ca ON c.CustomerID = ca.CustomerID
LEFT JOIN 
    address AS a ON ca.AddressID = a.AddressID
LEFT JOIN 
    stateprovince AS sp ON a.StateProvinceID = sp.StateProvinceID
WHERE
    co.FirstName IS NOT NULL
"""
df_customers = get_sql_dataframe(sql_customers, **mysql_args_source)


In [16]:
# Sales data for the fact table: one row per order line (salesorderdetail),
# with the order date and customer taken from its salesorderheader.

sql_sales = """
SELECT 
    soh.SalesOrderID,
    soh.OrderDate,
    soh.CustomerID,
    sod.ProductID,
    sod.OrderQty,
    sod.UnitPrice,
    sod.UnitPriceDiscount,
    sod.LineTotal
FROM 
    salesorderheader AS soh
JOIN 
    salesorderdetail AS sod ON soh.SalesOrderID = sod.SalesOrderID
"""
df_sales = get_sql_dataframe(sql_sales, **mysql_args_source)

In [32]:
# Directory for the intermediate CSV/JSON extracts. It is created if missing,
# because DataFrame.to_csv() and open(..., 'w') do not create parent directories.
data_dir = os.path.join(os.getcwd(), 'data')
os.makedirs(data_dir, exist_ok=True)

In [None]:
# Persist the product extract as CSV; it is read back later to build dim_product.
products_csv_path = os.path.join(data_dir, "products.csv")
df_products.to_csv(path_or_buf=products_csv_path, index=False)

In [36]:
# Save customer data to JSON as a list with one object per customer row.
# The file goes into data_dir, the same directory the MongoDB loader below reads from.
# Reference: https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_json.html#pandas-dataframe-to-json

customers_json_path = os.path.join(data_dir, 'customers.json')
# Round-trip through the json module so the file is pretty-printed (indent=4).
json_str = df_customers.to_json(orient='records')
parsed = json.loads(json_str)
with open(customers_json_path, 'w', encoding='utf-8') as f:
    json.dump(parsed, f, indent=4)

In [37]:
# Replace the "customers" MongoDB collection with the contents of customers.json.
json_files = {"customers": "customers.json"}
client = get_mongo_client(**mongodb_args)
set_mongo_collections(client, mongodb_args["db_name"], data_dir, json_files)

In [38]:
# Read the product extract back from CSV; this is the source for dim_product.
df_products_from_csv = pd.read_csv(filepath_or_buffer=products_csv_path)

In [None]:
# Read every customer document back from MongoDB (an empty filter matches all).
client = get_mongo_client(**mongodb_args)
query = {}
df_customers_from_mongo = get_mongo_dataframe(
    mongo_client=client,
    db_name=mongodb_args["db_name"],
    collection="customers",
    query=query,
)

### Create/Load dimension tables for data warehouse

In [None]:
# dim_product: the product extract with a 1-based surrogate key as its first column.
df_dim_products = df_products_from_csv.copy()
df_dim_products.insert(loc=0, column="product_key", value=range(1, len(df_dim_products) + 1))

Index(['product_key', 'ProductID', 'Name', 'ProductNumber', 'Color',
       'ListPrice', 'StandardCost', 'ProductCategory', 'ProductSubcategory'],
      dtype='object')


In [63]:
# dim_customer: the MongoDB customer documents with a 1-based surrogate key.
df_dim_customers = df_customers_from_mongo.copy()
# The source query LEFT JOINs customeraddress, so a customer with several
# addresses would appear more than once; that would multiply fact rows when
# sales are joined to this dimension on CustomerID, so fail loudly instead.
assert df_dim_customers["CustomerID"].is_unique, "dim_customer has duplicate CustomerID values"
df_dim_customers.insert(0, "customer_key", range(1, len(df_dim_customers) + 1))

In [64]:
# Write each dimension to the warehouse, replacing any existing table and
# declaring its surrogate key as the primary key.
dimension_loads = (
    (df_dim_products, 'dim_product', 'product_key'),
    (df_dim_customers, 'dim_customer', 'customer_key'),
)
for dim_frame, dim_table, dim_key in dimension_loads:
    set_dataframe(dim_frame, dim_table, dim_key, 'insert', **mysql_args_dw)

In [75]:
# Derive order_date_key as the integer YYYYMMDD, the form used by dim_date's
# date_key (the validation query joins on order_date_key = date_key).
# Work on a copy so the raw df_sales extract is left untouched.
df_fact_sales = df_sales.copy()
order_dates = pd.to_datetime(df_fact_sales['OrderDate'])
df_fact_sales['OrderDate'] = order_dates
df_fact_sales['order_date_key'] = (
    order_dates.dt.year * 10000
    + order_dates.dt.month * 100
    + order_dates.dt.day
).astype(int)


In [76]:
# Surrogate-key <-> natural-key lookup tables for mapping sales rows onto the dimensions.
df_product_lookup = df_dim_products.loc[:, ['product_key', 'ProductID']]
df_customer_lookup = df_dim_customers.loc[:, ['customer_key', 'CustomerID']]

In [77]:
# Attach surrogate keys to the sales rows. Left joins keep every sales line; a
# line whose ProductID/CustomerID has no dimension row gets a missing (NaN) key.
# Drop keys left by a previous run of this cell so re-running it does not
# produce suffixed product_key_x / product_key_y columns.
df_fact_sales = df_fact_sales.drop(columns=['product_key', 'customer_key'], errors='ignore')
# validate='many_to_one' raises MergeError if a lookup table has duplicate
# natural keys, which would otherwise silently multiply fact rows.
df_fact_sales = pd.merge(df_fact_sales, df_product_lookup, on='ProductID', how='left', validate='many_to_one')
df_fact_sales = pd.merge(df_fact_sales, df_customer_lookup, on='CustomerID', how='left', validate='many_to_one')

In [78]:
# Final fact-table layout: natural + surrogate keys, the date key, then the
# order-line measures, prefixed by a 1-based sales_key surrogate key.
fact_sales_columns = [
    'ProductID', 'product_key',
    'CustomerID', 'customer_key',
    'order_date_key',
    'SalesOrderID', 'OrderQty', 'UnitPrice', 'UnitPriceDiscount', 'LineTotal',
]
df_fact_sales = df_fact_sales.loc[:, fact_sales_columns]
df_fact_sales.insert(0, "sales_key", range(1, len(df_fact_sales) + 1))

In [79]:
# Write fact_sales to the warehouse (replacing any existing table) with sales_key as its primary key.
set_dataframe(
    df_fact_sales,
    table_name='fact_sales',
    pk_column='sales_key',
    db_operation='insert',
    **mysql_args_dw,
)

In [82]:
# Sanity-check the star schema: total quantity and sales per month name and product.
# Requires dim_date to exist (see the dim_date_creation.sql step above).
# Notes on what this query measures:
#   - The inner JOIN to dim_customer drops fact rows whose customer_key is NULL
#     (sales to customers that were filtered out of dim_customer).
#   - Grouping by month_name alone pools the same month across all years.
validation_sql = """
SELECT 
    d.month_name,
    p.Name AS ProductName,
    p.ProductCategory,
    SUM(s.OrderQty) AS TotalQuantity,
    SUM(s.LineTotal) AS TotalSales
FROM 
    adventureworks_dw.fact_sales AS s
JOIN 
    adventureworks_dw.dim_product AS p ON s.product_key = p.product_key
JOIN 
    adventureworks_dw.dim_customer AS c ON s.customer_key = c.customer_key
JOIN 
    adventureworks_dw.dim_date AS d ON s.order_date_key = d.date_key
GROUP BY 
    d.month_name, p.Name, p.ProductCategory
ORDER BY 
    TotalSales DESC
"""
result_df = get_sql_dataframe(validation_sql, **mysql_args_dw)
assert not result_df.empty, "validation query returned no rows; check the dimension/fact joins"
# Bare last expression: rich (HTML, auto-truncated) display instead of print().
result_df

     month_name              ProductName ProductCategory  TotalQuantity  \
0          June  Mountain-200 Silver, 38           Bikes           84.0   
1           May  Mountain-200 Silver, 46           Bikes           79.0   
2          June   Mountain-200 Black, 42           Bikes           79.0   
3           May   Mountain-200 Black, 38           Bikes           77.0   
4          June  Mountain-200 Silver, 42           Bikes           75.0   
...         ...                      ...             ...            ...   
1544   November          Racing Socks, L        Clothing           20.0   
1545   December          Racing Socks, L        Clothing           20.0   
1546       July          Racing Socks, M        Clothing           19.0   
1547   February          Racing Socks, L        Clothing           19.0   
1548       July          Racing Socks, L        Clothing           12.0   

       TotalSales  
0     189410.6112  
1     178059.2316  
2     176386.3740  
3     170812.8268  