## Sakila DB Analysis Through MongoDB
Please replace all the user and passwords for connecting to MongoDB to execute and replicate the results.

### Prerequisites:
#### Import the Necessary Libraries

In [9]:
import os
import json
import numpy
import datetime
import pandas as pd

import pymongo
from sqlalchemy import create_engine

#### Declare & Assign Connection Variables for the MongoDB Server, the MySQL Server & Databases with which You'll be Working 

Variable values are specific to running on my machine. Please make sure to replace these values!

In [23]:
mysql_uid = "aww5kx"
mysql_pwd = "asdASD12!@"
mysql_hostname = "localhost"

atlas_cluster_name = "sandbox.ttnfyow"
atlas_user_name = "herinseo03"
atlas_password = "q9BZnYnqWdgHq4T5"

conn_str = {
    "local" : f"mongodb://localhost:27017/",
    "atlas" : f"mongodb+srv://{atlas_user_name}:{atlas_password}@{atlas_cluster_name}.mongodb.net"
}

src_dbname = "sakila-db-json"
dst_dbname = "sakila_db"

print(f"Local Connection String: {conn_str['local']}")
print(f"Atlas Connection String: {conn_str['atlas']}")

Local Connection String: mongodb://localhost:27017/
Atlas Connection String: mongodb+srv://herinseo03:q9BZnYnqWdgHq4T5@sandbox.ttnfyow.mongodb.net


#### Define Functions for Getting Data From and Setting Data Into Databases

In [24]:
def get_sql_dataframe(user_id, pwd, host_name, db_name, sql_query):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    conn = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, conn);
    conn.close()
    
    return dframe


def get_mongo_dataframe(connect_str, db_name, collection, query):
    '''Create a connection to MongoDB'''
    client = pymongo.MongoClient(connect_str)
    
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    client.close()
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

#### Populate MongoDB with Source Data

In [39]:
client = pymongo.MongoClient(conn_str["atlas"])
db = client[src_dbname]

# THIS path must be replaced with your path for the sakila-db-json folder downloaded locally
data_dir = "/Users/herinseo/Documents/sakila-db-json"

# loading all tables in
json_files = {"actor" : 'actor.json',
              "address" : 'address.json',
              "category" : 'category.json',
              "city" : 'city.json',
              "country" : 'country.json',
              "customer" : 'customer.json',
              "dim_actor" : 'dim_actor.json',
              "dim_category" : 'dim_category.json',
              "dim_film" : 'dim_film.json',
              "dim_store" : 'dim_store.json',
              "DimDate" : 'DimDate.json',
              "fact_rental_payment" : 'fact_rental_payment.json',
              "film_actor" : 'film_actor.json',
              "film_category" : 'film_category.json',
              "film_text" : 'film_text.json',
              "film" : 'film.json',
              "inventory" : 'inventory.json',
              "language" : 'language.json',
              "payment" : 'payment.json',
              "rental" : 'rental.json',
              "staff" : 'staff.json',
              "store" : 'store.json',
             }

for collection_name, json_filename in json_files.items():
    json_file_path = os.path.join(data_dir, json_filename)
    
    try:
        with open(json_file_path, 'r') as openfile:
            json_object = json.load(openfile)
        db[collection_name].drop()
        db[collection_name].insert_many(json_object)
        print(f"Loaded data into collection: {collection_name}") 
    except json.JSONDecodeError as err:
        print(f"Error loading JSON from {json_filename}: {err}")
    except Exception as e:
        print(f"An error occurred with {json_filename}: {e}")
client.close()        

Loaded data into collection: actor
Error loading JSON from address.json: Expecting value: line 10 column 16 (char 179)
Loaded data into collection: category
Loaded data into collection: city
Loaded data into collection: country
Loaded data into collection: customer
Loaded data into collection: dim_actor
Loaded data into collection: dim_category
Loaded data into collection: dim_film
Loaded data into collection: dim_store
Loaded data into collection: DimDate
Loaded data into collection: fact_rental_payment
Loaded data into collection: film_actor
Loaded data into collection: film_category
Loaded data into collection: film_text
Loaded data into collection: film
Loaded data into collection: inventory
Loaded data into collection: language
Loaded data into collection: payment
Loaded data into collection: rental
Error loading JSON from staff.json: Expecting value: line 7 column 15 (char 109)
Loaded data into collection: store


#### Loading data from Sakila DB from rental and payment collections to create new fact tables.

In [44]:
# fetching data from rental table into an individual dataframe
query = {} 
collection = "rental"

df_rental = get_mongo_dataframe(conn_str['atlas'], src_dbname, collection, query)  # Specify 'atlas', or 'local'
df_rental.head(2)

# fetching data from payment table into an individual dataframe
query = {} 
collection = "payment"

df_payment = get_mongo_dataframe(conn_str['atlas'], src_dbname, collection, query)  # Specify 'atlas', or 'local'
df_payment.head(2)

Unnamed: 0,payment_id,customer_id,staff_id,rental_id,amount,payment_date,last_update
0,1,1,1,76,2.99,2005-05-25 11:30:37,2006-02-15 22:12:30
1,2,1,1,573,0.99,2005-05-28 10:35:23,2006-02-15 22:12:30


In [50]:
# CREATING A FACT TABLE
# summarizes customer activity regarding the number of rentals and total amount spent

customer_activity = df_payment.groupby('customer_id').agg(
    total_rentals=pd.NamedAgg(column='amount', aggfunc='size'),  # Count of payments equates to rental occurrences
    total_amount_spent=pd.NamedAgg(column='amount', aggfunc='sum')
).reset_index()

customer_activity.rename(columns={'customer_id': 'customer_key'}, inplace=True)
customer_activity.insert(0, "fact_customer_activity_key", range(1, 1 + len(customer_activity)))

customer_activity.head(2)

Unnamed: 0,fact_customer_activity_key,customer_key,total_rentals,total_amount_spent
0,1,1,32,118.68
1,2,2,27,128.73


In [52]:
# CREATING ANOTHER FACT TABLE
# summarizes which movie rentals identify to be the most popular movies

movie_popularity = df_rental.groupby('inventory_id').agg(
    total_rentals=pd.NamedAgg(column='rental_id', aggfunc='count')
).reset_index()

movie_popularity.rename(columns={'inventory_id': 'movie_key'}, inplace=True)
movie_popularity.insert(0, "fact_movie_popularity_key", range(1, 1 + len(movie_popularity)))

movie_popularity.head(2)

Unnamed: 0,fact_movie_popularity_key,movie_key,total_rentals
0,1,2,1
1,2,6,1


#### Loading fact_customer_activity and fact_movie_popularity tables onto the database connection

In [58]:
client = pymongo.MongoClient(conn_str["atlas"])
db = client[src_dbname]

def insert_df_as_collection(df, collection_name, client):
    db = client['sakila-db-json']
    records = df.to_dict('records')
    db[collection_name].drop()  
    db[collection_name].insert_many(records)

insert_df_as_collection(customer_activity, 'fact_customer_activity', client)
insert_df_as_collection(movie_popularity, 'fact_movie_popularity', client)

client.close()
