Python notebook created for code development.  This code is then deployed in the etl.py script

In [73]:
# Configure notebook
import pandas as pd
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, to_date
from pyspark.sql.types import IntegerType, StringType, TimestampType, DateType

# configure session
config = configparser.ConfigParser()

# comment out for testing on smaller dataset
#config.read('dl.cfg')

['dl.cfg']

In [68]:
# comment out for testing on smaller dataset
# configure AWS connection
# os.environ['AWS_ACCESS_KEY_ID']=config['KEYS']['AWS_ACCESS_KEY_ID']
# os.environ['AWS_SECRET_ACCESS_KEY']=config['KEYS']['AWS_SECRET_ACCESS_KEY']

In [69]:
# Configure spark session
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

In [70]:
# Create spark session
spark = create_spark_session()

In [5]:
# get filepath to song data file
song_data_zip = "data/song-data.zip"

In [6]:
#unzip
from zipfile import ZipFile

with ZipFile(song_data_zip, 'r') as zipObj:
   # Extract all the contents of zip file in current directory
   zipObj.extractall('data')

In [7]:
# get filepath to song data file (unzipped)
song_data = "./data/song_data/*/*/*/*.json"

In [8]:
# read song data file
df_song = spark.read.json(song_data)

In [9]:
# Display first 5 rows to check content
df_song.head(5)

[Row(artist_id='ARDR4AC1187FB371A1', artist_latitude=None, artist_location='', artist_longitude=None, artist_name='Montserrat Caballé;Placido Domingo;Vicente Sardinero;Judith Blegen;Sherrill Milnes;Georg Solti', duration=511.16363, num_songs=1, song_id='SOBAYLL12A8C138AF9', title='Sono andati? Fingevo di dormire', year=0),
 Row(artist_id='AREBBGV1187FB523D2', artist_latitude=None, artist_location='Houston, TX', artist_longitude=None, artist_name="Mike Jones (Featuring CJ_ Mello & Lil' Bran)", duration=173.66159, num_songs=1, song_id='SOOLYAZ12A6701F4A6', title='Laws Patrolling (Album Version)', year=0),
 Row(artist_id='ARMAC4T1187FB3FA4C', artist_latitude=40.82624, artist_location='Morris Plains, NJ', artist_longitude=-74.47995, artist_name='The Dillinger Escape Plan', duration=207.77751, num_songs=1, song_id='SOBBUGU12A8C13E95D', title='Setting Fire to Sleeping Giants', year=2004),
 Row(artist_id='ARPBNLO1187FB3D52F', artist_latitude=40.71455, artist_location='New York, NY', artist_lo

In [10]:
# Create view of df_song from which to access the columns
df_song.createOrReplaceTempView("df_song_data")
# Create artist table containing columns required
artists_table = spark.sql("""
SELECT DISTINCT
    artist_id
    , artist_name as name
    , artist_location as location
    , artist_latitude as latitude
    , artist_longitude as longitude
FROM df_song_data
""")

In [11]:
# Display first 5 rows to check content
artists_table.head(5)

[Row(artist_id='ARPBNLO1187FB3D52F', name='Tiny Tim', location='New York, NY', latitude=40.71455, longitude=-74.00712),
 Row(artist_id='ARBEBBY1187B9B43DB', name='Tom Petty', location='Gainesville, FL', latitude=None, longitude=None),
 Row(artist_id='AR0IAWL1187B9A96D0', name='Danilo Perez', location='Panama', latitude=8.4177, longitude=-80.11278),
 Row(artist_id='ARMBR4Y1187B9990EB', name='David Martin', location='California - SF', latitude=37.77916, longitude=-122.42005),
 Row(artist_id='ARD0S291187B9B7BF5', name='Rated R', location='Ohio', latitude=None, longitude=None)]

In [12]:
#print schema to check data types
artists_table.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)



In [13]:
# Create song table containing columns required (df_song_data view already created above and will be used here)
song_table = spark.sql("""
SELECT DISTINCT
    song_id
    , title
    , artist_id
    , year
    , duration
FROM df_song_data
""")

In [14]:
# Create artist table containing columns required
song_table.head(5)

[Row(song_id='SOGNCJP12A58A80271', title='Do You Finally Need A Friend', artist_id='ARB29H41187B98F0EF', year=1972, duration=342.56934),
 Row(song_id='SOOJPRH12A8C141995', title='Loaded Like A Gun', artist_id='ARBGXIG122988F409D', year=0, duration=173.19138),
 Row(song_id='SOFCHDR12AB01866EF', title='Living Hell', artist_id='AREVWGE1187B9B890A', year=0, duration=282.43546),
 Row(song_id='SOWTBJW12AC468AC6E', title='Broken-Down Merry-Go-Round', artist_id='ARQGYP71187FB44566', year=0, duration=151.84934),
 Row(song_id='SOGOSOV12AF72A285E', title='¿Dónde va Chichi?', artist_id='ARGUVEV1187B98BA17', year=1997, duration=313.12934)]

In [15]:
#print schema to check data types
song_table.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)



In [16]:
# Note: Write to parquet file syntax taken from stackoverflow
# -- https://stackoverflow.com/questions/43731679/how-to-save-a-partitioned-parquet-file-in-spark-2-1
# -- https://stackoverflow.com/questions/27033823/how-to-overwrite-the-output-directory-in-spark

In [17]:
# write songs table to parquet files partitioned by year and artist
song_table.write.mode("overwrite").partitionBy("year", "artist_id").parquet("output/song_table.parquet")

In [18]:
# write artists table to parquet files
artists_table.write.mode("overwrite").parquet("output/artist_table.parquet")

In [19]:
# get filepath to song data file
log_data_zip = "data/log-data.zip"

In [20]:
#unzip
from zipfile import ZipFile

with ZipFile(log_data_zip, 'r') as zipObj:
   # Extract all the contents of zip file in current directory
   zipObj.extractall('data/log_data')

In [21]:
# get filepath to logdata file
log_data = "data/log_data/*.json"

In [22]:
# read log data file
df_log = spark.read.json(log_data)

In [23]:
# show top 5 rows to check content
df_log.head(5)

[Row(artist='Harmonia', auth='Logged In', firstName='Ryan', gender='M', itemInSession=0, lastName='Smith', length=655.77751, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='Sehr kosmisch', status=200, ts=1542241826796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26'),
 Row(artist='The Prodigy', auth='Logged In', firstName='Ryan', gender='M', itemInSession=1, lastName='Smith', length=260.07465, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='The Big Gundown', status=200, ts=1542242481796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26'),
 Row(artist='Train', auth='Logged In'

In [24]:
# filter by actions for song plays
df_log = df_log.filter(df_log["page"] == "NextSong")

In [25]:
# display first 5 rows
df_log.head(5)

[Row(artist='Harmonia', auth='Logged In', firstName='Ryan', gender='M', itemInSession=0, lastName='Smith', length=655.77751, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='Sehr kosmisch', status=200, ts=1542241826796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26'),
 Row(artist='The Prodigy', auth='Logged In', firstName='Ryan', gender='M', itemInSession=1, lastName='Smith', length=260.07465, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='The Big Gundown', status=200, ts=1542242481796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26'),
 Row(artist='Train', auth='Logged In'

In [26]:
# Create view of df_log from which to access the columns
df_log.createOrReplaceTempView("df_log_data")
# extract columns for users table
users_table = spark.sql("""
SELECT DISTINCT 
    userId as user_id
    , firstName as first_name
    , lastName as last_name
    , gender
    , level
FROM df_log_data
""")

In [27]:
# show top 5 rows to check content
users_table.head(5)

[Row(user_id='98', first_name='Jordyn', last_name='Powell', gender='F', level='free'),
 Row(user_id='34', first_name='Evelin', last_name='Ayala', gender='F', level='free'),
 Row(user_id='85', first_name='Kinsley', last_name='Young', gender='F', level='paid'),
 Row(user_id='38', first_name='Gianna', last_name='Jones', gender='F', level='free'),
 Row(user_id='85', first_name='Kinsley', last_name='Young', gender='F', level='free')]

In [29]:
#print schema to check data types
users_table.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)



In [30]:
#user_id is a string but could be an integer, let's update the datatype here
users_table = users_table.withColumn("user_id", users_table["user_id"].cast(IntegerType()))

In [32]:
#print schema to check data types
users_table.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)



In [33]:
# show top 5 rows to check content
users_table.head(5)

[Row(user_id=98, first_name='Jordyn', last_name='Powell', gender='F', level='free'),
 Row(user_id=34, first_name='Evelin', last_name='Ayala', gender='F', level='free'),
 Row(user_id=85, first_name='Kinsley', last_name='Young', gender='F', level='paid'),
 Row(user_id=38, first_name='Gianna', last_name='Jones', gender='F', level='free'),
 Row(user_id=85, first_name='Kinsley', last_name='Young', gender='F', level='free')]

In [34]:
# write users table to parquet files
users_table.write.mode("overwrite").parquet("output/users_table.parquet")

In [35]:
# create timestamp column from original timestamp column
# Syntax derived from stackoverflow - https://stackoverflow.com/questions/49971903/converting-epoch-to-datetime-in-pyspark-data-frame-using-udf
df_timestamp = df_log.withColumn('timestamp', (df_log["ts"]/1000).cast(dataType=TimestampType()))

In [36]:
# Return first 5 rows to check content
df_timestamp.head(5)

[Row(artist='Harmonia', auth='Logged In', firstName='Ryan', gender='M', itemInSession=0, lastName='Smith', length=655.77751, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='Sehr kosmisch', status=200, ts=1542241826796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26', timestamp=datetime.datetime(2018, 11, 15, 0, 30, 26, 796000)),
 Row(artist='The Prodigy', auth='Logged In', firstName='Ryan', gender='M', itemInSession=1, lastName='Smith', length=260.07465, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='The Big Gundown', status=200, ts=1542242481796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari

In [37]:
# Obtain datetime from the timestamp
df_datetime = df_timestamp.withColumn('datetime', to_date(df_timestamp["timestamp"]))

In [38]:
df_datetime.head()

Row(artist='Harmonia', auth='Logged In', firstName='Ryan', gender='M', itemInSession=0, lastName='Smith', length=655.77751, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='Sehr kosmisch', status=200, ts=1542241826796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26', timestamp=datetime.datetime(2018, 11, 15, 0, 30, 26, 796000), datetime=datetime.date(2018, 11, 15))

In [39]:
df_datetime.toPandas().head(5)

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,timestamp,datetime
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:30:26.796,2018-11-15
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:41:21.796,2018-11-15
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:45:41.796,2018-11-15
3,Sony Wonder,Logged In,Samuel,M,0,Gonzalez,218.06975,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540493000000.0,597,Blackbird,200,1542253449796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",61,2018-11-15 03:44:09.796,2018-11-15
4,Van Halen,Logged In,Tegan,F,2,Levine,289.38404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Best Of Both Worlds (Remastered Album Version),200,1542260935796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,2018-11-15 05:48:55.796,2018-11-15


In [40]:
# Create view of df_datetime from which to access the columns
df_datetime.createOrReplaceTempView("df_datetime_data")
# extract columns to create time table
# Note:  Identified dayofweek function from spark documentation - https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.dayofweek.html
time_table = spark.sql("""
SELECT DISTINCT
    timestamp AS start_time
    , extract(hour from timestamp) as hour
    , extract(day from timestamp) as day
    , extract(week from timestamp) as week
    , extract(month from timestamp) as month
    , extract(year from timestamp) as year
    , extract(dayofweek from timestamp) as weekday
FROM df_datetime_data
""")

In [41]:
# show first 5 rows to check content
time_table.head(5)

[Row(start_time=datetime.datetime(2018, 11, 15, 21, 4, 27, 796000), hour=21, day=15, week=46, month=11, year=2018, weekday=5),
 Row(start_time=datetime.datetime(2018, 11, 21, 0, 57, 58, 796000), hour=0, day=21, week=47, month=11, year=2018, weekday=4),
 Row(start_time=datetime.datetime(2018, 11, 14, 0, 17, 37, 796000), hour=0, day=14, week=46, month=11, year=2018, weekday=4),
 Row(start_time=datetime.datetime(2018, 11, 14, 7, 8, 14, 796000), hour=7, day=14, week=46, month=11, year=2018, weekday=4),
 Row(start_time=datetime.datetime(2018, 11, 14, 9, 2, 39, 796000), hour=9, day=14, week=46, month=11, year=2018, weekday=4)]

In [42]:
#print schema to check data types
time_table.printSchema()

root
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)



In [43]:
# write time table to parquet files partitioned by year and month
time_table.write.mode("overwrite").partitionBy("year","month").parquet("output/time_table.parquet")

In [47]:
# read in song data to use for songplays table
df_song = spark.read.json(song_data)

# create view of song_data dataframe from which we will extract the required columns
df_song.createOrReplaceTempView("df_song_data")

# extract columns from joined song and log datasets to create songplays table 
songplays_table = spark.sql("""
SELECT DISTINCT 
    d.timestamp AS start_time
    , d.userId as user_id
    , d.level
    , s.song_id
    , s.artist_id
    , d.sessionId as session_id
    , s.artist_location as location
    , d.userAgent as user_agent
FROM df_datetime_data as d
JOIN df_song_data as s
ON d.song = s.title
AND d.artist = s.artist_name
AND d.length = s.duration
""")

In [48]:
# show top 5 rows to check content
songplays_table.toPandas().head(5)

########
# only 1 row returned in sample file - check this in the final output!
########

Unnamed: 0,start_time,user_id,level,song_id,artist_id,session_id,location,user_agent
0,2018-11-21 21:56:47.796,15,paid,SOZCTXZ12AB0182364,AR5KOSW1187FB35FF4,818,Dubai UAE,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."


In [50]:
#print schema to check data types
songplays_table.printSchema()

root
 |-- start_time: timestamp (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)



In [52]:
#user_id is a string but could be an integer, let's update the datatype here
songplays_table = songplays_table.withColumn("user_id", songplays_table["user_id"].cast(IntegerType()))

In [53]:
#print schema to check data types
songplays_table.printSchema()

root
 |-- start_time: timestamp (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)



In [54]:
# show top 5 rows to check content
songplays_table.head(5)

[Row(start_time=datetime.datetime(2018, 11, 21, 21, 56, 47, 796000), user_id=15, level='paid', song_id='SOZCTXZ12AB0182364', artist_id='AR5KOSW1187FB35FF4', session_id=818, location='Dubai UAE', user_agent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"')]

In [56]:
# write songplays table to parquet files partitioned by artist_id
songplays_table.write.mode("overwrite").partitionBy("artist_id").parquet("output/songplays_table.parquet")

In [57]:
# Remove data file directories

In [58]:
import shutil
shutil.rmtree("data/song_data/")

In [59]:
shutil.rmtree("data/log_data/")

In [60]:
shutil.rmtree("output/")

In [61]:
shutil.rmtree("spark-warehouse/")

In [72]:
spark.stop()