# Project: Data Lake

## Problem Definition

<p>
A music streaming startup, Sparkify, has grown their user base and song database even more and want to move their data warehouse to a data lake. Their data resides in S3, in a directory of JSON logs on user activity on the app, as well as a directory with JSON metadata on the songs in their app.
</p>
<p>
As their data engineer, you are tasked with building an ETL pipeline that extracts their data from S3, processes them using Spark, and loads the data back into S3 as a set of dimensional tables. This will allow their analytics team to continue finding insights in what songs their users are listening to.
</p>
<p>
You'll be able to test your database and ETL pipeline by running queries given to you by the analytics team from Sparkify and compare your results with their expected results.
</p>

## Project Description
<p>
To address the problem, an ETL pipeline for a data lake hostes on AWS S3 is created. Log events data is loaded from S3 and processed into analytics tables using Spark. The processed data is loaded back into S3.
</p>

## Import required packages

In [50]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.functions import from_unixtime, to_timestamp
from pyspark.sql.functions import col, hour, dayofmonth, dayofweek, month, year, weekofyear
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import monotonically_increasing_id

import datetime
import numpy as np

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [51]:
sc.list_packages()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Package                    Version
-------------------------- -------
beautifulsoup4             4.8.1  
boto                       2.49.0 
jmespath                   0.9.4  
lxml                       4.4.2  
mysqlclient                1.4.6  
nltk                       3.4.5  
nose                       1.3.4  
numpy                      1.14.5 
pip                        20.0.2 
py-dateutil                2.2    
python36-sagemaker-pyspark 1.2.6  
pytz                       2019.3 
PyYAML                     3.11   
setuptools                 46.1.3 
six                        1.13.0 
soupsieve                  1.9.5  
wheel                      0.34.2 
windmill                   1.6

## Define path to JSON data

In [52]:
input_data = "s3a://udacity-dend/"
output_data = "s3://sparkprojectdata/"
song_data_path = input_data + "song_data/*/*/*"
log_data_path = input_data + "log_data/*/*"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Load log data

In [53]:
log_data = spark.read.json(log_data_path)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [54]:
log_data.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

## Load song data

In [55]:
song_data = spark.read.json(song_data_path)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [56]:
song_data.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)

## Check total logs and songs records

In [57]:
log_data.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

8056

In [58]:
song_data.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

14896

## Process Song data

### Create songs Dimension Table

In [83]:
songs = song_data.select(['song_id', 'title', 'artist_id', 'year', 'duration']).distinct()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [60]:
songs.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)

In [61]:
songs.head()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Row(song_id='SOVIYJY12AF72A4B00', title='The Dead Next Door (Digitally Remastered 99)', artist_id='AR4T2IF1187B9ADBB7', year=1983, duration=233.22077)

In [84]:
songs.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

14896

### Create artists Dimension Table

In [63]:
artists = song_data.select([
    'artist_id', 
    col('artist_name').alias('name'), 
    col('artist_location').alias('location'), 
    col('artist_latitude').alias('lattitude'), 
    col('artist_longitude').alias('longitude')
]).distinct()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [64]:
artists.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

10025

## Process Log Data

In [65]:
# filter records with page == NextSong 
log_data_new = log_data.filter(log_data.page == 'NextSong')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Create users Dimension Table

In [66]:
users = log_data_new.select([
    col('userId').alias('user_id'), 
    col('firstName').alias('first_name'), 
    col('lastName').alias('last_name'), 
    'gender', 
    'level'
]).distinct()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [67]:
users.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

104

### Create time Dimension Table

In [68]:
log_data_new = log_data_new.withColumn('start_time', from_unixtime(col('ts')/1000))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [69]:
log_data_new.head()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Row(artist='Harmonia', auth='Logged In', firstName='Ryan', gender='M', itemInSession=0, lastName='Smith', length=655.77751, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='Sehr kosmisch', status=200, ts=1542241826796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26', start_time='2018-11-15 00:30:26')

In [70]:
log_data_new = log_data_new.withColumn('hour', hour('start_time'))
log_data_new = log_data_new.withColumn('day', dayofmonth('start_time'))
log_data_new = log_data_new.withColumn('week', weekofyear('start_time'))
log_data_new = log_data_new.withColumn('month', month('start_time'))
log_data_new = log_data_new.withColumn('year', year('start_time'))
log_data_new = log_data_new.withColumn('weekday', dayofweek('start_time'))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [71]:
log_data_new.head()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Row(artist='Harmonia', auth='Logged In', firstName='Ryan', gender='M', itemInSession=0, lastName='Smith', length=655.77751, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='Sehr kosmisch', status=200, ts=1542241826796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26', start_time='2018-11-15 00:30:26', hour=0, day=15, week=46, month=11, year=2018, weekday=5)

In [72]:
time_df = log_data_new.select([
    'start_time',
    'hour',
    'day',
    'week',
    'month',
    'year',
    'weekday'
])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [73]:
time_df.head()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Row(start_time='2018-11-15 00:30:26', hour=0, day=15, week=46, month=11, year=2018, weekday=5)

### Create songplays Fact Table

In [74]:
# create an id in log data data frame
log_data_new = log_data_new.withColumn('songplay_id', monotonically_increasing_id())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [75]:
log_data_new.head()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Row(artist='Harmonia', auth='Logged In', firstName='Ryan', gender='M', itemInSession=0, lastName='Smith', length=655.77751, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='Sehr kosmisch', status=200, ts=1542241826796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26', start_time='2018-11-15 00:30:26', hour=0, day=15, week=46, month=11, year=2018, weekday=5, songplay_id=0)

In [76]:
# create songplays df by joining the log df and song df on songs.title==log_data_new.song and selecting the right columns
songplays_df = log_data_new.join(songs, log_data_new.song==songs.title)
songplays_df = songplays_df.select([
    'songplay_id',
    'start_time',  
    col('userId').alias('user_id'),
    'level', 
    'song_id', 
    'artist_id', 
    col('sessionId').alias('session_id'), 
    'location', 
    col('userAgent').alias('user_agent')
])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [77]:
songplays_df.head()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Row(songplay_id=6, start_time='2018-11-15 05:55:56', user_id='80', level='paid', song_id='SOZZHYJ12A8C140F50', artist_id='ARNRA801187FB5587A', session_id=602, location='Portland-South Portland, ME', user_agent='"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"')

In [78]:
songplays_df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- songplay_id: long (nullable = false)
 |-- start_time: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)

## Load fact and dimension tables back into S3

In [79]:
tables = {
    'songplays': songplays_df,
    'time': time_df,
    'users': users,
    'artists': artists,
    'songs': songs
}

for table_name, df in tables.items():
    df.write.parquet(output_data + table_name, mode="overwrite")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Test reading from parquet

In [80]:
songs_df = spark.read.parquet(output_data + 'songs')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [81]:
songs_df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)

In [82]:
songs_df.head()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Row(song_id='SOXPFIY12A8C13945E', title='Only One Forever (Album Version)', artist_id='ARTL0JQ1187FB4D190', year=2001, duration=202.97098)