In [1]:
from pyspark.sql import SparkSession
import os

In [2]:
spark = SparkSession \
    .builder \
    .appName("AWS Spark Processes Ran Locally") \
    .getOrCreate()

In [3]:
spark.sparkContext.getConf().getAll()

[('spark.driver.host', 'b826327bf2aa'),
 ('spark.app.id', 'local-1628169470559'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.executor.id', 'driver'),
 ('spark.app.name', 'AWS Spark Processes Ran Locally'),
 ('spark.submit.deployMode', 'client'),
 ('spark.driver.port', '36933'),
 ('spark.ui.showConsoleProgress', 'true')]

In [4]:
log_data_path = "data/log-data/"

In [150]:
#this format is only used if the json file is not in json lines format. i.e. json lines - json file contains 1 json object per line of document
#spark.read.option("multiline", "true") - 
df = spark.read.json(log_data_path)
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [6]:
df.select("*").count()

8056

In [7]:
df.select("song").show()

+--------------------+
|                song|
+--------------------+
|       Sehr kosmisch|
|     The Big Gundown|
|            Marry Me|
|                null|
|                null|
|           Blackbird|
|                null|
|                null|
|                null|
|Best Of Both Worl...|
|Call Me If You Ne...|
|                Home|
|                 OMG|
|                null|
| Candle On The Water|
|            Our Song|
|Baby Boy [feat. B...|
|      Black Hole Sun|
|               Human|
|            Addicted|
+--------------------+
only showing top 20 rows



In [151]:
# filter by actions for song plays
df = df.filter(df.page == "NextSong")
df.select("*").count()

6820

In [157]:
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek
get_timestamp = udf(lambda ts : datetime.fromtimestamp(ts/1000.0) if type(ts) is int or type(ts) is float else None, TimestampType())
df = df.withColumn('start_time', get_timestamp(df.ts))
df = df.withColumn('hour', hour(df.start_time))
df = df.withColumn('day', dayofmonth(df.start_time))
df = df.withColumn('week', weekofyear(df.start_time))
df = df.withColumn('month', month(df.start_time))
df = df.withColumn('year', year(df.start_time))
df = df.withColumn('weekday', dayofweek(df.start_time))
df = df.withColumn('weekend', (df.weekday == 7) | (df.weekday == 1))

# extract columns to create time table
#columns : start_time, hour, day, week, month, year, weekend
fields = ["start_time", "hour", "day", "week", "month", "year", "weekend"]
exprs = ["{} as {}".format(field,field) for field in fields]
time_table = df.selectExpr(*exprs)
time_table.show()

+--------------------+----+---+----+-----+----+-------+
|          start_time|hour|day|week|month|year|weekend|
+--------------------+----+---+----+-----+----+-------+
|2018-11-15 00:30:...|   0| 15|  46|   11|2018|  false|
|2018-11-15 00:41:...|   0| 15|  46|   11|2018|  false|
|2018-11-15 00:45:...|   0| 15|  46|   11|2018|  false|
|2018-11-15 03:44:...|   3| 15|  46|   11|2018|  false|
|2018-11-15 05:48:...|   5| 15|  46|   11|2018|  false|
|2018-11-15 05:53:...|   5| 15|  46|   11|2018|  false|
|2018-11-15 05:55:...|   5| 15|  46|   11|2018|  false|
|2018-11-15 06:01:...|   6| 15|  46|   11|2018|  false|
|2018-11-15 06:07:...|   6| 15|  46|   11|2018|  false|
|2018-11-15 06:10:...|   6| 15|  46|   11|2018|  false|
|2018-11-15 06:13:...|   6| 15|  46|   11|2018|  false|
|2018-11-15 06:14:...|   6| 15|  46|   11|2018|  false|
|2018-11-15 06:17:...|   6| 15|  46|   11|2018|  false|
|2018-11-15 06:18:...|   6| 15|  46|   11|2018|  false|
|2018-11-15 06:21:...|   6| 15|  46|   11|2018| 

In [140]:
df.show()

+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+--------------------+----+---+----+-----+----+-------+-------+
|              artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|          start_time|hour|day|week|month|year|weekday|weekend|
+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+--------------------+----+---+----+-----+----+-------+-------+
|            Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12

In [9]:
from pyspark.sql.types import IntegerType
# extract columns for users table    
#user_id, first_name, last_name, gender, level
users_table = df.select(df.userId.cast("integer").alias("user_id"), df.firstName.alias("first_name"),df.lastName.alias("last_name"),df.gender,df.level)

users_table = users_table.dropDuplicates(['user_id'])
users_table.select("*").count()

96

In [10]:
users_table.sort(users_table.user_id).show()

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|      2|   Jizelle| Benjamin|     F| free|
|      3|     Isaac|   Valdez|     M| free|
|      4|    Alivia|  Terrell|     F| free|
|      5|    Elijah|    Davis|     M| free|
|      6|   Cecilia|    Owens|     F| free|
|      7|    Adelyn|   Jordan|     F| free|
|      8|    Kaylee|  Summers|     F| free|
|      9|     Wyatt|    Scott|     M| free|
|     10|    Sylvie|     Cruz|     F| free|
|     11| Christian|   Porter|     F| free|
|     12|    Austin|  Rosales|     M| free|
|     13|       Ava| Robinson|     F| free|
|     14|  Theodore|   Harris|     M| free|
|     15|      Lily|     Koch|     F| paid|
|     16|     Rylan|   George|     M| paid|
|     17|  Makinley|    Jones|     F| free|
|     18|     Jacob|   Rogers|     M| free|
|     19|   Zachary|   Thomas|     M| free|
|     20|     Aiden|  Ramirez|     M| paid|
|     22|      Sean|   Wilson|  

In [11]:
output_data = "data/"
# write users table to parquet files
users_table.write.parquet(os.path.join(output_data, "parquet_data/users"), "overwrite")

In [44]:
# create timestamp column from original timestamp column
# there is no need to do this function because you can just write a function to convert the ts to datetime

In [81]:
from pyspark.sql.functions import udf, expr, monotonically_increasing_id
from pyspark.sql.types import TimestampType, StructType, StructField, BooleanType
from pyspark.sql import DataFrameReader as dfReader
from datetime import datetime

@udf(TimestampType())
def datetimeFromTimestamp(ts):
    if type(ts) is int or type(ts) is float:
        return datetime.fromtimestamp(ts/1000.0)
    else:
        return None

@udf(StructType([StructField("start_time", TimestampType()),
                StructField("hour", IntegerType()),
                StructField("day", IntegerType()),
                StructField("week", IntegerType()),
                StructField("month", IntegerType()),
                StructField("year", IntegerType()),
                StructField("weekend", BooleanType())]))
def parseDateTime(ts):
    if type(ts) is int or type(ts) is float:
        dt = datetime.fromtimestamp(ts/1000.0)
        return{
        "start_time" : dt,
        "hour" : dt.hour,
        "day" : dt.day,
        "week" : dt.isocalendar()[1],
        "month" : dt.month,
        "year" : dt.year,
        "weekend" : True if dt.weekday() >= 5 and dt.weekday() <= 6 else False
        }
    else:
        dt = None
        return{
        "start_time" : dt,
        "hour" : dt,
        "day" : dt,
        "week" : dt,
        "month" : dt,
        "year" : dt,
        "weekend" : dt
        }

dtDf = df.select(parseDateTime(df.ts).alias("DateTimeStruct"))

dtDf.printSchema()

root
 |-- DateTimeStruct: struct (nullable = true)
 |    |-- start_time: timestamp (nullable = true)
 |    |-- hour: integer (nullable = true)
 |    |-- day: integer (nullable = true)
 |    |-- week: integer (nullable = true)
 |    |-- month: integer (nullable = true)
 |    |-- year: integer (nullable = true)
 |    |-- weekend: boolean (nullable = true)



In [66]:
# extract columns to create time table
#columns : start_time, hour, day, week, month, year, weekend
fields = ["start_time", "hour", "day", "week", "month", "year", "weekend"]
exprs = ["DateTimeStruct['{}'] as {}".format(field,field) for field in fields]
time_table = dtDf.selectExpr(*exprs)
time_table.show()

+--------------------+----+---+----+-----+----+-------+
|          start_time|hour|day|week|month|year|weekend|
+--------------------+----+---+----+-----+----+-------+
|2018-11-15 00:30:...|   0| 15|  46|   11|2018|  false|
|2018-11-15 00:41:...|   0| 15|  46|   11|2018|  false|
|2018-11-15 00:45:...|   0| 15|  46|   11|2018|  false|
|2018-11-15 03:44:...|   3| 15|  46|   11|2018|  false|
|2018-11-15 05:48:...|   5| 15|  46|   11|2018|  false|
|2018-11-15 05:53:...|   5| 15|  46|   11|2018|  false|
|2018-11-15 05:55:...|   5| 15|  46|   11|2018|  false|
|2018-11-15 06:01:...|   6| 15|  46|   11|2018|  false|
|2018-11-15 06:07:...|   6| 15|  46|   11|2018|  false|
|2018-11-15 06:10:...|   6| 15|  46|   11|2018|  false|
|2018-11-15 06:13:...|   6| 15|  46|   11|2018|  false|
|2018-11-15 06:14:...|   6| 15|  46|   11|2018|  false|
|2018-11-15 06:17:...|   6| 15|  46|   11|2018|  false|
|2018-11-15 06:18:...|   6| 15|  46|   11|2018|  false|
|2018-11-15 06:21:...|   6| 15|  46|   11|2018| 

In [52]:
time_partition_list = ['year', 'month']
# write time table to parquet files partitioned by year and month
time_table.write.parquet(os.path.join(output_data, "parquet_data/time"), "overwrite", time_partition_list)

In [147]:
# read in song data to use for songplays table
song_df = spark.read.parquet(os.path.join(output_data, "parquet_data/songs"))
#song_df = song_df.dropDuplicates(["title"])
song_df = song_df.drop('year')
song_df.show()

+------------------+--------------------+---------+------------------+
|           song_id|               title| duration|         artist_id|
+------------------+--------------------+---------+------------------+
|SOAOIBZ12AB01815BE|I Hold Your Hand ...| 43.36281|ARPBNLO1187FB3D52F|
|SONYPOM12A8C13B2D7|I Think My Wife I...|186.48771|ARDNS031187B9924F0|
|SODREIN12A58A7F2E5|A Whiter Shade Of...|326.00771|ARLTWXK1187FB5A3F8|
|SOYMRWW12A6D4FAB14|The Moon And I (O...| 267.7024|ARKFYS91187B98E58F|
|SOWQTQZ12A58A7B63E|Streets On Fire (...|279.97995|ARPFHN61187FB575F6|
|SOUDSGM12AC9618304|Insatiable (Instr...|266.39628|ARNTLGG11E2835DDB9|
|SOPEGZN12AB0181B3D|Get Your Head Stu...| 45.66159|AREDL271187FB40F44|
|SOOLYAZ12A6701F4A6|Laws Patrolling (...|173.66159|AREBBGV1187FB523D2|
|SOBAYLL12A8C138AF9|Sono andati? Fing...|511.16363|ARDR4AC1187FB371A1|
|SOBBUGU12A8C13E95D|Setting Fire to S...|207.77751|ARMAC4T1187FB3FA4C|
|SOFFKZS12AB017F194|A Higher Place (A...|236.17261|ARBEBBY1187B9B43DB|
|SOBLG

In [156]:
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [158]:
# extract columns from joined song and log datasets to create songplays table
# songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
join_cond = [df.song == songDf.title, df.artist == songDf.artist_name, df.length == songDf.duration]

songDf = songDf.drop('year')

songsLogsJoinTable = df.join(songDf, join_cond, 'full')
songplaysUnclean = songsLogsJoinTable.withColumn("songplay_id", monotonically_increasing_id())
#songplaysUnclean = songplaysUnclean.withColumn('dtStruct', parseDateTime(songplaysUnclean.ts))
songplays_table = songplaysUnclean.select(songplaysUnclean.songplay_id, songplaysUnclean.start_time,\
                                          songplaysUnclean.userId.alias("user_id"), songplaysUnclean.level,\
                                          songplaysUnclean.song_id, songplaysUnclean.artist_id,\
                                          songplaysUnclean.itemInSession.alias("session_id"), songplaysUnclean.location,\
                                          songplaysUnclean.userAgent.alias("user_agent"), songplaysUnclean.year,\
                                          songplaysUnclean.month)
songplays_table.show()

+-----------+--------------------+-------+-----+-------+---------+----------+--------------------+--------------------+----+-----+
|songplay_id|          start_time|user_id|level|song_id|artist_id|session_id|            location|          user_agent|year|month|
+-----------+--------------------+-------+-----+-------+---------+----------+--------------------+--------------------+----+-----+
|          0|2018-11-20 14:21:...|     44| paid|   null|     null|        19|Waterloo-Cedar Fa...|Mozilla/5.0 (Maci...|2018|   11|
|          1|2018-11-21 23:36:...|     15| paid|   null|     null|        32|Chicago-Napervill...|"Mozilla/5.0 (X11...|2018|   11|
|          2|2018-11-21 12:29:...|     15| paid|   null|     null|        51|Chicago-Napervill...|"Mozilla/5.0 (X11...|2018|   11|
|          3|2018-11-05 13:42:...|     44| paid|   null|     null|        57|Waterloo-Cedar Fa...|Mozilla/5.0 (Maci...|2018|   11|
|          4|2018-11-04 07:00:...|     25| paid|   null|     null|        28|    Ma

In [76]:
#songplaysUnclean.show()

In [91]:
songplays_partition_list = ['year', 'month']
# write songplays table to parquet files partitioned by year and month
songplays_table.write.parquet(os.path.join(output_data, "parquet_data/songplays"), "overwrite", songplays_partition_list)

In [153]:
#you have to use a relative path here to dive down to where the json files live
song_data_path = "data/song_data/*/*/*/*.json"
#song data json files are contained within a tree of folders - we need to determine how to get data from all endpoints of the folders
#Recursive File Lookup
songDf = spark.read.format("json")\
    .option("recursiveFileLookup", "true")\
    .load(song_data_path)
songDf = songDf.dropDuplicates()

In [154]:
songDf.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [41]:
#songDf.select("artist_id", "artist_name").show()

In [39]:
songDf.select("*").count()

71

In [28]:
# extract columns to create songs table
# Columns : song_id, title, artist_id, year, duration
songs_table = df.select("song_id", "title","artist_id", "year", "duration")

# write songs table to parquet files partitioned by year and artist
songs_table.select("*").count()

71

In [89]:

# extract columns to create artists table
# Columns : artist_id, name, location, lattitude, longitude
artists_table = df.select("artist_id", df["artist_name"].alias("name"), df["artist_location"].alias("location"), df["artist_latitude"].alias("latitude"), df["artist_longitude"].alias("longitude"))

# write artists table to parquet files
artists_table.write.parquet(os.path.join(output_data, 'parquet_data/artists'), "overwrite")

In [128]:

partitionList = list(['year', 'artist_id'])
# write songs table to parquet files partitioned by year and artist

#One way to do it
#songs_table.write.partitionBy("year", "artist_id").mode("overwrite").save(os.path.join(output_data, 'parquet_song_data/songs'))

#The way the doco describes saving the parquet files
songs_table.write.parquet(os.path.join(output_data, 'parquet_data/songs'), "overwrite", partitionList)