In [1]:
import configparser
from datetime import datetime
import os
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, LongType as Lg, DateType as Date

In [2]:
config = configparser.ConfigParser()

In [3]:
spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
    .getOrCreate()

In [4]:
rm -r data/Output

## Process the song files

In [5]:
df_song = spark.read.json("data/song_data/song_data/*/*/*/*.json")
df_song.head(2)

[Row(artist_id='ARDR4AC1187FB371A1', artist_latitude=None, artist_location='', artist_longitude=None, artist_name='Montserrat Caballé;Placido Domingo;Vicente Sardinero;Judith Blegen;Sherrill Milnes;Georg Solti', duration=511.16363, num_songs=1, song_id='SOBAYLL12A8C138AF9', title='Sono andati? Fingevo di dormire', year=0),
 Row(artist_id='AREBBGV1187FB523D2', artist_latitude=None, artist_location='Houston, TX', artist_longitude=None, artist_name="Mike Jones (Featuring CJ_ Mello & Lil' Bran)", duration=173.66159, num_songs=1, song_id='SOOLYAZ12A6701F4A6', title='Laws Patrolling (Album Version)', year=0)]

In [6]:
df_song.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [7]:
### Modify the schema data types
### song_id auto increment field
Song_Schema = R([
    Fld("artist_id",Str()),
    Fld("artist_latitude",Dbl()),
    Fld("artist_location",Str()),
    Fld("artist_longitude",Dbl()),
    Fld("artist_name",Str()),
    Fld("duration",Dbl()),
    Fld("num_songs",Int()),
    Fld("title",Str()),
    Fld("year",Int()),
])


In [8]:
df_song_schema=spark.read.json("data/song_data/song_data/*/*/*/*.json", schema=Song_Schema)

In [9]:
### songs - songs in music database --> song_id, title, artist_id, year, duration
from pyspark.sql.functions import monotonically_increasing_id
song_extracted_fields=['title','artist_id','year','duration']
songs=df_song_schema.select(song_extracted_fields).dropDuplicates().withColumn("song_id", monotonically_increasing_id())
songs.limit(5).toPandas()

Unnamed: 0,title,artist_id,year,duration,song_id
0,Intro,AR558FS1187FB45658,2003,75.67628,51539607552
1,Setting Fire to Sleeping Giants,ARMAC4T1187FB3FA4C,2004,207.77751,68719476736
2,Kutt Free (DJ Volume Remix),ARNNKDK1187B98BBD5,0,407.37914,68719476737
3,Get Your Head Stuck On Your Neck,AREDL271187FB40F44,0,45.66159,77309411328
4,Amor De Cabaret,ARKRRTF1187B9984DA,0,177.47546,94489280512


In [10]:
## write songs table to parquet files partitioned by year and artist

output_path='data/Output/'
songs.write.partitionBy("year", "artist_id").parquet(output_path + 'songs/')

In [11]:
### artists - artists in music database --> artist_id, name, location, lattitude, longitude
artists_extracted_fields=['artist_id','artist_name as name','artist_location as location','artist_latitude as lattitude','artist_longitude as longitude']
artists=df_song_schema.selectExpr(artists_extracted_fields).dropDuplicates()
artists.limit(5).toPandas()

Unnamed: 0,artist_id,name,location,lattitude,longitude
0,ARPFHN61187FB575F6,Lupe Fiasco,"Chicago, IL",41.88415,-87.63241
1,AR3JMC51187B9AE49D,Backstreet Boys,"Orlando, FL",28.53823,-81.37739
2,ARXR32B1187FB57099,Gob,,,
3,AROUOZZ1187B9ABE51,Willie Bobo,"New York, NY [Spanish Harlem]",40.79195,-73.94512
4,AROGWRA122988FEE45,Christos Dantis,,,


In [12]:
## Writing back the artists table

artists.write.parquet(output_path + 'artists/')

## Process the log files

In [13]:
df_log = spark.read.json("data/log_data/*.json")
df_log.head(2)

[Row(artist='Harmonia', auth='Logged In', firstName='Ryan', gender='M', itemInSession=0, lastName='Smith', length=655.77751, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='Sehr kosmisch', status=200, ts=1542241826796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26'),
 Row(artist='The Prodigy', auth='Logged In', firstName='Ryan', gender='M', itemInSession=1, lastName='Smith', length=260.07465, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='The Big Gundown', status=200, ts=1542242481796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26')]

In [14]:
df_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [15]:
## users - users in the app --> user_id, first_name, last_name, gender, level

users_extracted_fields=['userId as user_id','firstName as first_name','lastName as last_name','gender','level']
users=df_log.selectExpr(users_extracted_fields).dropDuplicates()
users.limit(5).toPandas()

Unnamed: 0,user_id,first_name,last_name,gender,level
0,26,Ryan,Smith,M,free
1,7,Adelyn,Jordan,F,free
2,71,Ayleen,Wise,F,free
3,81,Sienna,Colon,F,free
4,87,Dustin,Lee,M,free


In [16]:
## write users table to parquet files

users.write.parquet(output_path + 'users/')

In [17]:
### time: time - timestamps of records in songplays broken down into specific units 
## start_time, hour, day, week, month, year, weekday
from pyspark.sql.types import TimestampType
import pyspark.sql.functions as F
from pyspark.sql import types as T
from datetime import datetime

df_log = df_log.filter(df_log.page == 'NextSong')
get_timestamp = F.udf(lambda x: datetime.fromtimestamp( (x/1000) ), T.TimestampType()) 
df_log = df_log.withColumn("timestamp", get_timestamp(df_log.ts))
df_log.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,timestamp
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:30:26.796
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:41:21.796
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:45:41.796
3,Sony Wonder,Logged In,Samuel,M,0,Gonzalez,218.06975,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540493000000.0,597,Blackbird,200,1542253449796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",61,2018-11-15 03:44:09.796
4,Van Halen,Logged In,Tegan,F,2,Levine,289.38404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Best Of Both Worlds (Remastered Album Version),200,1542260935796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,2018-11-15 05:48:55.796


In [18]:
## start_time, hour, day, week, month, year, weekday
## https://sparkbyexamples.com/spark/spark-extract-hour-minute-and-second-from-timestamp/
time=df_log.select('timestamp').dropDuplicates()\
    .withColumn("hour", hour(col('timestamp')))\
    .withColumn("day", dayofmonth(col('timestamp')))\
    .withColumn("week", weekofyear(col('timestamp')))\
    .withColumn("month", month(col('timestamp')))\
    .withColumn("year", year(col('timestamp')))\
    .withColumn("weekday", date_format(col("timestamp"), 'E'))
time.limit(5).toPandas()

Unnamed: 0,timestamp,hour,day,week,month,year,weekday
0,2018-11-21 06:18:12.796,6,21,47,11,2018,Wed
1,2018-11-21 18:49:23.796,18,21,47,11,2018,Wed
2,2018-11-14 15:20:15.796,15,14,46,11,2018,Wed
3,2018-11-05 16:31:59.796,16,5,45,11,2018,Mon
4,2018-11-13 18:00:26.796,18,13,46,11,2018,Tue


In [19]:
# write time table to parquet files partitioned by year and month

time.write.partitionBy("year", "month").parquet(output_path + 'time/')

## Constructing the Songplay Table

In [20]:
## Reading songs and artists from parquet files

songs_parquet= spark.read.parquet(output_path+ 'songs/*/*/*')
artists_parquet=spark.read.parquet(output_path+ 'artists/*')

In [21]:
songs_parquet.limit(5).toPandas()

Unnamed: 0,title,duration,song_id
0,I Hold Your Hand In Mine [Live At Royal Albert...,43.36281,816043786240
1,I Think My Wife Is Running Around On Me (Taco ...,186.48771,919123001344
2,A Whiter Shade Of Pale (Live @ Fillmore West),326.00771,1211180777472
3,The Moon And I (Ordinary Day Album Version),267.7024,231928233984
4,Streets On Fire (Explicit Album Version),279.97995,1374389534720


In [22]:
artists_parquet.limit(5).toPandas()

Unnamed: 0,artist_id,name,location,lattitude,longitude
0,ARDR4AC1187FB371A1,Montserrat Caballé;Placido Domingo;Vicente Sar...,,,
1,ARMAC4T1187FB3FA4C,The Dillinger Escape Plan,"Morris Plains, NJ",40.82624,-74.47995
2,AREBBGV1187FB523D2,Mike Jones (Featuring CJ_ Mello & Lil' Bran),"Houston, TX",,
3,ARD842G1187B997376,Blue Rodeo,"Toronto, Ontario, Canada",43.64856,-79.38533
4,AR9AWNF1187B9AB0B4,Kenny G featuring Daryl Hall,"Seattle, Washington USA",,


In [23]:
songplay_table_insert = ("""INSERT INTO songplays (start_time, user_id, level, song_id, artist_id, session_id, location, user_agent) 
select staging_events.ts,
    staging_events.user_id,
    staging_events.level,
    staging_songs.song_id,
    staging_songs.artist_id,
    staging_events.session_id,
    staging_events.location,
    staging_events.user_agent  
FROM staging_events 
JOIN staging_songs 
    ON (staging_events.artist=staging_songs.artist_name)
    AND (staging_events.length=staging_songs.duration)
    AND (staging_events.song=staging_songs.title)
    WHERE staging_events.page = 'NextSong'
""")

In [24]:
## Select Only records with page NextSong
df_log=df_log.filter(df_log.page == 'NextSong')

In [25]:
# extract columns from joined song and log datasets to create songplays table
#songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
songplay_song_fields=['song_id','artist_id','artist_name as artist','duration as length','title as song']
songs2=df_song_schema.dropDuplicates().withColumn("song_id", monotonically_increasing_id())
songplay_song=songs2.selectExpr(songplay_song_fields).dropDuplicates()
songplay_log_fields=['ts','userId','level','sessionId','location','userAgent','artist','length','song','timestamp']
songplay_log=df_log.select(songplay_log_fields).dropDuplicates()

In [26]:

songplay_song.limit(5).toPandas()

Unnamed: 0,song_id,artist_id,artist,length,song
0,1133871366144,AREDL271187FB40F44,Soul Mekanik,45.66159,Get Your Head Stuck On Your Neck
1,266287972352,ARH4Z031187B9A71F2,Faye Adams,156.39465,Crazy Mixed Up World
2,661424963584,ARHHO3O1187B989413,Bob Azzam,191.84281,Auguri Cha Cha
3,317827579904,AR9AWNF1187B9AB0B4,Kenny G featuring Daryl Hall,236.93016,Baby Come To Me
4,1314259992577,ARULZCI1241B9C8611,Luna Orbit Project,335.51628,Midnight Star


In [27]:
songplay_log.limit(5).toPandas()

Unnamed: 0,ts,userId,level,sessionId,location,userAgent,artist,length,song,timestamp
0,1542308104796,44,paid,619,"Waterloo-Cedar Falls, IA",Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; r...,Caetano Veloso,223.97342,O Samba E O Tango,2018-11-15 18:55:04.796
1,1542320230796,44,paid,619,"Waterloo-Cedar Falls, IA",Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; r...,Elvis Presley,273.76281,Suspicious Minds,2018-11-15 22:17:10.796
2,1542175209796,49,free,553,"San Francisco-Oakland-Hayward, CA",Mozilla/5.0 (Windows NT 5.1; rv:31.0) Gecko/20...,Alliance Ethnik,265.76934,Creil City,2018-11-14 06:00:09.796
3,1542196700796,29,paid,559,"Atlanta-Sandy Springs-Roswell, GA","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",Bon Iver,329.24689,Blindsided,2018-11-14 11:58:20.796
4,1543397176796,82,paid,140,"Atlanta-Sandy Springs-Roswell, GA","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3...",The Cranberries,141.50485,Carry On,2018-11-28 09:26:16.796


In [28]:
#songplay = pd.merge(songplay_log, songplay_song,  how='left', left_on=['artist','length','song'], right_on = ['artist','length','song'])
#songplay.limit(5).toPandas()

songplay= songplay_log.join(
    songplay_song,
    (songplay_log.artist == songplay_song.artist)
        & (songplay_log.length == songplay_song.length)
        & (songplay_log.song == songplay_song.song),
    "left"
)
songplay.limit(10).toPandas()

Unnamed: 0,ts,userId,level,sessionId,location,userAgent,artist,length,song,timestamp,song_id,artist_id,artist.1,length.1,song.1
0,1542308104796,44,paid,619,"Waterloo-Cedar Falls, IA",Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; r...,Caetano Veloso,223.97342,O Samba E O Tango,2018-11-15 18:55:04.796,,,,,
1,1542320230796,44,paid,619,"Waterloo-Cedar Falls, IA",Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; r...,Elvis Presley,273.76281,Suspicious Minds,2018-11-15 22:17:10.796,,,,,
2,1542175209796,49,free,553,"San Francisco-Oakland-Hayward, CA",Mozilla/5.0 (Windows NT 5.1; rv:31.0) Gecko/20...,Alliance Ethnik,265.76934,Creil City,2018-11-14 06:00:09.796,,,,,
3,1542196700796,29,paid,559,"Atlanta-Sandy Springs-Roswell, GA","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",Bon Iver,329.24689,Blindsided,2018-11-14 11:58:20.796,,,,,
4,1543397176796,82,paid,140,"Atlanta-Sandy Springs-Roswell, GA","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3...",The Cranberries,141.50485,Carry On,2018-11-28 09:26:16.796,,,,,
5,1543446235796,24,paid,984,"Lake Havasu City-Kingman, AZ","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",Janet Jackson,260.88444,Because Of Love,2018-11-28 23:03:55.796,,,,,
6,1541430545796,24,paid,23,"Lake Havasu City-Kingman, AZ","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",Bodo Wartke,645.27628,Liebeslied (Sprachen: Deutsch_ Englisch_ Franz...,2018-11-05 15:09:05.796,,,,,
7,1543590162796,16,paid,1076,"Birmingham-Hoover, AL","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",The Beautiful South,236.5122,Masculine Eclipse,2018-11-30 15:02:42.796,,,,,
8,1543593217796,16,paid,1076,"Birmingham-Hoover, AL","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",The Pussycat Dolls,245.18485,When I Grow Up,2018-11-30 15:53:37.796,,,,,
9,1542388621796,97,paid,633,"Lansing-East Lansing, MI","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",RÃÂ¶yksopp,300.14649,What Else Is There?,2018-11-16 17:17:01.796,,,,,


In [30]:
# write songplays table to parquet files partitioned by year and month
#fields: songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
songplay_final_fields=['timestamp','userId','level','song_id','artist_id','sessionId','location','userAgent']
  
songplay_final=songplay.select(songplay_final_fields)\
    .withColumn("month", month(col('timestamp')))\
    .withColumn("year", year(col('timestamp')))\
    .withColumn("songplay_id",monotonically_increasing_id())
               

In [31]:
songplay_final.limit(5).toPandas()

Unnamed: 0,timestamp,userId,level,song_id,artist_id,sessionId,location,userAgent,month,year,songplay_id
0,2018-11-15 18:55:04.796,44,paid,,,619,"Waterloo-Cedar Falls, IA",Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; r...,11,2018,0
1,2018-11-15 22:17:10.796,44,paid,,,619,"Waterloo-Cedar Falls, IA",Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; r...,11,2018,1
2,2018-11-14 06:00:09.796,49,free,,,553,"San Francisco-Oakland-Hayward, CA",Mozilla/5.0 (Windows NT 5.1; rv:31.0) Gecko/20...,11,2018,2
3,2018-11-14 11:58:20.796,29,paid,,,559,"Atlanta-Sandy Springs-Roswell, GA","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",11,2018,3
4,2018-11-28 09:26:16.796,82,paid,,,140,"Atlanta-Sandy Springs-Roswell, GA","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3...",11,2018,4


In [32]:
songplay_final.write.partitionBy("year", "month").parquet(output_path + 'songplay/')