In [19]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

from pyspark.sql.types import TimestampType, DateType

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
def create_spark_session():
    """
    Build (or reuse) the Spark session used by the ETL steps.

    The session builder is configured with the hadoop-aws 2.7.0 package
    through ``spark.jars.packages``.

    Returns:
    the session handed back by ``getOrCreate()``
    """
    builder = SparkSession.builder.config(
        "spark.jars.packages",
        "org.apache.hadoop:hadoop-aws:2.7.0",
    )
    return builder.getOrCreate()


def process_song_data(spark, input_data, output_data):
    """
    Build the songs and artists tables from the raw song JSON files.

    Both tables are de-duplicated and written as parquet (overwriting any
    previous run) under output_data: 'songs' partitioned by year and
    artist_id, 'artists' unpartitioned.

    Keyword arguments:
    spark - spark session
    input_data - a link to input data on S3
    output_data - a link to output data on S3
    """
    song_columns = ('num_songs', 'artist_id', 'song_id',
                    'title', 'duration', 'year')
    artist_columns = ('artist_id', 'artist_latitude', 'artist_longitude',
                      'artist_location', 'artist_name')

    # load every song file matched by the glob below
    raw_songs = spark.read.json(
        os.path.join(input_data, 'song_data/A/A/*/*.json'))

    # songs table -> parquet, partitioned by year and artist
    songs_table = raw_songs.select(*song_columns).dropDuplicates()
    songs_writer = songs_table.write.partitionBy('year', 'artist_id').mode('overwrite')
    songs_writer.parquet(os.path.join(output_data, 'songs'))

    # artists table -> parquet
    artists_table = raw_songs.select(*artist_columns).dropDuplicates()
    artists_writer = artists_table.write.mode('overwrite')
    artists_writer.parquet(os.path.join(output_data, 'artists'))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [32]:
def process_log_data(spark, input_data, output_data):
    """
    Processing the log data

    Reads the event logs, keeps only 'NextSong' events and writes the
    users, time and songplays tables as parquet under output_data
    (overwriting any previous run).

    Keyword arguments:
    spark - spark session
    input_data - a link to input data on S3
    output_data - a link to output data on S3; the 'songs' and 'artists'
                  parquet tables written by process_song_data are read
                  back from here, so that step has to run first
    """
    # get filepath to log data file
    log_data = os.path.join(input_data, 'log_data/2018/11/*.json')

    # read log data file
    df = spark.read.json(log_data)

    # filter by actions for song plays
    df = df.filter(df.page == 'NextSong')

    # extract columns for users table
    users_table = df.select('userId',
                            'firstName',
                            'lastName',
                            'gender',
                            'level').dropDuplicates()

    # write users table to parquet files
    users_table.write.mode('overwrite')\
                     .parquet(os.path.join(output_data, 'users'))

    # Build the event timestamp once and reuse it for every derived column.
    # NOTE(review): assumes `ts` holds epoch *milliseconds* (as in the
    # udacity-dend event logs) - confirm. Spark reads a numeric value cast to
    # timestamp as seconds, so the value is scaled down by 1000 first.
    event_time = (df.ts / 1000).cast('timestamp')

    # start_time is kept as a string column; the date parts come from the
    # same timestamp expression
    df = (df.withColumn('start_time', event_time.cast('string'))
            .withColumn('hour', hour(event_time))
            .withColumn('day', dayofmonth(event_time))
            .withColumn('week', weekofyear(event_time))
            .withColumn('month', month(event_time))
            .withColumn('year', year(event_time))
            .withColumn('dayofweek', dayofweek(event_time)))

    # extract columns to create time table
    time_table = df.select('start_time',
                           'hour',
                           'day',
                           'week',
                           'month',
                           'year',
                           'dayofweek').dropDuplicates()

    # write time table to parquet files partitioned by year and month
    time_table.write.partitionBy('year', 'month')\
                    .mode('overwrite')\
                    .parquet(os.path.join(output_data, 'time'))

    # read in song data to use for songplays table, from the location
    # process_song_data wrote it to
    songs = spark.read.parquet(os.path.join(output_data, 'songs'))

    # the songs table has no artist_name, so bring it in from the artists
    # table; only the id/name pair is needed for matching
    artists = spark.read.parquet(os.path.join(output_data, 'artists'))\
                   .select('artist_id', 'artist_name')\
                   .dropDuplicates()

    # keep only the lookup columns so the song's own 'year' does not collide
    # with the play's 'year' after the join
    song_df = songs.join(artists, 'artist_id')\
                   .select('song_id', 'artist_id', 'title', 'artist_name')

    # extract columns from joined song and log datasets to create songplays table
    conditions = [song_df.title == df.song, song_df.artist_name == df.artist]
    songplays_table = df.join(song_df, conditions).select(
        monotonically_increasing_id().alias('songplay_id'),
        'start_time',
        'userId',
        'level',
        'song_id',
        'artist_id',
        'sessionId',
        'location',
        'userAgent',
        'year',
        'month')

    # write songplays table to parquet files partitioned by year and month
    songplays_table.write.partitionBy('year', 'month')\
                         .mode('overwrite')\
                         .parquet(os.path.join(output_data, 'songplays'))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
def main(input_data="s3a://udacity-dend/",
         output_data='s3://aws-logs-008040186545-us-east-1/output/'):
    """
    Main ETL process

    Creates the spark session, then runs the song step followed by the log
    step (in that order) against the same input and output locations.

    Keyword arguments:
    input_data - a link to input data on S3 (defaults to the location this
                 notebook was written against)
    output_data - a link to output data on S3 (defaults to the location this
                  notebook was written against)
    """
    # NOTE(review): the default input uses the s3a:// scheme while the default
    # output uses s3:// - confirm both are resolvable on the target cluster.
    spark = create_spark_session()

    process_song_data(spark, input_data, output_data)
    process_log_data(spark, input_data, output_data)

# Run the whole ETL pipeline when this cell is executed.
main()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…