In [1]:
from pyspark.sql import SparkSession
import os
import configparser
import  pyspark.sql.functions as F
from pyspark.sql.functions import year, month, date_format, dayofmonth, hour, dayofweek, weekofyear
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, DateType as Dat, TimestampType

In [2]:
config = configparser.ConfigParser()

config.read_file(open('dl.cfg'))

os.environ["AWS_ACCESS_KEY_ID"]= config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"]= config['AWS']['AWS_SECRET_ACCESS_KEY']

In [212]:
output_path='s3a://sparkify-dend-igrc/'

## Step 1 - Create a Spark Session using hadoop-aws package

In [3]:
spark = SparkSession.builder\
                     .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0")\
                     .getOrCreate()

## Step 2 -  Process Song Data from S3

In [242]:
df = spark.read.json("s3a://udacity-dend/song_data/A/B/C/*.json")

In [243]:
df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [39]:
df.show(1)

+------------------+---------------+---------------+----------------+-----------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|artist_location|artist_longitude|artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+---------------+----------------+-----------+---------+---------+------------------+--------------------+----+
|ARLTWXK1187FB5A3F8|       32.74863| Fort Worth, TX|       -97.32925|King Curtis|326.00771|        1|SODREIN12A58A7F2E5|A Whiter Shade Of...|   0|
+------------------+---------------+---------------+----------------+-----------+---------+---------+------------------+--------------------+----+
only showing top 1 row



**Songs table**

In [244]:
df.createOrReplaceTempView('songs')

In [250]:
songs_table = spark.sql("""
    SELECT DISTINCT (song_id), 
        title, 
        artist_id, 
        year, 
        duration 
    FROM songs
    DISTRIBUTE BY (year, artist_id)
""");

In [251]:
songs_table.show(5)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOWQTQZ12A58A7B63E|Streets On Fire (...|ARPFHN61187FB575F6|   0|279.97995|
|SONSKXP12A8C13A2C9|         Native Soul|AR0IAWL1187B9A96D0|2003|197.19791|
|SORRZGD12A6310DBC3|      Harajuku Girls|ARVBRGZ1187FB4675A|2004|290.55955|
|SOBHXUU12A6D4F5F14|National Emblem (...|ARBDJHO1252CCFA6FC|   0|188.73424|
|SOIKLJM12A8C136355|           Eso Duele|AR7AE0W1187B98E40E|2003|196.25751|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows



In [256]:
songs_table.write.partitionBy("year", "artist_id").parquet('songs/songs.parquet')

**Artists Table**

In [49]:
artists_table = spark.sql("""
    SELECT DISTINCT(artist_id), 
        artist_name, 
        artist_location, 
        artist_latitude, 
        artist_longitude
    FROM songs
""");

In [50]:
artists_table.show(5)

+------------------+--------------------+------------------+---------------+----------------+
|         artist_id|         artist_name|   artist_location|artist_latitude|artist_longitude|
+------------------+--------------------+------------------+---------------+----------------+
|ARBDJHO1252CCFA6FC|The Band of HM Ro...|                  |           null|            null|
|ARAADXM1187FB3ECDB|    Styles Of Beyond|Woodland Hills, CA|        34.1688|      -118.61092|
|ARCWVUK1187FB3C71A|     Brigitte Bardot|                  |           null|            null|
|AR0IAWL1187B9A96D0|        Danilo Perez|            Panama|         8.4177|       -80.11278|
|ARJIE2Y1187B994AB7|         Line Renaud|                  |           null|            null|
+------------------+--------------------+------------------+---------------+----------------+
only showing top 5 rows



In [51]:
artists_table.write.parquet('artists/')

## Step 3: Load and process Events log data

In [182]:
df_log = spark.read.json("s3a://udacity-dend/log_data/2018/11/*.json")

In [183]:
df_log.printSchema()
df_log.show(1)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

+--------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+-------------+------+-------------+--------------------+------+
|  artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|m

In [215]:
df_log.createOrReplaceTempView("events");

In [None]:
# filter actions and convert timestamp format

In [219]:
df_log_filtered = spark.sql("""
    SELECT *,
    CAST(events.ts/1000 as Timestamp) AS start_time
    FROM events
    WHERE page='NextSong'
""")
df_log_filtered.show(5)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+--------------------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|          start_time|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+--------------------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|2018-11-15 00:30:...|
|The Prodigy|Logged In|     Ryan|     M|            1|  

In [227]:
df_log_filtered.createOrReplaceTempView("events");

**Users Table**

In [220]:
users_table = spark.sql("""
    SELECT DISTINCT(userId) AS user_id, 
        firstName AS first_name, 
        lastName AS last_name, 
        gender, 
        level 
    FROM events
    WHERE page='NextSong'
""");

In [221]:
users_table.show(2)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     98|    Jordyn|   Powell|     F| free|
|     34|    Evelin|    Ayala|     F| free|
+-------+----------+---------+------+-----+
only showing top 2 rows



In [69]:
users_table.write.parquet('users/')

**Songplays Table**

In [228]:
songplays_table = spark.sql("""
    SELECT DISTINCT(start_time), 
        e.userId as user_id,
        s.song_id,
        s.artist_id,
        e.level, 
        e.sessionId AS session_id, 
        e.location, 
        e.userAgent AS user_agent,
        year(start_time) as year,
        month(start_time) as month
    FROM events e
    JOIN songs s
    ON s.title=e.song
    DISTRIBUTE BY (year, month)    
""");
songplays_table.show(5)

+--------------------+-------+------------------+------------------+-----+----------+--------------------+--------------------+----+-----+
|          start_time|user_id|           song_id|         artist_id|level|session_id|            location|          user_agent|year|month|
+--------------------+-------+------------------+------------------+-----+----------+--------------------+--------------------+----+-----+
|2018-11-19 09:14:...|     24|SOGDBUF12A8C140FAA|AR558FS1187FB45658| paid|       672|Lake Havasu City-...|"Mozilla/5.0 (Win...|2018|   11|
|2018-11-14 05:06:...|     10|SOQFYBD12AB0182188|ARAADXM1187FB3ECDB| free|       484|Washington-Arling...|"Mozilla/5.0 (Mac...|2018|   11|
|2018-11-27 22:35:...|     80|SOGDBUF12A8C140FAA|AR558FS1187FB45658| paid|       992|Portland-South Po...|"Mozilla/5.0 (Mac...|2018|   11|
|2018-11-27 22:35:...|     80|SOQFYBD12AB0182188|ARAADXM1187FB3ECDB| paid|       992|Portland-South Po...|"Mozilla/5.0 (Mac...|2018|   11|
|2018-11-14 05:06:...|     

In [191]:
songplays_table.write.partitionBy("year", "month").parquet('songplays/')

**Time Table**

In [207]:
time_table = spark.sql("""   
    SELECT DISTINCT(start_time),
            hour(start_time) AS hour,
            dayofmonth(start_time) AS day,
            month(start_time) AS month,
            weekofyear(start_time) AS week,
            year(start_time) AS year,
            dayofweek(start_time) AS weekday
    FROM events e 
    DISTRIBUTE BY (year, month)
""")
time_table.show(5)

+--------------------+----+---+-----+----+----+-------+
|          start_time|hour|day|month|week|year|weekday|
+--------------------+----+---+-----+----+----+-------+
|2018-11-19 09:14:...|   9| 19|   11|  47|2018|      2|
|2018-11-27 22:35:...|  22| 27|   11|  48|2018|      3|
|2018-11-14 05:06:...|   5| 14|   11|  46|2018|      4|
+--------------------+----+---+-----+----+----+-------+



In [229]:
time_table.write.partitionBy("year", "month").parquet('time/')

In [None]:
# Reading parquet files

In [257]:
song_df = spark.read.parquet("songs/songs.parquet")

In [262]:
time_df = spark.read.parquet("time/*/*/*.parquet")

In [258]:
song_df.show(5)

+------------------+--------------------+---------+----+------------------+
|           song_id|               title| duration|year|         artist_id|
+------------------+--------------------+---------+----+------------------+
|SOCEMJV12A6D4F7667|Giant Steps (Alte...|220.44689|   0|ARIOZCU1187FB3A3DC|
|SODREIN12A58A7F2E5|A Whiter Shade Of...|326.00771|   0|ARLTWXK1187FB5A3F8|
|SOWQTQZ12A58A7B63E|Streets On Fire (...|279.97995|   0|ARPFHN61187FB575F6|
|SOAPVNX12AB0187625|I Remember Nights...| 249.3122|1998|AR5T40Y1187B9996C6|
|SOMAPYF12A6D4FEC3E|All Day & All Of ...|156.62975|   0|AR5S9OB1187B9931E3|
+------------------+--------------------+---------+----+------------------+
only showing top 5 rows



In [263]:
time_df.show(5)

+--------------------+----+---+----+-------+
|          start_time|hour|day|week|weekday|
+--------------------+----+---+----+-------+
|2018-11-15 18:57:...|  18| 15|  46|      5|
|2018-11-21 02:16:...|   2| 21|  47|      4|
|2018-11-21 16:16:...|  16| 21|  47|      4|
|2018-11-28 12:25:...|  12| 28|  48|      4|
|2018-11-28 16:42:...|  16| 28|  48|      4|
+--------------------+----+---+----+-------+
only showing top 5 rows



In [265]:
songplays_df = spark.read.parquet("songplays/*/*/*.parquet")

In [266]:
songplays_df.show(5)

+--------------------+-------+------------------+------------------+-----+----------+--------------------+--------------------+
|          start_time|user_id|           song_id|         artist_id|level|session_id|            location|          user_agent|
+--------------------+-------+------------------+------------------+-----+----------+--------------------+--------------------+
|2018-11-19 09:14:...|     24|SOGDBUF12A8C140FAA|AR558FS1187FB45658| paid|       672|Lake Havasu City-...|"Mozilla/5.0 (Win...|
|2018-11-14 05:06:...|     10|SOQFYBD12AB0182188|ARAADXM1187FB3ECDB| free|       484|Washington-Arling...|"Mozilla/5.0 (Mac...|
|2018-11-27 22:35:...|     80|SOGDBUF12A8C140FAA|AR558FS1187FB45658| paid|       992|Portland-South Po...|"Mozilla/5.0 (Mac...|
|2018-11-27 22:35:...|     80|SOQFYBD12AB0182188|ARAADXM1187FB3ECDB| paid|       992|Portland-South Po...|"Mozilla/5.0 (Mac...|
|2018-11-14 05:06:...|     10|SOGDBUF12A8C140FAA|AR558FS1187FB45658| free|       484|Washington-Arling..