In [8]:
import configparser
from datetime import datetime
import os
import pyspark
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

Reading AWS IAM Credentials

In [9]:
config = configparser.ConfigParser()
config.read_file(open('dl.cfg'))

os.environ["AWS_ACCESS_KEY_ID"]= config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"]= config['AWS']['AWS_SECRET_ACCESS_KEY']

Instantiating Spark session

In [10]:
spark = SparkSession.builder\
                     .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0")\
                     .getOrCreate()
spark

In [11]:
df = spark.read.json("s3a://udacity-dend/song_data/A/B/C/TRABCEI128F424C983.json").drop_duplicates()

In [12]:
#Reading song data from S3

#input_data="s3a://udacity-dend/"
#song_data = input_data + "song_data/*/*/*/"
#song_df = spark.read.json(song_data)

df = spark.read.json("data/song-data/song-data/*/*/*/*.json")

In [13]:
# extract columns to create songs table
df.createOrReplaceTempView("song_data_table")

In [14]:
df.count()

71

Create songs_table and write back to S3

In [15]:
songs_table = spark.sql("""
                            SELECT song_id, 
                            title,
                            artist_id,
                            year,
                            duration
                            FROM song_data_table
                            WHERE song_id IS NOT NULL
                        """)

In [16]:
# write songs table to parquet files partitioned by year and artist
output_data="data/output_data/"
songs_table.write.mode('overwrite').partitionBy("year", "artist_id").parquet(output_data+'songs_table/')
songs_table.show(5)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOBAYLL12A8C138AF9|Sono andati? Fing...|ARDR4AC1187FB371A1|   0|511.16363|
|SOOLYAZ12A6701F4A6|Laws Patrolling (...|AREBBGV1187FB523D2|   0|173.66159|
|SOBBUGU12A8C13E95D|Setting Fire to S...|ARMAC4T1187FB3FA4C|2004|207.77751|
|SOAOIBZ12AB01815BE|I Hold Your Hand ...|ARPBNLO1187FB3D52F|2000| 43.36281|
|SONYPOM12A8C13B2D7|I Think My Wife I...|ARDNS031187B9924F0|2005|186.48771|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows



Create artists_table and write it back to s3

In [17]:
 artists_table = spark.sql("""
                                SELECT DISTINCT artist_id, 
                                artist_name,
                                artist_location,
                                artist_latitude,
                                artist_longitude
                                FROM song_data_table 
                                WHERE artist_id IS NOT NULL
                            """)

In [25]:
# write artists table to parquet files
output_data="data/output_data/"
artists_table.write.mode('overwrite').parquet(output_data+'artists_table/')

In [20]:
 # get filepath to log data file
input_data="data/"
log_data =input_data + 'log_data/*/*/2018-11-01-events.json'
df2 = spark.read.json(log_data).drop_duplicates()
df2.show(5)

+-----------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+----------------+------+-------------+--------------------+------+
|           artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|            song|status|           ts|           userAgent|userId|
+-----------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+----------------+------+-------------+--------------------+------+
|Infected Mushroom|Logged In|   Kaylee|     F|            6| Summers| 440.2673| free|Phoenix-Mesa-Scot...|   PUT|NextSong|1.540344794796E12|      139| Becoming Insane|   200|1541107053796|"Mozilla/5.0 (Win...|     8|
|        Girl Talk|Logged In|   Kaylee|     F|            8| Summers|160.15628| free|Phoenix-Mesa-Scot...|   PUT|NextSong|1.54034479

In [21]:
# filter by actions for song plays
df2 = df2.filter(df2.page == 'NextSong')
df2.createOrReplaceTempView("log_data_table")
df2.show(5)

+-----------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+----------------+------+-------------+--------------------+------+
|           artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|            song|status|           ts|           userAgent|userId|
+-----------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+----------------+------+-------------+--------------------+------+
|Infected Mushroom|Logged In|   Kaylee|     F|            6| Summers| 440.2673| free|Phoenix-Mesa-Scot...|   PUT|NextSong|1.540344794796E12|      139| Becoming Insane|   200|1541107053796|"Mozilla/5.0 (Win...|     8|
|        Girl Talk|Logged In|   Kaylee|     F|            8| Summers|160.15628| free|Phoenix-Mesa-Scot...|   PUT|NextSong|1.54034479

Extract columns for users table

In [22]:
 users_table = spark.sql("""
                            SELECT DISTINCT userId as user_id, 
                            firstName as first_name,
                            lastName as last_name,
                            gender as gender,
                            level as level
                            FROM log_data_table 
                            WHERE userId IS NOT NULL
                        """)

In [33]:
# write users table to parquet files
output_data="data/output_data/"
users_table.write.mode('overwrite').parquet(output_data+'users_table/')
users_table.show(5)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     98|    Jordyn|   Powell|     F| free|
|     34|    Evelin|    Ayala|     F| free|
|     85|   Kinsley|    Young|     F| paid|
|     38|    Gianna|    Jones|     F| free|
|     85|   Kinsley|    Young|     F| free|
+-------+----------+---------+------+-----+
only showing top 5 rows



Create timestamp and datetime column from original timestamp column

In [23]:
from pyspark.sql.types import *
# create timestamp column from original timestamp column
get_timestamp = udf(lambda x: datetime.utcfromtimestamp(int(x) / 1000), TimestampType())



In [24]:
# create datetime column from original timestamp column
get_datetime = udf(lambda x: F.to_date(x), TimestampType())   

In [25]:
df2 = df2.withColumn("timestamp", get_timestamp(df2.ts))
df2 = df2.withColumn("start_time", get_timestamp(df2.ts))

In [26]:
df2.select("start_time").show(20)

+--------------------+
|          start_time|
+--------------------+
|2018-11-01 21:17:...|
|2018-11-01 21:28:...|
|2018-11-01 22:23:...|
|2018-11-01 21:42:...|
|2018-11-01 21:11:...|
|2018-11-01 21:01:...|
|2018-11-01 21:08:...|
|2018-11-01 21:55:...|
|2018-11-01 21:24:...|
|2018-11-01 21:52:...|
|2018-11-01 21:05:...|
+--------------------+



Extract columns to create time table

In [27]:
df2.createOrReplaceTempView("log_data_table_time")
df2.show(5)

+-----------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+----------------+------+-------------+--------------------+------+--------------------+--------------------+
|           artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|            song|status|           ts|           userAgent|userId|           timestamp|          start_time|
+-----------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+----------------+------+-------------+--------------------+------+--------------------+--------------------+
|Infected Mushroom|Logged In|   Kaylee|     F|            6| Summers| 440.2673| free|Phoenix-Mesa-Scot...|   PUT|NextSong|1.540344794796E12|      139| Becoming Insane|   200|1541107053796|"Mozilla/5.0 (Win...|     8|2018-11

In [36]:
# extract columns to create time table
time_table = spark.sql("""
                        SELECT DISTINCT
                        start_time,                        
                        hour(start_time) as hour,
                        dayofmonth(start_time) as day,
                        weekofyear(start_time) as week,
                        month(start_time) as month,
                        year(start_time) as year,
                        dayofweek(start_time) as weekday
                        FROM
                        log_data_table_time
                         WHERE ts IS NOT NULL
                        """)

In [37]:
time_table.show(5)

+--------------------+----+---+----+-----+----+-------+
|          start_time|hour|day|week|month|year|weekday|
+--------------------+----+---+----+-----+----+-------+
|2018-11-01 21:01:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:11:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:55:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 22:23:...|  22|  1|  44|   11|2018|      5|
|2018-11-01 21:05:...|  21|  1|  44|   11|2018|      5|
+--------------------+----+---+----+-----+----+-------+
only showing top 5 rows



In [87]:
# write time table to parquet files partitioned by year and month
output_data="data/output_data/"
time_table.write.mode('overwrite').partitionBy("year", "month").parquet(output_data+'time_table/')

In [32]:
 songplays_table = spark.sql("""
                                SELECT 
                                monotonically_increasing_id() as songplay_id,
                                logTb.start_time,
                                month(start_time) as month,
                                year(start_time) as year,
                                logTb.userId as user_id,
                                logTb.level as level,
                                songTb.song_id as song_id,
                                songTb.artist_id as artist_id,
                                logTb.sessionId as session_id,
                                logTb.location as location,
                                logTb.userAgent as user_agent
                                FROM log_data_table_time logTb
                                JOIN song_data_table songTb on logTb.artist = songTb.artist_name and logTb.song = songTb.title
                            """).drop_duplicates()

In [33]:
songplays_table.show(5)

+-----------+----------+-----+----+-------+-----+-------+---------+----------+--------+----------+
|songplay_id|start_time|month|year|user_id|level|song_id|artist_id|session_id|location|user_agent|
+-----------+----------+-----+----+-------+-----+-------+---------+----------+--------+----------+
+-----------+----------+-----+----+-------+-----+-------+---------+----------+--------+----------+



In [95]:
# write songplays table to parquet files partitioned by year and month
output_data="data/output_data/"
songplays_table.write.mode('overwrite').partitionBy("year", "month").parquet(output_data+'songplays_table/')