# ETL Processes
This notebook is used to prototype the ETL process on a small subset of the dataset. Afterwards, the `etl.py` script applies the same process to the whole dataset.

In [2]:
# Load libraries

%load_ext sql

import configparser
import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofweek, hour, weekofyear

In [3]:
# Create (or reuse, via getOrCreate) the Spark session used by every following cell.
# The hadoop-aws package is added to the session's jars; presumably it is needed for the
# s3a:// bucket paths that are referenced (commented out) in the cells below.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .getOrCreate()
)

In [8]:
# Extract song data and log data from the data source and load each into a dataframe

# Root path of the input data (Amazon S3 bucket). Note: no root path is required for local
# testing, as the relative paths below are sufficient.
# song_data_root = "s3a://udacity-dend/"

# File paths of the data sources. Note: these are relative paths to test data that was
# extracted from the S3 bucket beforehand.
song_data = "data/song_data/*.json"
log_data = "data/log_data/*.json"

# Read the song data and log data JSON files into their respective dataframes
df_song_data = spark.read.json(song_data)
df_log_data = spark.read.json(log_data)
# The files are read from the local paths above, not from S3.
print("Success: Read log and song data")

# printSchema() prints the schema itself and returns None, so it is not wrapped in print()
# (wrapping it printed a stray "None").
df_log_data.printSchema()

Success: Read log and song data from S3
root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

None


In [5]:
## Extract data and push them into dataframes

# Udf helper function
def _ts_millis_to_datetime_str(ts):
    """Format a millisecond epoch timestamp as a '%Y-%m-%d %H:%M:%S' string.

    Args:
        ts: Milliseconds since the epoch (divided by 1000 to obtain the seconds that
            datetime.fromtimestamp expects), or None.

    Returns:
        The formatted string, or None when ``ts`` is None.
    """
    # A null ts would otherwise raise TypeError (None / 1000.0) inside the Spark worker
    # and fail the whole job; propagate the null instead.
    if ts is None:
        return None
    # Note: fromtimestamp() converts using the local timezone of the process running the UDF.
    return datetime.datetime.fromtimestamp(ts / 1000.0).strftime('%Y-%m-%d %H:%M:%S')


# Extract datetime based on the timestamp from the log dataframe
get_datetime_udf = udf(_ts_millis_to_datetime_str)

# Add a datetime column to the log dataframe, as it is needed by the songsplay and time dataframe tables
df_log_data = df_log_data.withColumn("datetime", get_datetime_udf(df_log_data.ts))

# Create the fact table containing the played songs.
# Items which have actually been played are described by calling the NextSong page, so
# those log events are selected first. This keeps the join input small, and the filter
# no longer refers to a column (`page`) that the select below drops.
df_next_song_events = df_log_data.filter(df_log_data.page == "NextSong")

# Plays are matched to the song catalogue by artist name and song title.
# NOTE(review): this is an inner join, so plays whose (artist, song) pair is not present
# in df_song_data are dropped from the fact table — confirm that is intended.
# Column names use their real lower-case spelling ("song_id", "artist_id") so resolution
# does not depend on case-insensitive analysis.
df_songsplay_table = df_next_song_events.join(
    df_song_data,
    (df_next_song_events.artist == df_song_data.artist_name)
    & (df_next_song_events.song == df_song_data.title),
).select("userId",
         "song_id",
         "artist_id",
         "sessionId",
         "level",
         "location",
         "userAgent",
         "datetime",
         month("datetime").alias("month"),
         year("datetime").alias("year"))

# Extract the songs columns and cast year/duration to create the songs table (one row per song_id).
# Fix: year and duration are now cast from their own columns; they were previously both
# cast from song_id, which nulled them and broke the year partitioning of the output.
df_songs_table = df_song_data.select(
    "song_id",
    "title",
    "artist_id",
    col("year").cast("int").alias("year"),
    col("duration").cast("float").alias("duration"),
).dropDuplicates(["song_id"])

# Build the artists table from the song data: one row per artist_id, with the
# latitude and longitude columns cast to float.
df_artists_table = (
    df_song_data
    .select(col("artist_id"),
            col("artist_name"),
            col("artist_location"),
            col("artist_latitude").cast("float").alias("artist_latitude"),
            col("artist_longitude").cast("float").alias("artist_longitude"))
    .dropDuplicates(["artist_id"])
)

# Build the users table from the log events: one row per userId.
# NOTE(review): dropDuplicates keeps a single row per user, so if a user's `level`
# changes over time only one of the values survives — confirm that is acceptable.
df_users_table = (
    df_log_data
    .select(["userId", "firstName", "lastName", "gender", "level"])
    .dropDuplicates(["userId"])
)

# Create the time table for all songs which have been played: one row per distinct play
# datetime, broken down into hour, month, year, weekday and week of year.
# dropDuplicates keeps one row per datetime; previously several plays sharing the same
# timestamp produced duplicate rows in this dimension table.
df_time_table = df_songsplay_table.select(hour("datetime").alias("hour"),
                                          month("datetime").alias("month"),
                                          year("datetime").alias("year"),
                                          dayofweek("datetime").alias("weekday"),
                                          weekofyear("datetime").alias("weekofyear"),
                                          "datetime") \
                                  .dropDuplicates(["datetime"])

print("Success: Create dataframe tables")

Success: Create dataframe tables


In [11]:
## Save dataframes (tables) as parquet files

# Root path for the output data. Left empty for local testing, so the relative paths below
# are used as-is. Set it to e.g. "s3a://s3-bucket-udacity/" to write to the Amazon S3 bucket.
output_data_root = ""

# (dataframe, output path relative to output_data_root, partition columns or None)
parquet_outputs = [
    # songs partitioned by year and artist id
    (df_songs_table, "data/song_data/songs.parquet", ["year", "artist_id"]),
    (df_users_table, "data/song_data/users.parquet", None),
    (df_artists_table, "data/song_data/artists.parquet", None),
    # played songs partitioned by year and month
    (df_songsplay_table, "data/songsplay_data/songsplay.parquet", ["year", "month"]),
    # time details partitioned by year and month
    (df_time_table, "data/time_data/time.parquet", ["year", "month"]),
]

# DataFrameWriter.parquet() returns None, so its result is not stored.
# mode="overwrite" replaces previous output, which keeps this cell safe to re-run.
for df_table, relative_path, partition_cols in parquet_outputs:
    df_table.write.parquet(path=output_data_root + relative_path,
                           mode="overwrite",
                           partitionBy=partition_cols)

print("Success: Write data into parquet files")