In [1]:
import os
from datetime import datetime
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName("sparkify_etl").config("spark.sql.session.timeZone", "UTC")\
                            .master("local").getOrCreate()

## First, Let us check the schema for log data files

In [59]:
file_path = os.path.join("data","log-data")
data_log = spark.read.json(file_path)
data_log = data_log.where(F.col("page")=="NextSong")
data_log.printSchema()
data_log.limit(2).toPandas()

# Observation
# 1. itemInSession can be integer
# 2. timestamp column can be datetime

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26


## We will check schema and explore song data files next

In [60]:
file_path = os.path.join("data","song_data","*","*","*")
data_song = spark.read.json(file_path)
data_song.printSchema()

# Since there's one record per song file, we don't need to use limit
data_song.limit(5).toPandas()

# Observation
#lat,long can be double

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARDR4AC1187FB371A1,,,,Montserrat Caballé;Placido Domingo;Vicente Sar...,511.16363,1,SOBAYLL12A8C138AF9,Sono andati? Fingevo di dormire,0
1,AREBBGV1187FB523D2,,"Houston, TX",,Mike Jones (Featuring CJ_ Mello & Lil' Bran),173.66159,1,SOOLYAZ12A6701F4A6,Laws Patrolling (Album Version),0
2,ARMAC4T1187FB3FA4C,40.82624,"Morris Plains, NJ",-74.47995,The Dillinger Escape Plan,207.77751,1,SOBBUGU12A8C13E95D,Setting Fire to Sleeping Giants,2004
3,ARPBNLO1187FB3D52F,40.71455,"New York, NY",-74.00712,Tiny Tim,43.36281,1,SOAOIBZ12AB01815BE,I Hold Your Hand In Mine [Live At Royal Albert...,2000
4,ARDNS031187B9924F0,32.67828,Georgia,-83.22295,Tim Wilson,186.48771,1,SONYPOM12A8C13B2D7,I Think My Wife Is Running Around On Me (Taco ...,2005


## Let's first create the user table

In [61]:
#user_id, first_name, last_name, gender, level

df_user = data_log.select("userId","firstName","lastName","gender","level")

# User sql expression to cast specific columns
df_user = df_user.withColumn("userId",F.expr("cast(userId as long) userId"))
df_user.printSchema()
df_user.limit(5).toPandas()

root
 |-- userId: long (nullable = true)
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)



Unnamed: 0,userId,firstName,lastName,gender,level
0,26,Ryan,Smith,M,free
1,26,Ryan,Smith,M,free
2,26,Ryan,Smith,M,free
3,61,Samuel,Gonzalez,M,free
4,80,Tegan,Levine,F,paid


## Next we will create songs table

In [62]:
#song_id, title, artist_id, year, duration

df_song = data_song.select("song_id","title","artist_id","year","duration")
df_song = df_song.withColumn("year",F.col("year").cast(T.IntegerType()))
df_song.printSchema()
df_song.toPandas()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- duration: double (nullable = true)



Unnamed: 0,song_id,title,artist_id,year,duration
0,SOBAYLL12A8C138AF9,Sono andati? Fingevo di dormire,ARDR4AC1187FB371A1,0,511.16363
1,SOOLYAZ12A6701F4A6,Laws Patrolling (Album Version),AREBBGV1187FB523D2,0,173.66159
2,SOBBUGU12A8C13E95D,Setting Fire to Sleeping Giants,ARMAC4T1187FB3FA4C,2004,207.77751
3,SOAOIBZ12AB01815BE,I Hold Your Hand In Mine [Live At Royal Albert...,ARPBNLO1187FB3D52F,2000,43.36281
4,SONYPOM12A8C13B2D7,I Think My Wife Is Running Around On Me (Taco ...,ARDNS031187B9924F0,2005,186.48771
5,SONWXQJ12A8C134D94,The Ballad Of Sleeping Beauty,ARNF6401187FB57032,1994,305.16200
6,SODREIN12A58A7F2E5,A Whiter Shade Of Pale (Live @ Fillmore West),ARLTWXK1187FB5A3F8,0,326.00771
7,SOWQTQZ12A58A7B63E,Streets On Fire (Explicit Album Version),ARPFHN61187FB575F6,0,279.97995
8,SODUJBS12A8C132150,Wessex Loses a Bride,ARI2JSK1187FB496EF,0,111.62077
9,SOGXHEG12AB018653E,It Makes No Difference Now,AR0RCMP1187FB3F427,1992,133.32853


## Artist Table will be created from song data as well

In [63]:
# artist_id, name, location, lattitude, longitude

df_artist = data_song.select("artist_id","artist_name","artist_location","artist_latitude","artist_longitude")
df_artist = df_artist.withColumn("artist_latitude",F.col("artist_latitude").cast(T.DecimalType()))
df_artist = df_artist.withColumn("artist_longitude",F.col("artist_longitude").cast(T.DecimalType()))
df_artist.printSchema()
df_artist.toPandas()

root
 |-- artist_id: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_latitude: decimal(10,0) (nullable = true)
 |-- artist_longitude: decimal(10,0) (nullable = true)



Unnamed: 0,artist_id,artist_name,artist_location,artist_latitude,artist_longitude
0,ARDR4AC1187FB371A1,Montserrat Caballé;Placido Domingo;Vicente Sar...,,,
1,AREBBGV1187FB523D2,Mike Jones (Featuring CJ_ Mello & Lil' Bran),"Houston, TX",,
2,ARMAC4T1187FB3FA4C,The Dillinger Escape Plan,"Morris Plains, NJ",41,-74
3,ARPBNLO1187FB3D52F,Tiny Tim,"New York, NY",41,-74
4,ARDNS031187B9924F0,Tim Wilson,Georgia,33,-83
5,ARNF6401187FB57032,Sophie B. Hawkins,"New York, NY [Manhattan]",41,-74
6,ARLTWXK1187FB5A3F8,King Curtis,"Fort Worth, TX",33,-97
7,ARPFHN61187FB575F6,Lupe Fiasco,"Chicago, IL",42,-88
8,ARI2JSK1187FB496EF,Nick Ingman;Gavyn Wright,"London, England",52,0
9,AR0RCMP1187FB3F427,Billie Jo Spears,"Beaumont, TX",30,-94


## Our next dimension Table would be of Time where we'd split "ts" timestamp col further to      granular level

In [64]:
# start_time, hour, day, week, month, year, weekday

df_time = data_log.select("ts")
time_format = "yyyy-MM-dd' 'HH:mm:ss.SSS"


#func = F.udf("start_time")
df_time = df_time.withColumn("start_time", \
                             F.to_utc_timestamp(F.from_unixtime(F.col("ts")/1000,format=time_format),tz="UTC"))
df_time = df_time.withColumn("hour",F.hour(F.col("start_time")))
df_time = df_time.withColumn("day",F.dayofmonth(F.col("start_time")))
df_time = df_time.withColumn("week",F.weekofyear(F.col("start_time")))
df_time = df_time.withColumn("month",F.month(F.col("start_time")))
df_time = df_time.withColumn("year",F.year(F.col("start_time")))
df_time = df_time.withColumn("weekday",F.dayofweek(F.col("start_time")))

df_time.printSchema()
df_time.limit(2).toPandas()

root
 |-- ts: long (nullable = true)
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)



Unnamed: 0,ts,start_time,hour,day,week,month,year,weekday
0,1542241826796,2018-11-15 00:30:26,0,15,46,11,2018,5
1,1542242481796,2018-11-15 00:41:21,0,15,46,11,2018,5


## Now that we've created all DIMENSIONAL tables let us proceed for the FACTS table creation

### In order to create facts table, we have to perform joins

#### SQL syntax is better for longer join queries, but the same can be replicated using spark dataframe operations

In [65]:
# songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
#TODO : Partition by specific keys before uploading to S3 as parquet

df_song_play = data_song.join(data_log,data_song.title==data_log.song, how="inner").\
                    select("userId","level","song_id","artist_id","sessionId","location","userAgent")
df_song_play.printSchema()
df_song_play.limit(2).toPandas()

root
 |-- userId: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- location: string (nullable = true)
 |-- userAgent: string (nullable = true)



Unnamed: 0,userId,level,song_id,artist_id,sessionId,location,userAgent
0,15,paid,SOZCTXZ12AB0182364,AR5KOSW1187FB35FF4,818,"Chicago-Naperville-Elgin, IL-IN-WI","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."
1,10,free,SOGDBUF12A8C140FAA,AR558FS1187FB45658,484,"Washington-Arlington-Alexandria, DC-VA-MD-WV","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4..."


In [70]:
# First let us define some views
data_log.createOrReplaceTempView("t_log")
data_song.createOrReplaceTempView("t_song")
df_time.createOrReplaceTempView("t_time")

df_song_play = spark.sql("select t_time.start_time, t_log.userId, t_log.level, \
                          t_song.song_id, t_song.artist_id, t_log.sessionId, \
                          t_log.location, t_log.userAgent \
                          from t_log  \
                          inner join t_song \
                          on t_log.song=t_song.title \
                          inner join t_time \
                          on t_time.ts = t_log.ts \
                          where t_log.artist = t_song.artist_name \
                          and song_id is not null \
                          ")
df_song_play.limit(2).toPandas()

Unnamed: 0,start_time,userId,level,song_id,artist_id,sessionId,location,userAgent
0,2018-11-21 21:56:47,15,paid,SOZCTXZ12AB0182364,AR5KOSW1187FB35FF4,818,"Chicago-Naperville-Elgin, IL-IN-WI","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."


In [72]:
df_song_play.write.mode("overwrite").parquet("data/output.parquet")

In [77]:
df = spark.read.parquet('data/time_parquet.parquet')
df.toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,...,ts,userAgent,userId,start_time,hour,day,week,month,year,weekday
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,...,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:30:26,6,15,46,11,2018,5
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,...,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:41:21,6,15,46,11,2018,5
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,...,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:45:41,6,15,46,11,2018,5
3,Sony Wonder,Logged In,Samuel,M,0,Gonzalez,218.06975,free,"Houston-The Woodlands-Sugar Land, TX",PUT,...,1542253449796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",61,2018-11-15 03:44:09,9,15,46,11,2018,5
4,Van Halen,Logged In,Tegan,F,2,Levine,289.38404,paid,"Portland-South Portland, ME",PUT,...,1542260935796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,2018-11-15 05:48:55,11,15,46,11,2018,5
5,Magic Sam,Logged In,Tegan,F,3,Levine,132.04853,paid,"Portland-South Portland, ME",PUT,...,1542261224796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,2018-11-15 05:53:44,11,15,46,11,2018,5
6,Edward Sharpe & The Magnetic Zeros,Logged In,Tegan,F,4,Levine,306.31138,paid,"Portland-South Portland, ME",PUT,...,1542261356796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,2018-11-15 05:55:56,11,15,46,11,2018,5
7,Usher featuring will.i.am,Logged In,Tegan,F,5,Levine,395.72853,paid,"Portland-South Portland, ME",PUT,...,1542261662796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,2018-11-15 06:01:02,11,15,46,11,2018,5
8,Helen Reddy,Logged In,Tegan,F,7,Levine,176.50893,paid,"Portland-South Portland, ME",PUT,...,1542262057796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,2018-11-15 06:07:37,11,15,46,11,2018,5
9,Taylor Swift,Logged In,Tegan,F,8,Levine,201.06404,paid,"Portland-South Portland, ME",PUT,...,1542262233796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,2018-11-15 06:10:33,11,15,46,11,2018,5
