In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql import functions as F
from pyspark.sql import types as T


# Parse the local settings file. ConfigParser.read() ignores a missing file,
# in which case `config` is simply empty.
config = configparser.ConfigParser()
config.read('dl.cfg')

# Export AWS credentials only when dl.cfg actually supplies them. Assigning
# empty strings unconditionally would overwrite any credentials that are
# already present in the environment and would leave the parsed config unused.
# NOTE(review): assumes the keys live under an [AWS] section -- confirm against
# the real dl.cfg layout.
for _cred_key in ('AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'):
    # raw=True disables '%' interpolation so a secret is returned verbatim.
    _cred_value = config.get('AWS', _cred_key, raw=True, fallback='')
    if _cred_value:
        os.environ[_cred_key] = _cred_value

# Create (or reuse) the Spark session, requesting the hadoop-aws package.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.5")
    .getOrCreate()
)



VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1668959177457_0001,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
# S3 prefixes: where the raw JSON is read from and where parquet is written.
input_data = "s3://udacity-dend/"
output_data = "s3://aws-data-lake-uswest2/"

# Glob matching every .json file three directory levels below song_data/.
song_data = "{}song_data/*/*/*/*.json".format(input_data)

# Read all matched song files into a single DataFrame (no explicit schema given).
df = spark.read.json(path=song_data)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
# Songs table: the five song attributes below, reduced to one row per song_id.
song_columns = ("song_id", "title", "artist_id", "year", "duration")
songs_table = df.select(*song_columns).dropDuplicates(subset=["song_id"])

# Write as parquet partitioned by year and artist_id, replacing existing output.
(
    songs_table.write
    .mode("overwrite")
    .partitionBy("year", "artist_id")
    .parquet(output_data + "songs.parquet")
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
# Artists table: artist columns renamed to their short forms, one row per
# artist_id.
# NOTE(review): the output column is spelled "lattitude"; it is kept as-is
# because readers of the parquet output may depend on that name -- confirm.
artist_exprs = [
    "artist_id",
    "artist_name as name",
    "artist_location as location",
    "artist_latitude as lattitude",
    "artist_longitude as longitude",
]
artists_table = df.selectExpr(*artist_exprs).dropDuplicates(["artist_id"])

# Write as unpartitioned parquet, replacing existing output.
artists_table.write.mode("overwrite").parquet(output_data + "artists.parquet")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
# Glob matching every .json file two directory levels below log_data/.
log_data = "{}log_data/*/*/*.json".format(input_data)

# Read the log files and keep only rows whose page is "NextSong".
# Note that this rebinds `df` from the song data to the filtered log data.
df = spark.read.json(log_data).where(F.col("page") == "NextSong")

# Users table: identity columns renamed to snake_case plus gender and level,
# reduced to one row per user_id.
user_exprs = [
    "userId as user_id",
    "firstName as first_name",
    "lastName as last_name",
    "gender",
    "level",
]
users_table = df.selectExpr(*user_exprs).dropDuplicates(["user_id"])

# Write as unpartitioned parquet, replacing existing output.
users_table.write.mode("overwrite").parquet(output_data + "users.parquet")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
    # create timestamp column from original timestamp column
def get_timestamp(ms_col):
    """Convert epoch milliseconds into a Spark timestamp column.

    Uses a native cast (seconds since the epoch -> timestamp) instead of a
    Python UDF, so rows are not shipped to a Python worker for the conversion
    and a null input yields a null timestamp rather than raising TypeError.

    Args:
        ms_col: a Column, or the name of a column, holding epoch milliseconds.

    Returns:
        A Column of TimestampType.
    """
    # Accept a column name as well as a Column, as a UDF call would.
    source = F.col(ms_col) if isinstance(ms_col, str) else ms_col
    return (source / 1000.0).cast(T.TimestampType())


df = df.withColumn("timestamp", get_timestamp(df.ts))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
# create datetime column from original timestamp column
def get_datetime(ms_col):
    """Convert epoch milliseconds into a Spark timestamp column.

    Uses a native cast (seconds since the epoch -> timestamp) instead of a
    Python UDF, so rows are not shipped to a Python worker for the conversion
    and a null input yields a null timestamp rather than raising TypeError.

    Args:
        ms_col: a Column, or the name of a column, holding epoch milliseconds.

    Returns:
        A Column of TimestampType.
    """
    # Accept a column name as well as a Column, as a UDF call would.
    source = F.col(ms_col) if isinstance(ms_col, str) else ms_col
    return (source / 1000.0).cast(T.TimestampType())


# NOTE(review): "datetime" is derived from df.ts with the same conversion as
# the "timestamp" column -- confirm both columns are really needed.
df = df.withColumn("datetime", get_datetime(df.ts))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
    # extract columns to create time table
# Time table: each distinct event timestamp (as start_time) together with the
# calendar parts derived from it.
event_ts = F.col("timestamp")
time_table = df.select(
    event_ts.alias("start_time"),
    F.hour(event_ts).alias("hour"),
    F.dayofmonth(event_ts).alias("day"),
    F.weekofyear(event_ts).alias("week"),
    F.month(event_ts).alias("month"),
    F.year(event_ts).alias("year"),
    F.dayofweek(event_ts).alias("weekday"),
).dropDuplicates(["start_time"])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
    # write time table to parquet files partitioned by year and month
# Configure the writer first (overwrite mode, year/month partitions), then emit
# the parquet files under the output prefix.
time_writer = time_table.write.mode("overwrite").partitionBy("year", "month")
time_writer.parquet(output_data + "time.parquet")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 37162)
Traceback (most recent call last):
  File "/usr/lib64/python3.6/socketserver.py", line 320, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib64/python3.6/socketserver.py", line 351, in process_request
    self.finish_request(request, client_address)
  File "/usr/lib64/python3.6/socketserver.py", line 364, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/lib64/python3.6/socketserver.py", line 724, in __init__
    self.handle()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/accumulators.py", line 266, in handle
    poll(authenticate_and_accum_updates)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/accumulators.py", line 241, in poll
    if func():
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/accumulators.py", line 254, in authenticate_and_accum_updates
    received_to

In [10]:
# Read the song JSON files again into their own DataFrame for the songplays
# step, using the same glob as the initial song load.
song_data = "".join([input_data, "song_data/*/*/*/*.json"])
song_df = spark.read.json(song_data)

# Register both frames as temp views so they can be queried by name:
# "songs" -> song_df, "logs" -> df.
for view_name, frame in (("songs", song_df), ("logs", df)):
    frame.createOrReplaceTempView(view_name)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# extract columns from joined song and log datasets to create songplays table 
from pyspark.sql.window import Window

# join song_df and log_df
song_log_joined_table = df.join(song_df, (df.song == song_df.title) & (df.artist == song_df.artist_name) & (df.length == song_df.duration), how='inner')

    # extract columns from joined song and log datasets to create songplays table 
songplays_table = song_log_joined_table.distinct() \
                        .select("userId", "timestamp", "song_id", "artist_id", "level", "sessionId", "location", "userAgent" ) \
                        .withColumn("songplay_id", F.row_number().over( Window.partitionBy('timestamp').orderBy("timestamp"))) \
                        .withColumnRenamed("userId","user_id")        \
                        .withColumnRenamed("timestamp","start_time")  \
                        .withColumnRenamed("sessionId","session_id")  \
                        .withColumnRenamed("userAgent", "user_agent") \