In [1]:
# Notice: This is a user-created notebook that serves as a development environment; it is NOT part of the project.
# It is a playground for testing Python code snippets before moving them into the etl.py script.

In [2]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

In [3]:
# https://knowledge.udacity.com/questions/137494
# AWS credentials are read from the environment instead of being typed into this notebook, so they cannot be
# shared by accident together with the notebook. Export AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY (the same
# values as in dl.cfg) before starting the notebook kernel. An unset variable falls back to an empty string.
# The s3a:// path strings built in later cells embed these values, so never display or share those strings.
import os

AWS_ACCESS_KEY_ID = os.environ.get('AWS_ACCESS_KEY_ID', '')
AWS_SECRET_ACCESS_KEY = os.environ.get('AWS_SECRET_ACCESS_KEY', '')

In [4]:
# The Udacity S3 bucket lives in us-west-2. If your EMR cluster was created in any other region,
# delete it and recreate it in us-west-2 before executing the code below.
#
# Several Udacity Knowledge threads discuss this issue, for example:
# https://knowledge.udacity.com/questions/613482
# https://knowledge.udacity.com/questions/461243
# https://knowledge.udacity.com/questions/245497
#
# A few simple methods for determining the region of a public S3 bucket:
# https://stackoverflow.com/questions/62996989/how-can-i-determine-the-region-for-a-public-aws-s3-bucket
AWS_DEFAULT_REGION = "us-west-2"

In [5]:
#!aws configure set aws_access_key_id $AWS_ACCESS_KEY_ID
#!aws configure set aws_secret_access_key $AWS_SECRET_ACCESS_KEY
#!aws configure set default.region $AWS_DEFAULT_REGION

In [6]:
#!aws s3 ls "s3://udacity-dend/"

In [7]:
#input_data = "s3a://udacity-dend"
#output_data = "s3a://udacity-dend-bucket-asthlihzxhfc"

In [8]:
# Build (or reuse) the Spark session. The hadoop-aws package is requested, presumably for the s3a:// paths used below.
# https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.SparkSession.html
spark = (
    SparkSession.builder
    .appName("Project : Data Lake")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .getOrCreate()
)

In [9]:
# Load only a small slice of the song JSON dataset (the song_data/A/A/A prefix) for development and testing.
# WARNING: the AWS free-tier quota is consumed quickly if the entire dataset is loaded into EMR even a few times,
# because of the excessive reads (into Spark DataFrames) and writes (as parquet files) this causes.
#
# https://knowledge.udacity.com/questions/137494
# That discussion thread also shows how to embed AWS credentials inline in an s3a:// URL, as done here.
# NOTE(review): the resulting string contains the secret key, so avoid displaying or logging it.
path_song_data = f"s3a://{AWS_ACCESS_KEY_ID}:{AWS_SECRET_ACCESS_KEY}@udacity-dend/song_data/A/A/A/*.json"

In [10]:
# Staging DataFrame for the song dataset: the sampled JSON files read as-is, with the schema inferred by Spark.
df_staging_songs = spark.read.format("json").load(path_song_data)

In [11]:
# Inspect the schema Spark inferred for the song staging DataFrame.
df_staging_songs.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [12]:
# Preview the first row of the song staging DataFrame as a formatted (truncated) table.
df_staging_songs.show(1)

+------------------+---------------+--------------------+----------------+------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|     artist_location|artist_longitude|       artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+--------------------+----------------+------------------+---------+---------+------------------+--------------------+----+
|ARTC1LV1187B9A4858|        51.4536|Goldsmith's Colle...|        -0.01802|The Bonzo Dog Band|301.40036|        1|SOAFBCP12A8C13CC7D|King Of Scurf (20...|1972|
+------------------+---------------+--------------------+----------------+------------------+---------+---------+------------------+--------------------+----+
only showing top 1 row



In [13]:
# Fetch the first row as a list of Row objects; unlike show(), values are not truncated.
df_staging_songs.take(1)

[Row(artist_id='ARTC1LV1187B9A4858', artist_latitude=51.4536, artist_location="Goldsmith's College, Lewisham, Lo", artist_longitude=-0.01802, artist_name='The Bonzo Dog Band', duration=301.40036, num_songs=1, song_id='SOAFBCP12A8C13CC7D', title='King Of Scurf (2007 Digital Remaster)', year=1972)]

In [14]:
# Register the DataFrame as the temporary view "staging_songs" so the Spark SQL queries below can reference it.
df_staging_songs.createOrReplaceTempView("staging_songs")

In [15]:
# https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-cte.html
# https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-window.html
# Songs dimension table: exactly one row per non-empty song_id.
# Joining a DISTINCT song_id list back to staging_songs would re-expand every duplicate song_id, so it would not
# deduplicate at all. ROW_NUMBER() keeps a single row per song_id instead. Its ORDER BY only makes the choice
# among duplicate rows deterministic.
songs_table = spark.sql("""
WITH cte_songs_ranked AS
(
    SELECT song_id, title, artist_id, year, duration,
    ROW_NUMBER() OVER (PARTITION BY song_id ORDER BY title, artist_id, year, duration) AS row_num
    FROM staging_songs
    WHERE song_id != ''
    AND song_id IS NOT NULL
)

SELECT song_id, title, artist_id, year, duration
FROM cte_songs_ranked
WHERE row_num = 1
""")

In [16]:
# Artists dimension table: exactly one row per non-empty artist_id.
# An artist appears once per song in staging_songs, so joining a DISTINCT artist_id list back to staging_songs
# would emit one row per song rather than one per artist. ROW_NUMBER() keeps a single row per artist_id.
# NULLS LAST prefers rows whose attributes are populated, and the ORDER BY makes the choice deterministic.
# Output column names are unchanged: artist_id, name, artist_location, artist_latitude, artist_longitude.
artists_table = spark.sql("""
WITH cte_artists_ranked AS
(
    SELECT artist_id, artist_name, artist_location, artist_latitude, artist_longitude,
    ROW_NUMBER() OVER (
        PARTITION BY artist_id
        ORDER BY artist_name NULLS LAST, artist_location NULLS LAST,
                 artist_latitude NULLS LAST, artist_longitude NULLS LAST
    ) AS row_num
    FROM staging_songs
    WHERE artist_id != ''
    AND artist_id IS NOT NULL
)

SELECT artist_id, artist_name AS name, artist_location, artist_latitude, artist_longitude
FROM cte_artists_ranked
WHERE row_num = 1
""")

In [17]:
# Output locations of the songs and artists tables in the project bucket (credentials embedded inline).
path_songs_table = f"s3a://{AWS_ACCESS_KEY_ID}:{AWS_SECRET_ACCESS_KEY}@udacity-dend-bucket-asthlihzxhfc/songs_table"
path_artists_table = f"s3a://{AWS_ACCESS_KEY_ID}:{AWS_SECRET_ACCESS_KEY}@udacity-dend-bucket-asthlihzxhfc/artists_table"

In [18]:
# Persist songs as columnar parquet, partitioned by year and then artist_id, replacing any previous output.
# https://www.upsolver.com/blog/apache-parquet-why-use
# https://docs.aws.amazon.com/athena/latest/ug/convert-to-columnar.html
songs_table.write.mode("overwrite").partitionBy("year", "artist_id").parquet(path_songs_table)

In [19]:
# Persist artists as unpartitioned parquet, replacing any previous output.
# https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html
# https://sparkbyexamples.com/pyspark/pyspark-read-and-write-parquet-file/
artists_table.write.mode("overwrite").parquet(path_artists_table)

In [20]:
# Depending on the way we wrangle data, sometimes it may be useful if we save the staging table as parquet files
#path_staging_songs = "s3a://{}:{}@udacity-dend-bucket-asthlihzxhfc/staging_songs/".format(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
#df_staging_songs.write.parquet(path_staging_songs, mode='overwrite')

In [21]:
# Development subset: a single day of event logs. Swap in the commented wildcard path to load every log file.
#path_log_data = f"s3a://{AWS_ACCESS_KEY_ID}:{AWS_SECRET_ACCESS_KEY}@udacity-dend/log_data/*/*/*.json"
path_log_data = f"s3a://{AWS_ACCESS_KEY_ID}:{AWS_SECRET_ACCESS_KEY}@udacity-dend/log_data/2018/11/2018-11-30-events.json"

In [22]:
# Staging DataFrame for the log (event) dataset, with the schema inferred by Spark.
df_staging_events = spark.read.format("json").load(path_log_data)

In [23]:
# Only log entries whose page is 'NextSong' are needed, so drop every other event.
df_staging_events = df_staging_events.where(df_staging_events["page"] == "NextSong")

In [24]:
# Inspect the schema Spark inferred for the (filtered) event staging DataFrame.
df_staging_events.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [25]:
# Preview the first NextSong event as a formatted (truncated) table.
df_staging_events.show(1)

+-------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+-----------------+------+-------------+--------------------+------+
|       artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|             song|status|           ts|           userAgent|userId|
+-------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+-----------------+------+-------------+--------------------+------+
|Stephen Lynch|Logged In|   Jayden|     M|            0|    Bell|182.85669| free|Dallas-Fort Worth...|   PUT|NextSong|1.540991795796E12|      829|Jim Henson's Dead|   200|1543537327796|Mozilla/5.0 (comp...|    91|
+-------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+--------

In [26]:
# Fetch the first NextSong event as a list of Row objects; unlike show(), values are not truncated.
df_staging_events.take(1)

[Row(artist='Stephen Lynch', auth='Logged In', firstName='Jayden', gender='M', itemInSession=0, lastName='Bell', length=182.85669, level='free', location='Dallas-Fort Worth-Arlington, TX', method='PUT', page='NextSong', registration=1540991795796.0, sessionId=829, song="Jim Henson's Dead", status=200, ts=1543537327796, userAgent='Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; WOW64; Trident/6.0)', userId='91')]

In [27]:
# Register the filtered events as the temporary view "staging_events" for the Spark SQL queries below.
df_staging_events.createOrReplaceTempView("staging_events")

In [28]:
# Users dimension table: one row per non-empty userId. Name, gender and level are taken from that user's most
# recent event (highest ts).
# ROW_NUMBER() already guarantees a single row per user, so no DISTINCT or self-join on a userId list is needed.
# An ORDER BY inside the CTE would not carry over to the final result, so only the outer query sorts.
# itemInSession breaks ties between events with identical ts, which keeps the chosen row deterministic.
# https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-window.html
# https://stackoverflow.com/questions/44923353/unable-to-select-top-10-records-per-group-in-sparksql
users_table = spark.sql("""
WITH cte_staging_events_ranked AS
(
    SELECT userId, firstName, lastName, gender, level,
    ROW_NUMBER() OVER (PARTITION BY userId ORDER BY ts DESC, itemInSession DESC) AS row_num
    FROM staging_events
    WHERE userId != ''
    AND userId IS NOT NULL
)

SELECT userId, firstName, lastName, gender, level
FROM cte_staging_events_ranked
WHERE row_num = 1
ORDER BY userId
""")

In [29]:
path_users_table = f"s3a://{AWS_ACCESS_KEY_ID}:{AWS_SECRET_ACCESS_KEY}@udacity-dend-bucket-asthlihzxhfc/users_table"  # credentials embedded inline

In [30]:
users_table.write.mode("overwrite").parquet(path_users_table)  # unpartitioned; replaces any previous output

In [31]:
# Time dimension table: one row per distinct event timestamp, broken down into calendar parts.
# ts is divided by 1000 before conversion, i.e. it is treated as epoch milliseconds.
# https://spark.apache.org/docs/latest/sql-ref-functions-builtin.html#date-and-timestamp-functions
# https://docs.databricks.com/sql/language-manual/functions/date_part.html
# https://docs.databricks.com/sql/language-manual/functions/extract.html
# Tip: append .show(n=20, truncate=False, vertical=False) to the spark.sql(...) call to preview the result.
# https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.show.html
time_table = spark.sql("""
WITH cte_distinct_ts AS
(
    SELECT DISTINCT
        ts,
        to_timestamp(ts/1000) AS start_time
    FROM staging_events
    WHERE ts IS NOT NULL
)

SELECT
    start_time,
    EXTRACT(hour      FROM start_time) AS hour,
    EXTRACT(day       FROM start_time) AS day,
    EXTRACT(week      FROM start_time) AS week,
    EXTRACT(month     FROM start_time) AS month,
    EXTRACT(year      FROM start_time) AS year,
    EXTRACT(dayofweek FROM start_time) AS weekday
FROM cte_distinct_ts
""")

In [32]:
path_time_table = f"s3a://{AWS_ACCESS_KEY_ID}:{AWS_SECRET_ACCESS_KEY}@udacity-dend-bucket-asthlihzxhfc/time_table"  # credentials embedded inline

In [33]:
time_table.write.mode("overwrite").partitionBy("year", "month").parquet(path_time_table)  # partitioned by year, then month

In [34]:
# Locations of the songs and artists parquet output written earlier, reloaded here for the songplays join.
path_songs_table_parquet = f"s3a://{AWS_ACCESS_KEY_ID}:{AWS_SECRET_ACCESS_KEY}@udacity-dend-bucket-asthlihzxhfc/songs_table"
path_artists_table_parquet = f"s3a://{AWS_ACCESS_KEY_ID}:{AWS_SECRET_ACCESS_KEY}@udacity-dend-bucket-asthlihzxhfc/artists_table"

In [35]:
# Reload the persisted songs and artists tables from parquet.
df_par_songs = spark.read.format("parquet").load(path_songs_table_parquet)
df_par_artists = spark.read.format("parquet").load(path_artists_table_parquet)

In [36]:
# Expose the reloaded parquet tables to Spark SQL under the names used by the songplays query.
df_par_songs.createOrReplaceTempView(name="parquet_songs_table")
df_par_artists.createOrReplaceTempView(name="parquet_artists_table")

In [40]:
# Songplays fact table: one row per NextSong event, left-joined to song/artist data by
# (title, artist name, duration). Events without a match keep NULL song_id / artist_id.
# songplay_id comes from monotonically_increasing_id(): unique, but not consecutive.
# The event timestamp (ts, epoch milliseconds) is converted once in cte_events and reused for month/year.
# https://thecodersstop.com/spark/generate-sequential-and-unique-ids-in-a-spark-dataframe/
# https://docs.databricks.com/sql/language-manual/functions/monotonically_increasing_id.html
songplays_table = spark.sql("""
WITH cte_song_data AS
(
    SELECT s.song_id, s.title, s.artist_id, a.name, s.duration
    FROM parquet_songs_table s
    INNER JOIN parquet_artists_table a ON s.artist_id = a.artist_id
),
cte_events AS
(
    SELECT se.*, to_timestamp(se.ts/1000) AS start_time
    FROM staging_events se
)

SELECT
    monotonically_increasing_id() AS songplay_id,
    ev.start_time,
    ev.userId,
    ev.level,
    sd.song_id,
    sd.artist_id,
    ev.sessionId,
    ev.location,
    ev.userAgent,
    EXTRACT(month FROM ev.start_time) AS month,
    EXTRACT(year FROM ev.start_time) AS year
FROM cte_events ev
LEFT OUTER JOIN cte_song_data sd
    ON ev.song = sd.title
    AND ev.artist = sd.name
    AND ev.length = sd.duration
""")

In [41]:
path_songplays_table = f"s3a://{AWS_ACCESS_KEY_ID}:{AWS_SECRET_ACCESS_KEY}@udacity-dend-bucket-asthlihzxhfc/songplays_table"  # credentials embedded inline

In [42]:
songplays_table.write.mode("overwrite").partitionBy("year", "month").parquet(path_songplays_table)  # partitioned by year, then month

In [None]:
#real    61m13.878s
#user    0m1.118s
#sys     0m0.231s