In [1]:
import datetime
import json
import os
import sys
from urllib.parse import quote, quote_plus

import certifi
import numpy
import pandas as pd
import pymongo
import sqlalchemy
from sqlalchemy import create_engine, text
# Show which Python interpreter the kernel is running.
print(sys.executable)

/opt/anaconda3/envs/pysparkenv/bin/python


In [2]:
# Connection settings for the source MySQL database and the MongoDB cluster.
#
# Every credential/host value can be overridden with an environment variable
# (ADVENTUREWORKS_MYSQL_UID / _PWD / _HOST, ADVENTUREWORKS_MONGO_USER / _PWD)
# so secrets do not have to be edited into the notebook.
# FIXME: the literal fallbacks only keep the notebook runnable as-is; rotate
# these passwords and remove the defaults once the variables are configured.
mysql_args = {
    "uid" : os.environ.get("ADVENTUREWORKS_MYSQL_UID", "root"),
    "pwd" : os.environ.get("ADVENTUREWORKS_MYSQL_PWD", "mango4Pickle#"),
    "hostname" : os.environ.get("ADVENTUREWORKS_MYSQL_HOST", "localhost"),
    "dbname" : "adventureworks"
}


mongodb_args = {
    "user_name" : os.environ.get("ADVENTUREWORKS_MONGO_USER", "arya_rajesh"),
    "password" : os.environ.get("ADVENTUREWORKS_MONGO_PWD", "mango4Pickle#"),
    "cluster_name" : "cluster0",
    "cluster_subnet" : "wsz4m57",
    "cluster_location" : "atlas", # or "local"
    "db_name" : "adventureworks"
}

In [3]:
# Defining MongoDB get and set functions.

def get_mongo_client(**args):
    '''Build a pymongo.MongoClient for an Atlas cluster or a local server.

    Keyword arguments:
        cluster_location: 'atlas' or 'local' (required).
        user_name, password, cluster_name, cluster_subnet: read only when
            cluster_location is 'atlas'; they are combined into a
            mongodb+srv:// connection string.

    Returns:
        A pymongo.MongoClient instance.

    Raises:
        ValueError: cluster_location is neither 'atlas' nor 'local'.
        KeyError: a required keyword argument is missing.
    '''
    location = args["cluster_location"]
    if location not in ('atlas', 'local'):
        raise ValueError("You must specify either 'atlas' or 'local' for the cluster_location parameter.")

    if location == "local":
        return pymongo.MongoClient("mongodb://localhost:27017/")

    # Credentials are percent-encoded so that reserved URI characters in them
    # (e.g. '@', ':', '/', '#', '%') cannot corrupt the connection string.
    user = quote_plus(args['user_name'])
    password = quote_plus(args['password'])
    host = f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
    connect_str = f"mongodb+srv://{user}:{password}@{host}"
    return pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())


def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Run a find() query against a MongoDB collection and return a DataFrame.

    Parameters:
        mongo_client: client object; indexed by db_name and then by collection.
        db_name: name of the database to read from.
        collection: name of the collection to query.
        query: filter passed to the collection's find().

    Returns:
        A pandas DataFrame with one row per matching document and the '_id'
        column removed. An empty result yields an empty DataFrame.

    Side effects:
        mongo_client is closed before returning, including when the query fails.
    '''
    try:
        documents = list(mongo_client[db_name][collection].find(query))
    finally:
        mongo_client.close()

    # errors='ignore': a query that matches nothing produces a frame with no
    # columns at all, so there is no '_id' to drop.
    return pd.DataFrame(documents).drop(columns=['_id'], errors='ignore')


def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    '''Rebuild MongoDB collections from JSON files.

    Parameters:
        mongo_client: client object; indexed by db_name to get the database.
        db_name: name of the target database.
        data_directory: directory that contains the JSON files.
        json_files: mapping of collection name -> JSON file name.

    For each entry the collection is dropped and then re-populated with the
    documents loaded from the file (passed to insert_many(), so the file is
    expected to hold a JSON array of documents). A file holding an empty array
    leaves the collection dropped and empty.

    Side effects:
        mongo_client is closed before returning, including when loading fails.
    '''
    db = mongo_client[db_name]

    try:
        for collection_name, file_name in json_files.items():
            db.drop_collection(collection_name)
            json_path = os.path.join(data_directory, file_name)
            with open(json_path, 'r') as openfile:
                documents = json.load(openfile)
            # insert_many() rejects an empty document list, so skip the insert.
            if documents:
                db[collection_name].insert_many(documents)
    finally:
        mongo_client.close()

In [4]:
# defining SQL get and set functions.
def get_sql_dataframe(sql_query, **args):
    '''Run a SQL query against MySQL and return the result as a DataFrame.

    Parameters:
        sql_query: SQL text to execute (wrapped in sqlalchemy.text()).
        **args: connection settings; 'uid', 'pwd', 'hostname' and 'dbname'
            are read to build the mysql+pymysql connection URL.

    Returns:
        The pandas DataFrame produced by pd.read_sql().

    Raises:
        KeyError: a connection setting is missing.
    '''
    # Percent-encode the credentials so characters such as '@', '/', ':' or
    # '#' in them cannot be mistaken for URL delimiters.
    uid = quote(args['uid'], safe='')
    pwd = quote(args['pwd'], safe='')
    conn_str = f"mysql+pymysql://{uid}:{pwd}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)

    try:
        # The with-block closes the connection even if the query raises.
        with sqlEngine.connect() as connection:
            return pd.read_sql(text(sql_query), connection)
    finally:
        # The engine is created per call, so release its pool as well.
        sqlEngine.dispose()
    

def set_sql_datamart_dataframe(df, table_name, pk_column, db_operation, **args):
    '''Write a DataFrame to a MySQL table.

    Parameters:
        df: DataFrame to write (its index is not written).
        table_name: destination table.
        pk_column: column made the PRIMARY KEY after an "insert".
        db_operation: "insert" replaces the table with df and then adds the
            primary key; "update" appends df to the existing table.
        **args: connection settings; 'uid', 'pwd', 'hostname' and 'dbname'
            are read to build the mysql+pymysql connection URL.

    Raises:
        ValueError: db_operation is neither "insert" nor "update".
        KeyError: a connection setting is missing.
    '''
    # Fail loudly on a typo instead of silently writing nothing.
    if db_operation not in ("insert", "update"):
        raise ValueError(f"db_operation must be 'insert' or 'update', got {db_operation!r}")

    # Percent-encode the credentials so characters such as '@', '/', ':' or
    # '#' in them cannot be mistaken for URL delimiters.
    uid = quote(args['uid'], safe='')
    pwd = quote(args['pwd'], safe='')
    conn_str = f"mysql+pymysql://{uid}:{pwd}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)

    try:
        # The with-block closes the connection even if a statement raises.
        with sqlEngine.connect() as connection:
            if db_operation == "insert":
                df.to_sql(table_name, con=connection, index=False, if_exists='replace')
                # NOTE: table_name and pk_column are interpolated into the DDL
                # verbatim (identifiers cannot be bound as parameters), so only
                # trusted values may be passed here.
                connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
            else:
                df.to_sql(table_name, con=connection, index=False, if_exists='append')
    finally:
        # The engine is created per call, so release its pool as well.
        sqlEngine.dispose()
    


In [5]:
# Dimension tables: dim_products, dim_vendor, and dim_date.
# Fact table: fact_purchaseorders.
# Sources: purchase orders come from MongoDB, products from a CSV file, and vendors and dim_date from MySQL.

In [6]:
# Load the product data from its CSV export; ProductID is renamed to
# product_key, the name used for joins further down.
data_dir = os.path.join(os.getcwd(), 'data')
data_file = os.path.join(data_dir, 'adventureworks_products.csv')
df_products = (
    pd.read_csv(data_file, header=0, index_col=0)
    .rename(columns={"ProductID": "product_key"})
)
df_products.head(6)

Unnamed: 0,product_key,Name,ProductNumber,MakeFlag,FinishedGoodsFlag,Color,SafetyStockLevel,ReorderPoint,StandardCost,ListPrice,...,DaysToManufacture,ProductLine,Class,Style,ProductCategory,ProductSubcategory,ProductModel,SellStartDate,SellEndDate,DiscontinuedDate
1,1,Adjustable Race,AR-5381,0,0,,1000,750,0.0,0.0,...,0,,,,,,,6/1/98 0:00,,
2,2,Bearing Ball,BA-8327,0,0,,1000,750,0.0,0.0,...,0,,,,,,,6/1/98 0:00,,
3,3,BB Ball Bearing,BE-2349,1,0,,800,600,0.0,0.0,...,1,,,,,,,6/1/98 0:00,,
4,4,Headset Ball Bearings,BE-2908,0,0,,800,600,0.0,0.0,...,0,,,,,,,6/1/98 0:00,,
5,316,Blade,BL-2036,1,0,,800,600,0.0,0.0,...,1,,,,,,,6/1/98 0:00,,
6,317,LL Crankarm,CA-5965,0,0,Black,500,375,0.0,0.0,...,0,,L,,,,,6/1/98 0:00,,


In [7]:
# Load the purchase-order JSON file into MongoDB, then read it back as a DataFrame.
data_dir = os.path.join(os.getcwd(), 'data')
json_files = {"purchase_orders" : 'adventureworks_purchaseorders.json'}

client = get_mongo_client(**mongodb_args)
set_mongo_collections(client, mongodb_args["db_name"], data_dir, json_files)

# set_mongo_collections() closes the client it is given, and a closed
# MongoClient must not be reused, so open a fresh one for the read.
client = get_mongo_client(**mongodb_args)

query = {}
collection = "purchase_orders"
df_purchaseorders = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_purchaseorders = df_purchaseorders.rename(columns={"VendorID":"vendor_key","ProductID":"product_key" })

print(df_purchaseorders.head(2))

   PurchaseOrderID  RevisionNumber  Status  EmployeeID  vendor_key  \
0                1               0       4         244          83   
1                2               0       1         231          32   

   product_key  OrderQty  UnitPrice  LineTotal            OrderDate  ...  \
0            1         4    50.2600   201.0400  2001-05-17 00:00:00  ...   
1          360         3    45.5805   136.7415  2001-05-17 00:00:00  ...   

  ShipRate             ShipDate  SubTotal   TaxAmt  Freight  TotalDue  \
0     2.99  2001-05-26 00:00:00  201.0400  16.0832   5.0260  222.1492   
1     1.49  2001-05-26 00:00:00  272.1015  21.7681   6.8025  300.6721   

               DueDate  ReceivedQty RejectedQty  StockedQty  
0  2001-05-31 00:00:00          3.0         0.0         3.0  
1  2001-05-31 00:00:00          3.0         0.0         3.0  

[2 rows x 22 columns]


In [8]:
# Read the vendor view from MySQL; VendorID is renamed to vendor_key,
# the name used for joins further down.
sql_vendor = "SELECT * FROM adventureworks.dim_vendors_vw;"
df_vendor = get_sql_dataframe(sql_vendor, **mysql_args).rename(
    columns={"VendorID": "vendor_key"}
)
df_vendor.head(2)

Unnamed: 0,vendor_key,AccountNumber,Name,CreditRating,PreferredVendorStatus,ActiveFlag,AddressType,AddressLine1,AddressLine2,City,StateProvinceCode,State_Province,PostalCode
0,1,INTERNAT0001,International,1,b'\x01',b'\x01',Main Office,683 Larch Ct.,,Salt Lake City,UT,Utah,84101
1,2,ELECTRON0002,Electronic Bike Repair & Supplies,1,b'\x01',b'\x01',Main Office,8547 Catherine Way,,Tacoma,WA,Washington,98403


In [9]:
# Read the date dimension from MySQL and reduce full_date to plain
# datetime.date values.
sql_dim_date = "SELECT * FROM adventureworks.dim_date;"
df_dim_date = get_sql_dataframe(sql_dim_date, **mysql_args)
df_dim_date["full_date"] = df_dim_date["full_date"].astype('datetime64[ns]').dt.date
df_dim_date.head(2)

Unnamed: 0,date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,...,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
0,20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
1,20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


In [10]:
# Start the fact table: purchase orders joined to their vendor rows.
# The vendor's Name becomes vendor_name and EmployeeID is dropped.
df_fact_purchaseorders = (
    df_purchaseorders
    .merge(df_vendor, on='vendor_key', how='inner')
    .rename(columns={"Name": "vendor_name"})
    .drop(columns=['EmployeeID'])
)
df_fact_purchaseorders.head(2)

Unnamed: 0,PurchaseOrderID,RevisionNumber,Status,vendor_key,product_key,OrderQty,UnitPrice,LineTotal,OrderDate,ShipMethod,...,CreditRating,PreferredVendorStatus,ActiveFlag,AddressType,AddressLine1,AddressLine2,City,StateProvinceCode,State_Province,PostalCode
0,1,0,4,83,1,4,50.26,201.04,2001-05-17 00:00:00,OVERSEAS - DELUXE,...,1,b'\x01',b'\x01',Main Office,4405 Balboa Court,,Santa Cruz,CA,California,95062
1,2,0,1,32,360,3,45.5805,136.7415,2001-05-17 00:00:00,CARGO TRANSPORT 5,...,1,b'\x01',b'\x01',Main Office,7995 Edwards Ave.,,Lynnwood,WA,Washington,98036


In [11]:
# Join the product rows onto the fact table; the product's Name becomes
# product_name.
df_fact_purchaseorders = (
    df_fact_purchaseorders
    .merge(df_products, on='product_key', how='inner')
    .rename(columns={"Name": "product_name"})
)
df_fact_purchaseorders.head(2)

Unnamed: 0,PurchaseOrderID,RevisionNumber,Status,vendor_key,product_key,OrderQty,UnitPrice,LineTotal,OrderDate,ShipMethod,...,DaysToManufacture,ProductLine,Class,Style,ProductCategory,ProductSubcategory,ProductModel,SellStartDate,SellEndDate,DiscontinuedDate
0,1,0,4,83,1,4,50.26,201.04,2001-05-17 00:00:00,OVERSEAS - DELUXE,...,0,,,,,,,6/1/98 0:00,,
1,2,0,1,32,360,3,45.5805,136.7415,2001-05-17 00:00:00,CARGO TRANSPORT 5,...,0,,,,,,,6/1/98 0:00,,


In [12]:
# Look up the date key for each order: OrderDate is reduced to a plain date
# (order_date) and left-joined against dim_date on that value.
df_dim_order_date = df_dim_date.rename(columns={"date_key" : "order_date_key", "full_date" : "order_date"})
df_fact_purchaseorders = (
    df_fact_purchaseorders
    .assign(order_date=lambda frame: pd.to_datetime(frame['OrderDate']).astype('datetime64[ns]').dt.date)
    .drop(columns=['OrderDate'])
    .merge(df_dim_order_date, on='order_date', how='left')
)

# Remove the columns that are not wanted in the fact table.
df_fact_purchaseorders = df_fact_purchaseorders.drop(columns=[
    'RevisionNumber', 'ShipBase', 'CreditRating', 'PreferredVendorStatus',
    'MakeFlag', 'FinishedGoodsFlag', 'Color', 'SafetyStockLevel',
    'ReorderPoint', 'Size', 'SizeUnitMeasureCode', 'WeightUnitMeasureCode',
    'Weight', 'DaysToManufacture', 'ProductLine', 'Class', 'Style',
])

# Column order for the finished fact table (applied in the next cell).
desired_order = [
    'PurchaseOrderID', 'order_date', 'Status',
    'vendor_key', 'product_key', 'order_date_key',
    'ProductNumber', 'ProductCategory', 'ProductSubcategory', 'ProductModel',
    'OrderQty', 'UnitPrice', 'LineTotal',
    'ShipMethod', 'ShipRate', 'ShipDate',
    'SubTotal', 'TaxAmt', 'Freight', 'TotalDue', 'DueDate',
    'ReceivedQty', 'RejectedQty', 'StockedQty',
    'AccountNumber', 'StandardCost', 'ListPrice',
    'SellStartDate', 'SellEndDate', 'DiscontinuedDate',
]

In [13]:
# Keep only the columns listed in desired_order, in that order, and preview.
df_fact_purchaseorders = df_fact_purchaseorders.loc[:, desired_order]
df_fact_purchaseorders.head(2)

Unnamed: 0,PurchaseOrderID,order_date,Status,vendor_key,product_key,order_date_key,ProductNumber,ProductCategory,ProductSubcategory,ProductModel,...,DueDate,ReceivedQty,RejectedQty,StockedQty,AccountNumber,StandardCost,ListPrice,SellStartDate,SellEndDate,DiscontinuedDate
0,1,2001-05-17,4,83,1,20010517,AR-5381,,,,...,2001-05-31 00:00:00,3.0,0.0,3.0,LITWARE0001,0.0,0.0,6/1/98 0:00,,
1,2,2001-05-17,1,32,360,20010517,HJ-1220,,,,...,2001-05-31 00:00:00,3.0,0.0,3.0,ADVANCED0001,0.0,0.0,6/1/98 0:00,,


In [14]:
# Add a 1-based surrogate key as the first column. The guard keeps this cell
# re-runnable: DataFrame.insert() raises ValueError if the column already exists.
if 'purchaseorder_key' not in df_fact_purchaseorders.columns:
    df_fact_purchaseorders.insert(0, 'purchaseorder_key', range(1, len(df_fact_purchaseorders) + 1))

In [15]:
# Create new destination database (data mart)

# The data mart lives on the same MySQL server as the source database, so the
# connection settings are reused and only the database name differs.
mysql_datamart_args = {**mysql_args, "dbname" : "adventureworks_datamart"}

# Server-level URL (no database selected) with percent-encoded credentials so
# characters such as '@', '/', ':' or '#' cannot break the URL.
conn_str = (
    f"mysql+pymysql://{quote(mysql_datamart_args['uid'], safe='')}:"
    f"{quote(mysql_datamart_args['pwd'], safe='')}@{mysql_datamart_args['hostname']}"
)
sqlEngine = create_engine(conn_str, pool_recycle=3600)

try:
    # The with-block closes the connection even if a statement fails.
    with sqlEngine.connect() as connection:
        # WARNING: destructive -- any existing data mart is dropped and rebuilt.
        connection.execute(text(f"DROP DATABASE IF EXISTS `{mysql_datamart_args['dbname']}`;"))
        connection.execute(text(f"CREATE DATABASE `{mysql_datamart_args['dbname']}`;"))
        connection.execute(text(f"USE `{mysql_datamart_args['dbname']}`;"))
finally:
    sqlEngine.dispose()

In [16]:
# Write each dimension and the fact table to the data mart. With "insert",
# every table is replaced and then given its primary key.
db_operation = "insert"

# Each entry: (destination table, source DataFrame, primary-key column)
tables = [
    ('dim_vendor', df_vendor, 'vendor_key'),
    ('dim_products', df_products, 'product_key'),
    ('fact_purchaseorders', df_fact_purchaseorders, 'purchaseorder_key'),
    ('dim_date', df_dim_date, 'date_key'),
]
for table_name, dataframe, primary_key in tables:
    set_sql_datamart_dataframe(
        df=dataframe,
        table_name=table_name,
        pk_column=primary_key,
        db_operation=db_operation,
        **mysql_datamart_args,
    )


In [17]:
# Preview the in-memory fact table, now including the purchaseorder_key column.
df_fact_purchaseorders.head(2)

Unnamed: 0,purchaseorder_key,PurchaseOrderID,order_date,Status,vendor_key,product_key,order_date_key,ProductNumber,ProductCategory,ProductSubcategory,...,DueDate,ReceivedQty,RejectedQty,StockedQty,AccountNumber,StandardCost,ListPrice,SellStartDate,SellEndDate,DiscontinuedDate
0,1,1,2001-05-17,4,83,1,20010517,AR-5381,,,...,2001-05-31 00:00:00,3.0,0.0,3.0,LITWARE0001,0.0,0.0,6/1/98 0:00,,
1,2,2,2001-05-17,1,32,360,20010517,HJ-1220,,,...,2001-05-31 00:00:00,3.0,0.0,3.0,ADVANCED0001,0.0,0.0,6/1/98 0:00,,


In [18]:
# Analysis queries against the data mart.
# This first query is a join check: it reads the fact table back joined to all
# three dimensions (vendor, product, date) using the key columns.

sql_fact_purchaseorders = """
SELECT 
dt.full_date, p.productnumber, v.accountnumber, po.purchaseorderid, po.order_date
FROM fact_purchaseorders AS po
INNER JOIN dim_vendor AS v ON v.vendor_key = po.vendor_key
INNER JOIN dim_products AS p ON p.product_key = po.product_key
INNER JOIN dim_date as dt ON dt.date_key = po.order_date_key
"""
df_datamart_purchaseorders = get_sql_dataframe(sql_fact_purchaseorders, **mysql_datamart_args)
df_datamart_purchaseorders.head(2)

Unnamed: 0,full_date,productnumber,accountnumber,purchaseorderid,order_date
0,2001-05-17,AR-5381,LITWARE0001,1,2001-05-17
1,2001-05-17,HJ-1220,ADVANCED0001,2,2001-05-17


In [19]:
# Query 1: total amount due per vendor for each product.
# Sums TotalDue over fact rows grouped by vendor and product number, ordered
# by vendor name and product number.
# NOTE(review): in the sample rows shown earlier TotalDue differs from
# LineTotal, so TotalDue looks like an order-level amount; summing it per
# product may attribute a whole order's total to each product on that order.
# Confirm whether LineTotal is the intended measure.
sql_fact_purchaseorders = """
SELECT 
v.name AS 'Vendor Name', 
p.productnumber AS 'Product Number',
SUM(po.totaldue) AS 'Total Due Amount'
FROM fact_purchaseorders AS po
INNER JOIN dim_vendor AS v ON v.vendor_key = po.vendor_key
INNER JOIN dim_products AS p ON p.product_key = po.product_key
INNER JOIN dim_date as dt ON dt.date_key = po.order_date_key
GROUP BY v.vendor_key, p.productnumber
ORDER BY v.name, p.productnumber
"""
# Reuses the names from the previous query cell; the earlier result is overwritten.
df_datamart_purchaseorders = get_sql_dataframe(sql_fact_purchaseorders, **mysql_datamart_args)
df_datamart_purchaseorders.head(10)

Unnamed: 0,Vendor Name,Product Number,Total Due Amount
0,Advanced Bicycles,HJ-1213,1228.6401
1,Advanced Bicycles,HJ-1220,1228.6401
2,Advanced Bicycles,HJ-1420,1546.7758
3,Advanced Bicycles,HJ-1428,1546.7758
4,Advanced Bicycles,HJ-3410,1546.7758
5,Advanced Bicycles,HJ-3416,1546.7758
6,Advanced Bicycles,HJ-3816,1277.0872
7,Advanced Bicycles,HJ-3824,1277.0872
8,Advanced Bicycles,HJ-5161,1277.0872
9,Advanced Bicycles,HJ-5162,1509.6709


In [20]:
# Query 2: total amount due per vendor for each calendar quarter.
# Sums TotalDue over fact rows grouped by vendor and dim_date.calendar_year_qtr
# (matched on the order date key), ordered by vendor name and quarter.

sql_fact_purchaseorders = """
SELECT 
v.name AS 'Vendor Name', 
dt.calendar_year_qtr AS 'Year Quarter',
SUM(po.totaldue) AS 'Total Due Amount'
FROM fact_purchaseorders AS po
INNER JOIN dim_vendor AS v ON v.vendor_key = po.vendor_key
INNER JOIN dim_date as dt ON dt.date_key = po.order_date_key
GROUP BY v.vendor_key, dt.calendar_year_qtr
ORDER BY v.name, dt.calendar_year_qtr
"""
# Reuses the names from the previous query cell; the earlier result is overwritten.
df_datamart_purchaseorders = get_sql_dataframe(sql_fact_purchaseorders, **mysql_datamart_args)
df_datamart_purchaseorders.head(10)

Unnamed: 0,Vendor Name,Year Quarter,Total Due Amount
0,Advanced Bicycles,2001Q2,601.3442
1,Advanced Bicycles,2002Q1,2475.2312
2,Advanced Bicycles,2002Q3,1297.9716
3,Advanced Bicycles,2002Q4,3326.2045
4,Advanced Bicycles,2003Q2,521.3468
5,Advanced Bicycles,2003Q3,10634.388
6,Allenson Cycles,2001Q2,9776.2665
7,Allenson Cycles,2002Q1,9776.2665
8,Allenson Cycles,2002Q3,9776.2665
9,Allenson Cycles,2002Q4,9776.2665
