In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

In [2]:
config = configparser.ConfigParser()
config.read("dl.cfg")

os.environ["AWS_ACCESS_KEY_ID"]=config["SECRET"]["AWS_ACCESS_KEY_ID"]
os.environ["AWS_SECRET_ACCESS_KEY"]=config["SECRET"]["AWS_SECRET_ACCESS_KEY"]

In [3]:
spark = SparkSession \
            .builder \
            .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
            .getOrCreate()

In [4]:
input_data = config["STORAGE"]["INPUT_DATA"]
output_data = config["STORAGE"]["OUTPUT_DATA"]

In [5]:
# get filepath to song data file
song_data = os.path.join(input_data, "song_data/A/A/A/*.json")

# read song data file
song_df = spark.read.json(song_data)

In [6]:
song_df.limit(5).toPandas()

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARTC1LV1187B9A4858,51.4536,"Goldsmith's College, Lewisham, Lo",-0.01802,The Bonzo Dog Band,301.40036,1,SOAFBCP12A8C13CC7D,King Of Scurf (2007 Digital Remaster),1972
1,ARA23XO1187B9AF18F,40.57885,"Carteret, New Jersey",-74.21956,The Smithereens,192.522,1,SOKTJDS12AF72A25E5,Drown In My Own Tears (24-Bit Digitally Remast...,0
2,ARSVTNL1187B992A91,51.50632,"London, England",-0.12714,Jonathan King,129.85424,1,SOEKAZG12AB018837E,I'll Slap Your Face (Entertainment USA Theme),2001
3,AR73AIO1187B9AD57B,37.77916,"San Francisco, CA",-122.42005,Western Addiction,118.07302,1,SOQPWCR12A6D4FB2A3,A Poor Recipe For Civic Cohesion,2005
4,ARXQBR11187B98A2CC,,"Liverpool, England",,Frankie Goes To Hollywood,821.05424,1,SOBRKGM12A8C139EF6,Welcome to the Pleasuredome,1985


In [7]:
# extract columns to create songs table
songs_table = song_df.select("song_id", "title", "artist_id", "year", "duration").dropDuplicates(["song_id"])
songs_table.limit(5).toPandas()

Unnamed: 0,song_id,title,artist_id,year,duration
0,SOAPERH12A58A787DC,The One And Only (Edited),ARZ5H0P1187B98A1DD,0,230.42567
1,SOSMJFC12A8C13DE0C,Is That All There Is?,AR1KTV21187B9ACD72,0,343.87546
2,SOAFBCP12A8C13CC7D,King Of Scurf (2007 Digital Remaster),ARTC1LV1187B9A4858,1972,301.40036
3,SOEKAZG12AB018837E,I'll Slap Your Face (Entertainment USA Theme),ARSVTNL1187B992A91,2001,129.85424
4,SODZYPO12A8C13A91E,Burn My Body (Album Version),AR1C2IX1187B99BF74,0,177.99791


In [8]:
# write songs table to parquet files partiti oned by year and artist
songs_table.write.mode("overwrite").partitionBy("year", "artist_id").parquet(os.path.join(output_data, "songs"))

In [10]:
# extract columns to create artists table
artists_table = song_df.selectExpr("artist_id", "artist_name as name", 
                              "artist_location as location", 
                              "artist_latitude as lattitude", 
                              "artist_longitude as longitude").dropDuplicates(["artist_id"])
artists_table.limit(5).toPandas()

Unnamed: 0,artist_id,name,location,lattitude,longitude
0,AR9Q9YC1187FB5609B,Quest_ Pup_ Kevo,New Jersey,,
1,ARA23XO1187B9AF18F,The Smithereens,"Carteret, New Jersey",40.57885,-74.21956
2,ARZKCQM1257509D107,Dataphiles,,,
3,ARC1IHZ1187FB4E920,Jamie Cullum,,,
4,ARGE7G11187FB37E05,Cyndi Lauper,"Brooklyn, NY",,


In [11]:
# write artists table to parquet files
artists_table.write.mode("overwrite").parquet(os.path.join(output_data, "artists"))

In [12]:
# get filepath to log data file
log_data = os.path.join(input_data, "log_data/2018/11/*.json")

# read log data file
log_df = spark.read.json(log_data)

In [13]:
log_df.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
3,,Logged In,Wyatt,M,0,Scott,,free,"Eureka-Arcata-Fortuna, CA",GET,Home,1540872000000.0,563,,200,1542247071796,Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7....,9
4,,Logged In,Austin,M,0,Rosales,,free,"New York-Newark-Jersey City, NY-NJ-PA",GET,Home,1541060000000.0,521,,200,1542252577796,Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20...,12


In [14]:
# extract columns for users table    
users_table = log_df.selectExpr("userId as user_id", "firstName as first_name", "lastName as last_name", "gender", "level").where("page = 'NextSong' and userId IS NOT NULL")
users_table.limit(5).toPandas()

Unnamed: 0,user_id,first_name,last_name,gender,level
0,26,Ryan,Smith,M,free
1,26,Ryan,Smith,M,free
2,26,Ryan,Smith,M,free
3,61,Samuel,Gonzalez,M,free
4,80,Tegan,Levine,F,paid


In [15]:
# write users table to parquet files
users_table.write.mode("overwrite").parquet(os.path.join(output_data, "users"))

In [16]:
# create timestamp column from original timestamp column
get_timestamp = udf(lambda x: str(int(int(x) / 1000)))
time_df = log_df.withColumn("timestamp", get_timestamp(log_df["ts"]))

# create input_data column from original timestamp column
get_datetime = udf(lambda x: str(datetime.fromtimestamp(int(x) / 1000.0)))
time_df = log_df.withColumn("datetime", get_datetime(time_df["ts"]))

# extract columns to create time table
time_table = time_df.selectExpr("datetime as start_time", 
                           "hour(datetime) as hour",
                            "day(datetime) as day",
                            "weekofyear(datetime) as week",
                            "month(datetime) as month",
                            "year(datetime) as year",
                            "dayofweek(datetime) as weekday").dropDuplicates(["start_time"])
time_table.limit(5).toPandas()

Unnamed: 0,start_time,hour,day,week,month,year,weekday
0,2018-11-15 11:22:06.796000,11,15,46,11,2018,5
1,2018-11-15 18:09:32.796000,18,15,46,11,2018,5
2,2018-11-15 18:59:14.796000,18,15,46,11,2018,5
3,2018-11-15 19:01:55.796000,19,15,46,11,2018,5
4,2018-11-21 03:57:19.796000,3,21,47,11,2018,4


In [None]:
# write time table to parquet files partitioned by year and month
time_table.write.mode("overwrite").partitionBy("year", "month").parquet(os.path.join(output_data, "time"))

In [None]:
# extract columns from joined song and log datasets to create songplays table 
songplays_table = song_df.join(log_df, log_df.artist ==  song_df.artist_name, how = "inner") \
                    .selectExpr("monotonically_increasing_id() as songplay_id",
                                "to_timestamp(ts/1000) as start_time",
                                "month(to_timestamp(ts/1000)) as month",
                                "year(to_timestamp(ts/1000)) as year",
                                "userId as user_id",
                                "level as level",
                                "song_id as song_id",
                                "artist_id as artist_id",
                                "sessionId as session_id",
                                "location as location",
                                "userAgent as user_agent")
songplays_table.limit(5).toPandas()

In [None]:
# write songplays table to parquet files partitioned by year and month
songplays_table.write.mode("overwrite").partitionBy("year", "month").parquet(os.path.join(output_data, "songplays"))