In [62]:
import os
import numpy
import pandas as pd
from sqlalchemy import create_engine
import json

In [80]:
host_name = "localhost"
port = "3306"
user_id = "root"
pwd = "Passw0rd123"

src_dbname = "sakila"
dst_dbname = "midterm_dw"

In [64]:
def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

In [65]:
conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)

sqlEngine.execute(f"DROP DATABASE IF EXISTS `{dst_dbname}`;")
sqlEngine.execute(f"CREATE DATABASE `{dst_dbname}`;")
sqlEngine.execute(f"USE {dst_dbname};")

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x26847ff8750>

In [66]:
#Read in CSV
df_films = pd.read_csv("film_midterm_data.csv", sep = ";")
df.head()

Unnamed: 0,film_id,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,0,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 00:03:42
1,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,0,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15 00:03:42
2,3,ADAPTATION HOLES,A Astounding Reflection of a Lumberjack And a ...,2006,1,0,7,2.99,50,18.99,NC-17,"Trailers,Deleted Scenes",2006-02-15 00:03:42
3,4,AFFAIR PREJUDICE,A Fanciful Documentary of a Frisbee And a Lumb...,2006,1,0,5,2.99,117,26.99,G,"Commentaries,Behind the Scenes",2006-02-15 00:03:42
4,5,AFRICAN EGG,A Fast-Paced Documentary of a Pastry Chef And ...,2006,1,0,6,2.99,130,22.99,G,Deleted Scenes,2006-02-15 00:03:42


In [67]:
#Transform columns
df_films.drop(['last_update', 'description', 'original_language_id', 'special_features'], axis=1, inplace=True)
df_films.rename(columns={"film_id":"film_key" }, inplace=True)
df.columns

Index(['film_id', 'title', 'description', 'release_year', 'language_id',
       'original_language_id', 'rental_duration', 'rental_rate', 'length',
       'replacement_cost', 'rating', 'special_features', 'last_update'],
      dtype='object')

In [69]:
#Call Data from SQL
sql_mrp = "SELECT * from midterm_dw.movie_rental_process"
df_mrp = get_dataframe(user_id, pwd, host_name, src_dbname, sql_mrp)
df_mrp.head(2)

Unnamed: 0,fact_rental_key,customer_key,inventory_key,film_key,store_key,staff_key,customer_status,first_name,last_name,rental_date,return_date
0,1,130,367,80,1,1,1,CHARLOTTE,HUNTER,2005-05-24,2005-05-26
1,2,459,1525,333,2,1,1,TOMMY,COLLAZO,2005-05-24,2005-05-28


In [97]:
#Merge CSV and SQL data
df_movie_rp = pd.merge(df_mrp, df_films, on="film_key", how= "left")
df_movie_rp.head(2)

Unnamed: 0,fact_rental_key,customer_key,inventory_key,film_key,store_key,staff_key,customer_status,first_name,last_name,rental_date,return_date,title,release_year,language_id,rental_duration,rental_rate,length,replacement_cost,rating
0,1,130,367,80,1,1,1,CHARLOTTE,HUNTER,2005-05-24,2005-05-26,BLANKET BEVERLY,2006,1,7,2.99,148,21.99,G
1,2,459,1525,333,2,1,1,TOMMY,COLLAZO,2005-05-24,2005-05-28,FREAKY POCUS,2006,1,7,2.99,126,16.99,R


In [98]:
#Load dim date from SQL
sql_dim_date = "Select date_key, full_date FROM midterm_dw.dim_date;"
df_dim_date = get_dataframe(user_id, pwd, host_name, src_dbname, sql_dim_date)
df_dim_date.full_date = df_dim_date.full_date.astype('object')
df_dim_date.head(2)

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02


In [99]:
#Return Dim Date
df_dim_return_date = df_dim_date.rename(columns={"date_key" : "return_date_key", "full_date" : "return_date"})
df_movie_rp = pd.merge(df_movie_rp, df_dim_return_date, on ='return_date', how = 'left')
df_movie_rp.head(2)


Unnamed: 0,fact_rental_key,customer_key,inventory_key,film_key,store_key,staff_key,customer_status,first_name,last_name,rental_date,return_date,title,release_year,language_id,rental_duration,rental_rate,length,replacement_cost,rating,return_date_key
0,1,130,367,80,1,1,1,CHARLOTTE,HUNTER,2005-05-24,2005-05-26,BLANKET BEVERLY,2006,1,7,2.99,148,21.99,G,20050526.0
1,2,459,1525,333,2,1,1,TOMMY,COLLAZO,2005-05-24,2005-05-28,FREAKY POCUS,2006,1,7,2.99,126,16.99,R,20050528.0


In [100]:
#Rental Dim Date
df_dim_rental_date = df_dim_date.rename(columns={"date_key" : "rental_date_key", "full_date" : "rental_date"})
df_movie_rp = pd.merge(df_movie_rp, df_dim_rental_date, on ='rental_date', how = 'left')
df_movie_rp.head(2)

Unnamed: 0,fact_rental_key,customer_key,inventory_key,film_key,store_key,staff_key,customer_status,first_name,last_name,rental_date,...,title,release_year,language_id,rental_duration,rental_rate,length,replacement_cost,rating,return_date_key,rental_date_key
0,1,130,367,80,1,1,1,CHARLOTTE,HUNTER,2005-05-24,...,BLANKET BEVERLY,2006,1,7,2.99,148,21.99,G,20050526.0,20050524
1,2,459,1525,333,2,1,1,TOMMY,COLLAZO,2005-05-24,...,FREAKY POCUS,2006,1,7,2.99,126,16.99,R,20050528.0,20050524


In [101]:
import datetime
import pymongo

In [102]:
mysql_uid = "root"
mysql_pwd = "Passw0rd123"

atlas_cluster_name = "cluster0.mekqnzk"
atlas_user_name = "hduke"
atlas_password = "Blizzard1"

conn_str = {"local" : f"mongodb://localhost:27017/",
    "atlas" : f"mongodb+srv://{atlas_user_name}:{atlas_password}@{atlas_cluster_name}.mongodb.net"
}



print(f"Local Connection String: {conn_str['local']}")
print(f"Atlas Connection String: {conn_str['atlas']}")

Local Connection String: mongodb://localhost:27017/
Atlas Connection String: mongodb+srv://hduke:Blizzard1@cluster0.mekqnzk.mongodb.net


In [131]:
def get_sql_dataframe(user_id, pwd, db_name, sql_query):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@localhost/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    conn = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, conn);
    conn.close()
    
    return dframe


def get_mongo_dataframe(connect_str, db_name, collection, query):
    '''Create a connection to MongoDB'''
    client = pymongo.MongoClient(connect_str)
    
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    client.close()
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

In [132]:
client = pymongo.MongoClient(conn_str["atlas"])
db = client[src_dbname]

# Gets the path of the Current Working Directory for this Notebook, and then Appends the 'data' directory.
data_dir = os.path.join(os.getcwd())

json_files = {"payment" : 'payment_data.json'}

for file in json_files:
    db.drop_collection(file)
    json_file = os.path.join(data_dir, json_files[file])
    with open(json_file, 'r') as openfile:
        json_object = json.load(openfile)
        file = db[file]
        result = file.insert_many(json_object)
        #print(f"{file} was successfully loaded.")

        
client.close()        

In [133]:
#Load data from MongoDB
query = {}
collection = "payment"

df_payment = get_mongo_dataframe(conn_str['atlas'], src_dbname, collection, query)
df_payment.head(2)

Unnamed: 0,payment_id,rental_id,amount,payment_date
0,1,76,2.99,2005-05-25 11:30:37
1,2,573,0.99,2005-05-28 10:35:23


In [134]:
#Transform columns
df_payment.drop(["payment_id"], axis=1, inplace=True)


In [135]:
df_payment.rename(columns={"rental_id":"fact_rental_key" }, inplace=True)
df_payment.head(2)

Unnamed: 0,fact_rental_key,amount,payment_date
0,76,2.99,2005-05-25 11:30:37
1,573,0.99,2005-05-28 10:35:23


In [136]:
#Merge Mongo and SQL data
df_movie_rp = pd.merge(df_movie_rp, df_payment, on="fact_rental_key", how= "left")
df_movie_rp.head(2)

Unnamed: 0,fact_rental_key,customer_key,inventory_key,film_key,store_key,staff_key,customer_status,first_name,last_name,rental_date,...,rental_rate,length,replacement_cost,rating,return_date_key,rental_date_key,amount_x,payment_date_x,amount_y,payment_date_y
0,1,130,367,80,1,1,1,CHARLOTTE,HUNTER,2005-05-24,...,2.99,148,21.99,G,20050526.0,20050524,2.99,2005-05-24 22:53:30,2.99,2005-05-24 22:53:30
1,2,459,1525,333,2,1,1,TOMMY,COLLAZO,2005-05-24,...,2.99,126,16.99,R,20050528.0,20050524,2.99,2005-05-24 22:54:33,2.99,2005-05-24 22:54:33


In [137]:
df_movie_rp.columns


Index(['fact_rental_key', 'customer_key', 'inventory_key', 'film_key',
       'store_key', 'staff_key', 'customer_status', 'first_name', 'last_name',
       'rental_date', 'return_date', 'title', 'release_year', 'language_id',
       'rental_duration', 'rental_rate', 'length', 'replacement_cost',
       'rating', 'return_date_key', 'rental_date_key', 'amount_x',
       'payment_date_x', 'amount_y', 'payment_date_y'],
      dtype='object')

In [138]:
# Total number of rentals
total_rentals = df_movie_rp['fact_rental_key'].count()

# Total amount of revenue
total_revenue = df_movie_rp['amount'].sum()

#Average rental duration
average_duration = df_movie_rp['rental_duration'].mean()

# Average rental rate per store
average_rental_rate_per_store = df_movie_rp.groupby('store_key')['rental_rate'].mean()

# Total number of rentals per rating
rentals_per_rating = df_movie_rp.groupby('rating')['fact_rental_key'].count()

#Top customers
customer_spending = df_movie_rp.groupby(['first_name', 'last_name'])['amount'].sum().reset_index()
top_customers = customer_spending.sort_values(by='amount', ascending=False)


KeyError: 'amount'

In [127]:
print("Total Rentals:", total_rentals)
print("Total Revenue:", total_revenue)
print("Average Rental Duration:", average_duration)
print("Average Rental Rate per Store:")
print(average_rental_rate_per_store)
print("Rentals per Rating:")
print(rentals_per_rating)
print("Top Customers:")
print(top_customers[['first_name', 'last_name']].head())





Total Rentals: 16044
Total Revenue: 67406.56
Average Rental Duration: 4.935489902767389
Average Rental Rate per Store:
store_key
1    2.967281
2    2.918580
Name: rental_rate, dtype: float64
Rentals per Rating:
rating
G        2773
NC-17    3293
PG       3212
PG-13    3585
R        3181
Name: fact_rental_key, dtype: int64
Top Customers:
    first_name last_name
318       KARL      SEAL
175    ELEANOR      HUNT
105      CLARA      SHAW
474     RHONDA   KENNEDY
389     MARION    SNYDER


In [139]:
table_name = "fact_rental_process"
primary_key ="fact_rental_key"
db_operation = "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname, df_movie_rp, table_name, primary_key, db_operation)
