In [1]:
#import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, to_timestamp, to_date
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import StringType, IntegerType, DoubleType, TimestampType
                              

# config = configparser.ConfigParser()
# config.read('dl.cfg')

# os.environ['fs.s3a.access.key']=config['AWS']['AWS_ACCESS_KEY_ID']
# os.environ['fs.s3a.secret.key']=config['AWS']['AWS_SECRET_ACCESS_KEY']

In [2]:
spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
    .getOrCreate()

In [3]:
from zipfile import ZipFile
# 1st we get the log_data 
with ZipFile('data/log-data.zip', 'r') as zip_ref:
    zip_ref.extractall('data/log-log_data')
log_data = spark.read.json('data/log-log_data')    
log_data.toPandas().head(2)

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26


In [4]:
log_data.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [5]:
# now we get the song_data
with ZipFile('data/song-data.zip','r') as z_file:
    z_file.extractall('data/song-data/')
song_data = spark.read.json('data/song-data/song_data/A/A/A/*.json' )
song_data.limit(2).toPandas()

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARKFYS91187B98E58F,,,,Jeff And Sheri Easter,267.7024,1,SOYMRWW12A6D4FAB14,The Moon And I (Ordinary Day Album Version),0
1,AR10USD1187B99F3F1,,"Burlington, Ontario, Canada",,Tweeterfriendly Music,189.57016,1,SOHKNRJ12A6701D1F8,Drop of Rain,0


In [6]:
song_data.count()

11

In [7]:
# extract columns to create songs table
songs_table = song_data.select('song_id', 'title', 'artist_id', 'year', 'duration')
songs_table.limit(2).toPandas()

Unnamed: 0,song_id,title,artist_id,year,duration
0,SOYMRWW12A6D4FAB14,The Moon And I (Ordinary Day Album Version),ARKFYS91187B98E58F,0,267.7024
1,SOHKNRJ12A6701D1F8,Drop of Rain,AR10USD1187B99F3F1,0,189.57016


In [8]:
# write songs table to parquet files partitioned by year and artist
songs_table.write.partitionBy('year', 'artist_id').mode('overwrite').parquet('data_parquet/song_table.parquet')

In [9]:
# extract columns to create artists table
artists_table = song_data.select('artist_id',
                                 'artist_name',
                                 'artist_location',
                                 'artist_latitude',
                                 'artist_longitude')
artists_table.limit(2).toPandas()


Unnamed: 0,artist_id,artist_name,artist_location,artist_latitude,artist_longitude
0,ARKFYS91187B98E58F,Jeff And Sheri Easter,,,
1,AR10USD1187B99F3F1,Tweeterfriendly Music,"Burlington, Ontario, Canada",,


In [10]:
# now we filter only page = NextSong
log_data = log_data.filter('page = "NextSong"')
log_data.toPandas().head(2)

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26


In [11]:
# extract columns for users table
log_data.createOrReplaceTempView('log_table')
users_table = spark.sql("""
    SELECT userId as user_id,
           firstName as first_name,
           lastName as last_name,
           gender,
           level
    FROM log_table

""")
users_table.limit(5).toPandas()

Unnamed: 0,user_id,first_name,last_name,gender,level
0,26,Ryan,Smith,M,free
1,26,Ryan,Smith,M,free
2,26,Ryan,Smith,M,free
3,61,Samuel,Gonzalez,M,free
4,80,Tegan,Levine,F,paid


In [12]:
# write users_table to parquet files
users_table.write.mode('overwrite').parquet('data_parquet/users_table.parquet')

In [13]:
# now we focus on the time table
log_data.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [14]:
# change ts column to timestamp
log_data = log_data.withColumn('ts1', to_timestamp(log_data.ts/1000))
log_data.limit(2).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,ts1
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:30:26.796
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:41:21.796


In [15]:
log_data = log_data.withColumn('ts_date', to_date(to_timestamp(log_data.ts/1000)))
log_data.limit(2).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,ts1,ts_date
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:30:26.796,2018-11-15
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:41:21.796,2018-11-15


In [16]:
# now let's create the time table 
log_data.createOrReplaceTempView('log_table')
time_table = spark.sql("""
                SELECT ts1 AS start_time,
                       EXTRACT(hour from ts_date) AS hour,
                       EXTRACT(day from ts_date) AS day,
                       EXTRACT(week from ts_date) AS week,
                       EXTRACT(month from ts_date) AS month,
                       EXTRACT(year from ts_date) AS year,
                       WEEKDAY(ts_date) AS weekday     
                FROM log_table
""")
time_table.limit(2).toPandas()

Unnamed: 0,start_time,hour,day,week,month,year,weekday
0,2018-11-15 00:30:26.796,0,15,46,11,2018,3
1,2018-11-15 00:41:21.796,0,15,46,11,2018,3


In [17]:
# write time table to parquet files partitioned by year and month
time_table.write.partitionBy('year','month').mode('overwrite').parquet('data_parquet/time_table.parquet')

In [18]:
# here we join song_table and log_table to get songplays table
song_data.createOrReplaceTempView('song_table')
log_data.createOrReplaceTempView('log_table')
songplays_table = spark.sql('''
                    SELECT  log.ts1 AS start_time,
                            log.userId AS user_id,
                            log.level,
                            s.song_id,
                            s.artist_id,
                            log.sessionId AS session_id,
                            log.location,
                            log.userAgent AS user_agent,
                            month(log.ts1) AS month,
                            year(log.ts1) AS year        
                    FROM song_table s
                    JOIN log_table log
                    ON log.artist=s.artist_name
                      AND log.song=s.title
''')
songplays_table.toPandas()


Unnamed: 0,start_time,user_id,level,song_id,artist_id,session_id,location,user_agent,month,year


In [19]:
# write songplays table to parquet files partitioned by year and month
songplays_table.write.partitionBy('year','month').\
                mode('overwrite').\
                parquet('data_parquet/songplays_table.parquet')