# Movie Recommendation with Spark and AWS
## Introduction

This project uses the `ml-latest-small` dataset from [MovieLens](https://grouplens.org/datasets/movielens/latest/), a movie recommendation service. It contains 100836 ratings and 3683 tags across 9742 movies, created by 610 users between 1996 and 2018. The larger dataset contains 27753444 ratings and 1108997 tags across 58098 movies, created by 283228 users between 1995 and 2018.

I also prepared two text files listing award-winning movies (`Awards.txt` and `Award_corrected.txt`). Their contents were copied from [Wikipedia/Award-winning films](https://en.wikipedia.org/wiki/List_of_Academy_Award-winning_films).

The goal is to build an ETL pipeline that extracts the data from S3, processes it with Spark, stages it in Redshift, and transforms it into a set of dimensional tables.

In [1]:
import boto3
import os
import configparser
from datetime import datetime
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, col, isnan, when, count, trim, desc, sum, asc
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.functions import countDistinct, explode, split, concat_ws, collect_list
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, DateType as Date

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
plt.style.use('ggplot')

# Step 1: Get the Parameters of the Created Redshift Cluster
- These are needed to load data from S3 into Redshift
- We need:
    - The Redshift cluster <font color='red'>endpoint</font>
    - The <font color='red'>IAM role ARN</font> that gives Redshift access to read from S3

In [2]:
config = configparser.ConfigParser()

#Normally this file should be in ~/.aws/credentials
config.read_file(open('dwh.cfg'))

KEY                    = config.get('AWS','KEY')
SECRET                 = config.get('AWS','SECRET')

DWH_CLUSTER_TYPE       = config.get("DWH","DWH_CLUSTER_TYPE")
DWH_NUM_NODES          = config.get("DWH","DWH_NUM_NODES")
DWH_NODE_TYPE          = config.get("DWH","DWH_NODE_TYPE")

DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")
DWH_DB                 = config.get("DWH","DWH_DB")
DWH_DB_USER            = config.get("DWH","DWH_DB_USER")
DWH_DB_PASSWORD        = config.get("DWH","DWH_DB_PASSWORD")
DWH_PORT               = config.get("DWH","DWH_PORT")

DWH_IAM_ROLE_NAME      = config.get("DWH", "DWH_IAM_ROLE_NAME")

(DWH_DB_USER, DWH_DB_PASSWORD, DWH_DB)

pd.DataFrame({"Param":
                  ["DWH_CLUSTER_TYPE", "DWH_NUM_NODES", "DWH_NODE_TYPE", "DWH_CLUSTER_IDENTIFIER", "DWH_DB", "DWH_DB_USER", "DWH_DB_PASSWORD", "DWH_PORT", "DWH_IAM_ROLE_NAME"],
              "Value":
                  [DWH_CLUSTER_TYPE, DWH_NUM_NODES, DWH_NODE_TYPE, DWH_CLUSTER_IDENTIFIER, DWH_DB, DWH_DB_USER, DWH_DB_PASSWORD, DWH_PORT, DWH_IAM_ROLE_NAME]
             })

os.environ["AWS_ACCESS_KEY_ID"]= config['AWS']['KEY']
os.environ["AWS_SECRET_ACCESS_KEY"]= config['AWS']['SECRET']
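The cell above expects a `dwh.cfg` file with an `[AWS]` and a `[DWH]` section. The sketch below shows one possible layout of that file and how `configparser` reads it. Every value is a placeholder chosen for illustration (the real file is not shown in this notebook), and the config is parsed from a string so the example runs without touching disk.

```python
import configparser

# Hypothetical contents of dwh.cfg; every value here is a placeholder.
SAMPLE_CFG = """
[AWS]
KEY = YOUR_ACCESS_KEY_ID
SECRET = YOUR_SECRET_ACCESS_KEY

[DWH]
DWH_CLUSTER_TYPE = multi-node
DWH_NUM_NODES = 4
DWH_NODE_TYPE = dc2.large
DWH_CLUSTER_IDENTIFIER = dwhCluster
DWH_DB = dwh
DWH_DB_USER = dwhuser
DWH_DB_PASSWORD = change-me
DWH_PORT = 5439
DWH_IAM_ROLE_NAME = dwhRole
"""

sample = configparser.ConfigParser()
sample.read_string(SAMPLE_CFG)

# configparser returns every value as a string, so cast numeric settings explicitly.
num_nodes = sample.getint("DWH", "DWH_NUM_NODES")
port = sample.getint("DWH", "DWH_PORT")
print(sample.get("DWH", "DWH_DB"), num_nodes, port)
```

Since the file holds credentials, it is usually kept out of version control.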

In [127]:
# e.g. DWH_ENDPOINT="redshift-cluster-1.csmamz5zxmle.us-west-2.redshift.amazonaws.com" 
DWH_ENDPOINT="" 
    
#e.g DWH_ROLE_ARN="arn:aws:iam::988332130976:role/dwhRole"
DWH_ROLE_ARN=""

# Step 2: Explore and Assess the Data using Spark

In [6]:
spark = SparkSession.builder\
                     .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0")\
                     .getOrCreate()

### Part 1: Load Data from S3 and clean dataframe
- movies.csv: including movieId, title(year), genres
  - split title and year from the second column
  - split the pipe-delimited genres string into individual genres
- ratings.csv: including userId, movieId, rating, ts
  - transform ts string into timestamp
- tags.csv: including userId, movieId, tag, ts
  - transform ts string into timestamp
- awards.txt: including Film, year, awards, nominations
  - split txt data using delimiter "|"
  - identify issues when splitting data like inappropriate year
  - transform data into appropriate data type
- award_corrected.txt: including Film, year, awards, nominations (corrections for awards.txt)
  - join with awards to correct the year
  - transform data into appropriate data type
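As a rough illustration of the movies.csv cleaning rules listed above, here is a plain-Python sketch that splits one raw row into title, year, and a list of genres. The helper name `clean_movie_row` and the regular expression are assumptions made for this example; the notebook itself performs these steps with Spark functions and Spark SQL.

```python
import re

# Matches "Title (YYYY)" with optional trailing whitespace.
TITLE_YEAR = re.compile(r"^(?P<title>.*)\s\((?P<year>\d{4})\)\s*$")

def clean_movie_row(movie_id, raw_title, raw_genres):
    """Split 'Title (YYYY)' into title and year, and the genres string into a list."""
    match = TITLE_YEAR.match(raw_title)
    if match:
        title, year = match.group("title").strip(), int(match.group("year"))
    else:
        # Titles without a trailing "(YYYY)" keep their text and get no year.
        title, year = raw_title.strip(), None
    genres = raw_genres.split("|")
    return {"movieId": movie_id, "title": title, "year": year, "genres": genres}

row = clean_movie_row(1, "Toy Story (1995)", "Adventure|Animation|Children|Comedy|Fantasy")
print(row)
```

Returning `None` for a missing year keeps malformed titles visible instead of silently producing a wrong year.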

In [7]:
movieSchema = R([
            Fld("movieId",Int()),
            Fld("title",Str()),
            Fld("genres",Str())
            ])

In [8]:
ratingSchema = R([
            Fld("userId",Int()),
            Fld("movieId",Int()),
            Fld("rating",Dbl()),
            Fld("ts",Str())
            ])

In [9]:
tagSchema = R([
            Fld("userId",Int()),
            Fld("movieId",Int()),
            Fld("tag",Str()),
            Fld("ts",Str())
            ])

In [33]:
# read movies, ratings, and tags csv
dfmovies = spark.read.csv("s3a://udacity-input/ml-latest-small/movies.csv", header=True, schema=movieSchema)
dfratings = spark.read.csv("s3a://udacity-input/ml-latest-small/ratings.csv", header = True, schema=ratingSchema)
dftags = spark.read.csv("s3a://udacity-input/ml-latest-small/tags.csv", header = True, schema=tagSchema)

In [34]:
# read awards txt
dfawards = spark.read.option("header", "true") \
    .option("delimiter", "|") \
    .option("inferSchema", "true") \
    .csv("s3a://udacity-input/ml-latest-small/Awards.txt")

dfawards.show(10, truncate=False)

+--------------------------------------------------------+-------+----------+-----------+
|Film                                                    |Year   |Awards    |Nominations|
+--------------------------------------------------------+-------+----------+-----------+
|Parasite                                                |2019.0 |4.0       |6          |
|Ford v Ferrari                                          |2019.0 |2.0       |4          |
|Learning to Skateboard in a Warzone (If You're a Girl)  |2019.0 |1.0       |1          |
|The Neighbors' Window                                   |2019.0 |1.0       |1          |
|Little Women                                            |2019.0 |1.0       |6          |
|Marriage Story                                          |2019.0 |1.0       |6          |
|Jojo Rabbit                                             |2019.0 |1.0       |6          |
|Toy Story 4                                             |2019.0 |1.0       |2          |
|Joker   2

In [35]:
# read award_corrected txt
dfawards2 = spark.read.option("header", "true") \
    .option("delimiter", "|") \
    .option("inferSchema", "true") \
    .csv("s3a://udacity-input/ml-latest-small/Award_corrected.txt")

dfawards2.show(10, truncate=False)

+--------------------+-------+----------+-----------+
|Film                |Year   |Awards    |Nominations|
+--------------------+-------+----------+-----------+
|Becket              |1964.0 |1.0       |12         |
|Ben-Hur             |1959.0 |11.0      |12         |
|Dances with Wolves  |1990.0 |7.0       |12         |
|The English Patient |1996.0 |9.0       |12         |
|Gladiator           |2000.0 |5.0       |12         |
|Johnny Belinda      |1948.0 |1.0       |12         |
|Lincoln             |2012.0 |2.0       |12         |
|Mrs. Miniver        |1942.0 |6.0       |12         |
|My Fair Lady        |1964.0 |8.0       |12         |
|On the Waterfront   |1954.0 |8.0       |12         |
+--------------------+-------+----------+-----------+
only showing top 10 rows



# Step 3: Define Relational Data Model
**I created 5 tables to support the following use cases**
- number of movies in the dataset  
- number of movies in each genre  
- number of users in the dataset  
- Minimum number of ratings per user  
- Minimum number of ratings per movie   
- number of movies not rated  
- the top 5 movies with high ratings  
- number of movies receiving awards  
- total awards that movie received  
- number of movies rated and receiving awards  
- the average rating scores of movies with awards  
- year durations in movies, ratings and awards dataset  

**snowflake schema**
* **awards** - (film, year, nominations, awards)  
This table holds the awards that each movie received. The composite key (film, year) identifies each row, since different films can share the same title.
* **movies** - (movieId, title, year)  
The primary key is movieId. The genres column is removed from the original table because it holds a list of genres for each movie.
* **genres** - (genreId, movieId, genre)  
A separate genres table records the genres of each movie. Since a movie can have several genres, a unique genreId is generated as the primary key.  
* **ratings** - (userId, movieId, rating, rate_time, year)  
The composite key is (userId, movieId): a user can rate many movies, but each user-movie pair appears only once.
* **time** - timestamps in ratings broken down into specific units (date_key, day, week, month, year)  
The time table supports lookups by day, week, month, and year. The primary key is date_key.
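The five tables and their keys can be sketched as DDL. The example below uses an in-memory SQLite database only so that it runs anywhere; the column types (for instance `TEXT` for `date_key` and `rate_time`) are assumptions, and a Redshift version would use Redshift types and its own key and distribution settings.

```python
import sqlite3

# Illustrative DDL for the five tables; types are assumptions, not the project's actual DDL.
DDL = """
CREATE TABLE movies (
    movieId INTEGER PRIMARY KEY,
    title   TEXT NOT NULL,
    year    INTEGER
);
CREATE TABLE genres (
    genreId INTEGER PRIMARY KEY,
    movieId INTEGER NOT NULL REFERENCES movies(movieId),
    genre   TEXT NOT NULL
);
CREATE TABLE "time" (
    date_key TEXT PRIMARY KEY,
    day INTEGER, week INTEGER, month INTEGER, year INTEGER
);
CREATE TABLE ratings (
    userId    INTEGER NOT NULL,
    movieId   INTEGER NOT NULL REFERENCES movies(movieId),
    rating    REAL NOT NULL,
    rate_time TEXT,
    year      INTEGER,
    PRIMARY KEY (userId, movieId)
);
CREATE TABLE awards (
    film        TEXT NOT NULL,
    year        INTEGER NOT NULL,
    nominations INTEGER,
    awards      REAL,
    PRIMARY KEY (film, year)
);
"""

conn = sqlite3.connect(":memory:")
conn.executescript(DDL)
tables = sorted(r[0] for r in conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'"))
print(tables)
```

Declaring the composite keys in the DDL makes the database reject a second row for the same (film, year) or (userId, movieId) pair.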

#### Method 1: Mapping Out Data Pipelines using Spark
- Movies and genres can be created using the movies csv from S3.
- Ratings can be created using the ratings csv from S3.
- Awards can be created by joining data in awards.txt and award_corrected.txt.

#### Method 2: Mapping Out Data Pipelines in Redshift
- The awards, ratings, and genres tables, stored in Parquet format, can be read directly from S3.  
- Movies and genres can be created using the movies data from S3.
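For Method 2, Redshift can load Parquet files from S3 with a `COPY` statement that names an IAM role. The sketch below only builds the SQL text. The helper name `build_copy_statement`, the target table name, and the role ARN are placeholders, and the statement would still have to be executed through a database connection. Note that Redshift expects an `s3://` path rather than the `s3a://` scheme Spark uses, and that for Parquet sources the column order of the target table generally has to match the file.

```python
def build_copy_statement(table, s3_path, iam_role_arn):
    """Return a Redshift COPY statement that loads Parquet files from S3."""
    return (
        "COPY {table}\n"
        "FROM '{path}'\n"
        "IAM_ROLE '{role}'\n"
        "FORMAT AS PARQUET;"
    ).format(table=table, path=s3_path, role=iam_role_arn)

# Placeholder ARN; in the notebook this would come from DWH_ROLE_ARN.
copy_sql = build_copy_statement(
    table="ratings",
    s3_path="s3://sparkifydend/movies/ratings/",
    iam_role_arn="arn:aws:iam::123456789012:role/exampleRole",
)
print(copy_sql)
```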

# Step 4: Run Pipelines to Model the Data 
### 4.1 Create the data model using Spark
Build the data pipelines to create the data model.

In [36]:
dfmovies.printSchema()
dfmovies.show(5, truncate = False)
dfmovies.count()

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)

+-------+----------------------------------+-------------------------------------------+
|movieId|title                             |genres                                     |
+-------+----------------------------------+-------------------------------------------+
|1      |Toy Story (1995)                  |Adventure|Animation|Children|Comedy|Fantasy|
|2      |Jumanji (1995)                    |Adventure|Children|Fantasy                 |
|3      |Grumpier Old Men (1995)           |Comedy|Romance                             |
|4      |Waiting to Exhale (1995)          |Comedy|Drama|Romance                       |
|5      |Father of the Bride Part II (1995)|Comedy                                     |
+-------+----------------------------------+-------------------------------------------+
only showing top 5 rows



9742

In [37]:
# convert timestamp
dfratings = dfratings.withColumn(
    "rate_time",
    F.to_timestamp(F.from_unixtime((col("ts")) , 'yyyy-MM-dd HH:mm:ss.SSS')).cast("Timestamp")
).drop("ts")

In [38]:
dfratings = dfratings.withColumn("year", F.year("rate_time"))

In [39]:
dfratings.printSchema()
dfratings.show(5)
dfratings.count()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- rate_time: timestamp (nullable = true)
 |-- year: integer (nullable = true)

+------+-------+------+-------------------+----+
|userId|movieId|rating|          rate_time|year|
+------+-------+------+-------------------+----+
|     1|      1|   4.0|2000-07-30 18:45:03|2000|
|     1|      3|   4.0|2000-07-30 18:20:47|2000|
|     1|      6|   4.0|2000-07-30 18:37:04|2000|
|     1|     47|   5.0|2000-07-30 19:03:35|2000|
|     1|     50|   5.0|2000-07-30 18:48:51|2000|
+------+-------+------+-------------------+----+
only showing top 5 rows



100836
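The `ts` column holds Unix epoch seconds stored as strings. The plain-Python sketch below shows the same conversion outside Spark. The input value `964982703` is an assumption: it was picked because it maps to the first `rate_time` shown in the output above when interpreted as UTC. Spark renders timestamps in the session time zone, so its output only matches when that zone is UTC.

```python
from datetime import datetime, timezone

def epoch_to_utc(ts):
    """Convert a Unix epoch value (seconds, possibly given as a string) to a UTC datetime."""
    return datetime.fromtimestamp(int(ts), tz=timezone.utc)

# Assumed raw value: chosen so that it maps to the first rate_time in the sample output.
converted = epoch_to_utc("964982703")
print(converted.strftime("%Y-%m-%d %H:%M:%S"))  # 2000-07-30 18:45:03
```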

In [40]:
# convert timestamp
dftags = dftags.withColumn("tag_time", F.to_timestamp(col("ts") / 1)).drop("ts")
dftags = dftags.withColumn("year", F.year("tag_time"))

In [41]:
dftags.printSchema()
dftags.show(5)
dftags.count()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- tag: string (nullable = true)
 |-- tag_time: timestamp (nullable = true)
 |-- year: integer (nullable = true)

+------+-------+---------------+-------------------+----+
|userId|movieId|            tag|           tag_time|year|
+------+-------+---------------+-------------------+----+
|     2|  60756|          funny|2015-10-24 19:29:54|2015|
|     2|  60756|Highly quotable|2015-10-24 19:29:56|2015|
|     2|  60756|   will ferrell|2015-10-24 19:29:52|2015|
|     2|  89774|   Boxing story|2015-10-24 19:33:27|2015|
|     2|  89774|            MMA|2015-10-24 19:33:20|2015|
+------+-------+---------------+-------------------+----+
only showing top 5 rows



3683

In [42]:
dfawards.columns

['Film   ', 'Year   ', 'Awards    ', 'Nominations']

In [43]:
# clean awards txt file
dfawards = dfawards.withColumn("film", dfawards['Film   '].cast(Str())).drop('Film   ')
dfawards = dfawards.withColumn("year", dfawards['Year   '].cast(Int())).drop("Year   ")
dfawards = dfawards.withColumn("awards", dfawards['Awards    '].cast(Dbl())).drop("Awards    ")
dfawards = dfawards.withColumn("nominations", dfawards['Nominations'].cast(Int()))

In [44]:
dfawards.columns

['nominations', 'film', 'year', 'awards']
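The padded header names (`'Film   '`, `'Year   '`, and so on) come from whitespace around the `|` delimiter in the text file. As an alternative to renaming each column by hand, the names could be normalized in one pass. The helper `normalize_columns` below is a hypothetical plain-Python sketch; in PySpark the resulting list could be applied with `df.toDF(*clean_columns)`, which keeps the original column order, and the type casts would still be needed afterwards.

```python
def normalize_columns(columns):
    """Strip padding and lowercase each header name."""
    return [c.strip().lower() for c in columns]

raw_columns = ['Film   ', 'Year   ', 'Awards    ', 'Nominations']
clean_columns = normalize_columns(raw_columns)
print(clean_columns)  # ['film', 'year', 'awards', 'nominations']
```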

In [45]:
dfawards2.columns

['Film   ', 'Year   ', 'Awards    ', 'Nominations']

In [46]:
dfawards2 = dfawards2.withColumn("film", dfawards2['Film   '].cast(Str())).drop('Film   ')
dfawards2 = dfawards2.withColumn("year", dfawards2['Year   '].cast(Int())).drop("Year   ")
#dfawards2 = dfawards2.withColumn("date", F.to_timestamp(col('Year   '))).drop('Year   ')
#dfawards2 = dfawards2.withColumn("year", F.year("date")).drop("date")
dfawards2 = dfawards2.withColumn("awards", dfawards2['Awards    '].cast(Dbl())).drop("Awards    ")
dfawards2 = dfawards2.withColumn("nominations", dfawards2['Nominations'].cast(Int()))

In [47]:
dfawards.printSchema()
dfawards.show(5, truncate = False)
dfawards.count()

root
 |-- nominations: integer (nullable = true)
 |-- film: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- awards: double (nullable = true)

+-----------+--------------------------------------------------------+----+------+
|nominations|film                                                    |year|awards|
+-----------+--------------------------------------------------------+----+------+
|6          |Parasite                                                |2019|4.0   |
|4          |Ford v Ferrari                                          |2019|2.0   |
|1          |Learning to Skateboard in a Warzone (If You're a Girl)  |2019|1.0   |
|1          |The Neighbors' Window                                   |2019|1.0   |
|6          |Little Women                                            |2019|1.0   |
+-----------+--------------------------------------------------------+----+------+
only showing top 5 rows



1316

In [48]:
dfawards2.printSchema()
dfawards2.show(5, truncate = False)
dfawards2.count()

root
 |-- nominations: integer (nullable = true)
 |-- film: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- awards: double (nullable = true)

+-----------+--------------------+----+------+
|nominations|film                |year|awards|
+-----------+--------------------+----+------+
|12         |Becket              |1964|1.0   |
|12         |Ben-Hur             |1959|11.0  |
|12         |Dances with Wolves  |1990|7.0   |
|12         |The English Patient |1996|9.0   |
|12         |Gladiator           |2000|5.0   |
+-----------+--------------------+----+------+
only showing top 5 rows



76

In [49]:
# split the mixed genres by '|'
dfmovies2 = dfmovies.withColumn('genre', explode(split(dfmovies.genres, r'\|')))

In [50]:
dfmovies2.show(11)

+-------+--------------------+--------------------+---------+
|movieId|               title|              genres|    genre|
+-------+--------------------+--------------------+---------+
|      1|    Toy Story (1995)|Adventure|Animati...|Adventure|
|      1|    Toy Story (1995)|Adventure|Animati...|Animation|
|      1|    Toy Story (1995)|Adventure|Animati...| Children|
|      1|    Toy Story (1995)|Adventure|Animati...|   Comedy|
|      1|    Toy Story (1995)|Adventure|Animati...|  Fantasy|
|      2|      Jumanji (1995)|Adventure|Childre...|Adventure|
|      2|      Jumanji (1995)|Adventure|Childre...| Children|
|      2|      Jumanji (1995)|Adventure|Childre...|  Fantasy|
|      3|Grumpier Old Men ...|      Comedy|Romance|   Comedy|
|      3|Grumpier Old Men ...|      Comedy|Romance|  Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|   Comedy|
+-------+--------------------+--------------------+---------+
only showing top 11 rows



In [51]:
# create genre information for each movie
dfgenre = dfmovies2.select("movieId", "genre").dropDuplicates().dropna(subset=["movieId", "genre"]).withColumn("genreId", F.monotonically_increasing_id())

In [52]:
#dfgenre.filter(dfgenre.title.contains('Toy Story (1995)')).show()
dfgenre.filter(dfgenre.movieId == 1).show()

+-------+---------+-------------+
|movieId|    genre|      genreId|
+-------+---------+-------------+
|      1|   Comedy|  77309411328|
|      1|Adventure| 206158430208|
|      1|  Fantasy| 773094113280|
|      1| Children|1348619730944|
|      1|Animation|1434519076864|
+-------+---------+-------------+
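The `genreId` values look large and irregular because `monotonically_increasing_id` does not produce consecutive numbers. According to the Spark documentation, the current implementation puts the partition ID in the upper 31 bits and the record number within the partition in the lower 33 bits. The hypothetical helper below decodes a few of the IDs shown above under that layout.

```python
def decode_monotonic_id(value):
    """Split a monotonically_increasing_id value into (partition ID, row number in partition)."""
    partition = value >> 33
    row_in_partition = value & ((1 << 33) - 1)
    return partition, row_in_partition

for genre_id in (77309411328, 206158430208, 773094113280):
    print(genre_id, decode_monotonic_id(genre_id))
```

The IDs are unique but depend on how the data was partitioned, so they can change between runs.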



In [53]:
dfgenre.columns
dfgenre.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- genre: string (nullable = true)
 |-- genreId: long (nullable = false)



#### Load Data to S3 in parquet format

In [55]:
dfawards.write.parquet("s3a://sparkifydend/movies/awards/", mode="overwrite")

In [56]:
dfawards2.write.parquet("s3a://sparkifydend/movies/awards2/", mode="overwrite")

In [57]:
dfmovies.write.parquet("s3a://sparkifydend/movies/movies/", mode="overwrite")

In [58]:
dfratings.write.parquet("s3a://sparkifydend/movies/ratings/", mode="overwrite")

In [59]:
dftags.write.parquet("s3a://sparkifydend/movies/tags/", mode="overwrite")

In [60]:
dfgenre.write.parquet("s3a://sparkifydend/movies/genres/", mode="overwrite")

In [61]:
dfawards = spark.read.parquet("s3a://sparkifydend/movies/awards/*")
dfawards2 = spark.read.parquet("s3a://sparkifydend/movies/awards2/*")
dfmovies = spark.read.parquet("s3a://sparkifydend/movies/movies/*")
dfratings = spark.read.parquet("s3a://sparkifydend/movies/ratings/*")
dftags = spark.read.parquet("s3a://sparkifydend/movies/tags/*")
dfgenre = spark.read.parquet("s3a://sparkifydend/movies/genres/*")

### 4.2 Data Quality Checks Part 1: Identify missing values, duplicate data, etc

In [62]:
# check for null values
dfmovies.select([count(when(col(c).isNull(), c)).alias(c) for c in dfmovies.columns]).show()
dfratings.select([count(when(col(c).isNull(), c)).alias(c) for c in dfratings.columns]).show()
dfawards.select([count(when(col(c).isNull(), c)).alias(c) for c in dfawards.columns]).show()
dfawards2.select([count(when(col(c).isNull(), c)).alias(c) for c in dfawards2.columns]).show()

+-------+-----+------+
|movieId|title|genres|
+-------+-----+------+
|      0|    0|     0|
+-------+-----+------+

+------+-------+------+---------+----+
|userId|movieId|rating|rate_time|year|
+------+-------+------+---------+----+
|     0|      0|     0|        0|   0|
+------+-------+------+---------+----+

+-----------+----+----+------+
|nominations|film|year|awards|
+-----------+----+----+------+
|          0|   0|   0|     0|
+-----------+----+----+------+

+-----------+----+----+------+
|nominations|film|year|awards|
+-----------+----+----+------+
|          0|   0|   0|     0|
+-----------+----+----+------+



In [63]:
# show records with year < 1920
dfawards.filter(dfawards.year < 1920).show(5, truncate = False)

+-----------+----------------------------------------+----+------+
|nominations|film                                    |year|awards|
+-----------+----------------------------------------+----+------+
|1          |Joker   2019                            |2   |1.0   |
|0          |Once Upon a Time in Hollywood   2019    |2   |1.0   |
|0          |1917    2019                            |3   |1.0   |
|0          |Roma    2018                            |3   |1.0   |
|0          |The Favourite   2018                    |1   |1.0   |
+-----------+----------------------------------------+----+------+
only showing top 5 rows
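The rows above have the film title and year fused into one field, which suggests the `|` delimiter was missing between them in the source file, so the remaining values shifted one column to the left. The sketch below shows one way such rows could be detected and split in plain Python. The pattern assumes the title and year are separated by a tab or by at least two whitespace characters, as the printed values suggest; the helper name `split_fused_film` is made up for this example.

```python
import re

# Title, then a tab or 2+ whitespace characters, then a four-digit year (19xx or 20xx).
FUSED = re.compile(r"^(?P<film>.+?)(?:\s{2,}|\t)(?P<year>(?:19|20)\d{2})\s*$")

def split_fused_film(value):
    """Return (film, year) if the field ends with a fused year, else (film, None)."""
    match = FUSED.match(value)
    if match is None:
        return value.strip(), None
    return match.group("film").strip(), int(match.group("year"))

for raw in ("Joker   2019", "1917    2019", "Parasite"):
    print(repr(raw), "->", split_fused_film(raw))
```

Requiring a wide gap avoids splitting titles that legitimately end in a number after a single space.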



In [64]:
# check records in dfawards2
dfawards2.filter(trim(dfawards2.film) == "Joker").show()
dfawards2.filter(trim(dfawards2.film) == "Once Upon a Time in Hollywood").show()
dfawards2.filter(trim(dfawards2.film) == "1917").show()
dfawards2.filter(trim(dfawards2.film) == "Roma").show()
dfawards2.filter(trim(dfawards2.film) == "The Favourite").show()

+-----------+--------+----+------+
|nominations|    film|year|awards|
+-----------+--------+----+------+
|         11|Joker   |2019|   2.0|
+-----------+--------+----+------+

+-----------+--------------------+----+------+
|nominations|                film|year|awards|
+-----------+--------------------+----+------+
|         10|Once Upon a Time ...|2019|   2.0|
+-----------+--------------------+----+------+

+-----------+--------+----+------+
|nominations|    film|year|awards|
+-----------+--------+----+------+
|         10|1917    |2019|   3.0|
+-----------+--------+----+------+

+-----------+--------+----+------+
|nominations|    film|year|awards|
+-----------+--------+----+------+
|         10|Roma    |2018|   3.0|
+-----------+--------+----+------+

+-----------+----------------+----+------+
|nominations|            film|year|awards|
+-----------+----------------+----+------+
|         10|The Favourite   |2018|   1.0|
+-----------+----------------+----+------+



In [65]:
# drop records with wrong year 
dfawards = dfawards.filter(dfawards.year > 1920)

In [66]:
dfawards.select([count(when(col(c).isNull(), c)).alias(c) for c in dfawards.columns]).show()
dfawards.show(5, truncate = False)
dfawards.count()

+-----------+----+----+------+
|nominations|film|year|awards|
+-----------+----+----+------+
|          0|   0|   0|     0|
+-----------+----+----+------+

+-----------+--------------------------------------------------------+----+------+
|nominations|film                                                    |year|awards|
+-----------+--------------------------------------------------------+----+------+
|6          |Parasite                                                |2019|4.0   |
|4          |Ford v Ferrari                                          |2019|2.0   |
|1          |Learning to Skateboard in a Warzone (If You're a Girl)  |2019|1.0   |
|1          |The Neighbors' Window                                   |2019|1.0   |
|6          |Little Women                                            |2019|1.0   |
+-----------+--------------------------------------------------------+----+------+
only showing top 5 rows



1247

In [67]:
# union dfawards and dfawards2, and remove duplicates
# dfawards2 has corrections for year
dfawards3 = dfawards.union(dfawards2).distinct().filter(~col("year").isin([0]) & col("year").isNotNull()).sort(desc('year'))
dfawards3.show(5, truncate = False)

+-----------+--------------------------------------------------------+----+------+
|nominations|film                                                    |year|awards|
+-----------+--------------------------------------------------------+----+------+
|4          |Ford v Ferrari                                          |2019|2.0   |
|1          |Learning to Skateboard in a Warzone (If You're a Girl)  |2019|1.0   |
|11         |Joker                                                   |2019|2.0   |
|6          |Parasite                                                |2019|4.0   |
|1          |Rocketman                                               |2019|1.0   |
+-----------+--------------------------------------------------------+----+------+
only showing top 5 rows



In [68]:
# show records with year not in the right range
dfawards3.where(dfawards3.year < 1920).show(5, truncate = False)

+-----------+----+----+------+
|nominations|film|year|awards|
+-----------+----+----+------+
+-----------+----+----+------+



In [69]:
# load to S3
dfawards3.write.parquet("s3a://sparkifydend/movies/awards3/", mode="overwrite")

### 4.2 Data Quality Checks Part 2: source/count checks to ensure completeness

In [70]:
def quality_check(df, tablename):
    '''
    Input: Spark dataframe, table name
    Output: Print outcome of data quality check
    '''
    
    result = df.count()
    if result == 0:
        print("Data quality check failed for {} with zero records".format(tablename))
    else:
        print("Data quality check passed for {} with {} records".format(tablename, result))
    return 0
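A possible variant of this check returns a boolean instead of always returning 0, so a caller can stop the pipeline when a table is empty. The function name `check_row_count` is hypothetical; it takes the row count as a number so the sketch runs without Spark, and with a Spark dataframe it could be called as `check_row_count(df.count(), "movies table")`.

```python
def check_row_count(row_count, table_name):
    """Print the outcome of the row-count check and return True when the table is non-empty."""
    passed = row_count > 0
    status = "passed" if passed else "failed"
    print("Data quality check {} for {} with {} records".format(status, table_name, row_count))
    return passed

results = [
    check_row_count(9742, "movies table"),
    check_row_count(0, "empty table"),
]
print(all(results))
```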

In [71]:
# Run the row-count quality check on each table
quality_check(dfmovies, "movies table")
quality_check(dfratings, "ratings table")
quality_check(dfawards3, "awards table")
quality_check(dfgenre, "genre table")

Data quality check passed for movies table with 9742 records
Data quality check passed for ratings table with 100836 records
Data quality check passed for awards table with 1316 records
Data quality check passed for genre table with 22084 records


0

In [72]:
dfmovies.count()

9742

In [73]:
dfmovies[['movieId']].drop_duplicates().count()

9742

In [74]:
dfratings.count()

100836

In [75]:
# dfratings has one row per (movieId, userId) pair
dfratings[['movieId', 'userId']].drop_duplicates().count()

100836

In [76]:
dfawards3.count()

1316

In [77]:
# dfawards3 has one row per (film, year) pair
dfawards3[['film', 'year']].drop_duplicates().count()

1316
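The count comparisons above check that each table's key is unique: the number of distinct key values equals the number of rows. The same idea in plain Python, as a sketch that also reports which keys repeat, is shown below. The helper `duplicate_keys` is hypothetical, and the sample rows reuse values that appear elsewhere in this notebook.

```python
from collections import Counter

def duplicate_keys(rows, key_fields):
    """Return a dict of key tuples that occur more than once, with their counts."""
    counts = Counter(tuple(row[f] for f in key_fields) for row in rows)
    return {key: n for key, n in counts.items() if n > 1}

sample_awards = [
    {"film": "Titanic", "year": 1997, "awards": 11.0},
    {"film": "Titanic", "year": 1953, "awards": 1.0},
    {"film": "Ben-Hur", "year": 1959, "awards": 11.0},
]

print(duplicate_keys(sample_awards, ["film", "year"]))  # {}
print(duplicate_keys(sample_awards, ["film"]))          # {('Titanic',): 2}
```

The second call shows why film alone is not a usable key, while (film, year) is.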

In [78]:
# check out movies with same name
df1 = dfawards3.groupBy("film").count().filter("count > 1")
df1.show(truncate = False)

+------------------------+-----+
|film                    |count|
+------------------------+-----+
|Cyrano de Bergerac      |2    |
|King Kong               |2    |
|Henry V                 |2    |
|A Star Is Born          |3    |
|Little Women            |3    |
|The Great Gatsby        |2    |
|The Old Man and the Sea |2    |
|Titanic                 |2    |
|Up                      |2    |
|Cleopatra               |2    |
+------------------------+-----+



In [79]:
dfawards3.filter(trim(dfawards3.film) == "A Star Is Born").show()
dfawards3.filter(trim(dfawards3.film) == "Titanic").show()

+-----------+----------------+----+------+
|nominations|            film|year|awards|
+-----------+----------------+----+------+
|          8|A Star Is Born  |2018|   1.0|
|          4|A Star Is Born  |1976|   1.0|
|          7|A Star Is Born  |1937|   1.0|
+-----------+----------------+----+------+

+-----------+--------+----+------+
|nominations|    film|year|awards|
+-----------+--------+----+------+
|         14|Titanic |1997|  11.0|
|          2|Titanic |1953|   1.0|
+-----------+--------+----+------+



### 4.3 Data Wrangling with Spark and OLAP

In [80]:
# use the exploded dataframe dfmovies2 to collect the IDs of all movies in each genre
genre_movies = dfmovies2 \
                    .groupBy(dfmovies2.genre) \
                    .agg(concat_ws(',', collect_list(dfmovies2.movieId)) \
                    .alias('MovieIds')) \
                    .orderBy('genre')

In [81]:
genre_movies.show()

+------------------+--------------------+
|             genre|            MovieIds|
+------------------+--------------------+
|(no genres listed)|114335,122888,122...|
|            Action|6,9,10,15,20,23,4...|
|         Adventure|1,2,8,10,13,15,29...|
|         Animation|1,13,48,239,313,3...|
|          Children|1,2,8,13,27,34,38...|
|            Comedy|1,3,4,5,7,11,12,1...|
|             Crime|6,16,20,21,22,23,...|
|       Documentary|77,99,108,116,128...|
|             Drama|4,11,14,16,17,20,...|
|           Fantasy|1,2,29,44,60,126,...|
|         Film-Noir|164,320,347,913,9...|
|            Horror|12,22,70,92,93,15...|
|              IMAX|150,364,595,1797,...|
|           Musical|48,107,199,242,34...|
|           Mystery|22,29,32,47,50,10...|
|           Romance|3,4,7,11,15,17,25...|
|            Sci-Fi|24,29,32,66,76,10...|
|          Thriller|6,10,20,21,22,23,...|
|               War|41,73,110,151,155...|
|           Western|163,210,266,303,3...|
+------------------+--------------------+

In [82]:
# use case
# number of movies in the dataset
distinct_movie = dfmovies.select("movieId").distinct().count()
print('{} movies in the movies dataset'.format(distinct_movie))

9742 movies in the movies dataset


In [83]:
# number of users in the dataset
distinct_user = dfratings.select("userId").distinct().count()
print('{} users rated the movies'.format(distinct_user))

610 users rated the movies


In [84]:
# number of movies receiving awards
distinct_award = dfawards3.select("film", "year").distinct().count()
print('{} movies received awards'.format(distinct_award))

1316 movies received awards


In [85]:
# show movies receiving more than 10 awards
dfawards3.where(dfawards3.awards > 10).show(truncate = False)

+-----------+------------------------------------------------+----+------+
|nominations|film                                            |year|awards|
+-----------+------------------------------------------------+----+------+
|11         |The Lord of the Rings: The Return of the King   |2003|11.0  |
|14         |Titanic                                         |1997|11.0  |
|12         |Ben-Hur                                         |1959|11.0  |
+-----------+------------------------------------------------+----+------+



In [86]:
# total awards that movie received
awards_cnt = dfawards3.groupBy("film", "year").agg(F.sum("awards").alias('cnt')).orderBy(desc('cnt'))

In [87]:
awards_cnt.show(truncate = False)

+------------------------------------------------+----+----+
|film                                            |year|cnt |
+------------------------------------------------+----+----+
|Titanic                                         |1997|11.0|
|Ben-Hur                                         |1959|11.0|
|The Lord of the Rings: The Return of the King   |2003|11.0|
|West Side Story                                 |1961|10.0|
|The English Patient                             |1996|9.0 |
|The Last Emperor                                |1987|9.0 |
|Gigi                                            |1958|9.0 |
|From Here to Eternity                           |1953|8.0 |
|Cabaret                                         |1972|8.0 |
|Gandhi                                          |1982|8.0 |
|On the Waterfront                               |1954|8.0 |
|Amadeus                                         |1984|8.0 |
|My Fair Lady                                    |1964|8.0 |
|Slumdog Millionaire    

In [88]:
# Minimum number of ratings per user
# Minimum number of ratings per movie
# Aggregate in Spark instead of collecting every group to pandas
tmp1 = dfratings.groupBy("userId").count().agg(F.min("count")).first()[0]
tmp2 = dfratings.groupBy("movieId").count().agg(F.min("count")).first()[0]
print('For the users that rated movies and the movies that were rated:')
print('Minimum number of ratings per user is {}'.format(tmp1))
print('Minimum number of ratings per movie is {}'.format(tmp2))

For the users that rated movies and the movies that were rated:
Minimum number of ratings per user is 20
Minimum number of ratings per movie is 1


In [89]:
# count number of movies in each genre
# The top three genres are drama, comedy, and thriller
df2=dfmovies2.groupBy("genre").count().filter(trim(dfmovies2.genre) != '(no genres listed)').sort(desc('count'))
df2.show(truncate = False)

+-----------+-----+
|genre      |count|
+-----------+-----+
|Drama      |4361 |
|Comedy     |3756 |
|Thriller   |1894 |
|Action     |1828 |
|Romance    |1596 |
|Adventure  |1263 |
|Crime      |1199 |
|Sci-Fi     |980  |
|Horror     |978  |
|Fantasy    |779  |
|Children   |664  |
|Animation  |611  |
|Mystery    |573  |
|Documentary|440  |
|War        |382  |
|Musical    |334  |
|Western    |167  |
|IMAX       |158  |
|Film-Noir  |87   |
+-----------+-----+



In [90]:
dfratings.createOrReplaceTempView("ratings")     #userId, movieId, rating, rate_time, year
dfmovies.createOrReplaceTempView("movies")       #movieId, title, genres
dftags.createOrReplaceTempView("tags")           #userId, movieId, tag, tag_time, year
dfawards3.createOrReplaceTempView("awards")      #nominations, film, year, awards
dfgenre.createOrReplaceTempView("genres")        #genreId, genre, movieId

In [91]:
# Split "Title (YYYY)" into separate title and year columns.
# The fixed offsets assume every title ends with a 4-digit year in parentheses.
movies = spark.sql("select movieId, substr(title, 0, length(title)-7) as title, substr(title, -5, 4) as year from movies")
movies.show()
movies.createOrReplaceTempView("movies") 

+-------+--------------------+----+
|movieId|               title|year|
+-------+--------------------+----+
|      1|           Toy Story|1995|
|      2|             Jumanji|1995|
|      3|    Grumpier Old Men|1995|
|      4|   Waiting to Exhale|1995|
|      5|Father of the Bri...|1995|
|      6|                Heat|1995|
|      7|             Sabrina|1995|
|      8|        Tom and Huck|1995|
|      9|        Sudden Death|1995|
|     10|           GoldenEye|1995|
|     11|American Presiden...|1995|
|     12|Dracula: Dead and...|1995|
|     13|               Balto|1995|
|     14|               Nixon|1995|
|     15|    Cutthroat Island|1995|
|     16|              Casino|1995|
|     17|Sense and Sensibi...|1995|
|     18|          Four Rooms|1995|
|     19|Ace Ventura: When...|1995|
|     20|         Money Train|1995|
+-------+--------------------+----+
only showing top 20 rows
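
The fixed offsets in the query above work when every title ends in exactly ` (YYYY)`. If a title carried trailing whitespace or had no year at all, the offsets would cut in the wrong place. The following is a minimal sketch of a more tolerant split using a regular expression. `split_title_year` and `TITLE_YEAR` are hypothetical names that are not part of the notebook, and the sample strings are for illustration only.

```python
import re

# Hypothetical helper (not part of the notebook): split "Title (YYYY)" with a
# regular expression instead of fixed character offsets.
TITLE_YEAR = re.compile(r"^(?P<title>.*?)\s*\((?P<year>\d{4})\)\s*$")


def split_title_year(raw_title):
    """Return (title, year), or (stripped title, None) when no year suffix is found."""
    match = TITLE_YEAR.match(raw_title)
    if match is None:
        return raw_title.strip(), None
    return match.group("title"), match.group("year")


print(split_title_year("Toy Story (1995)"))
print(split_title_year("Toy Story (1995) "))   # trailing space is tolerated
print(split_title_year("Untitled Film"))       # no year suffix
```

The same pattern could be applied in Spark with `regexp_extract`, which would leave the year empty instead of returning shifted characters for titles that do not follow the convention.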



In [92]:
# year of movies in the dataset
spark.sql("""select 
             min(year) as min_year,
             max(year) as max_year
             from movies 
             where year > 0
""").show()

+--------+--------+
|min_year|max_year|
+--------+--------+
|    1902|    2018|
+--------+--------+



In [93]:
# year of rating in the dataset
spark.sql("""select 
             min(year) as min_year,
             max(year) as max_year
             from ratings
""").show()

+--------+--------+
|min_year|max_year|
+--------+--------+
|    1996|    2018|
+--------+--------+



In [94]:
# year of awards in the dataset
spark.sql("""select 
             min(year) as min_year,
             max(year) as max_year
             from awards
""").show()

+--------+--------+
|min_year|max_year|
+--------+--------+
|    1927|    2019|
+--------+--------+



In [95]:
# number of movies not rated
spark.sql("""select 
          count(distinct movies.movieId)
          from movies 
          where movies.movieId not in
          (select distinct ratings.movieId from ratings)
          """).show()

+-----------------------+
|count(DISTINCT movieId)|
+-----------------------+
|                     18|
+-----------------------+



In [96]:
# Number of award-winning movies that also appear in the ratings dataset (474),
# matched on trimmed title and release year
spark.sql("""select count(distinct movieId) as in_ratings from 
          (select distinct a.film, a.year, m.movieId as movieId
          from awards as a inner join movies as m on trim(a.film) == trim(m.title) and a.year = m.year
          where a.year > 0 and m.year > 0) t
          where movieId in 
          (select distinct ratings.movieId from ratings)
          """).show()

+----------+
|in_ratings|
+----------+
|       474|
+----------+



In [97]:
# Top 5 movies by average rating. The top rows each have a single rating,
# so this ranking is dominated by sparsely rated titles.
avg_rating = spark.sql("""select distinct
    m.title as title,
    m.year as year,
    sum(case when r.rating >= 0 then 1 else 0 end) as num_rating,
    avg(r.rating) as avg_rating
    from movies as m inner join ratings as r on m.movieId = r.movieId
    group by m.title, m.year
    order by avg_rating desc
""")
avg_rating.show(5)
avg_rating.createOrReplaceTempView("avg_rating") 

+--------------------+----+----------+----------+
|               title|year|num_rating|avg_rating|
+--------------------+----+----------+----------+
|SORI: Voice from ...|2016|         1|       5.0|
|National Lampoon'...|2007|         1|       5.0|
|      Blue Planet II|2017|         1|       5.0|
|                9/11|2002|         1|       5.0|
|Sun Alley (Sonnen...|1999|         1|       5.0|
+--------------------+----+----------+----------+
only showing top 5 rows



In [98]:
# total awards for each movie
tot_awards = spark.sql("""select distinct
                    film,
                    year,
                    sum(awards) as tot_awards
                    from awards
                    group by film, year
                    order by tot_awards desc
""")
tot_awards.show(5)
tot_awards.createOrReplaceTempView("tot_awards") 

+--------------------+----+----------+
|                film|year|tot_awards|
+--------------------+----+----------+
|The Lord of the R...|2003|      11.0|
|            Titanic |1997|      11.0|
|            Ben-Hur |1959|      11.0|
|    West Side Story |1961|      10.0|
|The Last Emperor    |1987|       9.0|
+--------------------+----+----------+
only showing top 5 rows



In [99]:
# the average rating scores of movies with awards
movie_awards_rating = spark.sql("""select distinct
             a.film,
             a.year,
             a.tot_awards,
             r.avg_rating
             from tot_awards as a inner join avg_rating as r on trim(a.film) == trim(r.title) and a.year == r.year
             where a.year > 0 and r.year > 0
             order by tot_awards desc, avg_rating desc
""")
movie_awards_rating.show(truncate = False)
movie_awards_rating.createOrReplaceTempView("movie_awards_rating") 

+------------------------+----+----------+------------------+
|film                    |year|tot_awards|avg_rating        |
+------------------------+----+----------+------------------+
|Ben-Hur                 |1959|11.0      |3.9411764705882355|
|Titanic                 |1997|11.0      |3.414285714285714 |
|West Side Story         |1961|10.0      |3.6029411764705883|
|Gigi                    |1958|9.0       |3.25              |
|On the Waterfront       |1954|8.0       |4.1875            |
|Amadeus                 |1984|8.0       |4.184210526315789 |
|My Fair Lady            |1964|8.0       |4.042857142857143 |
|From Here to Eternity   |1953|8.0       |3.9545454545454546|
|Gandhi                  |1982|8.0       |3.8333333333333335|
|Slumdog Millionaire     |2008|8.0       |3.8098591549295775|
|Gone with the Wind      |1939|8.0       |3.6444444444444444|
|Cabaret                 |1972|8.0       |3.0               |
|Lawrence of Arabia      |1962|7.0       |4.3               |
|Schindl

In [100]:
spark.sql("select count(*) from tot_awards").show()
spark.sql("select count(*) from avg_rating").show()
spark.sql("select count(*) from movie_awards_rating").show()

+--------+
|count(1)|
+--------+
|    1316|
+--------+

+--------+
|count(1)|
+--------+
|    9719|
+--------+

+--------+
|count(1)|
+--------+
|     473|
+--------+



### 4.4 Create the data model using Redshift
Build the data pipelines to create the data model.

#### Extract parquet data from S3 and transform into fact and dimension tables

In [123]:
s3 = boto3.resource('s3',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                     )

s3bucket =  s3.Bucket("udacity-input") # private

s3_data = iter(s3bucket.objects.filter(Prefix="ml-latest-small/"))
for _ in range(5): print(next(s3_data))


s3.ObjectSummary(bucket_name='udacity-input', key='ml-latest-small/')
s3.ObjectSummary(bucket_name='udacity-input', key='ml-latest-small/Award_corrected.txt')
s3.ObjectSummary(bucket_name='udacity-input', key='ml-latest-small/Awards.txt')
s3.ObjectSummary(bucket_name='udacity-input', key='ml-latest-small/README.txt')
s3.ObjectSummary(bucket_name='udacity-input', key='ml-latest-small/links.csv')


In [124]:
%load_ext sql

The sql extension is already loaded. To reload it, use:
  %reload_ext sql


In [125]:
conn_string="postgresql://{}:{}@{}:{}/{}".format(DWH_DB_USER, DWH_DB_PASSWORD, DWH_ENDPOINT, DWH_PORT,DWH_DB)
# Mask the password before printing so the secret is not stored in the notebook output
print(conn_string.replace(DWH_DB_PASSWORD, "***"))
%sql $conn_string

postgresql://dwhadmin:***@dwhcluster.cdqvmxgabrab.us-west-2.redshift.amazonaws.com:5439/dev


'Connected: dwhadmin@dev'

#### Copy data from S3 to Redshift

In [208]:
%%time

qry = """
    copy dimRatings from 's3://sparkifydend/movies/ratings/' 
    credentials 'aws_iam_role={}'
    FORMAT AS PARQUET;
""".format(DWH_ROLE_ARN)

%sql $qry

 * postgresql://dwhadmin:***@dwhcluster.cdqvmxgabrab.us-west-2.redshift.amazonaws.com:5439/dev
Done.
CPU times: user 4.39 ms, sys: 0 ns, total: 4.39 ms
Wall time: 912 ms
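
The COPY cells in this section differ only in the target table and the S3 prefix, so the statement can be generated by a small helper. This is a sketch, not the notebook's own code. `build_copy_statement`, `PARQUET_SOURCES`, and `EXAMPLE_ROLE_ARN` are hypothetical names, and the ARN is a placeholder (the notebook uses `DWH_ROLE_ARN`).

```python
# Hypothetical helper: build the Redshift COPY statement used in the cells
# of this section from a table name and an S3 prefix.
def build_copy_statement(table, s3_prefix, role_arn):
    return (
        f"copy {table} from '{s3_prefix}'\n"
        f"credentials 'aws_iam_role={role_arn}'\n"
        "FORMAT AS PARQUET;"
    )


# Table names and S3 prefixes as they appear in this notebook.
PARQUET_SOURCES = {
    "dimRatings": "s3://sparkifydend/movies/ratings/",
    "dimAwards3": "s3://sparkifydend/movies/awards3/",
    "dimGenres": "s3://sparkifydend/movies/genres/",
    "dimmovies0": "s3://sparkifydend/movies/movies/",
}

# Placeholder ARN for illustration only; the notebook uses DWH_ROLE_ARN.
EXAMPLE_ROLE_ARN = "arn:aws:iam::123456789012:role/exampleRole"

statements = [
    build_copy_statement(table, prefix, EXAMPLE_ROLE_ARN)
    for table, prefix in PARQUET_SOURCES.items()
]
for statement in statements:
    print(statement)
    print()
```

Each generated string can be run the same way as in the cells here, by assigning it to `qry` and calling `%sql $qry`.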


In [209]:
%sql select * from dimRatings limit 5;

 * postgresql://dwhadmin:***@dwhcluster.cdqvmxgabrab.us-west-2.redshift.amazonaws.com:5439/dev
5 rows affected.


userid,movieid,rating,rate_time,year
1,1,4.0,2000-07-30 18:45:03,2000
1,151,5.0,2000-07-30 19:07:21,2000
1,296,3.0,2000-07-30 18:49:27,2000
1,441,4.0,2000-07-30 18:14:28,2000
1,590,4.0,2000-07-30 18:42:26,2000


In [210]:
%sql select count(*) from dimRatings;

 * postgresql://dwhadmin:***@dwhcluster.cdqvmxgabrab.us-west-2.redshift.amazonaws.com:5439/dev
1 rows affected.


count
100836


#### Check the stl_load_errors table

In [211]:
%%sql
select query, substring(filename,22,25) as filename,line_number as line, 
substring(colname,0,12) as column, type, position as pos, substring(raw_line,0,30) as line_text,
substring(raw_field_value,0,15) as field_text, 
substring(err_reason,0,45) as reason
from stl_load_errors 
order by query desc
limit 10;

 * postgresql://dwhadmin:***@dwhcluster.cdqvmxgabrab.us-west-2.redshift.amazonaws.com:5439/dev
10 rows affected.


query,filename,line,column,type,pos,line_text,field_text,reason
3548,-latest-small/ratings.csv,2,rate_time,timestamp,8,"1,1,4.0,964982703",964982703,Invalid timestamp format or value [YYYY-MM-D
3227,-latest-small/ratings.csv,2,rate_time,timestamp,8,"1,1,4.0,964982703",964982703,Invalid timestamp format or value [YYYY-MM-D
3115,-latest-small/ratings.csv,2,rate_time,timestamp,8,"1,1,4.0,964982703",964982703,Invalid data
3095,-latest-small/ratings.csv,2,rate_time,timestamp,8,"1,1,4.0,964982703",964982703,Invalid timestamp format or value [YYYY-MM-D
3080,-latest-small/ratings.csv,2,rate_time,timestamp,8,"1,1,4.0,964982703",964982703,Invalid timestamp format or value [YYYY-MM-D
3058,-latest-small/ratings.csv,2,rate_time,timestamp,8,"1,1,4.0,964982703",964982703,Invalid data
3020,-latest-small/ratings.csv,2,rate_time,timestamp,8,"1,1,4.0,964982703",964982703,Invalid timestamp format or value [YYYY-MM-D
2975,-latest-small/ratings.csv,2,rate_time,timestamp,8,"1,1,4.0,964982703",964982703,Invalid data
2900,-latest-small/ratings.csv,2,rate_time,timestamp,8,"1,1,4.0,964982703",964982703,Invalid timestamp format or value [YYYY-MM-D
2880,-latest-small/ratings.csv,2,rate_time,timestamp,8,"1,1,4.0,964982703",964982703,Invalid timestamp format or value [YYYY-MM-D
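
The rows above all point at `ratings.csv`, so they appear to come from earlier attempts to COPY the raw CSV rather than from the Parquet load. The rejected value `964982703` is a Unix epoch in seconds, which Redshift does not accept for a `timestamp` column without being told the format. A minimal sketch of the conversion follows; `epoch_to_timestamp` is a hypothetical name, and UTC is assumed because it reproduces the `rate_time` of the first `dimRatings` row shown earlier.

```python
from datetime import datetime, timezone


def epoch_to_timestamp(epoch_seconds):
    """Format Unix epoch seconds as a 'YYYY-MM-DD HH:MM:SS' string in UTC."""
    moment = datetime.fromtimestamp(int(epoch_seconds), tz=timezone.utc)
    return moment.strftime("%Y-%m-%d %H:%M:%S")


# The raw field value reported in stl_load_errors above.
print(epoch_to_timestamp("964982703"))  # 2000-07-30 18:45:03
```

Converting the column in Spark before writing Parquet, as this project does, avoids the error. For delimited input, COPY's `TIMEFORMAT 'epochsecs'` option is another route, though it is not used in this notebook.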


In [212]:
%%time

qry = """
    copy dimAwards3 from 's3://sparkifydend/movies/awards3/' 
    credentials 'aws_iam_role={}' 
    FORMAT AS PARQUET;
""".format(DWH_ROLE_ARN)

%sql $qry

 * postgresql://dwhadmin:***@dwhcluster.cdqvmxgabrab.us-west-2.redshift.amazonaws.com:5439/dev
Done.
CPU times: user 4.62 ms, sys: 0 ns, total: 4.62 ms
Wall time: 1.2 s


In [213]:
%sql select * from dimAwards3 limit 5;

 * postgresql://dwhadmin:***@dwhcluster.cdqvmxgabrab.us-west-2.redshift.amazonaws.com:5439/dev
5 rows affected.


nominations,film,year,awards
7,Sons and Lovers,1960,1.0
1,Day of the Painter,1960,1.0
3,The Empire Strikes Back,1980,1.0
8,Raging Bull,1980,2.0
6,Mystic River,2003,2.0


In [214]:
%sql select count(*) from dimAwards3;

 * postgresql://dwhadmin:***@dwhcluster.cdqvmxgabrab.us-west-2.redshift.amazonaws.com:5439/dev
1 rows affected.


count
1316


In [215]:
%%time

qry = """
    copy dimGenres from 's3://sparkifydend/movies/genres/' 
    credentials 'aws_iam_role={}' 
    FORMAT AS PARQUET;
""".format(DWH_ROLE_ARN)
 
%sql $qry

 * postgresql://dwhadmin:***@dwhcluster.cdqvmxgabrab.us-west-2.redshift.amazonaws.com:5439/dev
Done.
CPU times: user 4.88 ms, sys: 280 µs, total: 5.16 ms
Wall time: 1.71 s


In [216]:
%sql select * from dimGenres limit 5;

 * postgresql://dwhadmin:***@dwhcluster.cdqvmxgabrab.us-west-2.redshift.amazonaws.com:5439/dev
5 rows affected.


movieid,genre,genreid
869,Drama,1614907703302
2164,Horror,1614907703310
3052,Comedy,1614907703318
4224,Thriller,1614907703326
5476,Horror,1614907703334


In [217]:
%sql select count(*) from dimGenres;

 * postgresql://dwhadmin:***@dwhcluster.cdqvmxgabrab.us-west-2.redshift.amazonaws.com:5439/dev
1 rows affected.


count
22084


In [218]:
%%sql
INSERT INTO dimDate (date_key, year, month, day, week)
SELECT DISTINCT(rate_time)                                       AS date_key,
       EXTRACT(year FROM rate_time)                              AS year,
       EXTRACT(month FROM rate_time)                             AS month,
       EXTRACT(day FROM rate_time)                               AS day,
       EXTRACT(week FROM rate_time)                              AS week
FROM dimRatings;

 * postgresql://dwhadmin:***@dwhcluster.cdqvmxgabrab.us-west-2.redshift.amazonaws.com:5439/dev
85043 rows affected.


[]

In [219]:
%sql select * from dimDate limit 5;

 * postgresql://dwhadmin:***@dwhcluster.cdqvmxgabrab.us-west-2.redshift.amazonaws.com:5439/dev
5 rows affected.


date_key,year,month,day,week
2000-07-30 18:56:33,2000,7,30,30
2000-07-26 14:46:26,2000,7,26,30
1996-10-17 12:51:34,1996,10,17,42
1996-06-22 10:59:28,1996,6,22,25
2016-09-27 18:31:15,2016,9,27,39


In [220]:
%sql select count(*) from dimDate;

 * postgresql://dwhadmin:***@dwhcluster.cdqvmxgabrab.us-west-2.redshift.amazonaws.com:5439/dev
1 rows affected.


count
85043
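
Because `date_key` is the full `rate_time` timestamp, `dimDate` holds one row per distinct second (85043 rows), not one per calendar day. The extraction itself can be checked offline with the sketch below. `date_parts` is a hypothetical name, and the sketch assumes that `EXTRACT(week ...)` returns the ISO 8601 week number, which agrees with the sample rows above.

```python
from datetime import datetime


def date_parts(rate_time):
    """Return (year, month, day, ISO week) for a 'YYYY-MM-DD HH:MM:SS' string."""
    moment = datetime.strptime(rate_time, "%Y-%m-%d %H:%M:%S")
    iso_week = moment.isocalendar()[1]
    return moment.year, moment.month, moment.day, iso_week


# Sample values taken from the dimDate rows shown above.
for sample in ["2000-07-30 18:56:33", "1996-10-17 12:51:34", "2016-09-27 18:31:15"]:
    print(sample, date_parts(sample))
```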


In [222]:
%%time

qry = """
    copy dimmovies0 from 's3://sparkifydend/movies/movies/' 
    credentials 'aws_iam_role={}' 
    FORMAT AS PARQUET;
""".format(DWH_ROLE_ARN)
 
%sql $qry

 * postgresql://dwhadmin:***@dwhcluster.cdqvmxgabrab.us-west-2.redshift.amazonaws.com:5439/dev
Done.
CPU times: user 6.26 ms, sys: 0 ns, total: 6.26 ms
Wall time: 11.3 s


In [223]:
%%sql
INSERT INTO dimMovies (movieId, title, year)
SELECT movieId                                                      AS movieId,
       substring(title, 0, length(title)-6)                         AS title, 
       substring(title, length(title)-4, 4)                         AS year
FROM dimMovies0

 * postgresql://dwhadmin:***@dwhcluster.cdqvmxgabrab.us-west-2.redshift.amazonaws.com:5439/dev
9742 rows affected.


[]

In [224]:
%sql select * from dimMovies limit 5;

 * postgresql://dwhadmin:***@dwhcluster.cdqvmxgabrab.us-west-2.redshift.amazonaws.com:5439/dev
5 rows affected.


movieid,title,year
47,Seven (a.k.a. Se7en),1995
123,Chungking Express (Chung Hing sam lam),1994
203,"To Wong Foo, Thanks for Everything! Julie Newmar",1995
273,Mary Shelley's Frankenstein (Frankenstein),1994
341,Double Happiness,1994


In [225]:
%sql select count(*) from dimMovies;

 * postgresql://dwhadmin:***@dwhcluster.cdqvmxgabrab.us-west-2.redshift.amazonaws.com:5439/dev
1 rows affected.


count
9742


# Summary
* In this project, I read data from S3 in two ways: with Spark and with Redshift. After loading the data from S3 with Spark, I ran data quality checks and cleaned the data using Spark DataFrames and Spark SQL, then wrote the resulting tables back to S3 in Parquet format.
* Amazon S3 serves as the data lake: it stores the raw CSV files and the Parquet staging data before the data is loaded into the Amazon Redshift data warehouse.
* Parquet is the staging format in S3 because it is a columnar format: a query reads only the columns it needs, which makes data retrieval and processing more efficient.
* Apache Spark, as a distributed data processing framework, lets us efficiently load and transform large datasets from the raw data source into the S3 data lake, from which they are loaded into the Redshift data warehouse.
* I also created fact and dimension tables after copying the Parquet data from S3 into Redshift. When copying Parquet data, the column data types in the Parquet files must match those of the target tables.
* The data should be refreshed when the MovieLens datasets are updated.
* How I would approach the problem differently under the following scenarios:
  * The data was increased by 100x.
    - Write the data to S3 in partitions and distribute it across the Redshift nodes with a distkey and a sortkey. Partitioned writes to S3 can speed up loading considerably, and Redshift is a cloud data warehouse optimized for aggregation and read-heavy workloads.
  * The data populates a dashboard that must be updated by 7am every day.
    - Use Airflow to orchestrate the pipeline. Airflow lets us schedule workflows programmatically and monitor them through its built-in user interface.
  * The database needed to be accessed by 100+ people.
    - Amazon Redshift, which hosts this data model, allows up to 500 concurrent connections to a cluster (the exact limit depends on the node type).
    - Users can connect to the data model with Amazon QuickSight to create dashboards and analyze the dataset.
    - We can also manage user access and permissions with AWS IAM, so that we can control which users can access which dashboards and the underlying dataset.
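
Two of the points above, matching Parquet column types to the target table and using a distkey and sortkey at larger scale, can be combined in a small DDL generator. This is a sketch under assumptions. `SPARK_TO_REDSHIFT` and `build_create_table` are hypothetical names, and the type mapping, the Spark types assigned to each column, and the key choices are illustrative guesses, not the tables actually created for this project.

```python
# Assumed mapping from Spark SQL type names to Redshift column types.
SPARK_TO_REDSHIFT = {
    "IntegerType": "INTEGER",
    "LongType": "BIGINT",
    "DoubleType": "DOUBLE PRECISION",
    "StringType": "VARCHAR(256)",
    "TimestampType": "TIMESTAMP",
}


def build_create_table(table, columns, distkey=None, sortkey=None):
    """Build a CREATE TABLE statement from (name, spark_type_name) pairs."""
    column_lines = [
        f"    {name} {SPARK_TO_REDSHIFT[spark_type]}" for name, spark_type in columns
    ]
    ddl = f"CREATE TABLE IF NOT EXISTS {table} (\n" + ",\n".join(column_lines) + "\n)"
    if distkey:
        ddl += f"\nDISTKEY({distkey})"
    if sortkey:
        ddl += f"\nSORTKEY({sortkey})"
    return ddl + ";"


# Column names follow the dimRatings output shown earlier; the Spark types
# and the distkey/sortkey choices are assumptions for illustration.
ratings_columns = [
    ("userId", "IntegerType"),
    ("movieId", "IntegerType"),
    ("rating", "DoubleType"),
    ("rate_time", "TimestampType"),
    ("year", "IntegerType"),
]
ddl = build_create_table("dimRatings", ratings_columns, distkey="movieId", sortkey="rate_time")
print(ddl)
```

Generating the DDL from the same schema that Spark writes keeps the two sides in step, which is the condition the Parquet COPY depends on.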