<h1 style='font-size:50px'>Project: Data Lake</h1>

# Introduction

A music streaming startup, Sparkify, has grown its user base and song database and wants to move its data warehouse to a data lake. The data resides in S3: a directory of JSON logs of user activity in the app, and a directory of JSON metadata on the songs in the app.

As their data engineer, you are tasked with building an ETL pipeline that extracts the data from S3, processes it with Spark, and loads it back into S3 as a set of dimensional tables. This will allow the analytics team to continue finding insights into what songs their users are listening to.

You'll be able to test your database and ETL pipeline by running queries given to you by Sparkify's analytics team and comparing your results with their expected results.
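
As one illustration of such a check, the sketch below registers two of the output tables as temporary views and counts plays per song title. The query is an assumption (the analytics team's actual queries are not reproduced in this notebook), and the names `TOP_SONGS_SQL` and `top_songs` are hypothetical. The table directory names follow the ones written by the processing functions later in this notebook.

```python
# Hypothetical sanity-check query; not one of the analytics team's queries.
TOP_SONGS_SQL = """
SELECT s.title, COUNT(*) AS plays
FROM songplays sp
JOIN songs s ON sp.song_id = s.song_id
GROUP BY s.title
ORDER BY plays DESC
LIMIT {limit}
"""


def top_songs(spark, output_data, limit=10):
    """Return a DataFrame with the most-played song titles.

    `spark` is an existing SparkSession; `output_data` is the S3 prefix
    the ETL job wrote its parquet tables to.
    """
    base = output_data.rstrip('/')
    for view, table_dir in [('songplays', 'songplays_table'),
                            ('songs', 'songs_table')]:
        # Load each parquet table and expose it to Spark SQL under a short name.
        spark.read.parquet(base + '/' + table_dir).createOrReplaceTempView(view)
    return spark.sql(TOP_SONGS_SQL.format(limit=int(limit)))
```

On the cluster this would be called as `top_songs(spark, output_data).show()` once both processing functions have finished.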

# Project Description

In this project, you'll apply what you've learned about Spark and data lakes to build an ETL pipeline for a data lake hosted on S3. To complete the project, you will need to load data from S3, process it into analytics tables using Spark, and load the tables back into S3. You'll deploy this Spark process on a cluster using AWS.

# Schema for Song Play Analysis

Using the song and log datasets, you'll need to create a star schema optimized for queries on song play analysis. This includes the following tables.

### Fact Table

- **songplays** - records in the log data associated with song plays, i.e. records with page `NextSong`. Table schema:
    - songplay_id
    - start_time
    - user_id
    - level
    - song_id
    - artist_id
    - session_id
    - location
    - user_agent
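
`songplay_id` has no natural source column in the logs, so it has to be generated. The pipeline later in this notebook uses Spark's `monotonically_increasing_id`, whose documented layout puts the partition ID in the upper 31 bits and the record number within the partition in the lower 33 bits. The pure-Python sketch below only emulates that layout to show why the resulting IDs are unique but not consecutive; the function name `emulate_monotonic_id` is hypothetical.

```python
def emulate_monotonic_id(partition_id, row_in_partition):
    """Emulate the documented bit layout of monotonically_increasing_id.

    Upper 31 bits: partition ID. Lower 33 bits: record number in the partition.
    """
    if not 0 <= row_in_partition < (1 << 33):
        raise ValueError('row_in_partition must fit in 33 bits')
    return (partition_id << 33) + row_in_partition


# First three rows of partition 0, then the first two rows of partition 1.
ids = [emulate_monotonic_id(0, i) for i in range(3)]
ids += [emulate_monotonic_id(1, i) for i in range(2)]
print(ids)  # [0, 1, 2, 8589934592, 8589934593]
```

The jump between partitions means `songplay_id` works as a surrogate key but should not be read as a row count.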

### Dimension Tables

- **users** - users in the app
    - user_id
    - first_name
    - last_name
    - gender 
    - level
    
- **songs** - songs in music database
    - song_id
    - title
    - artist_id
    - year
    - duration

- **artists** - artists in music database
    - artist_id
    - name
    - location
    - latitude
    - longitude
    
- **time** - timestamps of records in songplays broken down into specific units
    - start_time
    - hour
    - day
    - week
    - month
    - year
    - weekday
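
The raw JSON fields do not use the schema's column names (the logs have `userId` and `firstName`, the song files have `artist_name`, and so on). One way to bridge the two, sketched here under the assumption that the rename happens at selection time, is to keep a source-to-target mapping and turn it into `selectExpr` strings. `COLUMN_MAP` and `select_exprs` are hypothetical names, and only two tables are shown.

```python
# Source column -> schema column, per table. The source names are the raw
# JSON fields read by the processing functions in this notebook.
COLUMN_MAP = {
    'users': [('userId', 'user_id'), ('firstName', 'first_name'),
              ('lastName', 'last_name'), ('gender', 'gender'),
              ('level', 'level')],
    'artists': [('artist_id', 'artist_id'), ('artist_name', 'name'),
                ('artist_location', 'location'),
                ('artist_latitude', 'latitude'),
                ('artist_longitude', 'longitude')],
}


def select_exprs(table):
    """Build selectExpr strings such as 'userId AS user_id'."""
    return [src if src == dst else '{} AS {}'.format(src, dst)
            for src, dst in COLUMN_MAP[table]]


print(select_exprs('users'))
# With a DataFrame `df` of log records, the strings can be passed to Spark:
#   users_table = df.selectExpr(*select_exprs('users')).dropDuplicates()
```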



# Step 0: Create the EMR cluster

Creating an EMR cluster with boto3 is cumbersome, so create it from the AWS console with the following options:

- Launch mode: Cluster (switch to Step execution once scripting and development are complete)
- Release: emr-5.26.0
- Applications: Spark: Spark 2.4.3 on Hadoop 2.8.5 YARN with Ganglia 3.7.2 and Zeppelin 0.8.1
- Instance type: m3.xlarge (a smaller instance type, used here for development)
- Number of instances: 2
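
For readers who prefer to script the cluster anyway, the sketch below builds the keyword arguments for boto3's EMR `run_job_flow` call from the same options. It is an illustration, not the method used in this notebook: the cluster name, the optional key pair, and the default role names are assumptions, and `emr_request` and `launch_cluster` are hypothetical helpers.

```python
def emr_request(name='sparkify-dev', key_name=None):
    """Build keyword arguments for boto3's EMR run_job_flow call.

    `name` and `key_name` are placeholders; the release, applications,
    instance type, and instance count mirror the console options above.
    """
    instances = {
        'MasterInstanceType': 'm3.xlarge',
        'SlaveInstanceType': 'm3.xlarge',
        'InstanceCount': 2,
        # Keep the cluster alive after start-up (the "Cluster" launch mode).
        'KeepJobFlowAliveWhenNoSteps': True,
    }
    if key_name:
        instances['Ec2KeyName'] = key_name
    return {
        'Name': name,
        'ReleaseLabel': 'emr-5.26.0',
        'Applications': [{'Name': app} for app in ('Spark', 'Ganglia', 'Zeppelin')],
        'Instances': instances,
        # Default EMR role names; assumed to exist in the account already.
        'JobFlowRole': 'EMR_EC2_DefaultRole',
        'ServiceRole': 'EMR_DefaultRole',
    }


def launch_cluster(region_name):
    """Submit the request. Needs boto3 and AWS credentials, so it is not run here."""
    import boto3  # imported lazily so emr_request has no dependencies
    client = boto3.client('emr', region_name=region_name)
    return client.run_job_flow(**emr_request())['JobFlowId']
```

Keeping the request in a plain dictionary makes it easy to review or diff the cluster settings before anything is launched.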

# Step 1: Import dependencies and create a spark session

In [1]:
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql import functions as F
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import TimestampType


def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

Starting Spark application


| ID | YARN Application ID | Kind | State | Spark UI | Driver log | Current session? |
| --- | --- | --- | --- | --- | --- | --- |
| 0 | application_1567298058798_0001 | pyspark | idle | Link | Link | ✔ |


SparkSession available as 'spark'.


# Step 2: Create functions to process the song data and log data

The two functions process the song and log data and save the output to an Amazon S3 bucket that the engineer specifies. Downstream applications can use these datasets to build a dashboard, and data scientists can use them in a machine learning training job on Amazon SageMaker.
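
Because both functions build their S3 locations by string concatenation, it can help to see the full layout in one place. The sketch below lists the input globs and output directories they use; `s3_join` and `pipeline_paths` are hypothetical helpers, and `s3://my-bucket/output/` is a placeholder bucket.

```python
def s3_join(prefix, *parts):
    """Join S3 path pieces with single slashes, tolerating stray ones."""
    pieces = [prefix.rstrip('/')] + [p.strip('/') for p in parts]
    return '/'.join(pieces)


def pipeline_paths(input_data, output_data):
    """Return the input globs and output directories used by the two functions."""
    tables = ('songs_table', 'artists_table', 'users_table',
              'timetable', 'songplays_table')
    return {
        'song_input': s3_join(input_data, 'song_data/*/*/*/*.json'),
        'log_input': s3_join(input_data, 'log_data/*/*'),
        'outputs': {name: s3_join(output_data, name) for name in tables},
    }


paths = pipeline_paths('s3://udacity-dend/', 's3://my-bucket/output/')
print(paths['song_input'])              # s3://udacity-dend/song_data/*/*/*/*.json
print(paths['outputs']['songs_table'])  # s3://my-bucket/output/songs_table
```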


In [9]:
def process_song_data(spark, input_data, output_data):
    """
    This function can be used to read the files stored in Amazon S3 (/song_data)
    to get the information for the tables songs, artists
    The required variables in the tables are
    Song Table: unique song id (song_id) which is the primary key, the title (title) of the song, 
                the artist id (artist_id) for the artist, 
                year when the song was produced (year) and duration of the song (duration).
    Artist Table: unique artist id (artist_id) which is the primary key, name of the artist (artist_name), 
                  location (location), latitude of the artist location (artist_latitude), longitude of 
                  artists location (artist_longitude).
    
    Arguments:
        spark: the spark session you want to use. 
        input_data: Amazon S3 path for the raw data
        output_data: Amazon S3 path for storing the processed data
    Returns:
        None
    """
    # get filepath to song data file
    song_data = input_data + 'song_data/*/*/*/*.json'
    
    # read song data file
    print('Reading in song data')
    df = spark.read.json(song_data)
    
    print(df)

    # extract columns to create songs table; keep one row per song
    songs_table = df[['song_id', 'title', 'artist_id', 'year', 'duration']].dropDuplicates(['song_id'])

    # write songs table to parquet files partitioned by year and artist
    print('Writing in song table')
    songs_table.write.partitionBy('year', 'artist_id').parquet(output_data + 'songs_table')

    # extract columns to create artists table, renamed to match the schema
    print('Extracting artists table')
    artists_table = df.selectExpr('artist_id', 'artist_name AS name', 'artist_location AS location',
                                  'artist_latitude AS latitude', 'artist_longitude AS longitude') \
        .dropDuplicates(['artist_id'])

    # write artists table to parquet files
    print('Writing in artists table')
    artists_table.write.parquet(output_data + 'artists_table')


def process_log_data(spark, input_data, output_data):
    """
    This function reads the files stored in Amazon S3 (/log_data)
    to get the user and time info used to populate the users and time dim tables,
    and joins the log records with the song data to build the songplays fact table.
                 
    The required variables for the users and time dim tables are
    Users Table: unique user id (user_id), first name of the user (first_name), last name (last_name), gender (gender)
                 and subscription level (level)
                 
    Time dimension table: time('TimeStamp', 'Hour', 'Day', 'Week', 'Month', 'Year', 'Weekday')
    
    Arguments:
        spark: the spark session you want to use. 
        input_data: Amazon S3 path for the raw data
        output_data: Amazon S3 path for storing the processed data
    Returns:
        None
    """
    # get filepath to log data file
    log_data = input_data + 'log_data/*/*/'

    # read log data file
    print('Reading in log data')
    df = spark.read.json(log_data)
    
    # filter by actions for song plays
    df = df[df['page'] == 'NextSong']

    # extract columns for users table, renamed to match the schema
    print('Extracting users table data')
    users_table = df.selectExpr('userId AS user_id', 'firstName AS first_name', 'lastName AS last_name',
                                'gender', 'level').dropDuplicates()

    # write users table to parquet files
    print('Writing in users table')
    users_table.write.parquet(output_data + 'users_table')

    # create timestamp column from original timestamp column (epoch milliseconds)
    print('Creating time table')
    get_timestamp = udf(lambda x: datetime.fromtimestamp(x / 1000.0), TimestampType())
    df = df.withColumn('start_time', get_timestamp(df.ts))

    # derive the time units directly from start_time
    df = df.withColumn('hour', F.hour(df.start_time))
    df = df.withColumn('day', F.dayofmonth(df.start_time))
    df = df.withColumn('week', F.weekofyear(df.start_time))
    df = df.withColumn('month', F.month(df.start_time))
    df = df.withColumn('year', F.year(df.start_time))
    # dayofweek numbers the days 1 (Sunday) through 7 (Saturday)
    df = df.withColumn('weekday', F.dayofweek(df.start_time))
    
    # extract columns to create time table; keep one row per distinct timestamp
    time_table = df[['start_time', 'hour', 'day', 'week', 'month', 'year', 'weekday']].dropDuplicates()

    # write time table to parquet files partitioned by year and month
    time_table.write.partitionBy('year', 'month').parquet(output_data + 'timetable')

    # read in song data to use for songplays table
    print('Creating songplays table')
    song_df = spark.read.json(input_data + 'song_data/*/*/*/*.json')

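    # extract columns from joined song and log datasets to create songplays table
    # match on song title as well as artist name so each play maps to one song
    songs = song_df.select('song_id', 'artist_id', 'title', 'artist_name')
    songplays_table = df.join(songs, (df.song == songs.title) & (df.artist == songs.artist_name))
    songplays_table = songplays_table.withColumn('songplay_id', F.monotonically_increasing_id())
    songplays_table = songplays_table.select(
        'songplay_id', 'start_time', F.col('userId').alias('user_id'), 'level', 'song_id', 'artist_id',
        F.col('sessionId').alias('session_id'), 'location', F.col('userAgent').alias('user_agent'),
        'year', 'month')

    # write songplays table to parquet files partitioned by year and month
    songplays_table.write.partitionBy('year', 'month').parquet(output_data + 'songplays_table')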



spark = create_spark_session()
input_data = "s3://udacity-dend/"
#input_data = "s3://randombucketgaurav/udacity-data/"
output_data = "s3://randombucketgaurav/udacity-data/output/"

print('Processing Song Data')
process_song_data(spark, input_data, output_data)    
print('Processing Log Data')
process_log_data(spark, input_data, output_data)

Processing Song Data
Reading in song data
DataFrame[artist_id: string, artist_latitude: double, artist_location: string, artist_longitude: double, artist_name: string, duration: double, num_songs: bigint, song_id: string, title: string, year: bigint]
Writing in song table
Extracting artists table
Writing in artists table
Processing Log Data
Reading in log data
Extracting artists table data
Writing in users table
Creating time table
Creating songplays table
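
To make the time-table step easier to check by hand, here is a pure-Python sketch of the same breakdown for a single `ts` value. It assumes UTC for reproducibility, whereas the notebook's UDF uses the cluster's local time zone. The helper name `time_row` and the sample timestamp are hypothetical. The weekday follows Spark's `dayofweek` numbering (1 = Sunday) and the week follows ISO 8601, as `weekofyear` does.

```python
from datetime import datetime, timezone


def time_row(ts_ms):
    """Break an epoch-millisecond timestamp into the time-table fields (UTC)."""
    dt = datetime.fromtimestamp(ts_ms / 1000.0, tz=timezone.utc)
    return {
        'start_time': dt,
        'hour': dt.hour,
        'day': dt.day,
        'week': dt.isocalendar()[1],         # ISO week number, like weekofyear
        'month': dt.month,
        'year': dt.year,
        'weekday': dt.isoweekday() % 7 + 1,  # 1 = Sunday ... 7 = Saturday, like dayofweek
    }


row = time_row(1541105830796)  # hypothetical sample value
print(row['hour'], row['day'], row['week'], row['month'], row['year'], row['weekday'])
# 20 1 44 11 2018 5
```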

In [10]:
print("Done")

Done