# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [None]:
%help

####  Run this cell to set up and start your interactive session.


In [4]:
%idle_timeout 2880
%glue_version 5.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
# Reuse the SparkContext already running in this session, or create one if none exists.
sc = SparkContext.getOrCreate()
# GlueContext built on that SparkContext; the later cells use it for all S3 reads and writes.
glueContext = GlueContext(sc)
# SparkSession exposed by the GlueContext.
spark = glueContext.spark_session
# Job handle bound to this GlueContext; it is not referenced again in the cells below.
job = Job(glueContext)

You are already connected to a glueetl session 1cfbe57f-4743-431b-b0aa-071956983124.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.


You are already connected to a glueetl session 1cfbe57f-4743-431b-b0aa-071956983124.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Setting Glue version to: 5.0


You are already connected to a glueetl session 1cfbe57f-4743-431b-b0aa-071956983124.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Previous worker type: None
Setting new worker type to: G.1X


You are already connected to a glueetl session 1cfbe57f-4743-431b-b0aa-071956983124.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Previous number of workers: None
Setting new number of workers to: 5



In [18]:
from datetime import datetime

import boto3
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import explode,col,to_date




In [6]:
# S3 prefix the notebook reads its JSON input from.
s3_path = "s3://spotify-etl-project-shubham/raw_data/to_processed/"

# Load the JSON data under that prefix into a Glue DynamicFrame, using the
# same reader/writer ``from_options`` form as the write helper further down.
source_dyf = glueContext.create_dynamic_frame.from_options(
    format="json",
    connection_type="s3",
    connection_options={"paths": [s3_path]},
)




In [7]:
# Convert the Glue DynamicFrame to a Spark DataFrame; the process_* functions below operate on it.
spotify_df = source_dyf.toDF()



In [1]:
def process_albums(df):
    """Build a de-duplicated album table from the raw playlist frame.

    The ``items`` array is exploded so each element gets its own row, the
    album attributes nested under ``items.track.album`` are projected into
    flat columns, and only one row per ``album_id`` is kept.

    Args:
        df: Spark DataFrame containing an array column named ``items``.

    Returns:
        Spark DataFrame with the columns ``album_id``, ``album_name``,
        ``release_date``, ``total_tracks`` and ``url``.
    """
    # (nested source path, output column name)
    album_fields = [
        ("items.track.album.id", "album_id"),
        ("items.track.album.name", "album_name"),
        ("items.track.album.release_date", "release_date"),
        ("items.track.album.total_tracks", "total_tracks"),
        ("items.track.album.external_urls.spotify", "url"),
    ]

    # Replace the array column with one exploded element per row.
    per_item = df.withColumn("items", explode("items"))
    albums = per_item.select(
        *[col(source).alias(target) for source, target in album_fields]
    )
    return albums.drop_duplicates(["album_id"])


def process_artists(df):
    """Build a de-duplicated artist table from the raw playlist frame.

    Two explodes are applied in sequence: first over ``items`` (one row per
    playlist entry), then over each entry's ``track.artists`` array (one row
    per artist credited on that entry). The result keeps a single row per
    ``artist_id``.

    Args:
        df: Spark DataFrame containing an array column named ``items``.

    Returns:
        Spark DataFrame with the columns ``artist_id``, ``artist_name`` and
        ``external_url``.
    """
    # nested source path -> output column name
    artist_fields = {
        "artist.id": "artist_id",
        "artist.name": "artist_name",
        "artist.external_urls.spotify": "external_url",
    }

    per_artist = (
        df.select(explode(col("items")).alias("item"))
        .select(explode(col("item.track.artists")).alias("artist"))
    )
    artists = per_artist.select(
        *[col(source).alias(target) for source, target in artist_fields.items()]
    )
    return artists.drop_duplicates(["artist_id"])


def process_songs(df):
    """Build a de-duplicated song table from the raw playlist frame.

    ``items`` is exploded into one row per playlist entry, the track
    attributes are projected into flat columns, one row per ``song_id`` is
    kept, and ``song_added`` is then converted with ``to_date``.

    Args:
        df: Spark DataFrame containing an array column named ``items``.

    Returns:
        Spark DataFrame with the columns ``song_id``, ``song_name``,
        ``duration_ms``, ``url``, ``popularity``, ``song_added``,
        ``album_id`` and ``artist_id``.
    """
    song_columns = [
        col("item.track.id").alias("song_id"),
        col("item.track.name").alias("song_name"),
        col("item.track.duration_ms").alias("duration_ms"),
        col("item.track.external_urls.spotify").alias("url"),
        col("item.track.popularity").alias("popularity"),
        col("item.added_at").alias("song_added"),
        col("item.track.album.id").alias("album_id"),
        # Only the first entry of the artists array is used as the song's artist.
        col("item.track.artists")[0]["id"].alias("artist_id"),
    ]

    per_item = df.select(explode(col("items")).alias("item"))
    unique_songs = per_item.select(*song_columns).drop_duplicates(["song_id"])

    # De-duplicate first, then turn the 'song_added' value into a date column.
    return unique_songs.withColumn("song_added", to_date(col("song_added")))


Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.8 
Trying to create a Glue session for the kernel.
Session Type: glueetl
Session ID: 1cfbe57f-4743-431b-b0aa-071956983124
Applying the following default arguments:
--glue_kernel_version 1.0.8
--enable-glue-datacatalog true
Waiting for session 1cfbe57f-4743-431b-b0aa-071956983124 to get into ready status...
Session 1cfbe57f-4743-431b-b0aa-071956983124 has been created.



In [10]:
# Derive the album table from the raw frame and preview its first five rows.
albums_df = process_albums(spotify_df)
albums_df.show(5)

+--------------------+--------------------+------------+------------+--------------------+
|            album_id|          album_name|release_date|total_tracks|                 url|
+--------------------+--------------------+------------+------------+--------------------+
|07V9HO6Djetw5j5lX...|    So Close To What|  2025-02-20|          16|https://open.spot...|
|07w0rG5TETcyihsEI...|                 SOS|  2022-12-09|          23|https://open.spot...|
|0DLvFVIfwt3OHdK9k...|Where I've Been, ...|  2024-05-31|          12|https://open.spot...|
|0TxewlKVKdpP18dGg...|       Love Somebody|  2024-10-18|           1|https://open.spot...|
|0fSfkmx0tdPqFYkJu...|               MUSIC|  2025-03-14|          30|https://open.spot...|
+--------------------+--------------------+------------+------------+--------------------+
only showing top 5 rows


In [11]:
# Derive the artist table from the raw frame and preview its first five rows.
artist_df = process_artists(spotify_df)
artist_df.show(5)

+--------------------+-------------+--------------------+
|           artist_id|  artist_name|        external_url|
+--------------------+-------------+--------------------+
|0Y5tJX1MQlPlqiwlO...| Travis Scott|https://open.spot...|
|0du5cEVh5yTK9QJze...|   Bruno Mars|https://open.spot...|
|0fTSzq9jAh4c36UVb...|  Alex Warren|https://open.spot...|
|0iEtIxbK0KxaSlF7G...| Metro Boomin|https://open.spot...|
|0ys2OFYzWYB5hRDLC...|Fuerza Regida|https://open.spot...|
+--------------------+-------------+--------------------+
only showing top 5 rows


In [13]:
# Derive the song table from the raw frame and preview its first five rows.
song_df = process_songs(spotify_df)
song_df.show(5)

+--------------------+----------------+-----------+--------------------+----------+----------+--------------------+--------------------+
|             song_id|       song_name|duration_ms|                 url|popularity|song_added|            album_id|           artist_id|
+--------------------+----------------+-----------+--------------------+----------+----------+--------------------+--------------------+
|00iLTetTLAeImmBlh...|           K POP|     112992|https://open.spot...|        73|2025-03-22|0fSfkmx0tdPqFYkJu...|699OTQXzgjhIYAHMy...|
|0NUqi0ps17YpLUC3k...|      DIE TRYING|     195431|https://open.spot...|        87|2025-03-22|6Rl6YoCarF2GHPSQm...|2HPaUgqeutzr3jx5a...|
|0UtnpKaReKUg2Gqua...|     Money Trees|     386906|https://open.spot...|        70|2025-03-22|748dZDqSZy6aPXKcI...|2YZyLoL8N0Wb9xBt1...|
|0WbMK4wrZ1wFSty9F...|Good Luck, Babe!|     218423|https://open.spot...|        94|2025-03-22|1WAjjRMfZjEXtB0lQ...|7GlBOeep6PqTfFi59...|
|0WiyWiJDkNCyGNqkv...|     Chasin' You|  

In [15]:
def write_to_s3(df, path_suffix, format_type="csv"):
    """Write a Spark DataFrame to the project's ``transformed_data`` area in S3.

    The DataFrame is wrapped in a Glue DynamicFrame and written through the
    module-level ``glueContext``.

    Args:
        df: Spark DataFrame to persist.
        path_suffix: Path appended after
            ``s3://spotify-etl-project-shubham/transformed_data/``; a trailing
            ``/`` is added automatically.
        format_type: Output format handed to the Glue writer (default ``"csv"``).

    Returns:
        None.
    """
    frame_to_write = DynamicFrame.fromDF(df, glueContext, "dynamic_frame")
    destination = f"s3://spotify-etl-project-shubham/transformed_data/{path_suffix}/"

    writer = glueContext.write_dynamic_frame
    writer.from_options(
        frame=frame_to_write,
        connection_type="s3",
        connection_options={"path": destination},
        format=format_type,
    )




In [19]:
# Write the album table as CSV under album/album_transformed_<YYYY-MM-DD>/, dated with the current session time.
write_to_s3(albums_df, "album/album_transformed_{}".format(datetime.now().strftime("%Y-%m-%d")),'csv')




In [20]:
# Write the artist and song tables as CSV under artist/ and songs/, each in a folder dated with the current session time.
write_to_s3(artist_df, "artist/artist_transformed_{}".format(datetime.now().strftime("%Y-%m-%d")), "csv")
write_to_s3(song_df, "songs/songs_transformed_{}".format(datetime.now().strftime("%Y-%m-%d")), "csv")




In [None]:
def list_s3_objects(bucket,prefix):
    """Return the keys of all ``.json`` objects stored under ``prefix``.

    A single ``list_objects_v2`` response contains at most 1,000 keys, so the
    listing is driven through the boto3 paginator: every page is visited and
    no keys are dropped when the prefix holds more objects than fit in one
    response.

    Args:
        bucket: Name of the S3 bucket to list.
        prefix: Key prefix that restricts the listing.

    Returns:
        List of object keys (strings) ending in ``.json``, in the order S3
        returns them. Empty when nothing matches.
    """
    s3_client = boto3.client("s3")
    paginator = s3_client.get_paginator("list_objects_v2")

    return [
        entry["Key"]
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix)
        # A page has no "Contents" entry when it holds no objects.
        for entry in page.get("Contents", [])
        if entry["Key"].endswith(".json")
    ]
    
# Bucket and key prefix of the raw JSON input (the same location as s3_path used for the initial read).
bucket_name = "spotify-etl-project-shubham"
prefix = "raw_data/to_processed/"
# NOTE(review): the keys are listed here, after the data was read earlier in the notebook; files
# that arrive in between would be archived without having been processed — confirm this is acceptable.
spotify_keys = list_s3_objects(bucket_name,prefix)

def move_and_delete_files(spotify_keys,Bucket):
    """Move each listed object into ``raw_data/processed/`` within the same bucket.

    For every key the object is first copied to ``raw_data/processed/<file name>``
    (the part of the key after its last ``/``) and the original object is
    deleted afterwards.

    Args:
        spotify_keys: Iterable of S3 object keys to move.
        Bucket: Name of the bucket that holds both source and destination.

    Returns:
        None.
    """
    s3_resource = boto3.resource("s3")

    for source_key in spotify_keys:
        # Keep only the file name; any folder structure in the key is discarded.
        file_name = source_key.rpartition("/")[2]
        archived_key = "raw_data/processed/" + file_name

        # Copy first, delete second, so the source is removed only after the copy call returns.
        s3_resource.meta.client.copy(
            {"Bucket": Bucket, "Key": source_key}, Bucket, archived_key
        )
        s3_resource.Object(Bucket, source_key).delete()
        
# Move the listed raw files into raw_data/processed/ and delete the originals.
move_and_delete_files(spotify_keys,bucket_name)