In [1]:
import os
import pandas as pd
from sqlalchemy import create_engine, text
import json
import numpy
import certifi
import pymongo

SQL Data Ingestion (creating fact tables from adventureworks)

In [2]:
# SQL setup
host_name = "localhost"
port = "3306"
user_id = "root"
pwd = "password"

src_dbname = "adventureworks"
dst_dbname = "adventureworks_dw"

In [3]:
def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection)
    connection.close()
    
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

In [None]:
conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)
connection = sqlEngine.connect()

# connection.execute(text(f"DROP DATABASE IF EXISTS `{dst_dbname}`;"))
# connection.execute(text(f"CREATE DATABASE `{dst_dbname}`;"))
# connection.execute(text(f"USE {dst_dbname};"))

connection.close()

Run Dim Date and Adventureworks SQL information HERE! 

In [5]:
# tables of interest - salesorderdetail, salesorderheader, product, customer
sql_sod = "SELECT * FROM adventureworks.salesorderdetail;"
df_sod = get_dataframe(user_id, pwd, host_name, src_dbname, sql_sod)
drop = ['rowguid']
df_sod = df_sod.drop(drop, axis=1)
df_sod.head(2)

Unnamed: 0,SalesOrderID,SalesOrderDetailID,CarrierTrackingNumber,OrderQty,ProductID,SpecialOfferID,UnitPrice,UnitPriceDiscount,LineTotal,ModifiedDate
0,43659,1,4911-403C-98,1,776,1,2024.994,0.0,2024.994,2001-07-01
1,43659,2,4911-403C-98,3,777,1,2024.994,0.0,6074.982,2001-07-01


In [6]:
sql_soh = "SELECT * FROM adventureworks.salesorderheader;"
df_soh = get_dataframe(user_id, pwd, host_name, src_dbname, sql_soh)
drop  = ['rowguid', 'Comment']
df_soh = df_soh.drop(drop, axis=1)
df_soh.head(2)

Unnamed: 0,SalesOrderID,RevisionNumber,OrderDate,DueDate,ShipDate,Status,OnlineOrderFlag,SalesOrderNumber,PurchaseOrderNumber,AccountNumber,...,ShipToAddressID,ShipMethodID,CreditCardID,CreditCardApprovalCode,CurrencyRateID,SubTotal,TaxAmt,Freight,TotalDue,ModifiedDate
0,43659,1,2001-07-01,2001-07-13,2001-07-08,5,b'\x00',SO43659,PO522145787,10-4020-000676,...,985,5,16281.0,105041Vi84182,,24643.9362,1971.5149,616.0984,27231.5495,2001-07-08
1,43660,1,2001-07-01,2001-07-13,2001-07-08,5,b'\x00',SO43660,PO18850127500,10-4020-000117,...,921,5,5618.0,115213Vi29411,,1553.1035,124.2483,38.8276,1716.1794,2001-07-08


In [7]:
sql_product = "SELECT * FROM adventureworks.product;"
df_product = get_dataframe(user_id, pwd, host_name, src_dbname, sql_product)
drop = ['rowguid']
df_product = df_product.drop(drop, axis=1)
df_product.head(2)

Unnamed: 0,ProductID,Name,ProductNumber,MakeFlag,FinishedGoodsFlag,Color,SafetyStockLevel,ReorderPoint,StandardCost,ListPrice,...,DaysToManufacture,ProductLine,Class,Style,ProductSubcategoryID,ProductModelID,SellStartDate,SellEndDate,DiscontinuedDate,ModifiedDate
0,1,Adjustable Race,AR-5381,b'\x00',b'\x00',,1000,750,0.0,0.0,...,0,,,,,,1998-06-01,NaT,,2004-03-11 10:01:36
1,2,Bearing Ball,BA-8327,b'\x00',b'\x00',,1000,750,0.0,0.0,...,0,,,,,,1998-06-01,NaT,,2004-03-11 10:01:36


In [8]:
sql_customer = "SELECT * FROM adventureworks.customer;"
df_customer = get_dataframe(user_id, pwd, host_name, src_dbname, sql_customer)
df_customer = df_customer.drop(drop, axis=1)
df_customer.head(2)

Unnamed: 0,CustomerID,TerritoryID,AccountNumber,CustomerType,ModifiedDate
0,1,1,AW00000001,S,2004-10-13 11:15:07
1,2,1,AW00000002,S,2004-10-13 11:15:07


In [9]:
# get dim date to replace with date keys
sql_date = "SELECT date_key, full_date FROM adventureworks_dw.dim_date;"
df_date = get_dataframe(user_id, pwd, host_name, 'adventureworks_dw', sql_date)
df_date.head(2)

ProgrammingError: (pymysql.err.ProgrammingError) (1146, "Table 'adventureworks_dw.dim_date' doesn't exist")
[SQL: SELECT date_key, full_date FROM adventureworks_dw.dim_date;]
(Background on this error at: https://sqlalche.me/e/20/f405)

In [None]:
# altering Datetime entries for the dimdate table
date_md = df_date.rename(columns={'date_key': 'ModifiedDateKey', 'full_date': "ModifiedDate"})
df_sod.ModifiedDate =  df_sod.ModifiedDate.astype('datetime64[ns]').dt.date
df_sod = pd.merge(df_sod, date_md, on='ModifiedDate', how='left')
df_sod.drop(['ModifiedDate'], axis=1, inplace=True)
df_sod.head(2)

Unnamed: 0,SalesOrderID,SalesOrderDetailID,CarrierTrackingNumber,OrderQty,ProductID,SpecialOfferID,UnitPrice,UnitPriceDiscount,LineTotal,ModifiedDateKey
0,43659,1,4911-403C-98,1,776,1,2024.994,0.0,2024.994,20010701
1,43659,2,4911-403C-98,3,777,1,2024.994,0.0,6074.982,20010701


In [None]:
df_soh.OrderDate = df_soh.OrderDate.astype('datetime64[ns]').dt.date
date_od = df_date.rename(columns={'date_key':"OrderDateKey", "full_date":"OrderDate"})
df_soh = pd.merge(df_soh, date_od, on='OrderDate', how='left')

df_soh.ShipDate = df_soh.ShipDate.astype('datetime64[ns]').dt.date
date_sd = df_date.rename(columns= {'date_key':"ShipDateKey", "full_date":"ShipDate"})
df_soh = pd.merge(df_soh, date_sd, on='ShipDate', how='left')

df_soh.DueDate = df_soh.DueDate.astype('datetime64[ns]').dt.date
date_dd = df_date.rename(columns={'full_date':"DueDate", 'date_key':'DueDateKey'})
df_soh = pd.merge(df_soh, date_dd, on='DueDate', how='left')

df_soh.ModifiedDate = df_soh.ModifiedDate.astype('datetime64[ns]').dt.date
df_soh = pd.merge(df_soh, date_md, on='ModifiedDate', how='left')

df_soh.drop(['OrderDate', 'ShipDate', 'DueDate', 'ModifiedDate'], axis=1, inplace=True)

df_soh.head(2)

Unnamed: 0,SalesOrderID,RevisionNumber,Status,OnlineOrderFlag,SalesOrderNumber,PurchaseOrderNumber,AccountNumber,CustomerID,ContactID,SalesPersonID,...,CreditCardApprovalCode,CurrencyRateID,SubTotal,TaxAmt,Freight,TotalDue,OrderDateKey,ShipDateKey,DueDateKey,ModifiedDateKey
0,43659,1,5,b'\x00',SO43659,PO522145787,10-4020-000676,676,378,279.0,...,105041Vi84182,,24643.9362,1971.5149,616.0984,27231.5495,20010701,20010708,20010713,20010708
1,43660,1,5,b'\x00',SO43660,PO18850127500,10-4020-000117,117,216,279.0,...,115213Vi29411,,1553.1035,124.2483,38.8276,1716.1794,20010701,20010708,20010713,20010708


In [None]:
df_product.SellStartDate = df_product.SellStartDate.astype('datetime64[ns]').dt.date
date_ssd = df_date.rename(columns={'full_date':"SellStartDate", 'date_key':"SellStartDateKey"})
df_product = pd.merge(df_product, date_ssd, on='SellStartDate', how='left')


df_product.SellEndDate = df_product.SellEndDate.astype('datetime64[ns]').dt.date
date_sed = df_date.rename(columns={'full_date' :'SellEndDate', 'date_key': "SellEndDateKey"})
df_product = pd.merge(df_product, date_sed, on='SellEndDate', how='left')

date_dd = df_date.rename(columns={'full_date':"DiscontinuedDate", "date_key":"DiscontinuedDateKey"})
df_product = pd.merge(df_product, date_dd, on='DiscontinuedDate', how='left')
df_product.head(2)

df_product.ModifiedDate = df_product.ModifiedDate.astype('datetime64[ns]').dt.date
df_product = pd.merge(df_product, date_md, on='ModifiedDate', how='left')

df_product.drop(['SellStartDate', 'SellEndDate', 'ModifiedDate', 'DiscontinuedDate'], axis=1, inplace=True)
df_product.head(2)

Unnamed: 0,ProductID,Name,ProductNumber,MakeFlag,FinishedGoodsFlag,Color,SafetyStockLevel,ReorderPoint,StandardCost,ListPrice,...,DaysToManufacture,ProductLine,Class,Style,ProductSubcategoryID,ProductModelID,SellStartDateKey,SellEndDateKey,DiscontinuedDateKey,ModifiedDateKey
0,1,Adjustable Race,AR-5381,b'\x00',b'\x00',,1000,750,0.0,0.0,...,0,,,,,,19980601,,,20040311
1,2,Bearing Ball,BA-8327,b'\x00',b'\x00',,1000,750,0.0,0.0,...,0,,,,,,19980601,,,20040311


In [None]:
df_customer.ModifiedDate = df_customer.ModifiedDate.astype('datetime64[ns]').dt.date
df_customer = pd.merge(df_customer, date_md, on='ModifiedDate', how='left')

df_customer.drop(['ModifiedDate'], axis=1, inplace=True)
df_customer.head(2)

Unnamed: 0,CustomerID,TerritoryID,AccountNumber,CustomerType,ModifiedDateKey
0,1,1,AW00000001,S,20041013
1,2,1,AW00000002,S,20041013


In [None]:
# insert customer and product tables as dimension tables into the database
set_dataframe(user_id, pwd, host_name, dst_dbname, df_customer, "dim_customers", "CustomerID", "insert")
set_dataframe(user_id, pwd, host_name, dst_dbname, df_product, "dim_product", "ProductID", "insert")

In [None]:
# merge df_soh and df_sod to create a fact table (one row for each part of a sales order that connects
# to the product and customer tables
df_fact = pd.merge(df_soh, df_sod, on="SalesOrderID", how="inner")
df_fact.insert(0, 'fact_sales_key', range(1, len(df_fact) + 1))
set_dataframe(user_id, pwd, host_name, dst_dbname, df_fact, "fact_sales", "fact_sales_key", "insert")

Mongo Data Ingestion (Uploading data into mongo and ingesting)

In [None]:
mongodb_args = {
    "user_name" : "db_user",
    "password" : "pAssw0rd!",
    "cluster_name" : "ds2002",
    "cluster_subnet" : "ryhluic.mongodb.net/?appName=ds2002",
    "cluster_location" : "atlas",
    "db_name" : "adventureworks_sales"
}

In [None]:
def get_mongo_client(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the cluster_location parameter.")
    
    else:
        if args["cluster_location"] == "atlas":
            connect_str = f"mongodb+srv://{args['user_name']}:{args['password']}@"
            connect_str += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
            client = pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())
            
        elif args["cluster_location"] == "local":
            client = pymongo.MongoClient("mongodb://localhost:27017/")
        
    return client


def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = mongo_client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    mongo_client.close()
    
    return dframe


def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    db = mongo_client[db_name]
    
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(data_directory, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)
        
    mongo_client.close()

In [None]:
# ingest data into Atlas
client = get_mongo_client(**mongodb_args)


data_dir = os.path.join(os.getcwd(), 'data')

json_files = {"reviews" :"adventureworks_product_reviews.json"}

set_mongo_collections(client, mongodb_args["db_name"], data_dir, json_files)

In [None]:
# now use the data from atlas and create a reviews dim table
client = get_mongo_client(**mongodb_args)
df_reviews = get_mongo_dataframe(client, mongodb_args['db_name'], "reviews", {})
df_reviews.head(2)

Unnamed: 0,CustomerID,ProductID,Rating,Comment,DatePosted
0,18790,422,5,"Padding could be thicker, but overall decent.",2002-01-26 08:12:29
1,14011,386,5,"A bit heavy, but handles rough trails beautifu...",2003-02-13 15:44:41


In [None]:
# date posted
date_dp = df_date.rename(columns={"full_date" :"DatePosted", "date_key": "DatePostedKey"})
df_reviews.DatePosted = df_reviews.DatePosted.astype('datetime64[ns]').dt.date
df_reviews = pd.merge(df_reviews, date_dp, on="DatePosted", how="inner")
df_reviews.drop(['DatePosted'], inplace=True, axis=1)

df_reviews.insert(0, "ReviewID", range(1, len(df_reviews) + 1))
df_reviews.head(2)

Unnamed: 0,ReviewID,CustomerID,ProductID,Rating,Comment,DatePostedKey
0,1,18790,422,5,"Padding could be thicker, but overall decent.",20020126
1,2,14011,386,5,"A bit heavy, but handles rough trails beautifu...",20030213


In [None]:
# add into data warehouse
set_dataframe(user_id, pwd, host_name, dst_dbname, df_reviews, "dim_reviews", "ReviewID", "insert")

File System Data Ingestion

In [None]:
# download csv from data
df_holiday_sales = pd.read_csv("./data/adventureworks_sales_periods.csv")
df_holiday_sales.head(2)

Unnamed: 0,SalesPeriodName,StartDate,EndDate
0,Summer Sale (Jul 2001),2001-07-01 00:00:00,2001-07-12 00:00:00
1,Fall Promotion (Aug 2001),2001-08-25 00:00:00,2001-09-08 00:00:00


In [None]:
# cleaning
df_holiday_sales.StartDate = df_holiday_sales.StartDate.astype('datetime64[ns]').dt.date
df_sd = df_date.rename(columns={"full_date": "StartDate", "date_key" : "StartDateKey"})
df_holiday_sales = pd.merge(df_holiday_sales, df_sd, how="left", on="StartDate")

df_holiday_sales.EndDate = df_holiday_sales.EndDate.astype('datetime64[ns]').dt.date
df_ed = df_date.rename(columns={"full_date":"EndDate", "date_key":"EndDateKey"})
df_holiday_sales = pd.merge(df_holiday_sales, df_ed, on='EndDate', how='left')

df_holiday_sales.drop(['StartDate', 'EndDate'], axis=1, inplace=True)
df_holiday_sales.head(2)

In [None]:
df_holiday_sales.insert(0, "SalesTimeKey", range(1, len(df_holiday_sales) + 1))
df_holiday_sales.head(2)

ValueError: cannot insert SalesTimeKey, already exists

In [None]:
set_dataframe(user_id, pwd, host_name, dst_dbname, df_holiday_sales, "dim_sale_dates", "SalesTimeKey", "insert")

Findings using the Data Warehouse

In [None]:
# Accounts of the highest paying people
sql = """
    SELECT c.AccountNumber, SUM(LineTotal) as Total_Spending FROM adventureworks_dw.fact_sales as fs
    JOIN adventureworks_dw.dim_customers as c ON c.CustomerID = fs.CustomerID
    JOIN 
    (SELECT StartDateKey, EndDateKey FROM adventureworks_dw.dim_sale_dates
    WHERE SalesPeriodName = 'Summer Sale (Jul 2001)') as sale_period
    WHERE sale_period.StartDateKey <= OrderDateKey and OrderDateKey <= sale_period.EndDateKey
    GROUP BY fs.CustomerID
    ORDER BY Total_Spending DESC;
"""

df_test = get_dataframe(user_id, pwd, host_name, src_dbname, sql)
df_test

Unnamed: 0,AccountNumber,Total_Spending
0,AW00000506,42813.4333
1,AW00000027,39373.7810
2,AW00000221,38510.8973
3,AW00000514,35944.1562
4,AW00000166,33997.3702
...,...,...
81,AW00014520,699.0982
82,AW00020042,699.0982
83,AW00025249,699.0982
84,AW00000510,419.4589


In [None]:
sql = """ 
SELECT p.Name, COUNT(fs.fact_sales_key) as orders, AVG(r.Rating) as Rating FROM adventureworks_dw.fact_sales as fs
JOIN adventureworks_dw.dim_reviews as r ON r.ProductID = fs.ProductID
JOIN adventureworks_dw.dim_product as p on p.ProductID = fs.ProductID
GROUP BY fs.ProductID
ORDER BY orders DESC;
"""
df_test = get_dataframe(user_id, pwd, host_name, src_dbname, sql)
df_test

Unnamed: 0,Name,orders,Rating
0,"Sport-100 Helmet, Red",6166,2.5000
1,"Sport-100 Helmet, Black",6014,2.0000
2,Touring Tire Tube,4464,2.6667
3,LL Mountain Tire,3448,3.2500
4,AWC Logo Cap,3382,3.0000
...,...,...,...
175,"LL Touring Frame - Yellow, 58",20,5.0000
176,"LL Mountain Frame - Black, 52",18,3.5000
177,"ML Mountain Frame-W - Silver, 38",10,3.0000
178,"LL Touring Frame - Blue, 58",6,1.6667
