# ETL Data Processor

#### This database stores information about movies, the cast and crew involved, where the movie was produced and by which company, and other information about movies such as the languages, genres, and keywords.

#### Import the necessary libraries

In [1]:
import os
import numpy
import json
import csv
import pandas as pd
import pymongo
from sqlalchemy import create_engine

#### Declare & Assign Connection Variables for the MongoDB Server, the MySQL Server & Databases with which you'll be working

In [2]:
host_name = "localhost"
host_ip = "127.0.0.1"
port = "3306"
user_id = "root"
pwd = "Ravenclaw_2728"

ports = {"mongo" : 27017, "mysql" : 3306}

atlas_cluster_name = "ds-midterm"
atlas_default_dbname = "admin"
atlas_user_name = "abg5eqp"
atlas_password = "Ravenclaw_2728"

conn_str = {"local" : f"mongodb://{host_name}:{ports['mongo']}/",
    "atlas" : f"mongodb+srv://{atlas_user_name}:{atlas_password}@{atlas_cluster_name}.rho0qln.mongodb.net/test?authSource={atlas_default_dbname}"
}


print(conn_str["local"])
print(conn_str["atlas"])

src_dbname = "movies"
dst_dbname = "movies_dw"

mongodb://localhost:27017/
mongodb+srv://abg5eqp:Ravenclaw_2728@ds-midterm.rho0qln.mongodb.net/test?authSource=admin


#### Define Functions for Getting Data From and Setting Data Into Databases

In [3]:
def get_sql_dataframe(user_id, pwd, host_name, db_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe

def get_mongo_dataframe(connect_str, db_name, collection, query):
    '''Create a connection to MongoDB'''
    client = pymongo.MongoClient(connect_str)
    
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    client.close()
    return dframe


def get_mongo_dataframe_local(user_id, pwd, host_name, port, db_name, collection, query):
    '''Create a connection to MongoDB, with or without authentication credentials'''
    if user_id and pwd:
        mongo_uri = 'mongodb://%s:%s@%s:%s/%s' % (username, password, host, port, db_name)
        client = pymongo.MongoClient(mongo_uri)
    else:
        conn_str = f"mongodb://{host_name}:{port}/"
        client = pymongo.MongoClient(conn_str)
    
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    client.close()
    
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

#### Connect to SQL Alchemy

In [4]:
conn_str1 = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str1, pool_recycle=3600)

sqlEngine.execute(f"DROP DATABASE IF EXISTS `{dst_dbname}`;")
sqlEngine.execute(f"CREATE DATABASE `{dst_dbname}`;")
sqlEngine.execute(f"USE {dst_dbname};")

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x17f4ab79d30>

In [47]:
try:
    sql_prod = "SELECT * FROM movies.production_company;"
    df_prod = get_sql_dataframe(user_id, pwd, host_name, src_dbname, sql_prod)
    print(df_prod.head(10))
except:
    print("movies production company table does not exist in the movies schema")
    

   company_id             company_name
0           1                Lucasfilm
1           2     Walt Disney Pictures
2           3  Pixar Animation Studios
3           4       Paramount Pictures
4           5        Columbia Pictures
5           6       RKO Radio Pictures
6           7               DreamWorks
7           8       Fine Line Features
8           9                  Gaumont
9          11            WingNut Films


#### Read data from MySQL tables and connect to dataframes

In [48]:
try:
    sql_mov = "SELECT * FROM movies.movie;"
    df_mov = get_sql_dataframe(user_id, pwd, host_name, src_dbname, sql_mov)
    print(df_mov.head)
except:
    print("movie does not exist in the movies schema")

<bound method NDFrame.head of       movie_id                        title    budget  \
0            5                   Four Rooms   4000000   
1           11                    Star Wars  11000000   
2           12                 Finding Nemo  94000000   
3           13                 Forrest Gump  55000000   
4           14              American Beauty  15000000   
...        ...                          ...       ...   
4798    426067             Midnight Cabaret         0   
4799    426469             Growing Up Smith         0   
4800    433715                       8 Days         0   
4801    447027              Running Forever         0   
4802    459488  To Be Frank, Sinatra at 100         2   

                                               homepage  \
0                                                         
1     http://www.starwars.com/films/star-wars-episod...   
2                 http://movies.disney.com/finding-nemo   
3                                                

#### Read data from JSON file and convert to dataframe

In [49]:
try:
    df_comp = pd.read_json('movie_company.json')
    df_comp.insert(0, "movie_company_key", range(1, df_comp.shape[0] + 1))
    print(df_comp.head(10))
except:
    print("the movie_company.json file does not exist")

   movie_company_key  movie_id  company_id
0                  1         5          14
1                  2         5          59
2                  3        11           1
3                  4        11         306
4                  5        12           3
5                  6        13           4
6                  7        14          27
7                  8        14        2721
8                  9        16       11239
9                 10        16         119


#### Write data from CSV file to MongoDB Compass Table and Create Dataframe

In [51]:
try:
    client = pymongo.MongoClient(conn_str["atlas"])
    db = client[src_dbname]
    data_dir = os.path.join(os.getcwd())
    csv_files = {"movie_cast" : 'movie_cast.csv'}


    for file in csv_files:
        c_file = os.path.join(data_dir, csv_files[file])
        with open(c_file,  encoding= "utf8") as openfile:
            reader = csv.DictReader(openfile)
            list_reader = list(reader)
            file = db[file]
            result = file.insert_many(list_reader)
    client.close()
except:
    print("the movie_cast.csv file does not exist or data were not inserted into MongoDBCompass")

In [53]:
try:
    query = {}
    collection = "movie_cast"

    df_movie_cast = get_mongo_dataframe(conn_str['atlas'], src_dbname, collection, query)
    print(df_movie_cast.head(2))
except:
    print("the movie_cast.csv file was not inserted into MongoDBCompass")

  movie_id person_id        character_name gender_id cast_order
0      285        85  Captain Jack Sparrow         2          0
1      285       114           Will Turner         2          1


#### Drop Unnecessary Columns from Dimension Tables and Rename Primary Keys

In [10]:
# Dropping Columns 
drop_cols = ['cast_order']
df_movie_cast.drop(drop_cols, axis=1, inplace=True)
df_movie_cast.insert(0, "movie_cast_key", range(1, df_movie_cast.shape[0] + 1))

df_movie_cast.head(2)

Unnamed: 0,movie_cast_key,movie_id,person_id,character_name,gender_id
0,1,285,85,Captain Jack Sparrow,2
1,2,285,114,Will Turner,2


In [11]:
# Dropping Columns 
drop_cols = ['homepage', 'overview', 'tagline']
df_mov.drop(drop_cols, axis=1, inplace=True)
df_mov.rename(columns={"movie_id":"movie_key"}, inplace=True)

df_mov.head(2)

Unnamed: 0,movie_key,title,budget,popularity,release_date,revenue,runtime,movie_status,vote_average,vote_count
0,5,Four Rooms,4000000,22.87623,1995-12-09,4300000,98,Released,6.5,530
1,11,Star Wars,11000000,126.393695,1977-05-25,775398007,121,Released,8.1,6624


In [12]:
# Add keys
df_prod.rename(columns={"company_id":"production_company_key"}, inplace=True)

#### Insert Dimension Tables into movies_dw

In [13]:
db_operation = "insert"

tables = [('dim_movie', df_mov, 'movie_key'),
          ('dim_production_company', df_prod, 'production_company_key'),
          ('dim_movie_company', df_comp, 'movie_company_key'),
          ('dim_movie_cast', df_movie_cast, 'movie_cast_key')]

In [54]:
try:
    for table_name, dataframe, primary_key in tables:
        set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)
except:
    print("Dimension tables were not inserted into movies_dw")

#### Create Fact Orders Table by Joining Dimension Tables

In [19]:
sql_fact_orders = """SELECT m.movie_id as movie_key,
mc.company_id,
m.title,
m.budget,
m.popularity,
m.release_date,
m.revenue,
m.runtime,
m.movie_status,
m.vote_average,
m.vote_count
FROM movie m JOIN movie_company mc ON m.movie_id = mc.movie_id JOIN production_company p ON mc.company_id = p.company_id GROUP BY m.movie_id HAVING count(*) = 1
"""
df_fact_orders = get_sql_dataframe(user_id, pwd, host_name, src_dbname, sql_fact_orders)
df_fact_orders.head(10)

Unnamed: 0,movie_key,company_id,title,budget,popularity,release_date,revenue,runtime,movie_status,vote_average,vote_count
0,1893,1,Star Wars: Episode I - The Phantom Menace,115000000,54.035265,1999-05-19,924317558,136,Released,6.3,4432
1,1894,1,Star Wars: Episode II - Attack of the Clones,120000000,43.987061,2002-05-15,649398328,142,Released,6.4,3992
2,1895,1,Star Wars: Episode III - Revenge of the Sith,113000000,44.108427,2005-05-17,850000000,140,Released,7.1,4116
3,756,2,Fantasia,2280000,32.875469,1940-11-13,83320000,124,Released,7.2,808
4,812,2,Aladdin,28000000,92.982009,1992-11-25,504050219,90,Released,7.4,3416
5,4244,2,The Kid,65000000,18.281798,2000-07-07,69700000,104,Released,6.0,238
6,9849,2,My Favorite Martian,65000000,6.80692,1999-02-11,36850101,94,Released,5.1,80
7,9918,2,Glory Road,0,8.305085,2006-01-13,42647449,118,Released,7.2,144
8,10439,2,Hocus Pocus,28000000,18.650748,1993-07-16,39514713,96,Released,6.4,471
9,10545,2,The Hunchback of Notre Dame,100000000,46.727941,1996-06-21,100138851,91,Released,6.8,1129


#### Insert Fact Orders Table into movies_dw

In [55]:
try:
    table_name = "fact_orders"
    primary_key = "movie_key"
    db_operation = "insert"

    set_dataframe(user_id, pwd, host_name, dst_dbname, df_fact_orders, table_name, primary_key, db_operation)
except:
    print("Fact table was not inserted in movies_dw")

In [31]:
sqlEngine.execute(f"SELECT * FROM movies_dw.dim_movie;").fetchall()
conn = sqlEngine.connect()

sql_query = "SELECT * FROM movies_dw.dim_movie;"

df = pd.read_sql(sql_query, conn);

conn.close()
df.head(10)

Unnamed: 0,movie_key,title,budget,popularity,release_date,revenue,runtime,movie_status,vote_average,vote_count
0,5,Four Rooms,4000000,22.87623,1995-12-09,4300000,98,Released,6.5,530
1,11,Star Wars,11000000,126.393695,1977-05-25,775398007,121,Released,8.1,6624
2,12,Finding Nemo,94000000,85.688789,2003-05-30,940335536,100,Released,7.6,6122
3,13,Forrest Gump,55000000,138.133331,1994-07-06,677945399,142,Released,8.2,7927
4,14,American Beauty,15000000,80.878605,1999-09-15,356296601,122,Released,7.9,3313
5,16,Dancer in the Dark,12800000,22.022228,2000-05-17,40031879,140,Released,7.6,377
6,18,The Fifth Element,90000000,109.528572,1997-05-07,263920180,126,Released,7.3,3885
7,19,Metropolis,92620000,32.351527,1927-01-10,650422,153,Released,8.0,657
8,20,My Life Without Me,0,7.958831,2003-03-07,9726954,106,Released,7.2,77
9,22,Pirates of the Caribbean: The Curse of the Bla...,140000000,271.972889,2003-07-09,655011224,143,Released,7.5,6985


#### Select Queries to Verify Functionality

In [32]:
sqlEngine.execute(f"SELECT * FROM movies_dw.dim_production_company;").fetchall()
conn = sqlEngine.connect()

sql_query = "SELECT * FROM movies_dw.dim_production_company;"

df = pd.read_sql(sql_query, conn);

conn.close()
df.head(10)

Unnamed: 0,production_company_key,company_name
0,1,Lucasfilm
1,2,Walt Disney Pictures
2,3,Pixar Animation Studios
3,4,Paramount Pictures
4,5,Columbia Pictures
5,6,RKO Radio Pictures
6,7,DreamWorks
7,8,Fine Line Features
8,9,Gaumont
9,11,WingNut Films


In [34]:
sqlEngine.execute(f"SELECT * FROM movies_dw.fact_orders;").fetchall()
conn = sqlEngine.connect()
sql_query = "SELECT * FROM movies_dw.fact_orders;"

df = pd.read_sql(sql_query, conn);

conn.close()
df.head(10)

Unnamed: 0,movie_key,company_id,title,budget,popularity,release_date,revenue,runtime,movie_status,vote_average,vote_count
0,12,3,Finding Nemo,94000000,85.688789,2003-05-30,940335536,100,Released,7.6,6122
1,13,4,Forrest Gump,55000000,138.133331,1994-07-06,677945399,142,Released,8.2,7927
2,68,10214,Brazil,15000000,41.089863,1985-02-20,0,132,Released,7.5,861
3,75,8601,Mars Attacks!,70000000,44.090535,1996-12-12,101371017,106,Released,6.1,1509
4,76,98,Before Sunrise,2500000,23.672571,1995-01-27,5535405,105,Released,7.7,959
5,83,22376,Open Water,130000,15.611857,2004-08-06,54667954,79,Released,5.4,315
6,111,33,Scarface,25000000,70.105981,1983-12-08,65884703,170,Released,8.0,2948
7,116,7,Match Point,15000000,30.669913,2005-10-26,85306374,124,Released,7.3,1105
8,117,4,The Untouchables,25000000,38.272889,1987-06-02,76270454,119,Released,7.6,1384
9,129,10342,千と千尋の神隠し,15000000,118.968562,2001-07-20,274925095,125,Released,8.3,3840


In [42]:
conn = sqlEngine.connect()
sql_query = "SELECT company_id, AVG(revenue) as average_revenue FROM movies_dw.fact_orders GROUP BY company_id;"
df_comp_rev = pd.read_sql(sql_query, conn);
conn.close()
df_comp_rev.head(10)

Unnamed: 0,company_id,average_revenue
0,3,636588900.0
1,4,89795960.0
2,10214,0.0
3,8601,68329400.0
4,98,5535405.0
5,22376,54667950.0
6,33,52119240.0
7,7,213431300.0
8,10342,274925100.0
9,5,79190210.0
