In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['aws']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['aws']['AWS_SECRET_ACCESS_KEY']

In [3]:
spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
    .getOrCreate()

In [4]:
input_path = "data"

In [5]:
output_path = "output/data-lake"

In [6]:
song_path = os.path.join(input_path, "song_data/*/*/*")

In [7]:
df = spark.read.format('json').load(song_path)
df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [8]:
artists_table = df.select(\
                      'artist_id', \
                      col('artist_name').alias('name'), \
                      col('artist_location').alias('location'), \
                      col('artist_latitude').alias('latitude'), \
                      col('artist_longitude').alias('longitude')).dropDuplicates(subset=['artist_id'])
artists_table.show(3)

+------------------+--------------------+--------------------+--------+---------+
|         artist_id|                name|            location|latitude|longitude|
+------------------+--------------------+--------------------+--------+---------+
|AR9AWNF1187B9AB0B4|Kenny G featuring...|Seattle, Washingt...|    null|     null|
|AR0IAWL1187B9A96D0|        Danilo Perez|              Panama|  8.4177|-80.11278|
|AR0RCMP1187FB3F427|    Billie Jo Spears|        Beaumont, TX|30.08615|-94.10158|
+------------------+--------------------+--------------------+--------+---------+
only showing top 3 rows



In [9]:
songs_table = df.select('song_id', 'title', 'artist_id', 'year', 'duration').dropDuplicates(subset=['song_id'])
songs_table.show(3)

+------------------+-------------------+------------------+----+---------+
|           song_id|              title|         artist_id|year| duration|
+------------------+-------------------+------------------+----+---------+
|SOGOSOV12AF72A285E|  ¿Dónde va Chichi?|ARGUVEV1187B98BA17|1997|313.12934|
|SOMZWCG12A8C13C480|   I Didn't Mean To|ARD7TVE1187B99BFB1|   0|218.93179|
|SOUPIRU12A6D4FA1E1|Der Kleine Dompfaff|ARJIE2Y1187B994AB7|   0|152.92036|
+------------------+-------------------+------------------+----+---------+
only showing top 3 rows



In [10]:
table_path = os.path.join(output_path, "songs")
songs_table.write.mode('overwrite').partitionBy("year", "artist_id").parquet(table_path)

In [11]:
table_path = os.path.join(output_path, "artists")
artists_table.write.mode('overwrite').parquet(table_path)

In [12]:
log_path = os.path.join(input_path, "events")

In [13]:
df = spark.read.format('json').load(log_path)
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [14]:
df = df.filter(df.page == 'NextSong')

In [15]:
users_table = df.select(\
                                 col('userId').alias('user_id'), \
                                 col('firstName').alias('first_name'), \
                                 col('lastName').alias('last_name'), 'gender', 'level').dropDuplicates(subset=['user_id'])

In [16]:
table_path = os.path.join(output_path, "users")
users_table.write.mode('overwrite').parquet(table_path)

In [17]:
from datetime import datetime
from pyspark.sql.types import TimestampType

get_timestamp = udf(lambda ts: datetime.fromtimestamp(float(ts)/1000.0), TimestampType())
df = df.withColumn('timestamp', get_timestamp('ts'))

In [18]:
from pyspark.sql.functions import from_unixtime

from pyspark.sql.types import DateType

get_datestamp = udf(lambda ts: datetime.fromtimestamp(float(ts)/1000.0), DateType())
df = df.withColumn("datetime", get_datestamp('ts'))

In [19]:
from pyspark.sql.functions import year, month, hour, dayofweek, dayofyear, weekofyear

time_table = df.select(\
                        col('ts').alias('start_time'), 'timestamp', 'datetime',
                        hour('timestamp').alias('hour'),\
                        dayofyear('timestamp').alias('day'),\
                       weekofyear('timestamp').alias('week'),\
                       month('timestamp').alias('month'),\
                       year('timestamp').alias('year'),\
                       dayofweek('timestamp').alias('weekday')).dropDuplicates(subset=['start_time'])

In [20]:
table_path = os.path.join(output_path, "time")
time_table.write.mode('overwrite').partitionBy("year", "month").parquet(table_path)

In [21]:
table_path = os.path.join(output_path, "songs")
songs_table = spark.read.parquet(table_path)
songs_table.show(3)

+------------------+--------------------+---------+----+------------------+
|           song_id|               title| duration|year|         artist_id|
+------------------+--------------------+---------+----+------------------+
|SOAOIBZ12AB01815BE|I Hold Your Hand ...| 43.36281|2000|ARPBNLO1187FB3D52F|
|SONYPOM12A8C13B2D7|I Think My Wife I...|186.48771|2005|ARDNS031187B9924F0|
|SODREIN12A58A7F2E5|A Whiter Shade Of...|326.00771|   0|ARLTWXK1187FB5A3F8|
+------------------+--------------------+---------+----+------------------+
only showing top 3 rows



In [22]:
from pyspark.sql.functions import monotonically_increasing_id

songplays_table = df.join(songs_table,  df.song == songs_table.title)\
                    .withColumn('songplay_id', monotonically_increasing_id())\
                    .withColumn('year', year('timestamp')) \
                    .withColumn('month', month('timestamp')) \
                    .select(\
                         'songplay_id',
                         col('ts').alias('start_time'),
                        col('userId').alias('user_id'),
                        'level',
                        'song_id',
                        'artist_id',
                        col('sessionId').alias('session_id'),
                        'location',
                        col('userAgent').alias('user_agent'),
                        'year',
                        'month')
songplays_table.show(3)

+-----------+-------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+----+-----+
|songplay_id|   start_time|user_id|level|           song_id|         artist_id|session_id|            location|          user_agent|year|month|
+-----------+-------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+----+-----+
|          0|1542837407796|     15| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|       818|Chicago-Napervill...|"Mozilla/5.0 (X11...|2018|   11|
|          1|1542171963796|     10| free|SOGDBUF12A8C140FAA|AR558FS1187FB45658|       484|Washington-Arling...|"Mozilla/5.0 (Mac...|2018|   11|
|          2|1542618860796|     24| paid|SOGDBUF12A8C140FAA|AR558FS1187FB45658|       672|Lake Havasu City-...|"Mozilla/5.0 (Win...|2018|   11|
+-----------+-------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+----

In [23]:
table_path = os.path.join(output_path, "songplays")
songplays_table.write.mode('overwrite').partitionBy("year", "month").parquet(table_path)