In [20]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col



""" reading configuration from the config file in the workspace (AWS keyID/SecretKey)"""
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

In [21]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

In [22]:
spark
print(config['AWS']['AWS_ACCESS_KEY_ID'])

AKIAXQFRFA7IBBC5YANI


In [23]:
input_data = "s3a://udacity-dend/"
song_data = input_data + "song_data/A/A/A/*.json"
print(song_data)
# read song data file
df = spark.read.json(song_data)

# extract columns to create songs table

df

s3a://udacity-dend/song_data/A/A/A/*.json


DataFrame[artist_id: string, artist_latitude: double, artist_location: string, artist_longitude: double, artist_name: string, duration: double, num_songs: bigint, song_id: string, title: string, year: bigint]

In [24]:
# extract columns to create songs table
songs_table =df.select('song_id', 'title', 'year', 'duration', 'artist_id').dropDuplicates()


# write songs table to parquet files partitioned by year and artist
songs_table.write.partitionBy('year', 'artist_id').parquet("songs_table/songs_table.parquet", "overwrite")



In [25]:
# extract columns to create artists table()
artists_table =  df.select('artist_id', 'artist_name', 'artist_location','artist_latitude','artist_longitude' ).dropDuplicates().withColumnRenamed("artist_name", "name").withColumnRenamed("artist_location", "location").withColumnRenamed("artist_latitude", "lattitude").withColumnRenamed("artist_longitude", "longitude")
df
# write artists table to parquet files
artists_table.write.parquet("artists_table/artists_table.parquet", "overwrite")

In [26]:
# get filepath to log data file
input_data = "s3a://udacity-dend/"
log_data =  input_data + "log_data/*/*/*.json"

# read log data file
df = spark.read.json(log_data)
    
# filter by actions for song plays
df = df.filter(df.page == "NextSong")


# extract columns for users table    
users_table = df.select('userId', 'firstName', 'lastName', 'gender', 'level').withColumnRenamed("userId", "user_id").withColumnRenamed("firstName", "first_name").withColumnRenamed("lastName", "last_name").dropDuplicates()
    
# wite users table to parquet files
users_table.write.parquet("users/users.parquet", "overwrite")

print(users_table.head())

Row(user_id='26', first_name='Ryan', last_name='Smith', gender='M', level='free')


In [27]:
# create timestamp column from original timestamp column
get_timestamp = udf(lambda x: x/1000)
df = df.withColumn('start_time', get_timestamp(df.ts))

# create datetime column from original timestamp column
get_datetime = udf(lambda x: str(datetime.fromtimestamp(int(x) / 1000)))
df = df.withColumn('datetime', get_datetime(df.start_time))

print(df.head())



Row(artist='Harmonia', auth='Logged In', firstName='Ryan', gender='M', itemInSession=0, lastName='Smith', length=655.77751, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='Sehr kosmisch', status=200, ts=1542241826796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26', start_time='1.542241826796E9', datetime='1970-01-18 20:24:01.826000')


In [28]:
from pyspark.sql.functions import (year, month, dayofmonth, hour,weekofyear, dayofweek)
# extract columns to create time table
time_table = df.select('datetime').withColumn("start_time", df.datetime).withColumn("hour", hour(df.datetime)).withColumn("day", dayofmonth(df.datetime)).withColumn("week", weekofyear(df.datetime)).withColumn("month", month(df.datetime)).withColumn("year", year(df.datetime)).withColumn("weekday", dayofweek(df.datetime)).dropDuplicates()

# write time table to parquet files partitioned by year and month
time_table.write.partitionBy('year', 'month').parquet("time_table/time_table.parquet", "overwrite")
print(time_table.head())

Row(datetime='1970-01-18 20:24:27.351000', start_time='1970-01-18 20:24:27.351000', hour=20, day=18, week=3, month=1, year=1970, weekday=1)


In [29]:
# read in song data to use for songplays table
song_df = spark.read.json(input_data + "song_data/A/A/*/*.json")

print(song_df.head())

Row(artist_id='ARSUVLW12454A4C8B8', artist_latitude=35.83073, artist_location='Tennessee', artist_longitude=-85.97874, artist_name='Royal Philharmonic Orchestra/Sir Thomas Beecham', duration=94.56281, num_songs=1, song_id='SOBTCUI12A8AE48B70', title='Faust: Ballet Music (1959 Digital Remaster): VI.     Variations du miroir (Allegretto)', year=0)


In [30]:
# extract columns from joined song and log datasets to create songplays table 
songplays_table = df.join(song_df,((song_df.artist_name == df.artist) & (song_df.title == df.song) & (song_df.duration ==df.length)) )
print(songplays_table.head())

Row(artist='Elena', auth='Logged In', firstName='Lily', gender='F', itemInSession=5, lastName='Koch', length=269.58322, level='paid', location='Chicago-Naperville-Elgin, IL-IN-WI', method='PUT', page='NextSong', registration=1541048010796.0, sessionId=818, song='Setanta matins', status=200, ts=1542837407796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='15', start_time='1.542837407796E9', datetime='1970-01-18 20:33:57.407000', artist_id='AR5KOSW1187FB35FF4', artist_latitude=49.80388, artist_location='Dubai UAE', artist_longitude=15.47491, artist_name='Elena', duration=269.58322, num_songs=1, song_id='SOZCTXZ12AB0182364', title='Setanta matins', year=0)


In [31]:
from pyspark.sql.functions import monotonically_increasing_id
songplays_table = songplays_table.select('start_time', 'userId', 'level', 'song_id', 'artist_id', 'sessionId', 'location', 'userAgent').withColumn('songplay_id', monotonically_increasing_id())\
.withColumnRenamed('userId', 'user_id').withColumnRenamed('sessionId', 'session_id').withColumnRenamed('userAgent', 'user_agent').dropDuplicates()

print(songplays_table.head())


Row(start_time='1.541440182796E9', user_id='73', level='paid', song_id='SOHDWWH12A6D4F7F6A', artist_id='ARC0IOF1187FB3F6E6', session_id=255, location='Tampa-St. Petersburg-Clearwater, FL', user_agent='"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.78.2 (KHTML, like Gecko) Version/7.0.6 Safari/537.78.2"', songplay_id=1)


In [32]:
# write songplays table to parquet files partitioned by year and month
songplays_table.write.parquet("songplays_table/songplays_table.parquet", "overwrite")