In [7]:
# import configparser
from datetime import datetime
import os

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import udf, col, row_number
from pyspark.sql.functions import year, month, dayofmonth, dayofweek, hour, weekofyear, date_format
from pyspark.sql.types import TimestampType


# config = configparser.ConfigParser()
# config.read('dl.cfg')

# os.environ['AWS_ACCESS_KEY_ID']=config['AWS_ACCESS_KEY_ID']
# os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS_SECRET_ACCESS_KEY']


def create_spark_session():
    """Return a SparkSession with the hadoop-aws package requested.

    ``org.apache.hadoop:hadoop-aws:2.7.0`` is passed via the
    ``spark.jars.packages`` setting; that package provides Hadoop's
    ``s3a://`` filesystem connector. ``getOrCreate`` hands back an
    already-active session when one exists rather than building a new one.
    """
    hadoop_aws_package = "org.apache.hadoop:hadoop-aws:2.7.0"
    session_builder = SparkSession.builder.config("spark.jars.packages", hadoop_aws_package)
    return session_builder.getOrCreate()


def process_song_data(spark, input_data, output_data):
    """Build the ``songs`` and ``artists`` tables from the raw song JSON files.

    Reads every file matching ``{input_data}/song_data/*/*/*/*`` and writes two
    parquet outputs under ``output_data``, each with ``mode='overwrite'``:

    * ``songs`` -- song_id, title, artist_id, year (cast to int), duration;
      distinct rows, partitioned by ``year`` and ``artist_id``.
    * ``artists`` -- artist_id plus the ``artist_*`` columns renamed to
      name / location / latitude / longitude; distinct rows.

    Args:
        spark: active SparkSession used for the read and both writes.
        input_data: root path/URI that contains the ``song_data`` directory.
        output_data: root path/URI under which ``songs`` and ``artists`` are written.
    """
    song_json_glob = '{}/song_data/*/*/*/*'.format(input_data)
    raw_songs = spark.read.json(song_json_glob)

    # songs: year is cast to int before de-duplicating whole rows.
    songs_table = (
        raw_songs
        .select('song_id', 'title', 'artist_id', 'year', 'duration')
        .withColumn('year', col('year').cast('int'))
        .dropDuplicates()
    )
    songs_table.write.parquet(
        path='{}/songs'.format(output_data),
        mode='overwrite',
        partitionBy=['year', 'artist_id'],
    )

    # artists: strip the ``artist_`` prefix from the source columns.
    # NOTE(review): dropDuplicates() compares all five columns, so song records
    # that disagree on name/location/coordinates for the same artist_id produce
    # multiple rows per artist_id -- confirm whether that is acceptable.
    artist_columns = [
        col('artist_id'),
        col('artist_name').alias('name'),
        col('artist_location').alias('location'),
        col('artist_latitude').alias('latitude'),
        col('artist_longitude').alias('longitude'),
    ]
    artists_table = raw_songs.select(*artist_columns).dropDuplicates()
    artists_table.write.parquet(path='{}/artists'.format(output_data), mode='overwrite')


def process_log_data(spark, input_data, output_data):
    """Build the ``users``, ``time`` and ``songplays`` tables from the event logs.

    Reads every JSON file matching ``{input_data}/log_data/*/*/*``, keeps only
    rows whose ``page`` is ``'NextSong'``, and writes three parquet outputs
    under ``output_data`` (each with ``mode='overwrite'``):

    * ``users`` -- distinct (user_id, first_name, last_name, gender, level) rows.
    * ``time`` -- one row per distinct ``ts`` broken into hour / day / week /
      month / year / weekday, partitioned by ``year`` and ``month``.
    * ``songplays`` -- each NextSong event left-joined to the song data on
      title, duration and artist name (song_id / artist_id are null when no
      song matches), numbered by ``ts`` order.

    Args:
        spark: active SparkSession used for all reads and writes.
        input_data: root path/URI containing ``log_data/`` and ``song_data/``.
        output_data: root path/URI under which the output directories are written.
    """
    # get filepath to log data file
    log_data = '{}/log_data/*/*/*'.format(input_data)

    # read log data file and keep only song-play events
    df = spark.read.json(log_data)
    df = df.filter(col('page') == 'NextSong')

    # users table; user_id is cast to int.
    # NOTE(review): dropDuplicates() compares all five columns, so a user whose
    # ``level`` changes across events yields one row per level -- confirm
    # whether users should instead be unique on user_id.
    users_table = df.select(col('userId').alias('user_id'),
                            col('firstName').alias('first_name'),
                            col('lastName').alias('last_name'),
                            col('gender'),
                            col('level')) \
                    .withColumn('user_id', col('user_id').cast('int')) \
                    .dropDuplicates()
    users_table.write.parquet(path='{}/users'.format(output_data), mode='overwrite')

    # ``ts`` is epoch milliseconds. Dividing by 1000 and casting to a timestamp
    # yields the same instant as datetime.fromtimestamp(ts / 1e3), but runs
    # natively in Spark (no per-row Python UDF call) and produces null rather
    # than raising when ``ts`` is null.
    df = df.withColumn('dt', (col('ts') / 1000).cast(TimestampType()))

    # time table. weekday comes from dayofweek (1 = Sunday ... 7 = Saturday);
    # date_format(dt, 'F') must not be used here because 'F' is not a
    # day-of-week pattern (in SimpleDateFormat it is "day of week in month",
    # i.e. which occurrence of that weekday within the month, 1-5).
    time_table = df.withColumn('hour', hour('dt')) \
        .withColumn('day', dayofmonth('dt')) \
        .withColumn('week', weekofyear('dt')) \
        .withColumn('month', month('dt')) \
        .withColumn('year', year('dt')) \
        .withColumn('weekday', dayofweek('dt')) \
        .select(col('ts').alias('start_time'), 'hour', 'day', 'week', 'month', 'year', 'weekday') \
        .dropDuplicates()
    time_table.write.parquet(path='{}/time'.format(output_data), mode='overwrite', partitionBy=['year', 'month'])

    # read in song data to use for songplays table
    song_df = spark.read.json('{}/song_data/*/*/*/*'.format(input_data))

    # songplay_id is a consecutive row number in ts order.
    # NOTE: a window with orderBy but no partitionBy is evaluated in a single
    # partition, which can bottleneck large inputs; rows with equal ts are
    # numbered in no guaranteed order.
    window = Window.orderBy(col('ts'))
    songplays_table = df.join(song_df,
                              (df.song == song_df.title)
                              & (df.length == song_df.duration)
                              & (df.artist == song_df.artist_name),
                              how='left') \
                        .withColumn('songplay_id', row_number().over(window)) \
                        .select(col('songplay_id'),
                                col('ts').alias('start_time'),
                                col('userId').cast('int').alias('user_id'),
                                col('level'),
                                col('song_id'),
                                col('artist_id'),
                                col('sessionId').cast('int').alias('session_id'),
                                col('location'),
                                col('userAgent').alias('user_agent'))
    songplays_table.write.parquet(path='{}/songplays'.format(output_data), mode='overwrite')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Source bucket (read through the s3a connector) and destination prefix for
# the parquet output.
input_data = "s3a://udacity-dend"
output_data = "s3://huynv26-udacity/datalake"

spark = create_spark_session()

# Song-derived tables first, then the log-derived tables.
process_song_data(spark, input_data, output_data)
process_log_data(spark, input_data, output_data)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…