## DS 2002 Project 1
### Ashley Manzanares fnv2vx
The database being using for this project is the [Sakila database](https://dev.mysql.com/doc/sakila/en/sakila-preface.html). This project focuses on the construction and utilization of a Data Warehouse for a rental business. It develops a rentals fact table which records each rental, including unique identifiers for each rentals, associated inventory items, customers, and staff members involved, as well as rental and return dates. 

To reproduce this project, ensure the following libraries are installed in your python envenviornment by running these commands:
- `python -m pip install PyMySQL`
- `python -m pip install mysql.connector-python`
- `python -m pip install sqlalchemy`
- `python -m pip install pymongo[srv]`

The SQL files in the [Github repo](https://github.com/amanzanares410/DS2002-Project1) have also been run beforehand.

In [1]:
# Import statements
import os
import json
import numpy
import datetime
import certifi
import pandas as pd

import pymongo
import sqlalchemy
from sqlalchemy import create_engine

### Setting up the Project
#### Creating connections to MongoDB,the MySQL Server and Databases
MySQL Server is hosted on Microsoft Azure

In [2]:
# MySQL Server connection
mysql_args = {
    "uid" : "amanzanares",
    "pwd" : "Passw0rd123!",
    "hostname" : "amanzanares-mysql.mysql.database.azure.com",
    "dbname" : "sakila_dw"
}

# MongoDB connection
mongodb_args = {
    "user_name" : "fnv2vx",
    "password" : "Passw0rd123!",
    "cluster_name" : "ds2002",
    "cluster_subnet" : "x5wij4u",
    "cluster_location" : "atlas", 
    "db_name" : "sakila_rentals"
}

#### Functions for Getting Data From and Setting Data Into Databases
(Code taken directly from lab 4)

In [3]:
def get_sql_dataframe(sql_query, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe
    

def set_dataframe(df, table_name, pk_column, db_operation, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()


def get_mongo_client(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the cluster_location parameter.")
    
    else:
        if args["cluster_location"] == "atlas":
            connect_str = f"mongodb+srv://{args['user_name']}:{args['password']}@"
            connect_str += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
            client = pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())
            
        elif args["cluster_location"] == "local":
            client = pymongo.MongoClient("mongodb://localhost:27017/")
        
    return client


def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = mongo_client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    mongo_client.close()
    
    return dframe


def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    db = mongo_client[db_name]
    
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(data_directory, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)
        
    mongo_client.close()

#### Creating JSON Files

In [4]:
# Storing the json files in a directory called data
data_dir = os.path.join(os.getcwd(), 'data')

if not os.path.exists(data_dir):
    os.makedirs(data_dir)

sql_query_rental = "SELECT * FROM sakila.rental"
df = get_sql_dataframe(sql_query_rental, **mysql_args)
json_file_path = os.path.join(data_dir, 'sakila_rental.json')
df.to_json(json_file_path, orient='records', lines=False)

sql_query_inventory = "SELECT * FROM sakila.inventory"
df = get_sql_dataframe(sql_query_inventory, **mysql_args)
json_file_path = os.path.join(data_dir, 'sakila_inventory.json')
df.to_json(json_file_path, orient='records', lines=False)

sql_query_customer = "SELECT * FROM sakila.customer"
df = get_sql_dataframe(sql_query_customer, **mysql_args)
json_file_path = os.path.join(data_dir, 'sakila_customer.json')
df.to_json(json_file_path, orient='records', lines=False)

sql_query_staff = "SELECT * FROM sakila.staff"
df = get_sql_dataframe(sql_query_staff, **mysql_args)
json_file_path = os.path.join(data_dir, 'sakila_staff.json')
df.to_json(json_file_path, orient='records', lines=False)

#### Populating MongoDB with the JSON File Data

In [5]:
client = get_mongo_client(**mongodb_args)

data_dir = os.path.join(os.getcwd(), 'data')

json_files = {"rental" : 'sakila_rental.json',
              "inventory" : 'sakila_inventory.json',
              "customer" : 'sakila_customer.json',
              "staff": 'sakila_staff.json',
             }

set_mongo_collections(client, mongodb_args["db_name"], data_dir, json_files)

### Creating and Populating Dimension Tables
#### Extracting Data from MongoDB into Dataframes

In [6]:
client = get_mongo_client(**mongodb_args)

query = {} 
collection = "customer"

df_customer = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_customer.head(2)

Unnamed: 0,customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,1139954676000,1139979440000
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,1139954676000,1139979440000


In [7]:
client = get_mongo_client(**mongodb_args)

query = {} 
collection = "inventory"

df_inventory = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_inventory.head(2)

Unnamed: 0,inventory_id,film_id,store_id,last_update
0,1,1,1,1139980157000
1,2,1,1,1139980157000


In [8]:
client = get_mongo_client(**mongodb_args)

query = {} 
collection = "staff"

df_staff = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_staff.head(2)

Unnamed: 0,staff_id,first_name,last_name,address_id,email,store_id,active,username,password,last_update
0,1,Mike,Hillyer,3,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964,1139975836000
1,2,Jon,Stephens,4,Jon.Stephens@sakilastaff.com,2,1,Jon,,1139975836000


#### Getting Data from the Date Dimension Table

In [9]:
sql_dim_date = "SELECT date_key, full_date FROM sakila_dw.dim_date;"
df_dim_date = get_sql_dataframe(sql_dim_date, **mysql_args)
df_dim_date.full_date = df_dim_date.full_date.astype('datetime64[ns]').dt.date
df_dim_date.head(2)

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02


#### Cleaning Up Dataframes
Customer Table

In [10]:
# Dropping create_date and last_update
df_customer.drop(['create_date'], axis=1, inplace=True)
df_customer.drop(['last_update'], axis=1, inplace=True)

# Adding in incrementing key
df_customer.insert(0, "customer_key", range(1, df_customer.shape[0]+1))

df_customer.head(2)

Unnamed: 0,customer_key,customer_id,store_id,first_name,last_name,email,address_id,active
0,1,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1
1,2,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1


Inventory Table

In [11]:
# Drop last_update
df_inventory.drop(['last_update'], axis=1, inplace=True)

# Adding in incrementing key
df_inventory.insert(0, "inventory_key", range(1, df_inventory.shape[0]+1))

df_inventory.head(2)

Unnamed: 0,inventory_key,inventory_id,film_id,store_id
0,1,1,1,1
1,2,2,1,1


Staff Table

In [12]:
df_staff.drop(['last_update'], axis=1, inplace=True)

# Adding in incrementing key
df_staff.insert(0, "staff_key", range(1, df_staff.shape[0]+1))
df_staff.head(2)

Unnamed: 0,staff_key,staff_id,first_name,last_name,address_id,email,store_id,active,username,password
0,1,1,Mike,Hillyer,3,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964
1,2,2,Jon,Stephens,4,Jon.Stephens@sakilastaff.com,2,1,Jon,


#### Loading Dataframes into Data Warehouse

In [13]:
dataframe = df_customer
table_name = 'dim_customer'
primary_key = 'customer_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

In [14]:
dataframe = df_inventory
table_name = 'dim_inventory'
primary_key = 'inventory_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

In [15]:
dataframe = df_staff
table_name = 'dim_staff'
primary_key = 'staff_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

#### Validating that each Dataframe was Created

In [16]:
sql_customer = "SELECT * FROM sakila_dw.dim_customer;"
df_dim_customer = get_sql_dataframe(sql_customer, **mysql_args)
df_dim_customer.head(2)

Unnamed: 0,customer_key,customer_id,store_id,first_name,last_name,email,address_id,active
0,1,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1
1,2,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1


In [17]:
sql_inventory = "SELECT * FROM sakila_dw.dim_inventory;"
df_dim_inventory = get_sql_dataframe(sql_inventory, **mysql_args)
df_dim_inventory.head(2)

Unnamed: 0,inventory_key,inventory_id,film_id,store_id
0,1,1,1,1
1,2,2,1,1


In [18]:
sql_staff = "SELECT * FROM sakila_dw.dim_staff;"
df_dim_staff = get_sql_dataframe(sql_staff, **mysql_args)
df_dim_staff.head(2)

Unnamed: 0,staff_key,staff_id,first_name,last_name,address_id,email,store_id,active,username,password
0,1,1,Mike,Hillyer,3,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964
1,2,2,Jon,Stephens,4,Jon.Stephens@sakilastaff.com,2,1,Jon,


### Creating and Populating the Fact Table

In [19]:
client = get_mongo_client(**mongodb_args)

query = {} 
collection = "rental"

df_fact_rental = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_fact_rental.head(2)

Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,return_date,staff_id,last_update
0,1,1116975210000,367,130,1117145000000.0,1,1140039053000
1,2,1116975273000,1525,459,1117309000000.0,1,1140039053000


#### Changing the Rental Date and Return Date Keys

In [20]:
df_fact_rental['rental_date'] = pd.to_datetime(df_fact_rental['rental_date'], unit='ms').dt.strftime('%Y-%m-%d')
df_dim_rental_date = df_dim_date.rename(columns={"date_key" : "rental_date_key", "full_date" : "rental_date"})
df_dim_rental_date['rental_date'] = df_dim_rental_date['rental_date'].astype(str)
df_fact_rental = pd.merge(df_fact_rental, df_dim_rental_date, on='rental_date', how='left')
df_fact_rental.drop(['rental_date'], axis=1, inplace=True)
df_fact_rental.head(2)

Unnamed: 0,rental_id,inventory_id,customer_id,return_date,staff_id,last_update,rental_date_key
0,1,367,130,1117145000000.0,1,1140039053000,20050524
1,2,1525,459,1117309000000.0,1,1140039053000,20050524


In [21]:
df_fact_rental = df_fact_rental[pd.to_numeric(df_fact_rental['return_date'], errors='coerce').notna()]
df_fact_rental['return_date'] = pd.to_numeric(df_fact_rental['return_date'], errors='coerce').fillna(0)
df_fact_rental['return_date'] = pd.to_datetime(df_fact_rental['return_date'], unit='ms').dt.strftime('%Y-%m-%d')
df_dim_rental_date = df_dim_date.rename(columns={"date_key" : "return_date_key", "full_date" : "return_date"})
df_dim_rental_date['return_date'] = df_dim_rental_date['return_date'].astype(str)
df_fact_rental = pd.merge(df_fact_rental, df_dim_rental_date, on='return_date', how='left')
df_fact_rental.drop(['return_date'], axis=1, inplace=True)
df_fact_rental.head(2)

Unnamed: 0,rental_id,inventory_id,customer_id,staff_id,last_update,rental_date_key,return_date_key
0,1,367,130,1,1140039053000,20050524,20050526
1,2,1525,459,1,1140039053000,20050524,20050528


#### Getting Primary Key and Business Id from Each Dimension Table

In [22]:
sql_dim_customer = "SELECT customer_key, customer_id FROM sakila_dw.dim_customer;"
df_dim_customer = get_sql_dataframe(sql_dim_customer, **mysql_args)
df_dim_customer.head(2)

Unnamed: 0,customer_key,customer_id
0,1,1
1,2,2


In [23]:
sql_dim_inventory = "SELECT inventory_key, inventory_id FROM sakila_dw.dim_inventory;"
df_dim_inventory = get_sql_dataframe(sql_dim_inventory, **mysql_args)
df_dim_inventory.head(2)

Unnamed: 0,inventory_key,inventory_id
0,1,1
1,2,2


In [24]:
sql_dim_staff = "SELECT staff_key, staff_id FROM sakila_dw.dim_staff;"
df_dim_staff = get_sql_dataframe(sql_dim_staff, **mysql_args)
df_dim_staff.head(2)

Unnamed: 0,staff_key,staff_id
0,1,1
1,2,2


#### Cleaning up Dataframe

In [25]:
# dropping the 'last_update' column
df_fact_rental.drop(['last_update'], axis=1, inplace=True)
df_fact_rental.head(2)

Unnamed: 0,rental_id,inventory_id,customer_id,staff_id,rental_date_key,return_date_key
0,1,367,130,1,20050524,20050526
1,2,1525,459,1,20050524,20050528


#### Establishing Foreign Key Relationships

In [26]:
df_fact_rental = pd.merge(df_fact_rental, df_dim_customer, on='customer_id', how='inner')
df_fact_rental.drop(['customer_id'], axis=1, inplace=True)
df_fact_rental.head(2)

Unnamed: 0,rental_id,inventory_id,staff_id,rental_date_key,return_date_key,customer_key
0,1,367,1,20050524,20050526,130
1,746,4272,2,20050529,20050602,130


In [27]:
df_fact_rental = pd.merge(df_fact_rental, df_dim_inventory, on='inventory_id', how='inner')
df_fact_rental.drop(['inventory_id'], axis=1, inplace=True)
df_fact_rental.head(2)

Unnamed: 0,rental_id,staff_id,rental_date_key,return_date_key,customer_key,inventory_key
0,1,1,20050524,20050526,130,367
1,3584,1,20050706,20050713,207,367


In [28]:
df_fact_rental = pd.merge(df_fact_rental, df_dim_staff, on='staff_id', how='inner')
df_fact_rental.drop(['staff_id'], axis=1, inplace=True)
df_fact_rental.head(2)

Unnamed: 0,rental_id,rental_date_key,return_date_key,customer_key,inventory_key,staff_key
0,1,20050524,20050526,130,367,1
1,3584,20050706,20050713,207,367,1


In [29]:
# Inserting in an increasing key
df_fact_rental.insert(0, "fact_rental_key", range(1, df_fact_rental.shape[0]+1))
df_fact_rental.head(2)

Unnamed: 0,fact_rental_key,rental_id,rental_date_key,return_date_key,customer_key,inventory_key,staff_key
0,1,1,20050524,20050526,130,367,1
1,2,3584,20050706,20050713,207,367,1


#### Loading New Data into the Data Warehouse

In [30]:
dataframe = df_fact_rental
table_name = 'fact_rental'
primary_key = 'fact_rental'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

### Validating the Fact Table
This query counts the number of rentals (Total Rentals) for each customer, grouping the results by the customer's last name and ordering them in descending order of the count, so you can see which customers rented the most.

In [31]:
sql_fact_rental = """
SELECT c.last_name,
    COUNT(fr.rental_id) AS 'Total Rentals'
FROM sakila_dw.fact_rental AS fr
JOIN sakila_dw.dim_customer AS c 
ON fr.customer_key = c.customer_key
GROUP BY c.last_name
ORDER BY 'Total Rentals' DESC;
"""
df_fact_rental = get_sql_dataframe(sql_fact_rental, **mysql_args)
df_fact_rental

Unnamed: 0,last_name,Total Rentals
0,HUNTER,24
1,CASTILLO,34
2,OBRIEN,14
3,STEVENS,24
4,SCHWAB,20
...,...,...
594,TRUONG,19
595,EMMONS,23
596,YOUNG,31
597,BUFORD,24
