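# ETL with partitions

Extracts the song and log JSON datasets from local zip archives under `./data/`, builds the songs, artists, users, time and songplays tables with PySpark, and writes each table to Parquet (songs partitioned by year and artist; time and songplays partitioned by year and month).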

In [1]:
import configparser
import os
import shutil
import zipfile
from datetime import datetime

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import udf, col, desc, row_number
from pyspark.sql.functions import year, month, dayofmonth, dayofweek, hour, weekofyear, date_format, to_date, monotonically_increasing_id
from pyspark.sql.types import TimestampType, DateType

## Reset output folders and extract data locally

In [2]:
song_folder = "./data/song_data"
log_folder = "./data/log_data"
songs_table_folder = "./data/songs_table"
artists_table_folder = "./data/artists_table"
users_table_folder = "./data/users_table"
time_table_folder = "./data/time_table"
songplays_table_folder = "./data/songplays_table"


def remove_folders(folders):
    """Delete every existing directory in ``folders``.

    Each folder is checked on its own, so a missing folder does not stop the
    remaining ones from being removed. Errors other than "does not exist"
    (e.g. permission problems) are not swallowed and propagate to the caller.

    Args:
        folders: iterable of directory paths to delete recursively.
    """
    for folder in folders:
        if os.path.isdir(folder):
            shutil.rmtree(folder)


# Start from a clean slate so a full re-run re-extracts the raw data and
# rewrites every output table.
remove_folders([
    song_folder,
    log_folder,
    songs_table_folder,
    artists_table_folder,
    users_table_folder,
    time_table_folder,
    songplays_table_folder,
])
print("All folders were deleted")

All folders were deleted


In [3]:
# Extract Song Data
# The archive is unpacked into ./data/ itself; presumably its top-level entry is
# the song_data/ directory read later — TODO confirm against the zip contents.
song_zip = "./data/song-data.zip"
with zipfile.ZipFile(song_zip, mode="r") as song_archive:
    song_archive.extractall(path="./data/")

In [4]:
# Extract Log Data
# Unlike the song archive, this one is unpacked directly into log_folder.
log_zip = "./data/log-data.zip"
with zipfile.ZipFile(log_zip, mode="r") as log_archive:
    log_archive.extractall(path=log_folder)

## Create Spark Session

In [5]:
# hadoop-aws is requested via spark.jars.packages; presumably kept for S3 (s3a://)
# access, since every path used in this notebook is a local ./data folder.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .getOrCreate()
)

## Process Song Data

In [6]:
# read song data file
# The glob matches entries three directory levels below song_folder.
song_files = f"{song_folder}/*/*/*"
df = spark.read.json(song_files)
song_count = df.count()
print(song_count)
df.printSchema()

71
root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



### Creating Songs Table

In [7]:
# extract columns to create songs table
# One row per song_id; which duplicate row survives per key is not specified.
song_columns = ['song_id', 'title', 'artist_id', 'year', 'duration']
song_primary_key = ['song_id']
song_table = (
    df
    .select(*song_columns)
    .dropDuplicates(subset=song_primary_key)
)

In [8]:
# Show the columns and inferred types of the songs table before writing it.
song_table.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)



In [9]:
# write songs table to parquet files partitioned by year and artist,
# replacing any previous output in songs_table_folder
partition_columns = ["year", "artist_id"]
(
    song_table.write
    .mode("overwrite")
    .partitionBy(partition_columns)
    .parquet(songs_table_folder)
)

### Creating Artists Table

In [10]:
# extract columns to create artists table
# One row per artist_id, taken from the song dataset.
artist_columns = [
    'artist_id',
    'artist_name',
    'artist_location',
    'artist_latitude',
    'artist_longitude',
]
artist_primary_key = ['artist_id']
artist_table = df.select(*artist_columns).dropDuplicates(subset=artist_primary_key)

In [11]:
# Show the columns and inferred types of the artists table before writing it.
artist_table.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_longitude: double (nullable = true)



In [12]:
# write artists table to parquet files (unpartitioned), replacing previous output
artist_table.write.mode("overwrite").parquet(artists_table_folder)

## Process Log Data

In [13]:
# read log data file
# Reads the JSON files extracted into log_folder.
log_df = spark.read.json(path=log_folder)
raw_log_count = log_df.count()
print(raw_log_count)
log_df.printSchema()

8056
root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [14]:
# filter by actions for song plays
# Keep only 'NextSong' page events, i.e. the song-play actions.
log_df = log_df.filter(col('page') == 'NextSong')
print(log_df.count())
log_df.printSchema()

6820
root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



### Creating Users Table

In [15]:
# extract columns for users table
# `level` is recorded per event, so it may differ between a user's events.
# dropDuplicates would keep an arbitrary row per user; instead keep each user's
# most recent event (highest ts) so the table reflects the latest known level.
user_columns = ['userId','firstName','lastName','gender','level']
user_primary_key = ['userId']
latest_event_first = Window.partitionBy(*user_primary_key).orderBy(desc('ts'))
users_table = (
    log_df
    .withColumn('_event_rank', row_number().over(latest_event_first))
    .where(col('_event_rank') == 1)
    .select(user_columns)
)

In [16]:
# Show the columns and types of the users table before writing it.
users_table.printSchema()

root
 |-- userId: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)



In [17]:
# write users table to parquet files (unpartitioned), replacing previous output
users_table.write.mode('overwrite').parquet(users_table_folder)

### Creating Time Table

In [18]:
# Function to convert timestamp to datetime
@udf(TimestampType())
def get_timestamp(ts):
    """Convert an epoch value in milliseconds to a ``datetime``.

    Registered as a Spark UDF returning ``TimestampType``.

    Args:
        ts: milliseconds since the Unix epoch, or None.

    Returns:
        ``datetime.fromtimestamp(ts / 1000)``, interpreted in the Python
        worker's local timezone, or None when ``ts`` is None (instead of
        raising TypeError inside the executor).
    """
    if ts is None:
        return None
    return datetime.fromtimestamp(ts / 1e3)
# extract columns for time table
# One row per distinct ts value; calendar fields are derived in the next cell.
time_columns = ['ts']
time_primary_key = ['ts']
time_table = log_df.select(*time_columns).dropDuplicates(subset=time_primary_key)

In [19]:
# create additional time columns
# Convert ts with get_timestamp once into a helper column and derive every
# calendar field from it. The helper column is dropped afterwards, so the output
# schema (ts, month, year, day, hour, week, weekday) is unchanged.
# weekday is the ISO day-of-week number as a string ("1" = Monday ... "7" = Sunday),
# the same values date_format(..., "u") produced. It is computed from dayofweek()
# (1 = Sunday ... 7 = Saturday) because Spark 3's datetime formatter rejects the
# week-based pattern letter "u".
time_table = (
    time_table
    .withColumn('_start_time', get_timestamp('ts'))
    .withColumn('month', month('_start_time'))
    .withColumn('year', year('_start_time'))
    .withColumn('day', dayofmonth('_start_time'))
    .withColumn('hour', hour('_start_time'))
    .withColumn('week', weekofyear('_start_time'))
    .withColumn('weekday', ((dayofweek('_start_time') + 5) % 7 + 1).cast('string'))
    .drop('_start_time')
)
time_table.printSchema()

root
 |-- ts: long (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- weekday: string (nullable = true)



In [20]:
# write time table to parquet files partitioned by year and month,
# replacing any previous output in time_table_folder
partition_columns = ["year", "month"]
(
    time_table.write
    .mode("overwrite")
    .partitionBy(partition_columns)
    .parquet(time_table_folder)
)

### Creating Songplays Table

In [21]:
# read song data file
# The song JSON files were already loaded into `df` in the "Process Song Data"
# step, and `df` is not reassigned since. Reuse it instead of scanning and
# schema-inferring the same files a second time.
song_df = df

In [22]:
# Filter columns of song dataset
# Keep the join keys (artist_name, title) plus the ids copied into songplays.
songplays_columns_song = ['artist_id', 'artist_name', 'song_id', 'title']
filtered_song = song_df.select(*songplays_columns_song)

In [23]:
# Filter columns of log dataset
songplays_columns_log = [
    'ts', 'userId', 'level', 'song',
    'artist', 'sessionId', 'location', 'userAgent',
]
filtered_log = log_df.select(*songplays_columns_log)

In [24]:
# Create additional columns
# month/year of each event; the songplays output is partitioned by these.
event_time = get_timestamp('ts')
filtered_log = (
    filtered_log
    .withColumn('month', month(event_time))
    .withColumn('year', year(event_time))
)

In [25]:
# create songplays table
# Register both inputs as temp views so the join can be written in Spark SQL.
for view_name, view_frame in (("song_view", filtered_song), ("log_view", filtered_log)):
    view_frame.createOrReplaceTempView(view_name)

# Songplays are NextSong log events matched to a song by title and artist name.
# The inner join drops events that have no matching song.
songplays_query = """
    SELECT 
        l.ts as ts, 
        l.userId as user_id, 
        l.level as level, 
        s.song_id as song_id,
        s.artist_id as artist_id,
        l.sessionId as session_id,
        l.location as location,
        l.userAgent as user_agent,
        l.month as month,
        l.year as year
    FROM log_view l
    JOIN song_view s
    ON l.song = s.title AND l.artist = s.artist_name
    """
songplays_table = spark.sql(songplays_query)
# Surrogate key: monotonically_increasing_id values are unique but not consecutive.
songplays_table = songplays_table.withColumn("songplay_id", monotonically_increasing_id())

In [26]:
# Show the columns and types of the songplays table (songplay_id is appended last).
songplays_table.printSchema()

root
 |-- ts: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- songplay_id: long (nullable = false)



In [27]:
# write songplays table to parquet files partitioned by year and month,
# replacing any previous output in songplays_table_folder
partition_columns = ["year", "month"]
(
    songplays_table.write
    .mode('overwrite')
    .partitionBy(partition_columns)
    .parquet(songplays_table_folder)
)

In [34]:
# Remove the ./data/spark-warehouse folder if present. Using shutil instead of a
# `!rm -r` shell magic keeps this portable and avoids an error on a fresh run
# where the folder does not exist.
shutil.rmtree("./data/spark-warehouse", ignore_errors=True)