In [None]:
# Midterm 1, DS 2002, Annabelle Claypoole

# In this project, I'll be designing a dimensional data mart that represents retail transactions. The dataset I will be working with is publicly avaible from Kaggle.

In [1]:
# Import first set of dependencies

import pandas as pd
from sqlalchemy import create_engine

In [2]:
# Declare and assign connection variables for MongoDB server

import pymongo

local_conn_str = "mongodb://localhost27017/"

client = pymongo.MongoClient(local_conn_str)

src_dbname = "Midterm1"
collection_name = "Datedimension"
db = client[src_dbname]
collection = db[collection_name]

In [23]:
# Declare and assign connection variables for MySQL Server

from sqlalchemy import create_engine

mysql_user = "root"
mysql_password = "Passw0rd123"
mysql_host = "localhost"
mysql_port = "3306"
mysql_db = "midterm_database"

mysql_conn_str = f'mysql+mysqlconnector://{mysql_user}:{mysql_password}@{mysql_host}:{mysql_port}/{mysql_db}'

engine = create_engine(mysql_conn_str)

In [4]:
# Define Functions for Getting Data From and Setting Data Into Databases

def get_sql_dataframe(user_id, pwd, host_name, db_name, sql_query):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    conn = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, conn);
    conn.close()
    
    return dframe


def get_mongo_dataframe(connect_str, db_name, collection, query):
    '''Create a connection to MongoDB'''
    client = pymongo.MongoClient(connect_str)
    
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    client.close()
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

In [5]:
# Read in data, retrived from the CSV file

df = pd.read_csv(r"C:\Users\ds2002-student\Downloads\data\Online Retail.csv")

In [6]:
# Visualize sample of data, as well as what dimensions I am working with. In this project, I will be focusing on 'InvoiceDate' for the date dimension, as well as two additional dimension tables: customers, and products.

print(df.head())
print(df.info())

  InvoiceNo StockCode                          Description  Quantity  \
0    536365    85123A   WHITE HANGING HEART T-LIGHT HOLDER         6   
1    536365     71053                  WHITE METAL LANTERN         6   
2    536365    84406B       CREAM CUPID HEARTS COAT HANGER         8   
3    536365    84029G  KNITTED UNION FLAG HOT WATER BOTTLE         6   
4    536365    84029E       RED WOOLLY HOTTIE WHITE HEART.         6   

           InvoiceDate  UnitPrice  CustomerID         Country  
0  2010-12-01 08:26:00       2.55       17850  United Kingdom  
1  2010-12-01 08:26:00       3.39       17850  United Kingdom  
2  2010-12-01 08:26:00       2.75       17850  United Kingdom  
3  2010-12-01 08:26:00       3.39       17850  United Kingdom  
4  2010-12-01 08:26:00       3.39       17850  United Kingdom  
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 541909 entries, 0 to 541908
Data columns (total 8 columns):
 #   Column       Non-Null Count   Dtype  
---  ------       ------------

In [7]:
# I cleaned the data dropping null values. 

df.dropna(subset = ['CustomerID'], inplace = True)
df['InvoiceDate'] = pd.to_datetime(df['InvoiceDate'])

In [8]:
# The next 4 cells are working through error with the formatting of date dimension

print(df['InvoiceDate'].head())

0   2010-12-01 08:26:00
1   2010-12-01 08:26:00
2   2010-12-01 08:26:00
3   2010-12-01 08:26:00
4   2010-12-01 08:26:00
Name: InvoiceDate, dtype: datetime64[ns]


In [9]:
print(pd.to_datetime(df['InvoiceDate'][0]))

2010-12-01 08:26:00


In [10]:
df['ParsedDate'] = pd.to_datetime(df['InvoiceDate'], errors='coerce')
df['Flag'] = df['ParsedDate'].isna()
print(df[df['Flag'] == True])

Empty DataFrame
Columns: [InvoiceNo, StockCode, Description, Quantity, InvoiceDate, UnitPrice, CustomerID, Country, ParsedDate, Flag]
Index: []


In [11]:
date_dim = pd.DataFrame(df['InvoiceDate'].dt.normalize().unique(), columns=['Date'])

# Extract year, month, and day
date_dim['Year'] = date_dim['Date'].dt.year
date_dim['Month'] = date_dim['Date'].dt.month
date_dim['Day'] = date_dim['Date'].dt.day

# Visualize date dimension

date_dim.head(2)

Unnamed: 0,Date,Year,Month,Day
0,2010-12-01,2010,12,1
1,2010-12-02,2010,12,2


In [12]:
# Now that our data is in the proper format, I'll connect it to MySQL

date_dim.to_sql('date_dim', con = engine, if_exists = 'append', index = False)

305

In [13]:
# Create other two dimensions. The customer dimension will contain the Customer ID and their Country. The product dimension will contain the stock code for the item and its description.

customer_dim = df[['CustomerID', 'Country']].drop_duplicates()

product_dim = df[['StockCode', 'Description']].drop_duplicates()

# Next, I'll connect these dimensions to SQL

engine = create_engine(mysql_conn_str)

customer_dim.to_sql('dim_customer', con = engine, if_exists = 'append', index = False)

product_dim.to_sql('dim_product', con = engine, if_exists = 'append', index = False)

5752

In [14]:
# Visulaize the customer dimension table

customer_dim.head(2)

Unnamed: 0,CustomerID,Country
0,17850,United Kingdom
9,13047,United Kingdom


In [15]:
# Visulaize the product dimension table

product_dim.head(2)

Unnamed: 0,StockCode,Description
0,85123A,WHITE HANGING HEART T-LIGHT HOLDER
1,71053,WHITE METAL LANTERN


In [16]:
# Now, I'll create fact table, adding additional dimesnion of total price

df['TotalPrice'] = df['Quantity']*df['UnitPrice']

fact_sales_columns = ['InvoiceNo', 'StockCode', 'Quantity', 'Description', 'InvoiceDate', 'TotalPrice', 'CustomerID', 'Country']

fact_sales = df[fact_sales_columns]

fact_sales.to_sql('fact_sales', con = engine, if_exists = 'append', index = False)


541909

In [17]:
# Visualize fact table

fact_sales.head(2)

Unnamed: 0,InvoiceNo,StockCode,Quantity,Description,InvoiceDate,TotalPrice,CustomerID,Country
0,536365,85123A,6,WHITE HANGING HEART T-LIGHT HOLDER,2010-12-01 08:26:00,15.3,17850,United Kingdom
1,536365,71053,6,WHITE METAL LANTERN,2010-12-01 08:26:00,20.34,17850,United Kingdom


In [18]:
# Demonstrate new datawarehouse exists and contains the correct data

sql_fact_sales = """
   SELECT f.*, c.Country, p.Description
FROM fact_sales f
JOIN dim_customer c ON f.CustomerID = c.CustomerID
JOIN dim_product p ON f.StockCode = p.StockCode
GROUP BY c.Country
ORDER BY SUM(f.TotalPrice) DESC
LIMIT 20;

"""

In [25]:
df_fact_sales = get_sql_dataframe(sql_fact_sales, mysql_user, mysql_password, mysql_host, mysql_db)
df_fact_sales

# The data warehouse from SQL could not be displayed in this Jupyter notebook due to an error with the password. However, I was able to display the datawarehouse within SQL, and that notebook is included in this submission. As well, the code from that SQL sequence is in the next cell.

OperationalError: (pymysql.err.OperationalError) (2003, "Can't connect to MySQL server on 'Passw0rd123' ([Errno 11001] getaddrinfo failed)")
(Background on this error at: https://sqlalche.me/e/14/e3q8)

# SQL Code, also included in SQL file in GitHub repo

SELECT
	c.Country, 
    SUM(f.TotalPrice) AS TotalSales, 
    COUNT(f.InvoiceNo) AS NumberOfInvoices
    
    
FROM
	(SELECT * FROM fact_sales ORDER BY InvoiceDate ASC LIMIT 1000) f 
JOIN 
	dim_customer c ON f.CustomerID = c.CustomerID
JOIN
	dim_product p ON f.StockCode = p.StockCode
GROUP BY
	c.Country
ORDER BY
	TotalSales DESC;