In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg', encoding='utf-8-sig')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

In [3]:
# create spark session
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

In [4]:
spark

**Process song_data**

In [4]:
# get filepath to song data file
input_data = "s3a://udacity-dend/"
#song_data = 's3a://udacity-dend/song_data/A/B/C/TRABCEI128F424C983.json'
song_data = os.path.join(input_data, 'song_data/A/B/C/TRABCEI128F424C983.json')    
# read song data file
df = spark.read.json(song_data)
df.head()

Row(artist_id='ARJIE2Y1187B994AB7', artist_latitude=None, artist_location='', artist_longitude=None, artist_name='Line Renaud', duration=152.92036, num_songs=1, song_id='SOUPIRU12A6D4FA1E1', title='Der Kleine Dompfaff', year=0)

In [5]:
# extract columns to create songs table
songs_table = df.select(['song_id', 'title', 'artist_id', 'year', 'duration'])
songs_table.take(1)

[Row(song_id='SOUPIRU12A6D4FA1E1', title='Der Kleine Dompfaff', artist_id='ARJIE2Y1187B994AB7', year=0, duration=152.92036)]

In [6]:
output_data = 's3a://raywong-bucket-one'
songsParquetPath = os.path.join(output_data, "songs")


In [7]:
# write songs table to parquet files partitioned by year and artist
output_data = 's3a://raywong-bucket-one'
songs_path = os.path.join(output_data, "songs")
songs_table = df.write.mode('overwrite').partitionBy('year','artist_id').parquet(songs_path)  


In [8]:
# extract columns to create artists table
artists_table = df.select(col('artist_name').alias('name'), col('artist_location').alias('location'), col('artist_latitude').alias('latitude'), col('artist_longitude').alias('longitude'))
artists_path = os.path.join(output_data, "artists") 
# write artists table to parquet files
artists_table = df.write.mode('overwrite').parquet(artists_path)

**Process log_data**

In [9]:
# get filepath to log data file
log_data = os.path.join(input_data, 'log_data/2018/11/2018-11-13-events.json')
# read log data file
df = spark.read.json(log_data)    
# filter by actions for song plays
df = df.filter(df.page=='NextSong')
df.head()

Row(artist='Fu', auth='Logged In', firstName='Kevin', gender='M', itemInSession=1, lastName='Arellano', length=280.05832, level='free', location='Harrisburg-Carlisle, PA', method='PUT', page='NextSong', registration=1540006905796.0, sessionId=514, song='Ja I Ty', status=200, ts=1542069637796, userAgent='"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"', userId='66')

In [10]:
# extract columns for users table    
users_table = df.select(col('userId').alias('user_id'), col('firstName').alias('first_name'), col('lastName').alias('last_name'), 'gender', 'level')
    
# write users table to parquet files
users_path = os.path.join(output_data, "users") 
users_table = df.write.mode('overwrite').parquet(users_path)

In [11]:
# create timestamp column from original timestamp column
get_timestamp = udf(lambda x: str(int(x) // 1000))

df = df.withColumn("start_time", get_timestamp(df.ts))
    
# create datetime column from original timestamp column
get_datetime = udf(lambda x: datetime.fromtimestamp(int(x) // 1000))
df = df.withColumn("datetime", get_timestamp(df.ts))
    
df = df.withColumn('hour', hour('datetime'))
df = df.withColumn('day', dayofmonth('datetime'))
df = df.withColumn('week', weekofyear('datetime'))
df = df.withColumn('month', month('datetime'))
df = df.withColumn('year', year('datetime'))
df = df.withColumn('weekday', dayofweek('datetime'))
df.head()

Row(artist='Fu', auth='Logged In', firstName='Kevin', gender='M', itemInSession=1, lastName='Arellano', length=280.05832, level='free', location='Harrisburg-Carlisle, PA', method='PUT', page='NextSong', registration=1540006905796.0, sessionId=514, song='Ja I Ty', status=200, ts=1542069637796, userAgent='"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"', userId='66', start_time='1542069637', datetime='1542069637', hour=None, day=None, week=None, month=None, year=None, weekday=None)

In [12]:
# extract columns to create time table 
time_table = df.select('start_time', 'hour', 'day', 'week', 'month', 'year', 'weekday')
    
# write time table to parquet files partitioned by year and month
time_path = os.path.join(output_data, "time")
time_table = df.write.mode('overwrite').partitionBy('year','month').parquet(time_path)  

In [13]:
# read in song data to use for songplays table
song_data = os.path.join(input_data, 'song_data/A/B/C/TRABCEI128F424C983.json')  
song_df = spark.read.json(song_data)
song_df = song_df.withColumnRenamed('year', 'year_song')
song_df.columns
# extract columns from joined song and log datasets to create songplays table 
song_log_df = df.join(song_df, (song_df.artist_name == df.artist) & (song_df.title == df.song))

songplays_table = song_log_df.select('start_time', col('userId').alias('user_id'), 'level', 'song_id', 'artist_id', \
                            col('sessionId').alias('session_id'), 'location', col('userAgent').alias('user_agent'))

# write songplays table to parquet files partitioned by year and month
songplays_path = os.path.join(output_data, "songplays")
songplays_table = song_log_df.write.mode('overwrite').partitionBy('year','month').parquet(songplays_path)  