In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
import pyspark.sql.types as t
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek
from pyspark.sql.functions import monotonically_increasing_id

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

print("Access Key : " + config['AWS']['AWS_ACCESS_KEY_ID'])

Access Key : AKIAWCG27L6NYCRF5WMM


In [3]:
def get_timestamp (timestamp):
    return datetime.fromtimestamp(timestamp / 1000)

In [4]:
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

spark = create_spark_session()

In [5]:
spark

In [9]:
spark.read.json("./data/log-data.zip").count()

2989

In [11]:
spark.read.json("./data/log-data.zip").printSchema()

root
 |-- _corrupt_record: string (nullable = true)



In [6]:
input_data = "s3a://udacity-dend/"
output_data = "s3://vamsi-udaicty-datalake"

In [7]:
sample_song_data = "s3a://udacity-dend/song_data/A/A/A/*.json"
sample_log_data = "s3a://udacity-dend/log_data/2018/11/*.json"

In [8]:
#song_data = os.path.join(input_data, 'song_data/*/*/*/*.json')
#print(song_data)

In [9]:
#song_df = spark.read.json(song_data)
song_df = spark.read.json(sample_song_data)
song_df.show(5)

+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|     artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|ARTC1LV1187B9A4858|        51.4536|Goldsmith's Colle...|        -0.01802|  The Bonzo Dog Band|301.40036|        1|SOAFBCP12A8C13CC7D|King Of Scurf (20...|1972|
|ARA23XO1187B9AF18F|       40.57885|Carteret, New Jersey|       -74.21956|     The Smithereens|  192.522|        1|SOKTJDS12AF72A25E5|Drown In My Own T...|   0|
|ARSVTNL1187B992A91|       51.50632|     London, England|        -0.12714|       Jonathan King|129.85424|        1|SOEKAZG12AB018837E|I'll Slap Your Fa...|2001|
|AR73AIO1187B9AD57B|       37.7791

In [10]:
song_df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [11]:
song_df.count()

24

In [12]:
# songs dataframe
songs_df = song_df.select('song_id', 'title', 'artist_id', 'year', 'duration') \
                         .filter("artist_id is not null") \
                         .distinct()

In [13]:
songs_df.show(5, truncate = False)

+------------------+----------------------------+------------------+----+---------+
|song_id           |title                       |artist_id         |year|duration |
+------------------+----------------------------+------------------+----+---------+
|SODZYPO12A8C13A91E|Burn My Body (Album Version)|AR1C2IX1187B99BF74|0   |177.99791|
|SOIGHOD12A8C13B5A1|Indian Angel                |ARY589G1187B9A9F4E|2004|171.57179|
|SOOVHYF12A8C134892|I'll Be Waiting             |ARCLYBR1187FB53913|1989|304.56118|
|SOAPERH12A58A787DC|The One And Only (Edited)   |ARZ5H0P1187B98A1DD|0   |230.42567|
|SOHKNRJ12A6701D1F8|Drop of Rain                |AR10USD1187B99F3F1|0   |189.57016|
+------------------+----------------------------+------------------+----+---------+
only showing top 5 rows



In [14]:
# artists_df dataframe
artists_df = song_df.select('artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude') \
                           .withColumnRenamed("artist_name", "name") \
                           .withColumnRenamed("artist_location", "location") \
                           .withColumnRenamed("artist_latitude", "latitude") \
                           .withColumnRenamed("artist_longitude", "longitude") \
                           .filter("artist_id is not null") \
                           .distinct()

In [15]:
artists_df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)



In [16]:
artists_df.show(5, truncate = False)

+------------------+-------------+---------------+--------+----------+
|artist_id         |name         |location       |latitude|longitude |
+------------------+-------------+---------------+--------+----------+
|ARSVTNL1187B992A91|Jonathan King|London, England|51.50632|-0.12714  |
|ARXR32B1187FB57099|Gob          |               |null    |null      |
|ARZKCQM1257509D107|Dataphiles   |               |null    |null      |
|ARC1IHZ1187FB4E920|Jamie Cullum |               |null    |null      |
|AR1KTV21187B9ACD72|Cristina     |California - LA|34.05349|-118.24532|
+------------------+-------------+---------------+--------+----------+
only showing top 5 rows



In [17]:
artists_df.count()

24

In [18]:
sample_log_df = spark.read.json(sample_log_data)

In [19]:
sample_log_df.select('ts').show(5,truncate=False)

+-------------+
|ts           |
+-------------+
|1542241826796|
|1542242481796|
|1542242741796|
|1542247071796|
|1542252577796|
+-------------+
only showing top 5 rows



In [20]:
sample_log_df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [21]:
def get_timestamp (timestamp):
    return datetime.fromtimestamp(timestamp / 1000.0)

In [22]:
get_datetime = udf(lambda s: get_timestamp(s), t.TimestampType())

In [23]:
log_df = sample_log_df.withColumn('datetime', get_datetime('ts')).drop('ts')

In [24]:
log_df.show(5)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+--------------------+------+--------------------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|           song|status|           userAgent|userId|            datetime|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+--------------------+------+--------------------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|  Sehr kosmisch|   200|"Mozilla/5.0 (X11...|    26|2018-11-15 00:30:...|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E1

In [25]:
log_df_filtered = log_df.filter(log_df.page == 'NextSong') \
                            .withColumnRenamed("firstName", "first_name") \
                            .withColumnRenamed("lastName", "last_name") \
                            .withColumnRenamed("userId", "user_id") \
                            .withColumnRenamed("sessionId", "session_id") \
                            .withColumnRenamed("userAgent", "user_agent") 

In [26]:
log_df_filtered.show(5,truncate=False)

+-----------+---------+----------+------+-------------+---------+---------+-----+------------------------------------+------+--------+-----------------+----------+----------------------------------------------+------+-----------------------------------------------------------------------------------------------------------------------------------------+-------+-----------------------+
|artist     |auth     |first_name|gender|itemInSession|last_name|length   |level|location                            |method|page    |registration     |session_id|song                                          |status|user_agent                                                                                                                               |user_id|datetime               |
+-----------+---------+----------+------+-------------+---------+---------+-----+------------------------------------+------+--------+-----------------+----------+----------------------------------------------+------+-------

In [27]:
users_df = log_df_filtered.select('user_id', 'first_name', 'last_name', 'gender', 'level').distinct()

In [28]:
users_df.show(5)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     26|      Ryan|    Smith|     M| free|
|      7|    Adelyn|   Jordan|     F| free|
|     71|    Ayleen|     Wise|     F| free|
|     81|    Sienna|    Colon|     F| free|
|     87|    Dustin|      Lee|     M| free|
+-------+----------+---------+------+-----+
only showing top 5 rows



In [29]:
users_df.count()

104

In [30]:
datetime_df = log_df_filtered.select('datetime').distinct()

In [31]:
time_df = datetime_df.select(col('datetime').alias('start_time'), 
                                    hour('datetime').alias('hour'), 
                                    dayofmonth('datetime').alias('day'), 
                                    weekofyear('datetime').alias('week'), 
                                    month('datetime').alias('month'), 
                                    year('datetime').alias('year'),
                                    dayofweek('datetime') \
                                    .alias('weekday')) \
                                    .distinct()

In [32]:
time_df.printSchema()

root
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)



In [33]:
time_df.show(5,truncate=False)

+-----------------------+----+---+----+-----+----+-------+
|start_time             |hour|day|week|month|year|weekday|
+-----------------------+----+---+----+-----+----+-------+
|2018-11-15 16:31:27.796|16  |15 |46  |11   |2018|5      |
|2018-11-18 14:35:01.796|14  |18 |46  |11   |2018|1      |
|2018-11-29 15:27:37.796|15  |29 |48  |11   |2018|5      |
|2018-11-16 22:50:29.796|22  |16 |46  |11   |2018|6      |
|2018-11-05 09:55:50.796|9   |5  |45  |11   |2018|2      |
+-----------------------+----+---+----+-----+----+-------+
only showing top 5 rows



In [34]:
song_df2 =  songs_df.join(artists_df, songs_df.artist_id == artists_df.artist_id) \
                           .select("song_id", "title", "duration", songs_df.artist_id, "name") \
                           .distinct()

In [35]:
song_df2.show(5,truncate=False)

+------------------+-------------------------------------+---------+------------------+------------------+
|song_id           |title                                |duration |artist_id         |name              |
+------------------+-------------------------------------+---------+------------------+------------------+
|SOAPERH12A58A787DC|The One And Only (Edited)            |230.42567|ARZ5H0P1187B98A1DD|Snoop Dogg        |
|SOOVHYF12A8C134892|I'll Be Waiting                      |304.56118|ARCLYBR1187FB53913|Neal Schon        |
|SOFSOCN12A8C143F5D|Face the Ashes                       |209.60608|ARXR32B1187FB57099|Gob               |
|SOABWAP12A8C13F82A|Take Time                            |258.89914|AR5LMPY1187FB573FE|Chaka Khan_ Rufus |
|SOAFBCP12A8C13CC7D|King Of Scurf (2007 Digital Remaster)|301.40036|ARTC1LV1187B9A4858|The Bonzo Dog Band|
+------------------+-------------------------------------+---------+------------------+------------------+
only showing top 5 rows



In [36]:
songplays_df = log_df_filtered.join(song_df2, \
                      (log_df_filtered.song == song_df2.title) & (log_df_filtered.length == song_df2.duration) & \
                      (log_df_filtered.artist == song_df2.name), 'left_outer') \
                      .select('datetime', 'user_id',  'level', 'song_id', 'artist_id', 'session_id', 'location', 'user_agent') \
                      .withColumn("songplay_id", monotonically_increasing_id())

In [37]:
songplays_df_final = songplays_df.join(time_df, songplays_df.datetime == time_df.start_time, 'left_outer') \
                                     .drop("hour","day","week","weekday")

In [38]:
songplays_df_final.printSchema()

root
 |-- datetime: timestamp (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)
 |-- songplay_id: long (nullable = false)
 |-- start_time: timestamp (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)



In [39]:
songplays_df_final.show(5,truncate=False)

+-----------------------+-------+-----+-------+---------+----------+------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------+-----------+-----------------------+-----+----+
|datetime               |user_id|level|song_id|artist_id|session_id|location                            |user_agent                                                                                                                               |songplay_id|start_time             |month|year|
+-----------------------+-------+-----+-------+---------+----------+------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------+-----------+-----------------------+-----+----+
|2018-11-15 00:30:26.796|26     |free |null   |null     |583       |San Jose-Sunnyvale-Santa Clara, CA  |"Mozilla/5.0 (X11; Lin

In [40]:
output_data = "./spark-warehouse"

In [41]:
# write songplays dataframe to parquet files partitioned by year and month
songplays_df_final.write \
                  .mode("overwrite") \
                  .partitionBy("year", "month") \
                  .parquet(os.path.join(output_data, 'songplays'))

In [42]:
songplays_parquet_read = spark.read.parquet(os.path.join(output_data, 'songplays'))
songplays_parquet_read.show(5)

+--------------------+-------+-----+-------+---------+----------+--------------------+--------------------+-----------+--------------------+----+-----+
|            datetime|user_id|level|song_id|artist_id|session_id|            location|          user_agent|songplay_id|          start_time|year|month|
+--------------------+-------+-----+-------+---------+----------+--------------------+--------------------+-----------+--------------------+----+-----+
|2018-11-15 00:30:...|     26| free|   null|     null|       583|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|          0|2018-11-15 00:30:...|2018|   11|
|2018-11-15 00:41:...|     26| free|   null|     null|       583|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|          1|2018-11-15 00:41:...|2018|   11|
|2018-11-15 00:45:...|     26| free|   null|     null|       583|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|          2|2018-11-15 00:45:...|2018|   11|
|2018-11-15 03:44:...|     61| free|   null|     null|       597|Houston-The Woodl...|"M

In [43]:
output_data_s3 = "s3a://vamsi-udaicty-datalake"

In [44]:
# write songplays dataframe to parquet files partitioned by year and month
songplays_df_final.write \
                  .mode("overwrite") \
                  .partitionBy("year", "month") \
                  .parquet(os.path.join(output_data, 'songplays'))

In [45]:
songplays_df_read = spark.read.parquet("s3a://vamsi-udaicty-datalake/songplays")

In [46]:
songplays_df_read.show(5,truncate=False)

+-----------------------+-------+-----+----------+------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------+-------+---------+
|datetime               |user_id|level|session_id|location                            |user_agent                                                                                                                               |song_id|artist_id|
+-----------------------+-------+-----+----------+------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------+-------+---------+
|2018-11-15 00:30:26.796|26     |free |583       |San Jose-Sunnyvale-Santa Clara, CA  |"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"|null   |null     |
|2018-11-15 00:41:21.796

In [47]:
songplays_df_read.count()

6820