# Experimentation for ETL

In [1]:
import configparser
import os
from datetime import datetime, timezone

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, desc, dayofweek
from pyspark.sql.types import TimestampType, MapType, StringType

In [2]:
# Load the AWS credentials from the local config file and export them as
# environment variables.
config = configparser.ConfigParser()

# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed. Fail loudly here instead of with an opaque
# KeyError on the lookups below.
if not config.read('dl.cfg'):
    raise FileNotFoundError("Could not read configuration file 'dl.cfg'")

os.environ['AWS_ACCESS_KEY_ID'] = config["AWS"]['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config["AWS"]['AWS_SECRET_ACCESS_KEY']

## Data

In [3]:
# Input/output locations used by the rest of the notebook.
#
# S3 alternative (currently disabled):
#   input_data = "s3a://udacity-dend/"
#   output_data = ""

# Local sample data: one root directory per dataset
input_data = dict(
    logs="data/logs",
    songs="data/song_data",
)
output_data = "output"

## Init spark

In [4]:
def create_spark_session():
    """Build (or reuse) a SparkSession that requests the hadoop-aws 2.7.0 package.

    Returns:
        The session returned by ``SparkSession.builder.getOrCreate()``.
    """
    builder = SparkSession.builder.config(
        "spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0"
    )
    return builder.getOrCreate()


spark = create_spark_session()
spark

## Process songs data

In [5]:
# Glob matching every JSON file three directory levels below the songs root
song_data = "{}/*/*/*/*.json".format(input_data["songs"])

# Load all matching song files into a single DataFrame
df = spark.read.json(song_data)

# Preview the first rows
df.limit(10).toPandas()

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARDR4AC1187FB371A1,,,,Montserrat Caballé;Placido Domingo;Vicente Sar...,511.16363,1,SOBAYLL12A8C138AF9,Sono andati? Fingevo di dormire,0
1,AREBBGV1187FB523D2,,"Houston, TX",,Mike Jones (Featuring CJ_ Mello & Lil' Bran),173.66159,1,SOOLYAZ12A6701F4A6,Laws Patrolling (Album Version),0
2,ARMAC4T1187FB3FA4C,40.82624,"Morris Plains, NJ",-74.47995,The Dillinger Escape Plan,207.77751,1,SOBBUGU12A8C13E95D,Setting Fire to Sleeping Giants,2004
3,ARPBNLO1187FB3D52F,40.71455,"New York, NY",-74.00712,Tiny Tim,43.36281,1,SOAOIBZ12AB01815BE,I Hold Your Hand In Mine [Live At Royal Albert...,2000
4,ARDNS031187B9924F0,32.67828,Georgia,-83.22295,Tim Wilson,186.48771,1,SONYPOM12A8C13B2D7,I Think My Wife Is Running Around On Me (Taco ...,2005
5,ARNF6401187FB57032,40.79086,"New York, NY [Manhattan]",-73.96644,Sophie B. Hawkins,305.162,1,SONWXQJ12A8C134D94,The Ballad Of Sleeping Beauty,1994
6,ARLTWXK1187FB5A3F8,32.74863,"Fort Worth, TX",-97.32925,King Curtis,326.00771,1,SODREIN12A58A7F2E5,A Whiter Shade Of Pale (Live @ Fillmore West),0
7,ARPFHN61187FB575F6,41.88415,"Chicago, IL",-87.63241,Lupe Fiasco,279.97995,1,SOWQTQZ12A58A7B63E,Streets On Fire (Explicit Album Version),0
8,ARI2JSK1187FB496EF,51.50632,"London, England",-0.12714,Nick Ingman;Gavyn Wright,111.62077,1,SODUJBS12A8C132150,Wessex Loses a Bride,0
9,AR0RCMP1187FB3F427,30.08615,"Beaumont, TX",-94.10158,Billie Jo Spears,133.32853,1,SOGXHEG12AB018653E,It Makes No Difference Now,1992


In [6]:
# Print the column names, types and nullability of the song DataFrame
df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [7]:
# extract columns to create songs table
songs_table = df.select("song_id", "title", "artist_id", "year", "duration")

# Row counts before and after de-duplication show how many duplicate rows
# distinct() dropped
print(songs_table.count())
songs_table = songs_table.distinct()
print(songs_table.count())

# Titles ordered by how often they occur; a count above 1 means several
# songs share the same title
songs_table.groupBy("title").count().orderBy(desc("count")).limit(10).toPandas()

71
71


Unnamed: 0,title,count
0,¿Dónde va Chichi?,1
1,Got My Mojo Workin,1
2,Sonnerie lalaleulé hi houuu,1
3,Superconfidential,1
4,The Moon And I (Ordinary Day Album Version),1
5,La Culpa,1
6,Wild Rose (Back 2 Basics Mix),1
7,Twist and Shout,1
8,Broken-Down Merry-Go-Round,1
9,Baby Come To Me,1


In [8]:
# Save the songs table as parquet, partitioned by year and artist_id;
# any existing output at the target path is overwritten
(
    songs_table.write
    .mode("overwrite")
    .partitionBy("year", "artist_id")
    .parquet(output_data + "/songs")
)

In [9]:
# Build the artists table: pick the artist columns, drop the "artist_" prefix
# from everything except the id, and keep one row per distinct combination
artists_table = (
    df.select(
        "artist_id",
        col("artist_name").alias("name"),
        col("artist_location").alias("location"),
        col("artist_latitude").alias("latitude"),
        col("artist_longitude").alias("longitude"),
    )
    .distinct()
)

artists_table.limit(10).toPandas()

Unnamed: 0,artist_id,name,location,latitude,longitude
0,ARPBNLO1187FB3D52F,Tiny Tim,"New York, NY",40.71455,-74.00712
1,ARXR32B1187FB57099,Gob,,,
2,AROGWRA122988FEE45,Christos Dantis,,,
3,ARBGXIG122988F409D,Steel Rain,California - SF,37.77916,-122.42005
4,AREVWGE1187B9B890A,Bitter End,Noci (BA),-13.442,-41.9952
5,AREDL271187FB40F44,Soul Mekanik,,,
6,ARGSAFR1269FB35070,Blingtones,,,
7,ARH4Z031187B9A71F2,Faye Adams,"Newark, NJ",40.73197,-74.17418
8,ARGSJW91187B9B1D6B,JennyAnyKind,North Carolina,35.21962,-80.01955
9,ARJIE2Y1187B994AB7,Line Renaud,,,


In [10]:
# Save the artists table as parquet, replacing any existing output
(
    artists_table.write
    .mode("overwrite")
    .parquet(output_data + "/artists")
)

## Process logs data

In [11]:
# Glob matching every JSON file directly inside the logs directory
log_data = "{}/*.json".format(input_data["logs"])

# Load all log files into a single DataFrame (rebinds `df`, which previously
# held the song data)
df = spark.read.json(log_data)

# Preview the first rows
df.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
3,,Logged In,Wyatt,M,0,Scott,,free,"Eureka-Arcata-Fortuna, CA",GET,Home,1540872000000.0,563,,200,1542247071796,Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7....,9
4,,Logged In,Austin,M,0,Rosales,,free,"New York-Newark-Jersey City, NY-NJ-PA",GET,Home,1541060000000.0,521,,200,1542252577796,Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20...,12


In [12]:
# Print the column names, types and nullability of the log DataFrame
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [13]:
# Keep only the log events that represent a song play
is_song_play = col("page") == "NextSong"
df = df.where(is_song_play)
df.limit(5).toPandas()


Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
3,Sony Wonder,Logged In,Samuel,M,0,Gonzalez,218.06975,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540493000000.0,597,Blackbird,200,1542253449796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",61
4,Van Halen,Logged In,Tegan,F,2,Levine,289.38404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Best Of Both Worlds (Remastered Album Version),200,1542260935796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80


In [14]:
# Build the users table: select the user attributes under snake_case names
# and keep one row per distinct combination
users_table = (
    df.select(
        col("userId").alias("user_id"),
        col("firstName").alias("first_name"),
        col("lastName").alias("last_name"),
        "gender",
        "level",
    )
    .distinct()
)

users_table.limit(10).toPandas()

Unnamed: 0,user_id,first_name,last_name,gender,level
0,26,Ryan,Smith,M,free
1,7,Adelyn,Jordan,F,free
2,71,Ayleen,Wise,F,free
3,81,Sienna,Colon,F,free
4,87,Dustin,Lee,M,free
5,23,Morris,Gilmore,M,free
6,75,Joseph,Gutierrez,M,free
7,16,Rylan,George,M,paid
8,2,Jizelle,Benjamin,F,free
9,3,Isaac,Valdez,M,free


In [15]:
# Save the users table as parquet, replacing any existing output
(
    users_table.write
    .mode("overwrite")
    .parquet(output_data + "/users")
)

In [16]:
# create datetime column from original timestamp column
def _epoch_ms_to_datetime(ts):
    """Convert epoch milliseconds to a timezone-aware UTC datetime.

    Returns None for a null input instead of raising TypeError. A tz-aware
    value is used on purpose: a naive local datetime is ambiguous during a
    DST fall-back hour, so converting it back to an instant can be off by
    one hour.
    """
    if ts is None:
        return None
    return datetime.fromtimestamp(ts / 1000, tz=timezone.utc)


get_datetime = udf(_epoch_ms_to_datetime, TimestampType())
df = df.withColumn("datetime", get_datetime(df.ts))
df.limit(10).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,datetime
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:30:26.796
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:41:21.796
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:45:41.796
3,Sony Wonder,Logged In,Samuel,M,0,Gonzalez,218.06975,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540493000000.0,597,Blackbird,200,1542253449796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",61,2018-11-15 03:44:09.796
4,Van Halen,Logged In,Tegan,F,2,Levine,289.38404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Best Of Both Worlds (Remastered Album Version),200,1542260935796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,2018-11-15 05:48:55.796
5,Magic Sam,Logged In,Tegan,F,3,Levine,132.04853,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Call Me If You Need Me,200,1542261224796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,2018-11-15 05:53:44.796
6,Edward Sharpe & The Magnetic Zeros,Logged In,Tegan,F,4,Levine,306.31138,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Home,200,1542261356796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,2018-11-15 05:55:56.796
7,Usher featuring will.i.am,Logged In,Tegan,F,5,Levine,395.72853,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,OMG,200,1542261662796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,2018-11-15 06:01:02.796
8,Helen Reddy,Logged In,Tegan,F,7,Levine,176.50893,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Candle On The Water,200,1542262057796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,2018-11-15 06:07:37.796
9,Taylor Swift,Logged In,Tegan,F,8,Levine,201.06404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Our Song,200,1542262233796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,2018-11-15 06:10:33.796


In [17]:
# Build the time table: one row per distinct play timestamp, with the calendar
# parts derived from the `datetime` column. `start_time` keeps the original
# `ts` value; `datetime` itself is not part of the result.
time_table = (
    df.select(
        col("ts").alias("start_time"),
        hour("datetime").alias("hour"),
        dayofmonth("datetime").alias("day"),
        weekofyear("datetime").alias("week"),
        month("datetime").alias("month"),
        year("datetime").alias("year"),
        dayofweek("datetime").alias("weekday"),
    )
    .distinct()
)

time_table.limit(10).toPandas()

Unnamed: 0,start_time,hour,day,week,month,year,weekday
0,1542290963796,14,15,46,11,2018,5
1,1542300503796,16,15,46,11,2018,5
2,1542308546796,19,15,46,11,2018,5
3,1542314984796,20,15,46,11,2018,5
4,1542794465796,10,21,47,11,2018,4
5,1542840436796,22,21,47,11,2018,4
6,1542175469796,6,14,46,11,2018,4
7,1542180736796,7,14,46,11,2018,4
8,1542187177796,9,14,46,11,2018,4
9,1543423290796,16,28,48,11,2018,4


In [18]:
# Save the time table as parquet, partitioned by year and month;
# any existing output at the target path is overwritten
(
    time_table.write
    .mode("overwrite")
    .partitionBy("year", "month")
    .parquet(output_data + "/time")
)

In [22]:
# Songs support: each song together with its artist's name, used to match log
# events to songs.
#
# songs_table and artists_table both derive from the same DataFrame, so a
# condition such as `songs_table.artist_id == artists_table.artist_id` is an
# ambiguous self-join reference. Joining on the column name performs the same
# inner equi-join and leaves a single, unambiguous `artist_id` column.
songs_support = (
    songs_table
    .join(artists_table, on="artist_id")
    .select("song_id", "title", "artist_id", "duration", "name")
)

songs_support.limit(10).toPandas()

Unnamed: 0,song_id,title,artist_id,duration,name
0,SOGOSOV12AF72A285E,¿Dónde va Chichi?,ARGUVEV1187B98BA17,313.12934,Sierra Maestra
1,SOTTDKS12AB018D69B,It Wont Be Christmas,ARMBR4Y1187B9990EB,241.47546,David Martin
2,SOBBUGU12A8C13E95D,Setting Fire to Sleeping Giants,ARMAC4T1187FB3FA4C,207.77751,The Dillinger Escape Plan
3,SOIAZJW12AB01853F1,Pink World,AR8ZCNI1187B9A069B,269.81832,Planet P Project
4,SONYPOM12A8C13B2D7,I Think My Wife Is Running Around On Me (Taco ...,ARDNS031187B9924F0,186.48771,Tim Wilson
5,SOYMRWW12A6D4FAB14,The Moon And I (Ordinary Day Album Version),ARKFYS91187B98E58F,267.7024,Jeff And Sheri Easter
6,SOAOIBZ12AB01815BE,I Hold Your Hand In Mine [Live At Royal Albert...,ARPBNLO1187FB3D52F,43.36281,Tiny Tim
7,SOBCOSW12A8C13D398,Rumba De Barcelona,AR7SMBG1187B9B9066,218.38322,Los Manolos
8,SOWTBJW12AC468AC6E,Broken-Down Merry-Go-Round,ARQGYP71187FB44566,151.84934,Jimmy Wakely
9,SOQHXMF12AB0182363,Young Boy Blues,ARGSJW91187B9B1D6B,218.77506,JennyAnyKind


In [25]:
# Build the songplays table from the song-play log events.

# A log event matches a song when title, artist name and duration all agree
song_match = (
    (df.song == songs_support.title)
    & (df.artist == songs_support.name)
    & (df.length == songs_support.duration)
)

songplays_table = (
    df.join(songs_support, song_match)
    # the time table supplies the year/month columns
    .join(time_table, df.ts == time_table.start_time)
    .withColumn("id", monotonically_increasing_id())
    .select(
        "id",
        col("ts").alias("start_time"),
        "year",
        "month",
        col("userId").alias("user_id"),
        "level",
        "song_id",
        "artist_id",
        col("sessionId").alias("session_id"),
        "location",
        col("userAgent").alias("user_agent"),
    )
)

songplays_table.limit(10).toPandas()

Unnamed: 0,id,start_time,year,month,user_id,level,song_id,artist_id,session_id,location,user_agent
0,1692217114624,1542837407796,2018,11,15,paid,SOZCTXZ12AB0182364,AR5KOSW1187FB35FF4,818,"Chicago-Naperville-Elgin, IL-IN-WI","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."


In [26]:
# Save the songplays table as parquet, partitioned by year and month;
# any existing output at the target path is overwritten
(
    songplays_table.write
    .mode("overwrite")
    .partitionBy("year", "month")
    .parquet(output_data + "/songplays")
)