In [1]:
%%spark

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
4,application_1561479440023_0005,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [2]:

import os
from datetime import datetime

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, dayofweek, date_format
from pyspark.sql.window import Window


VBox()

In [3]:
# Root of the input bucket that holds the song JSON files.
input_data = "s3a://udacity-dend/"
# Only the A/A prefix of the song dataset is read; widen this glob
# (e.g. "*/*/*") to read every prefix.
song_subset = "A/A/*"
song_data = os.path.join(input_data, "song_data", song_subset, "*.json")

# Destination prefix for every Parquet table written below.
output_data = "s3a://dend-cda/output/"

VBox()

In [4]:
# Load the song metadata JSON files matched by the song_data glob.
df = spark.read.format("json").load(song_data)

VBox()

In [5]:
# Print the schema Spark inferred for the song JSON files loaded into df.
df.printSchema()

VBox()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)

In [6]:
# SONGS dimension (song_id, title, artist_id, year, duration):
# project the song columns from the song metadata and keep one row per song_id.
df.createOrReplaceTempView("songs_table")
song_columns = ['song_id', 'title', 'artist_id', 'year', 'duration']
songs_table = df.select(*song_columns).dropDuplicates(['song_id'])


VBox()

In [7]:
# Persist the songs dimension as Parquet, partitioned by year and artist_id,
# replacing whatever a previous run wrote at the same location.
songs_output_path = os.path.join(output_data, 'songs_table', 'songs_table.parquet')
print("start - writing songs parquet files")
(
    songs_table.write
    .partitionBy('year', 'artist_id')
    .mode('overwrite')
    .parquet(songs_output_path)
)
print("end - writing songs parquet files")

VBox()

start - writing songs parquet files
end - writing songs parquet files

In [8]:
# ARTISTS Table (artist_id, name, location, latitude, longitude)
# Extract one row per artist from the song metadata, renaming the
# artist_* columns to the table's column names. The chain is wrapped in
# parentheses so it ends cleanly without a trailing line continuation.
df.createOrReplaceTempView("artists_table")
artists_table = (
    df.select(
        'artist_id',
        col('artist_name').alias('name'),
        col('artist_location').alias('location'),
        col('artist_latitude').alias('latitude'),
        col('artist_longitude').alias('longitude'),
    )
    .dropDuplicates(['artist_id'])
)


VBox()

In [9]:
# Persist the artists dimension as (unpartitioned) Parquet, overwriting any
# previous output at the same path.
artists_output_path = os.path.join(output_data, 'artists_table', 'artists_table.parquet')
print("start - writing artist parquet files")
artists_table.write.mode('overwrite').format('parquet').save(artists_output_path)
print("end - writing artist parquet files")

VBox()

start - writing artist parquet files
end - writing artist parquet files

In [10]:
# Glob covering every event-log JSON file two directory levels below log_data/.
log_data = os.path.join("s3a://udacity-dend/", "log_data", "*", "*", "*.json")

VBox()

In [11]:
# Load the raw event-log JSON files matched by the log_data glob.
df = spark.read.format("json").load(log_data)

VBox()

In [12]:
# Print the schema Spark inferred for the event-log JSON files loaded into df.
df.printSchema()

VBox()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

In [13]:
# read log data file (re-read here so this cell builds on fresh input rather
# than on whatever df an earlier cell left behind)
df = spark.read.json(log_data)

# filter by actions for song plays
df = df.filter(df.page == 'NextSong')

# USERS table (user_id, first_name, last_name, gender, level)
# `level` is recorded on every event, so it can change over a user's history.
# dropDuplicates(['userId']) would keep an arbitrary event's level; instead,
# rank each user's events newest-first by `ts` and keep only the newest one.
df.createOrReplaceTempView("users_table")
newest_event_first = Window.partitionBy('userId').orderBy(F.col('ts').desc())
users_table = (
    df.withColumn('_event_rank', F.row_number().over(newest_event_first))
      .filter(F.col('_event_rank') == 1)
      .select(
          F.col('userId').alias('user_id'),
          F.col('firstName').alias('first_name'),
          F.col('lastName').alias('last_name'),
          'gender',
          'level',
      )
)


    


VBox()

In [14]:
# Persist the users dimension as (unpartitioned) Parquet, overwriting any
# previous output at the same path.
users_output_path = os.path.join(output_data, 'users_table', 'users_table.parquet')
print("start - writing users parquet files")
users_table.write.parquet(users_output_path, mode='overwrite')
print("end - writing users parquet files")

VBox()

start - writing users parquet files
end - writing users parquet files

In [15]:
# Replace the numeric `ts` with a Spark timestamp (divided by 1000 first;
# presumably ts is epoch milliseconds), then add its calendar date as `dt`.
# NOTE: not idempotent - re-running this cell would divide a timestamp again.
df = (
    df.withColumn("ts", F.to_timestamp(F.col("ts") / 1000))
      .withColumn("dt", F.to_date(F.col("ts")))
)


VBox()

In [22]:
    
# extract columns to create time table
# start_time, hour, day, week, month, year, weekday
# Expects df to carry the timestamp `ts` and the date `dt` added in an earlier cell.
df.createOrReplaceTempView("time_table")
time_table = (
    df.select(col('ts').alias('start_time'), 'dt')
      # start_time identifies a row of this table, so keep each value once.
      .dropDuplicates(['start_time'])
      # hour must come from the timestamp: `dt` is a date, so hour(dt) is always 0.
      .withColumn('hour', hour('start_time'))
      .withColumn('day', dayofmonth('dt'))
      .withColumn('week', weekofyear('dt'))
      .withColumn('month', month('dt'))
      .withColumn('year', year('dt'))
      .withColumn('weekday', dayofweek('dt'))
)

time_table.printSchema()



VBox()

root
 |-- start_time: timestamp (nullable = true)
 |-- dt: date (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)

In [23]:
# Persist the time dimension as Parquet partitioned by year then month,
# overwriting any previous output at the same path.
time_output_path = os.path.join(output_data, 'time_table', 'time.parquet')
print("start - writing time parquet files")
time_table.write.parquet(time_output_path, mode='overwrite', partitionBy=['year', 'month'])
print("end - writing time parquet files")

VBox()

start - writing time parquet files
end - writing time parquet files

In [24]:
# Re-load the song metadata under its own name so it can be joined against
# the event log held in df.
song_df = spark.read.format("json").load(song_data)


VBox()

In [25]:
    
# Match each song-play event to song metadata on title and artist name;
# events without a match are dropped (inner join).
same_song = (df.song == song_df.title) & (df.artist == song_df.artist_name)
joined_df = df.join(song_df, on=same_song, how='inner')

VBox()

In [26]:
# SONGPLAYS fact table: one row per distinct matched event, with a generated
# unique songplay_id and year/month derived from ts for partitioning.
songplay_columns = [
    'songplay_id',
    'ts as start_time',
    'extract(month from ts) as month',
    'extract(year from ts) as year',
    'userId as user_id',
    'level',
    'song_id',
    'artist_id',
    'sessionId as session_id',
    'location',
    'userAgent as user_agent',
]
distinct_plays = joined_df.distinct()
songplay_table = (
    distinct_plays
    .withColumn('songplay_id', F.monotonically_increasing_id())
    .selectExpr(*songplay_columns)
)

VBox()

In [27]:
# Persist the songplays fact table as Parquet partitioned by year then month,
# overwriting any previous output at the same path.
songplays_output_path = os.path.join(output_data, 'songplays_table', 'songsplay.parquet')
songplay_table.write.parquet(songplays_output_path, mode='overwrite', partitionBy=['year', 'month'])

VBox()