In [154]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, lower, monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

from pyspark.sql.types import StructType as R, StructField as Fld, \
     DoubleType as Dbl, LongType as Long, StringType as Str, \
     IntegerType as Int, DecimalType as Dec, DateType as Date, \
     TimestampType as Stamp


config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']


In [155]:
spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
    .getOrCreate()
spark.conf.set("mapreduce.fileoutputcommitter.algorithm.version", "2")

In [156]:
song_data = "song-data/song_data/*/*/*/"
#song-data/song_data/A/A/A/TRAAAAW128F429D538.json

song_schema = R([
    Fld("num_songs", Int()),
    Fld("artist_id", Str()),
    Fld("artist_latitude", Dec()),
    Fld("artist_longitude", Dec()),
    Fld("artist_location", Str()),
    Fld("artist_name", Str()),
    Fld("song_id", Str()),
    Fld("title", Str()),
    Fld("duration", Dbl()),
    Fld("year", Long())
])
song_df = spark.read.json(song_data,schema=song_schema)

In [157]:
song_df.printSchema()

root
 |-- num_songs: integer (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: decimal(10,0) (nullable = true)
 |-- artist_longitude: decimal(10,0) (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- year: long (nullable = true)



In [158]:
song_df.show(5, truncate = False)

+---------+------------------+---------------+----------------+-----------------+----------------------------------------------------------------------------------------------+------------------+----------------------------------------------------+---------+----+
|num_songs|artist_id         |artist_latitude|artist_longitude|artist_location  |artist_name                                                                                   |song_id           |title                                               |duration |year|
+---------+------------------+---------------+----------------+-----------------+----------------------------------------------------------------------------------------------+------------------+----------------------------------------------------+---------+----+
|1        |ARDR4AC1187FB371A1|null           |null            |                 |Montserrat Caballé;Placido Domingo;Vicente Sardinero;Judith Blegen;Sherrill Milnes;Georg Solti|SOBAYLL12A8C138AF9|Sono andati? 

In [159]:
columns = ['title','artist_name']

for colName in columns:
    song_df = song_df.withColumn(colName, lower(col(colName)))

In [160]:
# extract columns to create songs table
songs_table = song_df.select("song_id",
                        "title",
                        "artist_id",
                        "year",
                        "duration").dropDuplicates(["song_id"])

In [161]:
song_df.select("song_id",
                        "title",
                        "artist_id",
                        "year",
                        "duration").show(5,False)

+------------------+----------------------------------------------------+------------------+----+---------+
|song_id           |title                                               |artist_id         |year|duration |
+------------------+----------------------------------------------------+------------------+----+---------+
|SOBAYLL12A8C138AF9|sono andati? fingevo di dormire                     |ARDR4AC1187FB371A1|0   |511.16363|
|SOOLYAZ12A6701F4A6|laws patrolling (album version)                     |AREBBGV1187FB523D2|0   |173.66159|
|SOBBUGU12A8C13E95D|setting fire to sleeping giants                     |ARMAC4T1187FB3FA4C|2004|207.77751|
|SOAOIBZ12AB01815BE|i hold your hand in mine [live at royal albert hall]|ARPBNLO1187FB3D52F|2000|43.36281 |
|SONYPOM12A8C13B2D7|i think my wife is running around on me (taco hell) |ARDNS031187B9924F0|2005|186.48771|
+------------------+----------------------------------------------------+------------------+----+---------+
only showing top 5 rows



In [162]:
songs_table.show(5, False)

+------------------+--------------------------+------------------+----+---------+
|song_id           |title                     |artist_id         |year|duration |
+------------------+--------------------------+------------------+----+---------+
|SOGOSOV12AF72A285E|¿dónde va chichi?         |ARGUVEV1187B98BA17|1997|313.12934|
|SOMZWCG12A8C13C480|i didn't mean to          |ARD7TVE1187B99BFB1|0   |218.93179|
|SOUPIRU12A6D4FA1E1|der kleine dompfaff       |ARJIE2Y1187B994AB7|0   |152.92036|
|SOXVLOJ12AB0189215|amor de cabaret           |ARKRRTF1187B9984DA|0   |177.47546|
|SOWTBJW12AC468AC6E|broken-down merry-go-round|ARQGYP71187FB44566|0   |151.84934|
+------------------+--------------------------+------------------+----+---------+
only showing top 5 rows



In [163]:
# write songs table to parquet files partitioned by year and artist
songs_table.write.parquet("output_data/" + "songs_table.parquet",
                          partitionBy = ["year", "artist_id"],
                          mode = "overwrite") 

In [164]:
# extract columns to create artists table
artists_table = song_df.select("artist_id",
                          "artist_name",
                          "artist_location",
                          "artist_latitude",
                          "artist_longitude").dropDuplicates(["artist_id"])

In [165]:
# write artists table to parquet files
artists_table.write.parquet("output_data/" + "artists_table.parquet", mode = "overwrite")

In [166]:
log_data = "log-data/"
#log-data/2018-11-01-events.json

log_schema = R([
    Fld("artist", Str()),
    Fld("auth", Str()),
    Fld("firstName", Str()),
    Fld("gender", Str()),
    Fld("itemInSession", Long()),
    Fld("lastName", Str()),
    Fld("length", Dbl()),
    Fld("level", Str()),
    Fld("location", Str()),
    Fld("method", Str()),
    Fld("page", Str()),
    Fld("registration", Dbl()),
    Fld("sessionId", Str()),
    Fld("song", Str()),
    Fld("status", Str()),
    Fld("ts", Long()),
    Fld("userAgent", Str()),
    Fld("userId", Str())
])

log_df = spark.read.json(log_data, schema=log_schema)

In [167]:
log_df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: string (nullable = true)
 |-- song: string (nullable = true)
 |-- status: string (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [168]:
# filter by actions for song plays
log_df = log_df.filter(log_df.page == "NextSong")

In [169]:
columns = ['song','artist']

for colName in columns:
    log_df = log_df.withColumn(colName, lower(col(colName)))

In [170]:
log_df.select('userId','song').show(5, False)

+------+----------------------------------------------+
|userId|song                                          |
+------+----------------------------------------------+
|26    |sehr kosmisch                                 |
|26    |the big gundown                               |
|26    |marry me                                      |
|61    |blackbird                                     |
|80    |best of both worlds (remastered album version)|
+------+----------------------------------------------+
only showing top 5 rows



In [171]:
log_df.select('userId','song').show(5, False)

+------+----------------------------------------------+
|userId|song                                          |
+------+----------------------------------------------+
|26    |sehr kosmisch                                 |
|26    |the big gundown                               |
|26    |marry me                                      |
|61    |blackbird                                     |
|80    |best of both worlds (remastered album version)|
+------+----------------------------------------------+
only showing top 5 rows



In [172]:
log_df = log_df.withColumn('timestamp',( (log_df.ts.cast('float')/1000).cast("timestamp")) )

In [173]:
log_df.select('ts','timestamp').show(5, False)

+-------------+-----------------------+
|ts           |timestamp              |
+-------------+-----------------------+
|1542241826796|2018-11-15 00:29:39.712|
|1542242481796|2018-11-15 00:40:35.072|
|1542242741796|2018-11-15 00:44:57.216|
|1542253449796|2018-11-15 03:44:05.12 |
|1542260935796|2018-11-15 05:48:36.224|
+-------------+-----------------------+
only showing top 5 rows



In [174]:
# extract columns for users table    
users_table = log_df.selectExpr("userId as user_id",
                            "firstName as first_name",
                            "lastName as last_name",
                            "gender",
                            "level").dropDuplicates(["user_id"]) 
users_table.show(5, truncate = False)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|51     |Maia      |Burke    |F     |free |
|7      |Adelyn    |Jordan   |F     |free |
|15     |Lily      |Koch     |F     |paid |
|54     |Kaleb     |Cook     |M     |free |
|101    |Jayden    |Fox      |M     |free |
+-------+----------+---------+------+-----+
only showing top 5 rows



In [175]:
# write users table to parquet files
users_table.write.parquet("output_data/" + "users_table.parquet",mode = "overwrite")

In [176]:
# extract columns to create time table
time_table = log_df.selectExpr("timestamp as start_time",
                           "hour(timestamp) as hour",
                           "dayofmonth(timestamp) as day",
                           "weekofyear(timestamp) as week",
                           "month(timestamp) as month",
                           "year(timestamp) as year",
                           "dayofweek(timestamp) as weekday"
                           ).dropDuplicates(["start_time"])

time_table.show(5, False)

+-----------------------+----+---+----+-----+----+-------+
|start_time             |hour|day|week|month|year|weekday|
+-----------------------+----+---+----+-----+----+-------+
|2018-11-28 06:22:49.344|6   |28 |48  |11   |2018|4      |
|2018-11-28 12:14:31.936|12  |28 |48  |11   |2018|4      |
|2018-11-28 17:07:15.584|17  |28 |48  |11   |2018|4      |
|2018-11-28 20:41:20.64 |20  |28 |48  |11   |2018|4      |
|2018-11-05 18:13:05.152|18  |5  |45  |11   |2018|2      |
+-----------------------+----+---+----+-----+----+-------+
only showing top 5 rows



In [177]:
# write time table to parquet files partitioned by year and month
time_table.write.parquet("output_data/"  + "time_table.parquet",
                         partitionBy = ["year", "month", "week"],
                         mode = "overwrite")

In [178]:
song_log_joined_df = log_df.join(song_df, (log_df.song == song_df.title) & (log_df.artist == song_df.artist_name) & (log_df.length == song_df.duration), how='inner')

In [179]:
log_df.count()

6820

In [180]:
song_df.count()

71

In [181]:
song_log_joined_df.count()

1

In [185]:
from pyspark.sql.functions import monotonically_increasing_id
songplays_table = song_log_joined_df.distinct() \
                    .selectExpr("userId as user_id", "timestamp", "song_id", "artist_id", "level", "sessionId", "location", "userAgent", \
                                "year(timestamp) as year","month(timestamp) as month","weekofyear(timestamp) as week") \
                    .withColumn("songplay_id", monotonically_increasing_id()) \
                    .withColumnRenamed("userId","user_id")        \
                    .withColumnRenamed("timestamp","start_time")  \
                    .withColumnRenamed("sessionId","session_id")  \
                    .withColumnRenamed("userAgent", "user_agent")         

songplays_table.show(5)

+-------+--------------------+------------------+------------------+-----+----------+--------------------+--------------------+----+-----+----+-------------+
|user_id|          start_time|           song_id|         artist_id|level|session_id|            location|          user_agent|year|month|week|  songplay_id|
+-------+--------------------+------------------+------------------+-----+----------+--------------------+--------------------+----+-----+----+-------------+
|     15|2018-11-21 21:56:...|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4| paid|       818|Chicago-Napervill...|"Mozilla/5.0 (X11...|2018|   11|  47|1649267441664|
+-------+--------------------+------------------+------------------+-----+----------+--------------------+--------------------+----+-----+----+-------------+



In [186]:
songplays_table.write.parquet("output_data/" + "songplays_table.parquet",
                              partitionBy=["year", "month","week"],
                              mode="overwrite")