In [0]:
dbutils.widgets.removeAll()

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.window import Window

In [0]:

dbutils.widgets.text("catalog", "proyectofinal")
dbutils.widgets.text("schema_source", "bronze")
dbutils.widgets.text("schema_sink", "silver")


In [0]:
catalog = dbutils.widgets.get("catalog")
schema_source = dbutils.widgets.get("schema_source")
schema_sink = dbutils.widgets.get("schema_sink")

In [0]:
df_artists = spark.table(f"{catalog}.{schema_source}.artists")
df_songs = spark.table(f"{catalog}.{schema_source}.songs")


#UDF

In [0]:
from pyspark.sql import functions as F

def to_array(colname):
    c = F.col(colname)

    # 1) Normaliza null/vacío
    c = F.when(c.isNull() | (F.trim(c) == ""), F.lit("[]")).otherwise(c)

    # 2) Si viene con [ ] o comillas, normaliza a formato JSON-like y luego split
    #    - Quita corchetes []
    #    - Quita comillas dobles y simples
    cleaned = F.regexp_replace(c, r"^\[|\]$", "")
    cleaned = F.regexp_replace(cleaned, r"\"", "")
    cleaned = F.regexp_replace(cleaned, r"\'", "")

    # 3) Split por coma y trim
    arr = F.transform(F.split(cleaned, r"\s*,\s*"), lambda x: F.trim(x))
    # 4) Filtra vacíos
    arr = F.filter(arr, lambda x: x.isNotNull() & (x != ""))

    return arr

def clamp01(c):
    return F.when(c < 0, 0).when(c > 1, 1).otherwise(c)



#Track


In [0]:
df_track = (
    df_songs
    .select(
        F.col("id").alias("track_id"),
        F.trim(F.col("name")).alias("track_name"),
        F.trim(F.col("album_name")).alias("album_name"),
        F.col("year").cast("int").alias("release_year"),
        F.trim(F.lower(F.col("genre"))).alias("genre_main"),
        F.col("popularity").cast("int").alias("track_popularity"),
        F.col("duration_ms").cast("long").alias("duration_ms")
    )
    .dropDuplicates(["track_id"])
)

df_track.write.mode("overwrite").saveAsTable(f"{catalog}.{schema_sink}.df_track_transformed")

# Audio

In [0]:

df_audio = (
    df_songs
    .select(
        F.col("id").alias("track_id"),
        clamp01(F.col("danceability").cast("double")).alias("danceability"),
        clamp01(F.col("energy").cast("double")).alias("energy"),
        F.col("key").cast("int").alias("key"),
        F.col("loudness").cast("double").alias("loudness"),
        F.col("mode").cast("int").alias("mode"),
        clamp01(F.col("speechiness").cast("double")).alias("speechiness"),
        clamp01(F.col("acousticness").cast("double")).alias("acousticness"),
        clamp01(F.col("instrumentalness").cast("double")).alias("instrumentalness"),
        clamp01(F.col("liveness").cast("double")).alias("liveness"),
        clamp01(F.col("valence").cast("double")).alias("valence"),
        F.col("tempo").cast("double").alias("tempo")
    )
    .dropDuplicates(["track_id"])
)

df_audio.write.mode("overwrite").saveAsTable(f"{catalog}.{schema_sink}.df_audio_transformed")

# Artists


In [0]:
df_artist = (
    df_artists
    .select(
        F.col("id").cast("string").alias("artist_id"),
        F.trim(F.col("name")).alias("artist_name"),
        # followers y popularity vienen string: casteamos a long/int si son numéricos
        F.regexp_replace(F.col("followers"), r"[^0-9]", "").cast("long").alias("followers"),
        F.regexp_replace(F.col("popularity"), r"[^0-9]", "").cast("int").alias("artist_popularity"),
        F.trim(F.lower(F.col("main_genre"))).alias("main_genre"),
        F.col("genres").alias("genres_raw"),
        to_array("genres").alias("genres_arr")
    )
    .dropDuplicates(["artist_id"])
)
df_artist.write.mode("overwrite").saveAsTable(f"{catalog}.{schema_sink}.df_artist_transformed")

#Bridge

In [0]:
df_bridge = (
    df_songs
    .select(
        F.col("id").alias("track_id"),
        to_array("artist_ids").alias("artist_ids_arr")
    )
    .withColumn("artist_id", F.explode_outer(F.col("artist_ids_arr")))
    .select(
        F.col("track_id"),
        F.col("artist_id").cast("string").alias("artist_id")
    )
    .filter(F.col("artist_id").isNotNull() & (F.col("artist_id") != ""))
    .dropDuplicates(["track_id", "artist_id"])
)
df_bridge.write.mode("overwrite").saveAsTable(f"{catalog}.{schema_sink}.df_bridge_transformed")