# This is a notebook for building etl.py.

In [79]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek


config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config.get('AWS','AWS_ACCESS_KEY_ID')
os.environ['AWS_SECRET_ACCESS_KEY']=config.get('AWS','AWS_SECRET_ACCESS_KEY')

In [80]:
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

In [81]:
spark = create_spark_session()

In [82]:
# extract zip files in data directory
import zipfile
with zipfile.ZipFile('data/song-data.zip', 'r') as zip_ref:
    zip_ref.extractall('data')
    
with zipfile.ZipFile('data/log-data.zip', 'r') as zip_ref:
    zip_ref.extractall('data/log-data')

In [83]:
input_data = 'data/'
output_data = 'output/'

### Build function process_song_data(spark, input_data, output_data)

In [84]:
song_data = input_data + 'song_data/*/*/*/*'

# read song data file
df = spark.read.json(song_data)
df.limit(3).toPandas()

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARDR4AC1187FB371A1,,,,Montserrat Caballé;Placido Domingo;Vicente Sar...,511.16363,1,SOBAYLL12A8C138AF9,Sono andati? Fingevo di dormire,0
1,AREBBGV1187FB523D2,,"Houston, TX",,Mike Jones (Featuring CJ_ Mello & Lil' Bran),173.66159,1,SOOLYAZ12A6701F4A6,Laws Patrolling (Album Version),0
2,ARMAC4T1187FB3FA4C,40.82624,"Morris Plains, NJ",-74.47995,The Dillinger Escape Plan,207.77751,1,SOBBUGU12A8C13E95D,Setting Fire to Sleeping Giants,2004


In [85]:
# extract columns to create songs table (song_id, title, artist_id, year, duration)
songs_table = df.select("song_id","title","artist_id","year","duration").drop_duplicates()
songs_table.limit(3).toPandas()

Unnamed: 0,song_id,title,artist_id,year,duration
0,SOGOSOV12AF72A285E,¿Dónde va Chichi?,ARGUVEV1187B98BA17,1997,313.12934
1,SOTTDKS12AB018D69B,It Wont Be Christmas,ARMBR4Y1187B9990EB,0,241.47546
2,SOBBUGU12A8C13E95D,Setting Fire to Sleeping Giants,ARMAC4T1187FB3FA4C,2004,207.77751


In [86]:
# write songs table to parquet files partitioned by year and artist
songs_table.write.parquet(output_data + "songs/", mode="overwrite", partitionBy=["year","artist_id"])

In [87]:
# extract columns to create artists table (artist_id, name, location, lattitude, longitude)
artists_table = df.select("artist_id","artist_name","artist_location","artist_latitude","artist_longitude").drop_duplicates()
artists_table.limit(3).toPandas()

Unnamed: 0,artist_id,artist_name,artist_location,artist_latitude,artist_longitude
0,AR3JMC51187B9AE49D,Backstreet Boys,"Orlando, FL",28.53823,-81.37739
1,AR0IAWL1187B9A96D0,Danilo Perez,Panama,8.4177,-80.11278
2,ARWB3G61187FB49404,Steve Morse,"Hamilton, Ohio",,


In [88]:
# write artists table to parquet files
artists_table.write.parquet(output_data + "artists/", mode="overwrite")

### Build function process_log_data(spark, input_data, output_data)

In [89]:
# get filepath to log data file
log_data = input_data + 'log-data/'

# read log data file
df = spark.read.json(log_data)
df.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
3,,Logged In,Wyatt,M,0,Scott,,free,"Eureka-Arcata-Fortuna, CA",GET,Home,1540872000000.0,563,,200,1542247071796,Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7....,9
4,,Logged In,Austin,M,0,Rosales,,free,"New York-Newark-Jersey City, NY-NJ-PA",GET,Home,1541060000000.0,521,,200,1542252577796,Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20...,12


In [90]:
# filter by actions for song plays
df = df.where(df.page == "NextSong")
df.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
3,Sony Wonder,Logged In,Samuel,M,0,Gonzalez,218.06975,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540493000000.0,597,Blackbird,200,1542253449796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",61
4,Van Halen,Logged In,Tegan,F,2,Levine,289.38404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Best Of Both Worlds (Remastered Album Version),200,1542260935796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80


In [91]:
# extract columns for users table (user_id, first_name, last_name, gender, level)   
users_table = df.select("userId","firstName","lastName","gender","level").drop_duplicates()
users_table.limit(3).toPandas()

Unnamed: 0,userId,firstName,lastName,gender,level
0,57,Katherine,Gay,F,free
1,84,Shakira,Hunt,F,free
2,22,Sean,Wilson,F,free


In [92]:
# write users table to parquet files
users_table.write.parquet(output_data + "users/", mode="overwrite")

In [93]:
# create timestamp column from original timestamp column
from pyspark.sql.types import TimestampType
get_timestamp = udf(lambda x : datetime.utcfromtimestamp(int(x)/1000), TimestampType())
df = df.withColumn("start_time", get_timestamp("ts"))
df.limit(1).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,start_time
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:30:26.796


In [94]:
# extract columns to create time table (start_time, hour, day, week, month, year, weekday)
time_table = df.withColumn("hour",hour("start_time")) \
               .withColumn("day",dayofmonth("start_time")) \
               .withColumn("week",weekofyear("start_time")) \
               .withColumn("month",month("start_time")) \
               .withColumn("year",year("start_time")) \
               .withColumn("weekday",dayofweek("start_time")) \
               .select("ts","start_time","hour", "day", "week", "month", "year", "weekday").drop_duplicates()

time_table.limit(3).toPandas()

Unnamed: 0,ts,start_time,hour,day,week,month,year,weekday
0,1542279962796,2018-11-15 11:06:02.796,11,15,46,11,2018,5
1,1542299805796,2018-11-15 16:36:45.796,16,15,46,11,2018,5
2,1542765178796,2018-11-21 01:52:58.796,1,21,47,11,2018,4


In [95]:
# write time table to parquet files partitioned by year and month
time_table.write.parquet(output_data + "time/", mode="overwrite", partitionBy=["year","month"])

In [96]:
song_data = input_data + 'song_data/*/*/*/*'

# read song data file
df_stage = spark.read.json(song_data)

# read in song data to use for songplays table
song_df = df_stage.select("song_id","title","artist_id","year","duration").drop_duplicates()
song_df.limit(1).toPandas()

Unnamed: 0,song_id,title,artist_id,year,duration
0,SOGOSOV12AF72A285E,¿Dónde va Chichi?,ARGUVEV1187B98BA17,1997,313.12934


In [123]:
# creating two staging views to use sql query later to create songplays table
df.createOrReplaceTempView("staging_events")
song_df.createOrReplaceTempView("staging_songs")

In [124]:
# extract columns from joined song and log datasets to create songplays table (songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent)
songplays_table = spark.sql("""
                            SELECT monotonically_increasing_id() as songplay_id,
                                   to_timestamp(e.ts/1000) as start_time, 
                                   year(to_timestamp(e.ts/1000)) as year,
                                   month(to_timestamp(e.ts/1000)) as month,
                                   e.userid as user_id, 
                                   e.level, 
                                   s.song_id, 
                                   s.artist_id, 
                                   e.sessionid as session_id, 
                                   e.location, 
                                   e.useragent as user_agent,
                                   e.song
                            FROM staging_songs s JOIN staging_events e
                            ON s.title = e.song
                            """)

songplays_table.limit(5).toPandas()

Unnamed: 0,songplay_id,start_time,year,month,user_id,level,song_id,artist_id,session_id,location,user_agent,song
0,0,2018-11-21 21:56:47.796,2018,11,15,paid,SOZCTXZ12AB0182364,AR5KOSW1187FB35FF4,818,"Chicago-Naperville-Elgin, IL-IN-WI","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",Setanta matins
1,1,2018-11-14 05:06:03.796,2018,11,10,free,SOGDBUF12A8C140FAA,AR558FS1187FB45658,484,"Washington-Arlington-Alexandria, DC-VA-MD-WV","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",Intro
2,2,2018-11-19 09:14:20.796,2018,11,24,paid,SOGDBUF12A8C140FAA,AR558FS1187FB45658,672,"Lake Havasu City-Kingman, AZ","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",Intro
3,3,2018-11-27 22:35:59.796,2018,11,80,paid,SOGDBUF12A8C140FAA,AR558FS1187FB45658,992,"Portland-South Portland, ME","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",Intro


In [99]:
# write songplays table to parquet files partitioned by year and month
songplays_table.write.parquet(output_data + "songplays/", mode="overwrite", partitionBy=["year","month"])

### Check output data

In [126]:
test_df = spark.read.parquet(output_data+'songplays/')
test_df.limit(5).toPandas()

Unnamed: 0,songplay_id,start_time,user_id,level,song_id,artist_id,session_id,location,user_agent,year,month
0,0,2018-11-21 21:56:47.796,15,paid,SOZCTXZ12AB0182364,AR5KOSW1187FB35FF4,818,"Chicago-Naperville-Elgin, IL-IN-WI","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",2018,11
1,1,2018-11-14 05:06:03.796,10,free,SOGDBUF12A8C140FAA,AR558FS1187FB45658,484,"Washington-Arlington-Alexandria, DC-VA-MD-WV","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",2018,11
2,2,2018-11-19 09:14:20.796,24,paid,SOGDBUF12A8C140FAA,AR558FS1187FB45658,672,"Lake Havasu City-Kingman, AZ","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",2018,11
3,3,2018-11-27 22:35:59.796,80,paid,SOGDBUF12A8C140FAA,AR558FS1187FB45658,992,"Portland-South Portland, ME","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",2018,11
