# Project Title
### Data Engineering Capstone Project

#### Project Summary
--describe your project at a high level--

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import configparser
from datetime import datetime
import os
import glob
from pyspark.sql import SparkSession, types
from pyspark.sql.functions import (
    udf, col, concat_ws, from_unixtime, substring, monotonically_increasing_id,
    split, array, lit, explode, from_json, json_tuple, length, to_timestamp,
    year, month, dayofmonth, hour, weekofyear, date_format,
)
from pyspark.sql.types import StructType, StringType, ArrayType, TimestampType, IntegerType, DoubleType

### Step 1: Scope the Project and Gather Data

This is the final capstone project for the Data Engineer Nanodegree. Udacity provides
a default topic and dataset for the final project, in which students demonstrate the knowledge
they gained along their learning path. Besides the provided topic, students may
also choose their own dataset, subject to some strict requirements:
>- _Dataset should contain at least 1 Million rows_
>- _Dataset should have at least two different data sources(JSON, CSV,...)_

In my case I decided to explore my own dataset :blush:, which I found on Kaggle:
  [The Movies Kaggle dataset.](https://www.kaggle.com/rounakbanik/the-movies-dataset?select=credits.csv)

### Goal of this project  
The goal of this project is to create an ETL process that extracts,
transforms and loads data from existing CSV/JSON files in an S3 bucket into an AWS Redshift data warehouse,
using Apache Airflow to automate as much of the ETL process as possible. A star schema in Redshift
allows quick analysis of the data with simple queries that need only a few JOINs between the fact table
and its dimension tables. Apache Airflow makes it possible to schedule tasks and to see the execution
status of each pipeline step.
Redshift can also distribute data by key across its compute nodes, which
speeds up queries on the data.
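
To make the idea of key-based distribution concrete, here is a minimal pure-Python sketch. It only illustrates the principle (rows with the same key value end up in the same place); the hash function, the number of slices and the sample rows are assumptions for illustration and do not reflect Redshift internals.

```python
import zlib
from collections import defaultdict


def assign_slice(key, num_slices):
    """Map a distribution key to a slice index using a stable hash (illustrative only)."""
    return zlib.crc32(str(key).encode("utf-8")) % num_slices


def distribute(rows, key_name, num_slices):
    """Group rows by the slice that their distribution key hashes to."""
    slices = defaultdict(list)
    for row in rows:
        slices[assign_slice(row[key_name], num_slices)].append(row)
    return dict(slices)


# Hypothetical sample rows, not taken from the dataset
rows = [
    {"movie_key": 862, "rating": 1.0},
    {"movie_key": 862, "rating": 4.5},
    {"movie_key": 110, "rating": 5.0},
]

placement = distribute(rows, "movie_key", num_slices=4)
for slice_id, slice_rows in sorted(placement.items()):
    print(slice_id, slice_rows)
```

Because all rows for one `movie_key` land in the same slice, a join or aggregation on that key can be computed locally on each slice.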


#### Movie Dataset
This Dataset is taken from [The Movies Kaggle dataset.](https://www.kaggle.com/rounakbanik/the-movies-dataset?select=credits.csv)
These files contain metadata for all 45,000 movies listed in the Full MovieLens Dataset. The dataset consists of movies released on or before July 2017. Data points include cast, crew, plot keywords, budget, revenue, posters, release dates, languages, production companies, countries, TMDB vote counts and vote averages.
This dataset also has files containing 26 million ratings from 270,000 users for all 45,000 movies. Ratings are on a scale of 1-5 and have been obtained from the official GroupLens website.

### Source Files and Data Structure
>- movies_metadata.csv: 
> 
> The main Movies Metadata file. Contains information on 45,000 movies featured in the Full MovieLens dataset. Features include posters, backdrops, budget, revenue, release dates, languages, production countries and companies.

>- ratings.json:
> 
> Contains the ratings that users gave to movies, together with the time at which each rating was given.

>- credits.csv
> 
> Consists of Cast and Crew Information for all our movies. Available in the form of a stringified JSON Object.

>- keywords.csv:
> 
> Contains the movie plot keywords for our MovieLens movies. Available in the form of a stringified JSON Object.
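
The "stringified JSON" values in `credits.csv` and `keywords.csv` use single quotes, so they are Python-style literals rather than strict JSON. The sketch below uses a shortened excerpt of a keywords value to show the difference; the helper name `parse_stringified_list` is hypothetical.

```python
import ast
import json

# Shortened excerpt of a keywords value from keywords.csv
raw = "[{'id': 931, 'name': 'jealousy'}, {'id': 4290, 'name': 'toy'}, {'id': 5202, 'name': 'boy'}]"


def parse_stringified_list(text):
    """Parse a Python-literal style list of dicts; return [] if the text is malformed."""
    try:
        value = ast.literal_eval(text)
    except (ValueError, SyntaxError):
        return []
    return value if isinstance(value, list) else []


# A strict JSON parser rejects the single-quoted keys and values
try:
    json.loads(raw)
    strict_json_ok = True
except json.JSONDecodeError:
    strict_json_ok = False

keyword_names = [item["name"] for item in parse_stringified_list(raw)]
print(strict_json_ok)   # False
print(keyword_names)    # ['jealousy', 'toy', 'boy']
```

This is also why the cleaning steps later check that a value starts with `[` or `[{` before trying to parse it: malformed rows would otherwise produce empty or null results.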



In [2]:
### Step 1.1 Setting Access to the Source files and reading them to explore

In [2]:
#Getting AWS credentials from config file
config = configparser.ConfigParser()
config.read('dl.cfg')

AWS_ACCESS_KEY_ID=config.get('default','AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY=config.get('default','AWS_SECRET_ACCESS_KEY')
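
For reference, the cell above expects `dl.cfg` to contain a `[default]` section with the two keys it reads. A minimal sketch of that layout, parsed from a string instead of a file and with placeholder values (assumptions, not real credentials):

```python
import configparser

# Assumed layout of dl.cfg; the values are placeholders
SAMPLE_CFG = """
[default]
AWS_ACCESS_KEY_ID = YOUR_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY = YOUR_SECRET_ACCESS_KEY
"""

config = configparser.ConfigParser()
config.read_string(SAMPLE_CFG)

# Option lookups are case-insensitive, section names are not
access_key = config.get("default", "AWS_ACCESS_KEY_ID")
secret_key = config.get("default", "AWS_SECRET_ACCESS_KEY")
print(access_key, secret_key)
```

The file should stay out of version control, since it holds the secret key in plain text.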

In [3]:
# Create Spark Session
spark = SparkSession.builder \
            .appName("my_app") \
            .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
            .config("spark.sql.broadcastTimeout", "360000")\
            .getOrCreate()


In [4]:
#Setting AWS credentials to spark 
spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", AWS_ACCESS_KEY_ID)
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY)
spark._jsc.hadoopConfiguration().set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")
spark._jsc.hadoopConfiguration().set("com.amazonaws.services.s3.enableV4", "true")
spark._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider","org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider")
spark._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3.amazonaws.com")


In [6]:
spark.read.parquet("s3a://tempbucket1168/dim_movies.parquet").printSchema()

AnalysisException: 'Path does not exist: s3a://tempbucket1168/dim_movies.parquet;'

In [5]:
# Read data from source files to spark dataframes from S3 bucket

movies_df = spark.read.format('csv').options(header='true', inferSchema='true', mode='PERMISSIVE').load("s3a://tempbucket1168/movies_metadata.csv")
keywords_df = spark.read.format('csv').options(header='true', inferSchema='true', mode='PERMISSIVE').load("s3a://tempbucket1168/keywords.csv")
credits_df = spark.read.format('csv').options(header='true', inferSchema='true', mode='PERMISSIVE').load("s3a://tempbucket1168/credits.csv")
ratings_df = spark.read.format('org.apache.spark.sql.json').load("s3a://tempbucket1168/ratings.json")



### Step 2: Explore and Assess the Data


In [6]:
#### Explore the Data

In [6]:
## Exploring dataframe schemas

movies_df.printSchema()
keywords_df.printSchema()
credits_df.printSchema()
ratings_df.printSchema()

root
 |-- adult: string (nullable = true)
 |-- belongs_to_collection: string (nullable = true)
 |-- budget: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- id: string (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- poster_path: string (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- production_countries: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- revenue: string (nullable = true)
 |-- runtime: string (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- title: string (nullable = true)
 |-- video: string (nullable = true)
 |-- vote_average: string (nullable = true)
 |-- vote_count: string (nu

In [7]:
## Exploring the data structure of movies_df
# For our data analysis team, we want to extract the following columns:
# id, title, overview, release_date, original_language, runtime, vote_average, production_companies, vote_count, revenue, budget
movies_df.head(1)

#Following quality issues:
# id->(Null values, not integer, duplicates);title->(Null values);overview->(Null values);release_date->(Null values, not date format);original_language->(Null values);
# runtime->(Null values, not integer);vote_average->(Null values, not float);production_companies->(Null values, not array);vote_count->(Null values, not float);
# revenue->(Null values, not float);budget->(Null values, not float)

[Row(adult='False', belongs_to_collection="{'id': 10194, 'name': 'Toy Story Collection', 'poster_path': '/7G9915LfUQ2lVfwMEEhDsn3kT4B.jpg', 'backdrop_path': '/9FBwqcd9IRruEDUrTdcaafOMKUq.jpg'}", budget='30000000', genres="[{'id': 16, 'name': 'Animation'}, {'id': 35, 'name': 'Comedy'}, {'id': 10751, 'name': 'Family'}]", homepage='http://toystory.disney.com/toy-story', id='862', imdb_id='tt0114709', original_language='en', original_title='Toy Story', overview="Led by Woody, Andy's toys live happily in his room until Andy's birthday brings Buzz Lightyear onto the scene. Afraid of losing his place in Andy's heart, Woody plots against Buzz. But when circumstances separate Buzz and Woody from their owner, the duo eventually learns to put aside their differences.", popularity='21.946943', poster_path='/rhIRbceoE9lR4veEXuwCC2wARtG.jpg', production_companies="[{'name': 'Pixar Animation Studios', 'id': 3}]", production_countries="[{'iso_3166_1': 'US', 'name': 'United States of America'}]", relea

In [8]:
## Exploring the data structure of keywords_df
# For our data analysis team, we want to extract the following columns:
# id, keywords

keywords_df.head(1)

#Following quality issues:
# id->(Null values, not integer, duplicates);keywords->(Null values, not array)

[Row(id=862, keywords="[{'id': 931, 'name': 'jealousy'}, {'id': 4290, 'name': 'toy'}, {'id': 5202, 'name': 'boy'}, {'id': 6054, 'name': 'friendship'}, {'id': 9713, 'name': 'friends'}, {'id': 9823, 'name': 'rivalry'}, {'id': 165503, 'name': 'boy next door'}, {'id': 170722, 'name': 'new toy'}, {'id': 187065, 'name': 'toy comes to life'}]")]

In [9]:
## Exploring the data structure of credits_df
# For our data analysis team, we want to extract the following columns:
# cast, crew, id

credits_df.head(1)

#Following quality issues:
# id->(Null values, not integer, duplicates);cast->(Null values, not array);crew->(Null values, not array)

# The cast and crew columns are in stringified JSON format, which we will parse into the following columns:
# (character, name, gender) -> for the dim_movie_staff table;   (job, department, name, gender) -> for the dim_movie_crew table.

[Row(cast="[{'cast_id': 14, 'character': 'Woody (voice)', 'credit_id': '52fe4284c3a36847f8024f95', 'gender': 2, 'id': 31, 'name': 'Tom Hanks', 'order': 0, 'profile_path': '/pQFoyx7rp09CJTAb932F2g8Nlho.jpg'}, {'cast_id': 15, 'character': 'Buzz Lightyear (voice)', 'credit_id': '52fe4284c3a36847f8024f99', 'gender': 2, 'id': 12898, 'name': 'Tim Allen', 'order': 1, 'profile_path': '/uX2xVf6pMmPepxnvFWyBtjexzgY.jpg'}, {'cast_id': 16, 'character': 'Mr. Potato Head (voice)', 'credit_id': '52fe4284c3a36847f8024f9d', 'gender': 2, 'id': 7167, 'name': 'Don Rickles', 'order': 2, 'profile_path': '/h5BcaDMPRVLHLDzbQavec4xfSdt.jpg'}, {'cast_id': 17, 'character': 'Slinky Dog (voice)', 'credit_id': '52fe4284c3a36847f8024fa1', 'gender': 2, 'id': 12899, 'name': 'Jim Varney', 'order': 3, 'profile_path': '/eIo2jVVXYgjDtaHoF19Ll9vtW7h.jpg'}, {'cast_id': 18, 'character': 'Rex (voice)', 'credit_id': '52fe4284c3a36847f8024fa5', 'gender': 2, 'id': 12900, 'name': 'Wallace Shawn', 'order': 4, 'profile_path': '/oGE

In [10]:
## Exploring the data structure of ratings_df
# For our data analysis team, we want to extract the following columns:
# userId, movieId, rating, timestamp

ratings_df.show(2)

#Following quality issues:
# movieId->(Null values, not integer, duplicates);rating->(Null values, not float);timestamp->(Null values, not timestamp)



+-------+------+----------+------+
|movieId|rating| timestamp|userId|
+-------+------+----------+------+
|movieId|rating| timestamp|userId|
|    110|   1.0|1425941529|     1|
+-------+------+----------+------+
only showing top 2 rows



In [1]:
#### Cleaning Steps

In [11]:

#Filtering movies dataframe
movies_df = movies_df.filter(col("id").cast("int").isNotNull())
movies_df = movies_df.filter(col("production_companies").startswith('['))
#Filtering keywords dataframe
keywords_df = keywords_df.filter(col("id").cast("int").isNotNull())
keywords_df = keywords_df.filter(col("keywords").startswith('[{'))

#Leaving only useful columns in movies df 
movies_df = movies_df.select(col("id").alias("movie_key"),col("title"),col("overview"),col("release_date"),col("original_language").alias("language"),col("runtime"),col("vote_average"),col("production_companies"),col("vote_count"),col("revenue"),col("budget"))
#Left joining movies df with keywords df and dropping id column 
full_movies_df = movies_df.join(keywords_df,movies_df.movie_key ==  keywords_df.id,"left")
full_movies_df = full_movies_df.drop(full_movies_df.id)


In [15]:
full_movies_df = full_movies_df.withColumn("movie_key", full_movies_df["movie_key"].cast(IntegerType()))
full_movies_df = full_movies_df.withColumn("runtime", full_movies_df["runtime"].cast(IntegerType()))
full_movies_df = full_movies_df.withColumn("vote_average", full_movies_df["vote_average"].cast(DoubleType()))
full_movies_df = full_movies_df.withColumn("vote_count", full_movies_df["vote_count"].cast(DoubleType()))
full_movies_df = full_movies_df.withColumn("revenue", full_movies_df["revenue"].cast(DoubleType()))
full_movies_df = full_movies_df.withColumn("budget", full_movies_df["budget"].cast(DoubleType()))
full_movies_df.printSchema()
full_movies_df.show(5)

root
 |-- movie_key: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- language: string (nullable = true)
 |-- runtime: integer (nullable = true)
 |-- vote_average: double (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- vote_count: double (nullable = true)
 |-- revenue: double (nullable = true)
 |-- budget: double (nullable = true)
 |-- keywords: string (nullable = true)

+---------+--------------------+--------------------+------------+--------+-------+------------+--------------------+----------+------------+------+--------------------+
|movie_key|               title|            overview|release_date|language|runtime|vote_average|production_companies|vote_count|     revenue|budget|            keywords|
+---------+--------------------+--------------------+------------+--------+-------+------------+--------------------+----------+------------+------+--------

In [16]:
# Writing full_movies_df to parquet file in S3

full_movies_df.write.mode("overwrite").parquet('s3a://tempbucket1168/stage_movies.parquet')

In [15]:
# Release any cached data held by dataframes that are no longer needed
full_movies_df.unpersist()
keywords_df.unpersist()
movies_df.unpersist()

DataFrame[movie_key: string, title: string, overview: string, release_date: string, language: string, runtime: string, vote_average: string, production_companies: string, vote_count: string, revenue: string, budget: string]

In [8]:
#Creating and filtering movie_staff/movie_crew dataframes from  credits df
# creating and filtering movie_staff_df
movie_staff_df = credits_df.select(col("id"),col("cast")).filter(col("id").cast("int").isNotNull()).filter(col("cast").startswith('[{'))
movie_staff_df = movie_staff_df.select(col("id"),explode(from_json(col("cast"), ArrayType(StringType()))).alias("staff_info"))
movie_staff_df = movie_staff_df.select(col("id").alias("movie_id"),json_tuple(col("staff_info"),"character","name","gender").alias("character","actor_name","gender"))
movie_staff_df.show(5)

# creating and filtering movie_crew_df
movie_crew_df = credits_df.select(col("id"),col("crew")).filter(col("id").cast("int").isNotNull()).filter(col("crew").startswith('[{'))
movie_crew_df = movie_crew_df.select(col("id"),explode(from_json(col("crew"), ArrayType(StringType()))).alias("crew_info"))
movie_crew_df = movie_crew_df.select(col("id").alias("movie_id"),json_tuple(col("crew_info"),"job","department","name","gender").alias("job","department","name","gender"))
movie_crew_df.show(5)

+--------+--------------------+--------------+------+
|movie_id|           character|    actor_name|gender|
+--------+--------------------+--------------+------+
|   15602|         Max Goldman|Walter Matthau|     2|
|   15602|      John Gustafson|   Jack Lemmon|     2|
|   15602|     Ariel Gustafson|   Ann-Margret|     1|
|   15602|Maria Sophia Cole...|  Sophia Loren|     1|
|   15602|   Melanie Gustafson|  Daryl Hannah|     1|
+--------+--------------------+--------------+------+
only showing top 5 rows

+--------+------------+----------+-------------------+------+
|movie_id|         job|department|               name|gender|
+--------+------------+----------+-------------------+------+
|   16420|      Writer|   Writing|William Shakespeare|     2|
|   16420|    Director| Directing|      Oliver Parker|     2|
|   16420|  Adaptation|   Writing|      Oliver Parker|     2|
|   31174|    Director| Directing|  Richard Loncraine|     2|
|   31174|Theatre Play|   Writing|William Shakespeare| 

In [10]:
movie_staff_df = movie_staff_df.withColumn("movie_id", movie_staff_df["movie_id"].cast(IntegerType()))
movie_crew_df = movie_crew_df.withColumn("movie_id", movie_crew_df["movie_id"].cast(IntegerType()))

In [12]:
movie_staff_df.printSchema()
movie_crew_df.printSchema()

root
 |-- movie_id: integer (nullable = true)
 |-- character: string (nullable = true)
 |-- actor_name: string (nullable = true)
 |-- gender: string (nullable = true)

root
 |-- movie_id: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- department: string (nullable = true)
 |-- name: string (nullable = true)
 |-- gender: string (nullable = true)



In [13]:
# Writing to parquet file
movie_staff_df.write.mode("overwrite").parquet('s3a://tempbucket1168/dim_movie_staff.parquet')


In [14]:
# Writing to parquet file
movie_crew_df.write.mode("overwrite").parquet('s3a://tempbucket1168/dim_movie_crew.parquet')

In [17]:
spark.read.parquet("s3a://tempbucket1168/dim_movie_staff.parquet").printSchema()
spark.read.parquet("s3a://tempbucket1168/dim_movie_crew.parquet").printSchema()

root
 |-- movie_id: integer (nullable = true)
 |-- character: string (nullable = true)
 |-- actor_name: string (nullable = true)
 |-- gender: string (nullable = true)

root
 |-- movie_id: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- department: string (nullable = true)
 |-- name: string (nullable = true)
 |-- gender: string (nullable = true)



In [8]:
#Creating and filtering users/date dataframes from  ratings df
#Filtering data
ratings_df = ratings_df.select(col("userId").alias("user_key"),col("movieId"),col("rating"),col("timestamp")).filter(col("userId").cast("int").isNotNull()).filter(col("movieId").cast("int").isNotNull()).filter(col("rating").cast("float").isNotNull())
#Creating users_df/date_df -> converting timestamp to TimestampType
users_df = ratings_df.select(col("user_key"),col("movieId"),col("rating"))
users_df.show(5)

# create timestamp column from original timestamp column
date_df = ratings_df.select(col("timestamp"))
date_df = date_df.withColumn("date_key", from_unixtime((col("timestamp")/1000),"yyyy-MM-dd HH:mm:ss"))

# extract columns to create dim_date table
dim_date = date_df.selectExpr("date_key","hour(date_key) as hour","day(date_key) as day","weekofyear(date_key) as week","month(date_key) as month", "year(date_key) as year","dayofweek(date_key) as weekday")
dim_date.show(5)

+--------+-------+------+
|user_key|movieId|rating|
+--------+-------+------+
|       1|    110|   1.0|
|       1|    147|   4.5|
|       1|    858|   5.0|
|       1|   1221|   5.0|
|       1|   1246|   5.0|
+--------+-------+------+
only showing top 5 rows

+-------------------+----+---+----+-----+----+-------+
|           date_key|hour|day|week|month|year|weekday|
+-------------------+----+---+----+-----+----+-------+
|1970-01-17 12:05:41|  12| 17|   3|    1|1970|      7|
|1970-01-17 12:05:42|  12| 17|   3|    1|1970|      7|
|1970-01-17 12:05:41|  12| 17|   3|    1|1970|      7|
|1970-01-17 12:05:41|  12| 17|   3|    1|1970|      7|
|1970-01-17 12:05:41|  12| 17|   3|    1|1970|      7|
+-------------------+----+---+----+-----+----+-------+
only showing top 5 rows
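
The 1970 dates in the `dim_date` output above suggest that the `timestamp` column holds epoch seconds rather than milliseconds, so dividing by 1000 shifts every value to January 1970. The pure-Python sketch below checks this for the first rating (`1425941529`), assuming UTC. Since `from_unixtime` expects seconds, the PySpark equivalent would be to pass the column without the division.

```python
from datetime import datetime, timezone

raw_timestamp = 1425941529  # timestamp of the first rating row shown earlier


def to_utc_string(epoch_seconds):
    """Format an epoch value (in seconds) as a UTC 'yyyy-MM-dd HH:mm:ss' string."""
    moment = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)
    return moment.strftime("%Y-%m-%d %H:%M:%S")


treated_as_seconds = to_utc_string(raw_timestamp)
treated_as_millis = to_utc_string(raw_timestamp / 1000)

print(treated_as_seconds)  # 2015-03-09 22:52:09
print(treated_as_millis)   # 1970-01-17 12:05:41
```

The second value matches the first row of the output above, which supports the reading that the source data is already in seconds.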



In [11]:
ratings_df = ratings_df.withColumn("user_key", ratings_df["user_key"].cast(IntegerType()))
ratings_df = ratings_df.withColumn("movieId", ratings_df["movieId"].cast(IntegerType()))
ratings_df = ratings_df.withColumn("rating", ratings_df["rating"].cast(DoubleType()))
ratings_df = ratings_df.withColumn("timestamp", ratings_df["timestamp"].cast(TimestampType()))

In [12]:
ratings_df.printSchema()

root
 |-- user_key: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [13]:
ratings_df.write.mode("overwrite").parquet('s3a://tempbucket1168/stage_ratings.parquet')

In [20]:
users_df = users_df.withColumn("user_key", users_df["user_key"].cast(IntegerType()))
users_df = users_df.withColumn("movieId", users_df["movieId"].cast(IntegerType()))
users_df = users_df.withColumn("rating", users_df["rating"].cast(DoubleType()))
dim_date = dim_date.withColumn("date_key", dim_date["date_key"].cast(TimestampType()))

In [21]:
users_df.printSchema()
dim_date.printSchema()

root
 |-- user_key: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)

root
 |-- date_key: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)



In [22]:
# Writing to parquet file
users_df.write.mode("overwrite").parquet('s3a://tempbucket1168/dim_users.parquet')

In [23]:
# Writing to parquet file
dim_date.write.mode("overwrite").parquet('s3a://tempbucket1168/dim_date.parquet')

In [25]:
spark.read.parquet("s3a://tempbucket1168/dim_users.parquet").printSchema()
spark.read.parquet("s3a://tempbucket1168/dim_date.parquet").printSchema()

root
 |-- user_key: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)

root
 |-- date_key: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)



### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
### Amazon Redshift - ERD

>  For the Amazon Redshift data warehouse I have chosen a star schema (see picture below), which
> allows quick analysis of the data with simple queries that need only a few JOINs.
> ![Alt text](./ERD_Capstone.png?raw=true "Title")

### Tables structure and datatypes

>### fact_movies - _contains the primary keys of the dimension tables and the budget/revenue metrics of each movie_
> - fact_movie_key: INT PRIMARY KEY
> - movie_key: INT FOREIGN KEY
> - user_key: INT FOREIGN KEY
> - date_key: TIMESTAMP FOREIGN KEY
> - movie_staff_key: INT FOREIGN KEY
> - movie_crew_key: INT FOREIGN KEY
> - budget: FLOAT
> - revenue: FLOAT

>### dim_movie - _contains information of 45000 movies_
>
> - movie_key: INT PRIMARY KEY
> - title: VARCHAR
> - overview: VARCHAR
> - release_date: TEXT
> - language: VARCHAR
> - runtime: INT
> - vote_avg: FLOAT
> - production_companies: TEXT
> - vote_count: FLOAT
> - keywords: TEXT

>### dim_users - _contains information of 270000 users with 26000000 movie ratings_
>
> - user_key: INT PRIMARY KEY
> - movieID: INT
> - rating: FLOAT

>### dim_date - _dates and times at which ratings were given_
>
> - date_key: TIMESTAMP PRIMARY KEY
> - hour: INT
> - day: INT
> - week: INT
> - month: INT
> - year: INT
> - weekday: INT

>### dim_movie_staff - _contains information of actors and roles, which they played in movies_
> - movie_staff_key: INT PRIMARY KEY
> - character: VARCHAR
> - name: VARCHAR
> - movie_id: INT
> - gender: VARCHAR

>### dim_movie_crew - _contains information of crew of each movie_
>
> - movie_crew_key: INT PRIMARY KEY
> - job: VARCHAR
> - department: VARCHAR
> - movie_id: INT
> - name: VARCHAR
> - gender: VARCHAR
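
A typical query against the tables listed above joins the fact table to one dimension table on its key. The sketch below uses an in-memory SQLite database as a stand-in for Redshift, with only a subset of the columns; the revenue figure is a made-up placeholder, and the column types are simplified to what SQLite supports.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    -- Reduced versions of dim_movie and fact_movies (subset of columns)
    CREATE TABLE dim_movie (
        movie_key INTEGER PRIMARY KEY,
        title     TEXT
    );
    CREATE TABLE fact_movies (
        fact_movie_key INTEGER PRIMARY KEY,
        movie_key      INTEGER REFERENCES dim_movie (movie_key),
        budget         REAL,
        revenue        REAL
    );
    INSERT INTO dim_movie VALUES (862, 'Toy Story');
    -- revenue is a hypothetical placeholder value
    INSERT INTO fact_movies VALUES (1, 862, 30000000, 50000000);
    """
)

# Metrics come from the fact table, descriptive attributes from the dimension
rows = conn.execute(
    """
    SELECT m.title, f.revenue - f.budget AS profit
    FROM fact_movies AS f
    JOIN dim_movie AS m ON m.movie_key = f.movie_key
    """
).fetchall()
print(rows)  # [('Toy Story', 20000000.0)]
```

Each additional dimension (users, date, staff, crew) would add one more JOIN on its key, which keeps analytical queries short and predictable.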

#### 3.2 Mapping Out Data Pipelines

1. COPY data to staging/dim tables in AWS Redshift
2. INSERT data from staging tables to fact/dim tables
3. RUN data quality check
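
The first step above could be sketched as a small helper that builds a Redshift `COPY` statement for one of the Parquet files written earlier. The table name `stage_movies` and the IAM role ARN are assumptions for illustration; note that Redshift expects an `s3://` path, not the `s3a://` scheme used by Spark.

```python
def build_copy_statement(table, s3_path, iam_role):
    """Build a Redshift COPY statement that loads a Parquet file from S3."""
    return (
        f"COPY {table}\n"
        f"FROM '{s3_path}'\n"
        f"IAM_ROLE '{iam_role}'\n"
        "FORMAT AS PARQUET;"
    )


statement = build_copy_statement(
    table="stage_movies",  # assumed staging table name
    s3_path="s3://tempbucket1168/stage_movies.parquet",
    iam_role="arn:aws:iam::123456789012:role/HypotheticalRedshiftRole",  # placeholder
)
print(statement)
```

In an Airflow pipeline, a statement like this would typically be executed by a task that holds the Redshift connection, with one task per staging or dimension table.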

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [None]:
# Perform quality checks here
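
As a starting point for this cell, here is a minimal sketch of count, null and uniqueness checks. It runs against an in-memory SQLite table that stands in for Redshift; the helper name `run_quality_checks`, the reduced `dim_movie` table and the second sample row are assumptions for illustration.

```python
import sqlite3


def run_quality_checks(conn, checks):
    """Run (name, sql, predicate) checks and return a list of failure messages."""
    failures = []
    for name, sql, is_ok in checks:
        actual = conn.execute(sql).fetchone()[0]
        if not is_ok(actual):
            failures.append(f"{name}: got {actual}")
    return failures


conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dim_movie (movie_key INTEGER, title TEXT);
    -- the second row is a hypothetical example
    INSERT INTO dim_movie VALUES (862, 'Toy Story'), (2, 'Hypothetical Title');
    """
)

checks = [
    ("dim_movie is not empty",
     "SELECT COUNT(*) FROM dim_movie",
     lambda n: n > 0),
    ("movie_key has no NULLs",
     "SELECT COUNT(*) FROM dim_movie WHERE movie_key IS NULL",
     lambda n: n == 0),
    ("movie_key is unique",
     "SELECT COUNT(*) - COUNT(DISTINCT movie_key) FROM dim_movie",
     lambda n: n == 0),
]

failures = run_quality_checks(conn, checks)
print(failures or "all checks passed")
```

The same list of checks could be extended with one entry per table, and a non-empty `failures` list could be used to fail the pipeline run.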

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.