# Movies DWH Pipeline
### Data Engineering Capstone Project

#### Project Summary
The goal of this project is to build an ETL pipeline for movies data that extracts the movies dataset from an S3 bucket and ingests it into a data warehouse (DWH).

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [14]:

import pandas as pd

### Step 1: Scope the Project and Gather Data

#### Scope 

The goal of the project is to build a DWH of movies data that can answer analytical questions such as:
 * How many movies were released in 2020 in non-English languages?
 * How many women were involved in movies between 1990 and 2000?
 * How much money did German production companies earn from action movies between 2010 and 2015?


The data used in this project comes from https://www.kaggle.com/datasets/rounakbanik/the-movies-dataset

 * credits.csv: contains the cast and crew information.
 * keywords.csv: contains the keywords assigned to each movie.
 * movies_metadata.csv: contains information about the movies.
 * ratings.csv: contains user ratings for each movie.


In the project I used:
 * AWS S3: The CSV files are stored in an S3 bucket, which serves as our data source.
 * AWS EMR: Used as our managed compute layer. It gives us the power of Spark with the ability to scale clusters on demand.
 * Apache Airflow: Used for orchestrating the ETL jobs.
 * AWS Redshift: Managed DWH built on a column-oriented database.



In [None]:
movies_metadata_df = pd.read_csv("data/movies_metadata/movies_metadata.csv")  

In [16]:
len(movies_metadata_df.index)

45466

In [17]:
movies_metadata_df.head()

Unnamed: 0,adult,belongs_to_collection,budget,genres,homepage,id,imdb_id,original_language,original_title,overview,...,release_date,revenue,runtime,spoken_languages,status,tagline,title,video,vote_average,vote_count
0,False,"{'id': 10194, 'name': 'Toy Story Collection', ...",30000000,"[{'id': 16, 'name': 'Animation'}, {'id': 35, '...",http://toystory.disney.com/toy-story,862,tt0114709,en,Toy Story,"Led by Woody, Andy's toys live happily in his ...",...,1995-10-30,373554033.0,81.0,"[{'iso_639_1': 'en', 'name': 'English'}]",Released,,Toy Story,False,7.7,5415.0
1,False,,65000000,"[{'id': 12, 'name': 'Adventure'}, {'id': 14, '...",,8844,tt0113497,en,Jumanji,When siblings Judy and Peter discover an encha...,...,1995-12-15,262797249.0,104.0,"[{'iso_639_1': 'en', 'name': 'English'}, {'iso...",Released,Roll the dice and unleash the excitement!,Jumanji,False,6.9,2413.0
2,False,"{'id': 119050, 'name': 'Grumpy Old Men Collect...",0,"[{'id': 10749, 'name': 'Romance'}, {'id': 35, ...",,15602,tt0113228,en,Grumpier Old Men,A family wedding reignites the ancient feud be...,...,1995-12-22,0.0,101.0,"[{'iso_639_1': 'en', 'name': 'English'}]",Released,Still Yelling. Still Fighting. Still Ready for...,Grumpier Old Men,False,6.5,92.0
3,False,,16000000,"[{'id': 35, 'name': 'Comedy'}, {'id': 18, 'nam...",,31357,tt0114885,en,Waiting to Exhale,"Cheated on, mistreated and stepped on, the wom...",...,1995-12-22,81452156.0,127.0,"[{'iso_639_1': 'en', 'name': 'English'}]",Released,Friends are the people who let you be yourself...,Waiting to Exhale,False,6.1,34.0
4,False,"{'id': 96871, 'name': 'Father of the Bride Col...",0,"[{'id': 35, 'name': 'Comedy'}]",,11862,tt0113041,en,Father of the Bride Part II,Just when George Banks has recovered from his ...,...,1995-02-10,76578911.0,106.0,"[{'iso_639_1': 'en', 'name': 'English'}]",Released,Just When His World Is Back To Normal... He's ...,Father of the Bride Part II,False,5.7,173.0


In [18]:
credits_metadata_df = pd.read_csv("data/credits/credits.csv") 

In [19]:
len(credits_metadata_df.index)

45476

In [20]:
credits_metadata_df.head()

Unnamed: 0,cast,crew,id
0,"[{'cast_id': 14, 'character': 'Woody (voice)',...","[{'credit_id': '52fe4284c3a36847f8024f49', 'de...",862
1,"[{'cast_id': 1, 'character': 'Alan Parrish', '...","[{'credit_id': '52fe44bfc3a36847f80a7cd1', 'de...",8844
2,"[{'cast_id': 2, 'character': 'Max Goldman', 'c...","[{'credit_id': '52fe466a9251416c75077a89', 'de...",15602
3,"[{'cast_id': 1, 'character': ""Savannah 'Vannah...","[{'credit_id': '52fe44779251416c91011acb', 'de...",31357
4,"[{'cast_id': 1, 'character': 'George Banks', '...","[{'credit_id': '52fe44959251416c75039ed7', 'de...",11862


In [21]:
keywords_metadata_df = pd.read_csv("data/keywords/keywords.csv")  

In [22]:
len(keywords_metadata_df.index)

46419

In [23]:
keywords_metadata_df.head()

Unnamed: 0,id,keywords
0,862,"[{'id': 931, 'name': 'jealousy'}, {'id': 4290,..."
1,8844,"[{'id': 10090, 'name': 'board game'}, {'id': 1..."
2,15602,"[{'id': 1495, 'name': 'fishing'}, {'id': 12392..."
3,31357,"[{'id': 818, 'name': 'based on novel'}, {'id':..."
4,11862,"[{'id': 1009, 'name': 'baby'}, {'id': 1599, 'n..."


In [24]:
rating_metadata_df = pd.read_csv("data/ratings/ratings.csv")  

In [25]:
len(rating_metadata_df.index)

26024289

In [26]:
rating_metadata_df.head()

Unnamed: 0,userId,movieId,rating,timestamp
0,1,110,1.0,1425941529
1,1,147,4.5,1425942435
2,1,858,5.0,1425941523
3,1,1221,5.0,1425941546
4,1,1246,5.0,1425941556


### Step 2: Explore and Assess the Data
#### Explore the Data 
Exploration showed that the spoken_languages column stores each language name in its native form (for example, 'Italiano' or '普通话'), which makes it difficult to recognize which language is meant.


In [27]:
pd.set_option('display.max_colwidth', None)
non_english_movies = movies_metadata_df[movies_metadata_df['original_language'] != 'en']
print(non_english_movies['spoken_languages'])

28       [{'iso_639_1': 'cn', 'name': '广州话 / 廣州話'}, {'iso_639_1': 'fr', 'name': 'Français'}]
29                                                      [{'iso_639_1': 'zh', 'name': '普通话'}]
32                                                  [{'iso_639_1': 'en', 'name': 'English'}]
52           [{'iso_639_1': 'sq', 'name': 'shqip'}, {'iso_639_1': 'it', 'name': 'Italiano'}]
57         [{'iso_639_1': 'it', 'name': 'Italiano'}, {'iso_639_1': 'es', 'name': 'Español'}]
                                                ...                                         
45451                                           [{'iso_639_1': 'xx', 'name': 'No Language'}]
45453                                                [{'iso_639_1': 'hi', 'name': 'हिन्दी'}]
45455                                              [{'iso_639_1': 'it', 'name': 'Italiano'}]
45461                                                 [{'iso_639_1': 'fa', 'name': 'فارسی'}]
45462                                                      [{'iso_639_

#### Cleaning Steps
In the project I used the langcodes library to get the English name of each language: the language code is passed in and the English name is returned.
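The cleaning step can be sketched roughly as follows. This is a minimal illustration, not the project's actual ETL code: the function name `english_language_names` and the `code_to_name` parameter are hypothetical, and the default lookup assumes that langcodes is installed together with its language data (in recent versions, `pip install langcodes[data]`).

```python
import ast


def english_language_names(spoken_languages, code_to_name=None):
    """Return English names for one raw `spoken_languages` cell.

    `code_to_name` is a hypothetical hook mapping an ISO 639-1 code to an
    English name; by default it delegates to the langcodes library.
    """
    if code_to_name is None:
        # Imported lazily so the parsing logic can be used without langcodes.
        import langcodes

        def code_to_name(code):
            return langcodes.Language.get(code).display_name()

    # The CSV stores the list as a Python-literal string (single quotes),
    # so ast.literal_eval is used instead of a JSON parser.
    # Missing values (NaN) are not strings and yield an empty list.
    if not isinstance(spoken_languages, str):
        return []
    entries = ast.literal_eval(spoken_languages)
    return [code_to_name(entry["iso_639_1"]) for entry in entries]
```

With pandas, this could be applied as `movies_metadata_df['spoken_languages'].apply(english_language_names)`. Codes that langcodes does not recognize would need separate handling.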

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Because our data source contains several many-to-many relationships, I decided to use a snowflake schema, which accommodates the number of bridge tables the model needs.

![Database_Design](images//Capstone_movies_database_design.png)
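To illustrate why bridge tables are needed, the sketch below splits one movie's nested `genres` cell into rows for a genre dimension and rows for a movie-to-genre bridge. The function name `explode_genres` and the tuple layouts are assumptions made for illustration; the real table and column definitions are in the SQL script and the data dictionary.

```python
import ast


def explode_genres(movie_id, genres_cell):
    """Split one raw `genres` cell into dimension rows and bridge rows."""
    # The cell is a Python-literal string; missing values are treated as empty.
    genres = ast.literal_eval(genres_cell) if isinstance(genres_cell, str) else []
    # One row per genre: (genre_id, genre_name).
    dim_rows = [(genre["id"], genre["name"]) for genre in genres]
    # One row per (movie, genre) pair, which resolves the many-to-many link.
    bridge_rows = [(movie_id, genre["id"]) for genre in genres]
    return dim_rows, bridge_rows


# Values taken from the "Father of the Bride Part II" row shown in Step 1.
dim_rows, bridge_rows = explode_genres(11862, "[{'id': 35, 'name': 'Comedy'}]")
```

The same pattern would apply to the other nested columns, such as keywords, cast, and spoken languages.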


#### 3.2 Mapping Out Data Pipelines
Several steps are needed to land the movies data in the DWH:
 * Installation 
    * AWS S3 bucket with the CSV files
    * AWS EMR cluster with Spark installed (the ETL folder is uploaded to the project directory)
    * AWS Redshift cluster with a database created
    * Apache Airflow (movies_dag should be placed in the dags folder)
 
 * Execution 
    * Execute create_tables.sql in the Redshift database
    * Trigger the Airflow DAG



### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
* The database was built manually by executing create_table.sql in the Redshift database.
* Triggering the Airflow pipeline starts the ETL process, which consists of several jobs executed via the spark-submit command on the EMR cluster.

    ![Airflow_pipeline](images//Airflow_pipeline.png)

* After the pipeline executed successfully, here are some data samples from the DWH:

    ![dim_movies](images//dim_movies.png)

    ![dim_cast](images//dim_cast.png)

    ![dim_languages](images//dim_languages.png)

* The end user can use any data visualization tool, such as Tableau or Power BI, or execute SQL queries on the database directly, like the following:
    
  * List action movies released in 2004

     ![released_action_movies_2004](images//released_action_movies_2004.png)

  * List production companies' revenue and number of movies released between 2010 and 2020, ordered by revenue (highest first)

    ![production_companies_revenue_between_2010-2020](images//production_companies_revenue_between_2010-2020.png)





#### 4.2 Data Quality Checks
* All primary keys and foreign keys were defined during database creation to describe the intended data integrity. Note that Redshift treats these constraints as informational and does not enforce them.
* At the end of the pipeline, after the ETL jobs have executed, a quality check job runs some queries to check data completeness.
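A completeness check of this kind might look like the sketch below. It is not the project's actual quality check job: the function name `find_empty_tables` is hypothetical, the column definitions are invented for the demo, and an in-memory SQLite database stands in for Redshift so the example is self-contained.

```python
import sqlite3


def find_empty_tables(conn, tables):
    """Return the names in `tables` whose table contains no rows."""
    empty = []
    cursor = conn.cursor()
    for table in tables:
        # Table names come from a fixed, trusted list, so formatting them
        # into the statement is acceptable here.
        cursor.execute(f"SELECT COUNT(*) FROM {table}")
        (row_count,) = cursor.fetchone()
        if row_count == 0:
            empty.append(table)
    return empty


# Demo: SQLite stands in for the Redshift connection (assumption).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dim_movies (movie_id INTEGER PRIMARY KEY, title TEXT)")
conn.execute("CREATE TABLE dim_languages (language_code TEXT PRIMARY KEY)")
conn.execute("INSERT INTO dim_movies VALUES (862, 'Toy Story')")

empty_tables = find_empty_tables(conn, ["dim_movies", "dim_languages"])
if empty_tables:
    print(f"Data quality check failed, empty tables: {empty_tables}")
```

In a pipeline, the final task would typically raise an exception instead of printing, so that the orchestrator marks the run as failed.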

#### 4.3 Data dictionary 

Please check [Data_Dictionary.xlsx](Data_Dictionary.xlsx)

<br><br>

### Step 5: Complete Project Write Up


#### Tools and Technologies
* AWS
    * S3: Used as the data source. It is scalable object storage, well suited to storing files.
    * EMR: Used as the compute layer for the ETL processes. It is a scalable Hadoop cluster with the option of installing Spark.
    * Redshift: Used as the data destination. It is a managed data warehouse.
* Airflow: Used for data pipeline orchestration. Pipelines can be scheduled and triggered automatically, and Airflow shows the pipeline graphically.
* Python
    * PySpark: Python API for Spark. Mainly used for DataFrame operations in the Spark applications.
    * Pandas: Python library for DataFrame operations. In this project it was used for data discovery.



#### Data Update Frequency
* Because cinema production does not change on a daily basis, the data can be updated in two ways:
    * On a schedule: the data can be updated monthly or quarterly.
    * On arrival: when new files land in the S3 bucket, the Airflow pipeline can be triggered.
  



#### Performance challenges
* If the data were increased by 100x:
    * EMR can scale clusters out and in depending on the workload. For further information, see https://aws.amazon.com/blogs/aws/new-auto-scaling-for-emr-clusters/
* If the data populates a dashboard that must be updated daily by 7am:
    * Airflow can be scheduled to trigger the data pipeline on a predefined schedule.
* If the database needed to be accessed by 100+ people:
    * Redshift can have up to 500 connections. For further information, see the Redshift limits: https://docs.aws.amazon.com/redshift/latest/mgmt/amazon-redshift-limits.html#amazon-redshift-limits-quota
    
  

