In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.functions import monotonically_increasing_id

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

In [3]:
spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
    .getOrCreate()

In [4]:
input_data = "data" # "s3a://udacity-dend/"
output_data = "s3a://babak-udacity-project4/"

In [7]:
# Subset of song data
song_data = os.path.join(input_data, 'song_data/*/*/*/*.json')

In [8]:
# read song data file
df = spark.read.json(song_data)

In [9]:
df.printSchema()
df.show(5)

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)

+------------------+---------------+-----------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|  artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+-----------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|ARDR4AC1187FB371A1|           null|                 |            null|Montserrat Caball...|511.16363|   

In [18]:
songs_table = df.select('song_id', 'title', 'artist_id', 'year', 'duration')

In [19]:
songs_table.printSchema()
songs_table.show(5)

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOAFBCP12A8C13CC7D|King Of Scurf (20...|ARTC1LV1187B9A4858|1972|301.40036|
|SOKTJDS12AF72A25E5|Drown In My Own T...|ARA23XO1187B9AF18F|   0|  192.522|
|SOEKAZG12AB018837E|I'll Slap Your Fa...|ARSVTNL1187B992A91|2001|129.85424|
|SOQPWCR12A6D4FB2A3|A Poor Recipe For...|AR73AIO1187B9AD57B|2005|118.07302|
|SOBRKGM12A8C139EF6|Welcome to the Pl...|ARXQBR11187B98A2CC|1985|821.05424|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows



In [22]:
# write songs table to parquet files partitioned by year and artist
songs_table.write.mode('overwrite') \
        .partitionBy('year', 'artist_id') \
        .parquet(os.path.join(output_data, 'songs'))

In [29]:
# extract columns to create artists table
artists_table = df.select('artist_id', 
                          df.artist_name.alias('name'), 
                          df.artist_location.alias('location'),
                          df.artist_latitude.alias('latitude'),
                          df.artist_longitude.alias('longitude'))

In [30]:
artists_table.printSchema()
artists_table.show(5)

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)

+------------------+--------------------+--------------------+--------+----------+
|         artist_id|                name|            location|latitude| longitude|
+------------------+--------------------+--------------------+--------+----------+
|ARTC1LV1187B9A4858|  The Bonzo Dog Band|Goldsmith's Colle...| 51.4536|  -0.01802|
|ARA23XO1187B9AF18F|     The Smithereens|Carteret, New Jersey|40.57885| -74.21956|
|ARSVTNL1187B992A91|       Jonathan King|     London, England|51.50632|  -0.12714|
|AR73AIO1187B9AD57B|   Western Addiction|   San Francisco, CA|37.77916|-122.42005|
|ARXQBR11187B98A2CC|Frankie Goes To H...|  Liverpool, England|    null|      null|
+------------------+--------------------+--------------------+--------+----------+
only showing top 5 rows



In [31]:
artists_table.write.mode('overwrite') \
        .parquet(os.path.join(output_data, 'artists'))

In [10]:
# get filepath to log data file
log_data = os.path.join(input_data, 'log_data/2018/11')

In [11]:
# read log data file
df = spark.read.json(log_data)

In [12]:
# filter by actions for song plays
df = df.filter(df.page == 'NextSong')

In [13]:
df.printSchema()
df.show(5)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|         

In [45]:
# extract columns for users table    
users_table = df.select(col('userId').alias('user_id'),
                            col('firstName').alias('first_name'),
                            col('lastName').alias('last_name'),
                            col('gender'),
                            col('level'))

In [46]:
users_table.printSchema()
users_table.show(5)

root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     26|      Ryan|    Smith|     M| free|
|     26|      Ryan|    Smith|     M| free|
|     26|      Ryan|    Smith|     M| free|
|     61|    Samuel| Gonzalez|     M| free|
|     80|     Tegan|   Levine|     F| paid|
+-------+----------+---------+------+-----+
only showing top 5 rows



In [14]:
# create timestamp column from original timestamp column
get_timestamp = udf(lambda ts: ts // 1000, 'long')
df2 = df.withColumn('ts', get_timestamp('ts'))

In [63]:
df2.printSchema()
df2.show(5)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+----------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            

In [15]:
# create datetime column from original timestamp column
get_datetime = udf(lambda ts: datetime.fromtimestamp(ts), 'timestamp')
df3 = df2.withColumn('datetime', get_datetime('ts'))

In [16]:
df3.printSchema()
df3.show(5)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- datetime: timestamp (nullable = true)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+----------+--------------------+------+-------------------+
|     artist|     auth|first

In [70]:
# extract columns to create time table
time_table = df3.select(col('ts').alias('start_time'),
                        hour('datetime').alias('hour'),
                        dayofmonth('datetime').alias('day'),
                        weekofyear('datetime').alias('week'),
                        month('datetime').alias('month'),
                        year('datetime').alias('year'),
                        date_format('datetime', 'E').alias('weekday'))

In [71]:
time_table.printSchema()
time_table.show(5)

root
 |-- start_time: long (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: string (nullable = true)

+----------+----+---+----+-----+----+-------+
|start_time|hour|day|week|month|year|weekday|
+----------+----+---+----+-----+----+-------+
|1542241826|   0| 15|  46|   11|2018|    Thu|
|1542242481|   0| 15|  46|   11|2018|    Thu|
|1542242741|   0| 15|  46|   11|2018|    Thu|
|1542253449|   3| 15|  46|   11|2018|    Thu|
|1542260935|   5| 15|  46|   11|2018|    Thu|
+----------+----+---+----+-----+----+-------+
only showing top 5 rows



In [8]:
# read in song data to use for songplays table
song_df = spark.read.parquet(os.path.join(output_data, 'songs'))

In [9]:
song_df.printSchema()
song_df.show(5)

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- artist_id: string (nullable = true)

+------------------+--------------------+---------+----+------------------+
|           song_id|               title| duration|year|         artist_id|
+------------------+--------------------+---------+----+------------------+
|SOKTJDS12AF72A25E5|Drown In My Own T...|  192.522|   0|ARA23XO1187B9AF18F|
|SOEKAZG12AB018837E|I'll Slap Your Fa...|129.85424|2001|ARSVTNL1187B992A91|
|SOAFBCP12A8C13CC7D|King Of Scurf (20...|301.40036|1972|ARTC1LV1187B9A4858|
|SORRNOC12AB017F52B|The Last Beat Of ...|337.81506|2004|ARSZ7L31187FB4E610|
|SOQPWCR12A6D4FB2A3|A Poor Recipe For...|118.07302|2005|AR73AIO1187B9AD57B|
+------------------+--------------------+---------+----+------------------+
only showing top 5 rows



In [25]:
# read in artist data to use for songplays table
artist_df = spark.read.parquet(os.path.join(output_data, 'artists'))

In [26]:
artist_df.printSchema()
artist_df.show(5)

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)

+------------------+--------------------+--------------------+--------+----------+
|         artist_id|                name|            location|latitude| longitude|
+------------------+--------------------+--------------------+--------+----------+
|ARTC1LV1187B9A4858|  The Bonzo Dog Band|Goldsmith's Colle...| 51.4536|  -0.01802|
|ARA23XO1187B9AF18F|     The Smithereens|Carteret, New Jersey|40.57885| -74.21956|
|ARSVTNL1187B992A91|       Jonathan King|     London, England|51.50632|  -0.12714|
|AR73AIO1187B9AD57B|   Western Addiction|   San Francisco, CA|37.77916|-122.42005|
|ARXQBR11187B98A2CC|Frankie Goes To H...|  Liverpool, England|    null|      null|
+------------------+--------------------+--------------------+--------+----------+
only showing top 5 rows



In [28]:
# joined with artist to populate artist name 
song_df = song_df.join(artist_df.select('artist_id', col('name').alias('artist_name')), 'artist_id')

In [29]:
song_df.printSchema()
song_df.show(5)

root
 |-- artist_id: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- artist_name: string (nullable = true)

+------------------+------------------+--------------------+---------+----+------------------+
|         artist_id|           song_id|               title| duration|year|       artist_name|
+------------------+------------------+--------------------+---------+----+------------------+
|ARA23XO1187B9AF18F|SOKTJDS12AF72A25E5|Drown In My Own T...|  192.522|   0|   The Smithereens|
|ARSVTNL1187B992A91|SOEKAZG12AB018837E|I'll Slap Your Fa...|129.85424|2001|     Jonathan King|
|ARTC1LV1187B9A4858|SOAFBCP12A8C13CC7D|King Of Scurf (20...|301.40036|1972|The Bonzo Dog Band|
|ARSZ7L31187FB4E610|SORRNOC12AB017F52B|The Last Beat Of ...|337.81506|2004|         Devotchka|
|AR73AIO1187B9AD57B|SOQPWCR12A6D4FB2A3|A Poor Recipe For...|118.07302|2005| Western Addictio

In [30]:
# extract columns from joined song, artist and log datasets to create songplays table 
conditions = [
    df.song == song_df.title,
    df.artist == song_df.artist_name,
    df.length == song_df.duration
]
songplays_table = df \
    .join(song_df, conditions) \
    .select(col('userId').alias('user_id'),
            'level',
            'song_id',
            'artist_id',
            col('sessionId').alias('session_id'),
            'location',
            col('userAgent').alias('user_agent')) \
    .withColumn('songplays_id', monotonically_increasing_id())

In [31]:
songplays_table.printSchema()
songplays_table.show(5)

root
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)
 |-- songplays_id: long (nullable = false)

+-------+-----+-------+---------+----------+--------+----------+------------+
|user_id|level|song_id|artist_id|session_id|location|user_agent|songplays_id|
+-------+-----+-------+---------+----------+--------+----------+------------+
+-------+-----+-------+---------+----------+--------+----------+------------+

