In [None]:
# `os` is imported here rather than in the notebook's import cell because this
# configuration cell is executed first.
import os

# Connection settings for the MySQL data mart. Credentials and host can be
# supplied through environment variables so that real secrets never have to be
# saved in the notebook; the fallbacks are the placeholder values.
mysql_args = {
    "uid": os.environ.get("ADVENTUREWORKS_MYSQL_UID", "root"),
    "pwd": os.environ.get("ADVENTUREWORKS_MYSQL_PWD", "xxxx"),
    "hostname": os.environ.get("ADVENTUREWORKS_MYSQL_HOST", "localhost"),
    "dbname": "adventureworks_dm",
}

# Connection settings for MongoDB. "cluster_location" must be either 'atlas'
# or 'local'; the remaining cluster/user keys are only used to build the
# Atlas connection string.
mongodb_args = {
    "user_name": os.environ.get("ADVENTUREWORKS_MONGO_USER", "bgv2"),
    "password": os.environ.get("ADVENTUREWORKS_MONGO_PWD", "xxxx"),
    "cluster_name": "Cluster0",
    "cluster_subnet": os.environ.get("ADVENTUREWORKS_MONGO_SUBNET", "xxxx"),
    "cluster_location": "atlas",
    "db_name": "adventureworks",
}

Connecting to SQL Database

In [14]:
from urllib.parse import quote, quote_plus

import certifi
import pandas as pd
import pymongo
import sqlalchemy
from sqlalchemy import create_engine, text


# Build the SQLAlchemy URL for the MySQL data mart. The user name and password
# are percent-encoded so that characters such as '@', ':' or '/' inside a
# credential cannot be mistaken for URL delimiters.
conn_str = (
    f"mysql+pymysql://{quote(str(mysql_args['uid']), safe='')}:"
    f"{quote(str(mysql_args['pwd']), safe='')}"
    f"@{mysql_args['hostname']}/{mysql_args['dbname']}"
)

# Notebook-wide engine and connection, reused by the loading cells below.
sqlEngine = create_engine(conn_str, pool_recycle=3600)
connection = sqlEngine.connect()

Loading Currency Rates from CSV

In [None]:
# Read the currency-rate extract, discard its ModifiedDate column, and
# (re)create the `currencyrate` table from what is left.
curr_rate = pd.read_csv("datafiles/currencyrate.csv").drop(columns=['ModifiedDate'])
curr_rate.to_sql(name="currencyrate", con=connection, index=False, if_exists='replace')

Unnamed: 0,CurrencyRateID,CurrencyRateDate,FromCurrencyCode,ToCurrencyCode,AverageRate,EndOfDayRate,ModifiedDate
0,1,2001-07-01 00:00:00,USD,ARS,1.0,1.0002,2001-07-01 00:00:00
1,2,2001-07-01 00:00:00,USD,AUD,1.5491,1.55,2001-07-01 00:00:00
2,3,2001-07-01 00:00:00,USD,BRL,1.9379,1.9419,2001-07-01 00:00:00
3,4,2001-07-01 00:00:00,USD,CAD,1.4641,1.4683,2001-07-01 00:00:00
4,5,2001-07-01 00:00:00,USD,CNY,8.2781,8.2784,2001-07-01 00:00:00


In [8]:
# Each statement copies one `adventureworks` view into a table of the current
# database; `if not exists` leaves an already-created table untouched.
sql_create_dim_customers = """
create table if not exists dim_customers as
select * from adventureworks.dim_customers_vw;
"""

sql_create_dim_products = """
create table if not exists dim_products as
select * from adventureworks.dim_products_vw;
"""

sql_create_dim_vendors = """
create table if not exists dim_vendors as
select * from adventureworks.dim_vendors_vw;
"""

# Run the statements in order (customers, products, vendors) on the shared
# connection and keep every result object.
dim_table_results = [
    connection.execute(text(ddl_statement))
    for ddl_statement in (
        sql_create_dim_customers,
        sql_create_dim_products,
        sql_create_dim_vendors,
    )
]

# Display the result of the final statement as the cell output.
dim_table_results[-1]

<sqlalchemy.engine.cursor.CursorResult at 0x11abfd8d0>

In [10]:
def get_mongo_client(**args):
    """Create a pymongo.MongoClient for an Atlas cluster or a local server.

    Keyword arguments:
        cluster_location: 'atlas' or 'local' (required).
        user_name, password, cluster_name, cluster_subnet: required when
            cluster_location is 'atlas'; used to build the mongodb+srv URI.

    Returns:
        A pymongo.MongoClient. For 'local' it targets
        mongodb://localhost:27017/; for 'atlas' it targets
        <cluster_name>.<cluster_subnet>.mongodb.net using certifi's CA bundle.

    Raises:
        ValueError: if cluster_location is missing or is neither 'atlas' nor
            'local'.
        KeyError: if an Atlas setting is missing.
    """
    cluster_location = args.get("cluster_location")
    if cluster_location not in ('atlas', 'local'):
        raise ValueError("You must specify either 'atlas' or 'local' for the cluster_location parameter.")

    if cluster_location == "local":
        return pymongo.MongoClient("mongodb://localhost:27017/")

    # Percent-encode the credentials: reserved characters such as '@', ':' or
    # '/' in a user name or password would otherwise corrupt the URI.
    user_name = quote_plus(str(args['user_name']))
    password = quote_plus(str(args['password']))

    connect_str = f"mongodb+srv://{user_name}:{password}@"
    connect_str += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
    return pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())


def get_mongo_dataframe(mongo_client, db_name, collection, query):
    """Run a find() query against a MongoDB collection and return a DataFrame.

    Args:
        mongo_client: client object indexable by database name; it is closed
            by this function, so it cannot be reused afterwards.
        db_name: name of the database to read from.
        collection: name of the collection to query.
        query: filter passed to the collection's find() method.

    Returns:
        A DataFrame with one row per matching document and the '_id' column
        removed. When nothing matches, an empty DataFrame is returned.
    """
    try:
        # Materialize the cursor before closing the client.
        documents = list(mongo_client[db_name][collection].find(query))
    finally:
        # Close the client even if the query fails.
        mongo_client.close()

    dframe = pd.DataFrame(documents)
    # errors='ignore': an empty result set produces a frame without '_id'.
    return dframe.drop(columns=['_id'], errors='ignore')

def get_sql_dataframe(sql_query, **args):
    """Run a SQL query against MySQL and return the result as a DataFrame.

    Args:
        sql_query: SQL text to execute.
        **args: connection settings; 'uid', 'pwd', 'hostname' and 'dbname'
            are required.

    Returns:
        The DataFrame produced by pd.read_sql() for the query.

    Raises:
        KeyError: if a required connection setting is missing.
    """
    # Percent-encode the credentials so reserved URL characters in a user name
    # or password cannot break the connection string.
    conn_str = (
        f"mysql+pymysql://{quote(str(args['uid']), safe='')}:"
        f"{quote(str(args['pwd']), safe='')}"
        f"@{args['hostname']}/{args['dbname']}"
    )
    sqlEngine = create_engine(conn_str, pool_recycle=3600)

    try:
        # The context manager closes the connection even when the query raises.
        with sqlEngine.connect() as connection:
            return pd.read_sql(text(sql_query), connection)
    finally:
        # This engine is private to the call; release its pooled connections.
        sqlEngine.dispose()
    

def set_dataframe(df, table_name, pk_column, db_operation, **args):
    """Write a DataFrame to a MySQL table.

    Args:
        df: DataFrame to persist.
        table_name: target table name.
        pk_column: column list for the primary key; only used by "insert".
        db_operation: "insert" replaces the table with `df` and then adds a
            primary key on `pk_column`; "update" appends `df` to the table.
        **args: connection settings; 'uid', 'pwd', 'hostname' and 'dbname'
            are required.

    Raises:
        ValueError: if db_operation is neither "insert" nor "update".
        KeyError: if a required connection setting is missing.
    """
    # Reject unknown operations up front instead of silently doing nothing.
    if db_operation not in ("insert", "update"):
        raise ValueError(
            f"Unsupported db_operation {db_operation!r}; expected 'insert' or 'update'."
        )

    # Percent-encode the credentials so reserved URL characters in a user name
    # or password cannot break the connection string.
    conn_str = (
        f"mysql+pymysql://{quote(str(args['uid']), safe='')}:"
        f"{quote(str(args['pwd']), safe='')}"
        f"@{args['hostname']}/{args['dbname']}"
    )
    sqlEngine = create_engine(conn_str, pool_recycle=3600)

    try:
        # The context manager closes the connection even when a write raises.
        with sqlEngine.connect() as connection:
            if db_operation == "insert":
                df.to_sql(table_name, con=connection, index=False, if_exists='replace')
                # NOTE: table_name and pk_column are interpolated directly into
                # the DDL, so they must be trusted identifiers.
                connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
            else:
                df.to_sql(table_name, con=connection, index=False, if_exists='append')
    finally:
        # This engine is private to the call; release its pooled connections.
        sqlEngine.dispose()

Loading Data from MongoDB

In [18]:
# Open a MongoDB client from the notebook settings and pull every document of
# the `productreview` collection (empty filter) into a DataFrame.
client = get_mongo_client(**mongodb_args)
productreview = get_mongo_dataframe(
    mongo_client=client,
    db_name=mongodb_args['db_name'],
    collection='productreview',
    query={},
)

# (Re)create the `productreview` table in MySQL from that DataFrame.
productreview.to_sql(name="productreview", con=connection, index=False, if_exists='replace')

4

Creating Fact Table

In [None]:

# Sales order headers joined to their detail lines. The left outer join keeps
# headers that have no detail row. Every column is selected exactly once so
# the resulting DataFrame has unique column names.
query1 = """
select
    soh.SalesOrderID,
    soh.RevisionNumber,
    soh.OrderDate,
    soh.DueDate,
    soh.ShipDate,
    soh.Status,
    soh.OnlineOrderFlag,
    soh.SalesOrderNumber,
    soh.PurchaseOrderNumber,
    soh.AccountNumber,
    soh.CustomerID,
    soh.ContactID,
    soh.SalesPersonID,
    soh.BillToAddressID,
    soh.ShipToAddressID,
    soh.CreditCardApprovalCode,
    soh.SubTotal,
    soh.TaxAmt,
    soh.Freight,
    soh.TotalDue,
    sod.CarrierTrackingNumber,
    sod.OrderQty,
    sod.ProductID,
    sod.UnitPrice,
    sod.LineTotal
from adventureworks.salesorderheader AS soh
left outer join adventureworks.salesorderdetail AS sod
    on soh.SalesOrderID = sod.SalesOrderID
"""
sales = get_sql_dataframe(query1, **mysql_args)

# Derive integer YYYYMMDD keys from the three date columns. A missing date
# formats to NaN, so the nullable Int64 dtype is used: it stores such rows as
# <NA> instead of failing the integer conversion.
for date_column in ("OrderDate", "DueDate", "ShipDate"):
    sales[f"{date_column}Key"] = pd.to_numeric(
        sales[date_column].dt.strftime('%Y%m%d')
    ).astype("Int64")

# Columns that make up the fact table.
fact_table_sales = sales[[
    "TotalDue",
    "OrderDateKey",
    "DueDateKey",
    "ShipDateKey",
    "CustomerID",
    "ProductID",
    "SalesOrderID",
    "LineTotal"
]]

# (Re)create the `fact_sales` table from the selected columns.
fact_table_sales.to_sql("fact_sales", con=connection, if_exists='replace', index=False)