# 🪙 Camada Silver - Criptoativos

## 📄 Descrição
Este notebook realiza o processamento incremental da camada Bronze para a camada Silver, consolidando dados históricos e intradiários em uma única tabela Delta do domínio de criptoativos.

O pipeline utiliza **Structured Streaming com Delta Change Data Feed (CDF)** para garantir ingestão eficiente, incremental e confiável. Durante a primeira execução, o notebook carrega o histórico completo dos ativos. Após isso, realiza atualizações contínuas com base em novos dados recebidos diariamente.

Além disso, o notebook possui inteligência para detectar **novos ativos** adicionados na Bronze, realizando a carga histórica automaticamente, garantindo consistência e cobertura total do domínio.

## 📥 Entradas (Bronze)
- `bronze_historico`: histórico de preços diários via Yahoo Finance
- `bronze_intradiario`: cotações intradiárias via CoinGecko

## 📤 Saída (Silver)
- `silver.ativo_financeiro`: tabela Delta unificada contendo histórico e atualizações diárias de todos os criptoativos

## ⚙️ Tecnologias e Estratégias
- 🔄 Structured Streaming com `foreachBatch`
- 🧠 Processamento incremental via `readChangeFeed`
- ⚡ Merge Upsert com Delta Lake
- 🔍 Detecção automática de novos ativos
- ✅ Checagem de existência da tabela Silver e schema evolution automático
- 🗂 Checkpoints para tolerância a falhas e reprocessamentos seguros

---

> 💡 *Este pipeline está preparado para rodar continuamente em ambiente de produção, garantindo atualização constante dos dados financeiros do domínio de criptoativos.*


In [0]:
from delta.tables import DeltaTable
from pyspark.sql.functions import expr, col
from pyspark.sql import DataFrame

In [0]:
spark.sql("create catalog if not exists lakehouse managed location 's3://databricks-9cwyoqzauqyermnrdpparb-cloud-storage-bucket/unity-catalog/1732645886098685'")
spark.sql("use catalog lakehouse")

In [0]:
spark.sql("create schema if not exists silver")
spark.sql("create volume if not exists silver.checkpoint_cripto")
spark.sql("create volume if not exists silver.schema_cripto")

In [0]:
checkpoint_path = "/Volumes/lakehouse/silver/checkpoint_cripto" 
schema_path = "/Volumes/lakehouse/silver/schema_cripto"
table_name = "silver.cripto"

In [0]:
# =====================================================================
# Leitura com Structured Streaming e Change Data Feed (CDF) dos dados da camada Bronze
# =====================================================================

# Lê as mudanças da tabela bronze_historico desde a primeira versão (full load na primeira execução)
df_historico = (
    spark.readStream
         .format("delta")
         .table("bronze.historico_cripto")
)
# Lê as mudanças da tabela bronze_intradiario
df_intradiario = (
    spark.readStream
         .format("delta")
         .table("bronze.diario_cripto")
)

# Ajuste do nome das colunas para facilitar a manipulação
df_historico = df_historico.withColumn("asset_name", col("ticker")).drop(col("ticker")).withColumn("time_interval",col("timestamp"))

# Acrescenta uma coluna para diferenciar a origem do dado (opcional, mas útil para debug/análise)
df_historico = df_historico.withColumn("source_type", expr("'historico'"))
df_intradiario = df_intradiario.withColumn("source_type", expr("'intradiario'"))

# Une os dois streams, garantindo que os schemas sejam compatíveis
df_union = df_historico.unionByName(df_intradiario)

In [0]:
def insert_to_silver(microBatchDF: DataFrame, batchId: int):
    """
    Função para inserir (append) os registros do microbatch na tabela Silver.
    Como as tabelas Bronze são insertOnly e o Structured Streaming capta apenas os registros novos,
    não é necessário fazer merge (upsert).
    """
    microBatchDF.write.format("delta").mode("append").saveAsTable("silver.cripto")
    print(f"Batch {batchId} inserido com sucesso na camada Silver.")


In [0]:

# =====================================================================
# Configura o streaming para processar incrementalmente na Silver
# =====================================================================

silver_stream = (
    df_union.writeStream
            .option("checkpointLocation", checkpoint_path)
            .foreachBatch(insert_to_silver)
            .outputMode("append")  # Pode ser "update" ou "append" dependendo da lógica de merge
            .trigger(availableNow=True)
            .start()
)

silver_stream.awaitTermination()


In [0]:
spark.sql("alter table silver.cripto cluster by AUTO")

In [0]:
spark.sql("optimize silver.cripto").display()