In [0]:
df_bronze = spark.table('bronze_spotify_tracks')

df_bronze.display()

In [0]:
df_bronze.printSchema

In [0]:
from pyspark.sql.functions import col

df_artist = (
    df_bronze
    .select(
        col('artist_name'),
        col('artist_popularity').cast('int'),
        col('artist_followers').cast('long'),
        col('artist_genres')
    )
    .dropDuplicates()
)

df_artist.display()

In [0]:
(
    df_artist
    .write
    .format('delta')
    .mode('overwrite')
    .saveAsTable('silver_dim_artist')
)

In [0]:
%sql
select *
from silver_dim_artist
limit 10;

In [0]:
df_album = (
    df_bronze
    .select(
        col('album_id'),
        col('album_name'),
        col('album_release_date'),
        col('album_type'),
        col('album_total_tracks').cast('int')
    )
    .dropDuplicates()
)

df_album.display()

In [0]:
(
    df_album
    .write
    .format('delta')
    .mode('overwrite')
    .saveAsTable('silver_dim_album')
)

In [0]:
%sql
select *
from silver_dim_album
limit 10;

In [0]:
df_track = (
    df_bronze
    .select(
        col('track_id'),
        col('track_name'),
        col('track_number').cast('int'),
        col('track_duration_min').cast('double'),
        col('explicit'),
        col('album_id'),
        col('artist_name')
    )
    .dropDuplicates()
)

In [0]:
%sql
DROP TABLE IF EXISTS silver_dim_track;


In [0]:
(
    df_track
    .write
    .format('delta')
    .mode('append')
    .saveAsTable('silver_dim_track')
)

In [0]:
%sql
select *
from silver_dim_track
limit 10;

In [0]:
df_artist = spark.table('silver_dim_artist')
df_album = spark.table('silver_dim_album')
df_track = spark.table('silver_dim_track')

In [0]:
df_track.printSchema()
df_album.printSchema()

In [0]:
df_gold = (
    df_track.alias("t")
    .join(df_album.alias("al"), "album_id", "inner")
    .join(df_artist.alias("ar"), "artist_name", "inner")
    .select(
        col("t.track_id"),
        col("t.track_name"),
        col("ar.artist_name"),
        col("al.album_name"),
        col("al.album_release_date").alias("album_release_date"), 
        col("t.track_duration_min"),
        col("ar.artist_popularity"),
        col("t.explicit")
    )
)
display(df_gold)

In [0]:
(
    df_gold
    .write
    .format("delta")
    .mode("append")
    .saveAsTable("gold_fact_streaming")
)

In [0]:
from pyspark.sql import functions as F

ranking_artista = (
    df_gold
    .groupBy('artist_name')
    .agg(F.avg('artist_popularity').alias('avg_popularity'))
    .orderBy(F.desc('avg_popularity'))
    .limit(10)
)
display(ranking_artista)