# ETL Data Processor

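The sakila database models a DVD rental store and contains data about customers, rentals, payments, and more. This ETL data processor extracts data from the sakila database (plus a JSON payment file and a CSV rental file), transforms it into dimension and fact tables, and loads the result into the `sakila_dw` data warehouse.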

### Import libraries

In [1]:
import csv
import datetime
import json
import os
from urllib.parse import quote

import numpy
import pandas as pd
import pymongo
from sqlalchemy import create_engine

### Configure MySQL and MongoDB connection settings

In [72]:
# Connection settings for the source MySQL database and the MongoDB instances.
# Passwords are read from environment variables (SAKILA_MYSQL_PWD, SAKILA_ATLAS_PWD)
# instead of being hard-coded, so they never land in the notebook source or outputs.
host_name = "localhost"
host_ip = "127.0.0.1"
port = "3306"
user_id = os.environ.get("SAKILA_MYSQL_USER", "ani_ponugoti")
pwd = os.environ.get("SAKILA_MYSQL_PWD", "")

ports = {"mongo": 27017, "mysql": 3306}

atlas_cluster_name = "sakila"
atlas_default_dbname = "admin"
atlas_user_name = os.environ.get("SAKILA_ATLAS_USER", "ani_ponugoti")
atlas_password = os.environ.get("SAKILA_ATLAS_PWD", "")

# Credentials are percent-encoded: characters such as '@', ':' or '/' would
# otherwise be parsed as URI delimiters and corrupt the connection string.
_atlas_user = quote(atlas_user_name, safe="")
_atlas_suffix = f"@{atlas_cluster_name}.zvzvnbm.mongodb.net/test?authSource={atlas_default_dbname}"

conn_str = {
    "local": f"mongodb://{host_name}:{ports['mongo']}/",
    "atlas": f"mongodb+srv://{_atlas_user}:{quote(atlas_password, safe='')}{_atlas_suffix}",
}

print(conn_str["local"])
# Mask the password so it is not written into the saved cell output.
print(f"mongodb+srv://{_atlas_user}:****{_atlas_suffix}")

src_dbname = "sakila"
dst_dbname = "sakila_dw"

mongodb://localhost:27017/
mongodb+srv://ani_ponugoti:****@sakila.zvzvnbm.mongodb.net/test?authSource=admin


### Helper functions for reading DataFrames from MySQL and MongoDB and writing DataFrames to MySQL

In [3]:
def get_sql_dataframe(user_id, pwd, host_name, db_name, sql_query):
    """Run ``sql_query`` against a MySQL database and return the result as a DataFrame.

    Parameters
    ----------
    user_id, pwd : str
        MySQL credentials. They are percent-encoded before being embedded in the
        connection URL, so passwords containing ``@``, ``:`` or ``/`` still work.
    host_name : str
        MySQL server host.
    db_name : str
        Default database for the connection.
    sql_query : str
        SQL statement whose result set is loaded.

    Returns
    -------
    pandas.DataFrame
        The rows returned by ``sql_query``.
    """
    conn_str = f"mysql+pymysql://{quote(user_id, safe='')}:{quote(pwd, safe='')}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    try:
        # The context manager releases the connection even if the query raises.
        with sqlEngine.connect() as connection:
            dframe = pd.read_sql(sql_query, connection)
    finally:
        # Every call builds its own engine; dispose of its pool so connections don't accumulate.
        sqlEngine.dispose()

    return dframe

def get_mongo_dataframe(connect_str, db_name, collection, query):
    """Return the documents of ``collection`` that match ``query`` as a DataFrame.

    Parameters
    ----------
    connect_str : str
        MongoDB connection URI (for example one of the ``conn_str`` values).
    db_name : str
        Database that holds the collection.
    collection : str
        Name of the collection to query.
    query : dict
        Filter passed to ``find``; ``{}`` returns every document.

    Returns
    -------
    pandas.DataFrame
        One row per document, without MongoDB's ``_id`` column. An empty
        DataFrame is returned when nothing matches.
    """
    client = pymongo.MongoClient(connect_str)
    try:
        documents = list(client[db_name][collection].find(query))
    finally:
        # Close the client even when the query fails.
        client.close()

    # errors="ignore": an empty result has no "_id" column to drop.
    return pd.DataFrame(documents).drop(columns=["_id"], errors="ignore")


def get_mongo_dataframe_local(user_id, pwd, host_name, port, db_name, collection, query):
    """Query a MongoDB server by host/port and return the matching documents as a DataFrame.

    Parameters
    ----------
    user_id, pwd : str or None
        Credentials. When both are truthy they are percent-encoded and placed in
        the URI with ``db_name`` as the path; otherwise an unauthenticated
        connection is made.
    host_name : str
        MongoDB host.
    port : int or str
        MongoDB port.
    db_name : str
        Database that holds the collection.
    collection : str
        Name of the collection to query.
    query : dict
        Filter passed to ``find``; ``{}`` returns every document.

    Returns
    -------
    pandas.DataFrame
        One row per document, without MongoDB's ``_id`` column. An empty
        DataFrame is returned when nothing matches.
    """
    if user_id and pwd:
        # Credentials must be percent-encoded to be valid inside a MongoDB URI.
        mongo_uri = 'mongodb://%s:%s@%s:%s/%s' % (
            quote(user_id, safe=''), quote(pwd, safe=''), host_name, port, db_name)
    else:
        mongo_uri = f"mongodb://{host_name}:{port}/"

    client = pymongo.MongoClient(mongo_uri)
    try:
        documents = list(client[db_name][collection].find(query))
    finally:
        # Close the client even when the query fails.
        client.close()

    # errors="ignore": an empty result has no "_id" column to drop.
    return pd.DataFrame(documents).drop(columns=["_id"], errors="ignore")


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    """Write ``df`` to the MySQL table ``table_name`` in database ``db_name``.

    Parameters
    ----------
    user_id, pwd : str
        MySQL credentials (percent-encoded before being placed in the URL).
    host_name : str
        MySQL server host.
    db_name : str
        Target database.
    df : pandas.DataFrame
        Data to write.
    table_name : str
        Target table.
    pk_column : str
        Primary-key spec used for ``"insert"``. It is spliced verbatim into
        ``ALTER TABLE ... ADD PRIMARY KEY (...)``, so it may carry a prefix
        length such as ``"rental_key(100)"``.
    db_operation : {"insert", "update"}
        ``"insert"`` replaces the table with ``df`` and adds the primary key;
        ``"update"`` appends ``df.reset_index()`` to the existing table.

    Raises
    ------
    ValueError
        If ``db_operation`` is neither ``"insert"`` nor ``"update"``.
    """
    # Fail loudly instead of silently writing nothing for a mistyped operation.
    if db_operation not in ("insert", "update"):
        raise ValueError(f"Unknown db_operation {db_operation!r}; expected 'insert' or 'update'")

    conn_str = f"mysql+pymysql://{quote(user_id, safe='')}:{quote(pwd, safe='')}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    try:
        # The context manager releases the connection even if a write fails.
        with sqlEngine.connect() as connection:
            if db_operation == "insert":
                df.to_sql(table_name, con=connection, index=False, if_exists='replace')
                # Engine.execute() is deprecated in SQLAlchemy 1.4 and removed in 2.0;
                # run the DDL on the same connection instead.
                connection.exec_driver_sql(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            else:
                # NOTE(review): reset_index() turns the index into a column ("index" for a
                # default RangeIndex); confirm the target table expects that column.
                df.reset_index().to_sql(table_name, con=connection, index=False, if_exists='append')
    finally:
        # Every call builds its own engine; dispose of its pool so connections don't accumulate.
        sqlEngine.dispose()

### Create the SQLAlchemy engine and recreate the sakila_dw database

In [4]:
# Server-level engine (no default database) used to recreate the warehouse and,
# in the verification cells, to query it.
conn_str1 = f"mysql+pymysql://{quote(user_id, safe='')}:{quote(pwd, safe='')}@{host_name}"
sqlEngine = create_engine(conn_str1, pool_recycle=3600)

# Engine.execute() is deprecated in SQLAlchemy 1.4 and removed in 2.0, so the
# statements run on an explicit connection that is always released.
with sqlEngine.connect() as setup_conn:
    setup_conn.exec_driver_sql(f"DROP DATABASE IF EXISTS `{dst_dbname}`;")
    setup_conn.exec_driver_sql(f"CREATE DATABASE `{dst_dbname}`;")
    # USE only affects this one pooled connection; the verification queries
    # further down qualify table names with sakila_dw explicitly.
    setup_conn.exec_driver_sql(f"USE `{dst_dbname}`;")

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x7f883b9f71f0>

### Read the customer table from MySQL into a DataFrame

In [59]:
try:
    sql_customer = "SELECT * FROM sakila.customer;"
    df_customer = get_sql_dataframe(user_id, pwd, host_name, src_dbname, sql_customer)
    print(df_customer.head(5))
except Exception as exc:
    # Report the actual cause (missing table, bad credentials, server down, ...)
    # instead of assuming the table does not exist.
    print(f"Could not read sakila.customer: {exc}")

   customer_id  store_id first_name last_name  \
0            1         1       MARY     SMITH   
1            2         1   PATRICIA   JOHNSON   
2            3         1      LINDA  WILLIAMS   
3            4         2    BARBARA     JONES   
4            5         1  ELIZABETH     BROWN   

                                 email  address_id  active  \
0        MARY.SMITH@sakilacustomer.org           5       1   
1  PATRICIA.JOHNSON@sakilacustomer.org           6       1   
2    LINDA.WILLIAMS@sakilacustomer.org           7       1   
3     BARBARA.JONES@sakilacustomer.org           8       1   
4   ELIZABETH.BROWN@sakilacustomer.org           9       1   

          create_date         last_update  
0 2006-02-14 22:04:36 2006-02-15 04:57:20  
1 2006-02-14 22:04:36 2006-02-15 04:57:20  
2 2006-02-14 22:04:36 2006-02-15 04:57:20  
3 2006-02-14 22:04:36 2006-02-15 04:57:20  
4 2006-02-14 22:04:36 2006-02-15 04:57:20  


### Read payment data from a JSON file into a DataFrame

In [61]:
try:
    df_payment = pd.read_json("sakila-payment.json")
    print(df_payment.head(5))
except Exception as exc:
    # Surface the real error (missing file, malformed JSON, ...) rather than guessing.
    print(f"Could not read sakila-payment.json: {exc}")

   payment_id  customer_id  staff_id  rental_id  amount         payment_date  \
0           1            1         1         76    2.99  2005-05-25 11:30:37   
1           2            1         1        573    0.99  2005-05-28 10:35:23   
2           3            1         1       1185    5.99  2005-06-15 00:54:12   
3           4            1         2       1422    0.99  2005-06-15 18:02:53   
4           5            1         2       1476    9.99  2005-06-15 21:08:46   

           last_update  
0  2006-02-15 22:12:30  
1  2006-02-15 22:12:30  
2  2006-02-15 22:12:30  
3  2006-02-15 22:12:30  
4  2006-02-15 22:12:30  


### Load rental data from a CSV file into MongoDB, then read it back into a DataFrame

In [69]:
# Maps target MongoDB collection -> CSV file in the working directory.
csv_files = {"rental": "sakila-rental.csv"}
data_dir = os.getcwd()

try:
    client = pymongo.MongoClient(conn_str["atlas"])
    try:
        db = client[src_dbname]
        for collection_name, file_name in csv_files.items():
            # The csv module requires newline="" so quoted fields are parsed correctly.
            with open(os.path.join(data_dir, file_name), "r", newline="") as openfile:
                records = list(csv.DictReader(openfile))
            # Replace the collection instead of appending, so re-running this cell does
            # not duplicate documents (duplicate rental_key values would later break
            # the primary key added to dim_rental). Note: this drops existing data.
            db[collection_name].drop()
            if records:  # insert_many() rejects an empty list
                db[collection_name].insert_many(records)
    finally:
        client.close()

    query = {}
    collection = "rental"
    df_rental = get_mongo_dataframe(conn_str['atlas'], src_dbname, collection, query)
    print(df_rental.head(5))
except Exception as exc:
    print(f"Loading sakila-rental.csv into MongoDB or reading it back failed: {exc}")

  rental_id          rental_date inventory_id customer_id  \
0         1  2005-05-24 22:53:30          367         130   
1         2  2005-05-24 22:54:33         1525         459   
2         3  2005-05-24 23:03:39         1711         408   
3         4  2005-05-24 23:04:41         2452         333   
4         5  2005-05-24 23:05:21         2079         222   

           return_date staff_id          last_update  
0  2005-05-26 22:04:30        1  2006-02-15 21:30:53  
1  2005-05-28 19:40:33        1  2006-02-15 21:30:53  
2  2005-06-01 22:12:39        1  2006-02-15 21:30:53  
3  2005-06-03 01:43:41        2  2006-02-15 21:30:53  
4  2005-06-02 04:33:21        1  2006-02-15 21:30:53  


### Drop unnecessary columns from dimension tables and rename primary keys

In [13]:
# Customer dimension table
drop_cols = ["email", "address_id", "active", "create_date", "last_update"]
df_customer.drop(drop_cols, axis=1, inplace=True)
df_customer.rename(columns={"customer_id": "customer_key"}, inplace=True)
df_customer

Unnamed: 0,customer_key,store_id,first_name,last_name
0,1,1,MARY,SMITH
1,2,1,PATRICIA,JOHNSON
2,3,1,LINDA,WILLIAMS
3,4,2,BARBARA,JONES
4,5,1,ELIZABETH,BROWN
...,...,...,...,...
594,595,1,TERRENCE,GUNDERSON
595,596,1,ENRIQUE,FORSYTHE
596,597,1,FREDDIE,DUGGAN
597,598,1,WADE,DELVALLE


In [15]:
# Payment dimension table
drop_cols = ["staff_id", "last_update"]
df_payment.drop(drop_cols, axis=1, inplace=True)
df_payment.rename(columns={"payment_id": "payment_key"}, inplace=True)
df_payment.head(5)

Unnamed: 0,payment_key,customer_id,rental_id,amount,payment_date
0,1,1,76,2.99,2005-05-25 11:30:37
1,2,1,573,0.99,2005-05-28 10:35:23
2,3,1,1185,5.99,2005-06-15 00:54:12
3,4,1,1422,0.99,2005-06-15 18:02:53
4,5,1,1476,9.99,2005-06-15 21:08:46


In [17]:
# Rental dimension table
drop_cols = ["inventory_id", "staff_id", "last_update"]
df_rental.drop(drop_cols, axis=1, inplace=True)
df_rental.rename(columns={"rental_id": "rental_key"}, inplace=True)
df_rental.head(5)

Unnamed: 0,rental_key,rental_date,customer_id,return_date
0,1,2005-05-24 22:53:30,130,2005-05-26 22:04:30
1,2,2005-05-24 22:54:33,459,2005-05-28 19:40:33
2,3,2005-05-24 23:03:39,408,2005-06-01 22:12:39
3,4,2005-05-24 23:04:41,333,2005-06-03 01:43:41
4,5,2005-05-24 23:05:21,222,2005-06-02 04:33:21


### Insert dimension tables into sakila_dw

In [20]:
db_operation = "insert"
# One (target table, source DataFrame, primary-key spec) tuple per dimension.
# rental_key holds strings (csv.DictReader yields str), which to_sql presumably
# stores as TEXT; MySQL can only index a TEXT column with a prefix length, hence "(100)".
tables = list(zip(
    ("dim_customer", "dim_payment", "dim_rental"),
    (df_customer, df_payment, df_rental),
    ("customer_key", "payment_key", "rental_key(100)"),
))

In [21]:
# Insert each dimension independently so one failure neither hides its cause
# nor silently skips the remaining tables.
for table_name, dataframe, primary_key in tables:
    try:
        set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)
    except Exception as exc:
        print(f"{table_name} was not inserted into {dst_dbname} properly: {exc}")

### Create the fact_orders table by joining the source rental, payment, and customer tables

In [34]:
# One row per rental/payment pair, enriched with the paying customer's store and name.
sql_fact_orders = """SELECT r.rental_id as rental_key,
r.rental_date,
r.customer_id,
r.return_date,
p.payment_id,
p.amount,
p.payment_date,
c.store_id,
c.first_name,
c.last_name
FROM rental r JOIN payment p ON r.rental_id = p.rental_id JOIN customer c ON p.customer_id = c.customer_id;"""

# The join runs in the source database, not against the sakila_dw dimensions.
df_fact_orders = get_sql_dataframe(
    user_id=user_id,
    pwd=pwd,
    host_name=host_name,
    db_name=src_dbname,
    sql_query=sql_fact_orders,
)
df_fact_orders

Unnamed: 0,rental_key,rental_date,customer_id,return_date,payment_id,amount,payment_date,store_id,first_name,last_name
0,76,2005-05-25 11:30:37,1,2005-06-03 12:00:37,1,2.99,2005-05-25 11:30:37,1,MARY,SMITH
1,573,2005-05-28 10:35:23,1,2005-06-03 06:32:23,2,0.99,2005-05-28 10:35:23,1,MARY,SMITH
2,1185,2005-06-15 00:54:12,1,2005-06-23 02:42:12,3,5.99,2005-06-15 00:54:12,1,MARY,SMITH
3,1422,2005-06-15 18:02:53,1,2005-06-19 15:54:53,4,0.99,2005-06-15 18:02:53,1,MARY,SMITH
4,1476,2005-06-15 21:08:46,1,2005-06-25 02:26:46,5,9.99,2005-06-15 21:08:46,1,MARY,SMITH
...,...,...,...,...,...,...,...,...,...,...
16039,14599,2005-08-21 17:43:42,599,2005-08-22 18:53:42,16045,4.99,2005-08-21 17:43:42,2,AUSTIN,CINTRON
16040,14719,2005-08-21 21:41:57,599,2005-08-25 20:37:57,16046,1.99,2005-08-21 21:41:57,2,AUSTIN,CINTRON
16041,15590,2005-08-23 06:09:44,599,2005-09-01 06:53:44,16047,8.99,2005-08-23 06:09:44,2,AUSTIN,CINTRON
16042,15719,2005-08-23 11:08:46,599,2005-08-25 07:25:46,16048,2.99,2005-08-23 11:08:46,2,AUSTIN,CINTRON


### Insert fact orders table into sakila_dw

In [35]:
try:
    table_name = "fact_orders"
    pk_column = "rental_key"
    db_operation = "insert"
    set_dataframe(user_id, pwd, host_name, dst_dbname, df_fact_orders, table_name, pk_column, db_operation)
except Exception as exc:
    # Include the underlying error so failures (e.g. duplicate keys) are diagnosable.
    print(f"fact orders table was not inserted properly into sakila_dw: {exc}")

### SQL Queries to Verify Functionality

In [46]:
sql_query = """SELECT * FROM sakila_dw.dim_customer;"""

# The context manager closes the connection even if the query raises.
with sqlEngine.connect() as conn:
    df_customer_test = pd.read_sql(sql_query, conn)

df_customer_test.head(5)

Unnamed: 0,customer_key,store_id,first_name,last_name
0,1,1,MARY,SMITH
1,2,1,PATRICIA,JOHNSON
2,3,1,LINDA,WILLIAMS
3,4,2,BARBARA,JONES
4,5,1,ELIZABETH,BROWN


In [47]:
sql_query = """SELECT * FROM sakila_dw.dim_rental;"""

# The original opened a second connection here instead of closing the first;
# the context manager closes the single connection even if the query raises.
with sqlEngine.connect() as conn:
    df_rental_test = pd.read_sql(sql_query, conn)

# NOTE(review): rows come back in string order (1, 10, 100, ...), which suggests
# rental_key is stored as text (it originates from csv.DictReader strings).
df_rental_test.head(5)

Unnamed: 0,rental_key,rental_date,customer_id,return_date
0,1,2005-05-24 22:53:30,130,2005-05-26 22:04:30
1,10,2005-05-25 00:02:21,399,2005-05-31 22:44:21
2,100,2005-05-25 16:50:28,208,2005-06-02 22:11:28
3,1000,2005-05-31 00:25:56,332,2005-06-08 19:42:56
4,1001,2005-05-31 00:46:31,64,2005-06-06 06:14:31


In [48]:
sql_query = """SELECT * FROM sakila_dw.fact_orders;"""

# The context manager closes the connection even if the query raises.
with sqlEngine.connect() as conn:
    df_fact_orders_test = pd.read_sql(sql_query, conn)

df_fact_orders_test.head(5)

Unnamed: 0,rental_key,rental_date,customer_id,return_date,payment_id,amount,payment_date,store_id,first_name,last_name
0,1,2005-05-24 22:53:30,130,2005-05-26 22:04:30,3504,2.99,2005-05-24 22:53:30,1,CHARLOTTE,HUNTER
1,2,2005-05-24 22:54:33,459,2005-05-28 19:40:33,12377,2.99,2005-05-24 22:54:33,1,TOMMY,COLLAZO
2,3,2005-05-24 23:03:39,408,2005-06-01 22:12:39,11032,3.99,2005-05-24 23:03:39,1,MANUEL,MURRELL
3,4,2005-05-24 23:04:41,333,2005-06-03 01:43:41,8987,4.99,2005-05-24 23:04:41,2,ANDREW,PURDY
4,5,2005-05-24 23:05:21,222,2005-06-02 04:33:21,6003,6.99,2005-05-24 23:05:21,2,DELORES,HANSEN


In [55]:
# Total amount paid per customer, computed from the warehouse fact table.
sql_query = """SELECT customer_id, SUM(amount) as total_amount FROM sakila_dw.fact_orders GROUP BY customer_id;"""

# The context manager closes the connection even if the query raises.
with sqlEngine.connect() as conn:
    df_customer_total_payment_test = pd.read_sql(sql_query, conn)

df_customer_total_payment_test.head(5)

Unnamed: 0,customer_id,total_amount
0,130,93.76
1,459,186.62
2,408,116.7
3,333,109.73
4,222,91.79
