In [22]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek
import pandas as pd
from pyspark.sql.functions import monotonically_increasing_id
from datetime import datetime
from pyspark.sql import functions as F
from pyspark.sql import types as T

config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['KEYS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['KEYS']['AWS_SECRET_ACCESS_KEY']

In [23]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

## Process Song data

In [24]:
input_data = "s3a://udacity-dend/"
output_data = "

song_data = os.path.join(input_data,"song_data/A/A/A/*.json")
    
# read song data file
df = spark.read.json(song_data)

print((df.count(), len(df.columns)))

# extract columns to create songs table
songs_table = df['song_id','title','artist_id','year','duration']
    
# write songs table to parquet files partitioned by year and artist
songs_table.write.partitionBy('year','artist_id').parquet(os.path.join(output_data,"songs.parquet"),'overwrite')

test = spark.read.parquet(os.path.join(output_data,"songs.parquet"))
#test.show(5)

#look at table with pandas
pd.set_option('max_colwidth',200)
test.limit(5).toPandas()
              

(24, 10)


Unnamed: 0,song_id,title,duration,year,artist_id
0,SOKTJDS12AF72A25E5,Drown In My Own Tears (24-Bit Digitally Remastered 04),192.522,0,ARA23XO1187B9AF18F
1,SOEKAZG12AB018837E,I'll Slap Your Face (Entertainment USA Theme),129.85424,2001,ARSVTNL1187B992A91
2,SOAFBCP12A8C13CC7D,King Of Scurf (2007 Digital Remaster),301.40036,1972,ARTC1LV1187B9A4858
3,SORRNOC12AB017F52B,The Last Beat Of My Heart (b-side),337.81506,2004,ARSZ7L31187FB4E610
4,SOQPWCR12A6D4FB2A3,A Poor Recipe For Civic Cohesion,118.07302,2005,AR73AIO1187B9AD57B


In [25]:
# extract columns to create artists table
artists_table = df['artist_id','artist_name','artist_location','artist_latitude','artist_longitude']

artists_table.write.parquet(os.path.join(output_data,"artists.parquet"),'overwrite')

test = spark.read.parquet(os.path.join(output_data,"artists.parquet"))

#look at table with pandas
pd.set_option('max_colwidth',200)
test.limit(5).toPandas()

Unnamed: 0,artist_id,artist_name,artist_location,artist_latitude,artist_longitude
0,ARTC1LV1187B9A4858,The Bonzo Dog Band,"Goldsmith's College, Lewisham, Lo",51.4536,-0.01802
1,ARA23XO1187B9AF18F,The Smithereens,"Carteret, New Jersey",40.57885,-74.21956
2,ARSVTNL1187B992A91,Jonathan King,"London, England",51.50632,-0.12714
3,AR73AIO1187B9AD57B,Western Addiction,"San Francisco, CA",37.77916,-122.42005
4,ARXQBR11187B98A2CC,Frankie Goes To Hollywood,"Liverpool, England",,


## Process log data

In [26]:
# get filepath to log data file
#log_data =os.path.join(input_data,"log_data/*/*/*.json")

log_data = '/home/workspace/data/log-data/2018-11-19-events.json'

# read log data file
df = spark.read.json(log_data)

In [27]:
# filter by actions for song plays
df = df[df['page'] == 'NextSong'] 

# extract columns for users table    
users_table = df[['userId','firstName', 'lastName','gender','level']]
 
# write users table to parquet files
users_table.write.parquet(os.path.join(output_data,"users.parquet"),'overwrite')

test = spark.read.parquet(os.path.join(output_data,"users.parquet"))

#look at table with pandas
pd.set_option('max_colwidth',200)
test.limit(5).toPandas()

Unnamed: 0,userId,firstName,lastName,gender,level
0,7,Adelyn,Jordan,F,free
1,24,Layla,Griffin,F,paid
2,24,Layla,Griffin,F,paid
3,24,Layla,Griffin,F,paid
4,24,Layla,Griffin,F,paid


In [28]:
#add new column for timestamp
get_timestamp = F.udf(lambda x: datetime.fromtimestamp( (x/1000.0) ), T.TimestampType()) 
df = df.withColumn("timestamp", get_timestamp(df.ts))


In [29]:
df.select('ts','timestamp').limit(5).toPandas()

Unnamed: 0,ts,timestamp
0,1542592496796,2018-11-19 01:54:56.796
1,1542592893796,2018-11-19 02:01:33.796
2,1542593318796,2018-11-19 02:08:38.796
3,1542593641796,2018-11-19 02:14:01.796
4,1542593868796,2018-11-19 02:17:48.796


In [30]:
#df2 = df2.drop('timestamp2')
df = df.withColumn("start_time", df.timestamp)
#df2.limit(5).toPandas()

df = df.withColumn('hour',hour('start_time')).withColumn('day',dayofmonth('start_time')).withColumn('week',weekofyear('start_time'))
df = df.withColumn('month',month('start_time')).withColumn('year',year('start_time')).withColumn('weekday',dayofweek('start_time'))
#df.limit(5).toPandas()

# extract columns to create time table
time_table = df[['start_time','hour','day','week','month','year','weekday']]

time_table.write.partitionBy('year','month').parquet(os.path.join(output_data,"time.parquet"),'overwrite')

test = spark.read.parquet(os.path.join(output_data,"time.parquet"))

#look at table with pandas
pd.set_option('max_colwidth',200)
test.limit(5).toPandas()

Unnamed: 0,start_time,hour,day,week,weekday,year,month
0,2018-11-19 01:54:56.796,1,19,47,2,2018,11
1,2018-11-19 02:01:33.796,2,19,47,2,2018,11
2,2018-11-19 02:08:38.796,2,19,47,2,2018,11
3,2018-11-19 02:14:01.796,2,19,47,2,2018,11
4,2018-11-19 02:17:48.796,2,19,47,2,2018,11


In [31]:
# read in song data to use for songplays table
song_df = spark.read.parquet("songs.parquet")

df = df.join(song_df,song_df.title == df.song)
songplays_table = df['start_time', 'userId', 'level', 'song_id', 'artist_id', 'sessionId', 'location', 'userAgent']
songplays_table.select(monotonically_increasing_id().alias('songplay_id')).collect()

# write songplays table to parquet files partitioned by year and month
songplays_table.write.parquet(os.path.join(output_data, 'songplays.parquet'), 'overwrite')

test = spark.read.parquet(os.path.join(output_data,"songplays.parquet"))

#look at table with pandas
#is empty table because limited dataset
pd.set_option('max_colwidth',200)
test.limit(5).toPandas()

Unnamed: 0,start_time,userId,level,song_id,artist_id,sessionId,location,userAgent
