In [1]:
import configparser
from datetime import datetime
import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS_SECRET_ACCESS_KEY']

MissingSectionHeaderError: File contains no section headers.
file: 'dl.cfg', line: 1
"AWS_ACCESS_KEY_ID=''\n"

In [3]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

In [4]:
path = "data/2018-11-01-events.json"
user_log = spark.read.json(path)

In [5]:
user_log.take(1)

[Row(artist=None, auth='Logged In', firstName='Walter', gender='M', itemInSession=0, lastName='Frye', length=None, level='free', location='San Francisco-Oakland-Hayward, CA', method='GET', page='Home', registration=1540919166796.0, sessionId=38, song=None, status=200, ts=1541105830796, userAgent='"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', userId='39')]

In [6]:
user_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [7]:
user_log.createOrReplaceTempView("user_log_table")

In [12]:
user_id_table = spark.sql("SELECT distinct userid,firstName,lastName,gender,level FROM user_log_table")

In [13]:
user_id_table.show()

+------+---------+--------+------+-----+
|userid|firstName|lastName|gender|level|
+------+---------+--------+------+-----+
|    39|   Walter|    Frye|     M| free|
|     8|   Kaylee| Summers|     F| free|
|    10|   Sylvie|    Cruz|     F| free|
|    26|     Ryan|   Smith|     M| free|
|   101|   Jayden|     Fox|     M| free|
+------+---------+--------+------+-----+



In [15]:
user_id_table.createOrReplaceTempView("user_id_table")

In [16]:
spark.sql("SELECT * FROM user_id_table").show() 

+------+---------+--------+------+-----+
|userid|firstName|lastName|gender|level|
+------+---------+--------+------+-----+
|    39|   Walter|    Frye|     M| free|
|     8|   Kaylee| Summers|     F| free|
|    10|   Sylvie|    Cruz|     F| free|
|    26|     Ryan|   Smith|     M| free|
|   101|   Jayden|     Fox|     M| free|
+------+---------+--------+------+-----+



In [17]:
spark.udf.register("get_hour", lambda x:int(datetime.datetime.fromtimestamp(x/1000.0).hour))

<function __main__.<lambda>(x)>

In [18]:
spark.udf.register("get_year", lambda x:int(datetime.datetime.fromtimestamp(x/1000.0).year))

<function __main__.<lambda>(x)>

In [19]:
spark.udf.register("get_day", lambda x:int(datetime.datetime.fromtimestamp(x/1000.0).day))

<function __main__.<lambda>(x)>

In [20]:
spark.udf.register("get_month", lambda x:int(datetime.datetime.fromtimestamp(x/1000.0).month))

<function __main__.<lambda>(x)>

In [21]:
spark.udf.register("get_week", lambda x:int(datetime.datetime.fromtimestamp(x/1000.0).week))

<function __main__.<lambda>(x)>

In [22]:
time_table= spark.sql('''
          SELECT from_unixtime(ts/1000, 'yyyy-MM-dd HH:mm:ss') as start_time,
          get_hour(ts) as hour,
          get_day(ts) as day,
          weekofyear(from_unixtime(ts/1000, 'yyyy-MM-dd')) as week,
          get_month(ts) as month,
          get_year(ts) as year,
          dayofweek(from_unixtime(ts/1000, 'yyyy-MM-dd')) as weekday
          FROM user_log_table 
          LIMIT 1
          '''
          )

In [23]:
time_table.createOrReplaceTempView("time_table")

In [24]:
spark.sql("SELECT * FROM user_id_table").show() 

+------+---------+--------+------+-----+
|userid|firstName|lastName|gender|level|
+------+---------+--------+------+-----+
|    39|   Walter|    Frye|     M| free|
|     8|   Kaylee| Summers|     F| free|
|    10|   Sylvie|    Cruz|     F| free|
|    26|     Ryan|   Smith|     M| free|
|   101|   Jayden|     Fox|     M| free|
+------+---------+--------+------+-----+



### songtable

In [25]:
path2 = "data/TRAAAAW128F429D538.json"

In [26]:
song_metadata_table = spark.read.json(path2)

In [27]:
song_metadata_table.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: string (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [28]:
song_metadata_table.take(1)

[Row(artist_id='ARD7TVE1187B99BFB1', artist_latitude=None, artist_location='California - LA', artist_longitude=None, artist_name='Casual', duration=218.93179, num_songs=1, song_id='SOMZWCG12A8C13C480', title="I Didn't Mean To", year=0)]

In [29]:
song_metadata_table.createOrReplaceTempView("song_metadata_table")

In [30]:
songs_table = spark.sql("SELECT song_id,title,artist_id, year,duration FROM song_metadata_table")

In [33]:
artist_table = spark.sql("SELECT artist_id, artist_name, artist_location, artist_latitude, artist_longitude FROM song_metadata_table")

In [34]:
songs_table.createOrReplaceTempView("songs_table")

In [35]:
artist_table.createOrReplaceTempView("artist_table")

In [36]:
songplay_table = spark.sql('''
               SELECT a.ts AS start_time, a.userId AS user_id,a.level, b.song_id, b.artist_id,
               a.sessionId,a.location,a.userAgent
               FROM user_log_table AS a LEFT JOIN song_metadata_table AS b
               ON a.artist = b.artist_name AND a.song = b.title

               ''')

In [37]:
songplay_table.show()

+-------------+-------+-----+-------+---------+---------+--------------------+--------------------+
|   start_time|user_id|level|song_id|artist_id|sessionId|            location|           userAgent|
+-------------+-------+-----+-------+---------+---------+--------------------+--------------------+
|1541105830796|     39| free|   null|     null|       38|San Francisco-Oak...|"Mozilla/5.0 (Mac...|
|1541106106796|      8| free|   null|     null|      139|Phoenix-Mesa-Scot...|"Mozilla/5.0 (Win...|
|1541106106796|      8| free|   null|     null|      139|Phoenix-Mesa-Scot...|"Mozilla/5.0 (Win...|
|1541106132796|      8| free|   null|     null|      139|Phoenix-Mesa-Scot...|"Mozilla/5.0 (Win...|
|1541106352796|      8| free|   null|     null|      139|Phoenix-Mesa-Scot...|"Mozilla/5.0 (Win...|
|1541106496796|      8| free|   null|     null|      139|Phoenix-Mesa-Scot...|"Mozilla/5.0 (Win...|
|1541106673796|      8| free|   null|     null|      139|Phoenix-Mesa-Scot...|"Mozilla/5.0 (Win...|


In [73]:
songplay = songplay_table.drop('songplay_id')

In [78]:
"s3://udacity-dend/" + "song_data"

's3://udacity-dend/song_data'