# AWS Glue Studio Notebook
##### You are now running a AWS Glue Studio notebook; To start using your notebook you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [None]:
%help

####  Run this cell to set up and start your interactive session.


In [1]:
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.5 
Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 4.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 5
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Idle Timeout: 2880
Session ID: 2f57f848-379b-4078-a83b-75de1129fcc4
Applying the following default arguments:
--glue_kernel_version 1.0.5
--enable-glue-datacatalog true
Waiting for session 2f57f848-379b-4078-a83b-75de1129fcc4 to get into ready status...
Session 2f57f848-379b-4078-a83b-75de1129fcc4 ha

In [2]:
s3_path = "s3://spitify-etl-pipline-yasida/raw-data/to_processed/"
source_dyf = glueContext.create_dynamic_frame_from_options(
    connection_type="s3",
    connection_options={"paths": [s3_path]},
    format="json"
)




In [3]:
source_dyf.show()

{"href": "https://api.spotify.com/v1/playlists/37i9dQZEVXbNG2KDcFcKOF/tracks?offset=0&limit=100&additional_types=track", "items": [{"added_at": "2024-09-13T13:10:58Z", "added_by": {"external_urls": {"spotify": "https://open.spotify.com/user/"}, "href": "https://api.spotify.com/v1/users/", "id": "", "type": "user", "uri": "spotify:user:"}, "is_local": false, "primary_color": null, "track": {"preview_url": null, "available_markets": ["AR", "AU", "AT", "BE", "BO", "BR", "BG", "CA", "CL", "CO", "CR", "CY", "CZ", "DK", "DO", "DE", "EC", "EE", "SV", "FI", "FR", "GR", "GT", "HN", "HK", "HU", "IS", "IE", "IT", "LV", "LT", "LU", "MY", "MT", "MX", "NL", "NZ", "NI", "NO", "PA", "PY", "PE", "PH", "PL", "PT", "SG", "SK", "ES", "SE", "CH", "TW", "TR", "UY", "US", "GB", "AD", "LI", "MC", "ID", "JP", "TH", "VN", "RO", "IL", "ZA", "SA", "AE", "BH", "QA", "OM", "KW", "EG", "MA", "DZ", "TN", "LB", "JO", "PS", "IN", "KZ", "MD", "UA", "AL", "BA", "HR", "ME", "MK", "RS", "SI", "KR", "BD", "PK", "LK", "GH", 

In [4]:
spotify_df = source_dyf.toDF()



In [5]:
spotify_df.show()

+--------------------+--------------------+-----+----+------+--------+-----+
|                href|               items|limit|next|offset|previous|total|
+--------------------+--------------------+-----+----+------+--------+-----+
|https://api.spoti...|[{2024-09-13T13:1...|  100|null|     0|    null|   50|
+--------------------+--------------------+-----+----+------+--------+-----+


In [6]:
from pyspark.sql.functions import explode, col, to_date




In [7]:
df =spotify_df 




In [8]:
spotify_df.printSchema()

root
 |-- href: string (nullable = true)
 |-- items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- added_at: string (nullable = true)
 |    |    |-- added_by: struct (nullable = true)
 |    |    |    |-- external_urls: struct (nullable = true)
 |    |    |    |    |-- spotify: string (nullable = true)
 |    |    |    |-- href: string (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- type: string (nullable = true)
 |    |    |    |-- uri: string (nullable = true)
 |    |    |-- is_local: boolean (nullable = true)
 |    |    |-- primary_color: void (nullable = true)
 |    |    |-- track: struct (nullable = true)
 |    |    |    |-- preview_url: string (nullable = true)
 |    |    |    |-- available_markets: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- explicit: boolean (nullable = true)
 |    |    |    |-- type: string (nullable = true)
 |    |    | 

In [9]:
df = spotify_df.withColumn("item", explode("items")).select(
        col("item.track.album.id").alias("album_id"),
        col("item.track.album.name").alias("album_name"),
        col("item.track.album.release_date").alias("release_date"),
        col("item.track.album.total_tracks").alias("total_tracks"),
        col("item.track.album.external_urls.spotify").alias("url")
    ).drop_duplicates(["album_id"]).show(5)

+--------------------+--------------------+------------+------------+--------------------+
|            album_id|          album_name|release_date|total_tracks|                 url|
+--------------------+--------------------+------------+------------+--------------------+
|00ESTj1vBUkIbaTHa...|        SE ME OLVIDA|  2024-08-23|           1|https://open.spot...|
|0DLvFVIfwt3OHdK9k...|Where I've Been, ...|  2024-05-31|          12|https://open.spot...|
|0EiI8ylL0FmWWpgHV...|The Rise and Fall...|  2023-09-22|          14|https://open.spot...|
|0Wmt50XH9EZvSuML0...|Neva Play (feat. ...|  2024-09-06|           1|https://open.spot...|
|0lgs2Sa82lyX89nBU...|      FERXXOCALIPSIS|  2023-12-01|          10|https://open.spot...|
+--------------------+--------------------+------------+------------+--------------------+
only showing top 5 rows


In [10]:
df.printSchema()

AttributeError: 'NoneType' object has no attribute 'printSchema'


In [11]:
def process_albums(df):
    df = spotify_df.withColumn("item", explode("items")).select(
        col("item.track.album.id").alias("album_id"),
        col("item.track.album.name").alias("album_name"),
        col("item.track.album.release_date").alias("release_date"),
        col("item.track.album.total_tracks").alias("total_tracks"),
        col("item.track.album.external_urls.spotify").alias("url")
    ).drop_duplicates(["album_id"])
    return df




In [12]:
df_items_exploded = spotify_df.select(explode(col("items")).alias("item"))
df_artists_exploded = df_items_exploded.select(explode(col("item.track.artists")).alias("artist"))




In [13]:
# Now, select the artist attributes, ensuring each artist is in its own row
df_artists = df_artists_exploded.select(
    col("artist.id").alias("artist_id"),
    col("artist.name").alias("artist_name"),
    col("artist.external_urls.spotify").alias("external_url")
).drop_duplicates(["artist_id"]).show(5)

+--------------------+------------+--------------------+
|           artist_id| artist_name|        external_url|
+--------------------+------------+--------------------+
|06HL4z0CvFAxyc27G...|Taylor Swift|https://open.spot...|
|0PCCGZ0wGLizHt2KZ...|     Artemas|https://open.spot...|
|0Y5tJX1MQlPlqiwlO...|Travis Scott|https://open.spot...|
|0du5cEVh5yTK9QJze...|  Bruno Mars|https://open.spot...|
|12GqGscKJx3aE4t07...|  Peso Pluma|https://open.spot...|
+--------------------+------------+--------------------+
only showing top 5 rows


In [14]:
def process_artists(df):
    # First, explode the items to get individual tracks
    df_items_exploded = spotify_df.select(explode(col("items")).alias("item"))
    
    # Then, explode the artists array within each item to create a row for each artist
    df_artists_exploded = df_items_exploded.select(explode(col("item.track.artists")).alias("artist"))
    
    # Now, select the artist attributes, ensuring each artist is in its own row
    df_artists = df_artists_exploded.select(
        col("artist.id").alias("artist_id"),
        col("artist.name").alias("artist_name"),
        col("artist.external_urls.spotify").alias("external_url")
    ).drop_duplicates(["artist_id"])
    
    return df_artists




In [15]:
df_exploded = spotify_df.select(explode(col("items")).alias("item"))




In [16]:
# Extract song information from the exploded DataFrame
df_songs = df_exploded.select(
    col("item.track.id").alias("song_id"),
    col("item.track.name").alias("song_name"),
    col("item.track.duration_ms").alias("duration_ms"),
    col("item.track.external_urls.spotify").alias("url"),
    col("item.track.popularity").alias("popularity"),
    col("item.added_at").alias("song_added"),
    col("item.track.album.id").alias("album_id"),
    col("item.track.artists")[0]["id"].alias("artist_id")
).drop_duplicates(["song_id"]).withColumn("song_added", to_date(col("song_added"))).show(5)

+--------------------+----------------+-----------+--------------------+----------+----------+--------------------+--------------------+
|             song_id|       song_name|duration_ms|                 url|popularity|song_added|            album_id|           artist_id|
+--------------------+----------------+-----------+--------------------+----------+----------+--------------------+--------------------+
|0AjmK0Eai4zGrLaJw...|       Too Sweet|     251424|https://open.spot...|        82|2024-09-13|1vL2mgGTukkrUxXt0...|2FXC3k01G6Gw61bmp...|
|0OA00aPt3BV10qeMI...|       Big Dawgs|     190666|https://open.spot...|        93|2024-09-13|6Yw4204wbgmpsGTzj...|4nVa6XlBFlIkF6msW...|
|0QZ5yyl6B6utIWkxe...|The Night We Met|     208211|https://open.spot...|        78|2024-09-13|3yoNZlqerJnsnMN5E...|6ltzsmQQbmdoHHbLZ...|
|0UYnhUfnUj5adChuA...|     Sailor Song|     211978|https://open.spot...|        88|2024-09-13|4DWrYvfGXRE8ko5Zx...|1iCnM8foFssWlPRLf...|
|0WbMK4wrZ1wFSty9F...|Good Luck, Babe!|  

In [17]:
def process_songs(df):
    # Explode the items array to create a row for each song
    df_exploded = df.select(explode(col("items")).alias("item"))
    
    # Extract song information from the exploded DataFrame
    df_songs = df_exploded.select(
        col("item.track.id").alias("song_id"),
        col("item.track.name").alias("song_name"),
        col("item.track.duration_ms").alias("duration_ms"),
        col("item.track.external_urls.spotify").alias("url"),
        col("item.track.popularity").alias("popularity"),
        col("item.added_at").alias("song_added"),
        col("item.track.album.id").alias("album_id"),
        col("item.track.artists")[0]["id"].alias("artist_id")
    ).drop_duplicates(["song_id"]).withColumn("song_added", to_date(col("song_added")))
    return df_songs




In [18]:
#process data
album_df = process_albums(spotify_df)
artist_df = process_artists(spotify_df)
song_df = process_songs(spotify_df)




In [19]:
def write_to_s3(df, path_suffix, format_type="csv"):
    # Check if DataFrame is not None
    if df is not None:
        # Convert back to DynamicFrame
        dynamic_frame = DynamicFrame.fromDF(df, glueContext, "dynamic_frame")
        
        # Write the DynamicFrame to S3
        glueContext.write_dynamic_frame.from_options(
            frame=dynamic_frame,
            connection_type="s3",
            connection_options={"path": f"s3://spitify-etl-pipline-yasida/transformed_data/{path_suffix}/"},
            format=format_type
        )
    else:
        print("DataFrame is None. Cannot write to S3.")




In [20]:
from datetime import datetime
from awsglue.dynamicframe import DynamicFrame




In [21]:
#write data to s3   
write_to_s3(album_df, "album_data/album_transformed_{}".format(datetime.now().strftime("%Y-%m-%d")), "csv")
write_to_s3(artist_df, "artist_data/artist_transformed_{}".format(datetime.now().strftime("%Y-%m-%d")), "csv")
write_to_s3(song_df, "songs_data/songs_transformed_{}".format(datetime.now().strftime("%Y-%m-%d")), "csv")




In [22]:
job.commit()


