### This notebook should be run on an EMR cluster with Spark installed

In [2]:
import configparser
from datetime import datetime
import os
import timeit
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, TimestampType, DateType
from pyspark.sql.functions import udf, col, from_unixtime, to_timestamp, monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

# S3 locations used throughout the notebook: the public source bucket with the
# raw JSON, and the destination bucket that receives the parquet tables.
input_data, output_data = (
    "s3a://udacity-dend/",
    "s3a://datalakebucketudacity/",  # Set the name to the new s3 bucket
)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
# Create (or reuse) the Spark session; the hadoop-aws package is requested so
# the s3a:// paths can be read and written.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .getOrCreate()
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
# Glob for the song JSON files. Only the A/A/A subset is read here; replace
# A/A/A with */*/* to process the full dataset.
print('Extract song_data files from s3...')
song_data = f"{input_data}song_data/A/A/A/*.json"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Extract song_data files from s3...

In [5]:
# Load all matched song files; multiLine mode makes Spark parse each file as a
# single JSON document rather than line by line.
df = spark.read.option('multiLine', True).json(song_data)
print('Extract song_data files from s3    {}'.format('Finished'))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Extract song_data files from s3    Finished

In [6]:
# Songs dimension: distinct rows over the five song attributes.
songs_table = (
    df.select(['song_id', 'title', 'artist_id', 'year', 'duration'])
    .dropDuplicates()
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
# Persist the songs table as parquet, laid out by year and then artist_id,
# replacing any previous output.
(songs_table.write
    .mode('overwrite')
    .partitionBy('year', 'artist_id')
    .parquet(output_data + 'songs'))
print('Writing songs parquet files        {}'.format('Finished'))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Writing songs parquet files        Finished

In [8]:
# extract columns to create artists table
# Artists dimension built from the song metadata, renaming the columns on the
# way out (artist_id -> artist, artist_name -> name, ...).
# Deduplicate on the key only: the same artist_id may appear with differing
# name/location/coordinates across song files, and deduplicating on all
# columns would then keep several rows per artist. Those duplicates multiply
# songs (and songplays) in the later join on artist_df.artist.
# Which of the conflicting rows survives is arbitrary.
artists_table = (
    df.selectExpr('artist_id as artist',
                  'artist_name as name',
                  'artist_location as location',
                  'artist_latitude as latitude',
                  'artist_longitude as longitude')
    .dropDuplicates(['artist'])
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
# Persist the artists table as unpartitioned parquet, overwriting earlier output.
artists_table.write.mode('overwrite').parquet(output_data + 'artists')
print('Writing artists parquet files      {}'.format('Finished'))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Writing artists parquet files      Finished

In [10]:
# get filepath to log data file
# Only the 2018-11-04 events are read here; replace the file name with *.json
# to process every day.
# Plain string concatenation: os.path.join is for local filesystem paths, not
# s3a:// URLs, and with a single argument it did nothing anyway.
print('Extract log_data files from s3...')
log_data = input_data + 'log-data/*/*/2018-11-04-events.json'

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Extract log_data files from s3...

In [11]:
# read log data file
# The event logs are newline-delimited JSON (one event per line; confirm
# against the bucket if the source changes). They must be read with the default
# line-by-line reader. multiLine=True would parse each file as a single JSON
# document and keep only its first event.
log_df = spark.read.json(path=log_data)
print('Extract log_data files from s3     {}'.format('Finished'))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Extract log_data files from s3     Finished

In [12]:
# Keep only song-play events.
log_df = log_df.filter(col('page') == 'NextSong')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
# extract columns for users table
# One row per user. The level may differ between events of the same user
# (e.g. free vs paid), so deduplicating on all five columns could leave
# several rows per userId. Instead keep the attributes from each user's most
# recent event, ordered by ts.
from pyspark.sql import Window
from pyspark.sql.functions import row_number

_latest_event_first = Window.partitionBy('userId').orderBy(col('ts').desc())
users_table = (
    log_df.where(col('userId').isNotNull())
    .withColumn('_rn', row_number().over(_latest_event_first))
    .where(col('_rn') == 1)
    .select('userId', 'firstName', 'lastName', 'gender', 'level')
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [14]:
# Persist the users table as unpartitioned parquet, overwriting earlier output.
users_table.write.mode('overwrite').parquet(output_data + 'users')
print('Writing users parquet files        {}'.format('Finished'))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Writing users parquet files        Finished

In [15]:
# create timestamp column from original timestamp column
# ts holds epoch milliseconds. It is converted with native Column arithmetic
# and a cast instead of a Python UDF. This avoids serializing every row to
# Python, yields null for a null ts instead of raising, and skips the
# local-wall-time round trip of datetime.fromtimestamp.
# get_timestamp is kept (now null-safe) for any code that still calls it.
get_timestamp = udf(
    lambda ms: None if ms is None else datetime.fromtimestamp(ms / 1000.0),
    TimestampType(),
)
log_df_time = log_df.withColumn('start_time', (col('ts') / 1000.0).cast(TimestampType()))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [16]:
# extract columns to create time table
# One row per distinct start_time, broken down into calendar parts.
# weekday is 1 = Monday ... 7 = Sunday, kept as a string to match the output of
# date_format(start_time, 'u'). That pattern letter is not supported by the
# Spark 3 datetime formatter, so the value is derived here from dayofweek
# (1 = Sunday ... 7 = Saturday), which works on Spark 2 and 3.
from pyspark.sql.functions import dayofweek

time_table = log_df_time.select('start_time').drop_duplicates()

time_table = time_table.select(
    'start_time',
    hour('start_time').alias('hour'),
    dayofmonth('start_time').alias('day'),
    weekofyear('start_time').alias('week'),
    month('start_time').alias('month'),
    year('start_time').alias('year'),
    ((dayofweek('start_time') + 5) % 7 + 1).cast('string').alias('weekday'),
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
# Persist the time table as parquet, laid out by year and then month,
# replacing any previous output.
(time_table.write
    .mode('overwrite')
    .partitionBy('year', 'month')
    .parquet(output_data + 'time'))
print('Writing time parquet files         {}'.format('Finished'))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Writing time parquet files         Finished

In [18]:
# Read the songs table back from its parquet output. basePath lets Spark
# recover year and artist_id as partition columns from the directory layout.
song_df = (
    spark.read
    .option("basePath", output_data + 'songs/')
    .parquet(output_data + 'songs/*/*/*.parquet')
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
# Read the artists table back from its (unpartitioned) parquet output.
artist_df = (
    spark.read
    .option("basePath", output_data + 'artists/')
    .parquet(output_data + 'artists/*.parquet')
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [20]:
# Attach the artist's name and location to each song (inner join on the
# artist key: songs.artist_id == artists.artist).
song_and_artists = (
    song_df
    .join(artist_df, song_df.artist_id == artist_df.artist, 'inner')
    .select('song_id', 'title', 'artist_id', 'year', 'duration',
            artist_df.location, artist_df.name)
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [21]:
# extract columns from joined song and log datasets to create songplays table
# The fact table is driven by the play events. The events are the preserved
# side of the left outer join, so every NextSong event yields exactly one row.
# song_id/artist_id are filled in when the event's (song, artist, length)
# matches a known song, and are null otherwise. Making the songs the preserved
# side would instead emit rows with null start_time, user_id and year/month
# partition values for every song nobody played.
# All event columns are taken from log_df_time, the DataFrame actually joined.
songplays_table = (
    log_df_time.join(
        song_and_artists,
        (log_df_time.song == song_and_artists.title)
        & (log_df_time.artist == song_and_artists.name)
        & (log_df_time.length == song_and_artists.duration),
        'left_outer',
    )
    .select(
        monotonically_increasing_id().alias('songplay_id'),
        log_df_time.start_time,
        log_df_time.userId.alias('user_id'),
        log_df_time.level,
        'song_id',
        'artist_id',
        log_df_time.sessionId.alias('session_id'),
        # NOTE(review): this is the artist's location from the song metadata
        # (null for unmatched events); the event's own log_df_time.location
        # may be what is intended. Confirm.
        song_and_artists.location,
        log_df_time.userAgent.alias('user_agent'),
        year(log_df_time.start_time).alias('year'),
        month(log_df_time.start_time).alias('month'),
    )
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [22]:
# Persist the songplays fact table as parquet, laid out by year and then month,
# replacing any previous output; then report completion.
(songplays_table.write
    .mode('overwrite')
    .partitionBy('year', 'month')
    .parquet(output_data + 'songplays'))
print('Writing songplays parquet files    {}'.format('Finished'))

print('\nJob done!\n')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Writing songplays parquet files    Finished

Job done!