# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [1]:
%idle_timeout 2880
%glue_version 5.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
# Reuse the active SparkContext if one exists, otherwise create it.
sc = SparkContext.getOrCreate()
# GlueContext wraps the SparkContext; later cells use it to read from and write to S3.
glueContext = GlueContext(sc)
# Spark session taken from the GlueContext.
spark = glueContext.spark_session
# Glue Job handle bound to the same context (not used by the cells visible here).
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.8 
Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 5.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 5
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Idle Timeout: 2880
Session ID: da177329-8887-45be-9be3-3d12e09ebb41
Applying the following default arguments:
--glue_kernel_version 1.0.8
--enable-glue-datacatalog true
Waiting for session da177329-8887-45be-9be3-3d12e09ebb41 to get into ready status...
Session da177329-8887-45be-9be3-3d12e09ebb41 ha

In [17]:
from pyspark.sql.functions import explode,col,to_date
from datetime import datetime
from awsglue.dynamicframe import DynamicFrame




In [3]:
# S3 location of the raw input that is to be processed.
s3_path = "s3://spotify-etl-spark-project/raw_data/to_processed/"
# Read the data at that path from S3 as JSON into a Glue DynamicFrame.
source_df = glueContext.create_dynamic_frame_from_options(
    connection_type="s3",
    connection_options={"paths": [s3_path]},
    format="json"
)




In [4]:
# Convert the Glue DynamicFrame into a Spark DataFrame; the transforms below
# (withColumn / select / explode) operate on this DataFrame.
spotify_df = source_df.toDF()



In [24]:
# Exploratory preview of the artist table.
# First explode `items` (one row per playlist item), then explode each item's
# `track.artists` array so every artist struct gets its own row.
df_artist_exploded = spotify_df.withColumn("items", explode("items")).select(
    explode(col("items.track.artists")).alias("artists")
)

# Keep the DataFrame itself in `df`. `DataFrame.show()` only prints and returns
# None, so chaining it onto the assignment would leave `df` bound to None.
df = df_artist_exploded.select(
    col("artists.id").alias("artist_id"),
    col("artists.name").alias("artist_name"),
    col("artists.external_urls.spotify").alias("url"),
).drop_duplicates(['artist_id'])

# Print 5 rows without truncating long cell values.
df.show(5, False)

+----------------------+------------+------------------------------------------------------+
|artist_id             |artist_name |url                                                   |
+----------------------+------------+------------------------------------------------------+
|02wf6SCDwrs2qTX09X0IRE|Yb Wasg'ood |https://open.spotify.com/artist/02wf6SCDwrs2qTX09X0IRE|
|07MaeHw0cyfmgQ9D9I61UT|DJ JEEAN 011|https://open.spotify.com/artist/07MaeHw0cyfmgQ9D9I61UT|
|09cKncAQn28NqTUORLMwSR|Ariis       |https://open.spotify.com/artist/09cKncAQn28NqTUORLMwSR|
|0JGZRsRtFTQ0ZVBq9b2ZEj|DJ Javi26   |https://open.spotify.com/artist/0JGZRsRtFTQ0ZVBq9b2ZEj|
|0Vxqvpm9tpZcz6d4egDtjX|MC PR       |https://open.spotify.com/artist/0Vxqvpm9tpZcz6d4egDtjX|
+----------------------+------------+------------------------------------------------------+
only showing top 5 rows


In [32]:
def process_album(df):
    """Build the album table from the raw playlist DataFrame.

    The ``items`` array is exploded so each element becomes its own row, the
    album fields are read from ``items.track.album`` and the result is reduced
    to one row per ``album_id``.

    Args:
        df: Spark DataFrame that has an ``items`` array column.

    Returns:
        Spark DataFrame with the columns ``album_id``, ``album_name``,
        ``release_date``, ``total_tracks`` and ``url``.
    """
    # One row per element of the `items` array.
    per_item = df.withColumn("items", explode("items"))

    album_columns = [
        col("items.track.album.id").alias("album_id"),
        col("items.track.album.name").alias("album_name"),
        col("items.track.album.release_date").alias("release_date"),
        col("items.track.album.total_tracks").alias("total_tracks"),
        col("items.track.album.external_urls.spotify").alias("url"),
    ]
    albums = per_item.select(*album_columns)

    # The same album can appear under several items; keep a single row each.
    return albums.drop_duplicates(['album_id'])

def process_artists(df):
    """Build the artist table from the raw playlist DataFrame.

    Explodes ``items`` and then each item's ``track.artists`` array, so every
    artist struct becomes a row, and keeps one row per ``artist_id``.

    Args:
        df: Spark DataFrame that has an ``items`` array column.

    Returns:
        Spark DataFrame with the columns ``artist_id``, ``artist_name`` and
        ``url``.
    """
    # First level: one row per element of `items`.
    per_item = df.withColumn("items", explode("items"))
    # Second level: one row per artist struct of each item's track.
    per_artist = per_item.select(
        explode(col("items.track.artists")).alias("artists")
    )

    artist_columns = [
        col("artists.id").alias("artist_id"),
        col("artists.name").alias("artist_name"),
        col("artists.external_urls.spotify").alias("url"),
    ]
    return per_artist.select(*artist_columns).drop_duplicates(['artist_id'])

def process_songs(df):
    """Build the song table from the raw playlist DataFrame.

    Each element of ``items`` becomes a row, which is then repeated once per id
    in ``items.track.artists.id``; the result therefore has one row per
    (``song_id``, ``artist_id``) pair. ``songs_added`` is converted with
    ``to_date``.

    Args:
        df: Spark DataFrame that has an ``items`` array column.

    Returns:
        Spark DataFrame with the columns ``song_id``, ``song_name``,
        ``duration_ms``, ``url``, ``popularity``, ``songs_added``,
        ``album_id`` and ``artist_id``.
    """
    # One row per element of the `items` array.
    per_item = df.select(explode(col("items")).alias("items"))
    # Fan each item out to one row per artist id on its track.
    per_artist = per_item.withColumn(
        "artist_id", explode(col("items.track.artists.id"))
    )

    song_columns = [
        col("items.track.id").alias("song_id"),
        col("items.track.name").alias("song_name"),
        col("items.track.duration_ms").alias("duration_ms"),
        col("items.track.external_urls.spotify").alias("url"),
        col("items.track.popularity").alias("popularity"),
        col("items.added_at").alias("songs_added"),
        col("items.track.album.id").alias("album_id"),
        col("artist_id"),
    ]
    songs = per_artist.select(*song_columns).drop_duplicates(['song_id', 'artist_id'])

    # Replace the `added_at` value with its to_date() conversion.
    return songs.withColumn("songs_added", to_date("songs_added"))




In [35]:
# Build the album table from the raw DataFrame and print its first 5 rows.
album_df = process_album(spotify_df)
album_df.show(5)

+--------------------+--------------------+------------+------------+--------------------+
|            album_id|          album_name|release_date|total_tracks|                 url|
+--------------------+--------------------+------------+------------+--------------------+
|05EQpLFKhJ1m4YElm...|     Ela Vira Mortal|  2022-08-05|           1|https://open.spot...|
|06a7H7nusNMvM7yL8...|                AURA|  2024-07-12|           3|https://open.spot...|
|0I7096R74VFP2xnMm...|     PASSO BEM SOLTO|  2024-04-23|           5|https://open.spot...|
|0JQT0alFPE0pKxgiL...|Montagem sombra e...|  2024-11-30|           4|https://open.spot...|
|0Qi6ZoqjRHB9W3yZl...|       Amor Na Praia|  2025-03-26|           5|https://open.spot...|
+--------------------+--------------------+------------+------------+--------------------+
only showing top 5 rows


In [36]:
# Build the artist table from the raw DataFrame and print its first 5 rows.
artist_df = process_artists(spotify_df)
artist_df.show(5)

+--------------------+------------+--------------------+
|           artist_id| artist_name|                 url|
+--------------------+------------+--------------------+
|02wf6SCDwrs2qTX09...| Yb Wasg'ood|https://open.spot...|
|07MaeHw0cyfmgQ9D9...|DJ JEEAN 011|https://open.spot...|
|09cKncAQn28NqTUOR...|       Ariis|https://open.spot...|
|0JGZRsRtFTQ0ZVBq9...|   DJ Javi26|https://open.spot...|
|0Vxqvpm9tpZcz6d4e...|       MC PR|https://open.spot...|
+--------------------+------------+--------------------+
only showing top 5 rows


In [33]:
# Build the song table (one row per song/artist pair) and print its first 5 rows.
songs_df = process_songs(spotify_df)
songs_df.show(5)

+--------------------+--------------------+-----------+--------------------+----------+-----------+--------------------+--------------------+
|             song_id|           song_name|duration_ms|                 url|popularity|songs_added|            album_id|           artist_id|
+--------------------+--------------------+-----------+--------------------+----------+-----------+--------------------+--------------------+
|02WKo37gebkkQKor2...|Beeper Funk - Slowed|     103375|https://open.spot...|        68| 2025-03-07|5eEfzNcFaXPsBXJzk...|0sjGfRBmq6lKmUNgt...|
|02WKo37gebkkQKor2...|Beeper Funk - Slowed|     103375|https://open.spot...|        68| 2025-03-07|5eEfzNcFaXPsBXJzk...|648ZoUpTM6iIMTCgo...|
|03E0f4JneWZEtrQPU...|BLOODY! - ULTRA S...|     139384|https://open.spot...|        70| 2025-04-05|0yr5YRR7l813mmHIT...|1qaCszXlFNPEnCELY...|
|03E0f4JneWZEtrQPU...|BLOODY! - ULTRA S...|     139384|https://open.spot...|        70| 2025-04-05|0yr5YRR7l813mmHIT...|2ZuUKw4Pfvj7zIKxY...|
|06Pqe

In [39]:
def write_to_s3(df, path_suffix, format_type="csv"):
    """Write a Spark DataFrame to the project's ``transformed_data`` area on S3.

    The DataFrame is converted to a Glue DynamicFrame and written through the
    module-level ``glueContext``.

    Args:
        df: Spark DataFrame to write.
        path_suffix: Path appended after ``transformed_data/`` in the bucket.
        format_type: Output format handed to the Glue writer; ``"csv"`` by
            default.

    Returns:
        None.
    """
    destination = {
        "path": f"s3://spotify-etl-spark-project/transformed_data/{path_suffix}/"
    }
    # The Glue writer takes a DynamicFrame, not a Spark DataFrame.
    frame_to_write = DynamicFrame.fromDF(df, glueContext, "dynamic_frame")

    glueContext.write_dynamic_frame.from_options(
        frame=frame_to_write,
        connection_type="s3",
        connection_options=destination,
        format=format_type,
    )




In [41]:
# Compute the date stamp once so the three outputs always share the same
# suffix, even if the writes run across midnight.
run_date = datetime.now().strftime("%Y-%m-%d")

# Write each table as CSV under its own <table>_data/<table>_transformed_<date> path.
write_to_s3(album_df, "album_data/album_transformed_{}".format(run_date), "csv")
write_to_s3(artist_df, "artist_data/artist_transformed_{}".format(run_date), "csv")
write_to_s3(songs_df, "songs_data/songs_transformed_{}".format(run_date), "csv")


