In [50]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import udf
from pyspark.sql.functions import to_timestamp
from pyspark.sql.types import TimestampType, IntegerType
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from datetime import datetime
spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
    .getOrCreate()

In [51]:
datainput = "data/song_data/*/*/*/*"

In [52]:
df = spark.read.json(datainput).dropDuplicates()

In [53]:
df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [54]:
df.count()

71

In [55]:
df_songs = df.select(["song_id","title", "artist_id","year", "duration"])

In [56]:
df_songs.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)



In [57]:
df_songs.show(1)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOWQTQZ12A58A7B63E|Streets On Fire (...|ARPFHN61187FB575F6|   0|279.97995|
+------------------+--------------------+------------------+----+---------+
only showing top 1 row



In [58]:
df_songs.write.parquet("/output/songs/", partitionBy=["year","artist_id"], mode = "overwrite")

In [59]:
df_artists = df.select(["artist_id","artist_name", "artist_location","artist_longitude","artist_latitude"]).dropDuplicates()

In [60]:
df_artists.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_latitude: double (nullable = true)



In [61]:
df_artists.count()

69

In [62]:
df_artists.write.parquet("/output/artists/",mode = "overwrite")

### Process log files

In [63]:
log_data = 'data/log_data/*'
df_log = spark.read.json(log_data)

In [64]:
df = df_log.where("page=='NextSong'")

In [65]:
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [66]:
users_table = df.select(["userId","firstName", "lastName","gender","level"]).dropDuplicates()

In [67]:
users_table.write.parquet('/output/users/', mode = "overwrite")

In [68]:
# create timestamp column from original timestamp column
get_timestamp = udf(lambda x: str(int(x/1000)))
df = df.withColumn("time_stamp", get_timestamp("ts"))

# create datetime column from original timestamp column
get_datetime = udf(lambda x:datetime.fromtimestamp(x/1000), TimestampType())
df = df.withColumn("start_time",get_datetime("ts"))

In [69]:
df.limit(1).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,time_stamp,start_time
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,1542241826,2018-11-15 00:30:26.796


In [70]:
from pyspark.sql.functions import dayofweek

In [71]:
time = df.select("start_time", hour("start_time").alias("hour"), \
                dayofmonth("start_time").alias("day"), \
                weekofyear("start_time").alias("week"), \
                month("start_time").alias("month"), \
                year("start_time").alias("year"), \
                dayofweek("start_time").alias("weekday")).dropDuplicates()

In [72]:
time.printSchema()

root
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)



In [73]:
time.show(5)

+--------------------+----+---+----+-----+----+-------+
|          start_time|hour|day|week|month|year|weekday|
+--------------------+----+---+----+-----+----+-------+
|2018-11-15 14:09:...|  14| 15|  46|   11|2018|      5|
|2018-11-15 15:24:...|  15| 15|  46|   11|2018|      5|
|2018-11-15 16:31:...|  16| 15|  46|   11|2018|      5|
|2018-11-15 19:22:...|  19| 15|  46|   11|2018|      5|
|2018-11-21 17:40:...|  17| 21|  47|   11|2018|      4|
+--------------------+----+---+----+-----+----+-------+
only showing top 5 rows



In [74]:
songs_log_df = spark.read.parquet("/output/songs/*/*/*") 

In [75]:
songs_log_df.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: double (nullable = true)



In [76]:
songs_log_df.count()

71

In [77]:
songs_log_df.show(5)

+------------------+--------------------+---------+
|           song_id|               title| duration|
+------------------+--------------------+---------+
|SOAOIBZ12AB01815BE|I Hold Your Hand ...| 43.36281|
|SONYPOM12A8C13B2D7|I Think My Wife I...|186.48771|
|SODREIN12A58A7F2E5|A Whiter Shade Of...|326.00771|
|SOYMRWW12A6D4FAB14|The Moon And I (O...| 267.7024|
|SOWQTQZ12A58A7B63E|Streets On Fire (...|279.97995|
+------------------+--------------------+---------+
only showing top 5 rows



In [78]:
df.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,time_stamp,start_time
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,1542241826,2018-11-15 00:30:26.796
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,1542242481,2018-11-15 00:41:21.796
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,1542242741,2018-11-15 00:45:41.796
3,Sony Wonder,Logged In,Samuel,M,0,Gonzalez,218.06975,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540493000000.0,597,Blackbird,200,1542253449796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",61,1542253449,2018-11-15 03:44:09.796
4,Van Halen,Logged In,Tegan,F,2,Levine,289.38404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Best Of Both Worlds (Remastered Album Version),200,1542260935796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,1542260935,2018-11-15 05:48:55.796


In [79]:
artists_table = spark.read.parquet("/output/artists/*")

In [80]:
artists_table.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_latitude: double (nullable = true)



In [92]:
songplay_table = df.join(songs_log_df,df.song == songs_log_df.title)\
                   .join(artists_table, df.artist == artists_table.artist_name)\
                   .select(monotonically_increasing_id().alias("songplay_id"), df.start_time, df.userId.alias("user_id"),\
                           df.level, songs_log_df.song_id, artists_table.artist_id, df.sessionId.alias("session_id"),\
                           df.location, df.userAgent.alias("user_agent"))

In [93]:
songplay_table.toPandas()

Unnamed: 0,songplay_id,start_time,userId,level,song_id,artist_id,session_id,location,user_agent
0,0,2018-11-21 21:56:47.796,15,paid,SOZCTXZ12AB0182364,AR5KOSW1187FB35FF4,818,"Chicago-Naperville-Elgin, IL-IN-WI","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."


In [91]:
songplay_table.show()

+-----------------------------+--------------------+------+-----+------------------+------------------+----------+--------------------+--------------------+
|monotonically_increasing_id()|          start_time|userId|level|           song_id|         artist_id|session_id|            location|          user_agent|
+-----------------------------+--------------------+------+-----+------------------+------------------+----------+--------------------+--------------------+
|                            0|2018-11-21 21:56:...|    15| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|       818|Chicago-Napervill...|"Mozilla/5.0 (X11...|
+-----------------------------+--------------------+------+-----+------------------+------------------+----------+--------------------+--------------------+



In [None]:
songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent