Importing libraries


In [0]:
from datetime import datetime,timedelta
from pyspark.sql import functions as F
import json

In [0]:
# Run date as YYYY-MM-DD, taken from the cluster's local clock.
today_date = f"{datetime.now():%Y-%m-%d}"


In [0]:
# ------ READING RAW DATA ------
# The bronze export for a given day sits in a folder named after that day,
# and the file name carries the same date as a prefix.
today_date = datetime.now().strftime("%Y-%m-%d")
bronze_path = (
    's3://my-raw-spotify-data/bronze/user_recent_played/'
    f'{today_date}/{today_date}_recent_tracks.json'
)
# multiline=true: the file is read as a JSON document that may span several
# lines rather than as one JSON record per line.
raw_reader = spark.read.option('multiline', 'true')
df = raw_reader.json(bronze_path)


In [0]:
# Turn the `items` array into one row per element and keep only that column.
df = df.select(F.explode('items').alias('items'))


In [0]:
# Flatten every exploded play event into a single row of scalar columns.
# The nested paths are assembled from shared string prefixes, so each column
# is still resolved through its full dotted name.
_track = 'items.track'
_album = f'{_track}.album'
_artists = F.col(f'{_track}.artists')

# Predicate for "the track credits at least two artists". The when() calls
# below have no otherwise() branch, so rows failing it get NULL.
_has_second_artist = F.size(_artists) > 1

df_flat = df.select(
    # Play event and track identity
    F.col('items.played_at').alias('played_at').cast('timestamp'),
    F.col(f'{_track}.id').alias('track_id'),
    F.col(f'{_track}.name').alias('track_name'),

    # Primary (first listed) artist
    _artists.getItem(0).getField('id').alias('first_artist_id'),
    _artists.getItem(0).getField('name').alias('first_artist_name'),

    # Album identity
    F.col(f'{_album}.id').alias('album_id'),
    F.col(f'{_album}.name').alias('album_name'),

    # Track / album attributes
    F.col(f'{_track}.duration_ms').alias('duration_ms'),
    F.col(f'{_track}.popularity').alias('popularity'),
    F.col(f'{_album}.release_date').alias('album_release_date').cast('date'),

    # Second listed artist, only when one is present
    F.when(
        _has_second_artist, _artists.getItem(1).getField('id')
    ).alias('second_artist_id'),
    F.when(
        _has_second_artist, _artists.getItem(1).getField('name')
    ).alias('second_artist_name'),
)


In [0]:
%sql

-- Make my_spotify the current catalog. The table write further down in this
-- notebook registers the table under the two-part name
-- silver.user_recent_played, which is resolved against the current catalog.
USE CATALOG my_spotify;
-- Create the silver schema if it is missing; does nothing when it exists.
CREATE SCHEMA IF NOT EXISTS silver;


In [0]:
   # Idempotent load of df_flat into the silver Delta table.
   #
   # * No Delta table at silver_path yet: write df_flat there and register it
   #   as a table.
   # * Otherwise: MERGE on played_at and insert only the rows whose played_at
   #   is not in the target yet, so re-running on the same input adds nothing.

   from delta.tables import DeltaTable

   silver_path = 's3://my-spotify-delta-lakehouse/silver'
   # Fully qualified on purpose: registration must not depend on whichever
   # catalog happens to be current in the session, and this is the exact name
   # the DESCRIBE HISTORY query later in the notebook reads from.
   silver_table = 'my_spotify.silver.user_recent_played'

   if not DeltaTable.isDeltaTable(spark, silver_path):
       (
           df_flat.write.format("delta")
           .option("path", silver_path)
           .saveAsTable(silver_table)
       )
       print("✅ Tabella creata e registrata con successo.")
   else:
       (
           DeltaTable.forPath(spark, silver_path)
           .alias('target')
           .merge(
               df_flat.alias('source'),
               'target.played_at = source.played_at',
           )
           # Only an insert clause: rows that already match are left untouched.
           .whenNotMatchedInsertAll()
           .execute()
       )
       print("✅ Merge completato con successo.")

In [0]:
# Report the most recent Delta operation on the silver table back to the Job.
last_op = (
    spark.sql("Describe HISTORY my_spotify.silver.user_recent_played;")
    .select("version", "timestamp", "operation", "operationMetrics")
    .orderBy("version", ascending=False)
    .first()
)

# The keys in operationMetrics depend on the operation type. A MERGE reports
# "numTargetRowsInserted", but the table-creating write of the very first run
# reports "numOutputRows" instead, so a direct ["numTargetRowsInserted"]
# lookup raised KeyError on that run. Fall back to the write metric, and to
# None when neither key is present. The value is passed through unchanged.
metrics = last_op["operationMetrics"] or {}
row_inserted = metrics.get(
    "numTargetRowsInserted",
    metrics.get("numOutputRows"),
)

output = {
    "status": "success",
    "operation_type": last_op.operation,
    "rows_inserted": row_inserted,
}

# Return JSON to the Job
dbutils.notebook.exit(json.dumps(output))
