Import the libraries used by the ETL

In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek, monotonically_increasing_id
from pyspark.sql.functions import row_number

Helper to create (or reuse) the Spark session

In [2]:
def create_spark_session():
    """Return a SparkSession with the hadoop-aws connector requested.

    The builder asks Spark to pull ``org.apache.hadoop:hadoop-aws:2.7.0`` via
    ``spark.jars.packages``; ``getOrCreate`` returns the already-active session
    if one exists instead of starting a new one.
    """
    builder = SparkSession.builder.config(
        "spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0"
    )
    return builder.getOrCreate()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
def process_song_data(spark, input_data, output_data):
    """Build the songs and artists tables from the song metadata JSON files.

    Reads every file matching ``<input_data>/song_data/*/*/*/*.json`` and
    writes two parquet tables under ``output_data`` (both in overwrite mode):

    * ``songs`` -- one row per ``song_id``, partitioned by ``year`` and
      ``artist_id``. ``process_log_data`` reads this table back from
      ``os.path.join(output_data, "songs")``.
    * ``artists/artists.parquet`` -- one row per ``artist_id``.

    The songs DataFrame is also registered as the temp view ``songs``.

    Args:
        spark: Active SparkSession.
        input_data: Root path of the raw dataset (e.g. an S3 prefix).
        output_data: Root path the parquet tables are written under.
    """
    # get filepath to song data file
    song_data = os.path.join(input_data, "song_data/*/*/*/*.json")

    # read song data file
    df = spark.read.json(song_data)
    print('\t Read JSON finished...\n')

    # songs table: dedupe on the key so each song_id appears exactly once
    songs_table = df.select('song_id', 'title', 'artist_id', 'year', 'duration') \
        .dropDuplicates(['song_id'])
    songs_table.createOrReplaceTempView('songs')

    # write songs table partitioned by year and artist. os.path.join (used for
    # every other path in this notebook) keeps this location identical to the
    # one process_log_data reads, even when output_data has no trailing slash.
    songs_table.write.mode("overwrite").partitionBy("year", "artist_id") \
        .parquet(os.path.join(output_data, "songs"))

    # artists table: one input row per song, so an artist repeats; nothing here
    # guarantees its descriptive fields agree across rows, so dedupe on the key
    artists_fields = ["artist_id", "artist_name as name", "artist_location as location",
                      "artist_latitude as latitude", "artist_longitude as longitude"]
    artists_table = df.selectExpr(artists_fields).dropDuplicates(['artist_id'])

    # write artists table to parquet files
    print('\t start writing the parquet file\n')
    artists_table.write.parquet(os.path.join(output_data, 'artists/artists.parquet'), 'overwrite')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
def process_log_data(spark, input_data, output_data):
    """Build the users, time and songplays tables from the event log JSON files.

    Reads every file matching ``<input_data>/log_data/*/*/*.json``, keeps only
    ``page == 'NextSong'`` events and writes three parquet tables under
    ``output_data`` (all in overwrite mode):

    * ``users.parquet`` -- one row per ``user_id``, taken from that user's
      event with the highest ``ts`` (so ``level`` is the latest one seen).
    * ``time/time.parquet`` -- one row per distinct ``start_time``,
      partitioned by ``year`` and ``month``.
    * ``songplays/songplays.parquet`` -- one row per play whose song title and
      length match a row of the songs table, partitioned by ``year`` and
      ``month``.

    The songplays step reads the songs table from
    ``os.path.join(output_data, "songs")``, so ``process_song_data`` must have
    written it first for the same ``output_data``.

    Args:
        spark: Active SparkSession.
        input_data: Root path of the raw dataset (e.g. an S3 prefix).
        output_data: Root path the parquet tables are written under.
    """
    # get filepath to log data file
    log_data = os.path.join(input_data, 'log_data/*/*/*.json')

    # read log data file, keeping only the song-play events
    log_df = spark.read.json(log_data)
    log_df = log_df.filter(log_df.page == 'NextSong')

    # users table. A user can appear with more than one level, so
    # dropDuplicates() over every column could emit several rows for the same
    # user_id. Keep only each user's most recent event instead.
    latest_event_first = Window.partitionBy('userId').orderBy(col('ts').desc())
    users_fields = ["userId as user_id", "firstName as first_name", "lastName as last_name", "gender", "level"]
    users_table = log_df.withColumn('_row', row_number().over(latest_event_first)) \
        .filter(col('_row') == 1) \
        .selectExpr(users_fields)
    users_table.write.parquet(os.path.join(output_data, 'users.parquet'), 'overwrite')

    # start_time: ts is divided by 1000 (treated as epoch milliseconds) and
    # rendered as a string. NOTE(review): datetime.fromtimestamp uses the
    # executor's local time zone - confirm that is the intended zone.
    get_datetime = udf(lambda x: str(datetime.fromtimestamp(int(x) / 1000)))
    log_df = log_df.withColumn('start_time', get_datetime(log_df.ts))

    # break start_time into its calendar parts
    log_df = log_df.withColumn("hour", hour("start_time")) \
        .withColumn("day", dayofmonth("start_time")) \
        .withColumn("week", weekofyear("start_time")) \
        .withColumn("weekday", dayofweek("start_time")) \
        .withColumn("month", month("start_time")) \
        .withColumn("year", year("start_time"))

    # time table: many plays can share a start_time and every other column is
    # derived from it, so keep one row per start_time
    time_table = log_df.select("start_time", "hour", "day", "week", "weekday", "month", "year") \
        .dropDuplicates(['start_time'])

    # write time table to parquet files partitioned by year and month
    time_table.write.partitionBy('year', 'month') \
        .parquet(os.path.join(output_data, 'time/time.parquet'), 'overwrite')

    # read in song data to use for songplays table
    song_df = spark.read.parquet(os.path.join(output_data, "songs")).alias('song_df')
    log_df = log_df.alias('log_df')

    # Song titles are not unique, so joining on title alone can turn one play
    # into several songplay rows. Also require the play length to equal the
    # song duration. NOTE(review): assumes the log's `length` column is in the
    # same unit as songs.duration - confirm against the dataset.
    same_song = (col('song_df.title') == col('log_df.song')) & \
        (col('song_df.duration') == col('log_df.length'))
    joined_df = log_df.join(song_df, same_song)

    # year()/month() match how the time table derives its partition columns,
    # so both tables get identical partition values
    songplays_table = joined_df.select(
        col('log_df.start_time').alias('start_time'),
        col('log_df.userId').alias('user_id'),
        col('log_df.level').alias('level'),
        col('song_df.song_id').alias('song_id'),
        col('song_df.artist_id').alias('artist_id'),
        col('log_df.sessionId').alias('session_id'),
        col('log_df.location').alias('location'),
        col('log_df.userAgent').alias('user_agent')) \
        .withColumn("year", year(col("start_time"))) \
        .withColumn("month", month(col("start_time"))) \
        .withColumn('songplay_id', monotonically_increasing_id())

    # write songplays table to parquet files partitioned by year and month
    songplays_table.write.partitionBy('year', 'month') \
        .parquet(os.path.join(output_data, 'songplays/songplays.parquet'), 'overwrite')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
# raw input dataset and the prefix the parquet tables are written under
input_data = "s3a://udacity-dend/"
output_data = "s3://aws-emr-resources-214852037575-us-east-1/notebooks/"

# Spark session shared by the ETL functions below
spark = create_spark_session()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
process_song_data(spark=spark, input_data=input_data, output_data=output_data)

In [10]:
process_log_data(spark=spark, input_data=input_data, output_data=output_data)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Start processing log data...