In [None]:
from pyspark.sql import SparkSession
from azure.storage.blob import BlobServiceClient
from delta import configure_spark_with_delta_pip
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import os

# Azure Data Lake settings, read from the process environment.
# NOTE: an unset variable yields None, which is interpolated into the URL
# as the literal text "None"; nothing is validated at this point.
account_name = os.environ.get("AZURE_STORAGE_ACCOUNT_NAME")
sas_token = os.environ.get("AZURE_STORAGE_SAS_TOKEN")
account_url = "https://{}.blob.core.windows.net".format(account_name)

# Build the Spark session with Delta Lake support and SAS access to the
# storage containers used by this job.
try:
    builder = (
        SparkSession.builder
        .appName("Transform Silver To Gold")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config(
            "spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog",
        )
    )

    # WASB SAS keys are scoped per container. The job reads from "silver"
    # and writes to "gold", so the token has to be registered for both;
    # registering only "gold" leaves the silver reads unauthenticated.
    # NOTE(review): assumes the same SAS token grants access to both
    # containers — confirm against how the token was issued.
    for container in ("silver", "gold"):
        builder = builder.config(
            f"fs.azure.sas.{container}.{account_name}.blob.core.windows.net",
            sas_token,
        )

    # configure_spark_with_delta_pip receives the builder and returns the
    # builder on which getOrCreate() is called.
    spark = configure_spark_with_delta_pip(builder).getOrCreate()
except Exception as e:
    # Nothing below can run without a session: report and propagate.
    print(f"Erro ao acessar configurar a sessão do spark: {e}")
    raise

# List every blob in the Silver layer whose name ends in ".parquet"; the
# resulting names are what the rest of the script iterates over.
blob_service_client = BlobServiceClient(account_url=f"{account_url}?{sas_token}")
# The source layer is "silver" (the data is later read from
# wasbs://silver@...), so that is the container that must be listed.
# NOTE(review): assumes the SAS token can list the silver container — confirm.
container_client = blob_service_client.get_container_client("silver")

try:
    silver_tables = [
        blob.name
        for blob in container_client.list_blobs()
        if blob.name.endswith(".parquet")
    ]
    print(f"Tabelas encontradas no container 'silver': {silver_tables}")
except Exception as e:
    print(f"Erro ao acessar o container ou listar blobs: {e}")
    # Propagate the real failure. Swallowing it would leave `silver_tables`
    # undefined and the script would die right after with an unrelated
    # NameError that hides the actual cause.
    raise

# Transform each Silver table into its Gold counterpart: keep only the most
# recent row per `id` and write the result in Delta format.
if not silver_tables:
    print("Nenhum arquivo .parquet encontrado no container.")
else:
    # Derive the table name by cutting at the first ".parquet" rather than at
    # the first "." so names containing other dots are not truncated.
    # When a dataset is stored as a directory of part files
    # ("name.parquet/part-*.parquet"), every part maps to the same table, so
    # the names are de-duplicated (order preserved) to process each table once.
    table_names = list(
        dict.fromkeys(path.split(".parquet", 1)[0] for path in silver_tables)
    )
    storage_host = f"{account_name}.blob.core.windows.net"
    # Helper column for the de-duplication; the unusual name avoids
    # clobbering (and then dropping) a real column of the source table.
    dedup_col = "__dedup_row_number"

    for table_name in table_names:
        try:
            # Plain interpolation instead of nested same-type quotes inside
            # the f-string, which is a SyntaxError before Python 3.12.
            silver_path = f"wasbs://silver@{storage_host}/{table_name}.parquet"
            # The Gold location keeps the ".parquet" suffix of the original
            # script even though the data is written as Delta.
            gold_path = f"wasbs://gold@{storage_host}/{table_name}.parquet"

            df_silver = spark.read.parquet(silver_path)

            # Number the rows of each `id` from newest to oldest `updated_at`
            # and keep only the first one. Tables lacking either column fail
            # here and are reported by the handler below.
            newest_first = Window.partitionBy("id").orderBy(F.desc("updated_at"))
            df_gold = (
                df_silver
                .withColumn(dedup_col, F.row_number().over(newest_first))
                .filter(F.col(dedup_col) == 1)
                .drop(dedup_col)
            )

            # Save the data in Delta format on Azure Blob Storage.
            df_gold.write.format("delta").mode("overwrite").save(gold_path)

            print(f"Tabela {table_name} transformada e salva em {gold_path}")
        except Exception as e:
            # Best effort per table: report the failure and move on.
            print(f"Erro ao processar a coleção {table_name}: {e}")
