### Branching and Tagging - Visão Geral

As tabelas do Iceberg matem um log de snapshots, que por sua vez representão as mudanças aplicadas na tabela.
Os snapshots são fundamentais no formato de tabela Iceberg, pois eles garantem o isolamento das consultas/queries e permitem o "time travel" entre as diferentes "versões de dados" da tabela.

Para carontrolar o tamanho dos metadados e custos de armazenamento, Iceberg fornece um gerenciamento de ciclo de vida de snapshots. Que pode ser executado via procedures como **_expire_snapshots_** para remoção de de arquivos de dados não utilizados com base nas propriedades dos snapshots.

Para possibilidar o gerenciamento de ciclo de vidade de snapshots. O Iceberg suporta Branches e Tags, Esses recursos garantem que os snapshots tenham seus proprios ciclos de vida que por sua vez podem ser controlados por politicas de retenção em nivel de Branch e tag.

**Branches (Ramificações):** São referências mutáveis que podem ser atualizadas para apontar para um novo snapshot. Isso significa que uma branch pode evoluir ao longo do tempo, acompanhando as mudanças na tabela. Uma branch é como uma linha de desenvolvimento independente, permitindo que você faça alterações sem afetar a "linha principal" de desenvolvimento (main branch).

**Tags (Etiquetas):** São referências imutáveis a um snapshot específico. Uma vez criada, uma tag sempre apontará para o mesmo snapshot, independentemente de quaisquer alterações subsequentes na tabela. As tags servem como marcadores fixos no tempo, permitindo que você retorne a um estado específico da tabela com facilidade.

fonte: https://iceberg.apache.org/docs/1.6.1/branching/

### Write-Audit-Publish (WAP)

Com as Branches você poderá perfeitamente controlar e gerenciar as versões dos seus dados de forma isolada,
assim como ocorre na Engenharia de Software com os códigos.
A partir de agora, com Apache Iceberg podemos falar do padrão de engenharia de dados WAP, de forma mais clara,
que visa garantir a qualidade dos dados antes de disponibiliza-los para o consumo.<br>

O padrão Write-Audit-Publish (WAP) é uma prática recomendada para garantir a qualidade dos dados em pipelines de dados.
É semelhante aos fluxos de trabalho de Integração Contínua e Entrega (CI/CD) comuns no desenvolvimento de software.

**Write (Escrita):** Os dados são gravados inicialmente em um local de teste ou staging, em vez de serem gravados diretamente na
tabela principal. Isso isola os dados de produção de possíveis inconsistências. No Apache Iceberg, isso geralmente é feito
gravando em um branch separado da tabela principal.

**Audit (Auditoria):**  Uma vez que os dados estão em staging, eles são submetidos a um processo de validação completo para garantir que atendam aos padrões de qualidade. Isso pode incluir a verificação de valores nulos ou duplicados, validação de tipos
de dados e verificação da integridade dos dados. Qualquer ferramenta de qualidade de dados pode ser usada para essa etapa.

**Publish (Publicação):**  Após a validação, os dados são promovidos para a tabela de produção. No Iceberg, isso geralmente é feito por meio de um fast-forward da branch de staging para o branch principal. 

![ci/cd](https://www.tabular.io/wp-content/uploads/2024/01/image1-1024x182.png) <br>

fonte: https://www.tabular.io/apache-iceberg-cookbook/data-engineering-write-audit-publish/

### Carregar Recursos

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName('dataincode') \
    .config("spark.jars", "/opt/spark/jars/iceberg-spark-runtime-3.5_2.12-1.6.0.jar") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.spark_catalog.type", "hive") \
    .config("spark.sql.catalog.local.warehouse", "s3a://datalake/iceberg") \
    .getOrCreate()

#Ajuste de log WARN log para ERROR
spark.sparkContext.setLogLevel("ERROR")

In [None]:
# Importar funções 
from IPython.display import display, HTML

In [None]:
%run ./Includes/Utils.ipynb

In [None]:
%run ./Includes/Datasets.ipynb

In [None]:
spark.sql("USE iceberg")

In [None]:
spark.sql("SHOW TABLES in bronze").show()

In [None]:
spark.sql("SELECT count(*) FROM iceberg.bronze.vendas").show()

In [None]:
## Listar branches da tabela

spark.sql("SELECT * FROM iceberg.bronze.vendas.refs;").toPandas()

### Ativar o Write-Audit-Publish(WAP) e Criar a nova Branche

In [None]:
## Ativar WAP
spark.sql("""
    ALTER TABLE iceberg.bronze.vendas
    SET TBLPROPERTIES (
        'write.wap.enabled' = 'true'
    )"""
)

In [None]:
# Criar uma branch de desenvolvimento chamada etl_0

spark.sql("ALTER TABLE iceberg.bronze.vendas CREATE BRANCH etl_0")

In [None]:
# Criar uma branch chamada dev_etl

# spark.sql("ALTER TABLE iceberg.bronze.vendas CREATE BRANCH etl_1")

### <span style="color:blue"> Quando for trabalhar com as branches, é importante adicionar o prefixo``` branch_ ``` ao nome da nova branch </span>

- <span style="color:red">Cuidado para não inserir dados por engano na branch ``` main ``` </span>.
- <span style="color:red">Caso isso ocorra após a criação da branch de etl, você perderar a referencia da branch **'ancestral'** e não conseguirá fazer um merge </span>.
- <span style="color:red">Caso você perca a referencia do ultimo estado 'válido/correto' da tabela. Terá que fazer um rollback para o ultimo snapshot sem alterações </span>.


### Inspecionar tabela

In [None]:
## Exibir os dados da brach etl_0 ===> Quando for listar uma branche é importante adicionar o prefixo branch_etl_0

spark.sql("SELECT count(*) FROM iceberg.bronze.vendas.branch_main").toPandas()

In [None]:
## Listar snapshots

spark.sql("SELECT * FROM iceberg.bronze.vendas.snapshots;").toPandas()

In [None]:
## Listar tabela por versão main
spark.sql("SELECT count(*) FROM iceberg.bronze.vendas VERSION AS OF 'main' ").toPandas()

In [None]:
## Listar tabela por versão dev_etl
spark.sql("SELECT count(*) FROM iceberg.bronze.vendas VERSION AS OF 'etl_0' ").toPandas()

In [None]:
## Listar tabela por versão dev_etl
# spark.sql("SELECT count(*) FROM iceberg.bronze.vendas VERSION AS OF 'etl_1' ").toPandas()

### Inserir Novos dados na branch dev

In [None]:
## Inserir novos dados

mar_data_df = create_dataframe(columns_schema, mar_data)

mar_data_df.writeTo("iceberg.bronze.vendas.branch_etl_0").append()

In [None]:
## Inserir novos dados

# apr_data_df = create_dataframe(columns_schema, apr_data)

# apr_data_df.writeTo("iceberg.bronze.vendas.branch_etl_1").append()

In [None]:
## Exibir novamente os dados da brach dev_branch

spark.sql("SELECT count(*) FROM iceberg.bronze.vendas.branch_etl_0").toPandas()

In [None]:
## Exibir os dados da brach main

spark.sql("SELECT count(*) FROM iceberg.bronze.vendas").toPandas()

### Fast forward - Merge da dev_branch na branch main
Para publicar a branch ``` 'dev' ``` na branch ``` 'main' ``` basta usar a procedure: catalog.system.fast_forward('catalog.db.tabela', 'main', 'dev').<br>
 
Esta procedure faz uma mescla entre as duas versões da tabela, o processo é similar a mesclar um pull request no Git.
 
Esta procedure moverá o ponteiro do branch main para o mesmo commit do branch ``` 'dev' ``` , e efetivamente faz a publicação das alterações na branch principal (``` 'main' ```).

In [None]:
spark.sql(f" CALL iceberg.system.fast_forward('iceberg.bronze.vendas', 'main', 'etl_0')")

In [None]:
# spark.sql(f" CALL iceberg.system.fast_forward('iceberg.bronze.vendas', 'main', 'etl_1')")

### Se tudo deu certo vamos deletar a branch de dev

In [None]:
## Em caso de sucesso ou falha é bom não deixar historico desnecessário ==> Boas praticas do padrão Feature-Branch
spark.sql("ALTER TABLE iceberg.bronze.vendas DROP BRANCH etl_0")

In [None]:
## Em caso de sucesso ou falha é bom não deixar historico desnecessário ==> Boas praticas do padrão Feature-Branch
# spark.sql("ALTER TABLE iceberg.bronze.vendas DROP BRANCH etl_1")

In [None]:

spark.sql("SELECT * FROM iceberg.bronze.vendas.refs;").toPandas()

In [None]:

#Exibir versão final
spark.sql("SELECT count(*) FROM iceberg.bronze.vendas").toPandas()

### Tags
Você pode criar uma tag usando SQL, especificando o nome da tag, o snapshot de referência e, opcionalmente, um período de retenção dos dados refente ao snapshot.

Uma vez criada, a tag sempre apontará para um snapshot, mesmo que novos snapshots sejam criados na tabela. Com isso você congela o estado da tabela e pode acessa-lo facilmente por meio de uma referencia amigavel

Expiração de Tags e Retenção: Você pode definir um período de retenção para uma tag usando o comando ```RETAIN```  Após esse período, a tag é considerada expirada e pode ser removida automaticamente pelo sistema.

In [None]:
## Criando Tags
spark.sql(f"""
    ALTER TABLE iceberg.bronze.vendas.branch_etl_0
      CREATE TAG `vendas-mar-2024` 
      AS OF VERSION ***
      RETAIN 180 DAYS """
)

In [None]:
## Criando Tags
# spark.sql(f"""
#     ALTER TABLE iceberg.bronze.vendas.branch_etl_1
#       CREATE TAG `vendas-abr-2024` 
#       AS OF VERSION ***
#       RETAIN 180 DAYS """
# )

In [None]:
## Consultando Tags
spark.sql("SELECT count(*) FROM iceberg.bronze.vendas FOR VERSION AS OF 'vendas-abr-2024'").show()

In [None]:
## Deletar Tags
# spark.sql(f"ALTER TABLE iceberg.bronze.vendas DROP TAG `vendas-mar-2024`")