# DS-2002 – Data Project 1
##### Felipe Martinez

### 1. Design a dimensional data mart that represents a simple business process of your choosing.

My dimensional data mart will include two dimension tables (along with a date dimension) that describe rentable films and customer profiles, respectively. I will also have a fact able that includes information from the rental history of the various film and the payment that was collected for these transactions.

### 2. Develop an ETL pipeline that extracts, transforms, and loads data into your data mart.

In [1]:
import os # Importing libraries
import numpy
import pandas as pd
from sqlalchemy import create_engine
import json
import datetime
import pymongo

In [2]:
host_name = "localhost" # Declare & Assign Connection Variables for the MySQL Server & Databases
host_ip = "127.0.0.1"
port = "3306"
user_id = "root"
pwd = "Passw0rd123"

src_dbname = "sakila"
dst_dbname = "sakila_dw2"

In [3]:
mysql_uid = "root"
mysql_pwd = "Passw0rd123"

atlas_cluster_name = "sandbox.zibbf"
atlas_user_name = "m001-student"
atlas_password = "m001-mongodb-basics"

conn_str = {"local" : f"mongodb://localhost:27017/",
    "atlas" : f"mongodb+srv://{atlas_user_name}:{atlas_password}@{atlas_cluster_name}.mongodb.net"
}

print(f"Local Connection String: {conn_str['local']}")
print(f"Atlas Connection String: {conn_str['atlas']}")

Local Connection String: mongodb://localhost:27017/
Atlas Connection String: mongodb+srv://m001-student:m001-mongodb-basics@sandbox.zibbf.mongodb.net


In [4]:
def get_dataframe(user_id, pwd, host_name, db_name, sql_query): # Create the New Data Warehouse database
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

In [5]:
def get_mongo_dataframe(connect_str, db_name, collection, query):
    '''Create a connection to MongoDB'''
    client = pymongo.MongoClient(connect_str)
    
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    client.close()
    return dframe

In [6]:
conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)

sqlEngine.execute(f"DROP DATABASE IF EXISTS `{dst_dbname}`;")
sqlEngine.execute(f"CREATE DATABASE `{dst_dbname}`;")
sqlEngine.execute(f"USE {dst_dbname};")

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x1c7a1144910>

In [None]:
client = pymongo.MongoClient("local")
db = client[src_dbname]

# Gets the path of the Current Working Directory for this Notebook, and then Appends the 'data' directory.
data_dir = os.path.join(os.getcwd(), 'data')

json_files = {"payment" : 'sakila_payment.json'}

for file in json_files:
    db.drop_collection(file)
    json_file = os.path.join(data_dir, json_files[file])
    with open(json_file, 'r') as openfile:
        json_object = json.load(openfile)
        file = db[file]
        result = file.insert_many(json_object)
        #print(f"{file} was successfully loaded.")

        
client.close()        

#### 2.2 Extract Data from Source Data Tables

In [7]:
sql_film = "SELECT * FROM sakila.film;"
df_film = get_dataframe(user_id, pwd, host_name, src_dbname, sql_film)
df_film.head(2)

Unnamed: 0,film_id,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42
1,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15 05:03:42


In [None]:
query = {}
collection = "customer"

df_customer = get_mongo_dataframe(client, src_dbname, collection, query)
df_customer.head(2)

#### 2.3 Preform Necessary Data Transformations

In [8]:
## TEMP
sql_customer = "SELECT * FROM sakila.customer;"
df_customer = get_dataframe(user_id, pwd, host_name, src_dbname, sql_customer)
df_customer.head(2)

Unnamed: 0,customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20


In [9]:
drop_cols = ['description','original_language_id','rating','special_features','length','language_id']
df_film.drop(drop_cols, axis=1, inplace=True)
df_film.rename(columns={"film_id":"film_key"}, inplace=True)

df_film.head(2)

Unnamed: 0,film_key,title,release_year,rental_duration,rental_rate,replacement_cost,last_update
0,1,ACADEMY DINOSAUR,2006,6,0.99,20.99,2006-02-15 05:03:42
1,2,ACE GOLDFINGER,2006,3,4.99,12.99,2006-02-15 05:03:42


In [10]:
drop_cols = ['first_name','last_name','address_id','create_date']
df_customer.drop(drop_cols, axis=1, inplace=True)
df_customer.rename(columns={"customer_id":"customer_key","store_id":"store_key"}, inplace=True)

df_customer.head(2)

Unnamed: 0,customer_key,store_key,email,active,last_update
0,1,1,MARY.SMITH@sakilacustomer.org,1,2006-02-15 04:57:20
1,2,1,PATRICIA.JOHNSON@sakilacustomer.org,1,2006-02-15 04:57:20


In [11]:
sql_dim_date = "SELECT last_update FROM sakila.payment;"
df_dim_date = get_dataframe(user_id, pwd, host_name, src_dbname, sql_dim_date)
df_dim_date.full_date = df_dim_date.last_update.astype('datetime64')
df_dim_date.head(2)

  df_dim_date.full_date = df_dim_date.last_update.astype('datetime64')


Unnamed: 0,last_update
0,2006-02-15 22:12:30
1,2006-02-15 22:12:30


In [12]:
df_dim_date.insert(0, "date_key", range(1, df_dim_date.shape[0]+1))

#### 2.4 build dimension tables

In [13]:
db_operation = "insert"

tables = [('dim_film', df_film, 'film_key'),
          ('dim_customer', df_customer, 'customer_key'),
          ('dim_date', df_dim_date, 'date_key')]

In [14]:
for table_name, dataframe, primary_key in tables:
    set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

#### Populate the fact table

In [15]:
sql_rental = "SELECT * FROM sakila.rental;"
df_rental = get_dataframe(user_id, pwd, host_name, src_dbname, sql_rental)
df_rental.head(2)

Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,return_date,staff_id,last_update
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1,2006-02-15 21:30:53


In [16]:
data_dir = os.path.join(os.getcwd(), 'data')
data_file = os.path.join(data_dir, 'sakila_payment.csv')

df_payment = pd.read_csv(data_file, header=0, index_col=0)
df_payment.head(2)

Unnamed: 0_level_0,customer_id,staff_id,rental_id,amount,payment_date,last_update
payment_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
1,1,1,76,2.99,2005-05-25 11:30:37,2006-02-15 22:12:30
2,1,1,573,0.99,2005-05-28 10:35:23,2006-02-15 22:12:30


#### Preform Necessary Transformations

In [17]:
drop_cols = ['staff_id']
df_rental.drop(drop_cols, axis=1, inplace=True)
df_rental.rename(columns={"rental_id":"rental_key","customer_id":"customer_key","inventory_id":"inventory_key"}, inplace=True)

df_rental.head(2)

Unnamed: 0,rental_key,rental_date,inventory_key,customer_key,return_date,last_update
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,2006-02-15 21:30:53
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,2006-02-15 21:30:53


In [18]:
#drop_cols = ['staff_id']
#df_payment.drop(drop_cols, axis=1, inplace=True)
df_payment.rename(columns={"payment_id":"payment_key","customer_id":"customer_key","staff_id":"staff_key","rental_id":"rental_key"}, inplace=True)

df_payment.head(2)

Unnamed: 0_level_0,customer_key,staff_key,rental_key,amount,payment_date,last_update
payment_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
1,1,1,76,2.99,2005-05-25 11:30:37,2006-02-15 22:12:30
2,1,1,573,0.99,2005-05-28 10:35:23,2006-02-15 22:12:30


#### Create and load the fact table into the database

In [19]:
df_fact_orders = pd.merge(df_rental, df_payment, on='rental_key', how='left')
df_fact_orders.head(2)

Unnamed: 0,rental_key,rental_date,inventory_key,customer_key_x,return_date,last_update_x,customer_key_y,staff_key,amount,payment_date,last_update_y
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,2006-02-15 21:30:53,,,,,
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,2006-02-15 21:30:53,,,,,


In [20]:
df_fact_orders.insert(0, "fact_order_key", range(1, df_fact_orders.shape[0]+1))

In [21]:
table_name = "fact_orders"
primary_key = "fact_order_key"
db_operation = "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname, df_fact_orders, table_name, primary_key, db_operation)

### 3. Run queries to test your data mart.

In [28]:
sql_test = """
    SELECT customer.customer_key AS customer_key,
        customer.active AS active,
        AVG('dim_film.rental_duration') AS `average_duration`,
        orders.payment_date
    FROM `{0}`.`fact_orders` AS orders
    INNER JOIN `{0}`.`dim_customer` AS customer
    ON orders.customer_key_x = customer.customer_key
    GROUP BY customer.customer_key
    ORDER BY average_duration DESC;
""".format(dst_dbname)

df_test = get_dataframe(user_id, pwd, host_name, src_dbname, sql_test)
df_test.head()

OperationalError: (pymysql.err.OperationalError) (1055, "Expression #4 of SELECT list is not in GROUP BY clause and contains nonaggregated column 'sakila_dw2.orders.payment_date' which is not functionally dependent on columns in GROUP BY clause; this is incompatible with sql_mode=only_full_group_by")
[SQL: 
    SELECT customer.customer_key AS customer_key,
        customer.active AS active,
        SUM('dim_film.rental_duration') AS `total_duration`,
        orders.payment_date
    FROM `sakila_dw2`.`fact_orders` AS orders
    INNER JOIN `sakila_dw2`.`dim_customer` AS customer
    ON orders.customer_key_x = customer.customer_key
    GROUP BY customer.customer_key
    ORDER BY total_duration DESC;
]
(Background on this error at: https://sqlalche.me/e/14/e3q8)