This project will utilize the MySQL database sakila, the MongoDB database mflix, and a randomly generated .csv file containing customer information to create a relational pandas dataframe, including a customer dimension table and a product (movie) dimension table, as well as a fact_orders table synthesizing information from both dimensional tables, which will be useful for post-hoc analysis of the most popular movies available for rent. All further information, as well as the source code for the sample databases I've used, can be found as part of the GitHub repository I have set up for my midterm project: ...

Note: since the sakila database uses fake movie names, I chose to join the movie dimension table (pulled from MongoDB) and the dimension tables pulled from sakila on the movie_key key from the MongoDB collection, so I'm not using movie information that lines us with the rest of the sakila database. As such, the movies in the datawarehouse won't line up with movies from sakila. Not a big deal because I'm not using the movies table from sakila for anything in this project, just wanted to note that in a real business environment the data would line up more cleanly, I just did the best I could with sample data.

Import necessary packages

In [1]:
import os
import numpy
import pandas as pd
from sqlalchemy import create_engine
import json
import datetime
import pymongo

Declare + assign variables for interacting with MySQL and MongoDB collection, check to make sure the URL is usable

In [2]:
host_name = "cwp5xyj-mysql.mysql.database.azure.com"
port = "3306"
mysql_uid = "cwp5xyj"
mysql_pwd = "Azuresql1"

src_dbname = "sakila"
dst_dbname = "sakila_dw"

src_dbname = "sakila"
dst_dbname = "sakila_datawarehouse"

atlas_cluster_name = "cluster0.ynxna8b"
atlas_user_name = "cwp5xyj"
atlas_password = "nMW0O0gwwFHpBjZT"

conn_str = {"local" : f"mongodb://localhost:27017/",
    "atlas" : f"mongodb+srv://{atlas_user_name}:{atlas_password}@{atlas_cluster_name}.mongodb.net"
}

print(f"Local Connection String: {conn_str['local']}")
print(f"Atlas Connection String: {conn_str['atlas']}")

Local Connection String: mongodb://localhost:27017/
Atlas Connection String: mongodb+srv://cwp5xyj:nMW0O0gwwFHpBjZT@cluster0.ynxna8b.mongodb.net


Create database sakila_datawarehouse and run code to install functions

In [8]:
conn_str_sql = f"mysql+pymysql://{mysql_uid}:{mysql_pwd}@{host_name}"
sqlEngine = create_engine(conn_str_sql, pool_recycle=3600)

sqlEngine.execute(f"DROP DATABASE IF EXISTS `{dst_dbname}`;")
sqlEngine.execute(f"CREATE DATABASE `{dst_dbname}`;")
sqlEngine.execute(f"USE {dst_dbname};")

def get_sql_dataframe(user_id, pwd, db_name, sql_query):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@cwp5xyj-mysql.mysql.database.azure.com/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    conn = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, conn);
    conn.close()
    
    return dframe


def get_mongo_dataframe(connect_str, db_name, collection, query):
    '''Create a connection to MongoDB'''
    client = pymongo.MongoClient(connect_str)
    
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    client.close()
    return dframe


def set_dataframe(user_id, pwd, db_name, df, table_name, pk_column, db_operation):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@cwp5xyj-mysql.mysql.database.azure.com/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

Import .csv file and read into pandas table, check for successful integration

In [5]:
filepath = r"C:\Users\charl\Downloads\customers-1000.csv"
customers_table = pd.read_csv(filepath, skiprows=1, names=['customer_key', 'customer_id',
                                                          'first_name', 'last_name', 'company',
                                                          'city', 'country', 'phone1', 'phone2',
                                                          'email', 'subscription_date', 'website'])
customers_table.head(5)

Unnamed: 0,customer_key,customer_id,first_name,last_name,company,city,country,phone1,phone2,email,subscription_date,website
0,1,dE014d010c7ab0c,Andrew,Goodman,Stewart-Flynn,Rowlandberg,Macao,846-790-4623x4715,(422)787-2331x71127,marieyates@gomez-spencer.info,2021-07-26,http://www.shea.biz/
1,2,2B54172c8b65eC3,Alvin,Lane,"Terry, Proctor and Lawrence",Bethside,Papua New Guinea,124-597-8652x05682,321.441.0588x6218,alexandra86@mccoy.com,2021-06-24,http://www.pena-cole.com/
2,3,d794Dd48988d2ac,Jenna,Harding,Bailey Group,Moniquemouth,China,(335)987-3085x3780,001-680-204-8312,justincurtis@pierce.org,2020-04-05,http://www.booth-reese.biz/
3,4,3b3Aa4aCc68f3Be,Fernando,Ford,Moss-Maxwell,Leeborough,Macao,(047)752-3122,048.779.5035x9122,adeleon@hubbard.org,2020-11-29,http://www.hebert.com/
4,5,D60df62ad2ae41E,Kara,Woods,Mccarthy-Kelley,Port Jacksonland,Nepal,+1-360-693-4419x19272,163-627-2565,jesus90@roberson.info,2022-04-22,http://merritt.com/


Perform necessary transformations

In [6]:
drop_cols = ['customer_id','subscription_date', 'website']
customers_table.drop(drop_cols, axis=1, inplace=True)
customers_table.head(5)

Unnamed: 0,customer_key,first_name,last_name,company,city,country,phone1,phone2,email
0,1,Andrew,Goodman,Stewart-Flynn,Rowlandberg,Macao,846-790-4623x4715,(422)787-2331x71127,marieyates@gomez-spencer.info
1,2,Alvin,Lane,"Terry, Proctor and Lawrence",Bethside,Papua New Guinea,124-597-8652x05682,321.441.0588x6218,alexandra86@mccoy.com
2,3,Jenna,Harding,Bailey Group,Moniquemouth,China,(335)987-3085x3780,001-680-204-8312,justincurtis@pierce.org
3,4,Fernando,Ford,Moss-Maxwell,Leeborough,Macao,(047)752-3122,048.779.5035x9122,adeleon@hubbard.org
4,5,Kara,Woods,Mccarthy-Kelley,Port Jacksonland,Nepal,+1-360-693-4419x19272,163-627-2565,jesus90@roberson.info


Populate dim_customers from customers_table dataframe

In [9]:
dataframe = customers_table
table_name = 'dim_customers'
primary_key = 'customer_key'
db_operation = "insert"

set_dataframe(mysql_uid, mysql_pwd, dst_dbname, dataframe, table_name, primary_key, db_operation)

Create and populate dim_movies table from the sample_mflix.movies table in MongoDB

In [10]:
query = {}
collection = "movies"
src_dbname_mongo = "sample_mflix"

df_movie_info = get_mongo_dataframe(conn_str['atlas'], src_dbname_mongo, collection, query)
df_movie_info.head(2)

Unnamed: 0,_id,plot,genres,runtime,rated,cast,num_mflix_comments,poster,title,fullplot,...,directors,writers,awards,lastupdated,year,imdb,countries,type,tomatoes,metacritic
0,573a1390f29313caabcd587d,"At 10 years old, Owens becomes a ragged orphan...","[Biography, Crime, Drama]",72.0,PASSED,"[John McCann, James A. Marcus, Maggie Weston, ...",1,https://m.media-amazon.com/images/M/MV5BNDkxZG...,Regeneration,"At 10 years old, Owens becomes a ragged orphan...",...,[Raoul Walsh],"[Owen Frawley Kildare (book), Raoul Walsh (ada...","{'wins': 1, 'nominations': 0, 'text': '1 win.'}",2015-08-14 01:28:18.957000000,1915,"{'rating': 6.8, 'votes': 626, 'id': 5960}",[USA],movie,"{'viewer': {'rating': 3.4, 'numReviews': 395, ...",
1,573a1390f29313caabcd5ea4,A District Attorney's outspoken stand on abort...,[Drama],62.0,APPROVED,"[Tyrone Power Sr., Helen Riaume, Marie Walcamp...",0,,Where Are My Children?,While prosecuting a physician for the death of...,...,"[Phillips Smalley, Lois Weber]","[Lucy Payton (from the story by), Franklin Hal...","{'wins': 1, 'nominations': 0, 'text': '1 win.'}",2015-09-07 00:51:32.560000000,1916,"{'rating': 5.9, 'votes': 247, 'id': 7558}",[USA],movie,"{'viewer': {'rating': 3.1, 'numReviews': 34, '...",


Get rid of columns I don't want

In [11]:
drop_cols = ['_id','num_mflix_comments', 'plot', 'runtime', 'fullplot', 'lastupdated','poster', 'languages',
            'writers', 'metacritic', 'year', 'type', 'cast', 'countries', 'directors', 'awards', 'tomatoes']
df_movie_info.drop(drop_cols, axis=1, inplace=True)
length = len(df_movie_info['title'])
df_movie_info.insert(loc=0, column='movie_key', value=range(1,length+1))
df_movie_info.head(2)


Unnamed: 0,movie_key,genres,rated,title,released,imdb
0,1,"[Biography, Crime, Drama]",PASSED,Regeneration,1915-09-13,"{'rating': 6.8, 'votes': 626, 'id': 5960}"
1,2,[Drama],APPROVED,Where Are My Children?,1916-05-01,"{'rating': 5.9, 'votes': 247, 'id': 7558}"


Conversions necessary to read the data into MySQL (eliminating lists, dictionaries, etc). Goal here is to unpack the important lists and dictionaries and translate them into something that doesn't break SQL syntax rules. I'm only concerned with genre and imdb rating, so I'm going to drop most of the columns because it takes unnecessary time and computing power. However, this general pattern (first two lines and fourth line for dictionaries, third line for lists) can be applied to any similar problems with trying to convert MongoDB NoSQL-type tables into something that can be used in SQL.

In [12]:
df_movie_info['imdb_rating'] = df_movie_info['imdb'].apply(lambda x: x['rating'])
df_movie_info['imdb_votes'] = df_movie_info['imdb'].apply(lambda x: x['votes'])
df_movie_info['genres'] = df_movie_info['genres'].str.join(', ')
df_movie_info.drop('imdb', axis=1, inplace=True)
df_movie_info.head(2)

Unnamed: 0,movie_key,genres,rated,title,released,imdb_rating,imdb_votes
0,1,"Biography, Crime, Drama",PASSED,Regeneration,1915-09-13,6.8,626
1,2,Drama,APPROVED,Where Are My Children?,1916-05-01,5.9,247


Create and populate dim_rentals table and dim_inventory table from sakila, perform useful transformations

In [13]:
sql_rentals = "SELECT * FROM sakila.rental;"
df_rentals = get_sql_dataframe(mysql_uid, mysql_pwd, src_dbname, sql_rentals)
df_rentals.rename(columns={"rental_id":"rental_key", "inventory_id":"inventory_key",
                          "customer_id":"customer_key"}, inplace=True)
rentals_drop_cols = ['staff_id', 'last_update']
df_rentals.drop(rentals_drop_cols, axis=1, inplace=True)

import datetime

df_rentals['rental_date'] = pd.to_datetime(df_rentals['rental_date']).dt.date
df_rentals['return_date'] = pd.to_datetime(df_rentals['return_date']).dt.date


df_rentals.head(2)


Unnamed: 0,rental_key,rental_date,inventory_key,customer_key,return_date
0,1,2005-05-24,367,130,2005-05-26
1,2,2005-05-24,1525,459,2005-05-28


In [14]:
sql_inventory = "SELECT * FROM sakila.inventory;"
df_inventory = get_sql_dataframe(mysql_uid, mysql_pwd, src_dbname, sql_inventory)
df_inventory.rename(columns={"inventory_id":"inventory_key", "film_id":"movie_key"},
                   inplace=True)
df_inventory.drop('last_update', axis=1, inplace=True)
df_inventory.head(2)

Unnamed: 0,inventory_key,movie_key,store_id
0,1,1,1
1,2,1,1


Create dim_date table (in MySQL, code is separately available in GitHub repo). Important that this is done at this point because if you run the SQL code and then rerun this whole script it'll delete the sakila_datawarehouse and the dim_date table will also be deleted and not recreated.

Put dimension tables into the sakila_datawarehouse SQL database

In [15]:
dataframe = df_movie_info
table_name = 'dim_movie_info'
primary_key = 'movie_key'
db_operation = "insert"

set_dataframe(mysql_uid, mysql_pwd, dst_dbname, dataframe, table_name, primary_key, db_operation)

In [16]:
dataframe = df_rentals
table_name = 'dim_rentals'
primary_key = 'rental_key'
db_operation = "insert"

set_dataframe(mysql_uid, mysql_pwd, dst_dbname, dataframe, table_name, primary_key, db_operation)

In [17]:
dataframe = df_inventory
table_name = 'dim_inventory'
primary_key = 'inventory_key'
db_operation = "insert"

set_dataframe(mysql_uid, mysql_pwd, dst_dbname, dataframe, table_name, primary_key, db_operation)

Create and populate fact_orders table, pulling exclusively from information already populated into the dimension tables (dimension tables should contain all necessary information from source material):

First, join together tables on common keys

In [18]:
df_rentals = pd.merge(df_rentals, df_inventory, on = "inventory_key", how = "inner")
df_rentals.head(5)

Unnamed: 0,rental_key,rental_date,inventory_key,customer_key,return_date,movie_key,store_id
0,1,2005-05-24,367,130,2005-05-26,80,1
1,1577,2005-06-16,367,327,2005-06-24,80,1
2,3584,2005-07-06,367,207,2005-07-13,80,1
3,10507,2005-08-01,367,45,2005-08-04,80,1
4,13641,2005-08-20,367,281,2005-08-26,80,1


In [19]:
df_rentals = pd.merge(df_rentals, customers_table, on = 'customer_key', how = 'inner')
df_rentals.head(5)

Unnamed: 0,rental_key,rental_date,inventory_key,customer_key,return_date,movie_key,store_id,first_name,last_name,company,city,country,phone1,phone2,email
0,1,2005-05-24,367,130,2005-05-26,80,1,Stacey,Travis,Medina-Castro,Dudleyfurt,Ethiopia,835-675-9702x438,+1-421-986-8630,ygarcia@andrade.com
1,1864,2005-06-17,1815,130,2005-06-24,394,1,Stacey,Travis,Medina-Castro,Dudleyfurt,Ethiopia,835-675-9702x438,+1-421-986-8630,ygarcia@andrade.com
2,746,2005-05-29,4272,130,2005-06-02,930,1,Stacey,Travis,Medina-Castro,Dudleyfurt,Ethiopia,835-675-9702x438,+1-421-986-8630,ygarcia@andrade.com
3,2535,2005-06-19,901,130,2005-06-28,200,2,Stacey,Travis,Medina-Castro,Dudleyfurt,Ethiopia,835-675-9702x438,+1-421-986-8630,ygarcia@andrade.com
4,14111,2005-08-21,1294,130,2005-08-22,285,2,Stacey,Travis,Medina-Castro,Dudleyfurt,Ethiopia,835-675-9702x438,+1-421-986-8630,ygarcia@andrade.com


In [20]:
df_rentals = pd.merge(df_rentals, df_movie_info, on = 'movie_key', how = 'inner')
df_rentals.head(5)

Unnamed: 0,rental_key,rental_date,inventory_key,customer_key,return_date,movie_key,store_id,first_name,last_name,company,...,country,phone1,phone2,email,genres,rated,title,released,imdb_rating,imdb_votes
0,1,2005-05-24,367,130,2005-05-26,80,1,Stacey,Travis,Medina-Castro,...,Ethiopia,835-675-9702x438,+1-421-986-8630,ygarcia@andrade.com,"Animation, Family, Comedy",,Steamboat Willie,1930-08-11,7.7,4230
1,1577,2005-06-16,367,327,2005-06-24,80,1,Bailey,Waters,Fox-Frey,...,Swaziland,(174)118-8288x93680,001-609-376-6347x05955,jillian22@wyatt-olsen.net,"Animation, Family, Comedy",,Steamboat Willie,1930-08-11,7.7,4230
2,3584,2005-07-06,367,207,2005-07-13,80,1,Adrienne,Hunter,Escobar-Cannon,...,Kazakhstan,952-008-0777x0963,248-271-9569x5820,lindsay75@levy-valentine.com,"Animation, Family, Comedy",,Steamboat Willie,1930-08-11,7.7,4230
3,10507,2005-08-01,367,45,2005-08-04,80,1,Ivan,Schroeder,"Peck, Nicholson and Knox",...,Saint Pierre and Miquelon,524-391-9866,(626)644-4777x075,qgeorge@singh.com,"Animation, Family, Comedy",,Steamboat Willie,1930-08-11,7.7,4230
4,13641,2005-08-20,367,281,2005-08-26,80,1,Carolyn,Hendrix,Jimenez Inc,...,Anguilla,784-550-5247,9246470346,huntercathy@henry.com,"Animation, Family, Comedy",,Steamboat Willie,1930-08-11,7.7,4230


Add date dimension lookup logic.

In [21]:
sql_dim_date = "SELECT date_key, full_date FROM sakila_datawarehouse.dim_date;"
df_dim_date = get_sql_dataframe(mysql_uid, mysql_pwd, src_dbname, sql_dim_date)
df_dim_date.head(2)

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02


In [22]:
df_dim_rental_date = df_dim_date.rename(columns={"date_key" : "rental_date_key", "full_date" : "rental_date"})
df_fact_orders = pd.merge(df_rentals, df_dim_rental_date, on='rental_date', how='left')
df_fact_orders.drop(['rental_date'], axis=1, inplace=True)
df_fact_orders.head(2)

Unnamed: 0,rental_key,inventory_key,customer_key,return_date,movie_key,store_id,first_name,last_name,company,city,...,phone1,phone2,email,genres,rated,title,released,imdb_rating,imdb_votes,rental_date_key
0,1,367,130,2005-05-26,80,1,Stacey,Travis,Medina-Castro,Dudleyfurt,...,835-675-9702x438,+1-421-986-8630,ygarcia@andrade.com,"Animation, Family, Comedy",,Steamboat Willie,1930-08-11,7.7,4230,20050524
1,1577,367,327,2005-06-24,80,1,Bailey,Waters,Fox-Frey,South Gary,...,(174)118-8288x93680,001-609-376-6347x05955,jillian22@wyatt-olsen.net,"Animation, Family, Comedy",,Steamboat Willie,1930-08-11,7.7,4230,20050616


In [23]:
df_dim_return_date = df_dim_date.rename(columns={"date_key" : "return_date_key", "full_date" : "return_date"})
df_fact_orders = pd.merge(df_fact_orders, df_dim_return_date, on='return_date', how='left')
df_fact_orders.drop(['return_date'], axis=1, inplace=True)
df_fact_orders.head(2)

Unnamed: 0,rental_key,inventory_key,customer_key,movie_key,store_id,first_name,last_name,company,city,country,...,phone2,email,genres,rated,title,released,imdb_rating,imdb_votes,rental_date_key,return_date_key
0,1,367,130,80,1,Stacey,Travis,Medina-Castro,Dudleyfurt,Ethiopia,...,+1-421-986-8630,ygarcia@andrade.com,"Animation, Family, Comedy",,Steamboat Willie,1930-08-11,7.7,4230,20050524,20050526.0
1,1577,367,327,80,1,Bailey,Waters,Fox-Frey,South Gary,Swaziland,...,001-609-376-6347x05955,jillian22@wyatt-olsen.net,"Animation, Family, Comedy",,Steamboat Willie,1930-08-11,7.7,4230,20050616,20050624.0


Populate fact_orders table in sakila_datawarehouse

In [24]:
dataframe = df_fact_orders
table_name = 'fact_orders'
primary_key = 'rental_key'
db_operation = "insert"

set_dataframe(mysql_uid, mysql_pwd, dst_dbname, dataframe, table_name, primary_key, db_operation)

SQL queries to check for functionality

A couple basic select all queries to show that all three dimensional tables were created:

In [23]:
sql_dim_rentals = "SELECT * FROM sakila_datawarehouse.dim_rentals;"
df_dim_rentals = get_sql_dataframe(mysql_uid, mysql_pwd, dst_dbname, sql_dim_rentals)
df_dim_rentals.head(2)

Unnamed: 0,rental_key,rental_date,inventory_key,customer_key,return_date
0,1,2005-05-24,367,130,2005-05-26
1,2,2005-05-24,1525,459,2005-05-28


In [24]:
sql_dim_customers = "SELECT * FROM sakila_datawarehouse.dim_customers;"
df_dim_customers = get_sql_dataframe(mysql_uid, mysql_pwd, dst_dbname, sql_dim_customers)
df_dim_customers.head(2)

Unnamed: 0,customer_key,first_name,last_name,company,city,country,phone1,phone2,email
0,1,Andrew,Goodman,Stewart-Flynn,Rowlandberg,Macao,846-790-4623x4715,(422)787-2331x71127,marieyates@gomez-spencer.info
1,2,Alvin,Lane,"Terry, Proctor and Lawrence",Bethside,Papua New Guinea,124-597-8652x05682,321.441.0588x6218,alexandra86@mccoy.com


In [32]:
sql_dim_movie_info = "SELECT * FROM sakila_datawarehouse.dim_movie_info;"
df_dim_movie_info = get_sql_dataframe(mysql_uid, mysql_pwd, dst_dbname, sql_dim_movie_info)
df_dim_movie_info.head(2)

Unnamed: 0,movie_key,genres,title,released,rated,imdb_rating,imdb_votes
0,1,Short,Blacksmith Scene,1893-05-09,UNRATED,6.2,1189
1,2,"Short, Western",The Great Train Robbery,1903-12-01,TV-G,7.4,9847


Test query customer_rentals to show potential for successful aggregation, grouping, etc.:

In [33]:
sql_customer_rentals = """
    SELECT customers.`last_name` AS `customer_last_name`,
        AVG(orders.`imdb_rating`) AS `avg_imdb_rating`,
        COUNT(orders.`title`) AS `total_movies`
    FROM `{0}`.`fact_orders` AS orders
    INNER JOIN `{0}`.`dim_customers` AS customers
    ON orders.customer_key = customers.customer_key
    GROUP BY customers.`last_name`
    ORDER BY total_movies DESC;
""".format(dst_dbname)
df_test = get_sql_dataframe(mysql_uid, mysql_pwd, dst_dbname, sql_customer_rentals)

df_test.head(10)

Unnamed: 0,customer_last_name,avg_imdb_rating,total_movies
0,Waters,7.32659,173
1,Mclean,7.432231,121
2,Woods,7.367568,111
3,Goodman,7.268627,102
4,Ferguson,7.261386,101
5,Bradshaw,7.290426,94
6,Choi,7.273034,89
7,Hendrix,7.455056,89
8,Potts,7.151724,87
9,Crane,7.249412,85
