# DS 2002 Midterm

## Deliverable 1: Design a dimensional data mart that represents a simple business process of your choosing.
   ### a. 
   Retail sales
   ### b. 
   Sakila MySQL database - Business process is sales at a movie rental store
   
## Deliverable 2: Develop an ETL pipeline that extracts, transforms and loads data into your data mart.
   ### a. Extract data from one or more SQL database tables; hosted locally or in the Cloud.
   ### *Extracting the customers and staff tables from MySQL workbench relational database (on the local virtual computer)*
   + Extracting the payment and rental tables from JSON files (located in the data folder in Jupyter)

#### Importing Libraries
Before importing, make sure these are on your machine. If they aren't, open a terminal and install them.

In [2]:
import os
import json
import numpy
import datetime
import pandas as pd
import pymongo
from sqlalchemy import create_engine

#### Declaring and assigning connection variables to use with MySQL server and the Sakila database

In [3]:
# Connection settings for the local MySQL server.
# The user name and password can be supplied through environment variables so
# that credentials do not have to live in the notebook; the literals are only
# fallbacks that keep the previous behaviour when the variables are unset.
host_name = "localhost"
port = "3306"  # NOTE(review): defined but not used by the connection strings in this notebook
user_id = os.environ.get("MYSQL_USER", "root")
pwd = os.environ.get("MYSQL_PWD", "Passw0rd123")  # TODO: remove the hard-coded fallback

src_dbname = "sakila"  # source database the data is extracted from
dst_dbname = "sakila_dw2"  # data mart created by this notebook

#### Defining functions for getting data from a database and setting data into a database

In [4]:
def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    """Run ``sql_query`` against a MySQL database and return the result as a DataFrame.

    Args:
        user_id: MySQL user name.
        pwd: MySQL password. It is URL-escaped before being placed in the
            connection URL, so characters such as ``@`` or ``/`` are allowed.
        host_name: Host name of the MySQL server.
        db_name: Schema to connect to.
        sql_query: SQL text passed to ``pandas.read_sql``.

    Returns:
        pandas.DataFrame holding the rows returned by the query.
    """
    from urllib.parse import quote_plus

    conn_str = f"mysql+pymysql://{quote_plus(user_id)}:{quote_plus(pwd)}@{host_name}/{db_name}"
    # pool_recycle=3600: pooled connections older than one hour are replaced
    # instead of reused.
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    try:
        # The context manager closes the connection even when the query fails.
        with sqlEngine.connect() as connection:
            return pd.read_sql(sql_query, connection)
    finally:
        # The engine is created per call, so release its pool right away.
        sqlEngine.dispose()

def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    """Write a DataFrame to a MySQL table, either creating it or appending to it.

    Args:
        user_id: MySQL user name.
        pwd: MySQL password (URL-escaped before use in the connection URL).
        host_name: Host name of the MySQL server.
        db_name: Schema that holds (or will hold) the table.
        df: pandas.DataFrame to write.
        table_name: Name of the target table.
        pk_column: Column promoted to primary key after an ``"insert"``.
        db_operation: ``"insert"`` drops and recreates the table, then adds the
            primary key; ``"update"`` appends the rows to the existing table.

    Raises:
        ValueError: If ``db_operation`` is neither ``"insert"`` nor ``"update"``.
    """
    from urllib.parse import quote_plus
    from sqlalchemy import text

    # Fail loudly instead of silently writing nothing.
    if db_operation not in ("insert", "update"):
        raise ValueError(f"Unsupported db_operation: {db_operation!r}")

    conn_str = f"mysql+pymysql://{quote_plus(user_id)}:{quote_plus(pwd)}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    try:
        # One connection for both the load and the ALTER; begin() commits on
        # success and always closes the connection. Engine.execute() is not
        # used because it no longer exists in SQLAlchemy 2.0.
        with sqlEngine.begin() as connection:
            if db_operation == "insert":
                # if_exists='replace' drops the table first when it already exists
                df.to_sql(table_name, con=connection, index=False, if_exists='replace')
                connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
            else:
                df.to_sql(table_name, con=connection, index=False, if_exists='append')
    finally:
        sqlEngine.dispose()

#### Creating the new data warehouse database and using it to switch the connection context

In [5]:
from sqlalchemy import text

# Connect to the MySQL server without a default schema, because the target
# database may not exist yet.
conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)

# Drop any previous copy of the data mart, create a fresh one and select it.
# All three statements run on the same connection: separate Engine-level calls
# may each use a different pooled connection, so a USE issued that way would
# not be guaranteed to affect anything that follows.
with sqlEngine.begin() as connection:
    connection.execute(text(f"DROP DATABASE IF EXISTS `{dst_dbname}`;"))
    connection.execute(text(f"CREATE DATABASE `{dst_dbname}`;"))
    connection.execute(text(f"USE `{dst_dbname}`;"))

OperationalError: (pymysql.err.OperationalError) (1045, "Access denied for user 'root'@'localhost' (using password: YES)")
(Background on this error at: http://sqlalche.me/e/13/e3q8)

### EXTRACT
#### Extracting data from the source database tables

In [None]:
# Pull every column of the sakila.customer table into a DataFrame.
sql_customers = "SELECT * FROM sakila.customer;"
df_customers = get_dataframe(user_id, pwd, host_name, src_dbname, sql_customers) # DataFrame holding the sakila customers
df_customers.head(2) # preview the first two rows to check that the extract worked

In [None]:
# Sakila keeps its employees in the `staff` table; the schema has no
# `employees` table, so the query reads from sakila.staff.
sql_staff = "SELECT * FROM sakila.staff;"
df_staff = get_dataframe(user_id, pwd, host_name, src_dbname, sql_staff) # DataFrame holding the sakila staff
df_staff.head(2) # preview the first two rows to check that the extract worked

### TRANSFORM
#### Transforming data to drop unnecessary columns and rename the primary key

In [None]:
# Columns considered unnecessary for the customer dimension.
drop_cols = ['email_address','home_phone','mobile_phone','web_page','notes','attachments']

# errors="ignore" skips any listed column the source table does not have, so the
# cell does not fail with a KeyError when the extracted table lacks some of them.
# inplace=True modifies df_customers directly instead of returning a copy.
df_customers.drop(columns=drop_cols, errors="ignore", inplace=True)

# Rename the source primary key to the warehouse surrogate-key name. Sakila
# names it `customer_id`; `id` is kept in the mapping for sources that use
# that name. Keys missing from the frame are ignored by rename().
df_customers.rename(columns={"id": "customer_key", "customer_id": "customer_key"}, inplace=True)

df_customers.head(2)

In [None]:
# Columns considered unnecessary for the employee dimension.
drop_cols = ['mobile_phone','notes','attachments']

# The extract step stores the staff rows in df_staff; build df_employees from it
# (the name the load step expects). errors="ignore" skips listed columns that
# the source table does not have.
df_employees = df_staff.drop(columns=drop_cols, errors="ignore")

# Rename the source primary key to the warehouse surrogate-key name. Sakila
# names it `staff_id`; `id` is kept in the mapping for sources that use that name.
df_employees.rename(columns={"id": "employee_key", "staff_id": "employee_key"}, inplace=True)

df_employees.head(2)

### LOAD
#### Loading the transformed dataframes into the new data warehouse by creating new tables

In [None]:
db_operation = "insert"

# Each tuple describes one dimension table to create:
# (target table name, source DataFrame, column to use as primary key).
# Only the DataFrames this notebook actually builds are listed; product and
# shipper frames are never created here, so referencing them raised a NameError.
tables = [
    ('dim_customers', df_customers, 'customer_key'),
    ('dim_employees', df_employees, 'employee_key'),
]

for table_name, dataframe, primary_key in tables:
    set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

   ### *Extracting the store and inventory tables from MongoDB NoSQL database (using my Atlas account)*

#### Declaring and assigning connection variables for the MongoDB server, MySQL server, and the databases

In [None]:
from urllib.parse import quote_plus

# Credentials can be supplied through environment variables so they do not have
# to live in the notebook; the literals are only fallbacks that keep the
# previous behaviour when the variables are unset.
# TODO: remove the hard-coded fallbacks and rotate the exposed passwords.
mysql_uid = os.environ.get("MYSQL_USER", "root")
mysql_pwd = os.environ.get("MYSQL_PWD", "Passw0rd123")
#mysql_host = "ds2002-mysql.mysql.database.azure.com"

atlas_cluster_name = "sandbox.den9rfu"
atlas_user_name = os.environ.get("ATLAS_USER", "ecm8yu")
atlas_password = os.environ.get("ATLAS_PASSWORD", "HR91sK8EWrAUzWRM")

# MongoDB connection URIs keyed by deployment. The user name and password are
# percent-escaped because reserved characters (e.g. '@', ':', '/') would
# otherwise break the URI.
conn_str = {
    "local": "mongodb://localhost:27017/",
    "atlas": f"mongodb+srv://{quote_plus(atlas_user_name)}:{quote_plus(atlas_password)}@{atlas_cluster_name}.mongodb.net",
}

src_dbname = "northwind_purchasing"
dst_dbname = "northwind_dw2"

# NOTE(review): the Atlas string printed below contains the password in clear text.
print(f"Local Connection String: {conn_str['local']}")
print(f"Atlas Connection String: {conn_str['atlas']}")

#### Defining functions for getting data from databases and putting data into databases

In [None]:
def get_sql_dataframe(user_id, pwd, db_name, sql_query):
    """Run ``sql_query`` against a MySQL database on localhost and return a DataFrame.

    Args:
        user_id: MySQL user name.
        pwd: MySQL password (URL-escaped before use in the connection URL).
        db_name: Schema to connect to.
        sql_query: SQL text passed to ``pandas.read_sql``.

    Returns:
        pandas.DataFrame holding the rows returned by the query.
    """
    from urllib.parse import quote_plus

    # Create a connection to the MySQL database.
    conn_str = f"mysql+pymysql://{quote_plus(user_id)}:{quote_plus(pwd)}@localhost/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    try:
        # Query the database and fill a DataFrame; the context manager closes
        # the connection even when the query fails.
        with sqlEngine.connect() as conn:
            return pd.read_sql(sql_query, conn)
    finally:
        # The engine is created per call, so release its pool right away.
        sqlEngine.dispose()


def get_mongo_dataframe(connect_str, db_name, collection, query):
    """Query a MongoDB collection and return the matching documents as a DataFrame.

    Args:
        connect_str: MongoDB connection URI.
        db_name: Name of the database to read from.
        collection: Name of the collection to query.
        query: Filter document passed to ``find``; ``{}`` matches everything.

    Returns:
        pandas.DataFrame with one row per matching document and the ``_id``
        column removed. The frame is empty when nothing matches.
    """
    # The context manager closes the client even when the query fails.
    with pymongo.MongoClient(connect_str) as client:
        documents = list(client[db_name][collection].find(query))

    dframe = pd.DataFrame(documents)
    # errors='ignore': an empty result has no '_id' column to drop.
    dframe.drop(columns=['_id'], errors='ignore', inplace=True)
    return dframe


def set_dataframe(user_id, pwd, db_name, df, table_name, pk_column, db_operation):
    """Write a DataFrame to a MySQL table on localhost, creating or appending to it.

    Args:
        user_id: MySQL user name.
        pwd: MySQL password (URL-escaped before use in the connection URL).
        db_name: Schema that holds (or will hold) the table.
        df: pandas.DataFrame to write.
        table_name: Name of the target table.
        pk_column: Column promoted to primary key after an ``"insert"``.
        db_operation: ``"insert"`` drops and recreates the table, then adds the
            primary key; ``"update"`` appends the rows to the existing table.

    Raises:
        ValueError: If ``db_operation`` is neither ``"insert"`` nor ``"update"``.
    """
    from urllib.parse import quote_plus
    from sqlalchemy import text

    # Fail loudly instead of silently writing nothing.
    if db_operation not in ("insert", "update"):
        raise ValueError(f"Unsupported db_operation: {db_operation!r}")

    # Create a connection to the MySQL database.
    conn_str = f"mysql+pymysql://{quote_plus(user_id)}:{quote_plus(pwd)}@localhost/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    try:
        # One connection for both the load and the ALTER; begin() commits on
        # success and always closes the connection. Engine.execute() is not
        # used because it no longer exists in SQLAlchemy 2.0.
        with sqlEngine.begin() as connection:
            if db_operation == "insert":
                # if_exists='replace' drops the table first when it already exists
                df.to_sql(table_name, con=connection, index=False, if_exists='replace')
                connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
            else:
                df.to_sql(table_name, con=connection, index=False, if_exists='append')
    finally:
        sqlEngine.dispose()

#### Populating MongoDB with source data

In [None]:
# Path of the 'data' directory below the notebook's current working directory.
data_dir = os.path.join(os.getcwd(), 'data')

# Target collection name -> JSON file that holds its documents.
json_files = {"suppliers" : 'northwind_suppliers.json',
              "invoices" : 'northwind_invoices.json',
              "purchase_orders" : 'northwind_purchase_orders.json',
              "inventory_transactions" : 'northwind_inventory_transactions.json'
             }

# The context manager closes the client even when a file is missing or an
# insert fails.
with pymongo.MongoClient(conn_str["atlas"]) as client:
    db = client[src_dbname]

    for collection_name, file_name in json_files.items():
        # Start from an empty collection so re-running the cell does not duplicate documents.
        db.drop_collection(collection_name)

        json_file = os.path.join(data_dir, file_name)
        with open(json_file, 'r', encoding='utf-8') as openfile:
            json_object = json.load(openfile)  # parse the JSON text into Python objects

        # insert_many() rejects an empty list, so skip files without documents.
        if json_object:
            db[collection_name].insert_many(json_object)

### EXTRACT
#### Extracting data from the source MongoDB collections into dataframes

In [None]:
# Empty filter document, passed through to get_mongo_dataframe as the query.
query = {}
collection = "suppliers"

# Read the "suppliers" collection through the Atlas connection string into a DataFrame.
df_suppliers = get_mongo_dataframe(conn_str['atlas'], src_dbname, collection, query)
df_suppliers.head(2)  # preview the first two rows

In [None]:
# NOTE(review): this cell repeats the "suppliers" extraction of the preceding
# cell verbatim; presumably it was meant to read a different collection — confirm.
query = {}
collection = "suppliers"

# Read the "suppliers" collection through the Atlas connection string into a DataFrame.
df_suppliers = get_mongo_dataframe(conn_str['atlas'], src_dbname, collection, query)
df_suppliers.head(2)  # preview the first two rows

### TRANSFORM
#### Making necessary transformations

In [None]:
# Rename the "id" column to the key name used for the dim_suppliers primary key.
df_suppliers.rename(columns={"id":"supplier_key"}, inplace=True)
df_suppliers.head(2)  # preview the first two rows

In [None]:
# NOTE(review): same rename as the preceding cell; once "id" has been renamed
# there is no "id" column left, so this call changes nothing.
df_suppliers.rename(columns={"id":"supplier_key"}, inplace=True)
df_suppliers.head(2)  # preview the first two rows

### LOAD
#### Loading the transformed dataframes into the new data warehouse by creating new tables

In [None]:
# Arguments for loading the supplier dimension into the warehouse.
dataframe = df_suppliers
table_name = 'dim_suppliers'
primary_key = 'supplier_key'  # column set as the table's primary key
db_operation = "insert"  # "insert" replaces the table if it exists, then adds the primary key

# Seven-argument call: matches the set_dataframe variant that connects to localhost.
set_dataframe(mysql_uid, mysql_pwd, dst_dbname, dataframe, table_name, primary_key, db_operation)

In [None]:
# NOTE(review): repeats the dim_suppliers load of the preceding cell; with
# "insert" the existing table is replaced and the primary key is added again.
dataframe = df_suppliers
table_name = 'dim_suppliers'
primary_key = 'supplier_key'  # column set as the table's primary key
db_operation = "insert"

# Seven-argument call: matches the set_dataframe variant that connects to localhost.
set_dataframe(mysql_uid, mysql_pwd, dst_dbname, dataframe, table_name, primary_key, db_operation)

#### Validating the store and inventory dimension tables were created

In [None]:
# Read the newly created dimension table back from MySQL to confirm the load.
# The schema name is written out in the query; it matches the value assigned
# to dst_dbname in the Mongo/MySQL connection-variables cell.
sql_suppliers = "SELECT * FROM northwind_dw2.dim_suppliers;"
df_dim_suppliers = get_sql_dataframe(mysql_uid, mysql_pwd, dst_dbname, sql_suppliers)
df_dim_suppliers.head(2)  # preview the first two rows

In [None]:
# NOTE(review): repeats the dim_suppliers validation of the preceding cell.
# Read the dimension table back from MySQL to confirm the load.
sql_suppliers = "SELECT * FROM northwind_dw2.dim_suppliers;"
df_dim_suppliers = get_sql_dataframe(mysql_uid, mysql_pwd, dst_dbname, sql_suppliers)
df_dim_suppliers.head(2)  # preview the first two rows