# 1.Open Spark

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

In [2]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

In [3]:
spark

# 2. Unzip dataset

In [18]:
import zipfile

In [78]:
with zipfile.ZipFile('./data/song-data.zip', 'r') as zip_ref:
    zip_ref.extractall('./data')

In [79]:
with zipfile.ZipFile('./data/log-data.zip', 'r') as zip_ref:
    zip_ref.extractall('./data/log-data')

# 3. Read Song file

In [4]:
song_path = './data/song_data/*/*/*/*.json'

In [5]:
song_data = spark.read.json(song_path)

In [6]:
song_data.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [7]:
song_data.limit(5).toPandas()

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARDR4AC1187FB371A1,,,,Montserrat Caballé;Placido Domingo;Vicente Sar...,511.16363,1,SOBAYLL12A8C138AF9,Sono andati? Fingevo di dormire,0
1,AREBBGV1187FB523D2,,"Houston, TX",,Mike Jones (Featuring CJ_ Mello & Lil' Bran),173.66159,1,SOOLYAZ12A6701F4A6,Laws Patrolling (Album Version),0
2,ARMAC4T1187FB3FA4C,40.82624,"Morris Plains, NJ",-74.47995,The Dillinger Escape Plan,207.77751,1,SOBBUGU12A8C13E95D,Setting Fire to Sleeping Giants,2004
3,ARPBNLO1187FB3D52F,40.71455,"New York, NY",-74.00712,Tiny Tim,43.36281,1,SOAOIBZ12AB01815BE,I Hold Your Hand In Mine [Live At Royal Albert...,2000
4,ARDNS031187B9924F0,32.67828,Georgia,-83.22295,Tim Wilson,186.48771,1,SONYPOM12A8C13B2D7,I Think My Wife Is Running Around On Me (Taco ...,2005


## 3.1 song_table

### 3.1.1 Extract columns to create song table

In [8]:
song_table = song_data.select('song_id', 'title', 'artist_id',
                            'year', 'duration').dropDuplicates(['song_id'])

In [9]:
song_table.limit(5).toPandas()

Unnamed: 0,song_id,title,artist_id,year,duration
0,SOGOSOV12AF72A285E,¿Dónde va Chichi?,ARGUVEV1187B98BA17,1997,313.12934
1,SOMZWCG12A8C13C480,I Didn't Mean To,ARD7TVE1187B99BFB1,0,218.93179
2,SOUPIRU12A6D4FA1E1,Der Kleine Dompfaff,ARJIE2Y1187B994AB7,0,152.92036
3,SOXVLOJ12AB0189215,Amor De Cabaret,ARKRRTF1187B9984DA,0,177.47546
4,SOWTBJW12AC468AC6E,Broken-Down Merry-Go-Round,ARQGYP71187FB44566,0,151.84934


In [10]:
song_table = song_table.withColumnRenamed('title','song_title')\
            .withColumnRenamed('year','song_year')\
            .withColumnRenamed('duration','song_duration')

In [11]:
song_table.limit(5).toPandas()

Unnamed: 0,song_id,song_title,artist_id,song_year,song_duration
0,SOGOSOV12AF72A285E,¿Dónde va Chichi?,ARGUVEV1187B98BA17,1997,313.12934
1,SOMZWCG12A8C13C480,I Didn't Mean To,ARD7TVE1187B99BFB1,0,218.93179
2,SOUPIRU12A6D4FA1E1,Der Kleine Dompfaff,ARJIE2Y1187B994AB7,0,152.92036
3,SOXVLOJ12AB0189215,Amor De Cabaret,ARKRRTF1187B9984DA,0,177.47546
4,SOWTBJW12AC468AC6E,Broken-Down Merry-Go-Round,ARQGYP71187FB44566,0,151.84934


### 3.1.2 Write song table to parquet files

#### Option 1

In [97]:
# write songs table to parquet files partitioned by year and artist
song_table.write.partitionBy('song_year','artist_id').parquet('./data/song_table',
                              mode='overwrite')

#### Option 2

In [69]:
song_table.write.parquet('./data/song_table',
                              mode='overwrite',
                              partitionBy=['song_year', 'artist_id'])

## 3.2 artist_table

### 3.2.1 Extract columns to create artist table

artist_id, name, location, lattitude, longitude

In [54]:
song_data.limit(5).toPandas()

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARDR4AC1187FB371A1,,,,Montserrat Caballé;Placido Domingo;Vicente Sar...,511.16363,1,SOBAYLL12A8C138AF9,Sono andati? Fingevo di dormire,0
1,AREBBGV1187FB523D2,,"Houston, TX",,Mike Jones (Featuring CJ_ Mello & Lil' Bran),173.66159,1,SOOLYAZ12A6701F4A6,Laws Patrolling (Album Version),0
2,ARMAC4T1187FB3FA4C,40.82624,"Morris Plains, NJ",-74.47995,The Dillinger Escape Plan,207.77751,1,SOBBUGU12A8C13E95D,Setting Fire to Sleeping Giants,2004
3,ARPBNLO1187FB3D52F,40.71455,"New York, NY",-74.00712,Tiny Tim,43.36281,1,SOAOIBZ12AB01815BE,I Hold Your Hand In Mine [Live At Royal Albert...,2000
4,ARDNS031187B9924F0,32.67828,Georgia,-83.22295,Tim Wilson,186.48771,1,SONYPOM12A8C13B2D7,I Think My Wife Is Running Around On Me (Taco ...,2005


In [12]:
artist_table = song_data.select('artist_id','artist_name','artist_location','artist_latitude','artist_longitude').dropDuplicates(['artist_id'])

In [13]:
artist_table.limit(5).toPandas()

Unnamed: 0,artist_id,artist_name,artist_location,artist_latitude,artist_longitude
0,AR9AWNF1187B9AB0B4,Kenny G featuring Daryl Hall,"Seattle, Washington USA",,
1,AR0IAWL1187B9A96D0,Danilo Perez,Panama,8.4177,-80.11278
2,AR0RCMP1187FB3F427,Billie Jo Spears,"Beaumont, TX",30.08615,-94.10158
3,AREDL271187FB40F44,Soul Mekanik,,,
4,ARI3BMM1187FB4255E,Alice Stuart,Washington,38.8991,-77.029


### 3.2.2 Write artist table to parquet files

In [100]:
 # write artists table to parquet files
artist_table.write.parquet('./data/artist_table', mode ='overwrite')

# 4. Read Log file

In [8]:
log_path = './data/log-data/*.json'

In [9]:
log_data = spark.read.json(log_path)

In [10]:
log_data.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [11]:
log_data.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
3,,Logged In,Wyatt,M,0,Scott,,free,"Eureka-Arcata-Fortuna, CA",GET,Home,1540872000000.0,563,,200,1542247071796,Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7....,9
4,,Logged In,Austin,M,0,Rosales,,free,"New York-Newark-Jersey City, NY-NJ-PA",GET,Home,1541060000000.0,521,,200,1542252577796,Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20...,12


## 4.1 User table

### 4.1.1 Create user table

user_id, first_name, last_name, gender, level

In [17]:
user_table = log_data.select('userId','firstName','lastName','gender','level').dropDuplicates(['userId'])

In [18]:
user_table.limit(5).toPandas()

Unnamed: 0,userId,firstName,lastName,gender,level
0,51,Maia,Burke,F,free
1,7,Adelyn,Jordan,F,free
2,15,Lily,Koch,F,paid
3,54,Kaleb,Cook,M,free
4,101,Jayden,Fox,M,free


In [19]:
user_table = user_table.withColumnRenamed('userId','user_id')\
.withColumnRenamed('firstName','user_first_name')\
.withColumnRenamed('lastName','user_last_name')\
.withColumnRenamed('gender','user_gender')\
.withColumnRenamed('level','user_level')

In [20]:
user_table.limit(5).toPandas()

Unnamed: 0,user_id,user_first_name,user_last_name,user_gender,user_level
0,51,Maia,Burke,F,free
1,7,Adelyn,Jordan,F,free
2,15,Lily,Koch,F,paid
3,54,Kaleb,Cook,M,free
4,101,Jayden,Fox,M,free


### 4.1.2 Write user table to parquet files

In [106]:
# write users table to parquet files
user_table.write.parquet('./data/user_table', mode = 'overwrite')

## 4.2 time table

### 4.2.1 Create time table

In [21]:
log_data.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
3,,Logged In,Wyatt,M,0,Scott,,free,"Eureka-Arcata-Fortuna, CA",GET,Home,1540872000000.0,563,,200,1542247071796,Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7....,9
4,,Logged In,Austin,M,0,Rosales,,free,"New York-Newark-Jersey City, NY-NJ-PA",GET,Home,1541060000000.0,521,,200,1542252577796,Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20...,12


## Option 1

In [26]:
from pyspark.sql import functions as F

In [27]:
time_data = log_data.withColumn('start_time', 
                                F.from_unixtime(F.col('ts')/1000))

In [28]:
time_data.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,start_time
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:30:26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:41:21
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:45:41
3,,Logged In,Wyatt,M,0,Scott,,free,"Eureka-Arcata-Fortuna, CA",GET,Home,1540872000000.0,563,,200,1542247071796,Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7....,9,2018-11-15 01:57:51
4,,Logged In,Austin,M,0,Rosales,,free,"New York-Newark-Jersey City, NY-NJ-PA",GET,Home,1541060000000.0,521,,200,1542252577796,Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20...,12,2018-11-15 03:29:37


## Option 2

In [29]:
import datetime
from pyspark.sql.types import TimestampType

In [30]:
get_timestamp = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0), TimestampType())

In [31]:
timestamp = log_data.withColumn("start_time", get_timestamp('ts'))

time - timestamps of records in songplays broken down into specific units
start_time, hour, day, week, month, year, weekday

In [32]:
time_table = timestamp.select('ts', 'start_time')\
                        .withColumn('time_hour', F.hour('start_time'))\
                        .withColumn('time_week', F.weekofyear('start_time'))\
                        .withColumn('time_day', F.dayofmonth('start_time'))\
                        .withColumn('time_month', F.month('start_time'))\
                        .withColumn('time_year', F.year('start_time'))\
                        .withColumn('time_weekday', F.dayofweek('start_time')).dropDuplicates(['start_time'])

In [33]:
time_table.limit(5).toPandas()

Unnamed: 0,ts,start_time,time_hour,time_week,time_day,time_month,time_year,time_weekday
0,1542298336796,2018-11-15 16:12:16.796,16,46,15,11,2018,5
1,1542781092796,2018-11-21 06:18:12.796,6,47,21,11,2018,4
2,1542208815796,2018-11-14 15:20:15.796,15,46,14,11,2018,4
3,1541435519796,2018-11-05 16:31:59.796,16,45,5,11,2018,2
4,1542132026796,2018-11-13 18:00:26.796,18,46,13,11,2018,3


### 4.2.2 Write time table to parquet files

## Option 1 

In [210]:
# write time table to parquet files partitioned by year and month
time_table.write.partitionBy('time_year','time_month').parquet('./data/time_table', mode='overwrite')

## Option 2

In [211]:
time_table.write.parquet('./data/time_table',mode='overwrite', partitionBy = ['time_year','time_month'])

## 4.3 songplays table

### 4.3.1 Create songplay table

In [38]:
log_data.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
3,,Logged In,Wyatt,M,0,Scott,,free,"Eureka-Arcata-Fortuna, CA",GET,Home,1540872000000.0,563,,200,1542247071796,Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7....,9
4,,Logged In,Austin,M,0,Rosales,,free,"New York-Newark-Jersey City, NY-NJ-PA",GET,Home,1541060000000.0,521,,200,1542252577796,Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20...,12


In [12]:
song_data.limit(5).toPandas()

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARDR4AC1187FB371A1,,,,Montserrat Caballé;Placido Domingo;Vicente Sar...,511.16363,1,SOBAYLL12A8C138AF9,Sono andati? Fingevo di dormire,0
1,AREBBGV1187FB523D2,,"Houston, TX",,Mike Jones (Featuring CJ_ Mello & Lil' Bran),173.66159,1,SOOLYAZ12A6701F4A6,Laws Patrolling (Album Version),0
2,ARMAC4T1187FB3FA4C,40.82624,"Morris Plains, NJ",-74.47995,The Dillinger Escape Plan,207.77751,1,SOBBUGU12A8C13E95D,Setting Fire to Sleeping Giants,2004
3,ARPBNLO1187FB3D52F,40.71455,"New York, NY",-74.00712,Tiny Tim,43.36281,1,SOAOIBZ12AB01815BE,I Hold Your Hand In Mine [Live At Royal Albert...,2000
4,ARDNS031187B9924F0,32.67828,Georgia,-83.22295,Tim Wilson,186.48771,1,SONYPOM12A8C13B2D7,I Think My Wife Is Running Around On Me (Taco ...,2005


In [39]:
time_table.limit(2).toPandas()

Unnamed: 0,ts,start_time,time_hour,time_week,time_day,time_month,time_year,time_weekday
0,1542298336796,2018-11-15 16:12:16.796,16,46,15,11,2018,5
1,1542781092796,2018-11-21 06:18:12.796,6,47,21,11,2018,4


In [48]:
user_table.limit(5).toPandas()

Unnamed: 0,user_id,user_first_name,user_last_name,user_gender,user_level
0,51,Maia,Burke,F,free
1,7,Adelyn,Jordan,F,free
2,15,Lily,Koch,F,paid
3,54,Kaleb,Cook,M,free
4,101,Jayden,Fox,M,free


songplays - records in log data associated with song plays i.e. records with page NextSong
songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent

In [36]:
log_data.createOrReplaceTempView('log_data')

In [37]:
song_data.createOrReplaceTempView('song_data')

In [38]:
time_table.createOrReplaceTempView('time_table')

In [44]:
songplay_table = spark.sql('''
                    SELECT monotonically_increasing_id() as songplay_id, t.start_time, l.userId as user_id, l.level, 
                    s.song_id, s.artist_id, 
                    t.time_year, t.time_month,
                    l.sessionId as session_id, l.location, l.userAgent as user_agent
                    FROM log_data l
                    LEFT JOIN song_data s on l.song = s.title and l.artist = s.artist_name and l.length = s.duration
                    LEFT JOIN time_table t on l.ts = t.ts
                    WHERE l.page = "NextSong" ''')

In [45]:
songplay_table.limit(5).toPandas()

Unnamed: 0,songplay_id,start_time,user_id,level,song_id,artist_id,time_year,time_month,session_id,location,user_agent
0,0,2018-11-15 00:30:26.796,26,free,,,2018,11,583,"San Jose-Sunnyvale-Santa Clara, CA","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."
1,1,2018-11-15 00:41:21.796,26,free,,,2018,11,583,"San Jose-Sunnyvale-Santa Clara, CA","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."
2,2,2018-11-15 00:45:41.796,26,free,,,2018,11,583,"San Jose-Sunnyvale-Santa Clara, CA","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."
3,3,2018-11-15 03:44:09.796,61,free,,,2018,11,597,"Houston-The Woodlands-Sugar Land, TX","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4..."
4,4,2018-11-15 05:48:55.796,80,paid,,,2018,11,602,"Portland-South Portland, ME","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4..."


### 4.3.2 Write songplay_table to parquet file

In [72]:
# write songplays table to parquet files partitioned by year and month
songplay_table.write.partitionBy('time_year','time_month').parquet('./data/songplay_table', mode = 'overwrite')

In [73]:
songplay_table.write.parquet('./data/songplay_table', mode = 'overwrite', partitionBy = ['time_year','time_month'])

In [79]:
log_data.where(log_data.page == 'NextSong').limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
3,Sony Wonder,Logged In,Samuel,M,0,Gonzalez,218.06975,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540493000000.0,597,Blackbird,200,1542253449796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",61
4,Van Halen,Logged In,Tegan,F,2,Levine,289.38404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Best Of Both Worlds (Remastered Album Version),200,1542260935796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80
