In [1]:
import os
import configparser
import pandas
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.types import TimestampType

In [8]:
# remove limit on total table columns-width so we can view the pandas dataframe for debugging
pandas.set_option('display.max_colwidth', -1)

config = configparser.ConfigParser()

#Normally this file should be in ~/.aws/credentials
config.read_file(open('dl.cfg'))

os.environ["AWS_ACCESS_KEY_ID"]= config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"]= config['AWS']['AWS_SECRET_ACCESS_KEY']

In [9]:
# Build a spark session
spark = SparkSession.builder\
.config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0")\
.getOrCreate()

In [10]:
spark

In [11]:
# Path to datasets
song_data_path = 's3a://udacity-dend/song_data/*/*/*/*.json'
log_data_path = 's3a://udacity-dend/log_data/*/*/*.json'

In [12]:
# song_data_path = 'data/song_data/*/*/*/*.json'
# log_data_path = 'data/log-data/*.json'

In [None]:
# Read logs dataset
log_df = spark.read.json(log_data_path)
log_df.count()

In [None]:
# Read songs dataset
song_df = spark.read.json(song_data_path)
song_df.count()

In [9]:
song_df.toPandas().head(5)

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARDR4AC1187FB371A1,,,,Montserrat Caballé;Placido Domingo;Vicente Sardinero;Judith Blegen;Sherrill Milnes;Georg Solti,511.16363,1,SOBAYLL12A8C138AF9,Sono andati? Fingevo di dormire,0
1,AREBBGV1187FB523D2,,"Houston, TX",,Mike Jones (Featuring CJ_ Mello & Lil' Bran),173.66159,1,SOOLYAZ12A6701F4A6,Laws Patrolling (Album Version),0
2,ARMAC4T1187FB3FA4C,40.82624,"Morris Plains, NJ",-74.47995,The Dillinger Escape Plan,207.77751,1,SOBBUGU12A8C13E95D,Setting Fire to Sleeping Giants,2004
3,ARPBNLO1187FB3D52F,40.71455,"New York, NY",-74.00712,Tiny Tim,43.36281,1,SOAOIBZ12AB01815BE,I Hold Your Hand In Mine [Live At Royal Albert Hall],2000
4,ARDNS031187B9924F0,32.67828,Georgia,-83.22295,Tim Wilson,186.48771,1,SONYPOM12A8C13B2D7,I Think My Wife Is Running Around On Me (Taco Hell),2005


In [10]:
log_df.toPandas().head(5)

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36""",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36""",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36""",26
3,,Logged In,Wyatt,M,0,Scott,,free,"Eureka-Arcata-Fortuna, CA",GET,Home,1540872000000.0,563,,200,1542247071796,Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko,9
4,,Logged In,Austin,M,0,Rosales,,free,"New York-Newark-Jersey City, NY-NJ-PA",GET,Home,1541060000000.0,521,,200,1542252577796,Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20100101 Firefox/31.0,12


In [11]:
log_df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [12]:
song_df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [13]:
#trim the whitespace from song titles
log_df = log_df.withColumn("song", rtrim(ltrim(log_df.song)))


In [14]:
song_df.createOrReplaceTempView('songs')
log_df.createOrReplaceTempView('logs')


In [15]:
### songplays ###
# fields: songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
songplays = spark.createDataFrame(spark.sql("SELECT ts as start_time, userId as user_id, level, \
                      song, song_id, artist_id, sessionId as session_id, location, userAgent as user_agent\
                      FROM logs\
                      JOIN songs ON logs.song=songs.title\
                      WHERE logs.page='NextSong' \
                      ORDER BY start_time").collect())

In [16]:
songplays.printSchema()

root
 |-- start_time: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)



In [17]:
# add 'songplay_id' column
w = Window.orderBy(col('start_time'))
songplays = songplays.withColumn("songplay_id", row_number().over(w))
# change data-type of start_time from long to timestamp
songplays = songplays.withColumn("start_time", to_timestamp(songplays.start_time/1000))

In [18]:
songplays.toPandas().head(5)

Unnamed: 0,start_time,user_id,level,song,song_id,artist_id,session_id,location,user_agent,songplay_id
0,2018-11-14 10:36:03.796,10,free,Intro,SOGDBUF12A8C140FAA,AR558FS1187FB45658,484,"Washington-Arlington-Alexandria, DC-VA-MD-WV","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.77.4 (KHTML, like Gecko) Version/7.0.5 Safari/537.77.4""",1
1,2018-11-19 14:44:20.796,24,paid,Intro,SOGDBUF12A8C140FAA,AR558FS1187FB45658,672,"Lake Havasu City-Kingman, AZ","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36""",2
2,2018-11-22 03:26:47.796,15,paid,Setanta matins,SOZCTXZ12AB0182364,AR5KOSW1187FB35FF4,818,"Chicago-Naperville-Elgin, IL-IN-WI","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36""",3
3,2018-11-28 04:05:59.796,80,paid,Intro,SOGDBUF12A8C140FAA,AR558FS1187FB45658,992,"Portland-South Portland, ME","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36""",4


In [19]:
### users - users in the app ###
# fields: user_id, first_name, last_name, gender, level
users = spark.createDataFrame(spark.sql("SELECT DISTINCT userId as user_id, firstName as first_name, \
                                         lastName as last_name, gender, level\
                                         FROM logs").collect())

In [20]:
users.count()

107

In [21]:
### songs - songs in music database ###
# fields: song_id, title, artist_id, year, duration
songs = spark.createDataFrame(spark.sql("SELECT song_id, title, artist_id, year, duration \
                                         FROM songs").collect())
songs.count()

71

In [22]:
### artists - artists in music database ###
# fields: artist_id, name, location, lattitude, longitude
artists = spark.createDataFrame(spark.sql("SELECT DISTINCT artist_id, artist_name as name, artist_location as location, \
                                           artist_latitude as latitude, artist_longitude as longitude \
                                           FROM songs").collect())
artists.count()

69

In [23]:
### time - timestamps of records in songplays broken down into specific units ###
# fields: start_time, hour, day, week, month, year, weekday
songplays.createOrReplaceTempView("songplays")
time = spark.sql("SELECT start_time, songplay_id FROM songplays")
time.count()

4

In [24]:
time.toPandas().head(5)

Unnamed: 0,start_time,songplay_id
0,2018-11-14 10:36:03.796,1
1,2018-11-19 14:44:20.796,2
2,2018-11-22 03:26:47.796,3
3,2018-11-28 04:05:59.796,4


In [25]:
# Add the missing fields to time dataframe
time = time.withColumn("hour", hour("start_time"))
time = time.withColumn("day", dayofmonth("start_time"))
time = time.withColumn("week", weekofyear("start_time"))
time = time.withColumn("month", month("start_time"))
time = time.withColumn("year", year("start_time"))
time = time.withColumn("weekday", date_format("start_time", "EEE"))
time.toPandas().head(5)

Unnamed: 0,start_time,songplay_id,hour,day,week,month,year,weekday
0,2018-11-14 10:36:03.796,1,10,14,46,11,2018,Wed
1,2018-11-19 14:44:20.796,2,14,19,47,11,2018,Mon
2,2018-11-22 03:26:47.796,3,3,22,47,11,2018,Thu
3,2018-11-28 04:05:59.796,4,4,28,48,11,2018,Wed


In [26]:
time.createOrReplaceTempView("time_dimension")

In [27]:
songplays.createOrReplaceTempView("songplays_fact")
# Save songplays table to S3 and add additional columns needed for proper partitioning
spark.createDataFrame(spark.sql("SELECT songplays_fact.songplay_id, songplays_fact.start_time, month, year, user_id, level, song_id, artist_id, session_id, location, user_agent \
           FROM songplays_fact \
           JOIN time_dimension ON time_dimension.songplay_id = songplays_fact.songplay_id").collect()).write\
.partitionBy(["year"]).partitionBy(["month"]).parquet("s3a://sparkify-goelakash/analytics/songplays.parquet")

In [29]:
# Save users table to S3
users.write.parquet("s3a://sparkify-goelakash/analytics/users.parquet")

In [30]:
songs.createOrReplaceTempView("songs_dimension")
# Save songs table to S3 and add additional columns needed for proper partitioning
artists.createOrReplaceTempView("artists_dimension")
spark.createDataFrame(spark.sql("SELECT song_id, title, songs_dimension.artist_id, songs_dimension.year, songs_dimension.duration, name as artist_name \
                                 FROM songs_dimension \
                                 JOIN artists_dimension ON artists_dimension.artist_id=songs_dimension.artist_id").collect())\
.write.partitionBy(["year"]).partitionBy(["artist_name"]).parquet("s3a://sparkify-goelakash/analytics/songs.parquet")

In [31]:
# Save artists table to S3
artists.write.parquet("s3a://sparkify-goelakash/analytics/artist.parquet")

In [32]:
# Save time table to S3 with proper partitioning
time.write.partitionBy(["year"]).partitionBy(["month"]).parquet("s3a://sparkify-goelakash/analytics/time.parquet")