In [1]:
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Current idle_timeout is 2880 minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 4.0
Previous worker type: G.1X
Setting new worker type to: G.1X
Previous number of workers: 5
Setting new number of workers to: 5
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Idle Timeout: 2880
Session ID: 16bbf949-ec99-4038-a957-c0813c5a53fd
Applying the following default arguments:
--glue_kernel_version 1.0.5
--enable-glue-datacatalog true
Waiting for session 16bbf949-ec99-4038-a957-c0813c5a53fd to get into ready status...
Session 16bbf949-ec99-4038-a957-c0813c5a53fd has been created.



In [97]:
from pyspark.sql.functions import *
from datetime import datetime
from awsglue.dynamicframe import DynamicFrame




In [2]:
s3_path = "s3://spotify-etl-project-rajeshwar/raw_data/to_processed/"
source_df = glueContext.create_dynamic_frame_from_options(
    connection_type="s3",
    connection_options={"paths":[s3_path]},
    format = "json"
)




In [3]:
source_df

<awsglue.dynamicframe.DynamicFrame object at 0x7f52c5cb0220>


In [53]:
spotify_df = source_df.toDF()




In [89]:
##Creating functions

def process_albums(df):
    df = df.withColumn("items",explode("items")).select(
         col("items.track.album.id").alias("album_id"),
         col("items.track.album.name").alias("album_name"),
         col("items.track.album.release_date").alias("album_release_date"),
         col("items.track.album.total_tracks").alias("album_total_tracks"),
         col("items.track.album.external_urls.spotify").alias("album_url")).drop_duplicates(['album_id'])
    return df


def process_artists(df):
    #First explode the items to get individual tracks
    df_items_exploded = df.select(explode(col("items")).alias("item"))
    
    # Then, explode the artist array within each item to create a row for each artist
    df_artist_exploded = df_items_exploded.select(explode(col("item.track.artists")).alias("artist"))
    
    # Now, select the artist attributes, ensuring each artist is in its own row
    df_artist = df_artist_exploded.select(
                col("artist.id").alias("artist_id"),
                col("artist.name").alias("artist_name"),
                col("artist.external_urls.spotify").alias("artist_external_url")).drop_duplicates(['artist_id'])
    
    return df_artist


def process_songs(df):
    # Explode the items array to create a row for each song
    df_song_explode = df.select(explode(col("items")).alias("item"))
    
    # Extract song information from the exploded dataframe
    df_songs = df_song_explode.select(
               col("item.track.id").alias("song_id"),
               col("item.track.name").alias("song_name"),
               col("item.track.duration_ms").alias("song_duration_ms"),
               col("item.track.external_urls.spotify").alias("song_url"),
               col("item.track.popularity").alias("song_popularity"),
               col("item.added_at").alias("song_added_at"),
               col("item.track.album.id").alias("album_id"),
               col("item.track.album.artists")[0]["id"].alias("artist_id")).drop_duplicates(['song_id'])
    
    # Convert string dates in 'song_added' to actual date types
    
    df_songs = df_songs.withColumn("song_added_at",to_date(col("song_added_at")))
    
    return df_songs
    
    





In [90]:
album_df = process_albums(spotify_df)
album_df.show(5)

+--------------------+--------------------+------------------+------------------+--------------------+
|            album_id|          album_name|album_release_date|album_total_tracks|           album_url|
+--------------------+--------------------+------------------+------------------+--------------------+
|0DLvFVIfwt3OHdK9k...|Where I've Been, ...|        2024-05-31|                12|https://open.spot...|
|0EiI8ylL0FmWWpgHV...|The Rise and Fall...|        2023-09-22|                14|https://open.spot...|
|0lgs2Sa82lyX89nBU...|      FERXXOCALIPSIS|        2023-12-01|                10|https://open.spot...|
|0wCLHkBRKcndhMQQp...|  I Don't Wanna Wait|        2024-04-05|                 2|https://open.spot...|
|15XcLhiVMlSOipUdd...|                MUSE|        2024-07-19|                 7|https://open.spot...|
+--------------------+--------------------+------------------+------------------+--------------------+
only showing top 5 rows


In [91]:
artist_df = process_artists(spotify_df)
artist_df.show(5)

+--------------------+---------------+--------------------+
|           artist_id|    artist_name| artist_external_url|
+--------------------+---------------+--------------------+
|06HL4z0CvFAxyc27G...|   Taylor Swift|https://open.spot...|
|0PCCGZ0wGLizHt2KZ...|        Artemas|https://open.spot...|
|0Y5tJX1MQlPlqiwlO...|   Travis Scott|https://open.spot...|
|0du5cEVh5yTK9QJze...|     Bruno Mars|https://open.spot...|
|0jPHHnU8GUWEF7rwP...|Jordan Adetunji|https://open.spot...|
+--------------------+---------------+--------------------+
only showing top 5 rows


In [92]:
song_df = process_songs(spotify_df)
song_df.show(5)

+--------------------+-------------------+----------------+--------------------+---------------+-------------+--------------------+--------------------+
|             song_id|          song_name|song_duration_ms|            song_url|song_popularity|song_added_at|            album_id|           artist_id|
+--------------------+-------------------+----------------+--------------------+---------------+-------------+--------------------+--------------------+
|0OA00aPt3BV10qeMI...|          Big Dawgs|          190666|https://open.spot...|             91|   2024-08-16|6Yw4204wbgmpsGTzj...|4nVa6XlBFlIkF6msW...|
|0WbMK4wrZ1wFSty9F...|   Good Luck, Babe!|          218423|https://open.spot...|             96|   2024-08-16|1WAjjRMfZjEXtB0lQ...|7GlBOeep6PqTfFi59...|
|0nJW01T7XtvILxQgC...|When I Was Your Man|          213826|https://open.spot...|             88|   2024-08-16|58ufpQsJ1DS5kq4hh...|0du5cEVh5yTK9QJze...|
|17phhZDn6oGtzMe56...|       Lose Control|          210688|https://open.spot...|  

In [99]:
# Write data back to S3

def write_to_s3(df,path_suffix,format_type="csv"):
    dynamic_frame = DynamicFrame.fromDF(df,glueContext,"dynamic_frame")
    
    glueContext.write_dynamic_frame.from_options(
        frame = dynamic_frame,
        connection_type = "s3",
        connection_options = {"path":f"s3://spotify-etl-project-rajeshwar/transform_data/{path_suffix}/"},
        format = format_type
        
    )




In [100]:
write_to_s3(album_df,"album_data/album_transformed_{}".format(datetime.now().strftime("%Y-%m-%d")),'csv')




In [101]:
write_to_s3(artist_df,"artist_data/artist_transformed_{}".format(datetime.now().strftime("%Y-%m-%d")),'csv')




In [102]:
write_to_s3(song_df,"songs_data/song_transformed_{}".format(datetime.now().strftime("%Y-%m-%d")),'csv')


