# Carga na camada Gold das informações:
- classic.hit(camada Silver) 
- search_track_artist(camada Silver)

## importando lib functions para o processamento para a camada GOLD

In [0]:
# Importa as funções do PySpark com o apelido "F" para facilitar o uso (ex: F.col, F.sum)
from pyspark.sql import functions as F

## Verificando se as delta table existem
## Limpando delta table da tracks_analytics e genres_summary da camada Gold

In [0]:
if spark.catalog.tableExists("workspace.gold.tracks_analytics"):
    spark.sql("TRUNCATE TABLE workspace.gold.tracks_analytics")
else:
    print("Tabela não existe. Nenhuma ação foi tomada.")

if spark.catalog.tableExists("workspace.gold.genres_summary"):
    spark.sql("TRUNCATE TABLE workspace.gold.genres_summary")
else:
    print("Tabela não existe. Nenhuma ação foi tomada.")

## Extraindo as informações da camada Silver

In [0]:
# Leitura das tabelas Silver
df_api = spark.table("workspace.silver.search_track_artist")
df_csv = spark.table("workspace.silver.classic_hit")

## Padronizando as informações extraidas

In [0]:
# Padronizar nomes e tipos
df_csv = (
    df_csv
    .withColumnRenamed("track", "track_name")
    .withColumnRenamed("artist", "artist_name")
    .withColumnRenamed("Duration", "duration_ms")
    .withColumnRenamed("Popularity", "csv_popularity")
)

# Padroniza colunas da API
df_api = (
    df_api
    .withColumnRenamed("popularity", "api_popularity")
)

## Desnormalizando as infomrações

In [0]:
# Fazer o join (normalizado)
df_join = (
    df_csv.alias("csv")
    .join(
        df_api.alias("api"),
        (F.lower(F.trim(F.col("csv.track_name"))) == F.lower(F.trim(F.col("api.track_name")))) &
        (F.lower(F.trim(F.col("csv.artist_name"))) == F.lower(F.trim(F.col("api.artist_name")))),
        "inner"
    )
)

## Criando novas colunas analíticas para a camada Gold

In [0]:
# Criar colunas analíticas (camada Gold)
df_gold = (
    df_join
    # Cria uma nova coluna convertendo a duração de milissegundos para minutos
    .withColumn("duration_min", F.col("api.duration_ms") / 60000)
    
    # Calcula a diferença de popularidade entre a API e o CSV
    .withColumn("popularity_diff", F.col("api_popularity") - F.col("csv_popularity"))
    
    # Cria uma pontuação combinando "Danceability" e "Speechiness"
    .withColumn("dance_score", F.col("Danceability") * (1 - F.col("Speechiness")))
    
    # Marca se a música é um "hit" (popularidade acima de 70)
    .withColumn("is_hit", F.when(F.col("api_popularity") > 70, F.lit("Sim")).otherwise("Não"))
    
    # Seleciona apenas as colunas finais para a camada Gold
    .select(
        "api.track_id", "csv.track_name", "csv.artist_name",
        "csv.Genre", "csv.year",
        "api.album_name", "api.album_type", "api.album_image_url",
        "csv.Danceability", "csv.Speechiness",
        "csv_popularity", "api_popularity", "popularity_diff",
        "duration_min", "dance_score", "is_hit"
    )
)

## Carregando 1º tabela na camada Gold

In [0]:
# Crie a tabela final:
(
    df_gold
    .write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("workspace.gold.tracks_analytics")
)

## Carregando 2º tabela na camada Gold (Visão agregada por Genero)

In [0]:
# visões agregadas(Genero)

# Média por gênero
df_genres = (
    df_gold.groupBy("Genre")
    .agg(
        F.avg("api_popularity").alias("avg_popularity"),
        F.avg("Danceability").alias("avg_danceability"),
        F.count("*").alias("num_tracks")
    )
)
df_genres.write.format("delta").mode("overwrite").saveAsTable("workspace.gold.genres_summary")