In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
import pyspark.sql.types as TS
from pyspark.sql.functions import dayofweek
from pyspark.sql.functions import monotonically_increasing_id

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1674836326174_0001,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
def create_spark_session():
    '''
    Build (or reuse) the SparkSession that every ETL step in this notebook runs on.

    The builder registers the hadoop-aws package (presumably to provide the s3a:// filesystem used by the
    input/output paths). getOrCreate() hands back an already-running session when one exists, as it does
    inside this EMR notebook.
    NOTE(review): static settings such as spark.jars.packages likely do not take effect on a reused session.

    OUTPUT:
    spark: SparkSession
    '''
    hadoop_aws_package = "org.apache.hadoop:hadoop-aws:2.7.0"
    session_builder = SparkSession.builder.config("spark.jars.packages", hadoop_aws_package)
    return session_builder.getOrCreate()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
def process_song_data(spark, input_data, output_data, song_glob='song_data/*/*/*/*.json'):
    '''
    Function which loads the song data from the S3 bucket, creates the songs_table and artists_table and saves
    them as parquet files.

    INPUT:
    spark: SparkSession
    input_data (string): S3 bucket address of the input data (used as a plain prefix, so it should end with '/')
    output_data (string): S3 bucket address to save the processed data (used as a plain prefix, so it should end with '/')
    song_glob (string): path pattern of the song JSON files, relative to input_data. It defaults to the full
        dataset, the same pattern process_log_data joins against. The songs/artists tables therefore cover every
        song_id/artist_id that can appear in songplays. Pass e.g. 'song_data/A/A/A/*.json' for a quick run on a
        small subset.
    '''
    # read every song file matching the pattern
    song_data = '{}{}'.format(input_data, song_glob)
    df = spark.read.json(song_data)

    # songs table: one row per song_id
    songs_table = df.select('song_id', 'title', 'artist_id', 'year', 'duration').dropDuplicates(['song_id'])

    # write songs table to parquet files partitioned by year and artist
    songs_table.write.partitionBy('year', 'artist_id').mode('overwrite').parquet(output_data + "songs/")

    # artists table: one row per artist_id
    artists_table = df.select('artist_id', 'artist_name', 'artist_location', 'artist_latitude',
                              'artist_longitude').dropDuplicates(['artist_id'])

    # write artists table to parquet files
    artists_table.write.mode('overwrite').parquet(output_data + "artists/")


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
def process_log_data(spark, input_data, output_data):
    '''
    Function which loads the log data (and the song data needed for the join) from the S3 bucket, creates the
    users_table, time_table and songplays_table and saves them as parquet files.

    INPUT:
    spark: SparkSession
    input_data (string): S3 bucket address of the input data (used as a plain prefix, so it should end with '/')
    output_data (string): S3 bucket address to save the processed data (used as a plain prefix, so it should end with '/')
    '''
    # window helpers are only needed here, so import them locally
    from pyspark.sql import Window
    from pyspark.sql.functions import row_number

    # get filepaths to the log data and to the song data used for the songplays join
    log_data = '{}{}'.format(input_data, 'log_data/*/*/*.json')
    song_data = '{}{}'.format(input_data, 'song_data/*/*/*/*.json')

    # read log data file and keep only song-play events
    df = spark.read.json(log_data)
    df = df.where(df.page == 'NextSong')

    # users table: one row per userId. A plain dropDuplicates(['userId']) keeps an arbitrary row. If a user's
    # 'level' changes over time, that row could hold an outdated value, so keep each user's latest event instead.
    latest_event_first = Window.partitionBy('userId').orderBy(col('ts').desc())
    users_table = (
        df.withColumn('_row_num', row_number().over(latest_event_first))
          .where(col('_row_num') == 1)
          .select('userId', 'firstName', 'lastName', 'gender', 'level')
    )
    users_table.write.mode('overwrite').parquet(output_data + "users/")

    # 'ts' holds epoch milliseconds, and a numeric value cast to timestamp is read as epoch seconds, hence the /1000.
    # The native cast replaces the former Python UDF. It avoids a per-row Python round trip. A null 'ts' now gives
    # a null timestamp instead of failing the task (None / 1000 raises TypeError inside the UDF).
    df = df.withColumn('ts_timestamp', (col('ts') / 1000).cast(TS.TimestampType()))

    # time table: one row per distinct timestamp with its calendar breakdown
    time_table = df.select(
        'ts_timestamp',
        hour('ts_timestamp').alias('hour'),
        dayofmonth('ts_timestamp').alias('day'),
        weekofyear('ts_timestamp').alias('weekofyear'),
        month('ts_timestamp').alias('month'),
        year('ts_timestamp').alias('year'),
        dayofweek('ts_timestamp').alias('dayofweek'),
    ).dropDuplicates(['ts_timestamp'])

    # write time table to parquet files partitioned by year and month
    time_table.write.partitionBy('year', 'month').mode('overwrite').parquet(output_data + "time/")

    # read in song data to use for songplays table
    song_df = spark.read.json(song_data)

    # Match plays to songs on title AND artist name. Titles are not unique across artists, so a title-only join
    # can attach a play to the wrong song/artist and emit one row per same-titled song.
    # NOTE(review): assumes the log records carry an 'artist' column holding the artist's name — confirm schema.
    play_matches_song = (df['song'] == song_df['title']) & (df['artist'] == song_df['artist_name'])
    songplays_table = (
        df.join(song_df, play_matches_song)
          .withColumn('songplay_id', monotonically_increasing_id())
          .select('songplay_id', 'ts_timestamp', 'userId', 'level',
                  'song_id', 'artist_id', 'sessionId', 'location', 'userAgent')
          # year/month columns are added only to partition the output
          .withColumn('year', year('ts_timestamp'))
          .withColumn('month', month('ts_timestamp'))
    )

    # write songplays table to parquet files partitioned by year and month
    songplays_table.write.partitionBy('year', 'month').mode('overwrite').parquet(output_data + "songplays/")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
def main(input_data="s3a://udacity-dend/", output_data="s3a://sparkifyggo/"):
    '''
    Main function which runs the ETL pipeline that builds a star schema optimized for queries on song play analysis.

    INPUT:
    input_data (string): S3 prefix of the raw song and log data (should end with '/')
    output_data (string): S3 prefix where the parquet tables are written (should end with '/')
    '''
    spark = create_spark_session()

    # songs/artists tables from the song data, then users/time/songplays tables from the log data
    process_song_data(spark, input_data, output_data)
    process_log_data(spark, input_data, output_data)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
# Run the whole ETL: songs/artists tables first, then users/time/songplays tables.
main()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 42080)
Traceback (most recent call last):
  File "/usr/lib64/python3.6/socketserver.py", line 320, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib64/python3.6/socketserver.py", line 351, in process_request
    self.finish_request(request, client_address)
  File "/usr/lib64/python3.6/socketserver.py", line 364, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/lib64/python3.6/socketserver.py", line 724, in __init__
    self.handle()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/accumulators.py", line 266, in handle
    poll(authenticate_and_accum_updates)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/accumulators.py", line 241, in poll
    if func():
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/accumulators.py", line 254, in authenticate_and_accum_updates
    received_to

In [7]:
# spark = create_spark_session()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
# input_data = "s3a://udacity-dend/"
# song_data = '{}{}'.format(input_data, 'song_data/*/*/*/*.json')
# df = spark.read.json(song_data)
# df.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
# df.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
# input_data = "s3a://udacity-dend/"
# output_data = "s3a://sparkifyggo/"
# log_data = '{}{}'.format(input_data, 'log_data/*/*/*.json')
# song_data = '{}{}'.format(input_data, 'song_data/*/*/*/*.json')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
# # read log data file
# df = spark.read.json(log_data)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
# # filter by actions for song plays
# df = df.where(df.page == 'NextSong')
# df.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
# # extract columns for users table    
# users_table = df.select('userId','firstName','lastName','gender','level')
# users_table.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [14]:
# # write users table to parquet files
# users_table.write.mode('overwrite').parquet(output_data + "users/")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [15]:
# # create timestamp column from original timestamp column
# get_timestamp = udf(lambda x: datetime.fromtimestamp(x/1000), TS.TimestampType())
# df = df.withColumn('ts_timestamp',get_timestamp('ts'))
# df.select('ts_timestamp').show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [16]:
# # extract columns to create time table
# time_table = df.select('ts_timestamp', hour('ts_timestamp').alias('hour'), dayofmonth('ts_timestamp').alias('day'), weekofyear('ts_timestamp').alias('weekofyear'),
#                            month('ts_timestamp').alias('month'), year('ts_timestamp').alias('year'), dayofweek('ts_timestamp').alias('dayofweek'))
# time_table.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
# # write time table to parquet files partitioned by year and month
# time_table.write.partitionBy('year', 'month').mode('overwrite').parquet(output_data + "time/")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [18]:
# # read in song data to use for songplays table
# song_df = spark.read.json(song_data)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
# # extract columns from joined song and log datasets to create songplays table 
# songplays_table = df.join(song_df, df['song'] == song_df['title']).drop('title')
# songplays_table = songplays_table.withColumn('songplay_id', monotonically_increasing_id()).select('songplay_id','ts_timestamp', 'userId', 'level',
#                                                                                                     'song_id', 'artist_id', 'sessionId', 'location', 'userAgent')
# songplays_table.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [20]:
# # create year and month columns for the partition 
# songplays_table = songplays_table.withColumn('year', year('ts_timestamp')).withColumn('month', month('ts_timestamp'))
# songplays_table.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [21]:
# # write songplays table to parquet files partitioned by year and month
# songplays_table.write.partitionBy('year', 'month').mode('overwrite').parquet(output_data + "songplays/")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…