# DS 2002 Midterm Project 1
## Creating a Data Warehouse Using Data from 3 Different Sources
- Author: Andrew Otwell
- Computing Id: gft3fr
- Goal: The goal of this project is to demonstrate (1) an understanding of and (2) competence creating and implementing basic data science systems such as pipelines, scripts, data transformations, APIs, databases and cloud services.

### Prerequisites:
This notebook uses the PyMongo database connectivity library to connect to MySQL databases; therefore, you must have first installed that libary into your python environment by executing the following command in a Terminal window.

- `python -m pip install pymongo[srv]`

MAKE SURE TO RUN **chinook_create_database.sql** AND YOUR RESPECTIVE **dim_date** SETUP FILE BEFORE RUNNING THIS NOTEBOOK

#### Import the Necessary Libraries

In [32]:
import os
import json
import numpy
import datetime
import certifi
import pandas as pd

import pymongo
import sqlalchemy
from sqlalchemy import create_engine, text, create_engine

In [33]:
print(f"Running SQL Alchemy Version: {sqlalchemy.__version__}")
print(f"Running PyMongo Version: {pymongo.__version__}")

Running SQL Alchemy Version: 2.0.43
Running PyMongo Version: 4.15.3


#### Declare & Assign Connection Variables for the MongoDB Server, the MySQL Server & Databases with which You'll be Working 

In [34]:
mysql_args_dw2 = {
    "uid" : "root",
    "pwd" : "",
    "hostname" : "localhost",
    "dbname" : "chinook_dw2"
}

mysql_args = {
    "uid" : "root",
    "pwd" : "",
    "hostname" : "localhost",
    "dbname" : "chinook"
}

# The 'cluster_location' must either be "atlas" or "local".
mongodb_args = {
    "user_name" : "",
    "password" : "",
    "cluster_name" : "",
    "cluster_subnet" : "",
    "cluster_location" : "atlas", # "local"
    "db_name" : "chinook"
}

In [36]:
conn_str = f"mysql+pymysql://root:pwd!@localhost"
sqlEngine = create_engine(conn_str, pool_recycle=3600)

with sqlEngine.connect() as conn:
    conn.execute(text("DROP DATABASE IF EXISTS `chinook_dw2`;"))
    conn.execute(text("CREATE DATABASE `chinook_dw2`;"))
    conn.execute(text("USE chinook_dw2;"))

In [37]:
def get_sql_dataframe(sql_query, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    dframe = pd.read_sql(text(sql_query), connection);
    connection.close()
    
    return dframe
    

def set_dataframe(df, table_name, pk_column, db_operation, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()


def get_mongo_client(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the cluster_location parameter.")
    
    else:
        if args["cluster_location"] == "atlas":
            connect_str = f"mongodb+srv://{args['user_name']}:{args['password']}@"
            connect_str += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
            client = pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())
            
        elif args["cluster_location"] == "local":
            client = pymongo.MongoClient("mongodb://localhost:27017/")
        
    return client


def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = mongo_client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    mongo_client.close()
    
    return dframe


def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    db = mongo_client[db_name]
    
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(data_directory, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)
        
    mongo_client.close()

#### Define Functions for Getting Data From and Setting Data Into Databases

#### Populate MongoDB with Chinook Data
You only need to run this cell once; however, the operation is *idempotent*.  In other words, it can be run multiple times without changing the end result.

In [38]:
client = get_mongo_client(**mongodb_args)

# Gets the path of the Current Working Directory for this Notebook,
# and then Appends the 'data' directory.
data_dir = os.path.join(os.getcwd(), 'data')

json_files = {"customers" : 'customers.json',
              "employees" : 'employees.json'
             }

set_mongo_collections(client, mongodb_args["db_name"], data_dir, json_files)       

### 1.0 Use MongoDB to create dimension tables: dim_customers and dim_employees
#### 1.1 Extract Data From MongoDB

In [39]:
client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "customers"

df_customers = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_customers.head(2)

Unnamed: 0,CustomerId,FirstName,LastName,Company,Address,City,State,Country,PostalCode,Phone,Fax,Email,SupportRepId
0,1,Luís,Gonçalves,Embraer - Empresa Brasileira de Aeronáutica S.A.,"Av. Brigadeiro Faria Lima, 2170",São José dos Campos,SP,Brazil,12227-000,+55 (12) 3923-5555,+55 (12) 3923-5566,luisg@embraer.com.br,3
1,2,Leonie,Köhler,,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,+49 0711 2842222,,leonekohler@surfeu.de,5


In [40]:
client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "employees"

df_employees = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_employees.head(2)

Unnamed: 0,EmployeeId,LastName,FirstName,Title,ReportsTo,BirthDate,HireDate,Address,City,State,Country,PostalCode,Phone,Fax,Email
0,1,Adams,Andrew,General Manager,0,1962-02-18 00:00:00,2002-08-14 00:00:00,11120 Jasper Ave NW,Edmonton,AB,Canada,T5K 2N1,+1 (780) 428-9482,+1 (780) 428-3457,andrew@chinookcorp.com
1,2,Edwards,Nancy,Sales Manager,1,1958-12-08 00:00:00,2002-05-01 00:00:00,825 8 Ave SW,Calgary,AB,Canada,T2P 2T3,+1 (403) 262-3443,+1 (403) 262-3322,nancy@chinookcorp.com


#### 1.2 DataFrame Transformations

In [41]:
# 1. Rename the "id" column to reflect the entity as it will serve as the business key for lookup operations
df_customers.rename(columns={"CustomerId":"customer_key","FirstName":"customer_FirstName","LastName":"customer_LastName"}, inplace=True)

# 2. Since there are no values in the 'fax' column go ahead and drop it.
df_customers.drop(['Fax'], axis=1, inplace=True)
df_customers.head(2)

Unnamed: 0,customer_key,customer_FirstName,customer_LastName,Company,Address,City,State,Country,PostalCode,Phone,Email,SupportRepId
0,1,Luís,Gonçalves,Embraer - Empresa Brasileira de Aeronáutica S.A.,"Av. Brigadeiro Faria Lima, 2170",São José dos Campos,SP,Brazil,12227-000,+55 (12) 3923-5555,luisg@embraer.com.br,3
1,2,Leonie,Köhler,,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,+49 0711 2842222,leonekohler@surfeu.de,5


In [42]:
# 1. Rename the "id" column to reflect the entity as it will serve as the business key for lookup operations
df_employees.rename(columns={"EmployeeId":"employee_key","FirstName":"employee_FirstName","LastName":"employee_LastName", "ReportsTo" : "ReportsTo_employee_key"}, inplace=True)

# 2. Since there are no values in the 'fax' column go ahead and drop it.
df_employees.drop(['BirthDate','HireDate'], axis=1, inplace=True)
df_customers.head(2)

Unnamed: 0,customer_key,customer_FirstName,customer_LastName,Company,Address,City,State,Country,PostalCode,Phone,Email,SupportRepId
0,1,Luís,Gonçalves,Embraer - Empresa Brasileira de Aeronáutica S.A.,"Av. Brigadeiro Faria Lima, 2170",São José dos Campos,SP,Brazil,12227-000,+55 (12) 3923-5555,luisg@embraer.com.br,3
1,2,Leonie,Köhler,,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,+49 0711 2842222,leonekohler@surfeu.de,5


#### 1.3 Load Transformed DataFrames Into the New Data Warehouse by Creating New Tables

In [43]:
dataframe = df_customers
table_name = 'dim_customers'
primary_key = 'customer_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args_dw2)

In [44]:
dataframe = df_employees
table_name = 'dim_employees'
primary_key = 'employee_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args_dw2)

#### 1.4 Validate New Dimension Tables Were Created

In [45]:
sql_customers = "SELECT * FROM chinook_dw2.dim_customers"
df_dim_customers = get_sql_dataframe(sql_customers, **mysql_args_dw2)
df_dim_customers.head(2)

Unnamed: 0,customer_key,customer_FirstName,customer_LastName,Company,Address,City,State,Country,PostalCode,Phone,Email,SupportRepId
0,1,Luís,Gonçalves,Embraer - Empresa Brasileira de Aeronáutica S.A.,"Av. Brigadeiro Faria Lima, 2170",São José dos Campos,SP,Brazil,12227-000,+55 (12) 3923-5555,luisg@embraer.com.br,3
1,2,Leonie,Köhler,,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,+49 0711 2842222,leonekohler@surfeu.de,5


In [46]:
sql_employees = "SELECT * FROM chinook_dw2.dim_employees"
df_dim_employees = get_sql_dataframe(sql_employees, **mysql_args_dw2)
df_dim_employees.head(2)

Unnamed: 0,employee_key,employee_LastName,employee_FirstName,Title,ReportsTo_employee_key,Address,City,State,Country,PostalCode,Phone,Fax,Email
0,1,Adams,Andrew,General Manager,0,11120 Jasper Ave NW,Edmonton,AB,Canada,T5K 2N1,+1 (780) 428-9482,+1 (780) 428-3457,andrew@chinookcorp.com
1,2,Edwards,Nancy,Sales Manager,1,825 8 Ave SW,Calgary,AB,Canada,T2P 2T3,+1 (403) 262-3443,+1 (403) 262-3322,nancy@chinookcorp.com


### 2.0 Create & Populate dim_tracks (Another Dimension Table) and fact_invoices (fact_table)
#### 2.1 dim_tracks
Query the **chinook** database to fill a dataframe for each of the source tables needed to create the dim_tracks dimension table (track, album, artist, genre, and mediatype). These dataframes are joined using the merge() method of Pandas. The dataframe is then written back to the MySQL server to create and populate the new respective dimension table.

In [47]:
sql_tracks = "SELECT * FROM chinook.track"
df_tracks = get_sql_dataframe(sql_tracks, **mysql_args)
df_tracks.rename(columns = {"TrackId" : "track_key", "Name" : "track_name", "AlbumId" : "album_key", "MediaTypeId" : "mediatype_key", "GenreId" : "genre_key"}, inplace=True)
df_tracks.head(2)

Unnamed: 0,track_key,track_name,album_key,mediatype_key,genre_key,Composer,Milliseconds,Bytes,UnitPrice
0,1,For Those About To Rock (We Salute You),1,1,1,"Angus Young, Malcolm Young, Brian Johnson",343719,11170334,0.99
1,2,Balls to the Wall,2,2,1,,342562,5510424,0.99


In [48]:
sql_albums = "SELECT * FROM chinook.album"
df_albums = get_sql_dataframe(sql_albums, **mysql_args)
df_albums.rename(columns = {"AlbumId" : "album_key", "Title" : "album_title", "ArtistId" : "artist_key"}, inplace=True)
df_albums.head(2)

Unnamed: 0,album_key,album_title,artist_key
0,1,For Those About To Rock We Salute You,1
1,2,Balls to the Wall,2


In [49]:
sql_artists = "SELECT * FROM chinook.artist"
df_artists = get_sql_dataframe(sql_artists, **mysql_args)
df_artists.rename(columns = {"ArtistId" : "artist_key" , "Name" : "artist_name"}, inplace=True)
df_artists.head(2)

Unnamed: 0,artist_key,artist_name
0,1,AC/DC
1,2,Accept


In [50]:
sql_genres = "SELECT * FROM chinook.genre"
df_genres = get_sql_dataframe(sql_genres, **mysql_args)
df_genres.rename(columns = {"GenreId" : "genre_key" , "Name" : "genre"}, inplace=True)
df_genres.head(2)

Unnamed: 0,genre_key,genre
0,1,Rock
1,2,Jazz


In [51]:
sql_mediatypes = "SELECT * FROM chinook.mediatype"
df_mediatypes = get_sql_dataframe(sql_mediatypes, **mysql_args)
df_mediatypes.rename(columns = {"MediaTypeId" : "mediatype_key" , "Name" : "mediatype"}, inplace=True)
df_mediatypes.head(2)

Unnamed: 0,mediatype_key,mediatype
0,1,MPEG audio file
1,2,Protected AAC audio file


##### 2.1.2 Get Album Title and Artist Id

In [52]:
df_tracks = pd.merge(df_tracks, df_albums, on='album_key', how='left')
df_tracks.head(2)

Unnamed: 0,track_key,track_name,album_key,mediatype_key,genre_key,Composer,Milliseconds,Bytes,UnitPrice,album_title,artist_key
0,1,For Those About To Rock (We Salute You),1,1,1,"Angus Young, Malcolm Young, Brian Johnson",343719,11170334,0.99,For Those About To Rock We Salute You,1
1,2,Balls to the Wall,2,2,1,,342562,5510424,0.99,Balls to the Wall,2


##### 2.1.3 Get Artist Name

In [53]:
df_tracks = pd.merge(df_tracks, df_artists, on='artist_key', how='left')
df_tracks.head(2)

Unnamed: 0,track_key,track_name,album_key,mediatype_key,genre_key,Composer,Milliseconds,Bytes,UnitPrice,album_title,artist_key,artist_name
0,1,For Those About To Rock (We Salute You),1,1,1,"Angus Young, Malcolm Young, Brian Johnson",343719,11170334,0.99,For Those About To Rock We Salute You,1,AC/DC
1,2,Balls to the Wall,2,2,1,,342562,5510424,0.99,Balls to the Wall,2,Accept


##### 2.1.4 Get Media Type

In [54]:
df_tracks = pd.merge(df_tracks, df_mediatypes, on='mediatype_key', how='left')
df_tracks.head(2)

Unnamed: 0,track_key,track_name,album_key,mediatype_key,genre_key,Composer,Milliseconds,Bytes,UnitPrice,album_title,artist_key,artist_name,mediatype
0,1,For Those About To Rock (We Salute You),1,1,1,"Angus Young, Malcolm Young, Brian Johnson",343719,11170334,0.99,For Those About To Rock We Salute You,1,AC/DC,MPEG audio file
1,2,Balls to the Wall,2,2,1,,342562,5510424,0.99,Balls to the Wall,2,Accept,Protected AAC audio file


##### 2.1.5 Get Genre

In [55]:
df_tracks = pd.merge(df_tracks, df_genres, on='genre_key', how='left')
df_tracks.head(2)

Unnamed: 0,track_key,track_name,album_key,mediatype_key,genre_key,Composer,Milliseconds,Bytes,UnitPrice,album_title,artist_key,artist_name,mediatype,genre
0,1,For Those About To Rock (We Salute You),1,1,1,"Angus Young, Malcolm Young, Brian Johnson",343719,11170334,0.99,For Those About To Rock We Salute You,1,AC/DC,MPEG audio file,Rock
1,2,Balls to the Wall,2,2,1,,342562,5510424,0.99,Balls to the Wall,2,Accept,Protected AAC audio file,Rock


##### 2.1.6 Drop and Reorder Columns

In [56]:
drop_columns = ['album_key','mediatype_key','genre_key','artist_key']
df_tracks.drop(drop_columns, axis=1, inplace=True)

df_tracks = df_tracks[['track_key', 'track_name', 'album_title', 'artist_name', 'Composer','genre','Milliseconds','Bytes','UnitPrice','mediatype']]

df_tracks.head(2)

Unnamed: 0,track_key,track_name,album_title,artist_name,Composer,genre,Milliseconds,Bytes,UnitPrice,mediatype
0,1,For Those About To Rock (We Salute You),For Those About To Rock We Salute You,AC/DC,"Angus Young, Malcolm Young, Brian Johnson",Rock,343719,11170334,0.99,MPEG audio file
1,2,Balls to the Wall,Balls to the Wall,Accept,,Rock,342562,5510424,0.99,Protected AAC audio file


##### 2.1.7 Write DataFrame Back to MySQL Database

In [58]:
table_name = "dim_tracks"
dataframe = df_tracks
primary_key = "track_key"
db_operation = "insert"
set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args_dw2)

##### 2.1.8 Verify The Write Executed Correctly

In [59]:
sql_dim_tracks = "SELECT * FROM chinook_dw2.dim_tracks;"
df_dim_tracks = get_sql_dataframe(sql_dim_tracks, **mysql_args_dw2)
df_dim_tracks.head(2)

Unnamed: 0,track_key,track_name,album_title,artist_name,Composer,genre,Milliseconds,Bytes,UnitPrice,mediatype
0,1,For Those About To Rock (We Salute You),For Those About To Rock We Salute You,AC/DC,"Angus Young, Malcolm Young, Brian Johnson",Rock,343719,11170334,0.99,MPEG audio file
1,2,Balls to the Wall,Balls to the Wall,Accept,,Rock,342562,5510424,0.99,Protected AAC audio file


#### 2.2 fact_invoices
The same method we just used to create **dim_tracks** will now be used to create the **fact_invoices** table. This requires **invoice** and **invoiceline** from the **chinook** database.
##### 2.2.1 Get Data From Each Chinook Table

In [60]:
sql_invoices = "SELECT * FROM chinook.invoice;"
df_invoices = get_sql_dataframe(sql_invoices, **mysql_args_dw2)
df_invoices.rename(columns = {"InvoiceId":"invoice_key","CustomerId":"customer_key"}, inplace = True)
df_invoices.head(2)

Unnamed: 0,invoice_key,customer_key,InvoiceDate,BillingAddress,BillingCity,BillingState,BillingCountry,BillingPostalCode,Total
0,1,2,2009-01-01,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,1.98
1,2,4,2009-01-02,Ullevålsveien 14,Oslo,,Norway,171,3.96


In [61]:
sql_invoicelines = "SELECT * FROM chinook.invoiceline;"
df_invoicelines = get_sql_dataframe(sql_invoicelines, **mysql_args_dw2)
df_invoicelines.rename(columns = {"InvoiceLineId":"fact_invoice_key","InvoiceId":"invoice_key", "TrackId":"track_key"}, inplace = True)
df_invoicelines.head(2)

Unnamed: 0,fact_invoice_key,invoice_key,track_key,UnitPrice,Quantity
0,1,1,2,0.99,1
1,2,1,4,0.99,1


##### 2.2.2 Merge Tables

In [62]:
df_invoices = pd.merge(df_invoices, df_invoicelines, on='invoice_key', how='right')
df_invoices.head(2)

Unnamed: 0,invoice_key,customer_key,InvoiceDate,BillingAddress,BillingCity,BillingState,BillingCountry,BillingPostalCode,Total,fact_invoice_key,track_key,UnitPrice,Quantity
0,1,2,2009-01-01,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,1.98,1,2,0.99,1
1,1,2,2009-01-01,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,1.98,2,4,0.99,1


In [63]:
df_invoices = df_invoices[['fact_invoice_key','invoice_key','customer_key', 'track_key', 'InvoiceDate', 'BillingAddress',
'BillingCity','BillingState','BillingCountry','BillingPostalCode','UnitPrice','Quantity', 'Total']]
df_invoices.head(2)

Unnamed: 0,fact_invoice_key,invoice_key,customer_key,track_key,InvoiceDate,BillingAddress,BillingCity,BillingState,BillingCountry,BillingPostalCode,UnitPrice,Quantity,Total
0,1,1,2,2,2009-01-01,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,0.99,1,1.98
1,2,1,2,4,2009-01-01,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,0.99,1,1.98


##### 2.2.3 Write DataFrame Back to MySQL Database

In [64]:
dataframe = df_invoices
table_name = "fact_invoices"
primary_key = "fact_invoice_key"
db_operation = "insert"
set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args_dw2)

##### 2.2.4 Verify the Write Executed Correctly

In [65]:
sql_invoices = "SELECT * FROM chinook_dw2.fact_invoices;"
df_fact_invoices = get_sql_dataframe(sql_invoices, **mysql_args_dw2)
df_fact_invoices.head(2)

Unnamed: 0,fact_invoice_key,invoice_key,customer_key,track_key,InvoiceDate,BillingAddress,BillingCity,BillingState,BillingCountry,BillingPostalCode,UnitPrice,Quantity,Total
0,1,1,2,2,2009-01-01,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,0.99,1,1.98
1,2,1,2,4,2009-01-01,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,0.99,1,1.98


### 3.0 Load dim_date From Local JSON Files
The last table needed in our data warehouse is the date dimension table. The table was created in SQL using code from Lab2C.sql and exported to a csv.
#### 3.1 Load dim_date From Local File System

In [67]:
file_path = "/Users/user/Documents/DS-2002/data/dim_date.json"
df_dim_date = pd.read_json(file_path)
df_dim_date.head()

Unnamed: 0,date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,...,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
0,20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
1,20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
2,20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
3,20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
4,20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


#### 3.2 Integrate Date Dimension in fact_invoices
Now that we have dim_date, we need to replace the **InvoiceDate** column in **fact_invoices** with the date_key from dim_date.

##### 3.2.1 Merge Tables

In [68]:
df_invoices.rename(columns={"InvoiceDate":"full_date"}, inplace=True)
df_dim_date['full_date'] = df_dim_date['full_date'].astype('datetime64[ns]')
df_invoices = pd.merge(df_invoices, df_dim_date, on='full_date', how='inner')
df_invoices.head(2)

Unnamed: 0,fact_invoice_key,invoice_key,customer_key,track_key,full_date,BillingAddress,BillingCity,BillingState,BillingCountry,BillingPostalCode,...,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
0,1,1,2,2,2009-01-01,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,...,N,1,2009,2009-01,2009Q1,7,3,2009,2009-07,2009Q3
1,2,1,2,4,2009-01-01,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,...,N,1,2009,2009-01,2009Q1,7,3,2009,2009-07,2009Q3


In [69]:
drop_columns = ['date_name',
 'date_name_us',
 'date_name_eu',
 'day_of_week',
 'day_name_of_week',
 'day_of_month',
 'day_of_year',
 'weekday_weekend',
 'week_of_year',
 'month_name',
 'month_of_year',
 'is_last_day_of_month',
 'calendar_quarter',
 'calendar_year',
 'calendar_year_month',
 'calendar_year_qtr',
 'fiscal_month_of_year',
 'fiscal_quarter',
 'fiscal_year',
 'fiscal_year_month',
 'fiscal_year_qtr']
df_invoices.drop(drop_columns, axis=1, inplace=True)

In [70]:
df_invoices = df_invoices[['fact_invoice_key','invoice_key','customer_key', 'track_key', 'full_date','date_key',
                           'BillingAddress', 'BillingCity','BillingState','BillingCountry','BillingPostalCode','UnitPrice','Quantity', 'Total']]
df_invoices.head(2)

Unnamed: 0,fact_invoice_key,invoice_key,customer_key,track_key,full_date,date_key,BillingAddress,BillingCity,BillingState,BillingCountry,BillingPostalCode,UnitPrice,Quantity,Total
0,1,1,2,2,2009-01-01,20090101,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,0.99,1,1.98
1,2,1,2,4,2009-01-01,20090101,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,0.99,1,1.98


In [71]:
df_invoices.drop('full_date', axis=1, inplace=True)
df_invoices.head(2)

Unnamed: 0,fact_invoice_key,invoice_key,customer_key,track_key,date_key,BillingAddress,BillingCity,BillingState,BillingCountry,BillingPostalCode,UnitPrice,Quantity,Total
0,1,1,2,2,20090101,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,0.99,1,1.98
1,2,1,2,4,20090101,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,0.99,1,1.98


#### 3.3 Write DataFrame Back to MySQL Database

In [74]:
dataframe = df_dim_date
table_name = "dim_date"
primary_key = "date_key"
db_operation = "insert"
set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args_dw2)

##### 3.3.1 Verify the Write Executed Correctly

In [75]:
sql_dates = "SELECT * FROM chinook_dw2.dim_date;"
df_dates = get_sql_dataframe(sql_dates, **mysql_args_dw2)
df_dates.head(2)

Unnamed: 0,date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,...,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
0,20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
1,20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


##### 3.3.2 Drop fact_invoices Table and Insert New fact_invoices DataFrame

In [77]:
with sqlEngine.connect() as conn:
    conn.execute(text("DROP TABLE IF EXISTS chinook_dw2.fact_invoices;"))

In [78]:
dataframe = df_invoices
table_name = "fact_invoices"
primary_key = "fact_invoice_key"
db_operation = "insert"
set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args_dw2)

##### 3.3.3 Verify Write Executed Correctly

In [79]:
sql_invoices = "SELECT * FROM chinook_dw2.fact_invoices;"
df_fact_invoices = get_sql_dataframe(sql_invoices, **mysql_args_dw2)
df_fact_invoices.head(2)

Unnamed: 0,fact_invoice_key,invoice_key,customer_key,track_key,date_key,BillingAddress,BillingCity,BillingState,BillingCountry,BillingPostalCode,UnitPrice,Quantity,Total
0,1,1,2,2,20090101,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,0.99,1,1.98
1,2,1,2,4,20090101,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,0.99,1,1.98


### 4.0 Demonstrate That the New Data Warehouse Exists and Contains the Correct Data
To demonstrate that the data warehouse is functioning properly, I authored a SQL query that returns:
- Each Customer's First and Last Name
- The total amount of the tracks each customer has purchased
- The total amount of money each customer has spent
- The name of the support representative (employee) that is responsible for the customer's transaction

In [83]:
validate_data = """
SELECT
    dc.customer_LastName AS Customer_Last_Name,
    dc.customer_FirstName AS Customer_First_Name,
    SUM(fi.Quantity) AS Total_Tracks,
    ROUND(SUM(fi.UnitPrice),2) AS Total_Money_Spent,
    de.employee_LastName AS Employee_Last_Name,
    de.employee_FirstName AS Employee_First_Name
FROM
    chinook_dw2.fact_invoices AS fi
LEFT OUTER JOIN chinook_dw2.dim_customers AS dc
    ON fi.customer_key = dc.customer_key
LEFT OUTER JOIN chinook_dw2.dim_employees AS de
    ON dc.SupportRepId = de.employee_key
GROUP BY
    dc.customer_LastName,
    dc.customer_FirstName,
    de.employee_LastName,
    de.employee_FirstName
ORDER BY Total_Money_Spent DESC;
"""

fact_invoice_summary = get_sql_dataframe(validate_data, **mysql_args_dw2)
fact_invoice_summary

Unnamed: 0,Customer_Last_Name,Customer_First_Name,Total_Tracks,Total_Money_Spent,Employee_Last_Name,Employee_First_Name
0,Holý,Helena,38.0,49.62,Johnson,Steve
1,Cunningham,Richard,38.0,47.62,Park,Margaret
2,Rojas,Luis,38.0,46.62,Johnson,Steve
3,O'Reilly,Hugh,38.0,45.62,Peacock,Jane
4,Kovács,Ladislav,38.0,45.62,Peacock,Jane
5,Zimmermann,Fynn,38.0,43.62,Peacock,Jane
6,Ralston,Frank,38.0,43.62,Peacock,Jane
7,Barnett,Julia,38.0,43.62,Johnson,Steve
8,Stevens,Victor,38.0,42.62,Johnson,Steve
9,Gruber,Astrid,38.0,42.62,Johnson,Steve
