# Data Engineering Project 4: Data Lakes with Spark

## Introduction

A music streaming startup, Sparkify, has grown its user base and song database even more and wants to move its data warehouse to a data lake. Its data resides in S3, in a directory of JSON logs on user activity in the app and a directory with JSON metadata on the songs in the app.

In this notebook we test an ETL pipeline that runs on an EMR Spark cluster in AWS.


## Prepare notebook

In [1]:
import os
import configparser
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField as Fld, DoubleType as Dbl, \
        StringType as Str, IntegerType as Int, DateType as Date, \
        LongType as Lgt, TimestampType as Tme
# explicit imports; note that `round` here deliberately refers to the Spark column function
from pyspark.sql.functions import col, round, hour, dayofmonth, weekofyear, \
        month, year, dayofweek
from zipfile import ZipFile

Starting Spark application

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
3,application_1592070304952_0004,pyspark,idle,Link,Link,✔

SparkSession available as 'spark'.

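The lines below load the AWS credentials from `aws/credentials.cfg`. They are commented out because, on the EMR cluster, the data in S3 is read without setting the credentials explicitly.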

In [2]:
#config = configparser.ConfigParser()
#config.read_file(open('aws/credentials.cfg'))
#os.environ["AWS_ACCESS_KEY_ID"]= config['AWS']['AWS_ACCESS_KEY_ID']
#os.environ["AWS_SECRET_ACCESS_KEY"]= config['AWS']['AWS_SECRET_ACCESS_KEY']

Unpack the zip files with the data and save them in the `data` folder (only needed when working with the local copy of the data).

`song-data.zip` has the following path structure: `song-data/*/*/*/*.json`

`log-data.zip` has the following path structure: `*.json`



In [3]:
#with ZipFile('data/song-data.zip','r') as zip_file:
#    zip_file.extractall('data/')
#with ZipFile('data/log-data.zip','r') as zip_file:
#    zip_file.extractall('data/log_data')

Create a Spark session. Keep this cell commented out when working on an EMR Spark cluster, where a `spark` session is already available.

In [4]:
#spark = SparkSession.builder\
#                     .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0")\
#                     .getOrCreate()

## Create the schema
Create the schemas of the JSON files we are going to read: one for the `song-data` and one for the `log-data`.

In [5]:
songSchema = StructType([
    Fld("artist_id",       Str()),
    Fld("artist_latitude", Dbl()),
    Fld("artist_location", Str()),
    Fld("artist_longitude",Dbl()),
    Fld("artist_name",     Str()),
    Fld("duration",        Dbl()),
    Fld("num_songs",       Int()),
    Fld("song_id",         Str()),
    Fld("title",           Str()),
    Fld("year",            Int())
    ])

logschema = StructType([
    Fld('artist',          Str()),
    Fld('auth',            Str()),
    Fld('firstName',       Str()),
    Fld('gender',          Str()),
    Fld('itemInSession',   Int()),
    Fld('lastName',        Str()),
    Fld('length',          Dbl()),
    Fld('level',           Str()),
    Fld('location',        Str()),
    Fld('method',          Str()),
    Fld('page',            Str()),
    Fld('registration',    Dbl()),
    Fld('sessionId',       Int()),
    Fld('song',            Str()),
    Fld('status',          Int()),
    Fld('ts',              Lgt()),
    Fld('userAgent',       Str()),
    Fld('userId',          Int())
    ]) 
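
With an explicit schema, `spark.read.json` keeps only the declared fields, reads them with the declared types and fills fields missing from a record with null, instead of inferring the types from the data. The plain-Python sketch below mimics that for a single record. The mapping from Spark types to Python types and the shortened example record (based on the first song row printed further down) are assumptions for illustration; the sketch does not reproduce how Spark handles values that cannot be converted.

```python
import json

# Spark types mapped to Python types (an assumption made for this illustration)
SONG_SCHEMA = {
    "artist_id": str, "artist_latitude": float, "artist_location": str,
    "artist_longitude": float, "artist_name": str, "duration": float,
    "num_songs": int, "song_id": str, "title": str, "year": int,
}

def apply_schema(json_line, schema):
    """Keep only the declared fields, cast them, and use None for missing values."""
    record = json.loads(json_line)
    return {
        name: (None if record.get(name) is None else cast(record[name]))
        for name, cast in schema.items()
    }

# shortened, hypothetical version of the first song record; "extra" is not in the schema
line = ('{"artist_id": "AR4T2IF1187B9ADBB7", "artist_name": "Billy Idol", '
        '"duration": 233.22077, "num_songs": 1, "song_id": "SOVIYJY12AF72A4B00", '
        '"title": "The Dead Next Door (Digitally Remastered 99)", "year": 1983, '
        '"extra": "ignored"}')
row = apply_schema(line, SONG_SCHEMA)
print(row["year"], row["artist_latitude"], "extra" in row)  # 1983 None False
```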

## Load data
We load the data directly from S3. The paths for the local copy of the data are commented out; to work locally, comment out the S3 lines and uncomment the local ones.

In [6]:
#uncomment if S3
input_data = "s3a://udacity-dend/"
song_data = os.path.join(input_data, "song_data/*/*/*/*.json") #full data set
#song_data = os.path.join(input_data, "song_data/A/A/A/*.json") #partial data set
#uncomment if local
#input_data = "data/"
#song_data = os.path.join(input_data, "song_data/*/*/*/*.json")
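
Spark expands path patterns such as `song_data/*/*/*/*.json` using Hadoop's glob rules, in which each `*` matches within a single directory level. The sketch below uses Python's standard `glob` module, which behaves similarly for this simple pattern, on a small throwaway directory tree (the file names are hypothetical) to show which files such a pattern picks up.

```python
import glob
import os
import tempfile

def matching_files(root, pattern):
    """Return the files under `root` that match `pattern`, as '/'-separated relative paths."""
    matches = glob.glob(os.path.join(root, pattern))
    return sorted(os.path.relpath(p, root).replace(os.sep, "/") for p in matches)

with tempfile.TemporaryDirectory() as root:
    # hypothetical files: one three directory levels below song_data/, one only two
    for rel_path in ["song_data/A/A/A/song1.json", "song_data/A/A/song2.json"]:
        full_path = os.path.join(root, *rel_path.split("/"))
        os.makedirs(os.path.dirname(full_path), exist_ok=True)
        with open(full_path, "w") as f:
            f.write("{}")
    found = matching_files(root, "song_data/*/*/*/*.json")

print(found)  # ['song_data/A/A/A/song1.json']
```

The same reasoning applies to `log_data/*/*/*.json`, which expects the log files two directory levels below `log_data/`.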

Load the song data into a Spark data frame, using the schema defined above.

In [7]:
df_song = spark.read.json(song_data, schema=songSchema)

We print the schema and the first row to check that the data has been loaded correctly.

In [8]:
df_song.printSchema()
df_song.head()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: integer (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)

Row(artist_id='AR4T2IF1187B9ADBB7', artist_latitude=63.96027, artist_location='<a href="http://billyidol.net" onmousedown=\'UntrustedLink.bootstrap($(this), "fc44f8f60d13ab68c56b3c6709c6d670", event)\' target="_blank" rel="nofollow">http://billyidol.net</a>', artist_longitude=10.22442, artist_name='Billy Idol', duration=233.22077, num_songs=1, song_id='SOVIYJY12AF72A4B00', title='The Dead Next Door (Digitally Remastered 99)', year=1983)

Next, we load the `log-data` into a Spark data frame.

In [9]:
# get filepath to log data files
log_data = os.path.join(input_data, "log_data/*/*/*.json")
#log_data = os.path.join(input_data, "log_data/*.json")

In [10]:
df_log = spark.read.json(log_data)

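Check that the `log-data` has been loaded correctly into the Spark data frame. Since `logschema` is not passed to `spark.read.json`, Spark infers the column types: `userId` is read as a string, and `itemInSession`, `sessionId` and `status` as `long`.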

In [11]:
df_log.printSchema()
df_log.head()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

Row(artist='Harmonia', auth='Logged In', firstName='Ryan', gender='M', itemInSession=0, lastName='Smith', length=655.77751, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='Sehr kosmisch', status=200, ts=15422

## Join log and song data

We join both data frames on the artist name and the song title (`artist_name` and `title` in the song data, `artist` and `song` in the log data). We also add a `start_time` column that converts `ts`, given in milliseconds since the epoch, into a timestamp; later we use `start_time` to create the time table.

In [12]:
df_log_join_song = df_log.join(df_song, (df_song.artist_name == df_log.artist) \
                               & (df_song.title == df_log.song))
df_log_join_song = df_log_join_song.withColumn( 'start_time', (round( col('ts')/1000 )).cast(Tme()) )
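
The `start_time` column divides `ts` by 1000, rounds to the nearest second and casts the result to a timestamp. The plain-Python sketch below applies the same arithmetic to the `registration` value from the sample log row above, used here only as an example millisecond timestamp, and interprets it in UTC (Spark displays timestamps in the session time zone, which may differ).

```python
from datetime import datetime, timezone

def ms_to_start_time(ts_ms):
    """Convert epoch milliseconds to a UTC datetime rounded to the nearest second."""
    # Python's round() rounds exact halves to even, Spark's round() rounds them up;
    # the two only differ for values ending in exactly .500 seconds
    seconds = round(ts_ms / 1000)
    return datetime.fromtimestamp(seconds, tz=timezone.utc)

print(ms_to_start_time(1541016707796))  # 2018-10-31 20:11:48+00:00
```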

In [13]:
df_log_join_song

DataFrame[artist: string, auth: string, firstName: string, gender: string, itemInSession: bigint, lastName: string, length: double, level: string, location: string, method: string, page: string, registration: double, sessionId: bigint, song: string, status: bigint, ts: bigint, userAgent: string, userId: string, artist_id: string, artist_latitude: double, artist_location: string, artist_longitude: double, artist_name: string, duration: double, num_songs: int, song_id: string, title: string, year: int, start_time: timestamp]
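
`DataFrame.join` performs an inner join by default, so log events whose artist and song title have no match in the song data do not appear in `df_log_join_song`. A minimal plain-Python sketch of that equi-join, using toy rows loosely based on the sample rows above (the second and third log events are hypothetical):

```python
def inner_join(logs, songs):
    """Inner-join log events to songs on (artist, song) == (artist_name, title)."""
    # index the songs by their join key
    songs_by_key = {}
    for song in songs:
        key = (song["artist_name"], song["title"])
        songs_by_key.setdefault(key, []).append(song)
    joined = []
    for event in logs:
        key = (event["artist"], event["song"])
        if None in key:
            continue  # a null key never satisfies the equality condition
        # events without a matching song produce no output row
        for song in songs_by_key.get(key, []):
            joined.append({**event, **song})
    return joined

songs = [{"artist_name": "Billy Idol",
          "title": "The Dead Next Door (Digitally Remastered 99)",
          "song_id": "SOVIYJY12AF72A4B00"}]
logs = [{"artist": "Harmonia", "song": "Sehr kosmisch", "page": "NextSong"},
        {"artist": None, "song": None, "page": "Home"},  # hypothetical non-song event
        # hypothetical play of the song above
        {"artist": "Billy Idol",
         "song": "The Dead Next Door (Digitally Remastered 99)",
         "page": "NextSong"}]

rows = inner_join(logs, songs)
print(len(rows), rows[0]["song_id"])  # 1 SOVIYJY12AF72A4B00
```

A consequence of the inner join is that every table built from `df_log_join_song` only contains songs, artists and users that appear in matched events.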

## Create tables

Create the `songs_table`

In [14]:
songs_table = df_log_join_song.select('song_id', 'title', 'artist_id', 'year', 'duration')

Check the `songs_table`

In [15]:
songs_table.head()

Row(song_id='SOCHPTV12A6BD53113', title='Die Kunst der Fuge_ BWV 1080 (2007 Digital Remaster): Contrapunctus XVII - Inversus', artist_id='ARN8NCB1187FB49652', year=0, duration=172.38159)

Create the `users_table`. We use PySpark's `alias` to rename the columns in accordance with the project instructions.

In [16]:
users_table = df_log_join_song.select(col('userId').alias('user_id'), 
                                     col('firstName').alias('first_name'), 
                                     col('lastName').alias('last_name'), 
                                     col('gender'), 
                                     col('level'))

Check the `users_table`

In [17]:
users_table.head()

Row(user_id='49', first_name='Chloe', last_name='Cuevas', gender='F', level='paid')

Create the `artists_table`

In [18]:
artists_table = df_log_join_song.select(col('artist_id'), 
                                     col('artist_name').alias('name'), 
                                     col('artist_location').alias('location'), 
                                     col('artist_latitude').alias('latitude'),
                                     col('artist_longitude').alias('longitude'))

Check the `artists_table`

In [19]:
artists_table.head()

Row(artist_id='ARN8NCB1187FB49652', name='Lionel Rogg', location='Geneva, Switzerland', latitude=46.20835, longitude=6.1427)

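Create the `time_table`. We use PySpark's date functions (`hour`, `dayofmonth`, `weekofyear`, `month`, `year`, `dayofweek`) to extract the components of the `start_time` timestamp, and `dropDuplicates` to remove repeated rows; since every column is derived from `start_time`, this leaves one row per timestamp.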

In [20]:
time_table = df_log_join_song.select(col('start_time'), 
                                     hour(df_log_join_song.start_time).alias('hour'), 
                                     dayofmonth(df_log_join_song.start_time).alias('day'), 
                                     weekofyear(df_log_join_song.start_time).alias('week'),
                                     month(df_log_join_song.start_time).alias('month'),
                                     year(df_log_join_song.start_time).alias('year'),
                                     dayofweek(df_log_join_song.start_time).alias('weekday')) \
                                     .dropDuplicates()

Check the `time_table`.

In [21]:
time_table.head()

Row(start_time=datetime.datetime(2018, 11, 27, 18, 4, 58), hour=18, day=27, week=48, month=11, year=2018, weekday=3)
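
The `time_table` columns of the sample row above can be reproduced in plain Python. This sketch assumes Spark's conventions: `weekofyear` returns the ISO 8601 week number, and `dayofweek` counts from 1 for Sunday to 7 for Saturday, which is why Tuesday 27 November 2018 has `weekday=3`. The duplicated timestamp in `plays` is hypothetical and only shows the effect of `dropDuplicates`.

```python
from datetime import datetime

def time_columns(start_time):
    """Split a timestamp into the time_table columns (Spark conventions assumed)."""
    return {
        "start_time": start_time,
        "hour": start_time.hour,
        "day": start_time.day,
        "week": start_time.isocalendar()[1],         # ISO 8601 week number
        "month": start_time.month,
        "year": start_time.year,
        "weekday": start_time.isoweekday() % 7 + 1,  # Sunday=1 ... Saturday=7
    }

# duplicate timestamps collapse to a single row, as with dropDuplicates()
plays = [datetime(2018, 11, 27, 18, 4, 58), datetime(2018, 11, 27, 18, 4, 58)]
unique_rows = {tuple(time_columns(ts).items()) for ts in plays}
print(len(unique_rows))  # 1
# hour=18, day=27, week=48, month=11, year=2018, weekday=3, as in the row above
print(time_columns(plays[0]))
```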

Create the `songplays_table`: rename the columns and keep only the records whose `page` is `NextSong`.

In [22]:
# keep only song plays, then select and rename the columns
songplays_table = df_log_join_song.filter(col('page') == 'NextSong') \
                                  .select(col('start_time'),
                                          col('userId').alias('user_id'),
                                          col('level'),
                                          col('song_id'),
                                          col('artist_id'),
                                          col('sessionId').alias('session_id'),
                                          col('location'),
                                          col('userAgent').alias('user_agent'))

Check `songplays_table`.

In [23]:
songplays_table.head()

Row(start_time=datetime.datetime(2018, 11, 21, 8, 25, 44), user_id='88', level='paid', song_id='SOCHPTV12A6BD53113', artist_id='ARN8NCB1187FB49652', session_id=744, location='Sacramento--Roseville--Arden-Arcade, CA', user_agent='"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"')

## Write tables to Parquet files

Parquet stores data in a columnar format. We partition the tables as follows:

- `songs_table` by `year` and `artist_id`
- `users_table` by `gender` and `level`
- `artists_table` by `location`
- `time_table` by `month` and `day`
- `songplays_table` by `user_agent`, `artist_id` and `location`

In [24]:
songs_table.write.partitionBy("year", "artist_id").mode('overwrite').parquet("songs.parquet")

In [25]:
users_table.write.partitionBy("gender", "level").mode('overwrite').parquet("users.parquet")

In [26]:
artists_table.write.partitionBy("location").mode('overwrite').parquet("artists.parquet")

In [27]:
time_table.write.partitionBy("month", "day").mode('overwrite').parquet("time.parquet")

In [28]:
songplays_table.write.partitionBy("user_agent", "artist_id", 'location') \
                        .mode('overwrite').parquet("songplays.parquet")
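
`partitionBy` writes each distinct combination of partition values to its own Hive-style subdirectory (`column=value`), and the partition columns are kept in the directory names rather than inside the Parquet files. The plain-Python sketch below groups rows by those directory paths, using the sample `songs_table` row shown earlier; it ignores the escaping Spark applies to special characters in values and the data file names inside each directory. Counting the keys of the result gives a rough idea of how many directories a choice of partition columns produces.

```python
from collections import defaultdict

def partition_dirs(rows, table_path, partition_cols):
    """Group rows by the Hive-style directory that partitionBy would write them to."""
    dirs = defaultdict(list)
    for row in rows:
        parts = [f"{name}={row[name]}" for name in partition_cols]
        path = "/".join([table_path] + parts)
        # the partition values live in the path, so only the other columns stay in the row
        dirs[path].append({k: v for k, v in row.items() if k not in partition_cols})
    return dict(dirs)

# sample songs_table row from above
songs = [{"song_id": "SOCHPTV12A6BD53113",
          "title": "Die Kunst der Fuge_ BWV 1080 (2007 Digital Remaster): "
                   "Contrapunctus XVII - Inversus",
          "artist_id": "ARN8NCB1187FB49652", "year": 0, "duration": 172.38159}]
layout = partition_dirs(songs, "songs.parquet", ["year", "artist_id"])
print(list(layout))  # ['songs.parquet/year=0/artist_id=ARN8NCB1187FB49652']
```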

## Load `parquet` files

We test whether the `parquet` files have been written correctly by loading each one and printing its schema and first row.

In [29]:
songs_parquet = spark.read \
                .format('parquet') \
                .load('songs.parquet')

In [30]:
songs_parquet.printSchema()
songs_parquet.head()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- artist_id: string (nullable = true)

Row(song_id='SOCHPTV12A6BD53113', title='Die Kunst der Fuge_ BWV 1080 (2007 Digital Remaster): Contrapunctus XVII - Inversus', duration=172.38159, year=0, artist_id='ARN8NCB1187FB49652')

In [31]:
artists_parquet = spark.read \
                .format('parquet') \
                .load('artists.parquet')

In [32]:
artists_parquet.printSchema()
artists_parquet.head()

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- location: string (nullable = true)

Row(artist_id='ARP29T31187B98DD5F', name='Keyshia Cole / T.I. / Missy Elliott / Young Dro', latitude=37.80506, longitude=-122.27302, location='Oakland, CA')

In [33]:
time_parquet = spark.read \
                .format('parquet') \
                .load('time.parquet')

In [34]:
time_parquet.printSchema()
time_parquet.head()

root
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)

Row(start_time=datetime.datetime(2018, 11, 27, 18, 4, 58), hour=18, week=48, year=2018, weekday=3, month=11, day=27)

In [35]:
users_parquet = spark.read \
                .format('parquet') \
                .load('users.parquet')

In [36]:
users_parquet.printSchema()
users_parquet.head()

root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)

Row(user_id='24', first_name='Layla', last_name='Griffin', gender='F', level='paid')

In [37]:
songplays_parquet = spark.read \
                .format('parquet') \
                .load('songplays.parquet')

In [38]:
songplays_parquet.printSchema()
songplays_parquet.head()

root
 |-- start_time: timestamp (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- user_agent: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- location: string (nullable = true)

Row(start_time=datetime.datetime(2018, 11, 13, 18, 26, 41), user_id='97', level='paid', song_id='SOBONKR12A58A7A7E0', session_id=537, user_agent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36"', artist_id='AR5E44Z1187B9A1D74', location='Lansing-East Lansing, MI')
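
When a partitioned table is read back, Spark rediscovers the partition columns from the `column=value` directory names and appends them after the columns stored in the files. This is why `year` and `artist_id` come last in the `songs.parquet` schema above, and `user_agent`, `artist_id` and `location` come last in `songplays.parquet`. A plain-Python sketch of that reconstruction; it keeps every partition value as a string, whereas Spark also infers types such as `integer` for `year`.

```python
def read_partitioned_row(file_row, relative_dir):
    """Rebuild a full row from the columns stored in a file and its partition directory."""
    row = dict(file_row)  # columns stored in the Parquet file come first
    for part in relative_dir.split("/"):
        name, _, value = part.partition("=")
        row[name] = value  # partition columns are appended at the end
    return row

row = read_partitioned_row(
    {"song_id": "SOCHPTV12A6BD53113", "duration": 172.38159},
    "year=0/artist_id=ARN8NCB1187FB49652",
)
print(list(row))  # ['song_id', 'duration', 'year', 'artist_id']
```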