## Required Imports & Libraries

In [1]:
import configparser
import os
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, desc, row_number, monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, dayofweek, hour, weekofyear, date_format
from pyspark.sql.types import TimestampType
from pyspark.sql.window import Window

## Read Configuration file

In [2]:
# Load AWS credentials from dl.cfg (keep this file out of version control).
config = configparser.ConfigParser()
# Context manager ensures the config file handle is closed after parsing.
with open('dl.cfg') as cfg_file:
    config.read_file(cfg_file)

# Export the credentials as environment variables for the S3 client used below.
os.environ["AWS_ACCESS_KEY_ID"] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"] = config['AWS']['AWS_SECRET_ACCESS_KEY']

## Define Spark Session and Context

# Create (or reuse) the Spark session. The hadoop-aws package supplies the
# s3a:// filesystem used for every read and write in this notebook; it
# presumably picks up the AWS_* environment variables set above — confirm.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4")
    .getOrCreate()
)
spark

## Read the Song data: s3://udacity-dend/song_data

In [None]:
# Glob for every song JSON file, three directory levels below song_data/.
song_data = "s3a://udacity-dend/song_data/*/*/*/*.json"

# Load all song records into a single DataFrame, then preview schema and rows.
df = spark.read.format("json").load(song_data)
df.printSchema()
df.show(n=3)

## Create the 1st dimension table (songs) and write it as Parquet files to my S3 bucket

In [None]:
# extract columns to create songs table (1st dimension table)
# song_id, title, artist_id, year, duration
# song_id is treated as the table's key, so drop any repeated song records.
songs_table = df.select('song_id', 'title', 'artist_id', 'year', 'duration') \
    .dropDuplicates(['song_id'])

# write songs table to parquet files partitioned by year and artist
songs_table.write.partitionBy('year', 'artist_id') \
    .parquet("s3a://udacity-datalake-msaied/Project4_Sparkify_DataLake/songs.parquet", mode="overwrite")

## Create the 2nd dimension table (artists) and write it as Parquet files to my S3 bucket

In [None]:
# extract columns to create artists table (2nd dimension table)
# artist_id, name, location, latitude, longitude
# Each selectExpr argument must be exactly one expression. Artist columns are
# repeated on every song row by the same artist, so de-duplicate on artist_id.
artists_table = df.selectExpr(
    "artist_id",
    "artist_name as name",
    "artist_location as location",
    "artist_latitude as latitude",
    "artist_longitude as longitude",
).dropDuplicates(['artist_id'])

# write artists table to parquet files
artists_table.write.parquet("s3a://udacity-datalake-msaied/Project4_Sparkify_DataLake/artist.parquet", mode="overwrite")

## Read the Log data: s3://udacity-dend/log_data

In [None]:
# get filepath to log data file
# Matches the source named in the section header (log_data).
# NOTE(review): assumes files are nested as log_data/<year>/<month>/*.json —
# verify with `aws s3 ls s3://udacity-dend/log_data/ --recursive`.
log_data = "s3a://udacity-dend/log_data/*/*/*.json"

# read log data file
df = spark.read.json(log_data)

# filter by actions for song plays
df = df.filter(df.page == 'NextSong')
df.printSchema()
df.show(3)

## Create the 3rd dimension table (users) and write it as Parquet files to my S3 bucket

In [None]:
# extract columns for users table (3rd dimension table)
# user_id, first_name, last_name, gender, level
# `level` can change over a user's history, so distinct() over all columns
# would emit several rows per user. Keep only each user's most recent event.
latest_event_per_user = Window.partitionBy("userId").orderBy(desc("ts"))
users_table = df \
    .withColumn("_rn", row_number().over(latest_event_per_user)) \
    .filter(col("_rn") == 1) \
    .selectExpr('userId as user_id', 'firstName as first_name', 'lastName as last_name', 'gender', 'level')

# write users table to parquet files
users_table.write.parquet("s3a://udacity-datalake-msaied/Project4_Sparkify_DataLake/users.parquet", mode="overwrite")

## Create the 4th dimension table (time) and write it as Parquet files to my S3 bucket

In [None]:
# create timestamp column from original timestamp column
# ts is epoch milliseconds; convert it to a timestamp truncated to whole seconds.
# NOTE(review): datetime.fromtimestamp uses the executor's local timezone.
# Null ts values are passed through as null instead of crashing the Python worker.
get_timestamp = udf(
    lambda ms: None if ms is None else datetime.fromtimestamp(ms / 1000).replace(microsecond=0),
    TimestampType(),
)
df = df.withColumn("start_time", get_timestamp("ts"))

# extract columns to create time table (4th dimension table)
# start_time, hour, day, week, month, year, weekday
# Every derived column depends only on start_time, so de-duplicate first and
# compute the calendar fields once per distinct timestamp.
# dayofweek follows Spark's convention: 1 = Sunday ... 7 = Saturday.
time_table = df.select("start_time").distinct() \
    .withColumn("hour", hour("start_time")) \
    .withColumn("day", dayofmonth("start_time")) \
    .withColumn("week", weekofyear("start_time")) \
    .withColumn("month", month("start_time")) \
    .withColumn("year", year("start_time")) \
    .withColumn("weekday", dayofweek("start_time"))

# write time table to parquet files partitioned by year and month
time_table.write.partitionBy('year', 'month').parquet("s3a://udacity-datalake-msaied/Project4_Sparkify_DataLake/time.parquet", mode="overwrite")

## Create the fact table (songplays) and write it as Parquet files to my S3 bucket

In [None]:
# read in song data to use for songplays table
song_df = spark.read.parquet("s3a://udacity-datalake-msaied/Project4_Sparkify_DataLake/songs.parquet")
# The songs table has no artist name column, so look it up from the artists table.
artist_df = spark.read.parquet("s3a://udacity-datalake-msaied/Project4_Sparkify_DataLake/artist.parquet")

# One row per song with only the fields needed to match a log event. Selecting
# just these columns avoids ambiguous names after the join (e.g. the songs
# table's `year` partition column, the artists table's `location`).
song_lookup = song_df.select("song_id", "title", "artist_id", "duration") \
    .join(
        artist_df.select("artist_id", col("name").alias("artist_name")).dropDuplicates(["artist_id"]),
        "artist_id",
        "inner",
    )

# extract columns from joined song and log datasets to create songplays table
# songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
# Left join keeps every play event; song_id / artist_id stay null when no song matches.
songplays_df = df.join(
    song_lookup,
    [
        df.song == song_lookup.title,
        df.artist == song_lookup.artist_name,
        df.length == song_lookup.duration,
    ],
    "left",
)

songplays_table = songplays_df.select(
    # Unique (but not consecutive) surrogate key.
    monotonically_increasing_id().alias("songplay_id"),
    "start_time",
    col("userId").alias("user_id"),
    "level",
    "song_id",
    "artist_id",
    col("sessionId").alias("session_id"),
    "location",
    col("userAgent").alias("user_agent"),
    # Partition columns derived straight from start_time; no join with time_table needed.
    year("start_time").alias("year"),
    month("start_time").alias("month"),
)

# write songplays table to parquet files partitioned by year and month
songplays_table.write.partitionBy('year', 'month').parquet("s3a://udacity-datalake-msaied/Project4_Sparkify_DataLake/songplays.parquet", mode="overwrite")