## Mustafa Lonandwala (paq6ha) - Chinook Music Store Data Warehouse Project DS 2002

### Overview
This project implements an ETL (Extract, Transform, Load) pipeline to create a comprehensive data warehouse for analyzing digital music sales patterns. Using the Chinook database, which mirrors the iTunes business model, I've developed a dimensional model that enables analysis of customer behavior, sales trends, and employee performance across multiple business dimensions.

### Data Source
The foundation of this project is the [**Chinook Database**](https://github.com/lerocha/chinook-database/tree/master), an open-source sample database that provides a rich representation of a digital media store's operations. This database encompasses a complete business model including artists, albums, tracks, customers, employees, and detailed sales information, making it an ideal candidate for dimensional modeling and analysis.

### Dimension Tables
The dimensional model consists of four key tables. The **dim_customer** table provides detailed customer demographics and support representative information, allowing for deep customer behavior analysis across different markets. The **dim_product** table unifies track, album, and artist information into a cohesive product hierarchy, supporting analysis of music sales across various genres and artists. The **dim_employee** table captures the organizational structure and employee details, facilitating the evaluation of sales performance and team effectiveness. Finally, the **dim_date** table incorporates temporal attributes that enable trend analysis across different time periods, providing insights into seasonal patterns and historical performance.


### Analysis Capabilities
The data warehouse supports four kinds of analysis: sales trends over time, customer purchasing patterns by region, product performance by genre, and sales effectiveness by employee. The validation queries at the end of this notebook run against the `fact_sales` table and its dimensions.

#### Import the Necessary Libraries

In [1]:
import os
import json
import numpy
import datetime
import certifi
import pandas as pd

import pymongo
import sqlalchemy
from sqlalchemy import create_engine, text

#### Declare & Assign Connection Variables for the MongoDB Server and the MySQL Server

In [2]:
mysql_args = {
    "uid" : "root",
    "pwd" : "Datascience2002",
    "hostname" : "localhost",
    "src_dbname": "Chinook",  
    "dst_dbname": "chinook_dw"
}


mongodb_args = {
    "user_name" : "mustafalonandwala750",  
    "password" : "MongodbDS2002",          
    "cluster_name" : "cluster-ds-2002",    
    "cluster_subnet" : "kvu5a",           
    "cluster_location" : "atlas",
    "db_name" : "chinook_dw"              
}
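
The settings above embed passwords directly in the notebook. As a hedged alternative, the sketch below builds the same MySQL dictionary from environment variables. The variable names (`CHINOOK_MYSQL_UID`, `CHINOOK_MYSQL_PWD`, and so on) and the helper `load_mysql_args` are hypothetical and not part of the original project.

```python
import os


def load_mysql_args(env=os.environ):
    """Build the MySQL settings dict from environment variables.

    All variable names here are hypothetical; adjust them to your setup.
    """
    return {
        "uid": env.get("CHINOOK_MYSQL_UID", "root"),
        "pwd": env["CHINOOK_MYSQL_PWD"],  # required: raises KeyError if unset
        "hostname": env.get("CHINOOK_MYSQL_HOST", "localhost"),
        "src_dbname": env.get("CHINOOK_SRC_DB", "Chinook"),
        "dst_dbname": env.get("CHINOOK_DST_DB", "chinook_dw"),
    }


# Example with an explicit mapping instead of the real environment
example_args = load_mysql_args({"CHINOOK_MYSQL_PWD": "example-password"})
print(example_args["uid"], example_args["dst_dbname"])
```

The same pattern would apply to `mongodb_args`; only the password is treated as required so that a missing secret fails early.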

#### Define Functions for Getting Data From and Setting Data Into Databases

In [3]:
def get_sql_dataframe(sql_query, **args):
    '''Query the destination (data warehouse) MySQL database and return a Pandas DataFrame.'''
    # Create a connection to the MySQL database
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dst_dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    # Invoke pd.read_sql() to run the query and fill a Pandas DataFrame
    dframe = pd.read_sql(sql_query, connection)
    connection.close()
    
    return dframe

def set_dataframe(df, table_name, pk_column, db_operation, **args):
    '''Write a DataFrame to a table in the destination (data warehouse) MySQL database.'''
    # Create a connection to the MySQL database
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dst_dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    # "insert" replaces the table and adds a primary key; "update" appends rows
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    else:
        connection.close()
        raise ValueError("db_operation must be either 'insert' or 'update'.")
    
    connection.close()

def get_source_sql_dataframe(sql_query, **args):
    '''Query the source MySQL database and return a Pandas DataFrame.'''
    # Create a connection to the source MySQL database
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['src_dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    # Query the source database
    dframe = pd.read_sql(sql_query, connection)
    connection.close()
    
    return dframe

def get_mongo_client(**args):
    '''Return a MongoDB client for an Atlas cluster or a local server.'''
    # Validate the cluster_location argument
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the cluster_location parameter.")
    
    else:
        if args["cluster_location"] == "atlas":
            connect_str = f"mongodb+srv://{args['user_name']}:{args['password']}@"
            connect_str += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
            client = pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())
            
        elif args["cluster_location"] == "local":
            client = pymongo.MongoClient("mongodb://localhost:27017/")
        
    return client

def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = mongo_client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    # errors='ignore' avoids a KeyError when the query returns no documents
    dframe.drop(columns=['_id'], errors='ignore', inplace=True)
    mongo_client.close()
    
    return dframe

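def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    '''Load JSON files into MongoDB collections, replacing any existing collections.

    json_files maps each collection name to a file name inside data_directory.
    '''
    db = mongo_client[db_name]
    
    for collection_name, file_name in json_files.items():
        db.drop_collection(collection_name)
        json_path = os.path.join(data_directory, file_name)
        with open(json_path, 'r') as openfile:
            json_object = json.load(openfile)
        db[collection_name].insert_many(json_object)
        
    mongo_client.close()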
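
The helper functions above build their connection strings with plain f-strings, which works for the credentials used here but can break if a password contains characters such as `@` or `/`. The sketch below percent-encodes the credentials with the standard library first. The helper name `build_mysql_url` is hypothetical and is not used elsewhere in this notebook.

```python
from urllib.parse import quote_plus


def build_mysql_url(uid, pwd, hostname, dbname=None):
    """Return a SQLAlchemy-style MySQL URL with percent-encoded credentials."""
    url = f"mysql+pymysql://{quote_plus(uid)}:{quote_plus(pwd)}@{hostname}"
    if dbname:
        url += f"/{dbname}"
    return url


# A password containing '@' and '/' no longer corrupts the URL structure
print(build_mysql_url("root", "p@ss/word", "localhost", "chinook_dw"))
```

The resulting string can be passed to `create_engine` in place of the hand-built `conn_str`.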

#### Create the Chinook Data Warehouse and Switch the Context

In [4]:
conn_str = f"mysql+pymysql://{mysql_args['uid']}:{mysql_args['pwd']}@{mysql_args['hostname']}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)
connection = sqlEngine.connect()

# Drop the data warehouse if it exists and create a new one
connection.execute(text(f"DROP DATABASE IF EXISTS `{mysql_args['dst_dbname']}`;"))
connection.execute(text(f"CREATE DATABASE `{mysql_args['dst_dbname']}`;"))
connection.execute(text(f"USE `{mysql_args['dst_dbname']}`;"))

connection.close()

### Create Dimension Tables

#### Read Data from a CSV File into a Pandas DataFrame
This DataFrame becomes the product dimension table (`dim_product`).

In [5]:
data_dir = os.path.join(os.getcwd(), 'data')
data_file = os.path.join(data_dir, 'product_dimension.csv')

df_product = pd.read_csv(data_file)

# Create a surrogate key for the dimension table
df_product['ProductKey'] = df_product.index + 1

# Reorder columns to put the surrogate key first
columns_order = ['ProductKey', 'TrackId', 'TrackName', 'Composer', 'Milliseconds', 
                 'Bytes', 'UnitPrice', 'AlbumTitle', 'ArtistName', 'GenreName', 
                 'MediaTypeName']
df_product = df_product[columns_order]

df_product.head()

| |ProductKey|TrackId|TrackName|Composer|Milliseconds|Bytes|UnitPrice|AlbumTitle|ArtistName|GenreName|MediaTypeName|
|---|---|---|---|---|---|---|---|---|---|---|---|
|0|1|1|For Those About To Rock (We Salute You)|Angus Young, Malcolm Young, Brian Johnson|343719|11170334|0.99|For Those About To Rock We Salute You|AC/DC|Rock|MPEG audio file|
|1|2|2|Balls to the Wall|U. Dirkschneider, W. Hoffmann, H. Frank, P. Ba...|342562|5510424|0.99|Balls to the Wall|Accept|Rock|Protected AAC audio file|
|2|3|3|Fast As a Shark|F. Baltes, S. Kaufman, U. Dirkscneider & W. Ho...|230619|3990994|0.99|Restless and Wild|Accept|Rock|Protected AAC audio file|
|3|4|4|Restless and Wild|F. Baltes, R.A. Smith-Diesel, S. Kaufman, U. D...|252051|4331779|0.99|Restless and Wild|Accept|Rock|Protected AAC audio file|
|4|5|5|Princess of the Dawn|Deaffy & R.A. Smith-Diesel|375418|6290521|0.99|Restless and Wild|Accept|Rock|Protected AAC audio file|
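
Deriving the surrogate key from `df.index + 1` relies on the DataFrame still having its default 0..n-1 index. As a minimal sketch of a slightly more defensive variant, the hypothetical helper below resets the index before numbering, so the keys stay contiguous even after rows have been filtered or sorted.

```python
import pandas as pd


def add_surrogate_key(df, key_name):
    """Return a copy of df with a contiguous 1..n surrogate key as the first column."""
    keyed = df.reset_index(drop=True).copy()
    keyed.insert(0, key_name, range(1, len(keyed) + 1))
    return keyed


# Toy data: the index has gaps because a row was filtered out
tracks = pd.DataFrame({"TrackId": [10, 20, 30, 40], "TrackName": ["A", "B", "C", "D"]})
filtered = tracks[tracks["TrackId"] != 20]

keyed = add_surrogate_key(filtered, "ProductKey")
print(keyed["ProductKey"].tolist())
```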


In [6]:
print("Total number of products:", len(df_product))

print("\nNull values in Product Data:")
print(df_product.isnull().sum())

print("\nKey Validations:")
print("Unique TrackIds:", df_product['TrackId'].is_unique)
print("Duplicate TrackIds:", df_product['TrackId'].duplicated().sum())

Total number of products: 1000

Null values in Product Data:
ProductKey         0
TrackId            0
TrackName          0
Composer         316
Milliseconds       0
Bytes              0
UnitPrice          0
AlbumTitle         0
ArtistName         0
GenreName          0
MediaTypeName      0
dtype: int64

Key Validations:
Unique TrackIds: True
Duplicate TrackIds: 0
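
The checks above are printed and read by eye. A hedged sketch of the same checks as a reusable function that fails loudly is shown below. The function name `validate_dimension` and the choice of which columns count as required are assumptions for illustration.

```python
import pandas as pd


def validate_dimension(df, business_key, required_columns):
    """Raise ValueError if the business key has duplicates or a required column has nulls."""
    problems = []
    duplicates = int(df[business_key].duplicated().sum())
    if duplicates:
        problems.append(f"{duplicates} duplicate value(s) in {business_key}")
    for column in required_columns:
        nulls = int(df[column].isnull().sum())
        if nulls:
            problems.append(f"{nulls} null value(s) in {column}")
    if problems:
        raise ValueError("; ".join(problems))


# Toy data: Composer is allowed to be null, so it is not listed as required
sample = pd.DataFrame({
    "TrackId": [1, 2, 3],
    "TrackName": ["A", "B", "C"],
    "Composer": ["X", None, None],
})
validate_dimension(sample, "TrackId", ["TrackId", "TrackName"])
print("dimension checks passed")
```

Leaving `Composer` out of the required list mirrors the output above, where it is the only product column with nulls.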


In [7]:
print("\nArtist Analysis:")
print("Number of unique artists:", df_product['ArtistName'].nunique())
print("Number of unique albums:", df_product['AlbumTitle'].nunique())


Artist Analysis:
Number of unique artists: 48
Number of unique albums: 80


In [8]:
print("\nCategory Distributions:")
print("\nGenre Distribution:")
print(df_product['GenreName'].value_counts())
print("\nMedia Type Distribution:")
print(df_product['MediaTypeName'].value_counts())


Category Distributions:

Genre Distribution:
GenreName
Rock                  342
Latin                 275
Alternative & Punk    101
Jazz                   88
Metal                  65
Blues                  43
Reggae                 31
Bossa Nova             15
Pop                    14
Soundtrack             14
Rock And Roll          12
Name: count, dtype: int64

Media Type Distribution:
MediaTypeName
MPEG audio file             996
Protected AAC audio file      4
Name: count, dtype: int64


#### Read Data from a MongoDB Collection into a Pandas DataFrame
This DataFrame becomes the customer dimension table (`dim_customer`).

In [9]:
client = get_mongo_client(**mongodb_args)

query = {}
collection = "customers"

df_customers = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)

# Add surrogate key
df_customers.insert(0, "CustomerKey", range(1, df_customers.shape[0]+1))

# Rename columns to follow dimension naming conventions
column_renames = {
    "CustomerId": "CustomerBusinessKey", 
    "FirstName": "FirstName",
    "LastName": "LastName",
    "Company": "CompanyName",
    "Address": "Address",
    "City": "City",
    "State": "State",
    "Country": "Country",
    "PostalCode": "PostalCode",
    "Phone": "Phone",
    "Email": "Email",
    "SupportRepFirstName": "SupportRepFirstName",
    "SupportRepLastName": "SupportRepLastName",
    "TotalPurchases": "TotalPurchases",
    "TotalSpent": "TotalSpent"
}
df_customers.rename(columns=column_renames, inplace=True)
df_customers.head()

| |CustomerKey|CustomerBusinessKey|FirstName|LastName|CompanyName|Address|City|State|Country|PostalCode|Phone|Email|SupportRepFirstName|SupportRepLastName|TotalPurchases|TotalSpent|
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|0|1|1|Luís|Gonçalves|"Embraer - Empresa Brasileira de Aeronáutica S...|"Av. Brigadeiro Faria Lima, 2170"|"São José dos Campos"|"SP"|"Brazil"|"12227-000"|"+55 (12) 3923-5555"|"luisg@embraer.com.br"|"Jane"|"Peacock"|7|39.62|
|1|2|2|Leonie|Köhler| |"Theodor-Heuss-Straße 34"|"Stuttgart"| |"Germany"|"70174"|"+49 0711 2842222"|"leonekohler@surfeu.de"|"Steve"|"Johnson"|7|37.62|
|2|3|3|François|Tremblay| |"1498 rue Bélanger"|"Montréal"|"QC"|"Canada"|"H2G 1A7"|"+1 (514) 721-4711"|"ftremblay@gmail.com"|"Jane"|"Peacock"|7|39.62|
|3|4|4|Bjørn|Hansen| |"Ullevålsveien 14"|"Oslo"| |"Norway"|"0171"|"+47 22 44 22 22"|"bjorn.hansen@yahoo.no"|"Margaret"|"Park"|7|39.62|
|4|5|5|František|Wichterlová|"JetBrains s.r.o."|"Klanova 9/506"|"Prague"| |"Czech Republic"|"14700"|"+420 2 4172 5555"|"frantisekw@jetbrains.com"|"Margaret"|"Park"|7|40.62|


In [10]:
print("Number of records:", len(df_customers))
print("\nNull values in Customer Data:")
print(df_customers.isnull().sum())
print("\nUnique Customer IDs:", df_customers['CustomerBusinessKey'].is_unique)

Number of records: 59

Null values in Customer Data:
CustomerKey            0
CustomerBusinessKey    0
FirstName              0
LastName               0
CompanyName            0
Address                0
City                   0
State                  0
Country                0
PostalCode             0
Phone                  1
Email                  0
SupportRepFirstName    0
SupportRepLastName     0
TotalPurchases         0
TotalSpent             0
dtype: int64

Unique Customer IDs: True
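
In the preview above, many customer text values (for example `Country` and `City`) appear wrapped in literal double-quote characters, and these carry through to later query results. A minimal sketch of a cleanup step is shown below. The helper name `strip_embedded_quotes` is hypothetical, and the step is not part of the original pipeline.

```python
import pandas as pd


def strip_embedded_quotes(df, columns=None):
    """Return a copy of df with surrounding double quotes removed from string values."""
    cleaned = df.copy()
    if columns is None:
        columns = list(cleaned.columns)
    for column in columns:
        # Only touch string values; numbers and missing values pass through unchanged
        cleaned[column] = cleaned[column].map(
            lambda value: value.strip().strip('"') if isinstance(value, str) else value
        )
    return cleaned


# Toy data shaped like the customer preview
customers = pd.DataFrame({
    "FirstName": ["Luís", "Leonie"],
    "Country": ['"Brazil"', '"Germany"'],
    "TotalSpent": [39.62, 37.62],
})
cleaned_customers = strip_embedded_quotes(customers)
print(cleaned_customers["Country"].tolist())
```

If applied, it would run before the DataFrame is loaded into `dim_customer`, so that grouping by country or support representative uses the unquoted values.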


#### Read Data from MySQL into a Pandas DataFrame
This DataFrame becomes the employee dimension table (`dim_employee`).

In [11]:
employee_query = """
SELECT 
    e.*,  -- This gets all employee fields
    CONCAT(m.FirstName, ' ', m.LastName) as ReportsToName
FROM Employee e
LEFT JOIN Employee m ON e.ReportsTo = m.EmployeeId;
"""

df_employees = get_source_sql_dataframe(employee_query, **mysql_args)

df_employees.insert(0, "EmployeeKey", range(1, df_employees.shape[0]+1))

# Convert dates to proper datetime format and handle null values in manager names
df_employees['BirthDate'] = pd.to_datetime(df_employees['BirthDate'])
df_employees['HireDate'] = pd.to_datetime(df_employees['HireDate'])
df_employees['ReportsToName'] = df_employees['ReportsToName'].fillna('No Manager')

df_employees.head()

| |EmployeeKey|EmployeeId|LastName|FirstName|Title|ReportsTo|BirthDate|HireDate|Address|City|State|Country|PostalCode|Phone|Fax|Email|ReportsToName|
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|0|1|1|Adams|Andrew|General Manager| |1962-02-18|2002-08-14|11120 Jasper Ave NW|Edmonton|AB|Canada|T5K 2N1|+1 (780) 428-9482|+1 (780) 428-3457|andrew@chinookcorp.com|No Manager|
|1|2|2|Edwards|Nancy|Sales Manager|1.0|1958-12-08|2002-05-01|825 8 Ave SW|Calgary|AB|Canada|T2P 2T3|+1 (403) 262-3443|+1 (403) 262-3322|nancy@chinookcorp.com|Andrew Adams|
|2|3|3|Peacock|Jane|Sales Support Agent|2.0|1973-08-29|2002-04-01|1111 6 Ave SW|Calgary|AB|Canada|T2P 5M5|+1 (403) 262-3443|+1 (403) 262-6712|jane@chinookcorp.com|Nancy Edwards|
|3|4|4|Park|Margaret|Sales Support Agent|2.0|1947-09-19|2003-05-03|683 10 Street SW|Calgary|AB|Canada|T2P 5G3|+1 (403) 263-4423|+1 (403) 263-4289|margaret@chinookcorp.com|Nancy Edwards|
|4|5|5|Johnson|Steve|Sales Support Agent|2.0|1965-03-03|2003-10-17|7727B 41 Ave|Calgary|AB|Canada|T3B 1Y7|1 (780) 836-9987|1 (780) 836-9543|steve@chinookcorp.com|Nancy Edwards|


In [12]:
print("Number of employees:", len(df_employees))
print("\nNull values in Employee Data:")
print(df_employees.isnull().sum())
print("\nDate ranges:")
print("Earliest hire date:", df_employees['HireDate'].min())
print("Latest hire date:", df_employees['HireDate'].max())

Number of employees: 8

Null values in Employee Data:
EmployeeKey      0
EmployeeId       0
LastName         0
FirstName        0
Title            0
ReportsTo        1
BirthDate        0
HireDate         0
Address          0
City             0
State            0
Country          0
PostalCode       0
Phone            0
Fax              0
Email            0
ReportsToName    0
dtype: int64

Date ranges:
Earliest hire date: 2002-04-01 00:00:00
Latest hire date: 2004-03-04 00:00:00


### Load the dataframes into the Chinook Data Warehouse
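Note that the date dimension table (`dim_date`) is loaded into the warehouse separately, using the script provided in **Lab C**. Because the setup cell earlier in this notebook drops and recreates `chinook_dw`, that script has to run after the database is created and before the date key lookup below.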

#### Load the products data frame

In [13]:
set_dataframe(
    df=df_product,
    table_name='dim_product',
    pk_column='ProductKey',
    db_operation='insert',
    **mysql_args
)

#### Load the customers data frame

In [14]:
set_dataframe(
    df=df_customers,
    table_name='dim_customer',
    pk_column='CustomerKey',
    db_operation='insert',
    **mysql_args
)

#### Load the employees data frame

In [15]:
set_dataframe(
    df=df_employees,
    table_name='dim_employee',
    pk_column='EmployeeKey',
    db_operation='insert',
    **mysql_args
)
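
All three loads use `db_operation='insert'`, which maps to `if_exists='replace'` in `set_dataframe`, so rerunning a cell rebuilds the table rather than duplicating rows. The sketch below illustrates the difference between `replace` and `append`, using an in-memory SQLite database as a stand-in for MySQL. That substitution is an assumption made so the example is self-contained, and it omits the `ALTER TABLE ... ADD PRIMARY KEY` step.

```python
import sqlite3

import pandas as pd

conn = sqlite3.connect(":memory:")  # stand-in for the MySQL warehouse

first_batch = pd.DataFrame({"ProductKey": [1, 2], "TrackName": ["A", "B"]})
second_batch = pd.DataFrame({"ProductKey": [3], "TrackName": ["C"]})


def row_count(connection, table_name):
    """Return the number of rows currently in table_name."""
    result = pd.read_sql(f"SELECT COUNT(*) AS n FROM {table_name}", connection)
    return int(result["n"].iloc[0])


# 'replace' creates the table (dropping any previous version)
first_batch.to_sql("dim_product", con=conn, index=False, if_exists="replace")
# 'append' adds rows to the existing table
second_batch.to_sql("dim_product", con=conn, index=False, if_exists="append")
count_after_append = row_count(conn, "dim_product")

# Running 'replace' again discards the appended row
first_batch.to_sql("dim_product", con=conn, index=False, if_exists="replace")
count_after_replace = row_count(conn, "dim_product")

conn.close()
print(count_after_append, count_after_replace)
```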

### Fact Table Creation and Integration
After loading all dimension tables (customer, product, employee, date), we now create the central fact table that will connect these dimensions and store all sales transactions. This process involves:
1. Extracting transaction data from the source tables (`Invoice` and `InvoiceLine`)
2. Creating surrogate keys for referential integrity
3. Preparing for dimension key lookups to connect with our dimension tables

In [16]:
# Extract transaction data from source database (Chinook)
fact_query = """
SELECT 
    il.InvoiceLineId,
    il.InvoiceId,
    i.CustomerId,
    il.TrackId,
    i.InvoiceDate,
    il.UnitPrice,
    il.Quantity,
    (il.UnitPrice * il.Quantity) as TotalAmount
FROM InvoiceLine il
JOIN Invoice i ON il.InvoiceId = i.InvoiceId
"""

# Read data into DataFrame using our source database function
df_fact = get_source_sql_dataframe(fact_query, **mysql_args)

# Add surrogate key for fact table
df_fact.insert(0, "SalesKey", range(1, df_fact.shape[0]+1))

df_fact.head()

| |SalesKey|InvoiceLineId|InvoiceId|CustomerId|TrackId|InvoiceDate|UnitPrice|Quantity|TotalAmount|
|---|---|---|---|---|---|---|---|---|---|
|0|1|1|1|2|2|2021-01-01|0.99|1|0.99|
|1|2|2|1|2|4|2021-01-01|0.99|1|0.99|
|2|3|3|2|4|6|2021-01-02|0.99|1|0.99|
|3|4|4|2|4|8|2021-01-02|0.99|1|0.99|
|4|5|5|2|4|10|2021-01-02|0.99|1|0.99|


### Fetch Surrogate Keys from Dimension Tables

#### Fetch Date Key from dim_date

In [19]:
date_query = "SELECT date_key, full_date FROM chinook_dw.dim_date"
df_dim_date = get_sql_dataframe(date_query, **mysql_args)
df_dim_date.head()

| |date_key|full_date|
|---|---|---|
|0|20000101|2000-01-01|
|1|20000102|2000-01-02|
|2|20000103|2000-01-03|
|3|20000104|2000-01-04|
|4|20000105|2000-01-05|


#### Fetch Customer Key from dim_customer

In [20]:
# Get CustomerKey from dim_customer
customer_query = "SELECT CustomerKey, CustomerBusinessKey FROM chinook_dw.dim_customer"
df_dim_customer = get_sql_dataframe(customer_query, **mysql_args)
df_dim_customer.head()

| |CustomerKey|CustomerBusinessKey|
|---|---|---|
|0|1|1|
|1|2|2|
|2|3|3|
|3|4|4|
|4|5|5|


#### Fetch Product Key from dim_product

In [21]:
product_query = "SELECT ProductKey, TrackId FROM chinook_dw.dim_product"
df_dim_product = get_sql_dataframe(product_query, **mysql_args)
df_dim_product.head()

| |ProductKey|TrackId|
|---|---|---|
|0|1|1|
|1|2|2|
|2|3|3|
|3|4|4|
|4|5|5|


### Dimension Key Lookups

#### Date Lookup (fixing the date format)

In [22]:
df_fact['InvoiceDate'] = pd.to_datetime(df_fact['InvoiceDate']).dt.date
df_dim_date['full_date'] = pd.to_datetime(df_dim_date['full_date']).dt.date

df_dim_date_renamed = df_dim_date.rename(columns={
    "date_key": "DateKey",
    "full_date": "InvoiceDate"
})
df_fact = pd.merge(df_fact, df_dim_date_renamed, on='InvoiceDate', how='left')
df_fact.drop(['InvoiceDate'], axis=1, inplace=True)
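
The `dim_date` preview shows keys such as `20000101` for `2000-01-01`, which suggests the key is the date written as `YYYYMMDD`. Assuming that pattern holds for every row, the key can also be computed directly from the invoice date and then checked against the keys that actually exist in the dimension. The sketch below uses toy data, and the helper name `date_to_key` is hypothetical.

```python
import pandas as pd


def date_to_key(dates):
    """Convert date-like values to integer keys in YYYYMMDD form."""
    parsed = pd.to_datetime(pd.Series(dates))
    return parsed.dt.strftime("%Y%m%d").astype(int)


# Toy invoice dates and a toy set of keys present in the date dimension
invoice_dates = ["2021-01-01", "2021-01-02", "2021-01-02"]
dim_date_keys = pd.Series([20210101])  # assumed sample, not real dim_date contents

fact_keys = date_to_key(invoice_dates)
uncovered = fact_keys[~fact_keys.isin(dim_date_keys)].unique().tolist()

print(fact_keys.tolist())
print("Keys missing from the date dimension:", uncovered)
```

Listing the uncovered keys makes it visible when the date dimension does not span the invoice dates, rather than letting the merge silently produce missing values.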

#### Customer Lookup

In [23]:
df_fact = pd.merge(df_fact, df_dim_customer, 
                  left_on='CustomerId', 
                  right_on='CustomerBusinessKey', 
                  how='left')
df_fact.drop(['CustomerId', 'CustomerBusinessKey'], axis=1, inplace=True)

#### Product Lookup

In [24]:
df_fact = pd.merge(df_fact, df_dim_product, 
                  left_on='TrackId', 
                  right_on='TrackId', 
                  how='left')
df_fact.drop(['TrackId'], axis=1, inplace=True)
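
Because these lookups are left joins, a fact row whose business key is absent from the dimension keeps a missing surrogate key without any error. A hedged sketch of how such rows could be surfaced at lookup time is shown below, using the `indicator` and `validate` parameters of `pandas.merge` on toy data.

```python
import pandas as pd

# Toy fact rows and a product dimension that lacks TrackId 9999
fact = pd.DataFrame({"InvoiceLineId": [1, 2, 3], "TrackId": [2, 4, 9999]})
dim_product = pd.DataFrame({"ProductKey": [1, 2], "TrackId": [2, 4]})

merged = fact.merge(
    dim_product,
    on="TrackId",
    how="left",
    validate="many_to_one",  # raises if TrackId is duplicated in the dimension
    indicator=True,          # adds a _merge column: 'both' or 'left_only'
)

unmatched_track_ids = merged.loc[merged["_merge"] == "left_only", "TrackId"].tolist()
print("TrackIds without a product row:", unmatched_track_ids)

# Drop the helper column before continuing with the pipeline
merged = merged.drop(columns="_merge")
```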

## Fact Table Final Transformations and Loading
This section prepares the fact table for loading into the data warehouse: it rounds prices, adds derived amount columns, replaces missing dimension keys with a placeholder, and puts the columns in their final order.

In [25]:
# Create a copy of the dataframe to avoid SettingWithCopyWarning
df_fact = df_fact.copy()

# Round unit prices to two decimal places
df_fact['UnitPrice'] = df_fact['UnitPrice'].round(2)

#### Derived Metrics Creation

In [26]:
# Derive gross, discount, and net amounts for each invoice line
df_fact['GrossAmount'] = (df_fact['UnitPrice'] * df_fact['Quantity']).round(2)
df_fact['Discount'] = 0  # placeholder for future use
df_fact['NetAmount'] = (df_fact['GrossAmount'] - df_fact['Discount']).round(2)  # equals GrossAmount while Discount is 0

#### Fact Table Data Quality Handling

In [27]:
# Replace missing dimension keys (lookups with no match) with the placeholder key 0
df_fact['DateKey'] = df_fact['DateKey'].fillna(0).astype(int)
df_fact['CustomerKey'] = df_fact['CustomerKey'].fillna(0).astype(int)
df_fact['ProductKey'] = df_fact['ProductKey'].fillna(0).astype(int)

# Define the final column order
final_columns = [
    'SalesKey',          # Surrogate key
    'DateKey',           # Dimension keys
    'CustomerKey',
    'ProductKey',
    'InvoiceId',         # Business keys
    'InvoiceLineId',
    'Quantity',          # Measures
    'UnitPrice',
    'GrossAmount',
    'Discount',
    'NetAmount'
]

# Select and reorder the columns
df_fact = df_fact[final_columns]
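
Missing keys are replaced with `0` above, but the surrogate keys in the dimension tables start at 1, so an inner join from the fact table to a dimension drops those rows. One common convention, which this project does not implement, is to add an "unknown" member with key 0 to each dimension. The sketch below shows the idea on toy data. The helper name `add_unknown_member` and the default values are assumptions.

```python
import pandas as pd


def add_unknown_member(dim, key_column, **defaults):
    """Return dim with a key-0 row prepended, unless one already exists.

    defaults supplies the values of the other columns for the unknown row.
    """
    if (dim[key_column] == 0).any():
        return dim.copy()
    row = {key_column: 0, **defaults}
    unknown = pd.DataFrame([row], columns=dim.columns)
    return pd.concat([unknown, dim], ignore_index=True)


# Toy product dimension whose keys start at 1
dim_product = pd.DataFrame({
    "ProductKey": [1, 2],
    "TrackId": [2, 4],
    "GenreName": ["Rock", "Rock"],
})
dim_with_unknown = add_unknown_member(
    dim_product, "ProductKey", TrackId=-1, GenreName="Unknown"
)
print(dim_with_unknown)
```

With such a row in place, fact rows carrying key 0 would show up in reports under "Unknown" instead of disappearing from joined results.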

In [28]:
print("Final Fact Table Structure:")
df_fact

Final Fact Table Structure:


| |SalesKey|DateKey|CustomerKey|ProductKey|InvoiceId|InvoiceLineId|Quantity|UnitPrice|GrossAmount|Discount|NetAmount|
|---|---|---|---|---|---|---|---|---|---|---|---|
|0|1|0|2|2|1|1|1|0.99|0.99|0|0.99|
|1|2|0|2|4|1|2|1|0.99|0.99|0|0.99|
|2|3|0|4|6|2|3|1|0.99|0.99|0|0.99|
|3|4|0|4|8|2|4|1|0.99|0.99|0|0.99|
|4|5|0|4|10|2|5|1|0.99|0.99|0|0.99|
|...|...|...|...|...|...|...|...|...|...|...|...|
|2235|2236|0|44|0|411|2236|1|0.99|0.99|0|0.99|
|2236|2237|0|44|0|411|2237|1|0.99|0.99|0|0.99|
|2237|2238|0|44|0|411|2238|1|0.99|0.99|0|0.99|
|2238|2239|0|44|0|411|2239|1|0.99|0.99|0|0.99|


In [29]:
print("Number of transactions:", len(df_fact))
print("\nNull values in fact table:")
print(df_fact.isnull().sum())

Number of transactions: 2240

Null values in fact table:
SalesKey         0
DateKey          0
CustomerKey      0
ProductKey       0
InvoiceId        0
InvoiceLineId    0
Quantity         0
UnitPrice        0
GrossAmount      0
Discount         0
NetAmount        0
dtype: int64


In [30]:
print("\nDimension key coverage:")
print("Unique CustomerKeys:", df_fact['CustomerKey'].nunique())
print("Unique ProductKeys:", df_fact['ProductKey'].nunique())
print("Unique DateKeys:", df_fact['DateKey'].nunique())


Dimension key coverage:
Unique CustomerKeys: 59
Unique ProductKeys: 581
Unique DateKeys: 1
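
The coverage output reports a single unique `DateKey`, and every displayed fact row carries `DateKey` 0, which suggests that the date lookup matched no rows in this run. The cause cannot be read from the output alone; one possibility is that `dim_date` does not extend to the 2021 invoice dates. The displayed tail rows also show `ProductKey` 0. The null check reports no nulls only because missing keys were already replaced with 0. As a minimal sketch, the hypothetical helper below reports the share of placeholder keys per column.

```python
import pandas as pd


def placeholder_share(fact, key_columns, placeholder=0):
    """Return the fraction of rows per key column that hold the placeholder value."""
    return {
        column: float((fact[column] == placeholder).mean())
        for column in key_columns
    }


# Toy fact table: every DateKey and half of the ProductKeys are placeholders
toy_fact = pd.DataFrame({
    "DateKey": [0, 0, 0, 0],
    "CustomerKey": [2, 2, 4, 44],
    "ProductKey": [2, 4, 0, 0],
})
shares = placeholder_share(toy_fact, ["DateKey", "CustomerKey", "ProductKey"])
print(shares)
```

Running a check like this before loading `fact_sales` would flag a failed lookup that the null counts cannot reveal.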


In [31]:
print("\nValue ranges:")
print("Quantity range:", df_fact['Quantity'].min(), "to", df_fact['Quantity'].max())
print("UnitPrice range:", df_fact['UnitPrice'].min(), "to", df_fact['UnitPrice'].max())


Value ranges:
Quantity range: 1 to 1
UnitPrice range: 0.99 to 1.99


In [32]:
set_dataframe(
    df=df_fact,
    table_name='fact_sales',
    pk_column='SalesKey',
    db_operation='insert',
    **mysql_args
)

## Data Warehouse Validation and Analysis Queries for Sales Fact Table

#### Geographic Sales Analysis by Music Genre
This query examines the relationship between music genres and geographic markets by analyzing customer counts, track sales, and revenue across different countries.

In [33]:
sql_genre_country = """
    SELECT 
        p.GenreName,
        c.Country,
        COUNT(DISTINCT f.CustomerKey) as UniqueCustomers,
        SUM(f.Quantity) as TotalTracks,
        ROUND(SUM(f.UnitPrice * f.Quantity), 2) as TotalRevenue
    FROM chinook_dw.fact_sales f
    JOIN chinook_dw.dim_product p ON f.ProductKey = p.ProductKey
    JOIN chinook_dw.dim_customer c ON f.CustomerKey = c.CustomerKey
    GROUP BY p.GenreName, c.Country
    ORDER BY TotalRevenue DESC;
"""

df_genre_country = get_sql_dataframe(sql_genre_country, **mysql_args)
df_genre_country.head()

| |GenreName|Country|UniqueCustomers|TotalTracks|TotalRevenue|
|---|---|---|---|---|---|
|0|Latin|"USA"|10|48.0|47.52|
|1|Rock|"USA"|10|43.0|42.57|
|2|Rock|"France"|5|30.0|29.7|
|3|Latin|"France"|4|26.0|25.74|
|4|Rock|"Canada"|6|25.0|24.75|


#### Sales Support Representative Performance Analysis
This query analyzes sales performance metrics for each support representative, showing their customer base size, sales volume, and revenue generation to evaluate individual effectiveness.

In [34]:
sql_support_rep = """
    SELECT 
        CONCAT(c.SupportRepFirstName, ' ', c.SupportRepLastName) as SupportRep,
        COUNT(DISTINCT f.CustomerKey) as NumberOfCustomers,
        COUNT(f.SalesKey) as TotalSales,
        ROUND(SUM(f.UnitPrice * f.Quantity), 2) as TotalRevenue,
        ROUND(AVG(f.UnitPrice * f.Quantity), 2) as AverageOrderValue
    FROM chinook_dw.fact_sales f
    JOIN chinook_dw.dim_customer c ON f.CustomerKey = c.CustomerKey
    GROUP BY SupportRep
    ORDER BY TotalRevenue DESC;
"""
df_support_rep = get_sql_dataframe(sql_support_rep, **mysql_args)
df_support_rep.head()

| |SupportRep|NumberOfCustomers|TotalSales|TotalRevenue|AverageOrderValue|
|---|---|---|---|---|---|
|0|"Jane" "Peacock"|21|796|833.04|1.05|
|1|"Margaret" "Park"|20|760|775.4|1.02|
|2|"Steve" "Johnson"|18|684|720.16|1.05|
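
In the query above, `AverageOrderValue` is `AVG(f.UnitPrice * f.Quantity)` over fact rows, and each fact row is one invoice line, so the figure is an average per line item rather than per invoice. The sketch below contrasts the two calculations on toy data; the numbers are illustrative only.

```python
import pandas as pd

# Toy invoice lines: invoice 1 has three lines, invoice 2 has one
lines = pd.DataFrame({
    "InvoiceId": [1, 1, 1, 2],
    "UnitPrice": [0.99, 0.99, 0.99, 0.99],
    "Quantity": [1, 1, 1, 1],
})
lines["LineAmount"] = lines["UnitPrice"] * lines["Quantity"]

# Average per invoice line (what AVG over fact rows computes)
avg_line_value = round(float(lines["LineAmount"].mean()), 2)

# Average per invoice: total each invoice first, then average the totals
invoice_totals = lines.groupby("InvoiceId")["LineAmount"].sum()
avg_order_value = round(float(invoice_totals.mean()), 2)

print(avg_line_value, avg_order_value)
```

In SQL, the per-invoice version would need the line amounts summed by `InvoiceId` in a subquery before averaging.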


#### Top Artist Sales Performance Analysis
This query evaluates the commercial success of artists by analyzing their track sales, customer reach, and revenue generation, focusing on the top 15 performers.

In [35]:
sql_artist_analysis = """
    SELECT 
        p.ArtistName,
        COUNT(f.SalesKey) as TracksSold,
        COUNT(DISTINCT f.CustomerKey) as UniqueCustomers,
        ROUND(SUM(f.UnitPrice * f.Quantity), 2) as TotalRevenue,
        ROUND(AVG(f.UnitPrice), 2) as AverageTrackPrice
    FROM chinook_dw.fact_sales f
    JOIN chinook_dw.dim_product p ON f.ProductKey = p.ProductKey
    GROUP BY p.ArtistName
    ORDER BY TotalRevenue DESC
    LIMIT 15;
"""
df_artist = get_sql_dataframe(sql_artist_analysis, **mysql_args)
df_artist

| |ArtistName|TracksSold|UniqueCustomers|TotalRevenue|AverageTrackPrice|
|---|---|---|---|---|---|
|0|Deep Purple|44|10|43.56|0.99|
|1|Faith No More|42|11|41.58|0.99|
|2|Creedence Clearwater Revival|37|12|36.63|0.99|
|3|Various Artists|29|14|28.71|0.99|
|4|Eric Clapton|27|10|26.73|0.99|
|5|Chico Buarque|27|11|26.73|0.99|
|6|Chico Science & Nação Zumbi|25|6|24.75|0.99|
|7|Antônio Carlos Jobim|22|12|21.78|0.99|
|8|Cássia Eller|21|6|20.79|0.99|
|9|Caetano Veloso|21|8|20.79|0.99|


#### Temporal Sales Analysis and Trends
This query summarizes monthly customer activity, transaction volume, and revenue. It takes invoice dates from the source `chinook.Invoice` table rather than from `dim_date`, and `LIMIT 15` caps the result at the first 15 months.

In [36]:
sql_time_trends = """
    SELECT 
        YEAR(i.InvoiceDate) as Year,
        MONTH(i.InvoiceDate) as Month,
        COUNT(DISTINCT f.CustomerKey) as UniqueCustomers,
        COUNT(f.SalesKey) as NumberOfTransactions,
        SUM(f.Quantity) as TotalTracksSold,
        ROUND(SUM(f.UnitPrice * f.Quantity), 2) as TotalRevenue,
        ROUND(AVG(f.UnitPrice * f.Quantity), 2) as AverageTransactionValue
    FROM chinook_dw.fact_sales f
    JOIN chinook.Invoice i ON f.InvoiceId = i.InvoiceId
    GROUP BY Year, Month
    ORDER BY Year, Month
    LIMIT 15;
"""
df_time_trends = get_sql_dataframe(sql_time_trends, **mysql_args)
df_time_trends

| |Year|Month|UniqueCustomers|NumberOfTransactions|TotalTracksSold|TotalRevenue|AverageTransactionValue|
|---|---|---|---|---|---|---|---|
|0|2021|1|6|36|36.0|35.64|0.99|
|1|2021|2|7|38|38.0|37.62|0.99|
|2|2021|3|7|38|38.0|37.62|0.99|
|3|2021|4|7|38|38.0|37.62|0.99|
|4|2021|5|7|38|38.0|37.62|0.99|
|5|2021|6|7|38|38.0|37.62|0.99|
|6|2021|7|7|38|38.0|37.62|0.99|
|7|2021|8|7|38|38.0|37.62|0.99|
|8|2021|9|7|38|38.0|37.62|0.99|
|9|2021|10|7|38|38.0|37.62|0.99|
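
If the fact table carried a populated `DateKey` in `YYYYMMDD` form (an assumption; the run shown above does not), the monthly summary could be computed from the warehouse alone, without joining back to the source `Invoice` table. The sketch below shows that calculation in pandas on toy data. The helper name `add_year_month` is hypothetical.

```python
import pandas as pd


def add_year_month(fact, key_column="DateKey"):
    """Return a copy of fact with Year and Month derived from a YYYYMMDD integer key."""
    result = fact.copy()
    result["Year"] = result[key_column] // 10000
    result["Month"] = (result[key_column] // 100) % 100
    return result


# Toy fact rows with populated date keys
toy_fact = pd.DataFrame({
    "DateKey": [20210101, 20210115, 20210203],
    "CustomerKey": [2, 4, 2],
    "NetAmount": [0.99, 0.99, 1.99],
})

monthly = (
    add_year_month(toy_fact)
    .groupby(["Year", "Month"], as_index=False)
    .agg(
        UniqueCustomers=("CustomerKey", "nunique"),
        NumberOfTransactions=("NetAmount", "size"),
        TotalRevenue=("NetAmount", "sum"),
    )
)
print(monthly)
```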
