In [1]:
import configparser
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, monotonically_increasing_id
from pyspark.sql.functions import hour, dayofmonth, weekofyear, month, year, date_format
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, DateType, TimestampType
from datetime import datetime as dt

#### Set Up Spark Session

In [4]:
# Read credentials and data locations from the local 'dl.cfg' file.
cfg = configparser.ConfigParser()
cfg.read('dl.cfg')

# Copy the AWS key pair from the [AWS] section into the process environment.
for env_var, option in (('AWS_ACCESS_KEY_ID', 'AWS_KEY'),
                        ('AWS_SECRET_ACCESS_KEY', 'AWS_SECRET')):
    os.environ[env_var] = cfg.get('AWS', option)

# Base paths the rest of the notebook reads from and writes to.
input_data = cfg.get('DATA', 'DATA_INPUT')
output_data = cfg.get('DATA', 'DATA_OUTPUT')

In [3]:
%%time
# Create (or reuse) the Spark session, requesting the hadoop-aws package
# through 'spark.jars.packages'.
spark = (
    SparkSession.builder
    .config('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:2.7.5')
    .getOrCreate()
)

CPU times: user 70.4 ms, sys: 18.5 ms, total: 88.9 ms
Wall time: 13.6 s


#### Process Song Data
The files are partitioned by the first three letters of each song's track ID.

In [5]:
# Glob matching the song JSON files, three directory levels below song_data/
song_data = input_data + 'song_data/*/*/*/*.json'

# Explicit schema for the song files, listed as (column name, Spark type) pairs
song_fields = [
    ('artist_id', StringType()),
    ('artist_name', StringType()),
    ('artist_location', StringType()),
    ('artist_latitude', DoubleType()),
    ('artist_longitude', DoubleType()),
    ('duration', DoubleType()),
    ('num_songs', IntegerType()),
    ('title', StringType()),
    ('year', IntegerType()),
]
song_schema = StructType([StructField(name, dtype) for name, dtype in song_fields])

In [6]:
%%time
# Read the song files with the explicit schema and mark the frame as cached;
# the songs and artists tables are both derived from it.
df = spark.read.schema(song_schema).json(song_data).cache()
print('Songs records: {}'.format(df.count()))
df.show(5)

Songs records: 71
+------------------+--------------------+-----------------+---------------+----------------+---------+---------+--------------------+----+
|         artist_id|         artist_name|  artist_location|artist_latitude|artist_longitude| duration|num_songs|               title|year|
+------------------+--------------------+-----------------+---------------+----------------+---------+---------+--------------------+----+
|ARDR4AC1187FB371A1|Montserrat Caball...|                 |           null|            null|511.16363|        1|Sono andati? Fing...|   0|
|AREBBGV1187FB523D2|Mike Jones (Featu...|      Houston, TX|           null|            null|173.66159|        1|Laws Patrolling (...|   0|
|ARMAC4T1187FB3FA4C|The Dillinger Esc...|Morris Plains, NJ|       40.82624|       -74.47995|207.77751|        1|Setting Fire to S...|2004|
|ARPBNLO1187FB3D52F|            Tiny Tim|     New York, NY|       40.71455|       -74.00712| 43.36281|        1|I Hold Your Hand ...|2000|
|ARDNS031

Songs Table, with files partitioned by year and then artist.

In [7]:
%%time
# Song dimension: distinct (title, artist_id, year, duration) rows plus a
# generated song_id. The generated ids are unique and increasing but not
# consecutive.
songs_cols = ['title', 'artist_id', 'year', 'duration']
unique_songs = df.select(*songs_cols).distinct()
songs_table = unique_songs.withColumn('song_id', monotonically_increasing_id())
print('Songs count: {}'.format(songs_table.count()))
songs_table.show(5)

Songs count: 71
+--------------------+------------------+----+---------+-----------+
|               title|         artist_id|year| duration|    song_id|
+--------------------+------------------+----+---------+-----------+
|               Intro|AR558FS1187FB45658|2003| 75.67628|51539607552|
|Setting Fire to S...|ARMAC4T1187FB3FA4C|2004|207.77751|68719476736|
|Kutt Free (DJ Vol...|ARNNKDK1187B98BBD5|   0|407.37914|68719476737|
|Get Your Head Stu...|AREDL271187FB40F44|   0| 45.66159|77309411328|
|     Amor De Cabaret|ARKRRTF1187B9984DA|   0|177.47546|94489280512|
+--------------------+------------------+----+---------+-----------+
only showing top 5 rows

CPU times: user 2.94 ms, sys: 4.33 ms, total: 7.27 ms
Wall time: 5.12 s


In [8]:
%%time
# Write the songs table as parquet, partitioned by year and then artist_id.
# mode('overwrite') replaces output left by an earlier run; the default save
# mode raises an error when the target path already exists, so the notebook
# could not be re-run end to end.
songs_table.write.mode('overwrite') \
    .partitionBy('year', 'artist_id') \
    .parquet(output_data + 'songs/')

CPU times: user 80.9 ms, sys: 25.6 ms, total: 106 ms
Wall time: 10min 57s


Artists Table

In [9]:
%%time
# Artist dimension: the artist_* song columns under their short names,
# keeping one row per distinct combination of values.
artists_cols = [
    'artist_id',
    'artist_name as name',
    'artist_location as location',
    'artist_latitude as latitude',
    'artist_longitude as longitude',
]

# selectExpr() rather than select(), because the entries carry SQL 'as' aliases
artists_table = df.selectExpr(*artists_cols).distinct()
print('Artists count: {}'.format(artists_table.count()))
artists_table.show(5)

Artists count: 69
+------------------+---------------+---------------+--------+----------+
|         artist_id|           name|       location|latitude| longitude|
+------------------+---------------+---------------+--------+----------+
|ARPBNLO1187FB3D52F|       Tiny Tim|   New York, NY|40.71455| -74.00712|
|ARXR32B1187FB57099|            Gob|               |    null|      null|
|AROGWRA122988FEE45|Christos Dantis|               |    null|      null|
|ARBGXIG122988F409D|     Steel Rain|California - SF|37.77916|-122.42005|
|AREVWGE1187B9B890A|     Bitter End|      Noci (BA)| -13.442|  -41.9952|
+------------------+---------------+---------------+--------+----------+
only showing top 5 rows

CPU times: user 5.11 ms, sys: 229 µs, total: 5.34 ms
Wall time: 3.3 s


In [10]:
%%time
# Write the artists table as parquet.
# mode('overwrite') replaces output left by an earlier run; the default save
# mode raises an error when the target path already exists, so the notebook
# could not be re-run end to end.
artists_table.write.mode('overwrite').parquet(output_data + 'artists/')

CPU times: user 57.1 ms, sys: 27.9 ms, total: 85 ms
Wall time: 8min 52s


#### Process Log Data
The log files are partitioned by year and month.

In [36]:
%%time
# Read the log files and keep only the 'NextSong' page events.
# Note: this rebinds `df`, which held the song data up to this point.
log_data = input_data + 'log-data/*/*/*.json'
df = spark.read.json(log_data).filter("page = 'NextSong'").cache()
print("'NextSong' events count: {}".format(df.count()))
df.printSchema()

'NextSong' events count: 6820
root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

CPU times: user 1.58 ms, sys: 3.59 ms, total: 5.17 ms
Wall time: 12.9 s


Users Table

In [12]:
%%time
# User dimension taken from the play events, with log fields renamed to snake_case.
users_cols = ['userId as user_id', 'firstName as first_name', 'lastName as last_name', 'gender', 'level']
# NOTE(review): rows are de-duplicated across all five columns, so a user_id
# that occurs with more than one level keeps one row per level - confirm that
# this is the intended grain for the table.
users_table = df.selectExpr(*users_cols).distinct()
print('Users count: {}'.format(users_table.count()))
users_table.show(5)

Users count: 104
+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     26|      Ryan|    Smith|     M| free|
|      7|    Adelyn|   Jordan|     F| free|
|     71|    Ayleen|     Wise|     F| free|
|     81|    Sienna|    Colon|     F| free|
|     87|    Dustin|      Lee|     M| free|
+-------+----------+---------+------+-----+
only showing top 5 rows

CPU times: user 4.38 ms, sys: 720 µs, total: 5.1 ms
Wall time: 2.77 s


In [13]:
%%time
# Write the users table as parquet.
# mode('overwrite') replaces output left by an earlier run; the default save
# mode raises an error when the target path already exists, so the notebook
# could not be re-run end to end.
users_table.write.mode('overwrite').parquet(output_data + 'users/')

CPU times: user 106 ms, sys: 9.4 ms, total: 115 ms
Wall time: 11min 57s


Time Table, with files partitioned by year and month.

In [37]:
%%time
def to_timestamp(ts):
    """Build a timestamp Column from an epoch-milliseconds column.

    A built-in column expression replaces the earlier Python UDF. Rows are no
    longer shipped to a Python worker, and the value no longer passes through
    a naive ``datetime``, which PySpark interprets in the worker's local time
    zone rather than as UTC.

    Args:
        ts: Column name (str) or Column holding milliseconds since the Unix epoch.

    Returns:
        A Column of timestamp type.
    """
    ts_col = col(ts) if isinstance(ts, str) else ts
    # Casting a numeric value to timestamp treats it as seconds since the epoch.
    return (ts_col / 1000).cast('timestamp')


df = df.withColumn('start_time', to_timestamp('ts'))

# Time dimension: one row per distinct start_time with its calendar parts.
# NOTE(review): the parts below are extracted in the Spark session time zone;
# they match the UTC wall clock only when that time zone is UTC - confirm.
time_table = (
    df.select('start_time').dropDuplicates()
    .withColumn('hour', hour('start_time'))
    .withColumn('day', dayofmonth('start_time'))
    .withColumn('week', weekofyear('start_time'))
    .withColumn('month', month('start_time'))
    .withColumn('year', year('start_time'))
    .withColumn('weekday', date_format('start_time', 'E'))  # https://stackoverflow.com/a/12781297
)
print('Timestamps count: {}'.format(time_table.count()))
time_table.show(5)

Timestamps count: 6813
+--------------------+----+---+----+-----+----+-------+
|          start_time|hour|day|week|month|year|weekday|
+--------------------+----+---+----+-----+----+-------+
|2018-11-21 06:18:...|   6| 21|  47|   11|2018|    Wed|
|2018-11-21 18:49:...|  18| 21|  47|   11|2018|    Wed|
|2018-11-14 15:20:...|  15| 14|  46|   11|2018|    Wed|
|2018-11-05 16:31:...|  16|  5|  45|   11|2018|    Mon|
|2018-11-13 18:00:...|  18| 13|  46|   11|2018|    Tue|
+--------------------+----+---+----+-----+----+-------+
only showing top 5 rows

CPU times: user 38.7 ms, sys: 13.6 ms, total: 52.4 ms
Wall time: 2.61 s


In [15]:
%%time
# Write the time table as parquet, partitioned by year and then month.
# mode('overwrite') replaces output left by an earlier run; the default save
# mode raises an error when the target path already exists, so the notebook
# could not be re-run end to end.
time_table.write.mode('overwrite') \
    .partitionBy('year', 'month') \
    .parquet(output_data + 'time/')

CPU times: user 159 ms, sys: 60.6 ms, total: 219 ms
Wall time: 33min 40s


Songplays Table, with files partitioned by year and month.

In [38]:
%%time
# Attach song_id and artist_id to each play event.
#
# Songs and artists are first paired through artist_id, and a play matches only
# when BOTH its song title and its artist name agree with the same pair.
# Matching the title and the artist name in two independent joins would let a
# play pick up the song_id of a same-titled song by a different artist.
song_artist = songs_table.select('song_id', 'title', 'artist_id') \
    .join(artists_table.select('artist_id', 'name').distinct(), 'artist_id') \
    .select('song_id', 'title', 'artist_id', 'name')

# Drop lookup columns left behind by an earlier run of this cell (a no-op on
# the first run), so re-running it cannot stack duplicate, ambiguous columns.
plays = df.drop('song_id', 'title', 'artist_id', 'name')

# Inner join, as before: plays without a matching song/artist pair are dropped.
df = plays.join(song_artist,
                (plays['song'] == song_artist['title']) &
                (plays['artist'] == song_artist['name']))

df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- start_time: timestamp (nullable = true)
 |-- song_id: long (nullable = false)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)

CPU times: user 6.99 ms, sys: 0 ns, total: 6.99 ms
Wall time: 51.4 ms


In [39]:
%%time
# Songplays fact table: distinct play rows with a generated songplay_id, plus
# month and year derived from start_time (used to partition the output).
songplays_cols = [
    'start_time',
    'userId as user_id',
    'level',
    'song_id',
    'artist_id',
    'sessionId as session_id',
    'location',
    'userAgent as user_agent',
]

unique_plays = df.selectExpr(*songplays_cols).distinct()
songplays_table = (
    unique_plays
    .withColumn('songplay_id', monotonically_increasing_id())
    .withColumn('month', month('start_time'))
    .withColumn('year', year('start_time'))
)

print('Songplays count: {}'.format(songplays_table.count()))
songplays_table.show(5)

Songplays count: 1
+--------------------+-------+-----+-------------+------------------+----------+--------------------+--------------------+------------+-----+----+
|          start_time|user_id|level|      song_id|         artist_id|session_id|            location|          user_agent| songplay_id|month|year|
+--------------------+-------+-----+-------------+------------------+----------+--------------------+--------------------+------------+-----+----+
|2018-11-21 21:56:...|     15| paid|1348619730944|AR5KOSW1187FB35FF4|       818|Chicago-Napervill...|"Mozilla/5.0 (X11...|188978561024|   11|2018|
+--------------------+-------+-----+-------------+------------------+----------+--------------------+--------------------+------------+-----+----+

CPU times: user 44.6 ms, sys: 16.3 ms, total: 60.8 ms
Wall time: 10.2 s


In [40]:
%%time
# Write the songplays table as parquet, partitioned by year and then month.
# mode('overwrite') replaces output left by an earlier run; the default save
# mode raises an error when the target path already exists, so the notebook
# could not be re-run end to end.
songplays_table.write.mode('overwrite') \
    .partitionBy('year', 'month') \
    .parquet(output_data + 'songplays/')

CPU times: user 25.2 ms, sys: 13.9 ms, total: 39.2 ms
Wall time: 1min 3s
