In [15]:
%idle_timeout 2880
%glue_version 5.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

  
# Reuse the running SparkContext if one exists, otherwise start one.
sc = SparkContext.getOrCreate()
# Glue wrapper around the SparkContext; used below to read from and write to S3.
glueContext = GlueContext(sc)
# SparkSession handle exposed by the Glue context.
spark = glueContext.spark_session
# Glue Job object bound to this context; it is not referenced again in the cells shown here.
job = Job(glueContext)

You are already connected to a glueetl session 18381d80-0516-4d9d-970b-087d8b4fdca0.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Current idle_timeout is 2880 minutes.
idle_timeout has been set to 2880 minutes.


You are already connected to a glueetl session 18381d80-0516-4d9d-970b-087d8b4fdca0.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Setting Glue version to: 5.0


You are already connected to a glueetl session 18381d80-0516-4d9d-970b-087d8b4fdca0.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Previous worker type: G.1X
Setting new worker type to: G.1X


You are already connected to a glueetl session 18381d80-0516-4d9d-970b-087d8b4fdca0.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Previous number of workers: 5
Setting new number of workers to: 5



In [16]:
from datetime import datetime
from awsglue.dynamicframe import DynamicFrame




In [2]:
from pyspark.sql.functions import explode, col, to_date




In [3]:
# S3 prefix holding the raw Spotify JSON files that are waiting to be processed.
s3_path = "s3://spotify-etl-saiteja/Raw_data/for_process/"

# Load everything under that prefix into a single Glue DynamicFrame.
source_df = glueContext.create_dynamic_frame_from_options(
    format="json",
    connection_type="s3",
    connection_options={"paths": [s3_path]},
)




In [4]:
# Convert the Glue DynamicFrame into a Spark DataFrame so the pyspark.sql
# column functions (explode, col, to_date) can be applied to it below.
spotify_df = source_df.toDF()



In [5]:
def process_albums(df):
    """Build the album table from the raw playlist DataFrame.

    Args:
        df: Spark DataFrame with an array column ``items`` whose elements
            expose ``track.album`` fields.

    Returns:
        Spark DataFrame with columns ``album_id``, ``album_name``,
        ``release_date``, ``total_tracks`` and ``url``, holding one row
        per distinct ``album_id``.
    """
    # One row per playlist item; the exploded struct replaces the array column.
    per_item = df.withColumn("items", explode("items"))

    album_columns = [
        col("items.track.album.id").alias("album_id"),
        col("items.track.album.name").alias("album_name"),
        col("items.track.album.release_date").alias("release_date"),
        col("items.track.album.total_tracks").alias("total_tracks"),
        col("items.track.album.external_urls.spotify").alias("url"),
    ]

    # The same album can appear under many tracks; keep a single row for each.
    return per_item.select(*album_columns).drop_duplicates(["album_id"])




In [6]:
# Short alias for spotify_df; it is read by the exploratory artist-explode cell
# that follows. The process_* functions are called with spotify_df directly.
df = spotify_df




In [7]:
# Exploratory step: explode the playlist items, then explode each item's
# track.artists array, leaving a single struct column named "artist".
df_artists_explode = (
    df.select(explode(col("items")).alias("item"))
    .select(explode(col("item.track.artists")).alias("artist"))
)




In [8]:
def process_artists(df):
    """Build the artist table from the raw playlist DataFrame.

    Args:
        df: Spark DataFrame with an array column ``items`` whose elements
            expose a ``track.artists`` array.

    Returns:
        Spark DataFrame with columns ``artist_id``, ``artist_name`` and
        ``external_url``, holding one row per distinct ``artist_id``.
    """
    # Two explodes: first the playlist items, then each track's artist array.
    per_artist = (
        df.select(explode(col("items")).alias("item"))
        .select(explode(col("item.track.artists")).alias("artist"))
    )

    artist_columns = [
        col("artist.id").alias("artist_id"),
        col("artist.name").alias("artist_name"),
        col("artist.external_urls.spotify").alias("external_url"),
    ]

    # An artist credited on several tracks should appear only once.
    return per_artist.select(*artist_columns).drop_duplicates(["artist_id"])




In [9]:
def process_songs(df):
    """Build the song table from the raw playlist DataFrame.

    Args:
        df: Spark DataFrame with an array column ``items`` whose elements
            expose ``added_at`` and ``track`` fields.

    Returns:
        Spark DataFrame with columns ``song_id``, ``song_name``,
        ``duration_ms``, ``url``, ``popularity``, ``song_added``,
        ``album_id`` and ``artist_id``, holding one row per distinct
        ``song_id``. ``song_added`` is passed through ``to_date``.
    """
    per_item = df.select(explode(col("items")).alias("item"))

    song_columns = [
        col("item.track.id").alias("song_id"),
        col("item.track.name").alias("song_name"),
        col("item.track.duration_ms").alias("duration_ms"),
        col("item.track.external_urls.spotify").alias("url"),
        col("item.track.popularity").alias("popularity"),
        col("item.added_at").alias("song_added"),
        col("item.track.album.id").alias("album_id"),
        # Only the first listed artist is kept as the song's artist reference.
        col("item.track.artists")[0]["id"].alias("artist_id"),
    ]
    unique_songs = per_item.select(*song_columns).drop_duplicates(["song_id"])

    # Replace the string value of 'song_added' with a proper date value.
    return unique_songs.withColumn("song_added", to_date(col("song_added")))




In [10]:
# Build the de-duplicated album table from the raw playlist data and preview it.
album_df = process_albums(spotify_df)
album_df.show()

+--------------------+--------------------+------------+------------+--------------------+
|            album_id|          album_name|release_date|total_tracks|                 url|
+--------------------+--------------------+------------+------------+--------------------+
|0hBRqPYPXhr1RkTDG...|The Secret of Us ...|  2024-10-18|          20|https://open.spot...|
|0ny6mZMBrYSO0s8HA...|A Jolly Christmas...|     1957-09|          15|https://open.spot...|
|10FLjwfpbxLmW8c25...|    Die With A Smile|  2024-08-16|           1|https://open.spot...|
|15XcLhiVMlSOipUdd...|                MUSE|  2024-07-19|           7|https://open.spot...|
|1DILNh7maaYyKxe15...|Rock 'N' Roll Rar...|  1986-01-01|          20|https://open.spot...|
|1Ph9nV8cNv7Gq7yHO...|Spotify Singles H...|  2023-11-14|           1|https://open.spot...|
|1Ss0ArMRr91m83mOg...|                 GNX|  2024-11-21|          12|https://open.spot...|
|1pSxnWkixgHxo9jnG...|Run Rudolph Run -...|  2024-10-15|           1|https://open.spot...|

In [11]:
# Build the de-duplicated artist table from the raw playlist data and preview it.
artist_df = process_artists(spotify_df)
artist_df.show()

+--------------------+--------------------+--------------------+
|           artist_id|         artist_name|        external_url|
+--------------------+--------------------+--------------------+
|0HS96tPggl2nwiz3g...|Mitchell Ayres & ...|https://open.spot...|
|0JXiS2FrAg3wQYJHc...|  B. Swanson Quartet|https://open.spot...|
|0du5cEVh5yTK9QJze...|          Bruno Mars|https://open.spot...|
|0wi4yTYlGtEnbGo4l...|     Shakin' Stevens|https://open.spot...|
|0ys2OFYzWYB5hRDLC...|       Fuerza Regida|https://open.spot...|
|11p2E654TTU8e0nZW...|          Mark Ambor|https://open.spot...|
|1GxkXlMwML1oSg5eL...|       Michael Bublé|https://open.spot...|
|1HY2Jd0NmPuamShAr...|           Lady Gaga|https://open.spot...|
|1Ml4OuStDoympbREU...| The Fontane Sisters|https://open.spot...|
|1Mxqyy3pSjf8kZZL4...|       Frank Sinatra|https://open.spot...|
|1Xyo4u8uXC1ZmMpat...|          The Weeknd|https://open.spot...|
|1iCnM8foFssWlPRLf...|          Gigi Perez|https://open.spot...|
|1oSPZhvZMIrWW5I41...|   

In [12]:
# Build the de-duplicated song table from the raw playlist data and preview it.
song_df = process_songs(spotify_df)
song_df.show()

+--------------------+--------------------+-----------+--------------------+----------+----------+--------------------+--------------------+
|             song_id|           song_name|duration_ms|                 url|popularity|song_added|            album_id|           artist_id|
+--------------------+--------------------+-----------+--------------------+----------+----------+--------------------+--------------------+
|0HZk0QsXPhMNAWNDR...|          Hallelujah|     268960|https://open.spot...|        60|2024-12-31|3sId8sOH47yqOWopz...|26AHtbjWKiwYzsoGo...|
|0SUKNXEqZnRw7g0NO...|Let It Snow! Let ...|     155586|https://open.spot...|        47|2024-12-31|2B9FcHKqz3PWJlh0y...|1Mxqyy3pSjf8kZZL4...|
|0bYg9bo50gSsH3LtX...|All I Want for Ch...|     241106|https://open.spot...|        69|2024-12-31|61ulfFSmmxMhc2wCd...|4iHNK0tOyZPYnBU7n...|
|0hz5PNF2wejwUKCMp...|     Merry Christmas|     208546|https://open.spot...|        12|2024-12-31|3mJm77zryIt06nbQV...|6eUKZXaKkcviH0Ku9...|
|0lizgQ7Qw35o

In [13]:
def write_to_s3(df, path_suffix, format_type="csv",
                base_path="s3://spotify-etl-saiteja/Transformation_data"):
    """Write a Spark DataFrame to S3 through the Glue context.

    Args:
        df: Spark DataFrame to persist.
        path_suffix: Path appended below ``base_path``; a trailing ``/`` is
            added so the target is treated as a prefix.
        format_type: Output format passed to Glue (default ``"csv"``).
        base_path: S3 URI under which the output is written. Defaults to the
            location this notebook has always used, so existing calls write
            to the same place as before.

    Returns:
        None. The only effect is the write to S3.
    """
    # Glue's writer operates on DynamicFrames, so convert back from a DataFrame.
    dynamic_frame = DynamicFrame.fromDF(df, glueContext, "dynamic_frame")

    # Tolerate a trailing slash on base_path so the joined path has exactly one.
    target_path = f"{base_path.rstrip('/')}/{path_suffix}/"

    glueContext.write_dynamic_frame.from_options(
        frame=dynamic_frame,
        connection_type="s3",
        connection_options={"path": target_path},
        format=format_type,
    )





In [17]:
# Write the album table to S3 as CSV, under a folder stamped with today's date.
write_to_s3(album_df, f"album/album_transformed_{datetime.now():%Y-%m-%d}", "csv")





In [18]:
# Write the artist table to S3 as CSV, under a folder stamped with today's date.
write_to_s3(artist_df, f"artist/artist_transformed_{datetime.now():%Y-%m-%d}", "csv")





In [19]:
# Write the song table to S3 as CSV, under a folder stamped with today's date.
write_to_s3(song_df, f"songs/songs_transformed_{datetime.now():%Y-%m-%d}", "csv")


