# Using Python to Perform ETL Processing With the Chinook Database

Code used from Labs 2, 3, and 4 to perform the necessary functions. I denote where I extracted code from previous assignments in the comments for each chunk of code. In this document, I create a new empty data warehouse in MySQL, connect to a cluster in MongoDB to hold data from chinook as a JSON file, extract data from MySQL as CSV files to read locally, and create a fact table and dataframe transformations to result in running summarization queries in MySQL at the end.

Link to project Github Repository: https://github.com/efredenburgh/DS2002_p1/tree/main

Before running this code, I began by creating and populating the Chinook database in MySQL by running the following scripts:

    1-create_chinook.sql
    
    2-populate_chinook.sql
    
After running these 2 scripts, I had a Chinook database in MySQL that I could then use to make extractions and populate the chinook_dw database in the script below.

In [1]:
# Lab 3 starts by importing necessary libraries to perform certain functions. I have done the same
import os
import json
import numpy
import datetime
import certifi
import pandas as pd

import pymongo
import sqlalchemy
from sqlalchemy import create_engine, text

In [2]:
# Also extracted from Lab 3, which will allow me to declare & assign connection variables for the MySQL server & database I'm working with
host_name = "localhost"
port = "3306"
user_id = "root"
pwd = "Sadie2017!!!"

src_dbname = "chinook" # original chinook database is the source
dst_dbname = "chinook_dw" # the transformed data mart database

In [3]:
# These functions were provided in Lab 3 to get data from a database, as well as set it into the database. The mongo functions were defined
# in Lab 4 to get and set data into databases for use in MongoDB.

def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}" # connection string format specific to MySQL to pass parameters into string
    sqlEngine = create_engine(conn_str, pool_recycle=3600) #create object by making engine and passing in conn_str
    connection = sqlEngine.connect() # instance of connection to MySQL server
    dframe = pd.read_sql(sql_query, connection); # use this query against this database
    connection.close() # conserve resources
    
    return dframe

def get_sql_dataframe(sql_query, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['user_id']}:{args['pwd']}@{args['host_name']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    dframe = pd.read_sql(text(sql_query), connection);
    connection.close()
    
    return dframe
    

def set_dataframe(df, table_name, pk_column, db_operation, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['user_id']}:{args['pwd']}@{args['host_name']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()
    
def get_mongo_client(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the cluster_location parameter.")
    
    else:
        if args["cluster_location"] == "atlas":
            connect_str = f"mongodb+srv://{args['user_name']}:{args['password']}@"
            connect_str += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
            client = pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())
            
        elif args["cluster_location"] == "local":
            client = pymongo.MongoClient("mongodb://localhost:27017/")
        
    return client


def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = mongo_client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    mongo_client.close()
    
    return dframe


def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    db = mongo_client[db_name]
    
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(data_directory, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)
        
    mongo_client.close()

In [7]:
# From Lab 3 - drops the chinook_dw database if it already exists somewhere, then creates the chinook_dw database to use for the rest of
# the operations in this code.

conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle = 3600)
connection = sqlEngine.connect()

connection.execute(text(f"DROP DATABASE IF EXISTS `{dst_dbname}`;")) # drop database if it exists and make it again
connection.execute(text(f"CREATE DATABASE `{dst_dbname}`;")) # creates creates chinook_dw
connection.execute(text(f"USE {dst_dbname};")) # makes sure we target the new database

connection.close() # it worked - I now have chinook_dw in MySQL

### Date Dimension Table

In [None]:
dim_date = """
USE chinook_dw; -- using my extracted data warehouse
-- adapted code from Lab 2c to create and populate a dim_date table
-- There were not many date variables to choose from in chinook, so I separated Year, Month, and Day, but also included the full invoice_date in the
-- table. Additionally, I created a primary key called date_key. Had to change it a little to fit my needs

DROP TABLE IF EXISTS dim_date; -- get rid of dim_date in case it is preexisting

CREATE TABLE dim_date ( -- create the dim_date table and include variables like date_key, invoice_date, Year, Month, and Day
    date_key INT NOT NULL AUTO_INCREMENT, -- generate unique key values for each piece of data in the dim_date table
    invoice_date DATE NULL,
    Year INT, -- contained as integers
    Month INT,
    Day INT,
    PRIMARY KEY (`date_key`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;


INSERT INTO dim_date (invoice_date, Year, Month, Day) -- populate dim_date
SELECT DISTINCT 
    InvoiceDate AS invoice_date,   -- takes the old format from chinook (InvoiceDate) and populates it in dim_date as invoice_date
    YEAR(InvoiceDate) AS Year,     -- takes the year out of invoice_date only to populate the Year variable
    MONTH(InvoiceDate) AS Month,   -- takes the month out of invoice_date only to populate the Month variable
    DAY(InvoiceDate) AS Day        -- takes the day out of invoice_date only to populate the Day variable
FROM chinook.Invoice  -- tells the code where to get the data from (the source database)
ORDER BY InvoiceDate; -- how to sort it


use chinook_dw;
SELECT * FROM dim_date; -- my failsafe to check that the table was populated
"""

At this point, I am running the code I included above in (dim_date_chinook.sql) in MySQL to create and populate a dim_date table in MySQL in the chinook_dw database.

In [9]:
# from lab 4
# after I created the dim_date table in chinook_dw, I make sure it is populated correctly
sql_dim_date = "SELECT * FROM chinook_dw.dim_date;" # choose everything from the dim_date table in chinook_dw in MySQL
df_dim_date = get_dataframe(user_id, pwd, host_name, src_dbname, sql_dim_date) # use the get_dataframe function I defined earlier
df_dim_date.head(2) # unit test to make sure it worked properly

Unnamed: 0,date_key,invoice_date,Year,Month,Day
0,1,2021-01-01,2021,1,1
1,2,2021-01-02,2021,1,2


In [11]:
# code from Lab 4 that I will be using to connect to MongoDb and MySQL when I export the tables from Chinook and then write the dimension
# tables back into MySQL after transformations.

mysql_args = {
    "user_id" : "root",
    "pwd" : "Sadie2017!!!",
    "host_name" : "localhost",
    "dbname" : "chinook_dw"
}

# cluster location is atlas
mongodb_args = {
    "user_name" : "efredenburgh",
    "password" : "HsLLnnnt4lhtfZaV",
    "cluster_name" : "cluster0",
    "cluster_subnet" : "rrru4",
    "cluster_location" : "atlas", # "local"
    "db_name" : "chinook_dw"
}

# Create all the dimension tables before tackling the Fact table

When I extract Chinook data for each dimension table, I run the corresponding lines in Chinook_extraction.sql in MySQL to get the data from the correct table. After the extraction, I run code in this Jupyter Notebook that creates and populates the dimension table in chinook_dw. I included the scripts I ran in MySQL below for reference. For each, I would simply highlight the use chinook; and SELECT * FROM _; lines.

In [None]:
chinook_extraction = """
-- exporting to json and mongodb
-- These are the small bits of code I used to extract data from each table in chinook for use in the chinook_dw database. I extracted Employee and Customer data as JSON files for use
-- in MongoDB, and I extracted Invoice and InvoiceLine as CSV files that I turned into pandas dataframes.

use chinook;

SELECT * FROM Customer; -- selects everything from the customer table

use chinook;

SELECT * FROM Invoice; -- selects everything from the invoice table

use chinook;
SELECT * FROM Employee; -- selects everything from the employee table

use chinook; 
SELECT * FROM InvoiceLine; -- selects everything from the invoice line table
"""

### Customer Dimension Table

Here I selected everything from the Customer table in Chinook and exported it into a JSON file called Customer.json.

In [13]:
# from Lab 4 to populate mongoDB with source data
client = get_mongo_client(**mongodb_args) # connect to mongo

# get path of cwd (current working directory) for the notebook, then appending it to the data directory that is created
data_dir = os.path.join(os.getcwd(), 'data')

json_files = {"customers" : 'Customer.json' # file i'm using
             }

set_mongo_collections(client, mongodb_args["db_name"], data_dir, json_files) # sets it into mongo collections

In [15]:
# from lab 4 "Extract Data from the Source MongoDB Collections Into DataFrames"
client = get_mongo_client(**mongodb_args)

query = {} # get all the columns and rows in the customers table
collection = "customers"

df_customers = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query) # makes the dataframe I'm using in the Jupyter notebook
df_customers.head(2) # check that it worked

Unnamed: 0,CustomerId,FirstName,LastName,Company,Address,City,State,Country,PostalCode,Phone,Fax,Email,SupportRepId
0,1,Luís,Gonçalves,Embraer - Empresa Brasileira de Aeronáutica S.A.,"Av. Brigadeiro Faria Lima, 2170",São José dos Campos,SP,Brazil,12227-000,+55 (12) 3923-5555,+55 (12) 3923-5566,luisg@embraer.com.br,3
1,2,Leonie,Köhler,,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,+49 0711 2842222,,leonekohler@surfeu.de,5


In [16]:
# lab 4 "Perform Any Necessary Transformations to the DataFrames"
# change the id column name for business key for lookup operations
df_customers.rename(columns = {"CustomerId":"customer_id"}, inplace = True)
# dropping fax because it is sparsely populated and pretty useless these days
df_customers.drop(['Fax'], axis = 1, inplace = True)
# put in a new primary key column that is an ever-incrementing numeric value
df_customers.insert(0, "customer_key", range(1, df_customers.shape[0]+1))
df_customers.head(2) # check that it's right

Unnamed: 0,customer_key,customer_id,FirstName,LastName,Company,Address,City,State,Country,PostalCode,Phone,Email,SupportRepId
0,1,1,Luís,Gonçalves,Embraer - Empresa Brasileira de Aeronáutica S.A.,"Av. Brigadeiro Faria Lima, 2170",São José dos Campos,SP,Brazil,12227-000,+55 (12) 3923-5555,luisg@embraer.com.br,3
1,2,2,Leonie,Köhler,,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,+49 0711 2842222,leonekohler@surfeu.de,5


In [17]:
# from Lab 4 "Load the Transformed DataFrames into the New Data Warehouse by Creating New Tables" - write it back to MySQL chinook_dw
dataframe = df_customers
table_name = 'dim_customers'
primary_key = 'customer_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

In [21]:
# from lab 4 to validate that the new 'dim_customers' table is in MySQL chinook_dw
sql_customers = "SELECT * FROM chinook_dw.dim_customers;"
df_dim_customers = get_sql_dataframe(sql_customers, **mysql_args)
df_dim_customers.head(2) 

Unnamed: 0,customer_key,customer_id,FirstName,LastName,Company,Address,City,State,Country,PostalCode,Phone,Email,SupportRepId
0,1,1,Luís,Gonçalves,Embraer - Empresa Brasileira de Aeronáutica S.A.,"Av. Brigadeiro Faria Lima, 2170",São José dos Campos,SP,Brazil,12227-000,+55 (12) 3923-5555,luisg@embraer.com.br,3
1,2,2,Leonie,Köhler,,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,+49 0711 2842222,leonekohler@surfeu.de,5


### Invoice Dimension Table

Here I selected everything from the Invoice table in Chinook and exported it into a CSV file called Invoice_chinook.csv to be read locally.

In [25]:
# from 03-Python-DataFiles - load data from csv file
# get path of cwd (current working directory) for the notebook, then appending it to the data directory that is created
data_dir = os.path.join(os.getcwd(), 'data') 
data_file = os.path.join(data_dir, 'Invoice_chinook.csv') # put the file in the data directory

df_invoices = pd.read_csv(data_file, header = 0, index_col = 0) # read the file as a pandas dataframe
df_invoices.head(2) # check that it worked

Unnamed: 0_level_0,CustomerId,InvoiceDate,BillingAddress,BillingCity,BillingState,BillingCountry,BillingPostalCode,Total
InvoiceId,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
1,2,2021-01-01 00:00:00,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,1.98
2,4,2021-01-02 00:00:00,Ullevålsveien 14,Oslo,,Norway,171,3.96


In [27]:
# from 03-Python-DataFiles
# still with the invoice csv
df_invoices.drop(['BillingState','BillingPostalCode', 'BillingAddress', 'BillingCity', 'BillingCountry'], axis = 1, inplace = True) # useless variables
df_invoices.columns # check that it worked

Index(['CustomerId', 'InvoiceDate', 'Total'], dtype='object')

In [29]:
# from Lab 4 "Perform any necesary transformations to the Dataframes"

# renaming columns I will be using a lot to make them nicer to use
df_invoices.rename(columns = {
    'CustomerId': 'customer_key',
    'InvoiceDate': 'invoice_date',
    'Total': 'invoice_total'
}, inplace = True)

# converting the invoice_date variable to a date-time format
df_invoices['invoice_date'] = df_invoices['invoice_date'].astype('datetime64[ns]').dt.date

# put in an invoice_key to act as the primary key for this table
df_invoices.insert(0, 'invoice_key', range(1, len(df_invoices) + 1)) # incrementing
df_invoices.head(2) # make sure it worked

Unnamed: 0_level_0,invoice_key,customer_key,invoice_date,invoice_total
InvoiceId,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
1,1,2,2021-01-01,1.98
2,2,4,2021-01-02,3.96


In [31]:
# from Lab 4 "Load the Transformed DataFrames into the New Data Warehouse by Creating New Tables" - write it back to MySQL chinook_dw
table_name = 'dim_invoices'
primary_key = 'invoice_key'
db_operation = "insert"

set_dataframe(df_invoices, table_name, primary_key, db_operation, **mysql_args)

In [33]:
# from lab 4 to check that the new 'dim_invoices' table is in MySQL chinook_dw
sql_invoices = "SELECT * FROM chinook_dw.dim_invoices;"
df_dim_invoices = get_sql_dataframe(sql_invoices, **mysql_args)
df_dim_invoices.head(2)

Unnamed: 0,invoice_key,customer_key,invoice_date,invoice_total
0,1,2,2021-01-01,1.98
1,2,4,2021-01-02,3.96


### Invoice Line Dimension Table

Here I selected everything from the InvoiceLine table in Chinook and exported it into a CSV file called invoiceline.csv to be read locally.

In [37]:
# from 03-Python-DataFiles- load data from csv file
# get path of cwd (current working directory) for the notebook, then appending it to the data directory that is created
data_dir = os.path.join(os.getcwd(), 'data')
data_file = os.path.join(data_dir, 'invoiceline.csv') # put the file in the data directory

df_invoiceline = pd.read_csv(data_file, header = 0, index_col = 0)# read the file as a pandas dataframe
df_invoiceline.head(2)# check that it worked

Unnamed: 0_level_0,InvoiceId,TrackId,UnitPrice,Quantity
InvoiceLineId,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
1,1,2,0.99,1
2,1,4,0.99,1


In [39]:
# from Lab 4 "Perform any necesary transformations to the Dataframes"
# again renaming some of the columns the way I like so they're easier to use
df_invoiceline.rename(columns = {
    'InvoiceLineId': 'invoice_line_key',
    'InvoiceId': 'invoice_key',
    'UnitPrice': 'unit_price',
    'Quantity': 'quantity',
    'TrackId': 'track_id'
}, inplace = True)

# add a primary key invoice_line_key
df_invoiceline.insert(0, 'invoice_line_key', range(1, len(df_invoiceline) + 1)) # incrementing

In [41]:
# from Lab 4 "Load the Transformed DataFrames into the New Data Warehouse by Creating New Tables" - write it back to MySQL chinook_dw
table_name = 'dim_invoice_line'
primary_key = 'invoice_line_key'
db_operation = "insert"

set_dataframe(df_invoiceline, table_name, primary_key, db_operation, **mysql_args)

In [43]:
# from lab 4 to validate that the new 'dim_invoices' table is in MySQL chinook_dw
sql_invoice_line = "SELECT * FROM chinook_dw.dim_invoice_line;"
df_dim_invoice_line = get_sql_dataframe(sql_invoice_line, **mysql_args)
df_dim_invoice_line.head(2)

Unnamed: 0,invoice_line_key,invoice_key,track_id,unit_price,quantity
0,1,1,2,0.99,1
1,2,1,4,0.99,1


### Employees Dimension Table

Here I selected everything from the Employee table in Chinook and exported it into a JSON file called employee_chinook.json.

In [47]:
# i am going to make another json and read it into mongodb then back to mysql
# from Lab 4 to populate mongoDB with source data
client = get_mongo_client(**mongodb_args)
# get path of cwd (current working directory) for the notebook, then appending it to the data directory that is created
data_dir = os.path.join(os.getcwd(), 'data')

json_files = {"employees" : 'employee_chinook.json' # file I'm using
             }

set_mongo_collections(client, mongodb_args["db_name"], data_dir, json_files) # sets it into mongo collections

In [48]:
# from lab 4 "Extract Data from the Source MongoDB Collections Into DataFrames"
client = get_mongo_client(**mongodb_args)

query = {} # get all the columns and rows in the customers table
collection = "employees"

df_employees = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)# makes the dataframe I'm using in the Jupyter notebook
df_employees.head(2) # check that it worked

Unnamed: 0,EmployeeId,LastName,FirstName,Title,ReportsTo,BirthDate,HireDate,Address,City,State,Country,PostalCode,Phone,Fax,Email
0,1,Adams,Andrew,General Manager,,1962-02-18 00:00:00,2002-08-14 00:00:00,11120 Jasper Ave NW,Edmonton,AB,Canada,T5K 2N1,+1 (780) 428-9482,+1 (780) 428-3457,andrew@chinookcorp.com
1,2,Edwards,Nancy,Sales Manager,1.0,1958-12-08 00:00:00,2002-05-01 00:00:00,825 8 Ave SW,Calgary,AB,Canada,T2P 2T3,+1 (403) 262-3443,+1 (403) 262-3322,nancy@chinookcorp.com


In [49]:
# lab 4 Perform Any Necessary Transformations to the DataFrames¶
# change the id column name for business key for lookup operations
df_employees.rename(columns = {"EmployeeId":"employee_key"}, inplace=True)
# dropping BirthDate because I do not need that
df_employees.drop(['BirthDate'], axis = 1, inplace = True) 
df_employees.head(2) # check that it all looks good

Unnamed: 0,employee_key,LastName,FirstName,Title,ReportsTo,HireDate,Address,City,State,Country,PostalCode,Phone,Fax,Email
0,1,Adams,Andrew,General Manager,,2002-08-14 00:00:00,11120 Jasper Ave NW,Edmonton,AB,Canada,T5K 2N1,+1 (780) 428-9482,+1 (780) 428-3457,andrew@chinookcorp.com
1,2,Edwards,Nancy,Sales Manager,1.0,2002-05-01 00:00:00,825 8 Ave SW,Calgary,AB,Canada,T2P 2T3,+1 (403) 262-3443,+1 (403) 262-3322,nancy@chinookcorp.com


In [53]:
# from Lab 4 "Perform any necesary transformations to the Dataframes"
# changed the format of HireDate to be more consistent with other date variables even though I don't really need it
df_employees['HireDate'] = df_employees['HireDate'].astype('datetime64[ns]').dt.date
df_employees.head(2) # check if it worked

Unnamed: 0,employee_key,LastName,FirstName,Title,ReportsTo,HireDate,Address,City,State,Country,PostalCode,Phone,Fax,Email
0,1,Adams,Andrew,General Manager,,2002-08-14,11120 Jasper Ave NW,Edmonton,AB,Canada,T5K 2N1,+1 (780) 428-9482,+1 (780) 428-3457,andrew@chinookcorp.com
1,2,Edwards,Nancy,Sales Manager,1.0,2002-05-01,825 8 Ave SW,Calgary,AB,Canada,T2P 2T3,+1 (403) 262-3443,+1 (403) 262-3322,nancy@chinookcorp.com


In [55]:
# from Lab 4 "Load the Transformed DataFrames into the New Data Warehouse by Creating New Tables" - write it back to MySQL chinook_dw
dataframe = df_employees
table_name = 'dim_employees'
primary_key = 'employee_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

In [57]:
# from lab 4 to validate that the new 'dim_employees' table is in MySQL chinook_dw
sql_employees = "SELECT * FROM chinook_dw.dim_employees;"
df_dim_employees = get_sql_dataframe(sql_employees, **mysql_args)
df_dim_employees.head(2) # check if it worked

Unnamed: 0,employee_key,LastName,FirstName,Title,ReportsTo,HireDate,Address,City,State,Country,PostalCode,Phone,Fax,Email
0,1,Adams,Andrew,General Manager,,2002-08-14,11120 Jasper Ave NW,Edmonton,AB,Canada,T5K 2N1,+1 (780) 428-9482,+1 (780) 428-3457,andrew@chinookcorp.com
1,2,Edwards,Nancy,Sales Manager,1.0,2002-05-01,825 8 Ave SW,Calgary,AB,Canada,T2P 2T3,+1 (403) 262-3443,+1 (403) 262-3322,nancy@chinookcorp.com


## Full Invoice Dimension Table

Merging df_dim_invoices and df_dim_invoice_line for easier use in fact table.

In [61]:
# from lab 4: "Lookup the DateKeys from the Date Dimension Table" - which includes merging instructions
# merging dim_invoice_line and dim_invoices on invoice_key - doesn't make sense to work with 2 separate invoice tables when i do fact table
df_full_invoice = pd.merge(df_dim_invoice_line, df_dim_invoices, on = 'invoice_key', how = 'left') # left join on invoice_key
df_full_invoice['full_invoice_key'] = range(1, len(df_full_invoice) + 1) # makes a new primary key for the new merged table
df_full_invoice.head(2) # check

Unnamed: 0,invoice_line_key,invoice_key,track_id,unit_price,quantity,customer_key,invoice_date,invoice_total,full_invoice_key
0,1,1,2,0.99,1,2,2021-01-01,1.98,1
1,2,1,4,0.99,1,2,2021-01-01,1.98,2


In [63]:
# also Lab 4: "Lookup the DateKeys from the Date Dimension Table" and perform necessary transformations
# following the example from class in 2.2.2
# just used transaction date in the renaming here because it aligns with the example and is basically the same thing as invoice date
df_dim_transaction_date = df_dim_date.rename(columns={"date_key": "transaction_created_date_key", "invoice_date": "transaction_created_date"})
# convert invoice_date to datetime and coerced any errors that may appear
df_full_invoice['invoice_date'] = pd.to_datetime(df_full_invoice['invoice_date'], errors = 'coerce').dt.date
# merge the combined invoice table (df_full_invoice) with the dim_transaction_date table (has values from the df_dim_date table) with a left join on invoice_date and a right join on transaction_created_date
# had to look up new merge operations in pandas to do this. see reference [3] in the Github repo with my source for this method
df_full_invoice = pd.merge(df_full_invoice, df_dim_transaction_date, left_on = 'invoice_date', right_on = 'transaction_created_date', how = 'left')
df_full_invoice.drop(['invoice_date'], axis = 1, inplace = True) # don't need invoice_date anymore
df_full_invoice.head(2) # check that it worked

Unnamed: 0,invoice_line_key,invoice_key,track_id,unit_price,quantity,customer_key,invoice_total,full_invoice_key,transaction_created_date_key,transaction_created_date,Year,Month,Day
0,1,1,2,0.99,1,2,1.98,1,1,2021-01-01,2021,1,1
1,2,1,4,0.99,1,2,1.98,2,1,2021-01-01,2021,1,1


In [65]:
# from Lab 4 "Load the Transformed DataFrames into the New Data Warehouse by Creating New Tables" - write it back to MySQL chinook_dw
dataframe = df_full_invoice
table_name = 'dim_full_invoices'
primary_key = 'full_invoice_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

In [67]:
# from lab 4 to validate that the new 'dim_full_invoices' table is in MySQL chinook_dw
sql_fullinvoice = "SELECT * FROM chinook_dw.dim_full_invoices;"
df_full_invoice = get_sql_dataframe(sql_fullinvoice, **mysql_args)
df_full_invoice.head(2) #check

Unnamed: 0,invoice_line_key,invoice_key,track_id,unit_price,quantity,customer_key,invoice_total,full_invoice_key,transaction_created_date_key,transaction_created_date,Year,Month,Day
0,1,1,2,0.99,1,2,1.98,1,1,2021-01-01,2021,1,1
1,2,1,4,0.99,1,2,1.98,2,1,2021-01-01,2021,1,1


# Create the Fact Table

Here I will be compiling information from the dimension tables I made above. This part is heavily informed by the work I did in Lab 4.
After creating this fact table, called dim_fact_invoices, I will be able to complete the last part of this assignment: authoring summarization queries.

In [71]:
# From Lab 4: use keys to merge dimension tables and drop unnecessary columns to create a cohesive fact table

# merge df_full_invoice with df_dim_customers - they both contain customer_key so I will join them on customer_key. I will also select customer_id and SuportRepId from the customers table
df_fact_invoice = pd.merge(df_full_invoice, df_dim_customers[['customer_key', 'customer_id', 'SupportRepId']],on = 'customer_key', how = 'left') # left join on customer_key
# now merge the new df_fact_invoice with df_dim_employees using SupportRepId (from customers) and employee_key - also uses reference [3]
df_fact_invoice = pd.merge(df_fact_invoice, df_dim_employees[['employee_key', 'LastName', 'FirstName', 'Title']],left_on = 'SupportRepId', right_on = 'employee_key', how = 'left')

# here I create total_invoice_amount to create a cleaner variable in df_fact_invoice that calculates the total price per invoice by multiplying quantity and unit_price
df_fact_invoice['total_invoice_amount'] = df_fact_invoice['unit_price'] * df_fact_invoice['quantity'] # not really necessary, but I think it's a nice touch

df_fact_invoice.drop(['SupportRepId', 'employee_key'], axis = 1, inplace = True) # these columns are not necessary for the fact table anymore
df_fact_invoice.head(2) # make sure it worked well

Unnamed: 0,invoice_line_key,invoice_key,track_id,unit_price,quantity,customer_key,invoice_total,full_invoice_key,transaction_created_date_key,transaction_created_date,Year,Month,Day,customer_id,LastName,FirstName,Title,total_invoice_amount
0,1,1,2,0.99,1,2,1.98,1,1,2021-01-01,2021,1,1,2,Johnson,Steve,Sales Support Agent,0.99
1,2,1,4,0.99,1,2,1.98,2,1,2021-01-01,2021,1,1,2,Johnson,Steve,Sales Support Agent,0.99


In [73]:
# needs some cleaning
# need to drop some unnecessay columns from the fact table to make it cleaner
df_fact_invoice.drop(['transaction_created_date', 'Year', 'Month', 'Day', 'customer_id', 'LastName', 'FirstName', 'Title'], axis=1, inplace=True)
df_fact_invoice.head(2) # check that I like the information contained and that I like how it looks

Unnamed: 0,invoice_line_key,invoice_key,track_id,unit_price,quantity,customer_key,invoice_total,full_invoice_key,transaction_created_date_key,total_invoice_amount
0,1,1,2,0.99,1,2,1.98,1,1,0.99
1,2,1,4,0.99,1,2,1.98,2,1,0.99


In [75]:
# from Lab 4 "Load the Transformed DataFrames into the New Data Warehouse by Creating New Tables" - write it back to MySQL chinook_dw
dataframe = df_fact_invoice
table_name = 'dim_fact_invoices'
primary_key = 'full_invoice_key' # this from the merged tables is the primary key for the fact table
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

In [77]:
# from lab 4 to validate that the new 'dim_fact_invoices' table is in MySQL chinook_dw
sql_fact = "SELECT * FROM chinook_dw.dim_fact_invoices;"
df_fact_invoice = get_sql_dataframe(sql_fact, **mysql_args)
df_fact_invoice.head(2) # check the first few rows

Unnamed: 0,invoice_line_key,invoice_key,track_id,unit_price,quantity,customer_key,invoice_total,full_invoice_key,transaction_created_date_key,total_invoice_amount
0,1,1,2,0.99,1,2,1.98,1,1,0.99
1,2,1,4,0.99,1,2,1.98,2,1,0.99


# Author Summarization Queries

I took inspiration here from section 3.0 of Lab 4: "Demonstrate that the New Data Warehouse Exists and Contains the Correct Data."
I wrote 3 queries and ran each one in MySQL using the chinook_dw database. The script I ran each of these queries in is contained in SCRIPTS/MySQL/final_queries.sql in the Github Repo. I exported each of the outputs as csv files and loaded them in the OUTPUT/ folder in the Github Repo. 

In [83]:
# this one is very similar to sql_purchase_orders in 3.0 of Lab 4

sql_invoices_per_month = """
    SELECT 
        d.Month,
        d.Year,
        SUM(f.total_invoice_amount) AS sales_total
    FROM 
        dim_fact_invoices f
    JOIN 
        dim_date d 
        ON f.transaction_created_date_key = d.date_key
    GROUP BY 
        d.Year, d.Month
    ORDER BY 
        d.Year, d.Month;
"""

In [85]:
df_fact_invoices = get_sql_dataframe(sql_invoices_per_month, **mysql_args) # check that it worked
df_fact_invoices

Unnamed: 0,Month,Year,sales_total
0,1,2021,35.64
1,2,2021,37.62
2,3,2021,37.62
3,4,2021,37.62
4,5,2021,37.62
5,6,2021,37.62
6,7,2021,37.62
7,8,2021,37.62
8,9,2021,37.62
9,10,2021,37.62


In [91]:
sql_units_by_customer = """
    SELECT 
        c.customer_id,
        c.FirstName,
        c.LastName,
        SUM(f.quantity) AS total_sold_units
    FROM 
        dim_fact_invoices f
    JOIN 
        dim_customers c 
        ON f.customer_key = c.customer_key
    GROUP BY 
        c.customer_key
    ORDER BY 
        total_sold_units ASC;
"""

In [93]:
df_fact_invoices = get_sql_dataframe(sql_units_by_customer, **mysql_args)
df_fact_invoices

Unnamed: 0,customer_id,FirstName,LastName,total_sold_units
0,25,Victor,Stevens,7.0
1,8,Daan,Peeters,9.0
2,46,Hugh,O'Reilly,9.0
3,27,Patrick,Gray,11.0
4,4,Bjørn,Hansen,11.0
5,44,Terhi,Hämäläinen,11.0
6,6,Helena,Holý,11.0
7,59,Puja,Srivastava,11.0
8,21,Kathy,Chase,11.0
9,42,Wyatt,Girard,11.0
