## Project: Data lake for Sparkify (By using AWS S3)
- #### Project objective:
Build an ETL pipeline that extracts Sparkify user activity data (song_data and log_data) residing in the Sparkify data lake (AWS S3), processes the data on a Spark cluster, and loads the data back into AWS S3 as a set of dimensional tables.



-----

## **PART 1. READ DATA**

- ### Import packages


In [1]:
import configparser
import os
import pyspark.sql.functions as F

from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Doub, StringType as Str, LongType as Long, TimestampType

- ### Read the AWS access key

In [3]:
# read dl.cfg and publish the two AWS credential entries of its
# 'AWS CREDS' section as environment variables
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ.update({
    'AWS_ACCESS_KEY_ID': config.get('AWS CREDS', 'AWS_ACCESS_KEY_ID'),
    'AWS_SECRET_ACCESS_KEY': config.get('AWS CREDS', 'AWS_SECRET_ACCESS_KEY'),
})

- ### Define the input and output file path

In [None]:
# S3 location the song_data/ and log_data/ JSON files are read from
# (joined with the relative glob patterns via os.path.join below)
input_data = "s3a://udacity-dend/"
# S3 bucket the parquet tables are written to; no trailing slash because the
# write cells build their paths with "{}/<table>.parquet".format(output_data)
output_data = "s3a://fikruanktest"

- ### Create Spark session

In [9]:
# get (or create) the Spark session, requesting the hadoop-aws package
# through spark.jars.packages
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .getOrCreate()
)

- ### Load song_data and log_data from S3

In [22]:
# define schema for song_data
SongSchema = R([
    Fld("artist_id", Str()),
    Fld("artist_latitude", Doub()),
    Fld("artist_location", Str()),
    Fld("artist_longitude", Doub()),
    Fld("artist_name", Str()),
    Fld("duration", Doub()),
    Fld("num_songs", Long()),
    Fld("song_id", Str()),
    Fld("title", Str()),
    Fld("year", Long()),
])

# load song_data from S3 bucket (only the files under the song_data/A/ prefix)
# the schema defined above is passed explicitly: without it the reader has to
# go through the input once to infer the column types, and the declared
# types would never be applied
song_data = os.path.join(input_data, "song_data/A/*/*/*.json")
song_df = spark.read.json(song_data, schema=SongSchema)

In [None]:
# define schema for log_data
LogSchema = R([
    Fld("artist", Str()),
    Fld("auth", Str()),
    Fld("firstName", Str()),
    Fld("gender", Str()),
    Fld("itemInSession", Long()),
    Fld("lastName", Str()),
    Fld("length", Doub()),
    Fld("level", Str()),
    Fld("location", Str()),
    Fld("method", Str()),
    Fld("page", Str()),
    Fld("registration", Doub()),
    Fld("sessionId", Long()),
    Fld("song", Str()),
    Fld("status", Long()),
    Fld("ts", Long()),
    Fld("userAgent", Str()),
    Fld("userId", Str()),
])

# load log_data from S3 bucket
# the schema defined above is passed explicitly: without it the reader has to
# go through the input once to infer the column types, and the declared
# types would never be applied
log_data = os.path.join(input_data, "log_data/*/*/*.json")
log_df = spark.read.json(log_data, schema=LogSchema)

## **PART 2. DEAL WITH SONG_DATA AND LOG_DATA**

- ### Process song_data (extract data to form the artist and song tables)

In [None]:
# ARTIST TABLE PART
# pick the artist columns out of song_df, keep one row per artist_id and
# discard rows whose artist_id is null or an empty string
artist_df = (
    song_df
    .select(['artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude'])
    .dropDuplicates(['artist_id'])
    .dropna(how="any", subset=["artist_id"])
    .filter(F.col('artist_id') != "")
)

In [None]:
# preview the first 10 artists, ordered by artist_id
artist_df.orderBy('artist_id').show(n=10)

In [None]:
# write the artist table as parquet under the output bucket
artist_df.write.parquet(path="{}/artist_table.parquet".format(output_data))

In [None]:
# SONG TABLE PART
# pick the song columns out of song_df, keep one row per song_id and
# discard rows whose song_id is null or an empty string
songinf_df = (
    song_df
    .select(['song_id', 'title', 'artist_id', 'year', 'duration'])
    .dropDuplicates(['song_id'])
    .dropna(how="any", subset=["song_id"])
    .filter(F.col('song_id') != "")
)

In [None]:
# preview the first 10 songs, ordered by song_id
songinf_df.orderBy('song_id').show(n=10)

In [None]:
# write the song table as parquet under the output bucket,
# partitioned by year and then artist_id
songinf_df.write.parquet(
    path="{}/song_table.parquet".format(output_data),
    partitionBy=["year", "artist_id"],
)

- ### Process log_data (extract data to form the user, time and songplay tables)

In [None]:
# USER TABLE PART
# pick the user columns out of log_df, keep one row per userId and
# discard rows whose userId is null or an empty string
user_df = (
    log_df
    .select(['userId', 'firstName', 'lastName', 'gender', 'level'])
    .dropDuplicates(['userId'])
    .dropna(how="any", subset=["userId"])
    .filter(F.col('userId') != "")
)

In [None]:
# preview the first 10 users, ordered by userId
user_df.orderBy('userId').show(n=10)

In [None]:
# write the user table as parquet under the output bucket
user_df.write.parquet(path="{}/user_table.parquet".format(output_data))

In [None]:
# TIME TABLE PART
# define a function for convert ts column in log_df
def convert_timestamp(x):
    """Convert an epoch timestamp in milliseconds to a ``datetime``.

    Args:
        x: Epoch time in milliseconds, or ``None``.

    Returns:
        The naive ``datetime`` produced by ``datetime.fromtimestamp`` (i.e.
        expressed in the local timezone of the machine running the function),
        or ``None`` when ``x`` is ``None``.
    """
    # a null ts must map to a null timestamp; dividing None would raise
    # TypeError and abort the whole job when this runs as a UDF
    if x is None:
        return None
    return datetime.fromtimestamp(x / 1000)

# wrap convert_timestamp as a Spark UDF that returns a TimestampType column
convert_timestamp_udf = F.udf(convert_timestamp, returnType=TimestampType())

In [None]:
# start the time table from the ts column of log_df:
# one row per distinct ts, rows with a null ts removed
time_df = (
    log_df
    .select(['ts'])
    .dropDuplicates(['ts'])
    .dropna(how="any", subset=["ts"])
)

In [None]:
# preview the first 10 ts values
time_df.show(n=10)

In [None]:
# turn ts into start_time, then derive every calendar field from it
# in a single projection that also fixes the final column order
time_df = time_df.withColumn('start_time', convert_timestamp_udf('ts'))
time_df = time_df.select(
    'start_time',
    F.hour('start_time').alias('hour'),
    F.dayofmonth('start_time').alias('day'),
    F.weekofyear('start_time').alias('week'),
    F.month('start_time').alias('month'),
    F.year('start_time').alias('year'),
    F.dayofweek('start_time').alias('weekday'),
)

In [None]:
# preview the first 10 rows of the finished time table
time_df.show(n=10)

In [None]:
# write the time table as parquet under the output bucket,
# partitioned by year and then month
time_df.write.parquet(
    path="{}/time_table.parquet".format(output_data),
    partitionBy=["year", "month"],
)

In [None]:
# SONGPLAY TABLE PART
# filter the log_df (extract "the value in page is equal to NextSong")
log_df_filter = log_df.where(log_df.page == 'NextSong')

# create start_time column for log_df_filter
log_df_filter = log_df_filter.withColumn('start_time', convert_timestamp_udf('ts'))

# define the cond variable for the following join operation:
# a log event matches a song when artist name, title and duration all agree
cond = [log_df_filter.artist == song_df.artist_name,
        log_df_filter.song == song_df.title,
        log_df_filter.length == song_df.duration]

# join log_df_filter and song_df
# year and month are derived from start_time because the songplay table is
# written with partitionBy("year", "month"); without these two columns that
# write fails (song_df.year is the song's year, not the play's, so it is not used)
songplay_df = log_df_filter.join(song_df, cond) \
                            .select([F.monotonically_increasing_id().alias('songplay_id'),
                                      log_df_filter.start_time,
                                      log_df_filter.userId,
                                      log_df_filter.level,
                                      song_df.song_id,
                                      song_df.artist_id,
                                      log_df_filter.sessionId,
                                      log_df_filter.location,
                                      log_df_filter.userAgent,
                                      F.year(log_df_filter.start_time).alias('year'),
                                      F.month(log_df_filter.start_time).alias('month')])

In [None]:
# preview the first 10 songplay rows
songplay_df.show(n=10)

In [None]:
# write the songplay table as parquet under the output bucket,
# partitioned by year and then month
songplay_df.write.parquet(
    path="{}/songplay_table.parquet".format(output_data),
    partitionBy=["year", "month"],
)