In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName('dataincode') \
    .config("spark.jars", "/opt/spark/jars/iceberg-spark-runtime-3.5_2.12-1.6.0.jar") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.spark_catalog.type", "hive") \
    .config("spark.sql.catalog.local.warehouse", "s3a://datalake/iceberg") \
    .getOrCreate()

#Ajuste de log WARN log para ERROR
spark.sparkContext.setLogLevel("ERROR")

### Atualizações em Nível de Linha no Apache Iceberg

O Apache Iceberg gerencia atualizações em nível de linha por meio de dois modos principais: **copy-on-write (COW)** e **merge-on-read (MOR)**.

**Copy-on-Write (COW):** COW é o modo padrão no Iceberg. Quando registros na tabela são alterados (atualizados ou excluídos), os arquivos de dados associados a esses registros são reescritos com as alterações aplicadas. Este modo oferece as leituras mais rápidas, mas tem as atualizações e exclusões mais lentas.

**Merge-on-Read (MOR):** No MOR, as atualizações e exclusões não resultam na reescrita imediata de arquivos de dados. Em vez disso, "arquivos deletados" são criados para registrar as alterações. Esses arquivos deletados são então usados durante as operações de leitura para filtrar ou atualizar os dados afetados. O MOR geralmente oferece leituras e gravações rápidas. No entanto, o uso regular de compactação é recomendado para minimizar os custos de leitura, pois vários arquivos de exclusão podem se acumular ao longo do tempo, levando a operações de leitura mais complexas.

**Tipos de arquivo de exclusão:** Existem dois tipos de arquivos de exclusão: arquivos de exclusão posicional e arquivos de exclusão de igualdade. A escolha do tipo de arquivo de exclusão afeta o desempenho de leitura e gravação.

**Caso de Uso :** Se as leituras forem mais frequentes que as gravações, o COW pode ser mais adequado. Se as gravações forem frequentes, o MOR pode ser preferível.

**Fontes:**

https://iceberg.apache.org/docs/1.5.2/configuration/#write-properties<br>
Apache Iceberg: The Definitive Guide<br>
https://aws.amazon.com/pt/blogs/big-data/use-apache-iceberg-in-a-data-lake-to-support-incremental-data-processing/?ref=guptaakashdeep.com


####  <span style="color:blue">COW:</span>
Sempre que uma linha em um arquivo de dados é atualizada ou excluída, todo o arquivo de dados é reescrito com as alterações aplicadas. Isso significa que mesmo se apenas uma única linha for alterada, um novo arquivo de dados será criado, e o antigo será substituído

**Vantagens:** Leitura rápida, pois o leitor só precisa ler os dados sem mesclá-los com arquivos excluídos ou atualizados.

**Desvantagens:** Gravação lenta, pois a reescrita de arquivos de dados inteiros pode ser um processo demorado, especialmente se houver atualizações frequentes.

#### Cenários de uso ideais para COW:
**Priorizando leituras rápidas:** Se o desempenho de leitura for crítico e as gravações puderem ser um pouco mais lentas, COW é uma boa opção. Em resumo mais ```INSERT``` menos ```UPDATE, DELETE```


#### <span style="color:blue">MOR:</span>
As atualizações não resultam na reescrita imediata de arquivos de dados inteiros. Em vez disso, as alterações são rastreadas em arquivos de exclusão separados.

**Exclusão de um registro:** O registro é listado em um arquivo de exclusão. Quando o leitor lê a tabela, ele mescla os dados com o arquivo de exclusão para decidir qual registro ignorar.

**Atualização de um registro:** O registro modificado também é rastreado em um arquivo de exclusão, e o mecanismo cria um novo arquivo de dados contendo o registro com o valor atualizado. Ao ler a tabela, o mecanismo ignora a versão antiga do registro por causa do arquivo de exclusão e usa a nova versão no novo arquivo de dados.

**Vantagens:** Gravação mais rápida em comparação com COW, pois não há necessidade de reescrever arquivos de dados inteiros.

**Desvantagens:** Leitura mais lenta em comparação com COW, pois os arquivos de exclusão precisam ser reconciliados durante a leitura.

#### Cenários de uso ideais:

**Tabelas com muitas atualizações:** Se a tabela for atualizada com frequência, MOR pode ser uma boa escolha, pois pode acelerar as gravações.

**Priorizando gravações rápidas:** Se o desempenho de gravação for crítico e as leituras puderem ser um pouco mais lentas, MOR pode ser uma boa opção.

**Compactação:** A compactação regular é essencial ao usar MOR para minimizar o custo de leitura de dados


#### <span style="color:blue">Como habilitar o MOR:</span>

**Propriedades:**
- ```write.delete.mode```: Abordagem usada para transações de exclusão.
- ```write.update.mode```: Abordagem usada para transações de atualização.
- ```write.merge.mode```:  Abordagem usada para transações de mesclagem.
---
> ⚠️ O correto funcionamento vai depender da compatibilidade do mecanismo de computação, neste caso estamos usando o SPARK.
---

**Tabela já existente:**

```sql
ALTER TABLE catalog.db.nome_da_tabela SET TBLPROPERTIES (
'write.delete.mode'='merge-on-read',
'write.update.mode'='merge-on-read',
'write.merge.mode'='merge-on-read'
)
```

**Nova tabela:**

```sql
CREATE TABLE catalog.db.nome_da_tabela (
    id int,
    first_name string,
    last_name string
) TBLPROPERTIES (
    'write.delete.mode'='copy-on-write',
    'write.update.mode'='merge-on-read',
    'write.merge.mode'='merge-on-read'
) USING iceberg
```

### CRIAR A TABELA VENDAS

In [3]:
# Importar funções 
from IPython.display import display, HTML

In [4]:
%run ./Includes/Utils.ipynb

In [5]:
%run ./Includes/Datasets.ipynb

##### CRIAR TABELA NO MODO PADRÃO - COW

In [6]:
# Criar DF
init_data_df = create_dataframe(columns_schema, init_data)

In [7]:
## Escrever tabela no storage
(
    init_data_df
    .writeTo("iceberg.bronze.vendas")    
    .createOrReplace()
)

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".    (0 + 8) / 8]
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
                                                                                

In [None]:
## Inserir novos dados

# feb_data_df = create_dataframe(columns_schema, feb_data)

# feb_data_df.writeTo("iceberg.bronze.vendas_cow").append()

In [8]:
## Listar configurações do modo padrão de
extended_info = spark.sql("DESCRIBE EXTENDED iceberg.bronze.vendas")

table_properties = extended_info.filter(extended_info["col_name"] == "Table Properties").take(1)[0][1]

properties = table_properties.strip('[]').split(',')

for item in properties:
    print(f"{item}")

current-snapshot-id=6641450198374604857
format=iceberg/parquet
format-version=2
write.parquet.compression-codec=zstd


In [15]:
## Listar arquivos de dados
spark.sql("SELECT * FROM iceberg.bronze.vendas.files;").toPandas()

Unnamed: 0,content,file_path,file_format,spec_id,record_count,file_size_in_bytes,column_sizes,value_counts,null_value_counts,nan_value_counts,lower_bounds,upper_bounds,key_metadata,split_offsets,equality_ids,sort_order_id,readable_metrics
0,0,s3a://datalake/iceberg/bronze/vendas/data/0000...,PARQUET,0,1,2343,"{1: 47, 2: 52, 3: 43, 4: 48, 5: 52, 6: 50, 7: ...","{1: 1, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1, 7: 1, 8: 1}","{1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0, 7: 0, 8: 0}",{},"{1: [49, 48, 49, 54, 56], 2: [50, 48, 50, 52, ...","{1: [49, 48, 49, 54, 56], 2: [50, 48, 50, 52, ...",,[4],,0,"((48, 1, 0, None, France, France), (52, 1, 0, ..."
1,0,s3a://datalake/iceberg/bronze/vendas/data/0000...,PARQUET,0,1,2343,"{1: 47, 2: 52, 3: 43, 4: 48, 5: 52, 6: 50, 7: ...","{1: 1, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1, 7: 1, 8: 1}","{1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0, 7: 0, 8: 0}",{},"{1: [49, 48, 49, 56, 48], 2: [50, 48, 50, 52, ...","{1: [49, 48, 49, 56, 48], 2: [50, 48, 50, 52, ...",,[4],,0,"((48, 1, 0, None, Norway, Norway), (52, 1, 0, ..."
2,0,s3a://datalake/iceberg/bronze/vendas/data/0000...,PARQUET,0,1,2364,"{1: 47, 2: 52, 3: 44, 4: 48, 5: 51, 6: 50, 7: ...","{1: 1, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1, 7: 1, 8: 1}","{1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0, 7: 0, 8: 0}",{},"{1: [49, 48, 49, 56, 56], 2: [50, 48, 50, 52, ...","{1: [49, 48, 49, 56, 56], 2: [50, 48, 50, 52, ...",,[4],,0,"((51, 1, 0, None, Australia, Australia), (52, ..."
3,0,s3a://datalake/iceberg/bronze/vendas/data/0000...,PARQUET,0,1,2327,"{1: 47, 2: 52, 3: 42, 4: 47, 5: 49, 6: 50, 7: ...","{1: 1, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1, 7: 1, 8: 1}","{1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0, 7: 0, 8: 0}",{},"{1: [49, 48, 50, 48, 49], 2: [50, 48, 50, 52, ...","{1: [49, 48, 50, 48, 49], 2: [50, 48, 50, 52, ...",,[4],,0,"((49, 1, 0, None, Finland, Finland), (52, 1, 0..."


In [16]:
## Listar arquivos de metadados
spark.sql("SELECT * FROM iceberg.bronze.vendas.manifests;").toPandas()

Unnamed: 0,content,path,length,partition_spec_id,added_snapshot_id,added_data_files_count,existing_data_files_count,deleted_data_files_count,added_delete_files_count,existing_delete_files_count,deleted_delete_files_count,partition_summaries
0,0,s3a://datalake/iceberg/bronze/vendas/metadata/...,7142,0,9139426830403085932,1,0,0,0,0,0,[]
1,0,s3a://datalake/iceberg/bronze/vendas/metadata/...,7333,0,9139426830403085932,0,3,1,0,0,0,[]


In [14]:
## Exibir Registro
spark.sql("select * from iceberg.bronze.vendas").show()

+------------+----------+-----------+----------+----------+----------+---------------+---------+
|order_number|order_date|qty_ordered|unit_price|    status|product_id|product_line_id|  country|
+------------+----------+-----------+----------+----------+----------+---------------+---------+
|       10180|2024-01-22|          1|    951.87|In Process|  S10_2016|           1002|   Norway|
|       10168|2024-01-23|          5|    98.115|In Process|  S10_1949|           1002|   France|
|       10188|2024-01-21|         65|    95.202| Cancelled|  S10_4698|           1002|Australia|
|       10201|2024-01-26|          8|    951.17|   On Hold|  S10_4757|           1221|  Finland|
+------------+----------+-----------+----------+----------+----------+---------------+---------+



In [13]:
## Atualizar registros
spark.sql("""
UPDATE iceberg.bronze.vendas
SET status = 'In Process'
WHERE order_number = 10168
"""
)

                                                                                

DataFrame[]

In [23]:
### Para deletar por completo do catalog e storage
# spark.sql("DROP TABLE iceberg.bronze.vendas PURGE")

                                                                                

DataFrame[]

##### CRIAR TABELA NO MODO MOR

In [24]:
(
    init_data_df
    .writeTo("iceberg.bronze.vendas")    
    .createOrReplace()
)

In [25]:
## spark.sql("SHOW CATALOG").show()
## Alterar tabela no formato caw
spark.sql("""
ALTER TABLE iceberg.bronze.vendas SET TBLPROPERTIES (
'write.delete.mode'='copy-on-write',
'write.update.mode'='merge-on-read',
'write.merge.mode'='merge-on-read'
)""")

DataFrame[]

In [31]:
## Exibir Registro
spark.sql("select * from iceberg.bronze.vendas").show()

+------------+----------+-----------+----------+----------+----------+---------------+---------+
|order_number|order_date|qty_ordered|unit_price|    status|product_id|product_line_id|  country|
+------------+----------+-----------+----------+----------+----------+---------------+---------+
|       10168|2024-01-23|          5|    98.115|In Process|  S10_1949|           1002|   France|
|       10180|2024-01-22|          1|    951.87|In Process|  S10_2016|           1002|   Norway|
|       10188|2024-01-21|         65|    95.202| Cancelled|  S10_4698|           1002|Australia|
|       10201|2024-01-26|          8|    951.17|   On Hold|  S10_4757|           1221|  Finland|
+------------+----------+-----------+----------+----------+----------+---------------+---------+



In [27]:
extended_info = spark.sql("DESCRIBE EXTENDED iceberg.bronze.vendas")

table_properties = extended_info.filter(extended_info["col_name"] == "Table Properties").take(1)[0][1]

properties = table_properties.strip('[]').split(',')

for item in properties:
    print(f"{item}")

current-snapshot-id=780635310986335313
format=iceberg/parquet
format-version=2
write.delete.mode=copy-on-write
write.merge.mode=merge-on-read
write.parquet.compression-codec=zstd
write.update.mode=merge-on-read


In [30]:
spark.sql("""
UPDATE iceberg.bronze.vendas
SET status = 'In Process'
WHERE order_number = 10168
"""
)
# Disputed

DataFrame[]

In [32]:
## Listar arquivos de dados
spark.sql("SELECT * FROM iceberg.bronze.vendas.files;").toPandas()

Unnamed: 0,content,file_path,file_format,spec_id,record_count,file_size_in_bytes,column_sizes,value_counts,null_value_counts,nan_value_counts,lower_bounds,upper_bounds,key_metadata,split_offsets,equality_ids,sort_order_id,readable_metrics
0,0,s3a://datalake/iceberg/bronze/vendas/data/0000...,PARQUET,0,1,2343,"{1: 47, 2: 52, 3: 43, 4: 48, 5: 52, 6: 50, 7: ...","{1: 1, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1, 7: 1, 8: 1}","{1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0, 7: 0, 8: 0}",{},"{1: [49, 48, 49, 54, 56], 2: [50, 48, 50, 52, ...","{1: [49, 48, 49, 54, 56], 2: [50, 48, 50, 52, ...",,[4],,0.0,"((48, 1, 0, None, France, France), (52, 1, 0, ..."
1,0,s3a://datalake/iceberg/bronze/vendas/data/0000...,PARQUET,0,1,2329,"{1: 47, 2: 52, 3: 43, 4: 48, 5: 50, 6: 50, 7: ...","{1: 1, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1, 7: 1, 8: 1}","{1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0, 7: 0, 8: 0}",{},"{1: [49, 48, 49, 54, 56], 2: [50, 48, 50, 52, ...","{1: [49, 48, 49, 54, 56], 2: [50, 48, 50, 52, ...",,[4],,0.0,"((48, 1, 0, None, France, France), (52, 1, 0, ..."
2,0,s3a://datalake/iceberg/bronze/vendas/data/0000...,PARQUET,0,1,2343,"{1: 47, 2: 52, 3: 43, 4: 48, 5: 52, 6: 50, 7: ...","{1: 1, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1, 7: 1, 8: 1}","{1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0, 7: 0, 8: 0}",{},"{1: [49, 48, 49, 56, 48], 2: [50, 48, 50, 52, ...","{1: [49, 48, 49, 56, 48], 2: [50, 48, 50, 52, ...",,[4],,0.0,"((48, 1, 0, None, Norway, Norway), (52, 1, 0, ..."
3,0,s3a://datalake/iceberg/bronze/vendas/data/0000...,PARQUET,0,1,2364,"{1: 47, 2: 52, 3: 44, 4: 48, 5: 51, 6: 50, 7: ...","{1: 1, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1, 7: 1, 8: 1}","{1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0, 7: 0, 8: 0}",{},"{1: [49, 48, 49, 56, 56], 2: [50, 48, 50, 52, ...","{1: [49, 48, 49, 56, 56], 2: [50, 48, 50, 52, ...",,[4],,0.0,"((51, 1, 0, None, Australia, Australia), (52, ..."
4,0,s3a://datalake/iceberg/bronze/vendas/data/0000...,PARQUET,0,1,2327,"{1: 47, 2: 52, 3: 42, 4: 47, 5: 49, 6: 50, 7: ...","{1: 1, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1, 7: 1, 8: 1}","{1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0, 7: 0, 8: 0}",{},"{1: [49, 48, 50, 48, 49], 2: [50, 48, 50, 52, ...","{1: [49, 48, 50, 48, 49], 2: [50, 48, 50, 52, ...",,[4],,0.0,"((49, 1, 0, None, Finland, Finland), (52, 1, 0..."
5,1,s3a://datalake/iceberg/bronze/vendas/data/0000...,PARQUET,0,1,1451,"{2147483546: 134, 2147483545: 39}",,,,"{2147483546: [115, 51, 97, 58, 47, 47, 100, 97...","{2147483546: [115, 51, 97, 58, 47, 47, 100, 97...",,[4],,,"((None, None, None, None, None, None), (None, ..."


In [33]:
# Listar arquivos de metadados
spark.sql("SELECT * FROM iceberg.bronze.vendas.manifests;").toPandas()

Unnamed: 0,content,path,length,partition_spec_id,added_snapshot_id,added_data_files_count,existing_data_files_count,deleted_data_files_count,added_delete_files_count,existing_delete_files_count,deleted_delete_files_count,partition_summaries
0,0,s3a://datalake/iceberg/bronze/vendas/metadata/...,7142,0,1548163295742755684,1,0,0,0,0,0,[]
1,0,s3a://datalake/iceberg/bronze/vendas/metadata/...,7317,0,780635310986335313,4,0,0,0,0,0,[]
2,1,s3a://datalake/iceberg/bronze/vendas/metadata/...,7116,0,1548163295742755684,0,0,0,1,0,0,[]


In [None]:
# # ## Para deletar por completo do catalog e storage
# spark.sql("DROP TABLE iceberg.bronze.vendas PURGE")

In [None]:
# spark.stop()