In [1]:
from pyspark.sql import SparkSession

In [5]:
# Build (or reuse) the notebook's Spark session under the app name 'aws_glue'.
spark = (
    SparkSession.builder
    .appName('aws_glue')
    .getOrCreate()
)

In [6]:
spark  # bare expression: the notebook renders the session object as this cell's output

In [None]:
import sys
from datetime import datetime

import boto3
from awsglue.transforms import *
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import to_date

# Standard AWS Glue job boilerplate (no need to memorize it): reuse the active
# SparkContext, wrap it in a GlueContext, then take the Spark session and the
# Job handle from that context.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

In [9]:
from pyspark.sql.functions import explode, col

In [None]:
# Read the raw Spotify JSON files waiting under the to_processed prefix into a
# Glue DynamicFrame.
s3_path = "s3://spotify-daily-data-project/raw_data/to_processed"
source_dyf = glueContext.create_dynamic_frame_from_options(
    # The keyword was previously misspelled "conntection_type", so the required
    # connection_type argument was never actually passed.
    connection_type="s3",
    connection_options={"paths": [s3_path]},
    format="json",
)

In [None]:
# Convert the Glue DynamicFrame holding the JSON into a Spark DataFrame.
spotify_df = source_dyf.toDF()

In [None]:
# Preview the raw DataFrame. The original cell referenced the undefined name
# `spotify`; the DataFrame created above is called `spotify_df`.
spotify_df.show()

In [None]:
# explode() turns a row whose "items" column holds an array into one row per
# array element.
# Preview only: .show() returns None, so its result must not be assigned back
# to spotify_df -- later cells still need the original, un-exploded DataFrame.
spotify_df.withColumn("items", explode("items")).show(5)

In [None]:
# Short alias for spotify_df, used by the exploratory previews.
df = spotify_df

In [None]:
# Preview: album id nested inside each exploded playlist item.
(
    df
    .withColumn("items", explode("items"))
    .select(col("items.track.album.id").alias("album_id"))
    .show(5)
)

In [None]:
# Preview: album name nested inside each exploded playlist item.
(
    df
    .withColumn("items", explode("items"))
    .select(col("items.track.album.name").alias("album_name"))
    .show(5)
)

In [None]:
def process_album(df):
    """Build the album table from the raw Spotify playlist DataFrame.

    Args:
        df: Spark DataFrame with an ``items`` array column whose elements
            expose ``track.album.*`` fields (assumed to follow the Spotify
            playlist JSON layout -- confirm against the source data).

    Returns:
        Spark DataFrame with the columns ``album_id``, ``album_name``,
        ``release_date``, ``total_tracks`` and ``url``, de-duplicated on
        ``album_id``.
    """
    # explode() must be *called* on the column; the previous `explode=("items")`
    # passed it as a keyword argument, which withColumn rejects.
    exploded = df.withColumn("items", explode("items"))

    albums = exploded.select(
        col("items.track.album.id").alias("album_id"),
        col("items.track.album.name").alias("album_name"),
        # Previously aliased as a second "album_name", producing duplicate column names.
        col("items.track.album.release_date").alias("release_date"),
        col("items.track.album.total_tracks").alias("total_tracks"),
        # Path corrected to match the other extractors: track (not tracks) and
        # the nested external_urls.spotify field.
        col("items.track.album.external_urls.spotify").alias("url"),
    )
    return albums.drop_duplicates(["album_id"])

In [None]:
def process_artists(df):
    """Build the artist table from the raw Spotify playlist DataFrame.

    Args:
        df: Spark DataFrame with an ``items`` array column whose elements
            expose a ``track.artists`` array (assumed to follow the Spotify
            playlist JSON layout -- confirm against the source data).

    Returns:
        Spark DataFrame with the columns ``artist_id``, ``artist_name`` and
        ``external_urls``, de-duplicated on ``artist_id``.
    """
    # First explode: one row per playlist item.
    df_items_exploded = df.select(explode(col("items")).alias("item"))

    # Second explode: one row per artist on each track. The earlier version
    # re-exploded a column named "items" that no longer exists at this point and
    # ended with .show(5), which returns None and broke the select below.
    df_artist_exploded = df_items_exploded.select(
        explode(col("item.track.artists")).alias("artist")
    )

    df_artist = df_artist_exploded.select(
        col("artist.id").alias("artist_id"),
        col("artist.name").alias("artist_name"),
        col("artist.external_urls.spotify").alias("external_urls"),
    ).drop_duplicates(["artist_id"])
    return df_artist

In [None]:
def process_songs(df):
    """Build the song table from the raw Spotify playlist DataFrame.

    Args:
        df: Spark DataFrame with an ``items`` array column whose elements
            expose ``added_at`` and ``track.*`` fields (assumed to follow the
            Spotify playlist JSON layout -- confirm against the source data).

    Returns:
        Spark DataFrame with the columns ``songs_id``, ``songs_name``,
        ``duration_ms``, ``external_urls``, ``popularity``, ``songs_added``
        (converted with ``to_date``), ``album_id`` and ``artist_id``,
        de-duplicated on ``songs_id``.
    """
    # One row per playlist item.
    items = df.select(explode(col("items")).alias("item"))

    songs = items.select(
        col("item.track.id").alias("songs_id"),
        col("item.track.name").alias("songs_name"),
        col("item.track.duration_ms").alias("duration_ms"),
        col("item.track.external_urls.spotify").alias("external_urls"),
        col("item.track.popularity").alias("popularity"),
        col("item.added_at").alias("songs_added"),
        col("item.track.album.id").alias("album_id"),
        # Only the first listed artist of each track is kept.
        col("item.track.artists")[0]["id"].alias("artist_id"),
    ).drop_duplicates(["songs_id"])

    # to_date comes from pyspark.sql.functions; it was used here without ever
    # being imported, which raised NameError at call time.
    return songs.withColumn("songs_added", to_date(col("songs_added")))
                    

In [None]:
# Derive the album table and preview its first five rows.
album_df = process_album(df=spotify_df)
album_df.show(5)

In [None]:
# Derive the artist table and preview its first five rows.
artist_df = process_artists(df=spotify_df)
artist_df.show(5)

In [None]:
# Derive the song table and preview its first five rows.
song_df = process_songs(df=spotify_df)
song_df.show(5)

In [None]:
def write_to_s3(df, path_suffix, format_type='csv'):
    """Write a Spark DataFrame to the transformed_data area of the S3 bucket.

    Args:
        df: Spark DataFrame to persist.
        path_suffix: Path appended under ``transformed_data/`` (for example
            ``"album/album_transformed2024-01-01"``).
        format_type: Output format handed to the Glue writer; defaults to
            ``'csv'``.

    Returns:
        None. The data is written through the module-level ``glueContext``.
    """
    # The Glue writer takes a DynamicFrame, so wrap the Spark DataFrame first.
    dynamic_frame = DynamicFrame.fromDF(df, glueContext, "dynamic_frame")

    # NOTE(review): bucket aligned with the one the raw data is read from
    # ("spotify-daily-data-project"); the previous literal "spotify-daily-project"
    # looked like a typo -- confirm.
    target_path = f"s3://spotify-daily-data-project/transformed_data/{path_suffix}/"

    # Fixes relative to the earlier version: the method is `from_options`
    # (lower-case o), connection_options must be a dict (it was a set literal),
    # and the path must actually interpolate path_suffix.
    glueContext.write_dynamic_frame.from_options(
        frame=dynamic_frame,
        connection_type="s3",
        connection_options={"path": target_path},
        format=format_type,
    )

In [None]:
# Each output path is suffixed with the run date (YYYY-MM-DD). The date is
# computed once so all three tables share the same suffix even if the run
# crosses midnight.
run_date = datetime.now().strftime("%Y-%m-%d")
write_to_s3(album_df, "album/album_transformed{}".format(run_date), 'csv')
write_to_s3(artist_df, "artist/artist_transformed{}".format(run_date), 'csv')
write_to_s3(song_df, "songs/songs_transformed{}".format(run_date), 'csv')

In [None]:
# Next, list the contents of the S3 bucket. The aim is to move the processed
# files into the processed folder and delete them from the to_processed folder.

# This function lists the files in the to_processed folder first.
def list_s3_objects(bucket, prefix, s3_client=None):
    """Return the keys of all ``.json`` objects under ``prefix`` in ``bucket``.

    Args:
        bucket: Name of the S3 bucket to list.
        prefix: Key prefix to restrict the listing to.
        s3_client: Optional S3 client to use; when omitted a new
            ``boto3.client('s3')`` is created.

    Returns:
        list[str]: Matching object keys, in the order S3 returned them. Empty
        when nothing matches.
    """
    if s3_client is None:
        s3_client = boto3.client('s3')

    # A single list_objects_v2 call returns at most 1,000 keys; the paginator
    # follows the continuation tokens so larger listings are not truncated.
    paginator = s3_client.get_paginator('list_objects_v2')
    return [
        content['Key']
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix)
        for content in page.get('Contents', [])
        if content['Key'].endswith('.json')
    ]

# Collect the JSON keys found under the to_processed prefix.
bucket_name = "spotify-daily-data-project"
prefix = 'raw_data/to_processed/'
spotify_keys = list_s3_objects(bucket=bucket_name, prefix=prefix)

In [None]:
# Now we will perform the move and delete operation on the to_processed folder.
import boto3

def move_and_delete_files(spotify_keys, Bucket, destination_prefix='raw_data/processed/', s3_resource=None):
    """Move S3 objects to ``destination_prefix`` inside the same bucket.

    Each object is copied to ``destination_prefix + <file name>`` and the
    original is then deleted.

    Args:
        spotify_keys: Iterable of object keys to move.
        Bucket: Name of the bucket holding both the source and destination.
        destination_prefix: Key prefix the files are moved under; defaults to
            ``'raw_data/processed/'``.
        s3_resource: Optional S3 resource to use; when omitted a new
            ``boto3.resource('s3')`` is created.

    Returns:
        None.
    """
    if s3_resource is None:
        s3_resource = boto3.resource('s3')

    for key in spotify_keys:
        # Keep only the file name so the object lands directly under destination_prefix.
        destination_key = destination_prefix + key.split("/")[-1]

        # An object already at its destination must not be "moved": copying it
        # onto itself and then deleting it would lose the file.
        if destination_key == key:
            continue

        copy_source = {'Bucket': Bucket, 'Key': key}

        # Copy first, delete second: if the copy raises, the original stays in place.
        s3_resource.meta.client.copy(copy_source, Bucket, destination_key)
        s3_resource.Object(Bucket, key).delete()


In [None]:
# Move the listed files via move_and_delete_files (copy, then delete the original).
move_and_delete_files(spotify_keys=spotify_keys, Bucket=bucket_name)