In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql import functions as F
from pyspark.sql import types as T
from schema import *

config = configparser.ConfigParser()
config.read('dl.cfg')

['dl.cfg']

In [2]:
from schema import *

In [3]:
os.environ['AWS_ACCESS_KEY_ID']     = config.get("AWS", 'AWS_ACCESS_KEY_ID')
os.environ['AWS_SECRET_ACCESS_KEY'] = config.get("AWS", 'AWS_SECRET_ACCESS_KEY')

In [4]:
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

In [5]:
spark = create_spark_session()
input_data = "s3a://udacity-dend/"
output_data = ""

In [6]:
logPath = 's3a://udacity-dend/log_data/2018/11/2018-11-12-events.json'
songPath = 's3a://udacity-dend/song_data/A/B/C/TRABCEI128F424C983.json'

In [7]:
df_log = spark.read.json(logPath)

In [8]:
df_song = spark.read.json(songPath)

In [9]:
df_song.head()

Row(artist_id='ARJIE2Y1187B994AB7', artist_latitude=None, artist_location='', artist_longitude=None, artist_name='Line Renaud', duration=152.92036, num_songs=1, song_id='SOUPIRU12A6D4FA1E1', title='Der Kleine Dompfaff', year=0)

    # extract columns to create songs table
    songs_table = 
    
    # write songs table to parquet files partitioned by year and artist
    songs_table

    # extract columns to create artists table
    artists_table = 
    
    # write artists table to parquet files
    artists_table

In [10]:
df_song.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: string (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [11]:
# artist table
artist_id = "artist_id"
name = "artist_name"
location = "artist_location"
latitude = "artist_latitude"
longitude = "artist_longitude"

In [12]:
# song table
song_id = "song_id"
title = "title"
artist_id = "artist_id"
year = "year"
duration = "duration"

In [13]:
df_artistTable = df_song.select(artist_id, name, location, latitude, longitude)
df_artistTable.toPandas()

Unnamed: 0,artist_id,artist_name,artist_location,artist_latitude,artist_longitude
0,ARJIE2Y1187B994AB7,Line Renaud,,,


In [14]:
df_songTable = df_song.select(song_id, title, artist_id, year, duration)
df_songTable.toPandas()

Unnamed: 0,song_id,title,artist_id,year,duration
0,SOUPIRU12A6D4FA1E1,Der Kleine Dompfaff,ARJIE2Y1187B994AB7,0,152.92036


In [15]:
# user table
user_id = "userId"
first_name = "firstName"
last_name = "lastName"
gender = "gender"
level = "level"

In [16]:
# time table
start_time = "ts"
hour = "hour"
day = "day"
week = "week"
month = "month"
year = "year"
weekday = "weekday"

In [17]:
df_log.limit(5).toPandas()["page"]

0        Home
1    NextSong
2    NextSong
3    NextSong
4        Home
Name: page, dtype: object

In [18]:
get_timestamp = F.udf(lambda x: datetime.fromtimestamp( (x/1000.0) ), T.TimestampType()) 
get_hour = F.udf(lambda x: x.hour, T.IntegerType()) 
get_day = F.udf(lambda x: x.day, T.IntegerType()) 
get_week = F.udf(lambda x: x.isocalendar()[1], T.IntegerType()) 
get_month = F.udf(lambda x: x.month, T.IntegerType()) 
get_year = F.udf(lambda x: x.year, T.IntegerType()) 
get_weekday = F.udf(lambda x: x.weekday(), T.IntegerType()) 

In [19]:
df_log = df_log.withColumn("timestamp", get_timestamp(df_log.ts))
df_log = df_log.withColumn("hour", get_hour(df_log.timestamp))
df_log = df_log.withColumn("day", get_day(df_log.timestamp))
df_log = df_log.withColumn("week", get_week(df_log.timestamp))
df_log = df_log.withColumn("month", get_month(df_log.timestamp))
df_log = df_log.withColumn("year", get_year(df_log.timestamp))
df_log = df_log.withColumn("weekday", get_weekday(df_log.timestamp))
df_log.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,...,ts,userAgent,userId,timestamp,hour,day,week,month,year,weekday
0,,Logged In,Celeste,F,0,Williams,,free,"Klamath Falls, OR",GET,...,1541990217796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",53,2018-11-12 02:36:57.796,2,12,46,11,2018,0
1,Pavement,Logged In,Sylvie,F,0,Cruz,99.16036,free,"Washington-Arlington-Alexandria, DC-VA-MD-WV",PUT,...,1541990258796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",10,2018-11-12 02:37:38.796,2,12,46,11,2018,0
2,Barry Tuckwell/Academy of St Martin-in-the-Fie...,Logged In,Celeste,F,1,Williams,277.15873,free,"Klamath Falls, OR",PUT,...,1541990264796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",53,2018-11-12 02:37:44.796,2,12,46,11,2018,0
3,Gary Allan,Logged In,Celeste,F,2,Williams,211.22567,free,"Klamath Falls, OR",PUT,...,1541990541796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",53,2018-11-12 02:42:21.796,2,12,46,11,2018,0
4,,Logged In,Jacqueline,F,0,Lynch,,paid,"Atlanta-Sandy Springs-Roswell, GA",GET,...,1541990714796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",29,2018-11-12 02:45:14.796,2,12,46,11,2018,0


In [20]:
df_log = df_log.filter(df_log["page"] == "NextSong")

In [21]:
df_log.limit(5).toPandas()["page"]

0    NextSong
1    NextSong
2    NextSong
3    NextSong
4    NextSong
Name: page, dtype: object

In [22]:
df_userTable = df_log.select(user_id, first_name, last_name, gender, level)
df_userTable.toPandas()

Unnamed: 0,userId,firstName,lastName,gender,level
0,10,Sylvie,Cruz,F,free
1,53,Celeste,Williams,F,free
2,53,Celeste,Williams,F,free
3,53,Celeste,Williams,F,free
4,29,Jacqueline,Lynch,F,paid
5,29,Jacqueline,Lynch,F,paid
6,29,Jacqueline,Lynch,F,paid
7,29,Jacqueline,Lynch,F,paid
8,29,Jacqueline,Lynch,F,paid
9,29,Jacqueline,Lynch,F,paid


In [23]:
df_timeTable = df_log.select(start_time, hour, day, week, month, year, weekday)
df_timeTable.toPandas()

Unnamed: 0,ts,hour,day,week,month,year,weekday
0,1541990258796,2,12,46,11,2018,0
1,1541990264796,2,12,46,11,2018,0
2,1541990541796,2,12,46,11,2018,0
3,1541990752796,2,12,46,11,2018,0
4,1541990842796,2,12,46,11,2018,0
5,1541991021796,2,12,46,11,2018,0
6,1541991266796,2,12,46,11,2018,0
7,1541991432796,2,12,46,11,2018,0
8,1541991648796,3,12,46,11,2018,0
9,1541991804796,3,12,46,11,2018,0


In [24]:
# songplay table
songplay_id = "songplayIndex"
start_time = "ts"
user_id = "userId"
level = "level"
song_id = "song_id"
artist_id = "artist_id"
session_id = "sessionId"
location = "artist_location"
user_agent = "userAgent"


In [25]:
df_log = df_log.withColumn("rowIndex", F.monotonically_increasing_id())

In [26]:
df_log.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,...,userAgent,userId,timestamp,hour,day,week,month,year,weekday,rowIndex
0,Pavement,Logged In,Sylvie,F,0,Cruz,99.16036,free,"Washington-Arlington-Alexandria, DC-VA-MD-WV",PUT,...,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",10,2018-11-12 02:37:38.796,2,12,46,11,2018,0,0
1,Barry Tuckwell/Academy of St Martin-in-the-Fie...,Logged In,Celeste,F,1,Williams,277.15873,free,"Klamath Falls, OR",PUT,...,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",53,2018-11-12 02:37:44.796,2,12,46,11,2018,0,1
2,Gary Allan,Logged In,Celeste,F,2,Williams,211.22567,free,"Klamath Falls, OR",PUT,...,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",53,2018-11-12 02:42:21.796,2,12,46,11,2018,0,2
3,Charttraxx Karaoke,Logged In,Celeste,F,3,Williams,225.17506,free,"Klamath Falls, OR",PUT,...,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",53,2018-11-12 02:45:52.796,2,12,46,11,2018,0,3
4,The Libertines,Logged In,Jacqueline,F,1,Lynch,179.53914,paid,"Atlanta-Sandy Springs-Roswell, GA",PUT,...,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",29,2018-11-12 02:47:22.796,2,12,46,11,2018,0,4


# songplay

In [27]:
df_song2 = spark.read.json(songPath)

In [28]:
df_song2 = df_song2.select(song_id, artist_id, name, title)

In [29]:
df_song2.toPandas()

Unnamed: 0,song_id,artist_id,artist_name,title
0,SOUPIRU12A6D4FA1E1,ARJIE2Y1187B994AB7,Line Renaud,Der Kleine Dompfaff


In [30]:
# songplay table
songplay_id = "songplayIndex"
#start_time = "ts"
#user_id = "userId"
#level = "level"
#song_id = "song_id"
#artist_id = "artist_id"
session_id = "sessionId"
location = "location"
user_agent = "userAgent"
song = "song"
artist = "artist"

In [31]:
df_log2 = df_log2.select(start_time, user_id, level, session_id, location, user_agent, song, artist, year, month)

NameError: name 'df_log2' is not defined

In [None]:
df_log2.limit(5).toPandas()

In [None]:
final = df_song2.join(df_log2, (df_song2.artist_name == df_log2.artist) & (df_song2.title == df_log2.song))

In [None]:
final.show()

In [None]:
final = final.select(start_time, user_id, level, song_id, artist_id, session_id, location, user_agent, year, month)

In [None]:
final.show()

In [None]:
final2 = final.withColumn("songplay_id", F.monotonically_increasing_id())

In [None]:
final2.show()