# Midterm Project with Sakila Database

## Importing Data from MySQL:

#### Import the Necessary Libraries

In [1]:
import os
import numpy
import pandas as pd
from sqlalchemy import create_engine 
import json
import datetime
import pymongo

In [2]:
host_name = "localhost"
port = "3306"
user_id = "root"
pwd = "Passw0rd123"

src_dbname = "sakila"
dst_dbname = "sakila_dw"

#### Declare & Assign Connection Variables for the MySQL Server & Databases with which You'll be Working

In [3]:
def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

#### Create the New Data Warehouse database, and to Use it, Switch the Connection Context.

In [4]:
 conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)

sqlEngine.execute(f"DROP DATABASE IF EXISTS `{dst_dbname}`;")
sqlEngine.execute(f"CREATE DATABASE `{dst_dbname}`;")
sqlEngine.execute(f"USE {dst_dbname};")

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x1c7f59c8bd0>

### 1.0. Create & Populate the Dimension Tables

#### 1.1. Extract Data from the Source Database Tables

In [5]:
sql_customer = "SELECT * FROM sakila.customer;"
df_customer = get_dataframe(user_id, pwd, host_name, src_dbname, sql_customer)
df_customer.head(2)

Unnamed: 0,customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-14 23:57:20
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-14 23:57:20


In [6]:
sql_film = "SELECT * FROM sakila.film;"
df_film = get_dataframe(user_id, pwd, host_name, src_dbname, sql_film)
df_film.head(2)

Unnamed: 0,film_id,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 00:03:42
1,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15 00:03:42


In [7]:
sql_staff = "SELECT * FROM sakila.staff;"
df_staff = get_dataframe(user_id, pwd, host_name, src_dbname, sql_staff)
df_staff.head()

Unnamed: 0,staff_id,first_name,last_name,address_id,picture,email,store_id,active,username,password,last_update
0,1,Mike,Hillyer,3,b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\...,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964,2006-02-14 22:57:16
1,2,Jon,Stephens,4,,Jon.Stephens@sakilastaff.com,2,1,Jon,,2006-02-14 22:57:16


#### 1.2. Perform Any Necessary Transformations

In [8]:
df_customer.rename(columns={"customer_id":"customer_key", "store_id": "store_key", "address_id":"address_key"}, inplace=True)
df_customer.head(2)

Unnamed: 0,customer_key,store_key,first_name,last_name,email,address_key,active,create_date,last_update
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-14 23:57:20
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-14 23:57:20


In [9]:
drop_cols = ['original_language_id']
df_film.drop(drop_cols, axis=1, inplace=True)
df_film.rename(columns={"film_id":"film_key", "language_id":"language_key"}, inplace=True)

df_film.head(2)

Unnamed: 0,film_key,title,description,release_year,language_key,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 00:03:42
1,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15 00:03:42


In [10]:
drop_cols = ['picture', 'password']
df_staff.drop(drop_cols, axis=1, inplace=True)
df_staff.rename(columns={"staff_id":"staff_key", "store_id":"store_key"}, inplace=True)

df_staff.head()

Unnamed: 0,staff_key,first_name,last_name,address_id,email,store_key,active,username,last_update
0,1,Mike,Hillyer,3,Mike.Hillyer@sakilastaff.com,1,1,Mike,2006-02-14 22:57:16
1,2,Jon,Stephens,4,Jon.Stephens@sakilastaff.com,2,1,Jon,2006-02-14 22:57:16


#### 1.3. Load the Transformed DataFrames into the New Data Warehouse by Creating New Tables

In [11]:
db_operation = "insert"

tables = [('dim_customer', df_customer, 'customer_key'),
          ('dim_film', df_film, 'film_key'),
          ('dim_staff', df_staff, 'staff_key')]

In [12]:
for table_name, dataframe, primary_key in tables:
    set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

### 2.0. Create & Populate the Fact Table
#### 2.1 Implement the Solution using Pandas DataFrames to Craft the Table

In [13]:
sql_payment = "SELECT * FROM sakila.payment;"
df_payment = get_dataframe(user_id, pwd, host_name, src_dbname, sql_payment)
df_payment.rename(columns={"payment_id":"payment_key", "customer_id":"customer_key", "staff_id":"staff_key", 
                           "rental_id":"rental_key"}, inplace=True)
df_payment.drop(columns=['last_update'], inplace=True)
df_payment.head(2)

Unnamed: 0,payment_key,customer_key,staff_key,rental_key,amount,payment_date
0,1,1,1,76,2.99,2005-05-25
1,2,1,1,573,0.99,2005-05-28


In [14]:
sql_rental = "SELECT * FROM sakila.rental;"
df_rental = get_dataframe(user_id, pwd, host_name, src_dbname, sql_rental)
df_rental.rename(columns={"rental_id":"rental_key", "inventory_id":"inventory_key", "customer_id":"customer_key", 
                          "staff_id":"staff_key"}, inplace=True)
df_rental.drop(columns=['last_update'], inplace=True)
df_rental.head(2)

Unnamed: 0,rental_key,rental_date,inventory_key,customer_key,return_date,staff_key
0,1,2005-05-24,367,130,2005-05-26,1
1,2,2005-05-24,1525,459,2005-05-28,1


In [15]:
df_fact_orders = pd.merge(df_payment, df_rental, on = ('rental_key', 'customer_key', 'staff_key') , how ='inner' )
df_fact_orders.head(2)

Unnamed: 0,payment_key,customer_key,staff_key,rental_key,amount,payment_date,rental_date,inventory_key,return_date
0,2,1,1,573,0.99,2005-05-28,2005-05-28,4020,2005-06-03
1,4,1,2,1422,0.99,2005-06-15,2005-06-15,1021,2005-06-19


In [16]:
df_fact_orders.shape

(7966, 9)

#### 2.2. Get the Data from the Date Dimension Table.

In [18]:
sql_dim_date = "SELECT date_key, full_date FROM sakila_dw.dim_date;"
df_dim_date = get_dataframe(user_id, pwd, host_name, src_dbname, sql_dim_date)
df_dim_date.full_date = df_dim_date.full_date.astype('datetime64[ns]')
df_dim_date.head(2)

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02


In [19]:
df_dim_payment_date = df_dim_date.rename(columns={"date_key" : "payment_date_key", "full_date" : "payment_date"})
df_fact_orders = pd.merge(df_fact_orders, df_dim_payment_date, on = 'payment_date', how ='left' )
df_fact_orders.drop(['payment_date'], axis=1, inplace=True)
df_fact_orders.head(2)

Unnamed: 0,payment_key,customer_key,staff_key,rental_key,amount,rental_date,inventory_key,return_date,payment_date_key
0,2,1,1,573,0.99,2005-05-28,4020,2005-06-03,
1,4,1,2,1422,0.99,2005-06-15,1021,2005-06-19,


In [20]:
df_dim_rental_date = df_dim_date.rename(columns={"date_key" : "rental_date_key", "full_date" : "rental_date"})
df_fact_orders = pd.merge(df_fact_orders, df_dim_rental_date, on = 'rental_date', how ='left' )
df_fact_orders.drop(['rental_date'], axis=1, inplace=True)
df_fact_orders.head(2)

Unnamed: 0,payment_key,customer_key,staff_key,rental_key,amount,inventory_key,return_date,payment_date_key,rental_date_key
0,2,1,1,573,0.99,4020,2005-06-03,,
1,4,1,2,1422,0.99,1021,2005-06-19,,


In [21]:
df_dim_return_date = df_dim_date.rename(columns={"date_key" : "return_date_key", "full_date" : "return_date"})
df_fact_orders = pd.merge(df_fact_orders, df_dim_return_date, on = 'return_date', how ='left' )
df_fact_orders.drop(['return_date'], axis=1, inplace=True)
df_fact_orders.head(2)

Unnamed: 0,payment_key,customer_key,staff_key,rental_key,amount,inventory_key,payment_date_key,rental_date_key,return_date_key
0,2,1,1,573,0.99,4020,,,
1,4,1,2,1422,0.99,1021,,,


#### 2.3. Perform any Additional Transformations

In [22]:
#Insert a new column with ever-incrementing numneic value, to seve as the primary key
df_fact_orders.insert(0,"fact_order_key", range(1, df_fact_orders.shape[0]+1))
df_fact_orders.head(2)

Unnamed: 0,fact_order_key,payment_key,customer_key,staff_key,rental_key,amount,inventory_key,payment_date_key,rental_date_key,return_date_key
0,1,2,1,1,573,0.99,4020,,,
1,2,4,1,2,1422,0.99,1021,,,


#### 2.4. Write the DataFrame Back to the Database

In [23]:
table_name = "fact_orders"
primary_key = "fact_order_key"
db_operation = "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname, df_fact_orders, table_name, primary_key, db_operation)

## Importing Data From MongoDB:

#### Declare & Assign Connection Variables for the MongoDB Server

In [51]:
mysql_uid = "root"
mysql_pwd = "Passw0rd123"

atlas_cluster_name = "cluster1.c4ndfee"
atlas_user_name = "apl3ncm"
atlas_password = "Mongoapl5"

conn_str = {"local" : f"mongodb://localhost:27017/",
    "atlas" : f"mongodb+srv://{atlas_user_name}:{atlas_password}@{atlas_cluster_name}.mongodb.net"
}

src_dbname = "sakila"
dst_dbname = "sakila_dw"

print(f"Local Connection String: {conn_str['local']}")
print(f"Atlas Connection String: {conn_str['atlas']}")

Local Connection String: mongodb://localhost:27017/
Atlas Connection String: mongodb+srv://apl3ncm:Mongoapl5@cluster1.c4ndfee.mongodb.net


#### Define Functions for Getting Data From and Setting Data Into Databases

In [52]:
def get_sql_dataframe(user_id, pwd, db_name, sql_query):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@localhost/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    conn = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, conn);
    conn.close()
    
    return dframe


def get_mongo_dataframe(connect_str, db_name, collection, query):
    '''Create a connection to MongoDB'''
    client = pymongo.MongoClient(connect_str)
    
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    client.close()
    return dframe


def set_dataframe(user_id, pwd, db_name, df, table_name, pk_column, db_operation):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@localhost/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

#### Populate MongoDB with Source Data

In [53]:
client = pymongo.MongoClient(conn_str["atlas"])
db = client[src_dbname]

# Gets the path of the Current Working Directory for this Notebook, and then Appends the 'data' directory.
data_dir = os.path.join(os.getcwd(), 'data')

json_files = {"inventory" : 'sakila_inventory.json'}

for file in json_files:
    db.drop_collection(file)
    json_file = os.path.join(data_dir, json_files[file])
    with open(json_file, 'r') as openfile:
        json_object = json.load(openfile)
        file = db[file]
        result = file.insert_many(json_object)
        #print(f"{file} was successfully loaded.")

        
client.close()  

### 1.0. Create and Populate the New Dimension Tables
#### 1.1. Extract Data from the Source MongoDB Collections Into DataFrames

In [54]:
query = {}
collection = "inventory"

df_inventory = get_mongo_dataframe(conn_str['atlas'], src_dbname, collection, query)
df_inventory.head(2)

Unnamed: 0,inventory_id,film_id,store_id,last_update
0,1,1,1,2006-02-15 00:00:00
1,2,1,1,2006-02-15 00:00:00


#### 1.2. Perform Any Necessary Transformations to the DataFrames

In [67]:
df_inventory.rename(columns={"inventory_id":"inventory_key", "film_id":"film_key", "store_id":"store_key"}, inplace=True)
df_inventory.drop(columns=['last_update'], inplace=True)
df_inventory.head(2)

Unnamed: 0,inventory_key,film_key,store_key
0,1,1,1
1,2,1,1


#### 1.3. Load the Transformed DataFrames into the New Data Warehouse by Creating New Tables

In [68]:
dataframe = df_inventory
table_name = 'dim_inventory'
primary_key = 'inventory_key'
db_operation = "insert"

set_dataframe(mysql_uid, mysql_pwd, dst_dbname, dataframe, table_name, primary_key, db_operation)

#### 1.4. Validate that the New Dimension Tables were Created.

In [69]:
sql_inventory = "SELECT * FROM sakila_dw.dim_inventory;"
df_dim_inventory = get_sql_dataframe(mysql_uid, mysql_pwd, dst_dbname, sql_inventory)
df_dim_inventory.head(2)

Unnamed: 0,inventory_key,film_key,store_key
0,1,1,1
1,2,1,1


## Importing Data From Local Hardware:

#### Import the Data from Local Device

In [70]:
file_path = r'C:\Users\ds2002-student\Downloads\DS-2002-main\DS-2002-main\data\sakila_actor.csv'

df_actor = pd.read_csv(file_path)

df_actor.head()

Unnamed: 0,actor_id,first_name,last_name,last_update
0,1,PENELOPE,GUINESS,2006-02-14 00:00:00
1,2,NICK,WAHLBERG,2006-02-14 00:00:00
2,3,ED,CHASE,2006-02-14 00:00:00
3,4,JENNIFER,DAVIS,2006-02-14 00:00:00
4,5,JOHNNY,LOLLOBRIGIDA,2006-02-14 00:00:00


In [71]:
df_actor.rename(columns={"actor_id":"actor_key"}, inplace=True)
df_actor.drop(columns=['last_update'], inplace=True)
df_actor.head()

Unnamed: 0,actor_key,first_name,last_name
0,1,PENELOPE,GUINESS
1,2,NICK,WAHLBERG
2,3,ED,CHASE
3,4,JENNIFER,DAVIS
4,5,JOHNNY,LOLLOBRIGIDA


In [72]:
dataframe = df_actor
table_name = 'dim_actor'
primary_key = 'actor_key'
db_operation = "insert"

set_dataframe(mysql_uid, mysql_pwd, dst_dbname, dataframe, table_name, primary_key, db_operation)

## Utilizing The Sakila Data Warehouse

In [73]:
sql_test = """
    SELECT dc.customer_key AS 'Customer Key'
        , dc.first_name AS 'Customer First Name'
        , dc.last_name AS 'Customer Last Name'
        , FORMAT(SUM(amount), 'c') AS 'Total Amount Rented'
    FROM sakila_dw.fact_orders fo
    JOIN sakila_dw.dim_customer dc
    ON fo.customer_key = dc.customer_key
    JOIN sakila_dw.dim_staff ds
    ON fo.staff_key = ds.staff_key
    WHERE ds.staff_key = 2
    GROUP BY dc.customer_key
        , dc.first_name
        , dc.last_name;
"""

df_test = get_dataframe(user_id, pwd, host_name, src_dbname, sql_test)
df_test.head()

Unnamed: 0,Customer Key,Customer First Name,Customer Last Name,Total Amount Rented
0,1,MARY,SMITH,26
1,2,PATRICIA,JOHNSON,19
2,3,LINDA,WILLIAMS,45
3,4,BARBARA,JONES,25
4,5,ELIZABETH,BROWN,35
