# Sparkify DataLake : using AWS EMR + S3
- This file is to be run on a EMR cluster. (No need to explicity retrieve AWS credentials or create a spark session as we are directly executing code on AWS)
- This notebook contains code for an ETL pipeline that reads input files from S3, processes them using pyspark and finally loads back the processed data to s3.
- At the end, this notebook contains sample queries that can be executed by Sparkify Analytics team for exploratory analysis

#### Import packages 

In [1]:
from datetime import datetime
import os
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql import functions as F

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1679441267340_0001,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

####  Set Input and Output path

In [2]:
#input data path
input_data = "s3a://udacity-dend/"
#output data path
output_data = "s3://juhi-sparkify/"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Process input data files:

- **Song.json**: processed by function **process_song_data()**
- **log.json** : processed by function **process_log_data()**

#### process_song_data():
  This function performs following tasks:<br><br>
    1.  Reads song data from song dataset in s3, creates songs table and writes songs table to parquet files partitioned by year and artist.<br>
    2.  Creates artists table using above song data and writes artists table to parquet files.

In [3]:
def process_song_data(spark, input_data, output_data):
    # get filepath to song data file
    song_data = os.path.join(input_data, 'song_data/A/A/A/*.json')
    
    # read song data file
    df = spark.read.json(song_data)

    # extract columns to create songs table
    songs_table = df.select(['song_id','title','artist_id','year','duration'])
    
    # write songs table to parquet files partitioned by year and artist
    songs_table.write.mode("overwrite").parquet(os.path.join(output_data, 'songs'), partitionBy=['year', 'artist_id'])

    # extract columns to create artists table
    artists_table = df.select(['artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude'])
    
    # write artists table to parquet files
    artists_table.write.mode("overwrite").parquet(os.path.join(output_data, 'artists'))


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### process_log_data():
 This function performs following tasks:<br><br>
 Reads log data from log dataset in s3, based on action: 'NextSong',  creates users,time, and songplays table and  writes these tables to paraquet files. Songsplays table is partitioned partitioned by year and artist.

In [4]:
def process_log_data(spark, input_data, output_data):
    # get filepath to log data file
    log_data = os.path.join(input_data, 'log_data/*/*/*.json')

    # read log data file
    df = spark.read.json(log_data)
    
    # filter by actions for song plays
    df = df.where(df.page == 'NextSong')

    # extract columns for users table    
    users_table = df.select(['userId', 'firstName', 'lastName', 'gender', 'level'])
    users_table = users_table.drop_duplicates(subset=['userId'])
    
    # write users table to parquet files
    users_table.write.mode("overwrite").parquet(os.path.join(output_data, 'users'))

    # create timestamp column from original timestamp column
    get_timestamp = udf(lambda x: datetime.fromtimestamp(x/1000).strftime('%Y-%m-%d'))
    df = df.withColumn('timestamp', get_timestamp('ts'))
    
    # create datetime column from original timestamp column
    get_datetime = udf(lambda x: str(datetime.fromtimestamp(int(x) / 1000)))
    df = df.withColumn('datetime', get_datetime(df.ts))
    
    
    # extract columns to create time table
    time_table = df.select(
                 F.col("timestamp").alias("start_time"),
                 F.hour("timestamp").alias('hour'),
                 F.dayofmonth("timestamp").alias('day'),
                 F.weekofyear("timestamp").alias('week'),
                 F.month("timestamp").alias('month'), 
                 F.year("timestamp").alias('year'), 
                 F.date_format(F.col("timestamp"), "E").alias("weekday")
            )
    
    # write time table to parquet files partitioned by year and month
    time_table.write.mode("overwrite").parquet(os.path.join(output_data, 'time'), partitionBy=['year', 'month'])
    
    # read in song data to use for songplays table
    song_df = spark.read.json(input_data + 'song_data/A/A/A/*.json')

    # extract columns from joined song and log datasets to create songplays table 
    df = df.alias('log_df')
    song_df    = song_df.alias('song_df')
    new_df  = df.join(song_df, col('log_df.artist') == col(
        'song_df.artist_name'), 'inner')
    
    songplays_table = new_df.select(
            col('log_df.datetime').alias('start_time'),
            col('log_df.userId').alias('user_id'),
            col('log_df.level').alias('level'),
            col('song_df.song_id').alias('song_id'),
            col('song_df.artist_id').alias('artist_id'),
            col('log_df.sessionId').alias('session_id'),
            col('log_df.location').alias('location'), 
            col('log_df.userAgent').alias('user_agent'),
            year('log_df.datetime').alias('year'),
            month('log_df.datetime').alias('month')) \
            .withColumn('songplay_id', F.monotonically_increasing_id())

    # write songplays table to parquet files partitioned by year and month
    songplays_table.write.mode("overwrite").parquet(os.path.join(output_data, 'songplays'), partitionBy=['year', 'month'])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Execute the above defined functions to load, process and store data on S3

In [5]:
#Process song files
process_song_data(spark, input_data, output_data)  
#Process log files
process_log_data(spark, input_data, output_data)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 36084)
Traceback (most recent call last):
  File "/usr/lib64/python3.6/socketserver.py", line 320, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib64/python3.6/socketserver.py", line 351, in process_request
    self.finish_request(request, client_address)
  File "/usr/lib64/python3.6/socketserver.py", line 364, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/lib64/python3.6/socketserver.py", line 724, in __init__
    self.handle()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/accumulators.py", line 266, in handle
    poll(authenticate_and_accum_updates)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/accumulators.py", line 241, in poll
    if func():
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/accumulators.py", line 254, in authenticate_and_accum_updates
    received_to

#### All the steps of ETL pipeline have been exexuted : Processed data has been uploaded to "s3://juhi-sparkify/"

# Sample Queries

#### 1.  Find the total number of paid and free users:

In [13]:
users_df = spark.read.parquet("s3a://juhi-sparkify/users")
users_df.createOrReplaceTempView("users")
spark.sql("""
    select
        level as subscription_level,
        count(userid) as total_count
    from 
        users
    group by 1""").show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------------+-----------+
|subscription_level|total_count|
+------------------+-----------+
|              free|         75|
|              paid|         21|
+------------------+-----------+

#### 2. Total number of songs played year wise in descending order

In [7]:
songplays_df = spark.read.parquet("s3a://juhi-sparkify/songplays")
songplays_df.createOrReplaceTempView("songplays")
spark.sql("""
    select
        year,
        count(*) as songplay_count
    from 
        songplays
    group by 1
    order by 2 desc
""").show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+--------------+
|year|songplay_count|
+----+--------------+
|2018|            10|
+----+--------------+