# Explanation of ETL Steps

First import and install all necessary modules.

In [56]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StringType, TimestampType
import pandas as pd
import boto3
import gc
!pip install s3fs



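Read the AWS credentials from `dl.cfg` and export them as environment variables, so that the S3 clients used below (boto3 and Spark's s3a connector) can access the buckets.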

In [57]:
# Load AWS credentials from dl.cfg (an [AWS] section with both keys is expected)
# and expose them as environment variables for the S3 clients used later.
config = configparser.ConfigParser()
if not config.read('dl.cfg'):
    # ConfigParser.read silently skips files it cannot open and returns the list
    # of files actually read; fail loudly here instead of raising a confusing
    # KeyError: 'AWS' on the next line.
    raise FileNotFoundError("dl.cfg not found in the current working directory")

os.environ['AWS_ACCESS_KEY_ID'] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['AWS_SECRET_ACCESS_KEY']

Create the Spark session and increase the broadcast timeout. The appropriate timeout value depends on the size of the cluster or machine being used.

In [3]:
def create_spark_session(hadoop_aws_package="org.apache.hadoop:hadoop-aws:2.7.0"):
    """Create (or reuse) a SparkSession that can read and write ``s3a://`` paths.

    Args:
        hadoop_aws_package: Maven coordinate of the hadoop-aws connector that is
            added through ``spark.jars.packages``. The default is the version
            this notebook has used so far; it should match the Hadoop version
            bundled with the Spark installation.

    Returns:
        The SparkSession returned by ``getOrCreate()``. If a session already
        exists it is returned as is. NOTE(review): in that case the package
        setting presumably has no effect, because jars are resolved at JVM
        start-up.
    """
    return (
        SparkSession.builder
        .config("spark.jars.packages", hadoop_aws_package)
        .getOrCreate()
    )

In [4]:
# Seconds Spark waits for the broadcast side of a broadcast join before failing.
# Tune this to the size of the cluster/machine in use.
BROADCAST_TIMEOUT_SECONDS = 900

spark = create_spark_session()
spark.conf.set("spark.sql.broadcastTimeout", BROADCAST_TIMEOUT_SECONDS)

Empty the target AWS S3 bucket in advance. Note: this permanently deletes every object in the bucket.

In [None]:
# Empty the output bucket so the table writes below start from a clean slate.
# WARNING: this irreversibly deletes EVERY object in the bucket.
s3r = boto3.resource("s3")
bucket = s3r.Bucket("christophndde4")
# Keep the per-batch delete responses under their own name, so `bucket` really
# is the Bucket resource.
delete_responses = bucket.objects.all().delete()

Load and transform the log data. Based on this dataframe, the user and time tables are built, and both dimension dataframes are then written as parquet files to a separate AWS S3 bucket.

In [11]:
%%time
sdf = spark.read.format("json").load("s3a://udacity-dend/log_data/*/*/*.json")

CPU times: user 9.16 ms, sys: 0 ns, total: 9.16 ms
Wall time: 21.7 s


In [14]:
# Keep only song-play events: every row whose page is not "NextSong" is dropped.
sdf = sdf.where(F.col("page") == "NextSong")

In [15]:
# User dimension: one row per distinct (user, name, gender, level) combination.
# NOTE(review): a user whose level changes between events appears once per
# level; if user_id must be unique, keep only the latest level per user.
userdf = sdf.select(F.col("userId").alias("user_id"),
                    F.col("firstname").alias("first_name"),
                    F.col("lastname").alias("last_name"),
                    "gender", "level").distinct()\
            .orderBy("user_id")  # sort on the alias: userId is gone after select + distinct

In [18]:
%%time
userdf.write.parquet("s3a://christophndde4/user_table/")

CPU times: user 82.8 ms, sys: 24.5 ms, total: 107 ms
Wall time: 10min 39s


In [18]:
# Run Python's garbage collector in this kernel; the displayed value is the number
# of unreachable objects it found.
# NOTE(review): this only affects Python objects in the driver process. Spark
# DataFrame data lives in the JVM/executors, so this is unlikely to free memory
# used by the Spark jobs above. Consider removing.
gc.collect()

161

In [19]:
# ts is assumed to be epoch milliseconds (hence / 1000 -> seconds); casting the
# resulting number of seconds to "timestamp" yields a Spark TimestampType column.
sdf = sdf.withColumn("timestamp", (F.col("ts") / 1000).cast("timestamp"))

In [20]:
# Calendar date of each event (time-of-day dropped from "timestamp").
# NOTE(review): "datetime" is not referenced by any of the cells below.
sdf = sdf.withColumn("datetime", F.col("timestamp").cast("date"))

In [22]:
# Time dimension: one row per distinct event start_time, broken down into
# calendar parts. distinct() runs first because several events may share a
# timestamp. A dimension row per start_time is wanted, and the derived columns
# then only need computing for unique values.
# weekday comes from F.dayofweek (1 = Sunday ... 7 = Saturday).
tdf = sdf.select(F.col("timestamp").alias("start_time"))\
         .distinct()\
         .withColumn("hour", F.hour("start_time"))\
         .withColumn("day", F.dayofmonth("start_time"))\
         .withColumn("week", F.weekofyear("start_time"))\
         .withColumn("month", F.month("start_time"))\
         .withColumn("year", F.year("start_time"))\
         .withColumn("weekday", F.dayofweek("start_time"))\
         .orderBy("start_time")

In [25]:
%%time
tdf.write.partitionBy("year", "month").parquet("s3a://christophndde4/time_table/")

CPU times: user 168 ms, sys: 44.8 ms, total: 213 ms
Wall time: 22min 4s


In [23]:
# Run Python's garbage collector in this kernel; the displayed value is the number
# of unreachable objects it found.
# NOTE(review): this only affects Python objects in the driver process. Spark
# DataFrame data lives in the JVM/executors, so this is unlikely to free memory
# used by the Spark jobs above. Consider removing.
gc.collect()

190

Load the song data, create the song, artist and songplay (fact) dataframes, and write all three as parquet files to a separate AWS S3 bucket.

In [27]:
%%time
songstage_df = spark.read.format("json").load("s3a://udacity-dend/song_data/*/*/*/*.json")

CPU times: user 123 ms, sys: 38.6 ms, total: 162 ms
Wall time: 6min 9s


In [30]:
# Song dimension: columns taken directly from the staged song metadata
# (no de-duplication), sorted by song_id.
song_columns = ["song_id", "title", "artist_id", "year", "duration"]
songsdf = (
    songstage_df
    .select(*song_columns)
    .orderBy("song_id")
)

In [32]:
%%time
songs_df.write.partitionBy("year", "artist_id").parquet(output_data + "song_table.parquet/")

CPU times: user 193 ms, sys: 52.2 ms, total: 245 ms
Wall time: 26min 31s


In [34]:
# Run Python's garbage collector in this kernel; the displayed value is the number
# of unreachable objects it found.
# NOTE(review): this only affects Python objects in the driver process. Spark
# DataFrame data lives in the JVM/executors, so this is unlikely to free memory
# used by the Spark jobs above. Consider removing.
gc.collect()

107

In [35]:
# Artist dimension derived from the song metadata. An artist can appear in
# several song records, and name/location/coordinates may differ between them,
# so a plain distinct() over all columns can leave several rows per artist_id.
# De-duplicate on the key instead. Which row survives is arbitrary.
artistdf = songstage_df.select("artist_id",
                               F.col("artist_name").alias("name"),
                               F.col("artist_location").alias("location"),
                               F.col("artist_latitude").alias("latitude"),
                               F.col("artist_longitude").alias("longitude"))\
                       .dropDuplicates(["artist_id"])\
                       .orderBy(F.col("artist_id"))

In [31]:
%%time
artistdf.write.parquet("s3a://christophndde4/artist_table/")

CPU times: user 270 ms, sys: 54 ms, total: 324 ms
Wall time: 24min 28s


In [39]:
# Run Python's garbage collector in this kernel; the displayed value is the number
# of unreachable objects it found.
# NOTE(review): this only affects Python objects in the driver process. Spark
# DataFrame data lives in the JVM/executors, so this is unlikely to free memory
# used by the Spark jobs above. Consider removing.
gc.collect()

118

In [41]:
# Surrogate key for the fact table. monotonically_increasing_id() yields unique,
# increasing 64-bit ids, but they are NOT consecutive (the partition id is
# encoded in the upper bits).
songplay_id_col = F.monotonically_increasing_id()
sdf = sdf.withColumn("songplay_id", songplay_id_col)

In [51]:
# Fact table: song-play events matched to the song catalogue on artist name and
# song title. Only events that found a match with a non-null song_id are kept.
# An inner join plus the not-null filter selects exactly those rows.
match_condition = (songstage_df.artist_name == sdf.artist) & \
                  (songstage_df.title == sdf.song)

songplay_df = (
    sdf.join(songstage_df, match_condition, how="inner")
    .select(
        sdf["songplay_id"],
        sdf["timestamp"].alias("start_time"),
        sdf["userId"].alias("user_id"),
        sdf["level"],
        songstage_df["song_id"],
        songstage_df["artist_id"],
        sdf["sessionId"].alias("session_id"),
        songstage_df["artist_location"].alias("location"),
        sdf["userAgent"].alias("user_agent"),
    )
    .filter(songstage_df["song_id"].isNotNull())
)

In [52]:
%%time
songplay_df.write.partitionBy(F.year("start_time"), F.month("start_time").parquet("s3a://christophndde4/songplay_table/")

CPU times: user 56 ms, sys: 20.5 ms, total: 76.5 ms
Wall time: 7min 17s


In [54]:
# Run Python's garbage collector in this kernel; the displayed value is the number
# of unreachable objects it found.
# NOTE(review): this only affects Python objects in the driver process. Spark
# DataFrame data lives in the JVM/executors, so this is unlikely to free memory
# used by the Spark jobs above. Consider removing.
gc.collect()

363