### Author: Vishal Kamalakrishnan
### Computing Id: cjq2cw

This project will analyze sales of a mock company. The data for the mock company is present in the sales dataset of the sample_supplies collection, which is a mock Mongo DB database. In an ETL pipeline, the data will then be merged with the MySQL world database to get the country code of the city. This data will then be merged with a country mapping dataset from Kaggle (continents2.csv) containing information about a country's region. This ETL will allow the mock company to analyze aggregate number of sales by country or region

In [None]:
import pandas as pd
import pymongo
import certifi
from sqlalchemy import create_engine, text

In [None]:
# mysql auth
host_name = "localhost"
port = "3306"
user_id = "root"
pwd = "TBX3|L2e77i7" # replace with your password

dbname = "world"

In [None]:
# mongo db auth
mongodb_args = {
    "user_name" : "cjq2cw", # replace with your usename
    "password" : "Password123", # replace with your password
    "cluster_name" : "cluster0", # replace with your cluster name
    "cluster_subnet" : "gtykg", # replace with your cluster subnet
    "cluster_location" : "atlas", # "local"
    "db_name" : "sample_supplies"
}

In [46]:
# helper functions
def get_mysql_db_as_df(user_id, pwd, host_name, db_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe

def get_csv_as_df(path):
    return pd.read_csv(path)

def get_mongo_client(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the cluster_location parameter.")
    
    else:
        if args["cluster_location"] == "atlas":
            connect_str = f"mongodb+srv://{args['user_name']}:{args['password']}@"
            connect_str += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
            client = pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())
            
        elif args["cluster_location"] == "local":
            client = pymongo.MongoClient("mongodb://localhost:27017/")
        
    return client

def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = mongo_client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    mongo_client.close()
    
    return dframe

### Getting Data

In [47]:
query = "SELECT * FROM world.city;"
df_mysql_world = get_mysql_db_as_df(user_id, pwd, host_name, dbname, query)
df_mysql_world.head(2)

Unnamed: 0,ID,Name,CountryCode,District,Population
0,1,Kabul,AFG,Kabol,1780000
1,2,Qandahar,AFG,Qandahar,237500


In [48]:
df_kaggle_country_iso = get_csv_as_df("./data/continents2.csv")
df_kaggle_country_iso.head(2)

Unnamed: 0,name,alpha-2,alpha-3,country-code,iso_3166-2,region,sub-region,intermediate-region,region-code,sub-region-code,intermediate-region-code
0,Afghanistan,AF,AFG,4,ISO 3166-2:AF,Asia,Southern Asia,,142.0,34.0,
1,Åland Islands,AX,ALA,248,ISO 3166-2:AX,Europe,Northern Europe,,150.0,154.0,


In [49]:
mongodb_client = get_mongo_client(**mongodb_args)
collection = "sales"
query = {} # Select all elements (columns), and all documents (rows).
df_mongo_sales = get_mongo_dataframe(mongodb_client, mongodb_args["db_name"], collection, query)
df_mongo_sales.head(2)

Unnamed: 0,saleDate,items,storeLocation,customer,couponUsed,purchaseMethod
0,2015-03-23 21:06:49.506,"[{'name': 'printer paper', 'tags': ['office', ...",Denver,"{'gender': 'M', 'age': 42, 'email': 'cauho@wit...",True,Online
1,2015-08-25 10:01:02.918,"[{'name': 'envelopes', 'tags': ['stationary', ...",Seattle,"{'gender': 'M', 'age': 50, 'email': 'keecade@h...",False,Phone


### Creating Dim Tables for Buisness Lookup

In [50]:
# creating a dim_date for the sale dates
dates = pd.to_datetime(df_mongo_sales['saleDate'])
df_dim_date = pd.DataFrame({
        # converting timestamp to y-m-d
        'full_date': dates.dt.strftime('%Y-%m-%d'),
        'year': dates.dt.year,
        'quarter': dates.dt.quarter,
        'month': dates.dt.month,
        'month_name': dates.dt.strftime('%B'),
    }).drop_duplicates()

# adding a primary key
df_dim_date.insert(0, "date_id", range(1, df_dim_date.shape[0]+1))
df_dim_date.head(2)

Unnamed: 0,date_id,full_date,year,quarter,month,month_name
0,1,2015-03-23,2015,1,3,March
1,2,2015-08-25,2015,3,8,August


In [51]:
# creating a new dim_store based on the store locations of mock company
df_dim_store = pd.DataFrame({
    'store_location': df_mongo_sales['storeLocation'].unique()
})
# merging sales with mysql countries by store location 
df_dim_store = df_dim_store.merge(
    df_mysql_world,
    left_on='store_location',
    right_on='Name',
    how='left'
)

# removing and renaming columns to be consistent
df_dim_store = df_dim_store.drop(columns=["ID", "Population", "Name", "District"])
df_dim_store = df_dim_store.rename(columns={'CountryCode': 'country_code'})

# adding a primary key
df_dim_store.insert(0, "store_id", range(1, df_dim_store.shape[0]+1))

df_dim_store.head(2)

Unnamed: 0,store_id,store_location,country_code
0,1,Denver,USA
1,2,Seattle,USA


In [52]:
# creating a new dim_country based on country info
df_dim_country = df_kaggle_country_iso.drop(columns=["alpha-2", "country-code", 'intermediate-region', 'intermediate-region-code'])

# renaming colums 
df_dim_country = df_dim_country.rename(columns={
    'alpha-3': 'country_code',
    'sub-region' : 'sub_region',
    'region-code' : 'region_code',
    'sub-region-code' : 'sub_region_code',
    
})

# adding a primary key
df_dim_country.insert(0, "country_id", range(1, df_dim_country.shape[0]+1))

df_dim_country.head(2)

Unnamed: 0,country_id,name,country_code,iso_3166-2,region,sub_region,region_code,sub_region_code
0,1,Afghanistan,AFG,ISO 3166-2:AF,Asia,Southern Asia,142.0,34.0
1,2,Åland Islands,ALA,ISO 3166-2:AX,Europe,Northern Europe,150.0,154.0


In [53]:
# adding the country id to dim_stores instead of the country code and dropping other columns
df_dim_store = df_dim_store.merge(df_dim_country, on='country_code')
df_dim_store = df_dim_store.drop(columns=['country_code', 'name', 'iso_3166-2', 'region', 'sub_region', 'region_code', 'sub_region_code'])
df_dim_store.head(2)

Unnamed: 0,store_id,store_location,country_id
0,1,Denver,236
1,2,Seattle,236


### Creating a Fact Sales Table

In [54]:
# creating a fact_sales table with sale data based on date, store, and country
df_fact_sales = df_mongo_sales
df_fact_sales['saleDate'] = df_fact_sales['saleDate'].dt.strftime('%Y-%m-%d')

# first merging sale date
df_fact_sales = df_fact_sales.merge(
    df_dim_date,
    left_on='saleDate',
    right_on='full_date',
    how='left'
)

# dropping columns other than id
df_fact_sales = df_fact_sales.drop(columns=['saleDate', 'full_date', 'year', 'quarter', 'month', 'month_name'])

df_fact_sales.head(2)

Unnamed: 0,items,storeLocation,customer,couponUsed,purchaseMethod,date_id
0,"[{'name': 'printer paper', 'tags': ['office', ...",Denver,"{'gender': 'M', 'age': 42, 'email': 'cauho@wit...",True,Online,1
1,"[{'name': 'envelopes', 'tags': ['stationary', ...",Seattle,"{'gender': 'M', 'age': 50, 'email': 'keecade@h...",False,Phone,2


In [55]:
# merging fact_sales with dim_store
df_fact_sales = df_fact_sales.merge(
    df_dim_store,
    left_on='storeLocation',
    right_on='store_location',
    how='left'
)
# dropping columns other than id
df_fact_sales = df_fact_sales.drop(columns=['storeLocation', 'store_location', 'country_id'])

df_fact_sales.head(2)

Unnamed: 0,items,customer,couponUsed,purchaseMethod,date_id,store_id
0,"[{'name': 'printer paper', 'tags': ['office', ...","{'gender': 'M', 'age': 42, 'email': 'cauho@wit...",True,Online,1,1
1,"[{'name': 'envelopes', 'tags': ['stationary', ...","{'gender': 'M', 'age': 50, 'email': 'keecade@h...",False,Phone,2,2


In [56]:
# renaming cols and adding primary key for fact_sales
df_fact_sales = df_fact_sales.rename(columns={'couponUsed': 'coupon_used', 'purchaseMethod': 'purchase_method'})

df_fact_sales.insert(0, "fact_sale_id", range(1, df_fact_sales.shape[0]+1))
df_fact_sales.head(2)

Unnamed: 0,fact_sale_id,items,customer,coupon_used,purchase_method,date_id,store_id
0,1,"[{'name': 'printer paper', 'tags': ['office', ...","{'gender': 'M', 'age': 42, 'email': 'cauho@wit...",True,Online,1,1
1,2,"[{'name': 'envelopes', 'tags': ['stationary', ...","{'gender': 'M', 'age': 50, 'email': 'keecade@h...",False,Phone,2,2


In [57]:
# dropping items off fact_sales -> a lot of processing is needed to create a seperate items dimension
# the items are in the form of list of objects for each individual item, there might be duplicates

# same with customer
df_fact_sales = df_fact_sales.drop(columns=['items', 'customer'])

And there we have it, the data has been processed by the ETL and a new df for fact_sales has been created. Using this df and the dim_store, dim_date, and dim_country the mock buisness can aggregate sales by date, country, or region

In [58]:
def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

In [None]:
# create a new db schema in MySQL to store the new database
dst_dbname = "mock_company"

In [60]:
table_name = "fact_sale"
primary_key = "fact_sale_id"
db_operation = "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname, df_fact_sales, table_name, primary_key, db_operation)

In [61]:
# adding the dim date table
table_name = "dim_date"
primary_key = "date_id"
db_operation = "insert"
set_dataframe(user_id, pwd, host_name, dst_dbname, df_dim_date, table_name, primary_key, db_operation)

In [62]:
# adding the dim country table
table_name = "dim_country"
primary_key = "country_id"
db_operation = "insert"
set_dataframe(user_id, pwd, host_name, dst_dbname, df_dim_country, table_name, primary_key, db_operation)

In [63]:
# adding the dim store table
table_name = "dim_store"
primary_key = "store_id"
db_operation = "insert"
set_dataframe(user_id, pwd, host_name, dst_dbname, df_dim_store, table_name, primary_key, db_operation)

### Verifying that the new data store works

In [82]:
# total number of sales & sales with coupon by country
query = """
SELECT 
    c.name AS country,
    COUNT(f.fact_sale_id) AS total_sales,
    SUM(CASE WHEN 
            f.coupon_used = 1 THEN 1 
                ELSE 0 
        END) 
    AS sales_with_coupon
FROM dim_country c
    JOIN dim_store s ON c.country_id = s.country_id
    JOIN fact_sale f ON s.store_id = f.store_id
GROUP BY c.name, c.region, c.sub_region
ORDER BY total_sales DESC;
"""
df_total_sales_by_country = get_mysql_db_as_df(user_id, pwd, host_name, dst_dbname, query)
df_total_sales_by_country

Unnamed: 0,country,total_sales,sales_with_coupon
0,United States,4206,401.0
1,United Kingdom,794,76.0
2,Canada,794,76.0


In [83]:
# total number of sales and sales with coupon by region in 2016
query = """
SELECT 
    c.region,
    d.year,
    COUNT(f.fact_sale_id) AS total_sales,
    SUM(CASE WHEN 
            f.coupon_used = 1 THEN 1 
                ELSE 0 
            END) 
        AS sales_with_coupon
FROM dim_country c
    JOIN dim_store s ON c.country_id = s.country_id
    JOIN fact_sale f ON s.store_id = f.store_id
    JOIN dim_date d ON f.date_id = d.date_id
WHERE d.year = 2016
GROUP BY c.region, d.year
ORDER BY d.year, total_sales DESC;
"""
df_total_sales_by_region_in_timeframe = get_mysql_db_as_df(user_id, pwd, host_name, dst_dbname, query)
df_total_sales_by_region_in_timeframe

Unnamed: 0,region,year,total_sales,sales_with_coupon
0,Americas,2016,1010,95.0
1,Europe,2016,158,16.0
