In [None]:
%idle_timeout 2880
%glue_version 5.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.8 
Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 5.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 5
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Idle Timeout: 2880
Session ID: 5a72bb77-7e0e-44b5-81be-a8bbb95720b4
Applying the following default arguments:
--glue_kernel_version 1.0.8
--enable-glue-datacatalog true
Waiting for session 5a72bb77-7e0e-44b5-81be-a8bbb95720b4 to get into ready status...
Session 5a72bb77-7e0e-44b5-81be-a8bbb95720b4 ha

In [None]:
from pyspark.sql.functions import explode, col, to_date, to_timestamp
from datetime import datetime
from awsglue.dynamicframe import DynamicFrame
import boto3




In [None]:
s3_path = "s3://spotify-etl-project-sannu/raw_data/to_process/"
source_dyf = glueContext.create_dynamic_frame_from_options(
    connection_type="s3",
    connection_options={"paths":[s3_path]},
    format="json",
    format_options={"jsonPath": "$[*]"}
)




In [None]:
spotify_df = source_dyf.toDF()



In [None]:
album_df = spotify_df.select(col("track.album.id").alias("album_id"),
                  col("track.album.name").alias("album_name"),
                  col("track.album.release_date").alias("album_release_date"),
                  col("track.album.total_tracks").alias("album_total_tracks"),
                  col("track.album.external_urls.spotify").alias("album_url"),
                 ).drop_duplicates(["album_id"])




In [None]:
album_df = album_df.withColumn(
    "album_release_date",
    to_date("album_release_date", "yyyy-MM-dd")
)




In [None]:
artist_df = spotify_df.select(col("track.artists"))




In [None]:
artist_df = artist_df.withColumn("artist",explode("artists"))




In [None]:
artist_df = artist_df.select(col("artist.id").alias("artist_id"),
                  col("artist.name").alias("artist_name"),
                  col("artist.external_urls.spotify").alias("external_url"),
                 ).drop_duplicates(["artist_id"])




In [None]:
songs_df = spotify_df.select(
                  col("track.id").alias("song_id"),
                  col("track.name").alias("song_name"),
                  col("track.duration_ms").alias("duration_ms"),
                  col("track.external_urls.spotify").alias("url"),
                  col("track.popularity").alias("popularity"),
                  col("added_at").alias("song_added"),
                  col("track.album.id").alias("album_id"),
                  col("track.artists")
                 ).drop_duplicates(["song_id"])




In [None]:
songs_df = songs_df.withColumn("artists", explode("artists")).withColumn("artists", col("artists.id"))




In [None]:
songs_df = songs_df.withColumnRenamed("artists", "artist_id")




In [None]:
songs_df = songs_df.withColumn(
    "song_added",
    to_timestamp("song_added", "yyyy-MM-dd'T'HH:mm:ss'Z'")
)




In [None]:
def write_to_s3(df, path_suffix, format_type="csv"):
    dynamic_frame = DynamicFrame.fromDF(df, glueContext, "dynamic_frame")
    
    glueContext.write_dynamic_frame.from_options(
        frame = dynamic_frame,
        connection_type = "s3",
        connection_options = {"path": f"s3://spotify-etl-project-sannu/transformed_data/{path_suffix}/"},
        format = format_type
    )




In [None]:
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S-%f")

write_to_s3(album_df, f"album/album_transformed_{timestamp}", "csv")
write_to_s3(artist_df, f"artist/artist_transformed_{timestamp}", "csv")
write_to_s3(songs_df, f"songs/song_transformed_{timestamp}", "csv")




In [None]:
s3 = boto3.client('s3')
Bucket = "spotify-etl-project-sannu"
Key = "raw_data/to_process/"
    
spotify_keys = []
for file in s3.list_objects(Bucket=Bucket, Prefix=Key)['Contents']:
    file_key = file['Key']
    if file_key.split('.')[-1] == "json":
        spotify_keys.append(file_key)




In [None]:
s3_resource = boto3.resource('s3')
for key in spotify_keys:
    copy_source = {
        'Bucket': Bucket,
        'Key': key
    }
    s3_resource.meta.client.copy(copy_source, Bucket, 'raw_data/processed/' + key.split("/")[-1])    
    s3_resource.Object(Bucket, key).delete()

{'ResponseMetadata': {'RequestId': 'KRQZ940X13MX6YHX', 'HostId': 'V890iP6dnoKqxc1RjTY1+Ua2Gyi85ICAus9WuR/PJgA5eQxylg4F6xJBr5AMyyT6u8Txe0sS4Roz0keXMaVOaw==', 'HTTPStatusCode': 204, 'HTTPHeaders': {'x-amz-id-2': 'V890iP6dnoKqxc1RjTY1+Ua2Gyi85ICAus9WuR/PJgA5eQxylg4F6xJBr5AMyyT6u8Txe0sS4Roz0keXMaVOaw==', 'x-amz-request-id': 'KRQZ940X13MX6YHX', 'date': 'Fri, 11 Jul 2025 12:30:38 GMT', 'server': 'AmazonS3'}, 'RetryAttempts': 0}}
