# Data Lake for Movie Ratings
### Data Engineering Capstone Project

#### Project Summary
In this project we build a Data Lake on Amazon S3. We will use movie ratings data to illustrate the core concepts.
The raw data will be stored in S3. Afterwards, we use Spark as an ETL tool to transform the raw data into a dimensional schema. AWS services like Glue are used to build a metadata repository. Based on the dimensional schema, one can use tools like Amazon Athena to submit analytical SQL queries that answer questions such as ‘how many users have watched a given movie’.


The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Build Different Layers of the Data Lake and Ingest Data
* Step 3: Design the ETL Process to Transform and Load Data to Cleanse
* Step 4: Define the Data Model for Analytical Queries and the Access Layer
* Step 5: Assess Data Quality
* Step 6: Build Metadata Repository
* Step 7: Complete Project Write Up

In [1]:
# dependencies
import boto3
import pandas as pd
import configparser
import os
import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import IntegerType, TimestampType

# Step 1: Scope the Project and Gather Data

#### Scope 


The data we use for this project is based on the MovieLens 10M Dataset, which is available here: https://grouplens.org/datasets/movielens/. We also use synthetically generated masterdata about users. More precisely, we use the following datasets:
- Movies: Data about movies which is part of the MovieLens10M Dataset. It contains information about movies in a csv format where the delimiter is given by ‘::’. Here are two sample records:

    1::Toy Story (1995)::Adventure|Animation|Children|Comedy|Fantasy

    2::Jumanji (1995)::Adventure|Children|Fantasy

    The dataset does not include a header. The first column is the movie id, the second column is the title of the movie, which also includes the release year, and the last column contains a list of genres separated by ‘|’. Since we will be interested in queries such as ‘which movies were released after 1998’, it is crucial to transform the data so that the year is contained in a separate column.


- Ratings: Data about user ratings of the movies contained in the Movies dataset. Below are also two sample records of this dataset:

    1::122::5::838985046
    
    1::185::5::838983525

    The Ratings dataset is also in csv format without a header. The first column is a user id, the second is a movie id referring to the Movies dataset, and the third column contains the rating on a scale from 1 to 5. The last column contains a Unix timestamp. Since we will be interested in answering questions like ‘at which time of day are most ratings submitted’, we need to transform the timestamp into a more readable format.
    

- User Masterdata: the last dataset is synthetically generated and contains masterdata about the users who submit the ratings. It is based on the user ids contained in the Ratings dataset. For each user id, a first name and a last name are randomly generated using the Python library names. The dataset is in JSON format. We could imagine that this data comes from a masterdata management system.
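To make the two ‘::’-delimited layouts described above concrete, here is a minimal plain-Python sketch that parses one record of each kind. The function names `parse_movie` and `parse_rating` are illustrative and not part of the project code, and interpreting the timestamp as UTC is an assumption.

```python
from datetime import datetime, timezone


def parse_movie(line):
    """Split a raw Movies record into id, raw title and a list of genres."""
    movie_id, title, genres = line.strip().split("::")
    return int(movie_id), title, genres.split("|")


def parse_rating(line):
    """Split a raw Ratings record and convert the Unix timestamp to a datetime."""
    user_id, movie_id, rating, ts = line.strip().split("::")
    rated_at = datetime.fromtimestamp(int(ts), tz=timezone.utc)  # assumption: UTC
    return int(user_id), int(movie_id), float(rating), rated_at


movie = parse_movie("2::Jumanji (1995)::Adventure|Children|Fantasy")
rating = parse_rating("1::122::5::838985046")
print(movie)
print(rating[3].hour)  # hour of day, useful for 'time of day' questions
```

The title still contains the year at this point; separating the two is part of the cleansing step later on.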


The raw data will be transformed in several steps into an analytical dataset in the form of a star schema. This dataset can be used in conjunction with tools such as Amazon Athena to submit SQL queries that answer questions like ‘which movies has a particular user watched’ or ‘what is the average rating for a given movie’. The data lake will be based on Amazon S3 as a cheap storage solution. We can store the raw datasets as objects in S3, and there is no need for the raw datasets to be in a structured form. The data lake will consist of 3 layers:
- Staging
- Cleanse
- Access

The staging layer will store the raw data in csv and JSON format, while the cleanse layer will contain a cleansed version of the raw data. For example, data in the cleanse layer will carry appropriate data types. Finally, the access layer stores the data in a way suitable for end users to submit analytical SQL queries to answer questions of interest. It could also be used by reporting tools (e.g. Amazon QuickSight) to build dashboards for end users. Data in the access layer will be stored in parquet format (i.e. a columnar storage format optimized for analytical queries).

We will use the Python boto3 library to create the different layers of the data lake following the infrastructure as code (IaC) paradigm. In that way, we can easily build different environments, e.g. for development, testing and production. Further, we use Spark as the ETL tool, i.e. all transformations on the datasets will be performed using Spark. Since Spark can run on an Amazon EMR cluster, we are able to scale if the data volume increases. AWS Glue crawlers will be used to create a metadata store for better data management.
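The infrastructure-as-code idea can be sketched as follows. This is only an illustration: the helper names `bucket_names` and `create_layers` and the environment suffix are assumptions, and the S3 client is passed in rather than created here. Since bucket names are globally unique in S3, real names would probably need an additional prefix.

```python
LAYERS = ("staging", "cleanse", "access-layer")


def bucket_names(environment=None):
    """Return the bucket name per layer, optionally suffixed with an environment."""
    suffix = f"-{environment}" if environment else ""
    return {layer: f"data-lake-{layer}{suffix}" for layer in LAYERS}


def create_layers(s3_client, environment=None):
    """Create one bucket per layer using an existing boto3 S3 client."""
    names = bucket_names(environment)
    for name in names.values():
        s3_client.create_bucket(Bucket=name)
    return names


print(bucket_names("dev"))
```

Calling `create_layers(s3, "dev")` with a boto3 S3 client would then set up a separate development environment without touching the other buckets.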


Read the AWS key and secret key from the config file dl.cfg to be able to access AWS cloud resources. A template for the file dl.cfg is contained in the GitHub repository.

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

aws_key = config['AWS']['AWS_ACCESS_KEY_ID']
aws_secret_key = config['AWS']['AWS_SECRET_ACCESS_KEY']

os.environ['AWS_ACCESS_KEY_ID'] = aws_key
os.environ['AWS_SECRET_ACCESS_KEY'] = aws_secret_key
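For orientation, the layout that the cell above expects from dl.cfg can be inferred from the keys it reads. The sketch below parses such a layout from a string; the values are placeholders, and the actual template in the repository may contain further entries.

```python
import configparser

# Hypothetical template content; the values are placeholders, not real credentials.
TEMPLATE = """
[AWS]
AWS_ACCESS_KEY_ID = YOUR_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY = YOUR_SECRET_ACCESS_KEY
"""

example = configparser.ConfigParser()
example.read_string(TEMPLATE)

# Option names are matched case-insensitively by configparser.
print(example["AWS"]["AWS_ACCESS_KEY_ID"])
```

Keeping the real dl.cfg out of version control avoids leaking the credentials.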

# Step 2:  Build Different Layers of the Data Lake and Ingest Data

First, we use boto3 to create an S3 client:

In [33]:
s3 = boto3.client('s3', region_name='us-east-1', 
                         aws_access_key_id=aws_key, 
                         aws_secret_access_key=aws_secret_key)

## Staging Layer

Now, we use the S3 client to create an S3 bucket with the name data-lake-staging. This bucket will contain the data in its raw format:

In [28]:
s3.create_bucket(Bucket='data-lake-staging')

{'ResponseMetadata': {'RequestId': '24E71771C6865E35',
  'HostId': 'a/1g71aJXuxDl3Qpyu6YQadOo6a+V9roQ4A6ymhAHMeEMJ5UbXK3U1VvE98FoF/Or4pcR3RDP8k=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'a/1g71aJXuxDl3Qpyu6YQadOo6a+V9roQ4A6ymhAHMeEMJ5UbXK3U1VvE98FoF/Or4pcR3RDP8k=',
   'x-amz-request-id': '24E71771C6865E35',
   'date': 'Mon, 06 Jan 2020 11:25:16 GMT',
   'location': '/data-lake-staging',
   'content-length': '0',
   'server': 'AmazonS3'},
  'RetryAttempts': 0},
 'Location': '/data-lake-staging'}

Next, we ingest the raw data into the staging layer. We start with the user masterdata. The masterdata will be stored in the staging bucket under the key 'masterdata':

In [38]:
s3.upload_file(Bucket='data-lake-staging',
              # Set filename and key
               Filename=os.path.abspath(r"..\data\users_masterdata.json"), 
               Key='masterdata/user_masterdata.json')

Afterwards, we ingest the ratings data into the staging layer under the key 'ratings':

In [41]:
s3.upload_file(Bucket='data-lake-staging',
              # Set filename and key
               Filename=os.path.abspath(r"..\data\ratings.dat"), 
               Key='ratings/ratings.dat')

Finally, the movies data will be ingested into the staging layer under the key 'movies':

In [42]:
s3.upload_file(Bucket='data-lake-staging',
              # Set filename and key
               Filename=os.path.abspath(r"..\data\movies.dat"), 
               Key='movies/movies.dat')
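The three uploads above follow the same pattern, so they could be driven from a single mapping. The following is a sketch under assumptions: the helper name `ingest` and the `data_dir` parameter are made up for illustration, and `pathlib` is used so that the local paths do not depend on Windows-style separators.

```python
from pathlib import Path

# Local file name -> object key in the staging bucket (taken from the cells above).
STAGING_FILES = {
    "users_masterdata.json": "masterdata/user_masterdata.json",
    "ratings.dat": "ratings/ratings.dat",
    "movies.dat": "movies/movies.dat",
}


def ingest(s3_client, data_dir, bucket="data-lake-staging"):
    """Upload every file in STAGING_FILES and return the keys that were written."""
    uploaded = []
    for file_name, key in STAGING_FILES.items():
        local_path = Path(data_dir) / file_name  # portable across Windows and POSIX
        s3_client.upload_file(Filename=str(local_path), Bucket=bucket, Key=key)
        uploaded.append(key)
    return uploaded
```

With the client from above, the call would look like `ingest(s3, Path("..") / "data")`.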

## Cleanse Layer

Now we create the cleanse layer in terms of another S3 bucket:

In [43]:
s3.create_bucket(Bucket='data-lake-cleanse')

{'ResponseMetadata': {'RequestId': '35C822E32FF5A980',
  'HostId': 'np8ZpX4xdDp6U2E+mHLfseAAMQTISa0fzjFrrXxWrexgSmHciZLwMoz8c+OgvCUXJ6Sw3AAg/jY=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'np8ZpX4xdDp6U2E+mHLfseAAMQTISa0fzjFrrXxWrexgSmHciZLwMoz8c+OgvCUXJ6Sw3AAg/jY=',
   'x-amz-request-id': '35C822E32FF5A980',
   'date': 'Mon, 06 Jan 2020 12:09:07 GMT',
   'location': '/data-lake-cleanse',
   'content-length': '0',
   'server': 'AmazonS3'},
  'RetryAttempts': 0},
 'Location': '/data-lake-cleanse'}

# Step 3: Design the ETL Process to Transform and Load Data to Cleanse
In this section, we will use Apache Spark as the ETL tool to transform and load the data into the cleanse layer. First, we need to create a Spark session:

In [35]:
def create_spark_session():

    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    
    return spark

In [36]:
spark=create_spark_session()

### Load Masterdata into Cleanse
We start with the loading of the user masterdata into the cleanse layer. To this end, the following steps are performed:
- extract the raw data from the staging layer
- transform the data using Spark
- load data into the cleanse layer

Extract the masterdata from the staging layer:

In [7]:
user_masterdata = spark.read.json(r"s3a://data-lake-staging/masterdata/user_masterdata.json")

Let us create a temporary view to be able to use Spark's SQL API:

In [8]:
user_masterdata.createOrReplaceTempView('user_masterdata')

Let us also run a first data quality check and look for null values in the user_id column:

In [9]:
spark.sql("""
    SELECT
        COUNT(1) AS cnt
    FROM
        user_masterdata
    WHERE
        user_id IS NULL""").show()

+---+
|cnt|
+---+
|  0|
+---+



There are no null values in the user id column, which is good. But since that could change in the future (for example if the user data is extracted on a daily basis from a masterdata management system), we filter out null values before we load the data into the cleanse layer. Further, the user id is of data type string in the raw dataset although it is a number. Let us convert it as well:

In [10]:
user_masterdata_cleansed = spark.sql("""
    SELECT
        DISTINCT CAST(user_id as int) as user_id,
        firts_name AS firsts_name,
        last_name
    FROM
        user_masterdata
    WHERE
        user_id IS NOT NULL""")

Now we can load the cleansed data (i.e. only users which have a user id) into the cleanse layer:

In [None]:
user_masterdata_cleansed.write.json(r"s3a://data-lake-cleanse/masterdata/user")

In [64]:
user_masterdata_cleansed.toPandas().to_json(orient='records')

'[{"user_id":169,"firsts_name":"Claire","last_name":"Wiggins"},{"user_id":215,"firsts_name":"Cheryl","last_name":"Achenbach"},{"user_id":280,"firsts_name":"Desire","last_name":"Young"},{"user_id":746,"firsts_name":"Robert","last_name":"Davis"},{"user_id":828,"firsts_name":"Kevin","last_name":"Jackson"},{"user_id":1095,"firsts_name":"Ken","last_name":"Johnson"},{"user_id":1968,"firsts_name":"Michael","last_name":"Banuelos"},{"user_id":2339,"firsts_name":"Willie","last_name":"Barre"},{"user_id":2459,"firsts_name":"Doris","last_name":"Grudem"},{"user_id":2549,"firsts_name":"Chester","last_name":"Starks"},{"user_id":2851,"firsts_name":"William","last_name":"Talamo"},{"user_id":2974,"firsts_name":"Shirley","last_name":"Orourke"},{"user_id":3514,"firsts_name":"Ray","last_name":"Cassidy"},{"user_id":3528,"firsts_name":"Preston","last_name":"Pipkin"},{"user_id":4101,"firsts_name":"Jessica","last_name":"Baker"},{"user_id":4222,"firsts_name":"Robert","last_name":"Channell"},{"user_id":4225,"firs

### Load Movies into Cleanse
We proceed now with the cleansing of the movie data. We perform the same steps as in the previous section. First, we extract the data from the staging area:

In [208]:
movies_raw = spark.read.csv(r"s3a://data-lake-staging/movies/movies.dat", header=False)

Again, we create a temporary Spark table:

In [209]:
movies_raw.createOrReplaceTempView('movies_raw')

Let us inspect and assess the first records:

In [210]:
spark.sql("""
    SELECT
        *
    FROM
        movies_raw
""").toPandas().head(12)

Unnamed: 0,_c0
0,1::Toy Story (1995)::Adventure|Animation|Child...
1,2::Jumanji (1995)::Adventure|Children|Fantasy
2,3::Grumpier Old Men (1995)::Comedy|Romance
3,4::Waiting to Exhale (1995)::Comedy|Drama|Romance
4,5::Father of the Bride Part II (1995)::Comedy
5,6::Heat (1995)::Action|Crime|Thriller
6,7::Sabrina (1995)::Comedy|Romance
7,8::Tom and Huck (1995)::Adventure|Children
8,9::Sudden Death (1995)::Action
9,10::GoldenEye (1995)::Action|Adventure|Thriller


Please note that spark.read.csv, in the Spark version used here, does not accept a delimiter that consists of more than one character. Since in our case the delimiter is ‘::’, i.e. it consists of two characters, all fields have been loaded into a single column. We now need to bring the data into a reasonable form. To that end, we define some user defined functions (UDFs). First, let us separate the title and the publication year:

In [211]:
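def get_year(title):
    # The release year is expected in parentheses at the end, e.g. "Toy Story (1995)"
    try:
        return int(title[-5:-1])
    except (TypeError, ValueError):
        # Sentinel value for titles without a parsable year
        return 9999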

In [220]:
def get_title(title):
    # Keep the text before the first '(' and drop surrounding whitespace
    return title.split('(')[0].strip()
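The two helpers above rely on fixed string positions and on the first opening parenthesis. As a possible alternative, not used in this notebook, a regular expression can extract both parts in one pass and keep parenthesised parts that belong to the title. The name `split_title_year` and the second sample title are illustrative.

```python
import re

# Matches a trailing four-digit year in parentheses, e.g. "Toy Story (1995)".
YEAR_SUFFIX = re.compile(r"^(?P<title>.*?)\s*\((?P<year>\d{4})\)\s*$")


def split_title_year(raw_title, missing_year=9999):
    """Return (title, year); fall back to the sentinel year if none is found."""
    match = YEAR_SUFFIX.match(raw_title)
    if match is None:
        return raw_title.strip(), missing_year
    return match.group("title"), int(match.group("year"))


print(split_title_year("Toy Story (1995)"))
print(split_title_year("City of Lost Children, The (Cité des enfants perdus, La) (1995)"))
```

Unlike `get_title`, this variant only strips the final year and therefore returns the alternative title in parentheses as part of the title.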

In [213]:
get_title = udf(get_title)
spark.udf.register('get_title', get_title)

<function __main__.get_title(title)>

In [214]:
get_year = udf(get_year, IntegerType())
spark.udf.register('get_year', get_year)

<function __main__.get_year(title)>

With the UDFs defined above, we can now cleanse the data with a SQL query. We cast the movie id to an integer (it is a string in the raw dataset), separate the title and the publication year, and indicate whether a genre is available or not:

In [228]:
movies_cleansed = spark.sql("""
    SELECT
        CAST(split(_c0,'::')[0] as int) as movie_id,
        get_title(split(_c0,'::')[1]) as title,
        get_year(split(_c0,'::')[1]) as year,
        CASE
            WHEN
                size(split(_c0,'::')) = 3 THEN split(_c0,'::')[2]
            ELSE
                'no genre available'
        END as genre
    FROM
        movies_raw
""")

Now we can load the cleansed data into the cleanse layer as a csv file with a more common delimiter:

In [None]:
movies_cleansed.repartition(1).write.csv(r"s3a://data-lake-cleanse/movies/", header=True, sep=';')

### Load Ratings into Cleanse
Let us clean the ratings in the same way as the movies:

In [229]:
ratings_raw = spark.read.csv(r"s3a://data-lake-staging/ratings/ratings.dat", header=False)

In [230]:
ratings_raw.createOrReplaceTempView('ratings_raw')

In [231]:
spark.sql("""
    SELECT
        *
    FROM
        ratings_raw
""").show()

+--------------------+
|                 _c0|
+--------------------+
|1::122::5::838985046|
|1::185::5::838983525|
|1::231::5::838983392|
|1::292::5::838983421|
|1::316::5::838983392|
|1::329::5::838983392|
|1::355::5::838984474|
|1::356::5::838983653|
|1::362::5::838984885|
|1::364::5::838983707|
|1::370::5::838984596|
|1::377::5::838983834|
|1::420::5::838983834|
|1::466::5::838984679|
|1::480::5::838983653|
|1::520::5::838984679|
|1::539::5::838984068|
|1::586::5::838984068|
|1::588::5::838983339|
|1::589::5::838983778|
+--------------------+
only showing top 20 rows



We choose appropriate data types for the respective columns:

In [245]:
ratings_cleansed = spark.sql("""
    SELECT
        CAST(split(_c0,'::')[0] as int) as user_id,
        CAST(split(_c0,'::')[1] as int) as movie_id,
        CAST(split(_c0,'::')[2] as float) as rating,
        CAST(from_unixtime(CAST(split(_c0,'::')[3] as bigint),'yyyy-MM-dd') as date) as date
    FROM
        ratings_raw
""")

In [246]:
ratings_cleansed.createOrReplaceTempView('ratings_cleansed')
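A note on the date pattern passed to from_unixtime: the pattern letters follow Java's date formatting conventions, where lowercase `yyyy` is the calendar year and uppercase `YYYY` is the week-based year. The two differ for a few days around New Year. The sketch below illustrates the difference in plain Python using ISO week rules; Java's week-based year depends on the locale, so the exact boundary days may differ from this illustration.

```python
from datetime import date


def calendar_and_week_year(day):
    """Return the calendar year and the ISO week-based year of a date."""
    return day.year, day.isocalendar()[0]


# 30 December 2019 is a Monday that already belongs to ISO week 1 of 2020.
print(calendar_and_week_year(date(2019, 12, 30)))
```

For ratings submitted close to the turn of the year, a week-based pattern could therefore assign the wrong year.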

Let us do another data quality check which checks for duplicates:

In [248]:
spark.sql("""
    SELECT
        user_id,
        movie_id,
        count(1) as cnt
    FROM
        ratings_cleansed
    GROUP BY user_id, movie_id
    HAVING
        count(1) > 1""").show()

+-------+--------+---+
|user_id|movie_id|cnt|
+-------+--------+---+
+-------+--------+---+
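The same duplicate check applies to any table with a business key, so the query could be generated instead of written by hand. A minimal sketch, assuming a hypothetical helper `duplicate_check_sql`; table and column names are inserted unescaped, so only trusted names should be passed.

```python
def duplicate_check_sql(table, key_columns):
    """Build a query that lists key combinations occurring more than once."""
    keys = ", ".join(key_columns)
    return (
        f"SELECT {keys}, COUNT(1) AS cnt "
        f"FROM {table} "
        f"GROUP BY {keys} "
        f"HAVING COUNT(1) > 1"
    )


query = duplicate_check_sql("ratings_cleansed", ["user_id", "movie_id"])
print(query)
# In the notebook this could be run as: spark.sql(query).show()
```

An empty result, as in the output above, means that every key combination is unique.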



Finally, we load the cleansed data into the cleanse layer:

In [None]:
ratings_cleansed.repartition(1).write.csv(r"s3a://data-lake-cleanse/ratings/", header=True, sep=';')

# Step 4: Define the Data Model for Analytical Queries and the Access Layer
The access layer will also be realized as an S3 bucket. For performance reasons, we store the data in the access layer in a columnar format (parquet). This allows analytical queries to access the data efficiently using Spark or tools like Amazon Athena (although Athena requires a metadata store, which we will create later).

In [13]:
s3.create_bucket(Bucket='data-lake-access-layer')

{'ResponseMetadata': {'RequestId': '06A0142DC051BE0C',
  'HostId': 'a+bCm8lg9lMEXSAMGmHeL4/lOrVz4R7DfcDWxSsIXK4ZThN4kUxaMa9CW7aYn4b6MdvC+zJtwK8=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'a+bCm8lg9lMEXSAMGmHeL4/lOrVz4R7DfcDWxSsIXK4ZThN4kUxaMa9CW7aYn4b6MdvC+zJtwK8=',
   'x-amz-request-id': '06A0142DC051BE0C',
   'date': 'Tue, 07 Jan 2020 19:42:25 GMT',
   'location': '/data-lake-access-layer',
   'content-length': '0',
   'server': 'AmazonS3'},
  'RetryAttempts': 0},
 'Location': '/data-lake-access-layer'}

## Definition of the Data Model
The data model in the access layer will be in dimensional form and consists of one fact table and two dimension tables:
- Fact ratings
- Dimension User
- Dimension Movies

By choosing a dimensional schema we allow users to access the data in a convenient way which is suitable for answering common business questions.
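To illustrate how such a schema is queried, the following sketch rebuilds the three tables in an in-memory SQLite database and answers two of the questions mentioned earlier. SQLite merely stands in for Athena or Spark here, and all rows are made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE dim_user   (user_id INTEGER PRIMARY KEY, firsts_name TEXT, last_name TEXT);
    CREATE TABLE dim_movie  (movie_id INTEGER PRIMARY KEY, title TEXT, year INTEGER, genre TEXT);
    CREATE TABLE fct_rating (user_id INTEGER, movie_id INTEGER, rating REAL, date TEXT);

    -- Made-up sample rows, for illustration only
    INSERT INTO dim_user   VALUES (1, 'Ada', 'Example'), (2, 'Bob', 'Sample');
    INSERT INTO dim_movie  VALUES (1, 'Toy Story', 1995, 'Adventure|Animation'),
                                  (2, 'Jumanji', 1995, 'Adventure|Children');
    INSERT INTO fct_rating VALUES (1, 1, 5.0, '2020-01-01'),
                                  (2, 1, 4.0, '2020-01-02'),
                                  (2, 2, 3.0, '2020-01-02');
""")

# Average rating and number of distinct users per movie
avg_rating_per_movie = conn.execute("""
    SELECT m.title, AVG(f.rating) AS avg_rating, COUNT(DISTINCT f.user_id) AS n_users
    FROM fct_rating AS f
    JOIN dim_movie AS m ON m.movie_id = f.movie_id
    GROUP BY m.title
    ORDER BY m.title
""").fetchall()

# Movies rated by one particular user
movies_of_user = conn.execute("""
    SELECT m.title
    FROM fct_rating AS f
    JOIN dim_user AS u ON u.user_id = f.user_id
    JOIN dim_movie AS m ON m.movie_id = f.movie_id
    WHERE u.last_name = 'Sample'
    ORDER BY m.title
""").fetchall()

print(avg_rating_per_movie)
print(movies_of_user)
```

Both queries join the fact table to a dimension on its key, which is the access pattern the star schema is designed for.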


### Create and Load the User Dimension
Let's have a quick look at the user data in the cleanse layer:

In [20]:
s3.list_objects(Bucket='data-lake-cleanse', Prefix='masterdata/user/part')['Contents']

[{'Key': 'masterdata/user/part-00000-56d4ae59-701c-4f1f-a1dd-3dd04af730e6-c000.json',
  'LastModified': datetime.datetime(2020, 1, 7, 19, 50, 7, tzinfo=tzutc()),
  'ETag': '"1f748ad7ea20f7d58a4e065d443ac239"',
  'Size': 20877,
  'StorageClass': 'STANDARD',
  'Owner': {'DisplayName': 'christoph.berns',
   'ID': 'db8cc5d95825d0c3dc2a32c1c7c84e43928016494aa65304939002f13e741856'}},
 {'Key': 'masterdata/user/part-00001-56d4ae59-701c-4f1f-a1dd-3dd04af730e6-c000.json',
  'LastModified': datetime.datetime(2020, 1, 7, 19, 50, 9, tzinfo=tzutc()),
  'ETag': '"63e60ad76332b63132f28d4a628835a7"',
  'Size': 22442,
  'StorageClass': 'STANDARD',
  'Owner': {'DisplayName': 'christoph.berns',
   'ID': 'db8cc5d95825d0c3dc2a32c1c7c84e43928016494aa65304939002f13e741856'}},
 {'Key': 'masterdata/user/part-00002-56d4ae59-701c-4f1f-a1dd-3dd04af730e6-c000.json',
  'LastModified': datetime.datetime(2020, 1, 7, 19, 50, 10, tzinfo=tzutc()),
  'ETag': '"16136e511d5dec132ef89e241da18822"',
  'Size': 20416,
  'Stora

In [41]:
dim_user = spark.read.json(r"s3a://data-lake-cleanse/masterdata/user/part*")

In [42]:
dim_user.createOrReplaceTempView('dim_user')

In [43]:
dim_user.printSchema()

root
 |-- firsts_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- user_id: long (nullable = true)



#### Load the User Dimension into the Access Layer
We can load the user data in the cleanse layer without any transformation into the user dimension. We just change the format from JSON to parquet for the reasons described above. The user dimension will be loaded to the access layer under the key 'dim-user':

In [None]:
dim_user.write.parquet(r"s3a://data-lake-access-layer/ratings-data-mart/dim-user/")

### Create and Load the Movies Dimension
The movies dimension is created in the same way as the user dimension although we cast some columns here. It will be stored in the access layer under the key 'dim-movie':

In [44]:
movies = spark.read.csv(r"s3a://data-lake-cleanse/movies/part*", header=True, sep=';')

In [45]:
movies.createOrReplaceTempView('movies')

In [49]:
dim_movie = spark.sql("""
    SELECT
        CAST(movie_id AS int) AS movie_id,
        title,
        CAST(year as int) as year,
        genre
    FROM
        movies
"""
)

In [None]:
dim_movie.write.parquet(r"s3a://data-lake-access-layer/ratings-data-mart/dim-movie/")

### Create and Load Fact Table
Finally, we create and load the fact table for the ratings. It contains the user id and the movie id as foreign keys referring to the respective dimensions. Some casting of data types will be performed:

In [50]:
ratings = spark.read.csv(r"s3a://data-lake-cleanse/ratings/part*", header=True, sep=';')

In [51]:
ratings.createOrReplaceTempView('ratings')

In [52]:
ratings.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- movie_id: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- date: string (nullable = true)



In [53]:
fct_rating = spark.sql("""
    SELECT
        CAST(user_id AS int) AS user_id,
        CAST(movie_id AS int) AS movie_id,
        CAST(rating AS float) as rating,
        CAST(date AS date) as date
    FROM
        ratings
"""
)

In [None]:
fct_rating.write.parquet(r"s3a://data-lake-access-layer/ratings-data-mart/fct-rating/")

# Step 5: Data Quality Checks
We conclude with some data quality checks for referential integrity:

In [None]:
spark.sql("select * from ratings where user_id not in (select distinct user_id from dim_user)").show()

In [None]:
spark.sql("select * from ratings where movie_id not in (select movie_id from movies)").show()
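One caveat with `NOT IN` checks: under standard SQL semantics, a single NULL in the subquery result makes the predicate unknown for every non-matching row, so orphaned keys are silently hidden. The sketch below demonstrates this with an in-memory SQLite database and made-up rows, and shows `NOT EXISTS` as a null-safe formulation. Spark SQL follows the same semantics for `NOT IN`; a `LEFT ANTI JOIN` would be another option there.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.executescript("""
    CREATE TABLE dim_user (user_id INTEGER);
    CREATE TABLE ratings  (user_id INTEGER, movie_id INTEGER);
    -- Made-up rows: user 3 has a rating but no entry in the dimension
    INSERT INTO dim_user VALUES (1), (2), (NULL);
    INSERT INTO ratings  VALUES (1, 10), (3, 10);
""")

not_in = db.execute(
    "SELECT user_id FROM ratings WHERE user_id NOT IN (SELECT user_id FROM dim_user)"
).fetchall()

not_exists = db.execute("""
    SELECT r.user_id
    FROM ratings AS r
    WHERE NOT EXISTS (SELECT 1 FROM dim_user AS u WHERE u.user_id = r.user_id)
""").fetchall()

print(not_in)      # the NULL in dim_user hides the orphaned rating
print(not_exists)  # the orphaned rating is reported
```

In this notebook the cleanse step already removes null user ids, so the checks above are not affected, but the null-safe form is more robust if that filter ever changes.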

# Step 6: Build Metadata Repository
In this section we use AWS Glue Crawlers to create metadata for each layer in the data lake:

In [11]:
glue = boto3.client('glue', region_name='us-east-1', 
                         aws_access_key_id=aws_key, 
                         aws_secret_access_key=aws_secret_key)

## Crawler Cleanse

In [38]:
glue.create_crawler(
    Name='CleanseCrawler',
    Role='GlueServiceRole',
    DatabaseName='Clense',
    Description='Crawler for generated schemas in cleanse area',
    Targets={
        'S3Targets': [
            {
                'Path': 's3://data-lake-cleanse/masterdata/user',
                'Exclusions': []
                
            },
            {
                'Path': 's3://data-lake-cleanse/movies',
                'Exclusions': []
            },
            {
                'Path': 's3://data-lake-cleanse/ratings',
                'Exclusions': []
            }
        ]
    }
)

{'ResponseMetadata': {'RequestId': '8641ac48-324e-11ea-9675-57b96279da8d',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Wed, 08 Jan 2020 19:39:03 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '2',
   'connection': 'keep-alive',
   'x-amzn-requestid': '8641ac48-324e-11ea-9675-57b96279da8d'},
  'RetryAttempts': 0}}

In [40]:
glue.start_crawler(
    Name='CleanseCrawler'
)


{'ResponseMetadata': {'RequestId': '90e388d1-324e-11ea-b7d0-256a56e4a183',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Wed, 08 Jan 2020 19:39:31 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '2',
   'connection': 'keep-alive',
   'x-amzn-requestid': '90e388d1-324e-11ea-b7d0-256a56e4a183'},
  'RetryAttempts': 0}}

## Crawler Stage

In [22]:
glue.create_crawler(
    Name='StageCrawler',
    Role='GlueServiceRole',
    DatabaseName='Stage',
    Description='Crawler for generated schemas in staging area',
    Targets={
        'S3Targets': [
            {
                'Path': 's3://data-lake-staging/masterdata',
                'Exclusions': []
                
            },
            {
                'Path': 's3://data-lake-staging/movies',
                'Exclusions': []
            },
            {
                'Path': 's3://data-lake-staging/ratings',
                'Exclusions': []
            }
        ]
    }
)

{'ResponseMetadata': {'RequestId': '480296f7-3246-11ea-ab59-71e2a3db621f',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Wed, 08 Jan 2020 18:40:02 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '2',
   'connection': 'keep-alive',
   'x-amzn-requestid': '480296f7-3246-11ea-ab59-71e2a3db621f'},
  'RetryAttempts': 0}}

In [23]:
glue.start_crawler(
    Name='StageCrawler'
)

{'ResponseMetadata': {'RequestId': '49cccf62-3246-11ea-a0e9-df4d4071153e',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Wed, 08 Jan 2020 18:40:16 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '2',
   'connection': 'keep-alive',
   'x-amzn-requestid': '49cccf62-3246-11ea-a0e9-df4d4071153e'},
  'RetryAttempts': 0}}

## Crawler Access Layer

In [57]:
glue.create_crawler(
    Name='AccessLayerCrawler',
    Role='GlueServiceRole',
    DatabaseName='AccessLayer',
    Description='Crawler for generated schemas in access layer',
    Targets={
        'S3Targets': [
            {
                'Path': 's3://data-lake-access-layer/ratings-data-mart/dim-user',
                'Exclusions': []
                
            },
            {
                'Path': 's3://data-lake-access-layer/ratings-data-mart/dim-movie',
                'Exclusions': []
            },
            {
                'Path': 's3://data-lake-access-layer/ratings-data-mart/fct-rating',
                'Exclusions': []
            }
        ]
    }
)

{'ResponseMetadata': {'RequestId': 'f2732cb6-3253-11ea-9878-0f502481f9e6',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Wed, 08 Jan 2020 20:17:52 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '2',
   'connection': 'keep-alive',
   'x-amzn-requestid': 'f2732cb6-3253-11ea-9878-0f502481f9e6'},
  'RetryAttempts': 0}}

In [58]:
glue.start_crawler(
    Name='AccessLayerCrawler'
)

{'ResponseMetadata': {'RequestId': 'f317aa1c-3253-11ea-b7d9-23bbec2410d6',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Wed, 08 Jan 2020 20:18:04 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '2',
   'connection': 'keep-alive',
   'x-amzn-requestid': 'f317aa1c-3253-11ea-b7d9-23bbec2410d6'},
  'RetryAttempts': 0}}
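The three crawler definitions above differ only in name, database and paths, so the request could be built by a small helper. Since `start_crawler` returns immediately, a polling loop on the crawler state is also sketched. The helper names `crawler_request` and `wait_for_crawler` are assumptions made for illustration, and the Glue client is passed in.

```python
import time


def crawler_request(name, database, paths, role="GlueServiceRole"):
    """Build the keyword arguments for glue.create_crawler for one layer."""
    return {
        "Name": name,
        "Role": role,
        "DatabaseName": database,
        "Targets": {"S3Targets": [{"Path": p, "Exclusions": []} for p in paths]},
    }


def wait_for_crawler(glue_client, name, poll_seconds=30):
    """Block until the crawler reports the READY state again."""
    while glue_client.get_crawler(Name=name)["Crawler"]["State"] != "READY":
        time.sleep(poll_seconds)


access_request = crawler_request(
    name="AccessLayerCrawler",
    database="AccessLayer",
    paths=[
        "s3://data-lake-access-layer/ratings-data-mart/dim-user",
        "s3://data-lake-access-layer/ratings-data-mart/dim-movie",
        "s3://data-lake-access-layer/ratings-data-mart/fct-rating",
    ],
)
# With the Glue client from above: glue.create_crawler(**access_request)
```

Waiting for the READY state before querying the catalog avoids reading table definitions from a crawl that is still in progress.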

The resulting metadata discovered by the crawlers is described in the folder ‘data-dictionary’ in this repository. Please note that the data in the staging layer is completely unstructured. Hence its metadata will not be included in the data catalog.

# Step 7: Complete Project Write Up

The resulting data in the access layer, stored in parquet format, can now be queried with tools like Spark or Athena. The dimensional schema makes it easy to write queries that answer common business questions. We could also load the data from the access layer into a relational database. For example, Redshift could be used since it also stores data in a columnar format and is therefore well suited for OLAP workloads.
We could imagine that data is ingested into the staging layer on a daily basis. In that case, we could use Apache Airflow to create a DAG that is scheduled to run e.g. every day at 7:00 am to refresh the data in the cleanse and access layers.
If the data volume increased 100x, no problems would arise from a storage perspective, since S3 scales well with volume. From a processing perspective we can scale as well, since we can add nodes to the cluster on which Spark runs.
If we store the final dataset of the access layer in Redshift and 100+ people need to access the data, we can increase the number of nodes of the Redshift cluster as well. The additional nodes provide more compute capacity for serving a larger number of concurrent queries.
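
Whatever scheduler is used, the refresh has to respect the dependencies between the steps of this notebook. The sketch below expresses them with the standard library instead of Airflow, purely to show the ordering a DAG would need to encode. The task names are hypothetical.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Hypothetical task names; each task maps to the set of tasks it depends on.
REFRESH_TASKS = {
    "cleanse_users": set(),
    "cleanse_movies": set(),
    "cleanse_ratings": set(),
    "load_dim_user": {"cleanse_users"},
    "load_dim_movie": {"cleanse_movies"},
    "load_fct_rating": {"cleanse_ratings"},
    "quality_checks": {"load_dim_user", "load_dim_movie", "load_fct_rating"},
    "run_crawlers": {"quality_checks"},
}

order = list(TopologicalSorter(REFRESH_TASKS).static_order())
print(order)
```

In an Airflow DAG the same dependencies would be declared between operators, and the three cleanse tasks could run in parallel.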
