In [1]:
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Current idle_timeout is 2880 minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 4.0
Previous worker type: G.1X
Setting new worker type to: G.1X
Previous number of workers: 5
Setting new number of workers to: 5
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Idle Timeout: 2880
Session ID: 0900690c-5970-4317-901b-49dcd4583df5
Applying the following default arguments:
--glue_kernel_version 1.0.5
--enable-glue-datacatalog true
Waiting for session 0900690c-5970-4317-901b-49dcd4583df5 to get into ready status...
Session 0900690c-5970-4317-901b-49dcd4583df5 has been created.



In [5]:
from pyspark.sql.functions import explode, col




In [2]:
# To get our raw data that need to process from s3 bucket 

s3_path = "s3://spotify-etl-project-iman/bronze_layer/to_processed/"
df = glueContext.create_dynamic_frame.from_options(
    "s3",
    {"paths": [s3_path]},
    format="json"
)




In [4]:
# Convert json data into a dataframe
spotify_df = df.toDF()



In [None]:
# Get album data
spotify_df.withColumn("items", explode("items")).select(
    col("items.track.album.id").alias("album_id"),
    col("items.track.album.name").alias("album_name"),
    col("items.track.album.release_date").alias("release_date"),
    col("items.track.album.total_tracks").alias("total_tracks"),
    col("items.track.album.external_urls.spotify").alias("url")
).drop_duplicates(["album_id"])

In [6]:
# The reason why we  need to  explode is because the data for artist is nested
df_artist_exploded = spotify_df.select(explode(col("items")).alias("items")).select(explode(col("items.track.artists")).alias("artists"))




In [7]:
# Get artist data
df_artist_exploded.select(
    col("artists.id").alias("artist_id"),
    col("artists.name").alias("artist_name"),
    col("artists.external_urls.spotify").alias("url")
).drop_duplicates(["artist_id"])

+----------------------+-------------------+------------------------------------------------------+
|artist_id             |artist_name        |url                                                   |
+----------------------+-------------------+------------------------------------------------------+
|06HL4z0CvFAxyc27GXpf02|Taylor Swift       |https://open.spotify.com/artist/06HL4z0CvFAxyc27GXpf02|
|0PCCGZ0wGLizHt2KZ7hhA2|Artemas            |https://open.spotify.com/artist/0PCCGZ0wGLizHt2KZ7hhA2|
|0Y5tJX1MQlPlqiwlOH1tJY|Travis Scott       |https://open.spotify.com/artist/0Y5tJX1MQlPlqiwlOH1tJY|
|0jPHHnU8GUWEF7rwPE9osY|Jordan Adetunji    |https://open.spotify.com/artist/0jPHHnU8GUWEF7rwPE9osY|
|11p2E654TTU8e0nZWBR4AL|Mark Ambor         |https://open.spotify.com/artist/11p2E654TTU8e0nZWBR4AL|
|181bsRPaVXVlUKXrxwZfHK|Megan Thee Stallion|https://open.spotify.com/artist/181bsRPaVXVlUKXrxwZfHK|
|1QsdzIKkTT5gDFj8GB1cIX|Yseult             |https://open.spotify.com/artist/1QsdzIKkTT5gDFj8GB1cIX|


In [8]:
df_songs_explode = spotify_df.select(explode(col("items")).alias("items"))




In [9]:
# Get songs data
df_songs_explode.select(
    col("items.track.id").alias("song_id"),
    col("items.track.name").alias("song_name"),
    col("items.track.duration_ms").alias("duration_ms"),
    col("items.track.popularity").alias("popularity"),
    col("items.track.external_urls.spotify").alias("url"),
    col("items.added_at").alias("added_date"),
    col("items.track.album.id").alias("album_id"),
    col("items.track.artists")[0]["id"].alias("artist_id")
).drop_duplicates(["song_id"])

DataFrame[song_id: string, song_name: string, duration_ms: int, popularity: int, url: string, added_date: string, album_id: string, artist_id: string]
