## DS 2002 Midterm Project
#### By Emma Hickey

## Importing the necessary libraries

In [1]:
import os
import json
import numpy
import datetime
import certifi
import pandas as pd

import pymongo
import sqlalchemy
from sqlalchemy import create_engine, text

### Declaring & Assigning Connection Variables for the MongoDB Server, the MySQL Server & Databases 

In [2]:
src_dbname = "Chinook"
dst_dbname = "Music_Data_Mart"
mysql_args = {
    "uid" : "root",
    "pwd" : "Susieq1752!",
    "hostname" : "localhost",
    "dbname" : "northwind_dw2"
}

mongodb_args = {
    "user_name" : "emmahickey1752",
    "password" : "p.P43rrC-srbub9",
    "cluster_name" : "Cluster0",
    "cluster_subnet" : "kpk6z",
    "cluster_location" : "atlas", 
    "db_name" : "northwind_purchasing"
}

### Defining the get and set dataframe functions for getting data from database and setting data into database 

In [3]:
def get_sql_dataframe(sql_query, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    dframe = pd.read_sql(text(sql_query), connection);
    connection.close()
    
    return dframe
    

def set_dataframe(df, table_name, pk_column, db_operation, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()


def get_mongo_client(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the cluster_location parameter.")
    
    else:
        if args["cluster_location"] == "atlas":
            connect_str = f"mongodb+srv://{args['user_name']}:{args['password']}@"
            connect_str += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
            client = pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())
            
        elif args["cluster_location"] == "local":
            client = pymongo.MongoClient("mongodb://localhost:27017/")
        
    return client


def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = mongo_client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    mongo_client.close()
    
    return dframe


def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    db = mongo_client[db_name]
    
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(data_directory, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)
        
    mongo_client.close()

### Using code from the “03-Python-DataFiles” notebook to read the data from my newly created CSV file into Pandas Dataframe

In [4]:
data_dir = os.path.join(os.getcwd(), 'chinookfiles')
data_file = os.path.join(data_dir, 'chinookalbumtable.csv')
df = pd.read_csv(data_file, header=0, index_col=0)
df.head()

Unnamed: 0_level_0,Title,ArtistId
AlbumId,Unnamed: 1_level_1,Unnamed: 2_level_1
1,For Those About To Rock We Salute You,1
2,Balls to the Wall,2
3,Restless and Wild,2
4,Let There Be Rock,1
5,Big Ones,3


## Making necessary transformations (droping the Artist Id column and inserting the Album Key column)

In [5]:
df.drop(['ArtistId'], axis=1, inplace=True)
df.insert(0, "AlbumKey", range(1, df.shape[0]+1))
df.head()

Unnamed: 0_level_0,AlbumKey,Title
AlbumId,Unnamed: 1_level_1,Unnamed: 2_level_1
1,1,For Those About To Rock We Salute You
2,2,Balls to the Wall
3,3,Restless and Wild
4,4,Let There Be Rock
5,5,Big Ones


## Writing it back to the MySQL data warehouse database as a new dimension table

In [6]:
table_name = "dim_album" 
primary_key="AlbumKey"
db_operation= "insert"
set_dataframe(df, table_name, primary_key, db_operation, **mysql_args)

## Reading data from my newly created MongoDB collection into Pandas Dataframe

### Importing the necessary libraries

In [7]:
import json
import numpy
import datetime
import certifi

import pymongo
import sqlalchemy
from sqlalchemy import create_engine, text
print(f"Running SQL Alchemy Version: {sqlalchemy.__version__}")
print(f"Running PyMongo Version: {pymongo.__version__}")

Running SQL Alchemy Version: 1.4.39
Running PyMongo Version: 4.10.1


### Declaring/assigning variables

In [8]:
mysql_args = {
    "uid" : "root",
    "pwd" : "Susieq1752!",
    "hostname" : "localhost",
    "dbname" : "Music_Data_Mart"
}

mongodb_args = {
    "user_name" : "emmahickey1752",
    "password" : "p.P43rrC-srbub9",
    "cluster_name" : "Cluster0",
    "cluster_subnet" : "kpk6z",
    "cluster_location" : "atlas", 
    "db_name" : "Chinook_purchasing"
}

### Connecting to MongoDB, putting artist table JSON file into MongoDB, and then putting it into Pandas Dataframe

In [9]:
client = get_mongo_client(**mongodb_args)
data_dir = os.path.join(os.getcwd(), 'chinookfiles')
json_files= {"ArtistTable" :'chinookartisttable.json'}
set_mongo_collections(client, mongodb_args["db_name"], data_dir, json_files)

In [10]:
client = get_mongo_client(**mongodb_args)
query = {}
collection = "ArtistTable"
df_artist = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_artist.head(2)

Unnamed: 0,AlbumId,Title,ArtistId
0,1,For Those About To Rock We Salute You,1
1,2,Balls to the Wall,2


## Making necessary transformation (Inserting the Artist Key column)

In [11]:
df.insert(0, "ArtistKey", range(1, df.shape[0]+1))
df.head(2)

Unnamed: 0_level_0,ArtistKey,AlbumKey,Title
AlbumId,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
1,1,1,For Those About To Rock We Salute You
2,2,2,Balls to the Wall


## Writing it back to the MySQL data warehouse database as a new dimension table

In [12]:
table_name = "dim_artist" 
primary_key="ArtistKey"
db_operation= "insert"
set_dataframe(df, table_name, primary_key, db_operation, **mysql_args)

## Establishing connections and functions

In [13]:
host_name = "localhost"
port = "3306"
user_id = "root"
pwd = "Susieq1752!"
src_dbname = "Chinook"
dst_dbname = "Music_Data_Mart"

In [14]:
def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

In [15]:
conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)
connection = sqlEngine.connect()

connection.execute(text(f"DROP DATABASE IF EXISTS `{dst_dbname}`;"))
connection.execute(text(f"CREATE DATABASE `{dst_dbname}`;"))
connection.execute(text(f"USE {dst_dbname};"))

connection.close()

## Using code from the Lab 4 notebook to read data from customer, employee, genre, invoice, invoice line, and media type entities into Pandas Dataframe

In [16]:
sql_customer = "SELECT * FROM Chinook.Customer;"
df_customer = get_dataframe(user_id, pwd, host_name, src_dbname, sql_customer)
df_customer.head(2)

Unnamed: 0,CustomerId,FirstName,LastName,Company,Address,City,State,Country,PostalCode,Phone,Fax,Email,SupportRepId
0,1,Luís,Gonçalves,Embraer - Empresa Brasileira de Aeronáutica S.A.,"Av. Brigadeiro Faria Lima, 2170",São José dos Campos,SP,Brazil,12227-000,+55 (12) 3923-5555,+55 (12) 3923-5566,luisg@embraer.com.br,3
1,2,Leonie,Köhler,,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,+49 0711 2842222,,leonekohler@surfeu.de,5


In [17]:
sql_employee = "SELECT * FROM Chinook.Employee;"
df_employee = get_dataframe(user_id, pwd, host_name, src_dbname, sql_employee)
df_employee.head(2)

Unnamed: 0,EmployeeId,LastName,FirstName,Title,ReportsTo,BirthDate,HireDate,Address,City,State,Country,PostalCode,Phone,Fax,Email
0,1,Adams,Andrew,General Manager,,1962-02-18,2002-08-14,11120 Jasper Ave NW,Edmonton,AB,Canada,T5K 2N1,+1 (780) 428-9482,+1 (780) 428-3457,andrew@chinookcorp.com
1,2,Edwards,Nancy,Sales Manager,1.0,1958-12-08,2002-05-01,825 8 Ave SW,Calgary,AB,Canada,T2P 2T3,+1 (403) 262-3443,+1 (403) 262-3322,nancy@chinookcorp.com


In [18]:
sql_genre = "SELECT * FROM Chinook.Genre;"
df_genre = get_dataframe(user_id, pwd, host_name, src_dbname, sql_genre)
df_genre.head(2)

Unnamed: 0,GenreId,Name
0,1,Rock
1,2,Jazz


In [19]:
sql_invoice = "SELECT * FROM Chinook.Invoice;"
df_invoice = get_dataframe(user_id, pwd, host_name, src_dbname, sql_invoice)
df_invoice.head(2)

Unnamed: 0,InvoiceId,CustomerId,InvoiceDate,BillingAddress,BillingCity,BillingState,BillingCountry,BillingPostalCode,Total
0,1,2,2021-01-01,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,1.98
1,2,4,2021-01-02,Ullevålsveien 14,Oslo,,Norway,171,3.96


In [20]:
sql_invoice_line = "SELECT * FROM Chinook.InvoiceLine;"
df_invoice_line = get_dataframe(user_id, pwd, host_name, src_dbname, sql_invoice_line)
df_invoice_line.head(2)

Unnamed: 0,InvoiceLineId,InvoiceId,TrackId,UnitPrice,Quantity
0,1,1,2,0.99,1
1,2,1,4,0.99,1


In [21]:
sql_media_type = "SELECT * FROM Chinook.MediaType;"
df_media_type = get_dataframe(user_id, pwd, host_name, src_dbname, sql_media_type)
df_media_type.head(2)

Unnamed: 0,MediaTypeId,Name
0,1,MPEG audio file
1,2,Protected AAC audio file


## Making necessary transformations (dropping the Support Representative Id and inserting keys into the different tables)

In [22]:
drop_cols = ['SupportRepId']
df_customer.drop(drop_cols, axis=1, inplace=True)
df_customer.insert(0, "CustomerKey", range(1, df_customer.shape[0]+1))
df_customer.head(2)

Unnamed: 0,CustomerKey,CustomerId,FirstName,LastName,Company,Address,City,State,Country,PostalCode,Phone,Fax,Email
0,1,1,Luís,Gonçalves,Embraer - Empresa Brasileira de Aeronáutica S.A.,"Av. Brigadeiro Faria Lima, 2170",São José dos Campos,SP,Brazil,12227-000,+55 (12) 3923-5555,+55 (12) 3923-5566,luisg@embraer.com.br
1,2,2,Leonie,Köhler,,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,+49 0711 2842222,,leonekohler@surfeu.de


In [23]:
df_employee.insert(0, "EmployeeKey", range(1, df_employee.shape[0]+1))
df_employee.head(2)

Unnamed: 0,EmployeeKey,EmployeeId,LastName,FirstName,Title,ReportsTo,BirthDate,HireDate,Address,City,State,Country,PostalCode,Phone,Fax,Email
0,1,1,Adams,Andrew,General Manager,,1962-02-18,2002-08-14,11120 Jasper Ave NW,Edmonton,AB,Canada,T5K 2N1,+1 (780) 428-9482,+1 (780) 428-3457,andrew@chinookcorp.com
1,2,2,Edwards,Nancy,Sales Manager,1.0,1958-12-08,2002-05-01,825 8 Ave SW,Calgary,AB,Canada,T2P 2T3,+1 (403) 262-3443,+1 (403) 262-3322,nancy@chinookcorp.com


In [24]:
df_genre.insert(0, "GenreKey", range(1, df_genre.shape[0]+1))
df_genre.head(2)

Unnamed: 0,GenreKey,GenreId,Name
0,1,1,Rock
1,2,2,Jazz


In [25]:
df_invoice.insert(0, "InvoiceKey", range(1, df_invoice.shape[0]+1))
df_invoice.head(2)

Unnamed: 0,InvoiceKey,InvoiceId,CustomerId,InvoiceDate,BillingAddress,BillingCity,BillingState,BillingCountry,BillingPostalCode,Total
0,1,1,2,2021-01-01,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,1.98
1,2,2,4,2021-01-02,Ullevålsveien 14,Oslo,,Norway,171,3.96


In [26]:
df_invoice_line.insert(0, "InvoiceLineKey", range(1, df_invoice_line.shape[0]+1))
df_invoice_line.head(2)

Unnamed: 0,InvoiceLineKey,InvoiceLineId,InvoiceId,TrackId,UnitPrice,Quantity
0,1,1,1,2,0.99,1
1,2,2,1,4,0.99,1


In [27]:
df_media_type.insert(0, "MediaTypeKey", range(1, df_media_type.shape[0]+1))
df_media_type.head(2)

Unnamed: 0,MediaTypeKey,MediaTypeId,Name
0,1,1,MPEG audio file
1,2,2,Protected AAC audio file


## Writing it to the MySQL data warehouse database as new dimension tables

In [28]:
db_operation = "insert"

tables = [('dim_customer', df_customer, 'CustomerKey'),
          ('dim_employee', df_employee, 'EmployeeId'),
          ('dim_genre', df_genre, 'GenreId'),
          ('dim_invoice', df_invoice, 'InvoiceId'),
          ('dim_invoice_line', df_invoice_line, 'InvoiceLineId'),
          ('dim_media_type', df_media_type, 'MediaTypeId')]

In [29]:
for table_name, dataframe, primary_key in tables:
    set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

## Creating the Fact table:

## Using code from the Lab 4 notebook, I'm reading data from Chinook, my source database, and merging the invoice and invoice line tables 

In [30]:
sql_invoice = "SELECT * FROM Chinook.Invoice;"
df_invoice = get_dataframe(user_id, pwd, host_name, src_dbname, sql_invoice)
df_invoice.head(2)

Unnamed: 0,InvoiceId,CustomerId,InvoiceDate,BillingAddress,BillingCity,BillingState,BillingCountry,BillingPostalCode,Total
0,1,2,2021-01-01,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,1.98
1,2,4,2021-01-02,Ullevålsveien 14,Oslo,,Norway,171,3.96


In [31]:
sql_invoice_line = "SELECT * FROM Chinook.InvoiceLine;"
df_invoice_line = get_dataframe(user_id, pwd, host_name, src_dbname, sql_invoice_line)
df_invoice_line.head(2)

Unnamed: 0,InvoiceLineId,InvoiceId,TrackId,UnitPrice,Quantity
0,1,1,2,0.99,1
1,2,1,4,0.99,1


In [32]:
df_fact_music_sales=pd.merge(df_invoice, df_invoice_line, on='InvoiceId', how='left')
df_fact_music_sales.head(2)

Unnamed: 0,InvoiceId,CustomerId,InvoiceDate,BillingAddress,BillingCity,BillingState,BillingCountry,BillingPostalCode,Total,InvoiceLineId,TrackId,UnitPrice,Quantity
0,1,2,2021-01-01,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,1.98,1,2,0.99,1
1,1,2,2021-01-01,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,1.98,2,4,0.99,1


In [33]:
df_fact_music_sales.shape

(2240, 13)

## Performing lookup operations to replace business codes in my fact dataframe with the corresponding surrogate primary keys from each of the dimension tables. 

In [34]:
sql_dim_customer="SELECT CustomerKey, CustomerId FROM Music_Data_Mart.dim_customer;"
df_dim_customer=get_dataframe(user_id, pwd, host_name, src_dbname, sql_dim_customer)
df_dim_customer.head(2)

Unnamed: 0,CustomerKey,CustomerId
0,1,1
1,2,2


In [35]:
sql_dim_employee="SELECT EmployeeKey, EmployeeId FROM Music_Data_Mart.dim_employee;"
df_dim_employee=get_dataframe(user_id, pwd, host_name, src_dbname, sql_dim_employee)
df_dim_employee.head(2)

Unnamed: 0,EmployeeKey,EmployeeId
0,1,1
1,2,2


In [36]:
sql_dim_genre="SELECT GenreKey, GenreId FROM Music_Data_Mart.dim_genre;"
df_dim_genre=get_dataframe(user_id, pwd, host_name, src_dbname, sql_dim_genre)
df_dim_genre.head(2)

Unnamed: 0,GenreKey,GenreId
0,1,1
1,2,2


In [37]:
sql_dim_media_type="SELECT MediaTypeKey, MediaTypeId FROM Music_Data_Mart.dim_media_type;"
df_dim_media_type=get_dataframe(user_id, pwd, host_name, src_dbname, sql_dim_media_type)
df_dim_media_type.head(2)

Unnamed: 0,MediaTypeKey,MediaTypeId
0,1,1
1,2,2


In [38]:
sql_dim_invoice="SELECT InvoiceDate, InvoiceKey, InvoiceId FROM Music_Data_Mart.dim_invoice;"
df_dim_invoice=get_dataframe(user_id, pwd, host_name, src_dbname, sql_dim_invoice)
df_dim_invoice.head(2)

Unnamed: 0,InvoiceDate,InvoiceKey,InvoiceId
0,2021-01-01,1,1
1,2021-01-02,2,2


## Code is then ran in SQL before the proceeding code to create the date dimension

In [41]:
sql_dim_date = "SELECT date_key, full_date FROM Music_Data_Mart.dim_date;"
df_dim_date = get_dataframe(user_id, pwd, host_name, src_dbname, sql_dim_date)
df_dim_date.full_date = df_dim_date.full_date.astype('datetime64[ns]').dt.date
df_dim_date.head(2)

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02


## Replacing DateTime values with the DateKeys from the dim_date table.

In [42]:
df_fact_music_sales['InvoiceDate'] = pd.to_datetime(df_fact_music_sales['InvoiceDate']).dt.date
df_dim_date['full_date'] = pd.to_datetime(df_dim_date['full_date']).dt.date
df_dim_invoice_date = df_dim_date.rename(columns={"date_key": "InvoiceDateKey", "full_date": "InvoiceDate"})
df_fact_music_sales = pd.merge(df_fact_music_sales, df_dim_invoice_date[['InvoiceDate', 'InvoiceDateKey']], 
                               on="InvoiceDate", how="left")
df_fact_music_sales.drop(['InvoiceDate'], axis=1, inplace=True)

df_fact_music_sales.head(2)


Unnamed: 0,InvoiceId,CustomerId,BillingAddress,BillingCity,BillingState,BillingCountry,BillingPostalCode,Total,InvoiceLineId,TrackId,UnitPrice,Quantity,InvoiceDateKey
0,1,2,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,1.98,1,2,0.99,1,
1,1,2,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,1.98,2,4,0.99,1,


## Ensuring the Customer and Invoice Keys are included in the fact table through getting the dataframe function and then merging the customer and dimension tables with the fact table.   

In [43]:
sql_dim_customer = "SELECT CustomerKey, CustomerId FROM Music_Data_Mart.dim_customer;"
df_dim_customer = get_dataframe(user_id, pwd, host_name, dst_dbname, sql_dim_customer)
df_fact_music_sales = pd.merge(df_fact_music_sales, df_dim_customer[['CustomerId', 'CustomerKey']], 
on="CustomerId", how="left")

In [44]:
sql_dim_invoice = "SELECT InvoiceKey, InvoiceId FROM Music_Data_Mart.dim_invoice;"
df_dim_invoice = get_dataframe(user_id, pwd, host_name, dst_dbname, sql_dim_invoice)
df_fact_music_sales = pd.merge(df_fact_music_sales, df_dim_invoice[['InvoiceId', 'InvoiceKey']], 
on="InvoiceId", how="left")

In [45]:
df_fact_music_sales.head()


Unnamed: 0,InvoiceId,CustomerId,BillingAddress,BillingCity,BillingState,BillingCountry,BillingPostalCode,Total,InvoiceLineId,TrackId,UnitPrice,Quantity,InvoiceDateKey,CustomerKey,InvoiceKey
0,1,2,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,1.98,1,2,0.99,1,,2,1
1,1,2,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,1.98,2,4,0.99,1,,2,1
2,2,4,Ullevålsveien 14,Oslo,,Norway,171,3.96,3,6,0.99,1,,4,2
3,2,4,Ullevålsveien 14,Oslo,,Norway,171,3.96,4,8,0.99,1,,4,2
4,2,4,Ullevålsveien 14,Oslo,,Norway,171,3.96,5,10,0.99,1,,4,2


## Making additional transformation necessary to define the fact music sales dataframe so it represents what I need to refine my fact table. In this case I chose to drop the Billing State because most values are "None". I then reordered the columns and inserted the fact music sale key column as the primary key.

In [46]:
drop_columns=['BillingState']
df_fact_music_sales.drop(drop_columns, axis=1, inplace=True)
ordered_columns=['InvoiceId','CustomerId','InvoiceKey','InvoiceLineId','CustomerKey','InvoiceDateKey','TrackId',
                 'BillingAddress', 'BillingPostalCode', 'BillingCity',
                 'BillingCountry','Total','UnitPrice','Quantity']
df_fact_music_sales=df_fact_music_sales[ordered_columns]
df_fact_music_sales.insert(0, "fact_music_sale_key", range(1,df_fact_music_sales.shape[0]+1))
df_fact_music_sales.head(2)

Unnamed: 0,fact_music_sale_key,InvoiceId,CustomerId,InvoiceKey,InvoiceLineId,CustomerKey,InvoiceDateKey,TrackId,BillingAddress,BillingPostalCode,BillingCity,BillingCountry,Total,UnitPrice,Quantity
0,1,1,2,1,1,2,,2,Theodor-Heuss-Straße 34,70174,Stuttgart,Germany,1.98,0.99,1
1,2,1,2,1,2,2,,4,Theodor-Heuss-Straße 34,70174,Stuttgart,Germany,1.98,0.99,1


## Writing it to the MySQL data warehouse database (Music_Data_Mart) as a new fact table

In [47]:
table_name = "fact_music_sales"
primary_key = "fact_music_sale_key"
db_operation = "insert"
set_dataframe(user_id, pwd, host_name, dst_dbname, df_fact_music_sales, table_name, primary_key, db_operation)

## Authoring a summarization query that demonstrates the proper functionality of my new Data Warehouse database

In [48]:
sql_customer_summarization = """
SELECT customer.`LastName` AS `Customer Name`,
    SUM(Music_Sales.`Quantity`) AS `Total Quantity`,
    SUM(Music_Sales.`Total`) AS `Total Sales`
FROM `Music_Data_Mart`.`fact_music_sales` AS Music_Sales
INNER JOIN `Music_Data_Mart`.`dim_customer` AS Customer
ON Music_Sales.CustomerKey = Customer.CustomerKey
INNER JOIN `Music_Data_Mart`.`dim_invoice` As Invoice
ON music_sales.InvoiceKey= invoice.InvoiceKey
GROUP BY customer.`LastName`
ORDER BY `Total Sales` DESC;
"""
df_test = get_dataframe(user_id, pwd, host_name, dst_dbname, sql_customer_summarization)
df_test.head()




Unnamed: 0,Customer Name,Total Quantity,Total Sales
0,Holý,38.0,502.62
1,Cunningham,38.0,474.62
2,O'Reilly,38.0,446.62
3,Kovács,38.0,446.62
4,Rojas,38.0,415.62


## Thank you!