In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark import SparkContext
from datetime import datetime
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
import os
import configparser
import pyspark.sql.functions as F
from time import time

In [2]:
config = configparser.ConfigParser()

In [3]:
config.read('dl.cfg')

['dl.cfg']

In [4]:
os.environ['AWS_ACCESS_KEY_ID']=config['Default']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['Default']['AWS_SECRET_ACCESS_KEY']

In [5]:
spark = SparkSession.builder \
                    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
                    .getOrCreate()

In [6]:
#song_data = "s3a://udacity-dend/song_data/*/*/*/*.json"
#log_data = "s3a://udacity-dend/log_data/*.json"

song_data = "s3a://udacity-dend/song_data/A/A/A/*.json"
log_data = "s3a://udacity-dend/log_data/2018/11/*.json"

In [7]:
df_song_data = spark.read.json(song_data)
df_log_data = spark.read.json(log_data)

In [8]:
df_song_data.printSchema()
df_song_data.show(5)

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)

+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|     artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|ARTC1LV1187B9A4858|        51.4536|Goldsmith's Colle...|        -0.01802|  The Bonzo Dog Band|3

In [9]:
df_log_data.printSchema()
df_log_data.show(5)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            lo

In [10]:
#df_log_data = df_log_data.withColumn("timestamp", F.to_timestamp("ts"))
df_log_data = df_log_data.withColumn('timestamp',F.to_timestamp(F.from_unixtime(df_log_data.ts/1000, 'yyyy-MM-dd HH:mm:ss')))
df_log_data.printSchema()
df_log_data.show(5)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+-------------------+
|     artist|     auth|firstN

In [11]:
song_table_query = """
    SELECT DISTINCT song_id, title, artist_id, year, duration
    FROM song_data
    ORDER BY song_id
"""
df_song_data.createOrReplaceTempView("song_data")
songs_table = spark.sql(song_table_query)
songs_table.show()

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOABWAP12A8C13F82A|           Take Time|AR5LMPY1187FB573FE|1978|258.89914|
|SOAFBCP12A8C13CC7D|King Of Scurf (20...|ARTC1LV1187B9A4858|1972|301.40036|
|SOAPERH12A58A787DC|The One And Only ...|ARZ5H0P1187B98A1DD|   0|230.42567|
|SOBLFFE12AF72AA5BA|              Scream|ARJNIUY12298900C91|2009| 213.9424|
|SOBRKGM12A8C139EF6|Welcome to the Pl...|ARXQBR11187B98A2CC|1985|821.05424|
|SOCIWDW12A8C13D406|           Soul Deep|ARMJAGH1187FB546F3|1969|148.03546|
|SODZYPO12A8C13A91E|Burn My Body (Alb...|AR1C2IX1187B99BF74|   0|177.99791|
|SOEKAZG12AB018837E|I'll Slap Your Fa...|ARSVTNL1187B992A91|2001|129.85424|
|SOERIDA12A6D4F8506|I Want You (Album...|ARBZIN01187FB362CC|2006|192.28689|
|SOFRDWL12A58A7CEF7|        Hit Da Scene|AR9Q9YC1187FB5609B|   0|252.94322|
|SOFSOCN12A8

In [17]:
songs_table.write.partitionBy("year","artist_id").parquet("s3a://ebs-bucket-dend"+"songs_table.parquet")
#songs_table.write.partitionBy("year","artist_id").\
#parquet(os.path.join("s3a://ebs-bucket-dend/output/", 'songs_table.parquet'), 'overwrite')
#songs_table.write.partitionBy("year","artist_id").format("parquet").save("s3a://ebs-bucket-dend/output/songs_table.parquet")

IllegalArgumentException: 'java.net.URISyntaxException: Relative path in absolute URI: s3a://ebs-bucket-dend_temporary'

In [13]:
artist_table_query = """
    SELECT DISTINCT artist_id, artist_name, artist_location, artist_latitude, artist_longitude
    FROM artists_data
    ORDER BY artist_id
"""
df_song_data.createOrReplaceTempView("artists_data")
artists_table = spark.sql(artist_table_query)
artists_table.show()

+------------------+--------------------+--------------------+---------------+----------------+
|         artist_id|         artist_name|     artist_location|artist_latitude|artist_longitude|
+------------------+--------------------+--------------------+---------------+----------------+
|AR0MWD61187B9B2B12|International Noi...|                    |           null|            null|
|AR10USD1187B99F3F1|Tweeterfriendly M...|Burlington, Ontar...|           null|            null|
|AR1C2IX1187B99BF74|     Broken Spindles|                    |           null|            null|
|AR1KTV21187B9ACD72|            Cristina|     California - LA|       34.05349|      -118.24532|
|AR5LMPY1187FB573FE|   Chaka Khan_ Rufus|         Chicago, IL|       41.88415|       -87.63241|
|AR73AIO1187B9AD57B|   Western Addiction|   San Francisco, CA|       37.77916|      -122.42005|
|AR9Q9YC1187FB5609B|    Quest_ Pup_ Kevo|          New Jersey|           null|            null|
|ARA23XO1187B9AF18F|     The Smithereens

In [14]:
songs_table.write.parquet("output/artists_table.parquet")

In [15]:
df_log_data_filtered = df_log_data.filter(df_log_data.page == 'NextSong')
df_log_data_filtered.show(5)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+-------------------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|          timestamp|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+-------------------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|2018-11-15 00:30:26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smi

In [16]:
user_table_query = """
    SELECT DISTINCT userId as user_id, firstName as first_name, lastName as last_name, gender, level
    FROM users_data
    ORDER BY user_id
"""
df_log_data_filtered.createOrReplaceTempView("users_data")
users_table = spark.sql(user_table_query)
users_table.show()

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     10|    Sylvie|     Cruz|     F| free|
|    100|     Adler|  Barrera|     M| free|
|    101|    Jayden|      Fox|     M| free|
|     11| Christian|   Porter|     F| free|
|     12|    Austin|  Rosales|     M| free|
|     13|       Ava| Robinson|     F| free|
|     14|  Theodore|   Harris|     M| free|
|     15|      Lily|     Koch|     F| free|
|     15|      Lily|     Koch|     F| paid|
|     16|     Rylan|   George|     M| paid|
|     16|     Rylan|   George|     M| free|
|     17|  Makinley|    Jones|     F| free|
|     18|     Jacob|   Rogers|     M| free|
|     19|   Zachary|   Thomas|     M| free|
|      2|   Jizelle| Benjamin|     F| free|
|     20|     Aiden|  Ramirez|     M| paid|
|     22|      Sean|   Wilson|     F| free|
|     23|    Morris|  Gilmore|     M| free|
|     24|     Layla|  Griffin|     F| paid|
|     25|    Jayden|   Graves|  

In [17]:
users_table.write.parquet("output/users_table.parquet")

In [23]:
df_log_data = df_log_data.withColumn("hour", F.hour("timestamp"))
df_log_data = df_log_data.withColumn("day", F.dayofmonth("timestamp"))
df_log_data = df_log_data.withColumn("week", F.weekofyear("timestamp"))
df_log_data = df_log_data.withColumn("month", F.month("timestamp"))
df_log_data = df_log_data.withColumn("year", F.year("timestamp"))
df_log_data = df_log_data.withColumn("weekday", F.dayofweek("timestamp"))
df_log_data.printSchema()
df_log_data.show(5)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- month: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)

+-----------+---------+---------+-

In [24]:
time_table_query = """
    SELECT DISTINCT timestamp as start_time, hour, day, week, month, year, weekday
    FROM time_data
    ORDER BY start_time
"""
df_log_data.createOrReplaceTempView("time_data")
time_table = spark.sql(time_table_query)
time_table.show()

+-------------------+----+---+----+-----+----+-------+
|         start_time|hour|day|week|month|year|weekday|
+-------------------+----+---+----+-----+----+-------+
|2018-11-01 20:57:10|  20|  1|  44|   11|2018|      5|
|2018-11-01 21:01:46|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:02:12|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:05:52|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:08:16|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:11:13|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:17:33|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:24:53|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:28:54|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:42:00|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:50:15|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:52:05|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:55:25|  21|  1|  44|   11|2018|      5|
|2018-11-01 22:23:14|  22|  1|  44|   11|2018|      5|
|2018-11-02 01:25:34|   1|  2|  44|   11|2018|      6|
|2018-11-0

In [25]:
df_log_data_filtered = df_log_data_filtered.withColumn('timestamp',F.to_timestamp(F.from_unixtime(df_log_data_filtered.ts/1000, 'yyyy-MM-dd HH:mm:ss')))
df_log_data_filtered.printSchema()
df_log_data_filtered.show()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)

+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+-------------------+
|              

In [26]:
df_log_data_filtered = df_log_data_filtered.withColumn("hour", F.hour("timestamp"))
df_log_data_filtered = df_log_data_filtered.withColumn("day", F.dayofmonth("timestamp"))
df_log_data_filtered = df_log_data_filtered.withColumn("week", F.weekofyear("timestamp"))
df_log_data_filtered = df_log_data_filtered.withColumn("month", F.month("timestamp"))
df_log_data_filtered = df_log_data_filtered.withColumn("year", F.year("timestamp"))
df_log_data_filtered = df_log_data_filtered.withColumn("weekday", F.dayofweek("timestamp"))
df_log_data_filtered.printSchema()
df_log_data_filtered.show()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)

+--------------------+---------+--

In [27]:
time_table.write.partitionBy("year","month").parquet("output/time_table.parquet")

In [29]:
"""SELECT DISTINCT staging_events.ts, staging_events.userId, staging_events.level, staging_songs.song_id,
                                            staging_songs.artist_id, staging_events.sessionId, staging_events.location, 
                                            staging_events.userAgent
                            FROM staging_events
                            JOIN staging_songs ON (staging_events.artist_name = staging_songs.artist_name);"""
                
song_df = df_song_data.select(["song_id", "artist_id", "artist_name"]).dropDuplicates()
song_df.show()

+------------------+------------------+--------------------+
|           song_id|         artist_id|         artist_name|
+------------------+------------------+--------------------+
|SOIGICF12A8C141BC5|AREWD471187FB49873|            Son Kite|
|SOHOZBI12A8C132E3C|AR0MWD61187B9B2B12|International Noi...|
|SOABWAP12A8C13F82A|AR5LMPY1187FB573FE|   Chaka Khan_ Rufus|
|SOKTJDS12AF72A25E5|ARA23XO1187B9AF18F|     The Smithereens|
|SOSMJFC12A8C13DE0C|AR1KTV21187B9ACD72|            Cristina|
|SOHKNRJ12A6701D1F8|AR10USD1187B99F3F1|Tweeterfriendly M...|
|SOXZYWX12A6310ED0C|ARC1IHZ1187FB4E920|        Jamie Cullum|
|SOOVHYF12A8C134892|ARCLYBR1187FB53913|          Neal Schon|
|SOIGHOD12A8C13B5A1|ARY589G1187B9A9F4E|         Talkdemonic|
|SOFRDWL12A58A7CEF7|AR9Q9YC1187FB5609B|    Quest_ Pup_ Kevo|
|SONQPZK12AB0182D84|ARKYKXP11F50C47A6A|    The Supersuckers|
|SOERIDA12A6D4F8506|ARBZIN01187FB362CC|        Paris Hilton|
|SOEKAZG12AB018837E|ARSVTNL1187B992A91|       Jonathan King|
|SOTAZDY12AB0187616|ARZK

In [34]:
songplay_table_query = """
    SELECT log_data.timestamp as start_time, log_data.userId as user_id,\
           log_data.level, songlist_data.song_id, songlist_data.artist_id, log_data.sessionId as session_id,\
           log_data.location, log_data.userAgent as user_agent
    FROM log_data
    JOIN songlist_data ON log_data.artist = songlist_data.artist_name
    ORDER BY start_time
"""
df_log_data_filtered.createOrReplaceTempView("log_data")
df_song_data.createOrReplaceTempView("songlist_data")
songplay_table = spark.sql(songplay_table_query)
songplay_table.show()

+-------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+
|         start_time|user_id|level|           song_id|         artist_id|session_id|            location|          user_agent|
+-------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+
|2018-11-05 02:21:55|     44| paid|SONRWUU12AF72A4283|ARGE7G11187FB37E05|       237|Waterloo-Cedar Fa...|Mozilla/5.0 (Maci...|
|2018-11-06 20:05:45|     97| paid|SOBLFFE12AF72AA5BA|ARJNIUY12298900C91|       293|Lansing-East Lans...|"Mozilla/5.0 (X11...|
|2018-11-15 16:55:31|     42| paid|SONRWUU12AF72A4283|ARGE7G11187FB37E05|       404|New York-Newark-J...|"Mozilla/5.0 (Win...|
|2018-11-16 16:40:43|     97| paid|SOXZYWX12A6310ED0C|ARC1IHZ1187FB4E920|       633|Lansing-East Lans...|"Mozilla/5.0 (X11...|
|2018-11-19 15:36:04|     49| paid|SOFSOCN12A8C143F5D|ARXR32B1187FB57099|       724|San Francisco-Oak...|Mozill

In [36]:
songplay_table.write.partitionBy("year","month").parquet("output/songplay.parquet")