# Midterm Project

Using MySQL and MongoDB databases to extract, transform and load data from the Chinook sample database. The Chinook data model represents a digital media store, including tables for artists, albums, media tracks, invoices and customers.

#### Importing the Necessary Libraries

In [1]:
import os
import json
import numpy
import datetime
import certifi
import pandas as pd

import pymongo
import sqlalchemy
from sqlalchemy import create_engine

In [2]:
print(f"Running SQL Alchemy Version: {sqlalchemy.__version__}")
print(f"Running PyMongo Version: {pymongo.__version__}")

Running SQL Alchemy Version: 1.4.39
Running PyMongo Version: 4.6.2


#### Declaring & Assigning Connection Variables for the MongoDB Server, the MySQL Server & Databases

In [3]:
mysql_uid = "root"
mysql_pwd = "Passw0rd123"
mysql_hostname = "localhost"

atlas_cluster_name = "cluster_name.xxxxx"
atlas_user_name = ""
atlas_password = "password"

conn_str = {"local" : f"mongodb://localhost:27017/",
    "atlas" : f"mongodb+srv://{atlas_user_name}:{atlas_password}@{atlas_cluster_name}.mongodb.net"
}

src_dbname = "Chinook"
dst_dbname = "Chinook_dw"

#### Defining Functions for Getting Data From and Setting Data Into Databases

In [4]:
def get_sql_dataframe(user_id, pwd, host_name, db_name, sql_query):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    conn = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, conn);
    conn.close()
    
    return dframe


def get_mongo_dataframe(connect_str, db_name, collection, query):
    '''Create a connection to MongoDB'''
    client = pymongo.MongoClient(connect_str)
    
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    client.close()
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

#### Populating MongoDB with Source Data

In [5]:
client = pymongo.MongoClient(conn_str["local"])
db = client[src_dbname]

# Gets the path of the Current Working Directory for this Notebook, and then Appends the 'data' directory.
data_dir = os.path.join(os.getcwd(), 'data')

json_files = {"Chinook_customer" : 'Chinook_customer.json'}

for file in json_files:
    db.drop_collection(file)
    json_file = os.path.join(data_dir, json_files[file])
    with open(json_file, 'r') as openfile:
        json_object = json.load(openfile)
        file = db[file]
        result = file.insert_many(json_object)
        #print(f"{file} was successfully loaded.")

        
client.close()            

### Creating and Populating the New Dimension Tables
#### Extracting Data from the Source MongoDB Collection

In [6]:
query = {}
collection = "Chinook_customer"

df_customers = get_mongo_dataframe(conn_str["local"], src_dbname, collection, query)
df_customers.head(5)

Unnamed: 0,CustomerId,FirstName,LastName,Company,Address,City,State,Country,PostalCode,Phone,Fax,Email,SupportRepId
0,1,LuÃ­s,GonÃ§alves,Embraer - Empresa Brasileira de AeronÃ¡utica S.A.,"Av. Brigadeiro Faria Lima, 2170",SÃ£o JosÃ© dos Campos,SP,Brazil,12227-000,+55 (12) 3923-5555,+55 (12) 3923-5566,luisg@embraer.com.br,3
1,2,Leonie,KÃ¶hler,,Theodor-Heuss-StraÃŸe 34,Stuttgart,,Germany,70174,+49 0711 2842222,,leonekohler@surfeu.de,5
2,3,FranÃ§ois,Tremblay,,1498 rue BÃ©langer,MontrÃ©al,QC,Canada,H2G 1A7,+1 (514) 721-4711,,ftremblay@gmail.com,3
3,4,BjÃ¸rn,Hansen,,UllevÃ¥lsveien 14,Oslo,,Norway,0171,+47 22 44 22 22,,bjorn.hansen@yahoo.no,4
4,5,FrantiÅ¡ek,WichterlovÃ¡,JetBrains s.r.o.,Klanova 9/506,Prague,,Czech Republic,14700,+420 2 4172 5555,+420 2 4172 5555,frantisekw@jetbrains.com,4


#### Extracting the invoice table from MySQL

In [7]:
sql_invoices = "SELECT `invoice`.`InvoiceId`, `invoice`.`CustomerId`,`invoice`.`InvoiceDate`,`invoice`.`BillingAddress`,`invoice`.`BillingCity`,`invoice`.`BillingState`,`invoice`.`BillingCountry`,`invoice`.`BillingPostalCode`,`invoice`.`Total`,`invoiceline`.`InvoiceLineId`,`invoiceline`.`TrackId`,`invoiceline`.`UnitPrice`,`invoiceline`.`Quantity` FROM chinook.invoice INNER JOIN invoiceline ON invoice.InvoiceId = invoiceline.InvoiceId;"
df_invoices = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, src_dbname, sql_invoices)
df_invoices.head()

Unnamed: 0,InvoiceId,CustomerId,InvoiceDate,BillingAddress,BillingCity,BillingState,BillingCountry,BillingPostalCode,Total,InvoiceLineId,TrackId,UnitPrice,Quantity
0,1,2,2021-01-01,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,1.98,1,2,0.99,1
1,1,2,2021-01-01,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,1.98,2,4,0.99,1
2,2,4,2021-01-02,Ullevålsveien 14,Oslo,,Norway,171,3.96,3,6,0.99,1
3,2,4,2021-01-02,Ullevålsveien 14,Oslo,,Norway,171,3.96,4,8,0.99,1
4,2,4,2021-01-02,Ullevålsveien 14,Oslo,,Norway,171,3.96,5,10,0.99,1


#### Extracting the track table from local device

In [8]:
df_track = pd.read_csv(r'C:\Users\ds2002-student\Desktop\DS-2002-main\Midterm-project\Chinook_track_genre.csv')
df_track.head()

Unnamed: 0,TrackId,track_name,AlbumId,MediaTypeId,GenreId,Composer,Milliseconds,Bytes,UnitPrice,genre_name
0,1,For Those About To Rock (We Salute You),1,1,1,"Angus Young, Malcolm Young, Brian Johnson",343719,11170334,0.99,Rock
1,2,Balls to the Wall,2,2,1,"U. Dirkschneider, W. Hoffmann, H. Frank, P. Ba...",342562,5510424,0.99,Rock
2,3,Fast As a Shark,3,2,1,"F. Baltes, S. Kaufman, U. Dirkscneider & W. Ho...",230619,3990994,0.99,Rock
3,4,Restless and Wild,3,2,1,"F. Baltes, R.A. Smith-Diesel, S. Kaufman, U. D...",252051,4331779,0.99,Rock
4,5,Princess of the Dawn,3,2,1,Deaffy & R.A. Smith-Diesel,375418,6290521,0.99,Rock


#### Lookup the Invoice Date Keys from the Date Dimension Table.

##### Getting the Data from the Date Dimension Table.

In [9]:
sql_dim_date = "SELECT date_key, full_date FROM chinook.dim_date;"
df_dim_date = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, src_dbname, sql_dim_date)
df_dim_date.full_date = df_dim_date.full_date.astype('datetime64[ns]').dt.date
df_dim_date.head(2)

Unnamed: 0,date_key,full_date
0,20200101,2020-01-01
1,20200102,2020-01-02


##### Lookup the Surrogate Primary Key (date_key) that Corresponds to the InvoiceDate Column

In [10]:
df_dim_invoice_date = df_dim_date.rename(columns={"date_key" : "invoice_date_key", "full_date" : "InvoiceDate"})
df_invoices.InvoiceDate = df_invoices.InvoiceDate.astype('datetime64[ns]').dt.date
df_invoices = pd.merge(df_invoices, df_dim_invoice_date, on='InvoiceDate', how='left')
df_invoices.drop(['InvoiceDate'], axis=1, inplace=True)
df_invoices.head(2)

Unnamed: 0,InvoiceId,CustomerId,BillingAddress,BillingCity,BillingState,BillingCountry,BillingPostalCode,Total,InvoiceLineId,TrackId,UnitPrice,Quantity,invoice_date_key
0,1,2,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,1.98,1,2,0.99,1,20210101.0
1,1,2,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,1.98,2,4,0.99,1,20210101.0


#### Performing Any Necessary Transformations to the DataFrames

In [11]:
# Dropping extra columns
df_invoices.drop(['BillingAddress', 'BillingCity', 'BillingState', 'BillingPostalCode'], axis=1, inplace=True)

# Inserting a new column, with an ever-incrementing numeric value, to serve as the primary key.
df_invoices.insert(0, "invoice_key", range(1, df_invoices.shape[0]+1))
df_invoices.head(2)

Unnamed: 0,invoice_key,InvoiceId,CustomerId,BillingCountry,Total,InvoiceLineId,TrackId,UnitPrice,Quantity,invoice_date_key
0,1,1,2,Germany,1.98,1,2,0.99,1,20210101.0
1,2,1,2,Germany,1.98,2,4,0.99,1,20210101.0


In [12]:
# Dropping extra columns
df_customers.drop(['Company', 'Address', 'City', 'State', 'PostalCode', 'Fax', 'Email'], axis=1, inplace=True)

# Inserting a new column, with an ever-incrementing numeric value, to serve as the primary key.
df_customers.insert(0, "customer_key", range(1, df_customers.shape[0]+1))
df_customers.head(2)

Unnamed: 0,customer_key,CustomerId,FirstName,LastName,Country,Phone,SupportRepId
0,1,1,LuÃ­s,GonÃ§alves,Brazil,+55 (12) 3923-5555,3
1,2,2,Leonie,KÃ¶hler,Germany,+49 0711 2842222,5


In [13]:
# Dropping extra columns
df_track.drop(['AlbumId', 'MediaTypeId', 'Milliseconds', 'Bytes'], axis=1, inplace=True)

# Inserting a new column, with an ever-incrementing numeric value, to serve as the primary key.
df_track.insert(0, "track_key", range(1, df_track.shape[0]+1))
df_track.head(2)

Unnamed: 0,track_key,TrackId,track_name,GenreId,Composer,UnitPrice,genre_name
0,1,1,For Those About To Rock (We Salute You),1,"Angus Young, Malcolm Young, Brian Johnson",0.99,Rock
1,2,2,Balls to the Wall,1,"U. Dirkschneider, W. Hoffmann, H. Frank, P. Ba...",0.99,Rock


#### Loading the Transformed DataFrames into the New Data Warehouse by Creating New Tables

In [14]:
dataframe = df_invoices
table_name = 'dim_invoices'
primary_key = 'invoice_key'
db_operation = "insert"

set_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, dataframe, table_name, primary_key, db_operation)

In [15]:
dataframe = df_customers
table_name = 'dim_customers'
primary_key = 'customer_key'
db_operation = "insert"

set_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, dataframe, table_name, primary_key, db_operation)

In [16]:
dataframe = df_track
table_name = 'dim_track'
primary_key = 'track_key'
db_operation = "insert"

set_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, dataframe, table_name, primary_key, db_operation)

#### Validating that the New Dimension Tables were Created.

In [17]:
sql_invoices = "SELECT * FROM chinook_dw.dim_invoices;"
df_dim_invoices = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_invoices)
df_dim_invoices.head(2)

Unnamed: 0,invoice_key,InvoiceId,CustomerId,BillingCountry,Total,InvoiceLineId,TrackId,UnitPrice,Quantity,invoice_date_key
0,1,1,2,Germany,1.98,1,2,0.99,1,20210101.0
1,2,1,2,Germany,1.98,2,4,0.99,1,20210101.0


In [18]:
sql_customers = "SELECT * FROM chinook_dw.dim_customers;"
df_dim_customers = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_customers)
df_dim_customers.head(2)

Unnamed: 0,customer_key,CustomerId,FirstName,LastName,Country,Phone,SupportRepId
0,1,1,LuÃ­s,GonÃ§alves,Brazil,+55 (12) 3923-5555,3
1,2,2,Leonie,KÃ¶hler,Germany,+49 0711 2842222,5


In [19]:
sql_track = "SELECT * FROM chinook_dw.dim_track;"
df_dim_track = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_track)
df_dim_track.head(2)

Unnamed: 0,track_key,TrackId,track_name,GenreId,Composer,UnitPrice,genre_name
0,1,1,For Those About To Rock (We Salute You),1,"Angus Young, Malcolm Young, Brian Johnson",0.99,Rock
1,2,2,Balls to the Wall,1,"U. Dirkschneider, W. Hoffmann, H. Frank, P. Ba...",0.99,Rock


### Creating a Fact Orders table to merge customer and order information

In [20]:
# Merging the customer and invoice tables
df_fact_orders = pd.merge(df_dim_customers, df_dim_invoices, on='CustomerId', how='left')
df_fact_orders.drop(['CustomerId'], axis=1, inplace=True)
df_fact_orders.head(2)

Unnamed: 0,customer_key,FirstName,LastName,Country,Phone,SupportRepId,invoice_key,InvoiceId,BillingCountry,Total,InvoiceLineId,TrackId,UnitPrice,Quantity,invoice_date_key
0,1,LuÃ­s,GonÃ§alves,Brazil,+55 (12) 3923-5555,3,531,98,Brazil,3.98,531,3247,1.99,1,20220311.0
1,1,LuÃ­s,GonÃ§alves,Brazil,+55 (12) 3923-5555,3,532,98,Brazil,3.98,532,3248,1.99,1,20220311.0


In [30]:
# Adding the track table to fact orders
df_fact_orders = pd.merge(df_fact_orders, df_dim_track, on='TrackId', how='left')
df_fact_orders.drop(['TrackId'], axis=1, inplace=True)
df_fact_orders.head(2)

Unnamed: 0,fact_order_key,customer_key,FirstName,LastName,Country,invoice_key,Total,UnitPrice,Quantity,invoice_date_key,track_key,genre_name
0,1,1,LuÃ­s,GonÃ§alves,Brazil,531,3.98,1.99,1,20220311.0,,
1,2,1,LuÃ­s,GonÃ§alves,Brazil,532,3.98,1.99,1,20220311.0,,


#### Performing Any Necessary Transformations to the fact orders DataFrame

In [33]:
df_fact_orders.insert(0, "fact_order_key", range(1, df_fact_orders.shape[0]+1))

# Renaming the Foreign Key Columns
column_name_map = {"UnitPrice_x" : "UnitPrice"}

df_fact_orders.rename(columns=column_name_map, inplace=True)

# Reordering the Columns
ordered_columns = ['fact_order_key','customer_key','FirstName','LastName','Country','invoice_key','Total', 'UnitPrice','Quantity','track_key','genre_name','invoice_date_key']

df_fact_orders = df_fact_orders[ordered_columns]
df_fact_orders.head(2)

Unnamed: 0,fact_order_key,customer_key,FirstName,LastName,Country,invoice_key,Total,UnitPrice,Quantity,track_key,genre_name,invoice_date_key
0,1,1,LuÃ­s,GonÃ§alves,Brazil,531,3.98,1.99,1,,,20220311.0
1,2,1,LuÃ­s,GonÃ§alves,Brazil,532,3.98,1.99,1,,,20220311.0


#### Loading Newly Transformed Data into the chinook_dw Data Warehouse

In [34]:
dataframe = df_fact_orders
table_name = 'fact_orders'
primary_key = 'fact_order_key'
db_operation = "insert"

set_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, dataframe, table_name, primary_key, db_operation)

#### Validating that the New Fact Table was Created

In [35]:
sql_fact_orders = "SELECT * FROM chinook_dw.fact_orders;"
df_fact_orders = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_fact_orders)
df_fact_orders.head(2)

Unnamed: 0,fact_order_key,customer_key,FirstName,LastName,Country,invoice_key,Total,UnitPrice,Quantity,track_key,genre_name,invoice_date_key
0,1,1,LuÃ­s,GonÃ§alves,Brazil,531,3.98,1.99,1,,,20220311.0
1,2,1,LuÃ­s,GonÃ§alves,Brazil,532,3.98,1.99,1,,,20220311.0


### Authoring a Query that Returns the Order Data

In [42]:
# Writing a SELECT statement summarizing how the amount and diversity of purchases varies across sales in different countries
sql_fact_orders = """
    SELECT Country, sum(Quantity) AS Total_quantity, sum(Total) AS Total_price, count(genre_name) AS number_of_genres
	FROM chinook_dw.fact_orders
	GROUP BY Country
	ORDER BY Total_price DESC;

"""

In [43]:
df_fact_orders = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_fact_orders)
df_fact_orders

Unnamed: 0,Country,Total_quantity,Total_price,number_of_genres
0,USA,494.0,4667.06,106
1,Canada,304.0,2689.96,94
2,France,190.0,1722.1,58
3,Brazil,190.0,1677.1,74
4,Germany,152.0,1392.48,50
5,United Kingdom,114.0,1003.86,33
6,Czech Republic,76.0,879.24,12
7,Portugal,76.0,687.24,19
8,India,74.0,667.28,25
9,Hungary,38.0,446.62,6
