In [1]:
import datetime
import getpass
import json
import os
from urllib.parse import quote_plus

import certifi
import numpy
import pandas as pd
import pymongo
import sqlalchemy
from sqlalchemy import create_engine, text

In [2]:
# Record the library versions this notebook was executed with (reproducibility).
version_banner = [
    f"Running SQL Alchemy Version: {sqlalchemy.__version__}",
    f"Running PyMongo Version: {pymongo.__version__}",
]
print("\n".join(version_banner))

Running SQL Alchemy Version: 2.0.30
Running PyMongo Version: 4.10.1


In [3]:
# MySQL connection settings. The password is read from the MYSQL_PASSWORD
# environment variable (or prompted for) so it is not stored in the notebook.
host_name = "localhost"
port = "3306"  # NOTE(review): not included in any connection string built in this notebook
user_id = "root"
pwd = os.environ.get("MYSQL_PASSWORD") or getpass.getpass("MySQL password: ")

src_dbname = "sakila"
dst_dbname = "sakila_dw"

In [4]:
# Keyword arguments for the **args helper functions. The MySQL values reuse
# the configuration cell above instead of repeating the credentials.
mysql_args = {
    "uid" : user_id,
    "pwd" : pwd,
    "hostname" : host_name,
    "dbname" : src_dbname
}

# MongoDB settings. Credentials come from the environment (or a prompt) so they
# are not stored in the notebook; any password previously committed in this
# notebook should be treated as exposed and rotated.
mongodb_args = {
    "user_name" : os.environ.get("MONGODB_USER", "yxy5dp"),
    "password" : os.environ.get("MONGODB_PASSWORD") or getpass.getpass("MongoDB password: "),
    "cluster_name" : "cluster0",
    "cluster_subnet" : "a2g8d",
    "cluster_location" : "atlas", # "local"
    "db_name" : "sakila"
}

In [5]:
def get_sql_dataframe(sql_query, **args):
    '''Run a SQL query against MySQL and return the result as a DataFrame.

    Parameters
    ----------
    sql_query : str
        SQL text; wrapped in ``sqlalchemy.text`` before execution.
    **args
        Connection settings; must contain ``uid``, ``pwd``, ``hostname`` and
        ``dbname`` (see ``mysql_args``).

    Returns
    -------
    pandas.DataFrame
        The query result.
    '''
    # Percent-encode the credentials so characters such as '@', ':' or '/'
    # in a password cannot corrupt the connection URL.
    conn_str = (
        f"mysql+pymysql://{quote_plus(str(args['uid']))}:{quote_plus(str(args['pwd']))}"
        f"@{args['hostname']}/{args['dbname']}"
    )
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    try:
        # The context manager closes the connection even if the query fails.
        with sqlEngine.connect() as connection:
            dframe = pd.read_sql(text(sql_query), connection)
    finally:
        # A fresh engine is built per call, so release its connection pool.
        sqlEngine.dispose()

    return dframe
    

def set_dataframe(df, table_name, pk_column, db_operation, **args):
    '''Write a DataFrame to a MySQL table.

    Parameters
    ----------
    df : pandas.DataFrame
        Rows to write; the DataFrame index is not written.
    table_name : str
        Target table in ``args['dbname']``.
    pk_column : str
        Column list for ``ALTER TABLE ... ADD PRIMARY KEY``; used by "insert" only.
    db_operation : str
        "insert" replaces the table and adds the primary key;
        "update" appends the rows to the existing table.
    **args
        Connection settings: ``uid``, ``pwd``, ``hostname``, ``dbname``.

    Raises
    ------
    ValueError
        If ``db_operation`` is neither "insert" nor "update".

    NOTE(review): a later cell redefines ``set_dataframe`` with positional
    credential parameters; once that cell runs, this definition is shadowed.
    '''
    if db_operation not in ("insert", "update"):
        raise ValueError(f"db_operation must be 'insert' or 'update', got {db_operation!r}")

    # Percent-encode credentials so special characters cannot break the URL.
    conn_str = (
        f"mysql+pymysql://{quote_plus(str(args['uid']))}:{quote_plus(str(args['pwd']))}"
        f"@{args['hostname']}/{args['dbname']}"
    )
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    try:
        # begin() commits on success and rolls back on error, so the ALTER TABLE
        # is not left inside an uncommitted, auto-begun transaction at close.
        with sqlEngine.begin() as connection:
            if db_operation == "insert":
                df.to_sql(table_name, con=connection, index=False, if_exists='replace')
                connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
            else:
                df.to_sql(table_name, con=connection, index=False, if_exists='append')
    finally:
        # A fresh engine is built per call, so release its connection pool.
        sqlEngine.dispose()


def get_mongo_client(**args):
    '''Return a ``pymongo.MongoClient`` for MongoDB Atlas or a local server.

    Parameters
    ----------
    **args
        ``cluster_location`` ('atlas' or 'local') is always required. For
        'atlas', ``user_name``, ``password``, ``cluster_name`` and
        ``cluster_subnet`` are also required.

    Returns
    -------
    pymongo.MongoClient

    Raises
    ------
    ValueError
        If ``cluster_location`` is missing or is not 'atlas'/'local'.
        (ValueError subclasses Exception, so ``except Exception`` still catches it.)
    '''
    location = args.get("cluster_location")
    if location not in ('atlas', 'local'):
        raise ValueError("You must specify either 'atlas' or 'local' for the cluster_location parameter.")

    if location == "local":
        return pymongo.MongoClient("mongodb://localhost:27017/")

    # The MongoDB URI format requires the user name and password to be
    # percent-escaped; otherwise '@', ':' or '/' in them breaks parsing.
    connect_str = (
        f"mongodb+srv://{quote_plus(str(args['user_name']))}:{quote_plus(str(args['password']))}"
        f"@{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
    )
    return pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())


def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Run ``find(query)`` on a MongoDB collection and return the documents as a DataFrame.

    Parameters
    ----------
    mongo_client : pymongo.MongoClient
        Client to query. It is closed before this function returns, even on error.
    db_name : str
        Database name.
    collection : str
        Collection name.
    query : dict
        Filter passed to ``find``; ``{}`` selects every document.

    Returns
    -------
    pandas.DataFrame
        One row per document, without MongoDB's ``_id`` column.
    '''
    try:
        documents = list(mongo_client[db_name][collection].find(query))
    finally:
        mongo_client.close()

    # errors='ignore': an empty result has no '_id' column to drop.
    return pd.DataFrame(documents).drop(columns=['_id'], errors='ignore')


def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    '''Drop and reload MongoDB collections from JSON files.

    Parameters
    ----------
    mongo_client : pymongo.MongoClient
        Client to write with. It is closed before this function returns, even on error.
    db_name : str
        Target database name.
    data_directory : str
        Directory containing the JSON files.
    json_files : dict
        Maps collection name -> JSON file name inside ``data_directory``.
        Each file is assumed to hold a list of documents (it is passed to
        ``insert_many``) -- TODO confirm for new files.
    '''
    try:
        db = mongo_client[db_name]
        for collection_name, file_name in json_files.items():
            # Read the file before dropping, so a missing or malformed file
            # leaves the existing collection intact.
            json_file = os.path.join(data_directory, file_name)
            with open(json_file, 'r') as openfile:
                documents = json.load(openfile)

            db.drop_collection(collection_name)
            # PyMongo's insert_many rejects an empty list; an empty file just
            # leaves the collection dropped.
            if documents:
                db[collection_name].insert_many(documents)
    finally:
        mongo_client.close()

In [6]:
# Reload the inventory, rental and payment collections in MongoDB from the JSON
# exports in ./data. set_mongo_collections closes the client when it finishes.
client = get_mongo_client(**mongodb_args)

data_dir = os.path.join(os.getcwd(), 'data')
json_files = {
    "inventory": 'sakila_inventory.json',
    "rental": 'sakila_rental.json',
    "payment": 'sakila_payment.json',
}

set_mongo_collections(client, mongodb_args["db_name"], data_dir, json_files)

In [7]:
# Pull every inventory document (empty filter) into a DataFrame and expose the
# document "id" as inventory_id.
client = get_mongo_client(**mongodb_args)
collection = "inventory"
query = {}

df_inventory = get_mongo_dataframe(
    client, mongodb_args["db_name"], collection, query
).rename(columns={"id": "inventory_id"})

df_inventory.head(2)

Unnamed: 0,inventory_id,film_id,store_id,last_update
0,1,1,1,2006-02-15 05:09:17
1,2,1,1,2006-02-15 05:09:17


In [8]:
# Pull every rental document (empty filter) into a DataFrame and expose the
# document "id" as rental_id.
client = get_mongo_client(**mongodb_args)
collection = "rental"
query = {}

df_rental = get_mongo_dataframe(
    client, mongodb_args["db_name"], collection, query
).rename(columns={"id": "rental_id"})

df_rental.head(2)

Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,return_date,staff_id,last_update
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1,2006-02-15 21:30:53


In [9]:
# Pull every payment document (empty filter) into a DataFrame and expose the
# document "id" as payment_id.
client = get_mongo_client(**mongodb_args)
collection = "payment"
query = {}

df_payment = get_mongo_dataframe(
    client, mongodb_args["db_name"], collection, query
).rename(columns={"id": "payment_id"})

df_payment.head(2)

Unnamed: 0,payment_id,customer_id,staff_id,rental_id,amount,payment_date,last_update
0,1,1,1,76,2.99,2005-05-25 11:30:37,2006-02-15 22:12:30
1,2,1,1,573,0.99,2005-05-28 10:35:23,2006-02-15 22:12:30


In [10]:
def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    '''Run a SQL query against MySQL and return the result as a DataFrame.

    Parameters
    ----------
    user_id, pwd : str
        MySQL credentials; percent-encoded into the connection URL.
    host_name : str
        MySQL host. No port is put in the URL, so the driver's default applies.
    db_name : str
        Default database for the connection. Fully qualified names in
        ``sql_query`` (e.g. ``sakila_dw.dim_film``) can still reach other schemas.
    sql_query : str
        SQL text, passed to ``pandas.read_sql`` unchanged.

    Returns
    -------
    pandas.DataFrame
        The query result.
    '''
    # Percent-encode credentials so '@', ':' or '/' in a password are safe.
    conn_str = (
        f"mysql+pymysql://{quote_plus(str(user_id))}:{quote_plus(str(pwd))}"
        f"@{host_name}/{db_name}"
    )
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    try:
        # The context manager closes the connection even if the query fails.
        with sqlEngine.connect() as connection:
            dframe = pd.read_sql(sql_query, connection)
    finally:
        # A fresh engine is built per call, so release its connection pool.
        sqlEngine.dispose()

    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    '''Write a DataFrame to a MySQL table.

    NOTE(review): this redefines the ``set_dataframe`` declared in an earlier
    cell (which took ``**args`` credentials); only this signature is in effect
    after this cell runs. Consider giving one of them a distinct name.

    Parameters
    ----------
    user_id, pwd, host_name, db_name : str
        MySQL connection settings; credentials are percent-encoded into the URL.
    df : pandas.DataFrame
        Rows to write; the DataFrame index is not written.
    table_name : str
        Target table in ``db_name``.
    pk_column : str
        Column list for ``ALTER TABLE ... ADD PRIMARY KEY``; used by "insert" only.
    db_operation : str
        "insert" replaces the table and adds the primary key;
        "update" appends the rows to the existing table.

    Raises
    ------
    ValueError
        If ``db_operation`` is neither "insert" nor "update".
    '''
    if db_operation not in ("insert", "update"):
        raise ValueError(f"db_operation must be 'insert' or 'update', got {db_operation!r}")

    conn_str = (
        f"mysql+pymysql://{quote_plus(str(user_id))}:{quote_plus(str(pwd))}"
        f"@{host_name}/{db_name}"
    )
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    try:
        # begin() commits on success and rolls back on error, so the ALTER TABLE
        # is not left inside an uncommitted, auto-begun transaction at close.
        with sqlEngine.begin() as connection:
            if db_operation == "insert":
                df.to_sql(table_name, con=connection, index=False, if_exists='replace')
                connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
            else:
                df.to_sql(table_name, con=connection, index=False, if_exists='append')
    finally:
        # A fresh engine is built per call, so release its connection pool.
        sqlEngine.dispose()

In [11]:
# Customer dimension source: CSV export in ./data.
data_dir = os.path.join(os.getcwd(), 'data')
data_file = os.path.join(data_dir, 'sakila_customer.csv')

df_customer = pd.read_csv(filepath_or_buffer=data_file)
df_customer.head()

Unnamed: 0,customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20
2,3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,1,2006-02-14 22:04:36,2006-02-15 04:57:20
3,4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,1,2006-02-14 22:04:36,2006-02-15 04:57:20
4,5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,1,2006-02-14 22:04:36,2006-02-15 04:57:20


In [12]:
# Store dimension source: CSV export in ./data.
data_dir = os.path.join(os.getcwd(), 'data')
data_file = os.path.join(data_dir, 'sakila_store.csv')

df_store = pd.read_csv(filepath_or_buffer=data_file)
df_store.head()

Unnamed: 0,store_id,manager_staff_id,address_id,last_update
0,1,1,1,2006-02-15 04:57:12
1,2,2,2,2006-02-15 04:57:12


In [13]:
# Film dimension source: CSV export in ./data.
data_dir = os.path.join(os.getcwd(), 'data')
data_file = os.path.join(data_dir, 'sakila_film.csv')

df_film = pd.read_csv(filepath_or_buffer=data_file)
df_film.head()

Unnamed: 0,film_id,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42
1,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15 05:03:42
2,3,ADAPTATION HOLES,A Astounding Reflection of a Lumberjack And a ...,2006,1,,7,2.99,50,18.99,NC-17,"Trailers,Deleted Scenes",2006-02-15 05:03:42
3,4,AFFAIR PREJUDICE,A Fanciful Documentary of a Frisbee And a Lumb...,2006,1,,5,2.99,117,26.99,G,"Commentaries,Behind the Scenes",2006-02-15 05:03:42
4,5,AFRICAN EGG,A Fast-Paced Documentary of a Pastry Chef And ...,2006,1,,6,2.99,130,22.99,G,Deleted Scenes,2006-02-15 05:03:42


In [14]:
# Add a 1-based surrogate key as the first column. Any key left by a previous
# run of this cell is dropped first, so re-running does not raise
# "cannot insert customer_key, already exists".
# (The "id" rename is a no-op for the CSV shown, which already has customer_id.)
df_customer = (
    df_customer
    .rename(columns={"id": "customer_id"})
    .drop(columns="customer_key", errors="ignore")
)
df_customer.insert(0, "customer_key", range(1, len(df_customer) + 1))
df_customer.head(2)

Unnamed: 0,customer_key,customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
0,1,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
1,2,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20


In [15]:
# Add a 1-based surrogate key as the first column. Any key left by a previous
# run of this cell is dropped first, so re-running does not raise
# "cannot insert store_key, already exists".
# (The "id" rename is a no-op for the CSV shown, which already has store_id.)
df_store = (
    df_store
    .rename(columns={"id": "store_id"})
    .drop(columns="store_key", errors="ignore")
)
df_store.insert(0, "store_key", range(1, len(df_store) + 1))
df_store.head(2)

Unnamed: 0,store_key,store_id,manager_staff_id,address_id,last_update
0,1,1,1,1,2006-02-15 04:57:12
1,2,2,2,2,2006-02-15 04:57:12


In [16]:
# Add a 1-based surrogate key as the first column. Any key left by a previous
# run of this cell is dropped first, so re-running does not raise
# "cannot insert film_key, already exists".
# (The "id" rename is a no-op for the CSV shown, which already has film_id.)
df_film = (
    df_film
    .rename(columns={"id": "film_id"})
    .drop(columns="film_key", errors="ignore")
)
df_film.insert(0, "film_key", range(1, len(df_film) + 1))
df_film.head(2)

Unnamed: 0,film_key,film_id,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
0,1,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42
1,2,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15 05:03:42


In [17]:
# Dimension tables to write: (table name, source DataFrame, primary-key column).
tables = [
    ('dim_customer', df_customer, 'customer_key'),
    ('dim_store', df_store, 'store_key'),
    ('dim_film', df_film, 'film_key'),
]
# "insert" makes set_dataframe replace each table and then add its primary key.
db_operation = "insert"

In [18]:
# Write each dimension table to the warehouse database.
for table_name, dataframe, primary_key in tables:
    set_dataframe(
        user_id, pwd, host_name, dst_dbname,
        dataframe, table_name, primary_key, db_operation,
    )

In [19]:
# Attach film/store to every rental through its inventory item. The right join
# keeps rentals whose inventory_id is absent from df_inventory (their film_id and
# store_id become NaN); duplicate and unneeded columns are dropped.
df_fact_orders = (
    df_inventory
    .merge(df_rental, on='inventory_id', how='right')
    .drop(columns=['last_update_x', 'last_update_y', 'customer_id', 'staff_id'])
)
df_fact_orders.head(2)

Unnamed: 0,inventory_id,film_id,store_id,rental_id,rental_date,return_date
0,367,80.0,1.0,1,2005-05-24 22:53:30,2005-05-26 22:04:30
1,1525,,,2,2005-05-24 22:54:33,2005-05-28 19:40:33


In [20]:
# One row per payment (right join): rentals without a payment are dropped.
# NOTE: reads df_fact_orders from the previous cell, so re-run that cell before
# re-running this one.
df_fact_orders = df_fact_orders.merge(df_payment, on='rental_id', how='right')
df_fact_orders.head(2)

Unnamed: 0,inventory_id,film_id,store_id,rental_id,rental_date,return_date,payment_id,customer_id,staff_id,amount,payment_date,last_update
0,3021.0,,,76,2005-05-25 11:30:37,2005-06-03 12:00:37,1,1,1,2.99,2005-05-25 11:30:37,2006-02-15 22:12:30
1,4020.0,,,573,2005-05-28 10:35:23,2005-06-03 06:32:23,2,1,1,0.99,2005-05-28 10:35:23,2006-02-15 22:12:30


In [21]:
# customer_id -> customer_key lookup read back from the warehouse
# (fully qualified table, so connecting to src_dbname still works).
sql_dim_customer = "SELECT customer_key, customer_id FROM sakila_dw.dim_customer;"
df_dim_customer = get_dataframe(
    user_id=user_id, pwd=pwd, host_name=host_name,
    db_name=src_dbname, sql_query=sql_dim_customer,
)
df_dim_customer.head(2)

Unnamed: 0,customer_key,customer_id
0,1,1
1,2,2


In [22]:
# store_id -> store_key lookup read back from the warehouse.
sql_dim_store = "SELECT store_key, store_id FROM sakila_dw.dim_store;"
df_dim_store = get_dataframe(
    user_id=user_id, pwd=pwd, host_name=host_name,
    db_name=src_dbname, sql_query=sql_dim_store,
)
df_dim_store.head(2)

Unnamed: 0,store_key,store_id
0,1,1
1,2,2


In [23]:
# film_id -> film_key lookup read back from the warehouse.
sql_dim_film = "SELECT film_key, film_id FROM sakila_dw.dim_film;"
df_dim_film = get_dataframe(
    user_id=user_id, pwd=pwd, host_name=host_name,
    db_name=src_dbname, sql_query=sql_dim_film,
)
df_dim_film.head(2)

Unnamed: 0,film_key,film_id
0,1,1
1,2,2


In [24]:
# Replace the natural customer_id with the warehouse customer_key; the inner join
# drops rows whose customer_id has no dim_customer entry.
df_fact_orders = (
    df_fact_orders
    .merge(df_dim_customer, on='customer_id', how='inner')
    .drop(columns=['customer_id'])
)
df_fact_orders.head(2)

Unnamed: 0,inventory_id,film_id,store_id,rental_id,rental_date,return_date,payment_id,staff_id,amount,payment_date,last_update,customer_key
0,3021.0,,,76,2005-05-25 11:30:37,2005-06-03 12:00:37,1,1,2.99,2005-05-25 11:30:37,2006-02-15 22:12:30,1
1,4020.0,,,573,2005-05-28 10:35:23,2005-06-03 06:32:23,2,1,0.99,2005-05-28 10:35:23,2006-02-15 22:12:30,1


In [25]:
# Replace store_id with the warehouse store_key.
# NOTE(review): the inner join drops every row whose store_id is NaN (rentals
# that matched no inventory document in the right join earlier), removing those
# payments from the fact table -- confirm that loss is intended.
df_fact_orders = (
    df_fact_orders
    .merge(df_dim_store, on='store_id', how='inner')
    .drop(columns=['store_id'])
)
df_fact_orders.head(2)

Unnamed: 0,inventory_id,film_id,rental_id,rental_date,return_date,payment_id,staff_id,amount,payment_date,last_update,customer_key,store_key
0,375.0,83.0,577,2005-05-28 11:09:14,2005-06-01 13:27:14,147,1,2.99,2005-05-28 11:09:14,2006-02-15 22:12:30,6,1
1,886.0,196.0,877,2005-05-30 05:48:59,2005-06-02 09:30:59,232,2,0.99,2005-05-30 05:48:59,2006-02-15 22:12:31,9,1


In [26]:
# Replace film_id with the warehouse film_key; the inner join drops rows whose
# film_id has no dim_film entry.
df_fact_orders = (
    df_fact_orders
    .merge(df_dim_film, on='film_id', how='inner')
    .drop(columns=['film_id'])
)
df_fact_orders.head(2)

Unnamed: 0,inventory_id,rental_id,rental_date,return_date,payment_id,staff_id,amount,payment_date,last_update,customer_key,store_key,film_key
0,375.0,577,2005-05-28 11:09:14,2005-06-01 13:27:14,147,1,2.99,2005-05-28 11:09:14,2006-02-15 22:12:30,6,1,83
1,886.0,877,2005-05-30 05:48:59,2005-06-02 09:30:59,232,2,0.99,2005-05-30 05:48:59,2006-02-15 22:12:31,9,1,196


In [27]:
# Date dimension (from the source schema), used to turn timestamps into date keys.
sql_dim_date = "SELECT date_key, full_date FROM sakila.dim_date;"
df_dim_date = get_dataframe(
    user_id=user_id, pwd=pwd, host_name=host_name,
    db_name=src_dbname, sql_query=sql_dim_date,
)
# Normalise to plain calendar dates so they match the .dt.date values built
# from the fact timestamps.
df_dim_date['full_date'] = df_dim_date['full_date'].astype('datetime64[ns]').dt.date
df_dim_date.head(2)

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02


In [28]:
# Replace the rental timestamp with its dim_date key. The timestamp is cut to a
# calendar date so it matches full_date; the left join keeps unmatched rows
# (their rental_date_key becomes NaN).
df_dim_rental_date = df_dim_date.rename(
    columns={"date_key": "rental_date_key", "full_date": "rental_date"}
)
df_fact_orders['rental_date'] = df_fact_orders['rental_date'].astype('datetime64[ns]').dt.date

df_fact_orders = (
    df_fact_orders
    .merge(df_dim_rental_date, on='rental_date', how='left')
    .drop(columns=['rental_date'])
)
df_fact_orders.head(2)

Unnamed: 0,inventory_id,rental_id,return_date,payment_id,staff_id,amount,payment_date,last_update,customer_key,store_key,film_key,rental_date_key
0,375.0,577,2005-06-01 13:27:14,147,1,2.99,2005-05-28 11:09:14,2006-02-15 22:12:30,6,1,83,20050528
1,886.0,877,2005-06-02 09:30:59,232,2,0.99,2005-05-30 05:48:59,2006-02-15 22:12:31,9,1,196,20050530


In [29]:
# Replace the return timestamp with its dim_date key. The timestamp is cut to a
# calendar date so it matches full_date; the left join keeps unmatched rows
# (their return_date_key becomes NaN).
df_dim_return_date = df_dim_date.rename(
    columns={"date_key": "return_date_key", "full_date": "return_date"}
)
df_fact_orders['return_date'] = df_fact_orders['return_date'].astype('datetime64[ns]').dt.date

df_fact_orders = (
    df_fact_orders
    .merge(df_dim_return_date, on='return_date', how='left')
    .drop(columns=['return_date'])
)
df_fact_orders.head(2)

Unnamed: 0,inventory_id,rental_id,payment_id,staff_id,amount,payment_date,last_update,customer_key,store_key,film_key,rental_date_key,return_date_key
0,375.0,577,147,1,2.99,2005-05-28 11:09:14,2006-02-15 22:12:30,6,1,83,20050528,20050601
1,886.0,877,232,2,0.99,2005-05-30 05:48:59,2006-02-15 22:12:31,9,1,196,20050530,20050602


In [30]:
# Replace the payment timestamp with its dim_date key. The timestamp is cut to a
# calendar date so it matches full_date; the left join keeps unmatched rows
# (their payment_date_key becomes NaN).
df_dim_payment_date = df_dim_date.rename(
    columns={"date_key": "payment_date_key", "full_date": "payment_date"}
)
df_fact_orders['payment_date'] = df_fact_orders['payment_date'].astype('datetime64[ns]').dt.date

df_fact_orders = (
    df_fact_orders
    .merge(df_dim_payment_date, on='payment_date', how='left')
    .drop(columns=['payment_date'])
)
df_fact_orders.head(2)

Unnamed: 0,inventory_id,rental_id,payment_id,staff_id,amount,last_update,customer_key,store_key,film_key,rental_date_key,return_date_key,payment_date_key
0,375.0,577,147,1,2.99,2006-02-15 22:12:30,6,1,83,20050528,20050601,20050528
1,886.0,877,232,2,0.99,2006-02-15 22:12:31,9,1,196,20050530,20050602,20050530


In [31]:
# Remove columns not carried into the fact table, then number the remaining
# rows with a 1-based surrogate key in the first position.
drop_columns = ['staff_id', 'inventory_id']
df_fact_orders = df_fact_orders.drop(columns=drop_columns)
df_fact_orders.insert(0, "fact_order_key", range(1, len(df_fact_orders) + 1))
df_fact_orders.head(2)

Unnamed: 0,fact_order_key,rental_id,payment_id,amount,last_update,customer_key,store_key,film_key,rental_date_key,return_date_key,payment_date_key
0,1,577,147,2.99,2006-02-15 22:12:30,6,1,83,20050528,20050601,20050528
1,2,877,232,0.99,2006-02-15 22:12:31,9,1,196,20050530,20050602,20050530


In [32]:
# Persist the fact table: "insert" replaces any existing copy and adds the primary key.
table_name, primary_key, db_operation = "fact_orders", "fact_order_key", "insert"

set_dataframe(
    user_id, pwd, host_name, dst_dbname,
    df_fact_orders, table_name, primary_key, db_operation,
)

In [33]:
# Sanity check on the star schema: total payment amount per customer last name.
# NOTE(review): grouping by last_name alone merges different customers who share
# a surname; group by customer_key too if per-customer totals are intended.
sql_test = """
    SELECT customer.`last_name` AS `Customer Name`,
        SUM(orders.`amount`) AS `Total Amount`
    FROM `{0}`.`fact_orders` AS orders
    INNER JOIN `{0}`.`dim_customer` AS customer
    ON orders.customer_key = customer.customer_key
    GROUP BY customer.`last_name` 
    ORDER BY `Total Amount` DESC;
""".format(dst_dbname)

df_test = get_dataframe(
    user_id=user_id, pwd=pwd, host_name=host_name,
    db_name=src_dbname, sql_query=sql_test,
)
df_test.head()

Unnamed: 0,Customer Name,Total Amount
0,CLARK,7.98
1,ANDERSON,6.99
2,MARTINEZ,5.98
3,GARCIA,4.99
4,ROBINSON,4.98


In [69]:
# Sanity check on the star schema: per-title total of dim_film.rental_duration.
# NOTE(review): this adds the film's rental_duration column once per fact row;
# it is not the elapsed time between rental and return -- confirm the intended metric.
sql_test2 = """
    SELECT film.`TITLE` AS `Title`,
        SUM(film.`rental_duration`) AS `Total Rental Time`
    FROM `{0}`.`fact_orders` AS orders
    INNER JOIN `{0}`.`dim_film` AS film
    ON orders.film_key = film.film_key
    GROUP BY film.`Title` 
    ORDER BY `Total Rental Time` DESC;
""".format(dst_dbname)

df_test2 = get_dataframe(
    user_id=user_id, pwd=pwd, host_name=host_name,
    db_name=src_dbname, sql_query=sql_test2,
)
df_test2.head()

Unnamed: 0,Title,Total Rental Time
0,BLUES INSTINCT,10.0
1,BREAKING HOME,8.0
2,CRUELTY UNFORGIVEN,7.0
3,CONTACT ANONYMOUS,7.0
4,BOOGIE AMELIE,6.0
