**To setup everything, first run the AvdentureWorks_MySQL script to create and populate the adventureworks database**

In [2]:
mysql_args = {
    "uid" : "root",
    "pwd" : "asdf", 
    "hostname" : "localhost",
    "dbname" : "adventureworks"
}

dstdb_args = {
    "uid" : "root",
    "pwd" : "asdf", 
    "hostname" : "localhost",
    "dbname" : "adventureworks_dw"
}

mongodb_args = {
    "user_name" : "username",
    "password" : "asdf", 
    "cluster_name" : "cluster0",
    "cluster_subnet" : "n1yd4dj",
    "cluster_location" : "atlas", # "local"
    "db_name" : "adventureworks"
}

In [3]:
import os
import json
import numpy
import datetime
import certifi
import pandas as pd

import pymongo
import sqlalchemy
from sqlalchemy import create_engine, text

In [4]:
def get_sql_dataframe(sql_query, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    dframe = pd.read_sql(text(sql_query), connection);
    connection.close()
    
    return dframe
    

def set_dataframe(df, table_name, pk_column, db_operation, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

def create_database(**args):
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/"
    engine = create_engine(conn_str)
    with engine.connect() as conn:
        conn.execute(text(f"CREATE DATABASE IF NOT EXISTS {args['dbname']}"))
    engine.dispose()

def get_mongo_client(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the cluster_location parameter.")
    
    else:
        if args["cluster_location"] == "atlas":
            connect_str = f"mongodb+srv://{args['user_name']}:{args['password']}@"
            connect_str += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
            client = pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())
            
        elif args["cluster_location"] == "local":
            client = pymongo.MongoClient("mongodb://localhost:27017/")
        
    return client


def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = mongo_client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    mongo_client.close()
    
    return dframe


def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    db = mongo_client[db_name]
    
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(data_directory, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)
        
    mongo_client.close()

# create adventure works data warehouse
create_database(**dstdb_args)

***Extract Tables From MySQL into DataFrames***

In [6]:
# need to pick which tables to use
# tables_df = get_sql_dataframe("SHOW TABLES;", **mysql_args)
# print(tables_df.to_string(index=False))

**Run the lab2c script to create the date dimension**

In [8]:
# retrieve customer dimension
sql_dim_customers = get_sql_dataframe("SELECT * FROM customer;", **mysql_args)
sql_dim_customers.head(2)

Unnamed: 0,CustomerID,TerritoryID,AccountNumber,CustomerType,rowguid,ModifiedDate
0,1,1,AW00000001,S,b'^\xe9Z?}\xb8\xedJ\x95\xb4\xc3yz\xfc\xb7O',2004-10-13 11:15:07
1,2,1,AW00000002,S,b'W\xf6R\xe5\xaf\xa9}J\xa6E\xc4)\xd6\xe0$\x91',2004-10-13 11:15:07


In [9]:
# retrieve product dimension
sql_dim_products = get_sql_dataframe("SELECT * FROM product;", **mysql_args)
sql_dim_products.head(2)

Unnamed: 0,ProductID,Name,ProductNumber,MakeFlag,FinishedGoodsFlag,Color,SafetyStockLevel,ReorderPoint,StandardCost,ListPrice,...,ProductLine,Class,Style,ProductSubcategoryID,ProductModelID,SellStartDate,SellEndDate,DiscontinuedDate,rowguid,ModifiedDate
0,1,Adjustable Race,AR-5381,b'\x00',b'\x00',,1000,750,0.0,0.0,...,,,,,,1998-06-01,NaT,,b'\xb7\x15Bi\xf7\x08\rL\xac\xb1\xd74\xbaD\xc0\...,2004-03-11 10:01:36
1,2,Bearing Ball,BA-8327,b'\x00',b'\x00',,1000,750,0.0,0.0,...,,,,,,1998-06-01,NaT,,b' <\xaeX:OIG\xa7\xd4\xd5h\x80l\xc57',2004-03-11 10:01:36


In [10]:
# retrieve date dimension
sql_dim_date = "SELECT date_key, full_date FROM adventureworks_dw.dim_date;"
df_dim_date = get_sql_dataframe(sql_dim_date, **dstdb_args)
df_dim_date.full_date = df_dim_date.full_date.astype('datetime64[ns]').dt.date
df_dim_date.head(2)

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02


***Convert dataframes to mongodb and csv database***

In [12]:
# output path
data_dir = os.path.join(os.getcwd(), "data")
os.makedirs(data_dir, exist_ok=True)

# export products data frame to a csv
csv_product_path = os.path.join(data_dir, "products.csv")
sql_dim_products.to_csv(csv_product_path, index=False)

# export customer data frame to a mongodb database
client = get_mongo_client(**mongodb_args)
db = client[mongodb_args["db_name"]]
collection = db["customer"]
customer_records = sql_dim_customers.to_dict(orient="records")
collection.drop()
collection.insert_many(customer_records)
client.close()

# json_product_path = os.path.join(data_dir, "products.json")
# sql_products.to_json(json_product_path, orient="records")
# client = get_mongo_client(**mongodb_args)
# json_files = {"product": "products.json"}
# set_mongo_collections(client, mongodb_args["db_name"], data_dir, json_files)

***import the data from mongodb and csv***

In [14]:
# import data from mongodb
client = get_mongo_client(**mongodb_args)
df_dim_customer = get_mongo_dataframe(client, mongodb_args["db_name"], "customer", {})
df_dim_customer.head(2)

Unnamed: 0,CustomerID,TerritoryID,AccountNumber,CustomerType,rowguid,ModifiedDate
0,1,1,AW00000001,S,b'^\xe9Z?}\xb8\xedJ\x95\xb4\xc3yz\xfc\xb7O',2004-10-13 11:15:07
1,2,1,AW00000002,S,b'W\xf6R\xe5\xaf\xa9}J\xa6E\xc4)\xd6\xe0$\x91',2004-10-13 11:15:07


In [15]:
# import data from csv
df_dim_product = pd.read_csv(csv_product_path)
df_dim_product.head(2)

Unnamed: 0,ProductID,Name,ProductNumber,MakeFlag,FinishedGoodsFlag,Color,SafetyStockLevel,ReorderPoint,StandardCost,ListPrice,...,ProductLine,Class,Style,ProductSubcategoryID,ProductModelID,SellStartDate,SellEndDate,DiscontinuedDate,rowguid,ModifiedDate
0,1,Adjustable Race,AR-5381,b'\x00',b'\x00',,1000,750,0.0,0.0,...,,,,,,1998-06-01,,,b'\xb7\x15Bi\xf7\x08\rL\xac\xb1\xd74\xbaD\xc0\...,2004-03-11 10:01:36
1,2,Bearing Ball,BA-8327,b'\x00',b'\x00',,1000,750,0.0,0.0,...,,,,,,1998-06-01,,,b' <\xaeX:OIG\xa7\xd4\xd5h\x80l\xc57',2004-03-11 10:01:36


***Transforming and cleaning data***

In [17]:
df_dim_customer = df_dim_customer[[
    "CustomerID",
    "AccountNumber",
    "CustomerType",
    "TerritoryID"
]]
set_dataframe(df_dim_customer, "dim_customer", "CustomerID", "insert", **dstdb_args)
df_dim_customer.head(2)

Unnamed: 0,CustomerID,AccountNumber,CustomerType,TerritoryID
0,1,AW00000001,S,1
1,2,AW00000002,S,1


In [18]:
df_dim_product= df_dim_product[[
    "ProductID",
    "Name",
    "ProductNumber",
    "StandardCost",
    "ListPrice",
    "ProductSubcategoryID",
    "ProductModelID",
    "SellStartDate",
    "SellEndDate"
]]
set_dataframe(df_dim_product, "dim_product", "ProductID", "insert", **dstdb_args)
df_dim_product.head(2)

Unnamed: 0,ProductID,Name,ProductNumber,StandardCost,ListPrice,ProductSubcategoryID,ProductModelID,SellStartDate,SellEndDate
0,1,Adjustable Race,AR-5381,0.0,0.0,,,1998-06-01,
1,2,Bearing Ball,BA-8327,0.0,0.0,,,1998-06-01,


***CReate fact table***

In [20]:
sql_fact_sales = """
SELECT 
    sod.SalesOrderDetailID AS fact_sales_key,
    soh.SalesOrderID AS sales_order_id,
    soh.CustomerID AS CustomerID,
    sod.ProductID AS ProductID,
    soh.SalesPersonID AS sales_person_id,
    soh.ShipMethodID AS ship_method_id,
    soh.OrderDate AS order_date,
    sod.OrderQty AS quantity,
    sod.UnitPrice AS unit_price,
    sod.UnitPriceDiscount AS discount,
    sod.LineTotal AS line_total,
    soh.SubTotal AS order_subtotal,
    soh.TaxAmt AS tax_amount,
    soh.Freight AS freight,
    soh.TotalDue AS total_due
FROM SalesOrderHeader soh
JOIN SalesOrderDetail sod
    ON soh.SalesOrderID = sod.SalesOrderID;
"""

df_fact_sales = get_sql_dataframe(sql_fact_sales, **mysql_args)
df_fact_sales.head(2)

Unnamed: 0,fact_sales_key,sales_order_id,CustomerID,ProductID,sales_person_id,ship_method_id,order_date,quantity,unit_price,discount,line_total,order_subtotal,tax_amount,freight,total_due
0,1,43659,676,776,279.0,5,2001-07-01,1,2024.994,0.0,2024.994,24643.9362,1971.5149,616.0984,27231.5495
1,2,43659,676,777,279.0,5,2001-07-01,3,2024.994,0.0,6074.982,24643.9362,1971.5149,616.0984,27231.5495


***Integrate dimensions into fact table***

In [22]:
# surrogate key for customer dim
df_fact_sales = df_fact_sales.merge(
    df_dim_customer,
    how="left",
    on="CustomerID"
)

# surrogate key for product dim
df_fact_sales = df_fact_sales.merge(
    df_dim_product,
    how="left",
    on="ProductID"
)

df_fact_sales.rename(columns={
    "CustomerID": "customer_key",
    "ProductID": "product_key"
}, inplace=True)

# surrogate key for date dim
df_fact_sales["order_date"] = pd.to_datetime(df_fact_sales["order_date"]).dt.date
df_fact_sales = df_fact_sales.merge(
    df_dim_date,
    how="left",
    left_on="order_date",
    right_on="full_date"
)

df_fact_sales.rename(columns={"date_key": "order_date_key"}, inplace=True)
df_fact_sales.drop(columns=["order_date", "full_date"], inplace=True)

# reorder columns so that order_date_key is the 5th column
cols = list(df_fact_sales.columns)
if "order_date_key" in cols:
    cols.insert(4, cols.pop(cols.index("order_date_key")))
    df_fact_sales = df_fact_sales[cols]

# create destination database
set_dataframe(df_dim_customer, "dim_customer", "CustomerID", "insert", **dstdb_args)
set_dataframe(df_dim_product, "dim_product", "ProductID", "insert", **dstdb_args)
set_dataframe(df_fact_sales, "fact_sales", "fact_sales_key", "insert", **dstdb_args)
df_fact_sales.head(2)

Unnamed: 0,fact_sales_key,sales_order_id,customer_key,product_key,order_date_key,sales_person_id,ship_method_id,quantity,unit_price,discount,...,CustomerType,TerritoryID,Name,ProductNumber,StandardCost,ListPrice,ProductSubcategoryID,ProductModelID,SellStartDate,SellEndDate
0,1,43659,676,776,20010701,279.0,5,1,2024.994,0.0,...,S,5,"Mountain-100 Black, 42",BK-M82B-42,1898.0944,3374.99,1.0,19.0,2001-07-01,2002-06-30
1,2,43659,676,777,20010701,279.0,5,3,2024.994,0.0,...,S,5,"Mountain-100 Black, 44",BK-M82B-44,1898.0944,3374.99,1.0,19.0,2001-07-01,2002-06-30


***SQL Queries***

In [24]:
query = """
SELECT 
    dc.AccountNumber,
    dp.Name AS ProductName,
    SUM(fs.line_total) AS TotalSales,
    AVG(fs.unit_price) AS AvgUnitPrice
FROM fact_sales fs
JOIN dim_customer dc ON fs.customer_key = dc.CustomerID
JOIN dim_product dp ON fs.product_key = dp.ProductID
GROUP BY dc.AccountNumber, dp.Name
ORDER BY AccountNumber, TotalSales DESC
LIMIT 10;
"""
df_query = get_sql_dataframe(query, **dstdb_args)
df_query

Unnamed: 0,AccountNumber,ProductName,TotalSales,AvgUnitPrice
0,AW00000001,"Road-150 Red, 56",17175.696,2146.962
1,AW00000001,"Road-150 Red, 44",6440.886,2146.962
2,AW00000001,"Road-450 Red, 48",6123.558,874.794
3,AW00000001,"Road-450 Red, 52",5248.764,874.794
4,AW00000001,"Road-450 Red, 44",4373.97,874.794
5,AW00000001,"Road-450 Red, 58",4373.97,874.794
6,AW00000001,"Road-150 Red, 52",4293.924,2146.962
7,AW00000001,"Road-150 Red, 62",4293.924,2146.962
8,AW00000001,"Road-650 Red, 58",2936.2123,419.4589
9,AW00000001,"Road-650 Red, 48",2936.2123,419.4589


In [25]:
query2 = """
SELECT 
    dd.full_date AS OrderDate,
    dc.CustomerType,
    dp.Name AS ProductName,
    SUM(fs.line_total) AS TotalSales,
    AVG(fs.unit_price) AS AvgUnitPrice,
    COUNT(fs.sales_order_id) AS NumOrders
FROM fact_sales fs
JOIN dim_customer dc ON fs.customer_key = dc.CustomerID
JOIN dim_product dp ON fs.product_key = dp.ProductID
JOIN dim_date dd ON fs.order_date_key = dd.date_key
GROUP BY dd.full_date, dc.CustomerType, dp.Name
ORDER BY dd.full_date ASC, TotalSales DESC
LIMIT 15;
"""

df_query2 = get_sql_dataframe(query2, **dstdb_args)
df_query2

Unnamed: 0,OrderDate,CustomerType,ProductName,TotalSales,AvgUnitPrice,NumOrders
0,2001-07-01,S,"Mountain-100 Black, 44",46574.862,2024.994,7
1,2001-07-01,S,"Mountain-100 Black, 38",44549.868,2024.994,7
2,2001-07-01,S,"Mountain-100 Black, 48",40499.88,2024.994,9
3,2001-07-01,S,"Road-450 Red, 52",40240.524,874.794,16
4,2001-07-01,S,"Mountain-100 Black, 42",32399.904,2024.994,8
5,2001-07-01,S,"Road-150 Red, 56",30057.468,2146.962,10
6,2001-07-01,S,"Mountain-100 Silver, 44",28559.916,2039.994,9
7,2001-07-01,S,"Road-450 Red, 58",23619.438,874.794,11
8,2001-07-01,S,"Mountain-100 Silver, 38",20399.94,2039.994,4
9,2001-07-01,S,"Road-650 Red, 44",18456.1916,419.4589,14
