# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


#### Run this cell to set up and start your interactive session.


In [20]:
%idle_timeout 2880
%glue_version 5.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

You are already connected to a glueetl session 4fa1fe3a-e747-48f4-858f-9f8f5c46c4fc.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Current idle_timeout is 2880 minutes.
idle_timeout has been set to 2880 minutes.


Setting Glue version to: 5.0


Previous worker type: G.1X
Setting new worker type to: G.1X


Previous number of workers: 5
Setting new number of workers to: 5
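As the output notes, the session was already running, so these magics only take effect for newly created sessions. To read the values at a glance, here is a small sketch of the arithmetic, assuming a G.1X worker corresponds to 1 DPU with 4 vCPUs and 16 GB of memory (per the AWS Glue worker-type documentation; check it for your Glue version). The helper `session_sizing` is hypothetical.

```python
# Assumption: 1 G.1X worker = 1 DPU = 4 vCPUs and 16 GB of memory.
G1X_VCPU_PER_WORKER = 4
G1X_MEMORY_GB_PER_WORKER = 16


def session_sizing(idle_timeout_minutes: int, number_of_workers: int) -> dict:
    """Turn the %idle_timeout and %number_of_workers values into readable totals."""
    return {
        "idle_timeout_hours": idle_timeout_minutes / 60,
        "dpus": number_of_workers,  # 1 DPU per G.1X worker (assumption above)
        "total_vcpus": number_of_workers * G1X_VCPU_PER_WORKER,
        "total_memory_gb": number_of_workers * G1X_MEMORY_GB_PER_WORKER,
    }


print(session_sizing(2880, 5))
# {'idle_timeout_hours': 48.0, 'dpus': 5, 'total_vcpus': 20, 'total_memory_gb': 80}
```

So `%idle_timeout 2880` keeps an idle session alive for 48 hours.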



In [51]:
s3_path = "s3://spotify-data-pipeline-spark/raw_data/to_processed/"
source_dyf = glueContext.create_dynamic_frame_from_options(
    connection_type="s3",
    connection_options={"paths":[s3_path]},
    format="json"
)
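Without a Glue session, the read above can be approximated locally: each JSON file under the prefix becomes one record, consistent with the single row that `toDF().show()` prints further down. The sketch below uses only the standard library. `read_json_prefix` is a hypothetical helper; it assumes each file holds exactly one JSON object and only picks up `*.json` files, which is a simplification rather than a description of how Glue selects S3 objects.

```python
import json
import tempfile
from pathlib import Path


def read_json_prefix(prefix: str) -> list:
    """Hypothetical local stand-in for reading a folder of JSON API responses.

    Assumes each *.json file holds exactly one JSON object, so each file
    becomes one record (one row once converted to a DataFrame).
    """
    records = []
    for path in sorted(Path(prefix).glob("*.json")):
        with path.open(encoding="utf-8") as fh:
            records.append(json.load(fh))
    return records


if __name__ == "__main__":
    # Throwaway folder holding one trimmed, made-up playlist response
    with tempfile.TemporaryDirectory() as tmp:
        payload = {"href": None, "items": [], "limit": 100, "next": None,
                   "offset": 0, "previous": None, "total": 0}
        Path(tmp, "playlist.json").write_text(json.dumps(payload), encoding="utf-8")
        rows = read_json_prefix(tmp)
        print(len(rows), sorted(rows[0]))
```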




In [52]:
source_dyf.show()

{"href": "https://api.spotify.com/v1/playlists/5ABHKGoOzxkaa28ttQV9sE/tracks?offset=0&limit=100&additional_types=track", "items": [{"added_at": "2024-02-15T21:57:53Z", "added_by": {"external_urls": {"spotify": "https://open.spotify.com/user/jonathan.holgersson"}, "href": "https://api.spotify.com/v1/users/jonathan.holgersson", "id": "jonathan.holgersson", "type": "user", "uri": "spotify:user:jonathan.holgersson"}, "is_local": false, "primary_color": null, "track": {"preview_url": null, "available_markets": ["AR", "AU", "AT", "BE", "BO", "BR", "BG", "CA", "CL", "CO", "CR", "CY", "CZ", "DK", "DO", "DE", "EC", "EE", "SV", "FI", "FR", "GR", "GT", "HN", "HK", "HU", "IS", "IE", "IT", "LV", "LT", "LU", "MY", "MT", "MX", "NL", "NZ", "NI", "NO", "PA", "PY", "PE", "PH", "PL", "PT", "SG", "SK", "ES", "SE", "CH", "TW", "TR", "UY", "US", "GB", "AD", "LI", "MC", "ID", "JP", "TH", "VN", "RO", "IL", "ZA", "SA", "AE", "BH", "QA", "OM", "KW", "EG", "MA", "DZ", "TN", "LB", "JO", "PS", "IN", "BY", "KZ", "M

In [53]:
spotify_df = source_dyf.toDF()




In [54]:
spotify_df.show()

+--------------------+--------------------+-----+----+------+--------+-----+
|                href|               items|limit|next|offset|previous|total|
+--------------------+--------------------+-----+----+------+--------+-----+
|https://api.spoti...|[{2024-02-15T21:5...|  100|NULL|     0|    NULL|  100|
+--------------------+--------------------+-----+----+------+--------+-----+
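The single row above is one Spotify paging object: `limit` is 100, `offset` is 0, `total` is 100 and `next` is NULL, so the whole playlist fit in one response. Had `total` exceeded `limit`, `next` would have pointed at a further page, and covering every track would take one request per offset. A small sketch of that arithmetic; `page_offsets` is a hypothetical helper, not part of any Spotify client library.

```python
def page_offsets(total: int, limit: int) -> list:
    """Offsets needed to cover `total` items when each request returns up to `limit`."""
    if limit <= 0:
        raise ValueError("limit must be positive")
    return list(range(0, total, limit))


print(page_offsets(100, 100))  # [0] -> one page, consistent with next = NULL
print(page_offsets(250, 100))  # [0, 100, 200]
```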


In [55]:
from pyspark.sql.functions import col, explode, to_date
from datetime import datetime
from awsglue.dynamicframe import DynamicFrame




In [56]:
def process_albums(df):
    # Step 1: Explode the 'items' array so that each playlist track item becomes its own row
    df_items_exploded = df.select(explode(col("items")).alias("item"))
    
    # Step 2: Extract the relevant album information
    df_albums = df_items_exploded.select(
        col("item.track.album.id").alias("album_id"),
        col("item.track.album.name").alias("album_name"),
        col("item.track.album.release_date").alias("release_date"),
        col("item.track.album.total_tracks").alias("total_tracks"),
        col("item.track.album.external_urls.spotify").alias("url")
    )
    
    # Step 3: Drop duplicates based on the 'album_id'
    df_albums = df_albums.drop_duplicates(["album_id"])
    
    return df_albums

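def process_artists(df):
    # Step 1: Explode the 'items' array so that each playlist track item becomes its own row
    df_items_exploded = df.select(explode(col("items")).alias("item"))
    # Step 2: Explode the nested 'artists' array, since one track can credit several artists
    df_artists_exploded = df_items_exploded.select(explode(col("item.track.artists")).alias("artist"))
    # Step 3: Extract the artist fields and keep one row per 'artist_id'
    df_artists = df_artists_exploded.select(
        col("artist.id").alias("artist_id"),
        col("artist.name").alias("artist_name"),
        col("artist.external_urls.spotify").alias("external_url")
    ).drop_duplicates(["artist_id"])

    return df_artists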

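def process_songs(df):
    # Step 1: Explode the 'items' array so that each playlist track item becomes its own row
    df_exploded = df.select(explode(col("items")).alias("item"))
    # Step 2: Extract the song fields, keeping album_id and artist_id as keys into the
    # album and artist tables; only the first credited artist is kept for each song
    df_songs = df_exploded.select(
        col("item.track.id").alias("song_id"),
        col("item.track.name").alias("song_name"),
        col("item.track.duration_ms").alias("duration_ms"),
        col("item.track.external_urls.spotify").alias("url"),
        col("item.track.popularity").alias("popularity"),
        col("item.added_at").alias("song_added"),
        col("item.track.album.id").alias("album_id"),
        col("item.track.artists")[0]["id"].alias("artist_id")
    ).drop_duplicates(["song_id"])
    # Step 3: Convert the ISO-8601 'added_at' timestamp (e.g. 2024-02-15T21:57:53Z) to a date
    df_songs = df_songs.withColumn("song_added", to_date(col("song_added")))

    return df_songs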
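To see what these three functions do to the nested JSON without a Spark session, here is a plain-Python analogue run on a tiny, made-up payload (all IDs and names in `sample` are hypothetical). `explode_items`, `dedupe`, `albums`, `artists` and `songs` are illustrative helpers, not part of Glue or PySpark, and only a subset of the columns is reproduced. Two differences from Spark are worth keeping in mind: Spark's `dropDuplicates` does not guarantee which row survives for a key, whereas `dedupe` keeps the first one, and this version assumes every track credits at least one artist.

```python
from datetime import date


def explode_items(payload: dict) -> list:
    """Analogue of explode(col("items")): one entry per playlist item."""
    return list(payload.get("items") or [])


def dedupe(rows: list, key: str) -> list:
    """Keep the first row seen for each value of `key`."""
    seen, out = set(), []
    for row in rows:
        if row[key] not in seen:
            seen.add(row[key])
            out.append(row)
    return out


def albums(payload: dict) -> list:
    rows = [{"album_id": it["track"]["album"]["id"],
             "album_name": it["track"]["album"]["name"]}
            for it in explode_items(payload)]
    return dedupe(rows, "album_id")


def artists(payload: dict) -> list:
    # Second "explode": iterate over the nested artists array of every track
    rows = [{"artist_id": a["id"], "artist_name": a["name"]}
            for it in explode_items(payload)
            for a in it["track"]["artists"]]
    return dedupe(rows, "artist_id")


def songs(payload: dict) -> list:
    rows = [{"song_id": it["track"]["id"],
             # Like to_date on the ISO timestamp: keep only the YYYY-MM-DD part
             "song_added": date.fromisoformat(it["added_at"][:10]),
             "album_id": it["track"]["album"]["id"],
             "artist_id": it["track"]["artists"][0]["id"]}  # first artist only
            for it in explode_items(payload)]
    return dedupe(rows, "song_id")


sample = {"items": [
    {"added_at": "2024-02-15T21:57:53Z",
     "track": {"id": "t1", "album": {"id": "al1", "name": "Album One"},
               "artists": [{"id": "ar1", "name": "Artist A"},
                           {"id": "ar2", "name": "Artist B"}]}},
    {"added_at": "2024-02-16T08:00:00Z",
     "track": {"id": "t2", "album": {"id": "al1", "name": "Album One"},
               "artists": [{"id": "ar2", "name": "Artist B"}]}},
]}

print(len(albums(sample)), len(artists(sample)), len(songs(sample)))  # 1 2 2
```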




In [57]:
album_df = process_albums(spotify_df)
artist_df = process_artists(spotify_df)
song_df = process_songs(spotify_df)
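After this cell, `song_df` carries `album_id` and `artist_id` as keys into `album_df` and `artist_df`. Because all three tables come from the same exploded items, every key should resolve, and a quick orphan check makes that explicit. In PySpark the same test can be written as a left anti join, e.g. `song_df.join(album_df, "album_id", "left_anti")`. The pure-Python `orphan_keys` helper and its sample rows below are hypothetical illustrations.

```python
def orphan_keys(child_rows, fk, parent_rows, pk):
    """Foreign-key values in child_rows that match no parent row.

    Plain-Python analogue of `child LEFT ANTI JOIN parent ON fk = pk`,
    except that rows with a None key are skipped here.
    """
    parent_keys = {row[pk] for row in parent_rows}
    return sorted({row[fk] for row in child_rows
                   if row[fk] is not None and row[fk] not in parent_keys})


# Hypothetical rows shaped like album_df, artist_df and song_df
albums = [{"album_id": "al1"}]
artists = [{"artist_id": "ar1"}, {"artist_id": "ar2"}]
songs = [
    {"song_id": "t1", "album_id": "al1", "artist_id": "ar1"},
    {"song_id": "t2", "album_id": "al9", "artist_id": "ar2"},  # al9 has no album row
]

print(orphan_keys(songs, "album_id", albums, "album_id"))     # ['al9']
print(orphan_keys(songs, "artist_id", artists, "artist_id"))  # []
```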




In [58]:
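def write_to_s3(df, path_suffix, format_type="csv"):
    """Write a Spark DataFrame to s3://spotify-data-pipeline-spark/transformed_data/<path_suffix>/."""
    # The Glue S3 sink takes a DynamicFrame, so convert the Spark DataFrame first
    dynamic_frame = DynamicFrame.fromDF(df, glueContext, "dynamic_frame")

    glueContext.write_dynamic_frame.from_options(
        frame=dynamic_frame,
        connection_type="s3",
        connection_options={"path": f"s3://spotify-data-pipeline-spark/transformed_data/{path_suffix}/"},
        format=format_type
    )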




In [60]:
# Write the three tables to S3, stamping every output prefix with the same run date
run_date = datetime.now().strftime("%Y-%m-%d")
write_to_s3(album_df, "album/album_transformed_{}".format(run_date), "csv")
write_to_s3(artist_df, "artist/artist_transformed_{}".format(run_date), "csv")
write_to_s3(song_df, "songs/songs_transformed_{}".format(run_date), "csv")
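Each output prefix is built from a `YYYY-MM-DD` stamp appended to a fixed suffix, and `write_to_s3` places that suffix under `s3://spotify-data-pipeline-spark/transformed_data/` with a trailing slash. A hypothetical helper that reproduces the full paths for a given date makes the layout easy to check before touching S3; the example date is arbitrary.

```python
from datetime import date

BUCKET_PREFIX = "s3://spotify-data-pipeline-spark/transformed_data"


def output_paths(run_date: date) -> dict:
    """Full S3 prefixes that the write cell produces for one run date."""
    stamp = run_date.strftime("%Y-%m-%d")
    suffixes = {
        "album": f"album/album_transformed_{stamp}",
        "artist": f"artist/artist_transformed_{stamp}",
        "songs": f"songs/songs_transformed_{stamp}",
    }
    # write_to_s3 joins the suffix onto the bucket prefix and adds a trailing slash
    return {name: f"{BUCKET_PREFIX}/{suffix}/" for name, suffix in suffixes.items()}


for name, path in output_paths(date(2024, 2, 15)).items():
    print(name, path)
```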


