In [1]:
import configparser
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import TimestampType
import pyspark.sql.functions as F
import datetime

In [2]:
config = configparser.ConfigParser()

In [3]:
config.read('dl.cfg')

['dl.cfg']

In [4]:
os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] =config['AWS']['AWS_SECRET_ACCESS_KEY']

In [5]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

In [31]:
input_data = "s3a://udacity-dend"

In [32]:
song_data = os.path.join(input_data, "song_data/*/*/*/*.json")

In [14]:
song_df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [15]:
print('Extract columns to create song table')
artist_id = "artist_id"
artist_latitude = "artist_latitude"
artist_location = "artist_location"
artist_longitude = "artist_longitude"
artist_name = "artist_name"
duration = "duration"
num_songs = "num_songs"
song_id = "song_id"
title = "title"
year = "year"

Extract columns to create song table


In [None]:
print('Songs table: ')
song_table = song_df.select(song_id, title, artist_id, year, duration)
print(song_table.toPandas())

In [11]:
print (song_table)

DataFrame[song_id: string, title: string, artist_id: string, year: bigint, duration: double]


In [None]:
#create temp view
song_df_table = song_table.createOrReplaceTempView("song_df_table")

In [None]:
#create sql query to partition on year and artist_id
song_table = spark.sql(
                        """SELECT DISTINCT song_id, title, artist_id, year, duration
                        FROM song_df_table 
                        """
                      )

In [13]:
song_table.show()

+------------------+--------------------+------------------+----+----------+
|           song_id|               title|         artist_id|year|  duration|
+------------------+--------------------+------------------+----+----------+
|SOBTCUI12A8AE48B70|Faust: Ballet Mus...|ARSUVLW12454A4C8B8|   0|  94.56281|
|SOXRPUH12AB017F769|Exodus: Part I: M...|ARXQC081187FB4AD42|   0|1047.71873|
|SOVNKJI12A8C13CB0D|Take It To Da Hou...|ARWUNH81187FB4A3E0|2001| 227.10812|
|SOAFBCP12A8C13CC7D|King Of Scurf (20...|ARTC1LV1187B9A4858|1972| 301.40036|
|SOKTJDS12AF72A25E5|Drown In My Own T...|ARA23XO1187B9AF18F|   0|   192.522|
|SOYVBGZ12A6D4F92A8|Piano Sonata No. ...|ARLRWBW1242077EB29|   0| 221.70077|
|SOGXFIF12A58A78CC4|Hanging On (Mediu...|AR5LZJD1187FB4C5E5|   0| 204.06812|
|SOQAUGD12A58A7A92D|The Gold (Dubmati...|ARV3PXE1187B98E680|2008| 289.01832|
|SOEHWGF12A6D4F8B2B|Hips Don't Lie (f...|AR6PJ8R1187FB5AD70|   0| 217.36444|
|SOYQDUJ12A8C13F773|Shine On (Acousti...|ARWDPT81187B99C656|   0| 173.63546|

In [48]:
output_data = "s3a://dend-project-datalake"

In [None]:
song_table.write.partitionBy('year', 'artist_id').parquet(os.path.join(output_data, 'songs'), 'overwrite')

In [None]:
print('Artist table: ')
artist_table = song_df.select(artist_id, artist_name, artist_location, artist_longitude, artist_latitude)
print(artist_table.toPandas())

In [18]:
artist_df_table = artist_table.createOrReplaceTempView("artist_df_table")

In [19]:
#create sql query to 
artist_table = spark.sql(
                        """SELECT DISTINCT artist_id, artist_name, artist_location, artist_longitude, artist_latitude
                        FROM artist_df_table 
                        """
                      )

In [20]:
artist_table.show()

+------------------+--------------------+-----------------+----------------+---------------+
|         artist_id|         artist_name|  artist_location|artist_longitude|artist_latitude|
+------------------+--------------------+-----------------+----------------+---------------+
|ARMJAGH1187FB546F3|        The Box Tops|      Memphis, TN|       -90.04892|       35.14968|
|ARKCTSM11F4C83C839|     Igor Stravinsky|                 |            null|           null|
|AR8YYNB1187B9A4BB3|       Assemblage 23|                 |            null|           null|
|ARDDQKN1187FB50651|              Rednex|                 |            null|           null|
|AR2L9A61187B9ADDBC|     Tangerine Dream|  Berlin, Germany|        13.37698|       52.51607|
|AR54HGU1187FB5ACDE|          Girugämesh|                 |            null|           null|
|ARSZ7L31187FB4E610|           Devotchka|       Denver, CO|      -104.99226|       39.74001|
|ARH5D6D1187FB4C572|            Karunesh|  California - SF|      -122.

In [23]:
artist_table.write.parquet(os.path.join(output_data, 'artists'), 'overwrite')

In [None]:
log_data = os.path.join(input_data, "log_data/*/*/*.json")

In [7]:
log_df = spark.read.json(log_data)

In [8]:
log_df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [9]:
log_df.count()

8056

In [10]:
artist= 'artist'
auth= 'auth'
firstName= 'firstName'
gender= 'gender'
itemInSession= 'itemInSession'
lastName= 'lastName'
length= 'length'
level= 'level'
location= 'location'
method= 'method'
page= 'page'
registration= 'registration'
sessionId= 'sessionId'
song= 'song'
status= 'status'
ts= 'ts'
userAgent= 'userAgent'
userId= 'userId'
timestamp='timestamp'
start_time='start_time'
hour = 'hour'
day='day'
week='week'
month='month'
year='year'
weekday='weekday'

In [11]:
print('Users table: ')
users_table = log_df.select(firstName, lastName, gender, level, userId)
print(users_table.limit(5).toPandas())

Users table: 
  firstName lastName gender level userId
0      Ryan    Smith      M  free     26
1      Ryan    Smith      M  free     26
2      Ryan    Smith      M  free     26
3     Wyatt    Scott      M  free      9
4    Austin  Rosales      M  free     12


In [12]:
#create the users temp_view 
users_df_table = users_table.createOrReplaceTempView("users_df_table")

In [13]:
#create sql query to 
users_table = spark.sql(
                        """SELECT DISTINCT firstName, lastName, gender, level, userId
                        FROM users_df_table 
                        """
                      )

In [53]:
users_table.show()

+---------+--------+------+-----+------+
|firstName|lastName|gender|level|userId|
+---------+--------+------+-----+------+
|     Lily|  Cooper|     F| free|    59|
|     Ryan|   Smith|     M| free|    26|
|  Shakira|    Hunt|     F| free|    84|
|  Kynnedi| Sanchez|     F| free|    89|
|   Jordan|   Hicks|     F| free|    37|
|   Martin| Johnson|     M| free|    55|
|    Wyatt|   Scott|     M| free|     9|
|   Jaleah|   Hayes|     F| paid|    70|
|   Cienna| Freeman|     F| free|    56|
|   Hayden|   Brock|     F| paid|    72|
|     null|    null|  null| free|      |
|  Jizelle|Benjamin|     F| free|     2|
|     Lily|   Burns|     F| free|    32|
|   Sylvie|    Cruz|     F| free|    10|
|     Kate| Harrell|     F| paid|    97|
|  Matthew|   Jones|     M| paid|    36|
|     Sara| Johnson|     F| paid|    95|
|   Austin| Rosales|     M| free|    12|
|Katherine|     Gay|     F| free|    57|
|    Tegan|  Levine|     F| paid|    80|
+---------+--------+------+-----+------+
only showing top

In [56]:
users_table.write.parquet(os.path.join(output_data, 'users'), 'overwrite')

In [14]:
#create timestamp column from original timestamp column
get_timestamp = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000), TimestampType())

In [15]:
log_df = log_df.withColumn("timestamp", get_timestamp(log_df.ts))

In [16]:
log_df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [17]:
# create datetime column from original timestamp colum
get_datetime = udf(lambda x: F.to_date(x), TimestampType())

In [18]:
log_df = log_df.withColumn("start_time", get_timestamp(log_df.ts))

In [19]:
log_df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- start_time: timestamp (nullable = true)



In [20]:
# extract columns to create time table
log_df = log_df.withColumn("hour", F.hour("timestamp"))
log_df = log_df.withColumn("day", F.dayofweek("timestamp"))
log_df = log_df.withColumn("week", F.weekofyear("timestamp"))
log_df = log_df.withColumn("month", F.month("timestamp"))
log_df = log_df.withColumn("year", F.year("timestamp"))
log_df = log_df.withColumn("weekday", F.dayofweek("timestamp"))

In [21]:
log_df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullabl

In [22]:
artist= 'artist'
auth= 'auth'
firstName= 'firstName'
gender= 'gender'
itemInSession= 'itemInSession'
lastName= 'lastName'
length= 'length'
level= 'level'
location= 'location'
method= 'method'
page= 'page'
registration= 'registration'
sessionId= 'sessionId'
song= 'song'
status= 'status'
ts= 'ts'
userAgent= 'userAgent'
userId= 'userId'
timestamp='timestamp'
start_time='start_time'
hour = 'hour'
day='day'
week='week'
month='month'
year='year'
weekday='weekday'
timestamp='timestamp'
start_time = 'start_time'
hour = 'hour'
day = 'day'
week = 'week'
month = 'month'
year = 'year'
weekday = 'weekday'

In [23]:
time_table = log_df.select(start_time, hour, day, week, month, year, weekday)

In [24]:
time_table.show()

+--------------------+----+---+----+-----+----+-------+
|          start_time|hour|day|week|month|year|weekday|
+--------------------+----+---+----+-----+----+-------+
|2018-11-15 00:30:...|   0|  5|  46|   11|2018|      5|
|2018-11-15 00:41:...|   0|  5|  46|   11|2018|      5|
|2018-11-15 00:45:...|   0|  5|  46|   11|2018|      5|
|2018-11-15 01:57:...|   1|  5|  46|   11|2018|      5|
|2018-11-15 03:29:...|   3|  5|  46|   11|2018|      5|
|2018-11-15 03:44:...|   3|  5|  46|   11|2018|      5|
|2018-11-15 03:44:...|   3|  5|  46|   11|2018|      5|
|2018-11-15 05:34:...|   5|  5|  46|   11|2018|      5|
|2018-11-15 05:37:...|   5|  5|  46|   11|2018|      5|
|2018-11-15 05:48:...|   5|  5|  46|   11|2018|      5|
|2018-11-15 05:53:...|   5|  5|  46|   11|2018|      5|
|2018-11-15 05:55:...|   5|  5|  46|   11|2018|      5|
|2018-11-15 06:01:...|   6|  5|  46|   11|2018|      5|
|2018-11-15 06:01:...|   6|  5|  46|   11|2018|      5|
|2018-11-15 06:07:...|   6|  5|  46|   11|2018| 

In [25]:
time_table.count()

8056

In [26]:
#create temp view
time_table.createOrReplaceTempView("time_table_df")

In [27]:
#create sql query to partition on year and artist_id
time_table = spark.sql(
        """SELECT DISTINCT start_time, hour, day, week, month, year, weekday
           FROM time_table_df 
        """)

In [30]:
time_table.count()

8023

In [118]:
time_table.write.partitionBy('year', 'month').parquet(os.path.join(output_data, 'time'), 'overwrite')

In [39]:
log_df_table=log_df.createOrReplaceTempView("log_df_table")

In [40]:
song_df_table=song_df.createOrReplaceTempView("song_df_table")

In [50]:
songplays_table = spark.sql(
        """SELECT DISTINCT log_df_table.start_time, log_df_table.userId, log_df_table.level, log_df_table.sessionId, log_df_table.location,log_df_table.userAgent, 
        song_df_table.song_id, song_df_table.artist_id,time_table_df.year,time_table_df.month
        FROM log_df_table 
        INNER JOIN song_df_table 
        ON song_df_table.artist_name = log_df_table.artist 
        INNER JOIN time_table_df
        ON time_table_df.start_time = log_df_table.start_time
        """)

In [52]:
songplays_table.show()

+--------------------+------+-----+---------+--------------------+--------------------+------------------+------------------+----+-----+
|          start_time|userId|level|sessionId|            location|           userAgent|           song_id|         artist_id|year|month|
+--------------------+------+-----+---------+--------------------+--------------------+------------------+------------------+----+-----+
|2018-11-14 12:18:...|    29| paid|      559|Atlanta-Sandy Spr...|"Mozilla/5.0 (Mac...|SOFEAMD12AB017F1C2|AR8W8P31187B9A4063|2018|   11|
|2018-11-13 14:11:...|    29| paid|      486|Atlanta-Sandy Spr...|"Mozilla/5.0 (Mac...|SOLGHDZ12AB0183B11|ARF2EHS1187B994F4E|2018|   11|
|2018-11-21 14:52:...|    26| free|      811|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|SOLFPLO12AB0189A69|ARCBRSD11C8A42B798|2018|   11|
|2018-11-20 03:02:...|    85| paid|      658|       Red Bluff, CA|"Mozilla/5.0 (Mac...|SOLJCCO12A6701F987|ARR6LWJ1187FB44C8B|2018|   11|
|2018-11-27 17:39:...|    36| paid|      

In [53]:
songplays_table.write.partitionBy('year', 'month').parquet(os.path.join(output_data, 'songplays'), 'overwrite')