# Project Title
### Data Engineering Capstone Project

#### Project Summary
This project uses the MovieLens dataset to build an ETL pipeline that loads a data warehouse. The warehouse can then be used to analyze the MovieLens data with OLAP cubes.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [2]:
# Do all imports and installs here
import pandas as pd
import psycopg2
from pyspark.sql import SparkSession
import os
from pyspark.sql.types import StructType as R, StructField as F, IntegerType as Int, StringType as Str

#### Scope 
The scope of the project is to use the MovieLens dataset to build an ETL pipeline that creates a data warehouse in AWS Redshift.

#### Data Description 
The project uses the MovieLens 1M dataset, released in February 2003, which contains about 1 million ratings from about 6,000 users on about 4,000 movies.

The MovieLens dataset has been downloaded from https://grouplens.org/datasets/movielens/1m/.

The data are contained in the files movies.dat, ratings.dat and users.dat

In [2]:
# Read in the movie data here
columns = ['movieId','title','genres']
movies_df = pd.read_csv('movielens-1m/movies.dat', delimiter='::', header=None, names=columns, engine='python')
movies_df.head(5)

Unnamed: 0,movieId,title,genres
0,1,Toy Story (1995),Animation|Children's|Comedy
1,2,Jumanji (1995),Adventure|Children's|Fantasy
2,3,Grumpier Old Men (1995),Comedy|Romance
3,4,Waiting to Exhale (1995),Comedy|Drama
4,5,Father of the Bride Part II (1995),Comedy


In [3]:
#Read in the users data
columns = ['userId', 'gender', 'age', 'occupation', 'zipcode']
users_df = pd.read_csv('movielens-1m/users.dat', delimiter='::', header=None, names=columns, engine='python')
users_df.head(5)

Unnamed: 0,userId,gender,age,occupation,zipcode
0,1,F,1,10,48067
1,2,M,56,16,70072
2,3,M,25,15,55117
3,4,M,45,7,2460
4,5,M,25,20,55455


In [4]:
#Read in the ratings data
columns = ['userId', 'movieId', 'rating', 'timestamp']
ratings_df = pd.read_csv('movielens-1m/ratings.dat', delimiter='::', header=None, names=columns, engine='python')
ratings_df.head(5)

Unnamed: 0,userId,movieId,rating,timestamp
0,1,1193,5,978300760
1,1,661,3,978302109
2,1,914,3,978301968
3,1,3408,4,978300275
4,1,2355,5,978824291


### Data Cleaning and Pre-processing

- Cleaning

  We perform the following cleaning steps for all the data files:
  - check for missing values
  - check for duplicate values

- Preprocessing
  
  Preprocessing consists of the following 3 steps:
  - Add an occupation column to the users table with the occupation names listed in the README of the MovieLens 1M dataset,
    so that the movie data can be analyzed by occupation.
  - Add an age group column to the users table with the age groups listed in the README of the MovieLens 1M dataset,
    so that the movie data can be analyzed by age group.
  - The MovieLens files use :: as the field delimiter. Since the Redshift COPY command accepts only a single
    ASCII character as a delimiter, we write the pandas dataframes created above for movies, users and
    ratings to Parquet format in S3. During the ETL step we use COPY to load the data from the Parquet files
    into the staging tables.
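
The delimiter issue described above can be seen without S3 or Redshift. The following is a minimal sketch, assuming each record is one line whose fields are separated by `::`; the helper name `parse_dat_line` is hypothetical and is not part of this notebook.

```python
def parse_dat_line(line, sep="::"):
    """Split one MovieLens record on its multi-character delimiter."""
    return line.rstrip("\n").split(sep)


# Sample record matching the first row of the movies.dat preview above.
sample = "1::Toy Story (1995)::Animation|Children's|Comedy\n"

fields = parse_dat_line(sample)
print(fields)

# Treating ":" as a single-character delimiter yields spurious empty fields,
# which is why the raw files are not loaded directly with a one-character delimiter.
naive_fields = sample.rstrip("\n").split(":")
print(naive_fields)
```

Splitting on the full `::` token returns the three expected fields, while the single-character split returns five, two of them empty.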

In [5]:
# check missing values for movies data
movies_df.isnull().sum()

movieId    0
title      0
genres     0
dtype: int64

In [6]:
# check missing values for users data
users_df.isnull().sum()

userId        0
gender        0
age           0
occupation    0
zipcode       0
dtype: int64

In [7]:
# check missing values for ratings data
ratings_df.isnull().sum()

userId       0
movieId      0
rating       0
timestamp    0
dtype: int64

In [8]:
# check duplicate values for movies data
movies_df['title'].duplicated().sum()

0

In [9]:
# check duplicate values for users data
users_df['userId'].duplicated().sum()

0

In [10]:
# check duplicate values for ratings data
ratings_df.duplicated(['userId', 'movieId']).sum()

0

In [5]:
#Enriching the users data with occupation

occupation = [[0, "other or not specified"], [1, "academic/educator"], [2, "artist"], [3, "clerical/admin"],
              [4, "college/grad student"], [5, "customer service"], [6, "doctor/health care"], 
              [7, "executive/managerial"], [8, "farmer"], [9, "homemaker"], [10, "K-12 student"],
              [11, "lawyer"], [12, "programmer"], [13, "retired"], [14, "sales/marketing"], [15, "scientist"],
              [16, "self-employed"], [17, "technician/engineer"], [18, "tradesman/craftsman"], [19, "unemployed"],
              [20, "writer"]]

#create the occupation table
occupation_df = pd.DataFrame(occupation, columns = ['occupation', 'occupation_detail']) 

#merge occupation table with users dataframe on the occupation column
users_df = users_df.merge(occupation_df, how='inner', on='occupation').sort_values('userId')

#delete previous occupation column and rename occupation_detail to occupation in the merged dataframe
del users_df['occupation']
users_df.rename(columns = {'occupation_detail':'occupation'}, inplace = True)

#Enriching the users data with age group
age_group = [[1, "Under 18"],[18, "18-24"],[25, "25-34"], [35, "35-44"], [45, "45-49"], [50, "50-55"], [56, "56+"]]

age_group_df = pd.DataFrame(age_group, columns = ['age', 'age_group'])

users_df = users_df.merge(age_group_df, how='inner', on='age').sort_values('userId')

del users_df['age']
users_df.head(5)

Unnamed: 0,userId,gender,zipcode,occupation,age_group
0,1,F,48067,K-12 student,Under 18
222,2,M,70072,self-employed,56+
602,3,M,55117,scientist,25-34
2698,4,M,2460,executive/managerial,45-49
603,5,M,55455,writer,25-34
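
Because both merges above use `how='inner'`, a user whose occupation or age code is absent from the lookup table would be dropped without warning. The following is a minimal sketch of a coverage check that could be run before merging; the helper `unmapped_codes` and the code `99` are hypothetical and only serve as an illustration.

```python
def unmapped_codes(codes, lookup):
    """Return the sorted list of codes that have no entry in the lookup."""
    return sorted(set(codes) - set(lookup))


# Age codes and labels as defined in the enrichment cell above.
AGE_GROUPS = {1: "Under 18", 18: "18-24", 25: "25-34", 35: "35-44",
              45: "45-49", 50: "50-55", 56: "56+"}

# Age codes of the first five users shown in the users.dat preview.
sample_ages = [1, 56, 25, 45, 25]

all_covered = unmapped_codes(sample_ages, AGE_GROUPS)
with_unknown = unmapped_codes(sample_ages + [99], AGE_GROUPS)  # 99 is a made-up code

print(all_covered)
print(with_unknown)
```

An empty result means the inner merge keeps every row; any code in the result identifies rows that the merge would discard.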


In [6]:
# Write the dataframes to S3 as Parquet files (via Spark) so that the :: delimiter is no longer needed
os.environ['AWS_ACCESS_KEY_ID']='' #removed for security
os.environ['AWS_SECRET_ACCESS_KEY']='' #removed for security
s3_path = 's3a://movielens-1m/'

#Create spark session
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

movieSchema = R([
    F("movieId", Int()),
    F("title", Str()),
    F("genres", Str()),
])

#convert pandas dataframe to spark dataframe
movies_df = spark.createDataFrame(movies_df, movieSchema)
movies_df.write.parquet(s3_path + "movies.parquet")

userSchema = R([
    F("userId", Int()),
    F("gender", Str()),
    F("zipcode", Str()),
    F("occupation", Str()),
    F("age_group", Str())    
])
 
#convert pandas dataframe to spark dataframe
users_df = spark.createDataFrame(users_df, userSchema)
users_df.write.parquet(s3_path + "users.parquet")

ratingSchema = R([
    F("userId", Int()),
    F("movieId", Int()),
    F("rating", Int()),
    F("timestamp", Int())
])

#convert pandas dataframe to spark dataframe
ratings_df = spark.createDataFrame(ratings_df, ratingSchema)
ratings_df.write.parquet(s3_path + "ratings.parquet")

### Data Modelling
We model the data as relational tables. The following staging tables are created, each corresponding to one of the MovieLens data files:

- staging_movies (movieId, title, genres)
- staging_users (userId, gender, age_group, occupation, zipcode)
- staging_ratings (userId, movieId, rating, timestamp)

From the staging tables we create a star schema consisting of the following fact and dimension tables:
- Fact table
  - rating (userId, movieId, rating, timestamp)

- Dimensional tables
  - users (userId, gender, age_group_id, occupation_id, zipcodeId)
  - occupation (occupation_id, occupation)
  - age_group (age_group_id, age_group)
  - movie (movieId, title)
  - zipcode (zipcodeId, zipcode)
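
To illustrate how the star schema supports aggregate queries, the following minimal sketch joins the fact table to two of the dimension tables and computes the average rating per age group. It is only an approximation under stated assumptions: an in-memory SQLite database stands in for Redshift, the tables are reduced to the columns the query needs, and the rating row for user 2 is invented for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# Reduced versions of the fact table and two dimension tables.
cur.executescript("""
    CREATE TABLE age_group (age_group_id INTEGER PRIMARY KEY, age_group TEXT);
    CREATE TABLE users (userId INTEGER PRIMARY KEY, gender TEXT, age_group_id INTEGER);
    CREATE TABLE rating (userId INTEGER, movieId INTEGER, rating INTEGER, timestamp INTEGER);
""")
cur.executemany("INSERT INTO age_group VALUES (?, ?)", [(0, "Under 18"), (1, "56+")])
cur.executemany("INSERT INTO users VALUES (?, ?, ?)", [(1, "F", 0), (2, "M", 1)])
cur.executemany("INSERT INTO rating VALUES (?, ?, ?, ?)", [
    (1, 1193, 5, 978300760),   # from the ratings.dat preview
    (1, 661, 3, 978302109),    # from the ratings.dat preview
    (2, 1193, 2, 978300000),   # hypothetical row for illustration
])

# Aggregate the fact table along the age_group dimension.
cur.execute("""
    SELECT ag.age_group, AVG(r.rating), COUNT(*)
    FROM rating r
    JOIN users u ON u.userId = r.userId
    JOIN age_group ag ON ag.age_group_id = u.age_group_id
    GROUP BY ag.age_group
    ORDER BY ag.age_group
""")
rows = cur.fetchall()
print(rows)
conn.close()
```

The same pattern (fact table joined to one or more dimensions, then grouped) applies to occupation and zipcode.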

In [4]:
#create schema for staging tables
staging_movies_create = """CREATE TABLE IF NOT EXISTS staging_movies
                         (movieId int,
                          title varchar,
                          genres varchar)"""
                          
staging_users_create = """CREATE TABLE IF NOT EXISTS staging_users
                          (userId int,
                           gender varchar,
                           zipcode varchar,
                           occupation varchar,
                           age_group varchar)"""

staging_ratings_create = """CREATE TABLE IF NOT EXISTS staging_ratings
                            (userId int,
                             movieId int,
                             rating int,
                             timestamp int)"""

#create schema for fact and dimensional tables

rating_create = """CREATE TABLE IF NOT EXISTS rating 
                   (userId int,
                    movieId int,
                    rating int,
                    timestamp int)"""

user_create = """CREATE TABLE IF NOT EXISTS users 
                 (userId int PRIMARY KEY,
                  gender varchar,
                  age_group_id int,
                  occupation_id int,
                  zipcodeId int)"""

movie_create = """CREATE TABLE IF NOT EXISTS movie
                  (movieId int PRIMARY KEY,
                   title varchar)"""

zipcode_create = """CREATE TABLE IF NOT EXISTS zipcode
                    (zipcodeId int IDENTITY(0,1) PRIMARY KEY,
                     zipcode varchar)"""

age_group_create = """CREATE TABLE IF NOT EXISTS age_group
                      (age_group_id int IDENTITY(0,1) PRIMARY KEY,
                       age_group varchar)"""

occupation_create = """CREATE TABLE IF NOT EXISTS occupation
                       (occupation_id int IDENTITY(0,1) PRIMARY KEY,
                        occupation varchar)"""

create_table_queries = [staging_movies_create, staging_users_create, staging_ratings_create, 
                        zipcode_create, movie_create, user_create, rating_create, age_group_create,
                        occupation_create]

# Drop table queries
staging_movies_drop = "DROP TABLE IF EXISTS staging_movies"
staging_users_drop = "DROP TABLE IF EXISTS staging_users"
staging_ratings_drop = "DROP TABLE IF EXISTS staging_ratings"
zipcode_drop = "DROP TABLE IF EXISTS zipcode"
age_group_drop = "DROP TABLE IF EXISTS age_group"
occupation_drop = "DROP TABLE IF EXISTS occupation"
movie_drop = "DROP TABLE IF EXISTS movie"
user_drop = "DROP TABLE IF EXISTS users"
rating_drop = "DROP TABLE IF EXISTS rating"

drop_table_queries = [staging_movies_drop, staging_users_drop, staging_ratings_drop, zipcode_drop,
                      age_group_drop, occupation_drop, movie_drop, user_drop, rating_drop]

In [5]:
#Insert queries for fact and dimensional tables
movie_table_insert = """INSERT INTO movie (movieId, title)
                        SELECT movieId, title FROM staging_movies"""

zipcode_table_insert = """INSERT into zipcode (zipcode)
                          SELECT DISTINCT zipcode FROM staging_users"""

occupation_table_insert = """INSERT into occupation (occupation)
                             SELECT DISTINCT occupation FROM staging_users"""

age_group_insert = """INSERT into age_group (age_group)
                      SELECT DISTINCT age_group FROM staging_users"""

user_table_insert = """INSERT INTO users (userId, gender, age_group_id, occupation_id, zipcodeId)
                       SELECT userId, gender, 
                       (SELECT age_group_id from age_group WHERE staging_users.age_group=age_group.age_group),
                       (SELECT occupation_id from occupation WHERE staging_users.occupation=occupation.occupation),
                       (SELECT zipcodeId from zipcode WHERE staging_users.zipcode=zipcode.zipcode)
                       FROM staging_users"""

rating_table_insert = """INSERT INTO rating (userId, movieId, rating, timestamp)
                         SELECT userId, movieId, rating, timestamp
                         FROM staging_ratings
                         """

insert_table_queries = [movie_table_insert, rating_table_insert, zipcode_table_insert,
                        occupation_table_insert, age_group_insert, user_table_insert]
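
The correlated subqueries in `user_table_insert` resolve each text value to its surrogate key. An equivalent formulation with joins is sketched below. This is an alternative, not the query used in this notebook, and it runs against an in-memory SQLite database as a stand-in for Redshift (so `AUTOINCREMENT` replaces `IDENTITY(0,1)` and the generated keys start at 1 rather than 0).

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.executescript("""
    CREATE TABLE staging_users (userId INTEGER, gender TEXT, zipcode TEXT,
                                occupation TEXT, age_group TEXT);
    CREATE TABLE zipcode (zipcodeId INTEGER PRIMARY KEY AUTOINCREMENT, zipcode TEXT);
    CREATE TABLE occupation (occupation_id INTEGER PRIMARY KEY AUTOINCREMENT, occupation TEXT);
    CREATE TABLE age_group (age_group_id INTEGER PRIMARY KEY AUTOINCREMENT, age_group TEXT);
    CREATE TABLE users (userId INTEGER PRIMARY KEY, gender TEXT, age_group_id INTEGER,
                        occupation_id INTEGER, zipcodeId INTEGER);
""")

# Three rows taken from the enriched users preview above.
cur.executemany("INSERT INTO staging_users VALUES (?, ?, ?, ?, ?)", [
    (1, "F", "48067", "K-12 student", "Under 18"),
    (3, "M", "55117", "scientist", "25-34"),
    (5, "M", "55455", "writer", "25-34"),
])

# Dimension tables must be populated before the users table can reference them.
cur.execute("INSERT INTO zipcode (zipcode) SELECT DISTINCT zipcode FROM staging_users")
cur.execute("INSERT INTO occupation (occupation) SELECT DISTINCT occupation FROM staging_users")
cur.execute("INSERT INTO age_group (age_group) SELECT DISTINCT age_group FROM staging_users")

# Join-based lookup of the surrogate keys.
cur.execute("""
    INSERT INTO users (userId, gender, age_group_id, occupation_id, zipcodeId)
    SELECT s.userId, s.gender, a.age_group_id, o.occupation_id, z.zipcodeId
    FROM staging_users s
    JOIN age_group a ON a.age_group = s.age_group
    JOIN occupation o ON o.occupation = s.occupation
    JOIN zipcode z ON z.zipcode = s.zipcode
""")
conn.commit()

# Resolve the keys back to text to confirm the mapping round-trips.
cur.execute("""
    SELECT u.userId, a.age_group, o.occupation, z.zipcode
    FROM users u
    JOIN age_group a ON a.age_group_id = u.age_group_id
    JOIN occupation o ON o.occupation_id = u.occupation_id
    JOIN zipcode z ON z.zipcodeId = u.zipcodeId
    ORDER BY u.userId
""")
resolved = cur.fetchall()
age_group_count = cur.execute("SELECT COUNT(*) FROM age_group").fetchone()[0]
print(resolved, age_group_count)
conn.close()
```

One behavioral difference is worth noting: a scalar subquery yields NULL when no dimension row matches, whereas an inner join drops the user row. Since the dimension tables here are built from the same staging table, every row finds a match and the two forms give the same result.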

### Data Pipeline
The ETL pipeline consists of the following steps:
- create the staging, fact and dimensional tables in AWS Redshift
- stage the movielens data from AWS S3 bucket s3://movielens-1m to a Redshift cluster
- load data from staging tables to fact and dimensional tables
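
The steps above can be sketched as an ordered list of stages, each run in its own transaction. This is a minimal illustration under assumptions, not the notebook's implementation: the helper `run_stage` is hypothetical, SQLite stands in for Redshift, and a plain `INSERT` stands in for the S3 `COPY` step.

```python
import sqlite3


def run_stage(conn, name, queries):
    """Run all queries of one stage; commit on success, roll back on failure."""
    cur = conn.cursor()
    try:
        for query in queries:
            cur.execute(query)
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        cur.close()
    return name


# Hypothetical miniature pipeline covering only the movie tables.
stages = [
    ("create", [
        "CREATE TABLE IF NOT EXISTS staging_movies (movieId INTEGER, title TEXT, genres TEXT)",
        "CREATE TABLE IF NOT EXISTS movie (movieId INTEGER PRIMARY KEY, title TEXT)",
    ]),
    ("stage", [
        # Stand-in for the COPY from S3.
        "INSERT INTO staging_movies VALUES (1, 'Toy Story (1995)', 'Animation|Children''s|Comedy')",
    ]),
    ("load", [
        "INSERT INTO movie (movieId, title) SELECT movieId, title FROM staging_movies",
    ]),
]

conn = sqlite3.connect(":memory:")
completed = [run_stage(conn, name, queries) for name, queries in stages]
movie_rows = conn.execute("SELECT movieId, title FROM movie").fetchall()
print(completed, movie_rows)
conn.close()
```

Committing once per stage rather than once per query means a failure partway through a stage leaves no partially applied changes from that stage.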

In [9]:
# create data pipeline
host = 'redshift-cluster.cznb5fvkwfnu.us-east-1.redshift.amazonaws.com'
dbname = 'dev'
user='' #removed for security
password='' #removed for security
port=5439
conn = psycopg2.connect(host=host, dbname=dbname, user=user, password=password, port=port)
cur = conn.cursor()

In [6]:
#build query to copy data from S3 to staging
staging_movies_copy = """copy staging_movies
                         from 's3://movielens-1m/movies.parquet' 
                         CREDENTIALS 'aws_iam_role=arn:aws:iam::313544258263:role/redshift_role'
                         FORMAT AS PARQUET"""

staging_users_copy = """copy staging_users 
                        from 's3://movielens-1m/users.parquet' 
                        CREDENTIALS 'aws_iam_role=arn:aws:iam::313544258263:role/redshift_role'
                        FORMAT AS PARQUET"""

staging_ratings_copy = """copy staging_ratings
                          from 's3://movielens-1m/ratings.parquet' 
                          CREDENTIALS 'aws_iam_role=arn:aws:iam::313544258263:role/redshift_role'
                          FORMAT AS PARQUET"""

copy_table_queries = [staging_movies_copy, staging_users_copy, staging_ratings_copy]

In [7]:
def drop_tables(cur, conn):
    for query in drop_table_queries:
        cur.execute(query)
        conn.commit()


def create_tables(cur, conn):
    for query in create_table_queries:
        cur.execute(query)
        conn.commit()
        
def load_staging_tables(cur, conn):
    for query in copy_table_queries:
        cur.execute(query)
        conn.commit()


def insert_tables(cur, conn):
    for query in insert_table_queries:
        cur.execute(query)
        conn.commit()

In [8]:
#create staging, fact and dimension tables 
drop_tables(cur, conn)
create_tables(cur, conn)

#load staging table from S3
load_staging_tables(cur, conn)
insert_tables(cur, conn)

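# The connection is left open here because the data quality checks below reuse it;
# call conn.close() once those checks have finished.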

#### 4.2 Data Quality Checks
We perform the following quality checks to validate the ETL pipeline:
- Check if all staging, dimensional and fact tables have been created
- Check if all staging, dimensional and fact tables contain records
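
The two checks above could also be written so that a failure raises an error instead of only printing a message. The following is a minimal sketch under assumptions: the function `check_tables` is hypothetical, and it queries SQLite's `sqlite_master` catalog, whereas on Redshift the catalog would be `PG_TABLE_DEF`.

```python
import sqlite3


def check_tables(conn, expected):
    """Return {table: row_count}; raise ValueError if a table is missing or empty."""
    cur = conn.cursor()
    # SQLite catalog lookup (an assumption of this sketch, not the Redshift query).
    cur.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
    existing = {row[0] for row in cur.fetchall()}
    missing = set(expected) - existing
    if missing:
        raise ValueError("Missing tables: {}".format(sorted(missing)))
    counts = {}
    for table in sorted(expected):
        cur.execute('SELECT COUNT(*) FROM "{}"'.format(table))
        counts[table] = cur.fetchone()[0]
        if counts[table] < 1:
            raise ValueError("Table {} is empty".format(table))
    return counts


# Tiny stand-in database with one staging table and one dimension table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE staging_movies (movieId INTEGER, title TEXT, genres TEXT)")
conn.execute("CREATE TABLE movie (movieId INTEGER PRIMARY KEY, title TEXT)")
conn.execute("INSERT INTO staging_movies VALUES (1, 'Toy Story (1995)', 'Animation')")
conn.execute("INSERT INTO movie SELECT movieId, title FROM staging_movies")

counts = check_tables(conn, {"staging_movies", "movie"})
print(counts)
```

Raising an exception lets a scheduler or calling script detect a failed run, which a printed message alone does not.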

In [18]:
# Perform quality checks here
#Check if all staging, dimensional and fact tables have been created

table_check_query = """SELECT DISTINCT tablename FROM PG_TABLE_DEF WHERE schemaname = 'public';"""
cur.execute(table_check_query)
records = cur.fetchall()

tables = []
for rec in records:
    print(rec[0])
    tables.append(rec[0])

#Check if all staging, dimensional and fact tables contain records

for table in tables:
    query = """SELECT COUNT(*) FROM {}""".format(table)
    cur.execute(query)
    result = cur.fetchone()
    print("Table {} contains {} records".format(table, result[0]))
    if result[0] < 1:
        print("Error in running ETL pipeline. Table {} is empty".format(table))

age_group
movie
occupation
rating
staging_movies
staging_ratings
staging_users
users
zipcode
Table age_group contains 7 records
Table movie contains 3883 records
Table occupation contains 21 records
Table rating contains 1000209 records
Table staging_movies contains 3883 records
Table staging_ratings contains 1000209 records
Table staging_users contains 6040 records
Table users contains 6040 records
Table zipcode contains 3439 records


#### 4.3 Data dictionary 
- Fact table
  - rating 
    - userId - identifies each user; mapped directly from the staging_ratings table.
    - movieId - identifies each movie; mapped directly from the staging_ratings table.
    - rating - rating given by each user for a movie
    - timestamp - timestamp of the rating

- Dimensional tables
  - users 
    - userId - identifies each user; mapped directly from the staging_users table. Primary key.
    - gender - user's gender (either M or F)
    - age_group_id - foreign key into the age_group table 
    - occupation_id - foreign key into the occupation table 
    - zipcodeId - foreign key into the zipcode table
    
  - occupation
    - occupation_id - identifier for each occupation record. Auto generated. Primary key.
    - occupation - occupation mapped from staging_users table
    
  - age_group
    - age_group_id - identifier for each age_group record. Auto generated. Primary key.
    - age_group - age group mapped from staging_users table
    
  - movie
    - movieId - movie id mapped from staging_movies table. Primary key.
    - title - movie title mapped from staging_movies table
    
  - zipcode
    - zipcodeId - identifier for each zipcode record. Auto generated. Primary key.
    - zipcode - zipcode mapped from staging_users table
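
The `timestamp` column of the fact table is stored as an integer. As a minimal sketch, assuming (as the MovieLens README states) that the value is seconds since the Unix epoch, it can be converted to a readable UTC datetime as follows; the helper name `to_utc` is hypothetical.

```python
from datetime import datetime, timezone


def to_utc(epoch_seconds):
    """Convert seconds since the Unix epoch to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)


# Timestamp of the first row in the ratings.dat preview.
first_rating = to_utc(978300760)
print(first_rating.isoformat())  # 2000-12-31T22:12:40+00:00
```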

#### End Remarks
The project builds a data warehouse from the MovieLens dataset for data analysts and machine learning engineers, who could use it to perform data analysis with OLAP cubes or to build a movie recommendation system.

#### Rationale for Tools and Technologies used
The project uses AWS S3 and Redshift to leverage managed storage and data warehouse infrastructure, respectively.

#### Rationale for data model choice
Since we are creating a data warehouse solution for this project, we have chosen a relational data model with a star schema to model the MovieLens data. The star schema offers fast aggregations and simpler queries for analytics on the MovieLens data stored in our data warehouse.

#### Data update frequency 
Since the MovieLens 1M dataset is a static release that is not updated, refreshing the data in the staging area is currently not a major consideration.

#### Scaling considerations

- If the data increases by 100x - In such a scenario we could use the AWS EMR service to run our
  ETL pipeline on a Spark cluster.

- The data populates a dashboard that must be updated by 7 am every day - In such a scenario we
  could create Spark jobs that are submitted to the EMR cluster nightly so that the data
  required by the dashboard is updated before 7 am.
  
- The database needs to be accessed by 100+ people - In such a scenario we could consider migrating our data
  warehouse to a data lake.