In [2]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import *
from pyspark.sql import types as T


config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

In [2]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

In [3]:
input_path = "s3a://udacity-dend/"
output_path = "s3a://kokobucket/"

In [4]:
# import boto3
# s3 = boto3.resource('s3')
# my_bucket = s3.Bucket(input_path)

In [4]:
# log-data/2018/11/2018-11-01-events.json
# log-data/2018/11/2018-11-02-events.json
log_path = 'log-data/2018/11/2018-11-03-events.json'
log_data_path = os.path.join(input_path, log_path)
log_data_path

's3a://udacity-dend/log-data/2018/11/2018-11-03-events.json'

In [5]:
song_data = 'song_data/A/A/A/*.json'
song_data_path = os.path.join(input_path, song_data)
song_data_path

's3a://udacity-dend/song_data/A/A/A/*.json'

In [7]:
# for my_bucket_object in my_bucket.objects.all():
#     print(my_bucket_object.key)

In [8]:
# for my_bucket_object in my_bucket.objects.all():
#     print(my_bucket_object)

# Table information

## Fact Table
#### songplays - records in log data associated with song plays i.e. records with page NextSong
`songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent`

## Dimension Tables
#### users - users in the app
`user_id, first_name, last_name, gender, level`
#### songs - songs in music database
`song_id, title, artist_id, year, duration`
#### artists - artists in music database
`artist_id, name, location, latitude, longitude`
#### time - timestamps of records in songplays broken down into specific units
`start_time, hour, day, week, month, year, weekday`

# Load in data from S3 udacity-dend bucket

In [6]:
df_log = spark.read.json(log_data_path)
df_log.take(5)

[Row(artist=None, auth='Logged Out', firstName=None, gender=None, itemInSession=0, lastName=None, length=None, level='free', location=None, method='PUT', page='Login', registration=None, sessionId=52, song=None, status=307, ts=1541207073796, userAgent=None, userId=''),
 Row(artist=None, auth='Logged In', firstName='Celeste', gender='F', itemInSession=1, lastName='Williams', length=None, level='free', location='Klamath Falls, OR', method='GET', page='Home', registration=1541077528796.0, sessionId=52, song=None, status=200, ts=1541207123796, userAgent='"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.103 Safari/537.36"', userId='53'),
 Row(artist='Mynt', auth='Logged In', firstName='Celeste', gender='F', itemInSession=2, lastName='Williams', length=166.94812, level='free', location='Klamath Falls, OR', method='PUT', page='NextSong', registration=1541077528796.0, sessionId=52, song='Playa Haters', status=200, ts=1541207150796, userAgent='"Mozill

In [10]:
df_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [11]:
df_song = spark.read.json(song_data_path)
df_song.take(5)

[Row(artist_id='ARTC1LV1187B9A4858', artist_latitude=51.4536, artist_location="Goldsmith's College, Lewisham, Lo", artist_longitude=-0.01802, artist_name='The Bonzo Dog Band', duration=301.40036, num_songs=1, song_id='SOAFBCP12A8C13CC7D', title='King Of Scurf (2007 Digital Remaster)', year=1972),
 Row(artist_id='ARA23XO1187B9AF18F', artist_latitude=40.57885, artist_location='Carteret, New Jersey', artist_longitude=-74.21956, artist_name='The Smithereens', duration=192.522, num_songs=1, song_id='SOKTJDS12AF72A25E5', title='Drown In My Own Tears (24-Bit Digitally Remastered 04)', year=0),
 Row(artist_id='ARSVTNL1187B992A91', artist_latitude=51.50632, artist_location='London, England', artist_longitude=-0.12714, artist_name='Jonathan King', duration=129.85424, num_songs=1, song_id='SOEKAZG12AB018837E', title="I'll Slap Your Face (Entertainment USA Theme)", year=2001),
 Row(artist_id='AR73AIO1187B9AD57B', artist_latitude=37.77916, artist_location='San Francisco, CA', artist_longitude=-122.

In [12]:
df_song.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



# Extract songs and artists from song_data

In [15]:
df_song.createOrReplaceTempView("song_data")

- __Extract songs table from song_data__

`song_id, title, artist_id, year, duration`

In [18]:
songs_table = spark.sql("""
                    SELECT DISTINCT
                        song_id, title, artist_id, year, duration
                    FROM song_data
                    WHERE song_id IS NOT NULL
""")

In [19]:
songs_table.show(5)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOHOZBI12A8C132E3C|         Smash It Up|AR0MWD61187B9B2B12|2000|195.39546|
|SOXZYWX12A6310ED0C|     It's About Time|ARC1IHZ1187FB4E920|   0| 246.9873|
|SOHKNRJ12A6701D1F8|        Drop of Rain|AR10USD1187B99F3F1|   0|189.57016|
|SOIGICF12A8C141BC5|        Game & Watch|AREWD471187FB49873|2004|580.54485|
|SOAPERH12A58A787DC|The One And Only ...|ARZ5H0P1187B98A1DD|   0|230.42567|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows



- Write songs_table to s3 bucket in parquet file

In [19]:
songs_table.write.mode("overwrite").parquet(output_path + "songs.parquet")

- __Extract artists table from song_data__

`artist_id, name, location, latitude, longitude`

In [20]:
artists_table = spark.sql("""
                    SELECT DISTINCT
                        artist_id, 
                        artist_name AS name,
                        artist_location AS location,
                        artist_latitude AS latitude,
                        artist_longitude AS longitude
                    FROM song_data
                    WHERE artist_id IS NOT NULL
""")

In [21]:
artists_table.show(5)

+------------------+-----------------+-----------+--------+---------+
|         artist_id|             name|   location|latitude|longitude|
+------------------+-----------------+-----------+--------+---------+
|ARJNIUY12298900C91|     Adelitas Way|           |    null|     null|
|AR5LMPY1187FB573FE|Chaka Khan_ Rufus|Chicago, IL|41.88415|-87.63241|
|AR1C2IX1187B99BF74|  Broken Spindles|           |    null|     null|
|ARC1IHZ1187FB4E920|     Jamie Cullum|           |    null|     null|
|ARKYKXP11F50C47A6A| The Supersuckers|           |    null|     null|
+------------------+-----------------+-----------+--------+---------+
only showing top 5 rows



In [22]:
# Write to s3 bucket
#artists_table.write.mode("overwrite").parquet(output_path + "artists.parquet")

# Extract users, time and songplays from song_data and log_data

## Read in log data

In [23]:
log_data_path

's3a://udacity-dend/log-data/2018/11/2018-11-03-events.json'

In [24]:
df_log = spark.read.json(log_data_path)

In [25]:
df_log.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,,Logged Out,,,0,,,free,,PUT,Login,,52,,307,1541207073796,,
1,,Logged In,Celeste,F,1,Williams,,free,"Klamath Falls, OR",GET,Home,1541078000000.0,52,,200,1541207123796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",53.0
2,Mynt,Logged In,Celeste,F,2,Williams,166.94812,free,"Klamath Falls, OR",PUT,NextSong,1541078000000.0,52,Playa Haters,200,1541207150796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",53.0
3,Taylor Swift,Logged In,Celeste,F,3,Williams,230.47791,free,"Klamath Falls, OR",PUT,NextSong,1541078000000.0,52,You Belong With Me,200,1541207316796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",53.0
4,Amy Winehouse,Logged In,Celeste,F,4,Williams,229.85098,free,"Klamath Falls, OR",PUT,NextSong,1541078000000.0,52,Valerie,200,1541207546796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",53.0


In [26]:
df_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [27]:
df_log = df_log.where("page = 'NextSong'")
df_log.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Mynt,Logged In,Celeste,F,2,Williams,166.94812,free,"Klamath Falls, OR",PUT,NextSong,1541078000000.0,52,Playa Haters,200,1541207150796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",53
1,Taylor Swift,Logged In,Celeste,F,3,Williams,230.47791,free,"Klamath Falls, OR",PUT,NextSong,1541078000000.0,52,You Belong With Me,200,1541207316796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",53
2,Amy Winehouse,Logged In,Celeste,F,4,Williams,229.85098,free,"Klamath Falls, OR",PUT,NextSong,1541078000000.0,52,Valerie,200,1541207546796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",53
3,Jimmy Eat World,Logged In,Celeste,F,5,Williams,285.83138,free,"Klamath Falls, OR",PUT,NextSong,1541078000000.0,52,Dizzy,200,1541207775796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",53
4,Maldita Nerea,Logged In,Anabelle,F,0,Simpson,241.162,free,"Philadelphia-Camden-Wilmington, PA-NJ-DE-MD",PUT,NextSong,1541044000000.0,158,Supelicula,200,1541254670796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",69


- __Extract artists table from song_data__

`user_id, first_name, last_name, gender, level`

In [28]:
# Create a view
df_log.createOrReplaceTempView("log_data")

In [29]:
users_table = spark.sql("""
                SELECT DISTINCT
                    userId AS user_id,
                    firstName AS first_name,
                    lastName AS last_name,
                    gender,
                    level
                FROM log_data
                WHERE userId IS NOT NULL
""")

In [31]:
users_table.show(5)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     63|      Ayla|  Johnson|     F| free|
|      6|   Cecilia|    Owens|     F| free|
|     15|      Lily|     Koch|     F| paid|
|     95|      Sara|  Johnson|     F| paid|
|     69|  Anabelle|  Simpson|     F| free|
+-------+----------+---------+------+-----+
only showing top 5 rows



In [111]:
#users_table.write.mode("overwrite").parquet(output_path + "users.parquet")

In [38]:
#get_timestamp = udf(lambda x: datetime.fromtimestamp((x/1000.0), T.TimestampType()))
get_timestamp = udf(lambda x:( int(int(x)/ 1000)))
get_datetime = udf(lambda x: datetime.fromtimestamp(x), T.TimestampType())

In [39]:
df_log = df_log.withColumn("timestamp", get_timestamp(df_log.ts))

In [40]:
df_log.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,timestamp
0,Mynt,Logged In,Celeste,F,2,Williams,166.94812,free,"Klamath Falls, OR",PUT,NextSong,1541078000000.0,52,Playa Haters,200,1541207150796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",53,1541207150
1,Taylor Swift,Logged In,Celeste,F,3,Williams,230.47791,free,"Klamath Falls, OR",PUT,NextSong,1541078000000.0,52,You Belong With Me,200,1541207316796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",53,1541207316
2,Amy Winehouse,Logged In,Celeste,F,4,Williams,229.85098,free,"Klamath Falls, OR",PUT,NextSong,1541078000000.0,52,Valerie,200,1541207546796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",53,1541207546
3,Jimmy Eat World,Logged In,Celeste,F,5,Williams,285.83138,free,"Klamath Falls, OR",PUT,NextSong,1541078000000.0,52,Dizzy,200,1541207775796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",53,1541207775
4,Maldita Nerea,Logged In,Anabelle,F,0,Simpson,241.162,free,"Philadelphia-Camden-Wilmington, PA-NJ-DE-MD",PUT,NextSong,1541044000000.0,158,Supelicula,200,1541254670796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",69,1541254670


In [41]:
df_log = df_log.withColumn("date", get_datetime(df_log.timestamp))

In [42]:
df_log.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,timestamp,date
0,Mynt,Logged In,Celeste,F,2,Williams,166.94812,free,"Klamath Falls, OR",PUT,NextSong,1541078000000.0,52,Playa Haters,200,1541207150796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",53,1541207150,2018-11-03 01:05:50
1,Taylor Swift,Logged In,Celeste,F,3,Williams,230.47791,free,"Klamath Falls, OR",PUT,NextSong,1541078000000.0,52,You Belong With Me,200,1541207316796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",53,1541207316,2018-11-03 01:08:36
2,Amy Winehouse,Logged In,Celeste,F,4,Williams,229.85098,free,"Klamath Falls, OR",PUT,NextSong,1541078000000.0,52,Valerie,200,1541207546796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",53,1541207546,2018-11-03 01:12:26
3,Jimmy Eat World,Logged In,Celeste,F,5,Williams,285.83138,free,"Klamath Falls, OR",PUT,NextSong,1541078000000.0,52,Dizzy,200,1541207775796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",53,1541207775,2018-11-03 01:16:15
4,Maldita Nerea,Logged In,Anabelle,F,0,Simpson,241.162,free,"Philadelphia-Camden-Wilmington, PA-NJ-DE-MD",PUT,NextSong,1541044000000.0,158,Supelicula,200,1541254670796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",69,1541254670,2018-11-03 14:17:50


In [43]:
df_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- date: timestamp (nullable = true)



In [44]:
df_log.createOrReplaceTempView("log_data")

- __Extract time table from log_data__

`start_time, hour, day, week, month, year, weekday`

In [45]:
time_table = spark.sql("""
                SELECT DISTINCT
                    date AS start_time,
                    hour(date) AS hour,
                    day(date) AS day,
                    weekofyear(date) AS week,
                    month(date) AS month,
                    year(date) AS year,
                    date_format(date, "EEEE") AS weekday
                FROM log_data
                WHERE date IS NOT NULL
""")

In [46]:
time_table.limit(5).toPandas()

Unnamed: 0,start_time,hour,day,week,month,year,weekday
0,2018-11-03 18:06:36,18,3,44,11,2018,Saturday
1,2018-11-03 15:59:38,15,3,44,11,2018,Saturday
2,2018-11-03 18:18:06,18,3,44,11,2018,Saturday
3,2018-11-03 18:29:03,18,3,44,11,2018,Saturday
4,2018-11-03 16:58:43,16,3,44,11,2018,Saturday


- __Extract songplays table from song_data and log_data__

`songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent`

In [47]:
df_song.columns

['artist_id',
 'artist_latitude',
 'artist_location',
 'artist_longitude',
 'artist_name',
 'duration',
 'num_songs',
 'song_id',
 'title',
 'year']

In [48]:
df_log.columns

['artist',
 'auth',
 'firstName',
 'gender',
 'itemInSession',
 'lastName',
 'length',
 'level',
 'location',
 'method',
 'page',
 'registration',
 'sessionId',
 'song',
 'status',
 'ts',
 'userAgent',
 'userId',
 'timestamp',
 'date']

In [61]:
songplays_table = spark.sql("""
                    SELECT DISTINCT
                        monotonically_increasing_id() AS songplay_id,
                        L.date AS start_time,
                        L.userId AS user_id,
                        L.level AS level,
                        S.song_id AS song_id,
                        S.artist_id AS artist_id,
                        L.sessionId AS session_id,
                        L.location AS location,
                        L.userAgent AS user_agent,
                        month(L.date) AS month,
                        year(L.date) AS year
                    FROM song_data AS S 
                        JOIN log_data AS L 
                            ON (S.artist_name = L.artist) AND (S.title = L.song) AND (S.duration = L.length)
""")

In [62]:
songplays_table.show(5)

+-----------+----------+-------+-----+-------+---------+----------+--------+----------+-----+----+
|songplay_id|start_time|user_id|level|song_id|artist_id|session_id|location|user_agent|month|year|
+-----------+----------+-------+-----+-------+---------+----------+--------+----------+-----+----+
+-----------+----------+-------+-----+-------+---------+----------+--------+----------+-----+----+



In [63]:
# spark.sql("""
#                 SELECT * 
#                 FROM song_data AS S 
#                         JOIN log_data AS L ON S.artist_name = L.artist""").limit(5).toPandas()