In [1]:
from pyspark.sql import SparkSession
import os
import shutil

# Limpa warehouse Iceberg (evita warnings de metadata antigos)
shutil.rmtree("/tmp/iceberg", ignore_errors=True)
os.makedirs("/tmp/iceberg", exist_ok=True)

# SparkSession OTIMIZADA para ICEBERG LOCAL (sem lock-impl para evitar erro de construtor)
spark = SparkSession.builder \
    .appName("Spark-IPS-Iceberg-Local-Fixed-NoLock") \
    .config("spark.jars.packages", 
            "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2") \
    .config("spark.sql.extensions", 
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.iceberg_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.iceberg_catalog.type", "hadoop") \
    .config("spark.sql.catalog.iceberg_catalog.warehouse", "file:///tmp/iceberg") \
    .config("spark.sql.catalog.iceberg_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO") \
    .config("spark.sql.catalog.iceberg_catalog.default-spec", "v2") \
    .config("spark.driver.memory", "2g") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .enableHiveSupport() \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")  # WARN para ver downloads (mude para "ERROR" ap√≥s sucesso)

print("=== Spark Iniciado com ICEBERG LOCAL (Sem Lock/AWS)! ===")
print(f"Vers√£o do Spark: {spark.version}")

# Verifica√ß√£o CR√çTICA: Confirma io-impl (deve ser HadoopFileIO)
try:
    io_impl = spark.conf.get("spark.sql.catalog.iceberg_catalog.io-impl")
    print(f"Config io-impl ATUAL: {io_impl}")
    if "HadoopFileIO" in io_impl:
        print("‚úÖ IO local confirmado (HadoopFileIO - sem S3/AWS)!")
    else:
        print("‚ö†Ô∏è IO-impl suspeito - verifique se √© HadoopFileIO.")
except Exception as e:
    print(f"Erro ao ler io-impl: {e}")

# Verifica√ß√£o de Cat√°logos (deve incluir iceberg_catalog ap√≥s init)
try:
    catalogs = [c.name for c in spark.catalog.listCatalogs()]
    print(f"Cat√°logos dispon√≠veis: {catalogs} (deve incluir 'iceberg_catalog')")
    if "iceberg_catalog" in catalogs:
        print("‚úÖ Cat√°logo Iceberg registrado com sucesso!")
    else:
        print("‚ö†Ô∏è 'iceberg_catalog' n√£o listado ainda - teste CREATE para ativar.")
except Exception as e:
    print(f"Erro ao listar cat√°logos: {e}")

# Testa o cat√°logo Iceberg (DROP, CREATE simples - sem lock-impl)
try:
    # Teste 1: DROP TABLE teste
    spark.sql("DROP TABLE IF EXISTS iceberg_catalog.default.iceberg_ips")
    print("Teste 1: DROP TABLE OK! (Cat√°logo acess√≠vel).")

    # Teste 2: CREATE simples (Iceberg table)
    spark.sql("CREATE TABLE IF NOT EXISTS iceberg_catalog.default.test_table (id INT) USING iceberg")
    print("Teste 2: CREATE TABLE OK! (Tabela Iceberg criada).")

    # Teste 3: DROP teste
    spark.sql("DROP TABLE IF EXISTS iceberg_catalog.default.test_table")
    print("Teste 3: DROP TABLE teste OK! (Iceberg funcional - pronto para ETL).")

    # Teste 4: Confirma listagem de tabelas vazia
    tables_df = spark.sql("SHOW TABLES IN iceberg_catalog.default")
    print(f"Teste 4: Tabelas em iceberg_catalog.default: {tables_df.count()} (deve ser 0 ap√≥s drop).")
    
except Exception as e:
    print(f"‚ùå Erro em um dos testes Iceberg: {e}")
    print("DETALHES: Se mencionar 'LockManager', confirme remo√ß√£o da config. Reinicie kernel se persistir.")
    print("Dica: Rode manualmente: spark.sql('SHOW CATALOGS').show() para debug.")
else:
    print("\nüéâ TODOS OS TESTES ICEBERG PASSARAM! Cat√°logo local pronto para DDL/INSERT/UPDATE.")

25/10/02 19:22:05 WARN Utils: Your hostname, DESKTOP-1P6TETU resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/10/02 19:22:05 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/nice_correia/.cache/pypoetry/virtualenvs/trabalho-spark-RjY8yXlH-py3.12/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/nice_correia/.ivy2/cache
The jars for the packages stored in: /home/nice_correia/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.5_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-2d514e93-c184-40f6-b973-1964669db6f0;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.5.2 in central
:: resolution report :: resolve 226ms :: artifacts dl 7ms
	:: modules in use:
	org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.5.2 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
	---------------------------------------------------------------------
:: retrieving :: o

=== Spark Iniciado com ICEBERG LOCAL (Sem Lock/AWS)! ===
Vers√£o do Spark: 3.5.7
Config io-impl ATUAL: org.apache.iceberg.hadoop.HadoopFileIO
‚úÖ IO local confirmado (HadoopFileIO - sem S3/AWS)!


                                                                                

Cat√°logos dispon√≠veis: ['spark_catalog'] (deve incluir 'iceberg_catalog')
‚ö†Ô∏è 'iceberg_catalog' n√£o listado ainda - teste CREATE para ativar.
Teste 1: DROP TABLE OK! (Cat√°logo acess√≠vel).
Teste 2: CREATE TABLE OK! (Tabela Iceberg criada).
Teste 3: DROP TABLE teste OK! (Iceberg funcional - pronto para ETL).
Teste 4: Tabelas em iceberg_catalog.default: 0 (deve ser 0 ap√≥s drop).

üéâ TODOS OS TESTES ICEBERG PASSARAM! Cat√°logo local pronto para DDL/INSERT/UPDATE.


In [2]:
import os

# Detecta o caminho do CSV dinamicamente para notebook em /notebooks/
# (Sobe 1 n√≠vel de notebooks/ para root do projeto: trabalho_spark/)
notebook_dir = os.getcwd()  # Diret√≥rio atual (provavelmente /home/nice_correia/trabalho_spark/notebooks)
project_root = os.path.dirname(notebook_dir)  # Sobe 1 n√≠vel: notebooks/ -> trabalho_spark/
raw_path = os.path.join(project_root, "data", "ips_brasil.csv")

# Verifica√ß√£o: Mostra caminhos calculados e confirma exist√™ncia
print(f"Diret√≥rio do notebook: {notebook_dir}")
print(f"Root do projeto detectado: {project_root}")
print(f"Caminho absoluto do CSV: {raw_path}")
print(f"Arquivo existe? {os.path.exists(raw_path)}")

# Se n√£o existir (fallback), tenta caminho relativo direto ou absoluto manual
if not os.path.exists(raw_path):
    # Fallback 1: Se notebook foi aberto de outro lugar, tenta relativo ao cwd
    alt_path1 = os.path.join(os.getcwd(), "data", "raw", "ips_brasil.csv")
    print(f"Fallback 1: {alt_path1} (existe? {os.path.exists(alt_path1)})")
    if os.path.exists(alt_path1):
        raw_path = alt_path1
    else:
        # Fallback 2: Caminho absoluto manual (ajuste se o seu home for diferente)
        raw_path = "/home/nice_correia/trabalho_spark/data/raw/ips_brasil.csv"
        print(f"Fallback 2 (manual): {raw_path} (existe? {os.path.exists(raw_path)})")
        if not os.path.exists(raw_path):
            raise FileNotFoundError(f"CSV n√£o encontrado! Verifique se est√° em {project_root}/data/raw/ips_brasil.csv")

# Carrega o DF com o caminho correto
df = spark.read.option("header", "true").option("inferSchema", "true").csv(raw_path)
print(f"\nDados IPS full carregados de '{raw_path}': {df.count()} linhas, {len(df.columns)} colunas.")

# Verifica√ß√£o r√°pida: Schema e top 5 linhas chave (com nomes especiais escapados)
print("\nSchema das colunas chave:")
df.select("`C√≥digo IBGE`", "`Munic√≠pio`", "`UF`", "`√çndice de Progresso Social`").printSchema()
df.select("`C√≥digo IBGE`", "`Munic√≠pio`", "`UF`", "`√çndice de Progresso Social`").show(5, truncate=False)

# Opcional: Lista todas as colunas para confirmar (79 no total)
print(f"\nTotal de colunas: {len(df.columns)}")
print("Primeiras 10 colunas:", df.columns[:10])

Diret√≥rio do notebook: /home/nice_correia/trabalho_spark/notebooks
Root do projeto detectado: /home/nice_correia/trabalho_spark
Caminho absoluto do CSV: /home/nice_correia/trabalho_spark/data/ips_brasil.csv
Arquivo existe? True


                                                                                


Dados IPS full carregados de '/home/nice_correia/trabalho_spark/data/ips_brasil.csv': 5570 linhas, 79 colunas.

Schema das colunas chave:
root
 |-- C√≥digo IBGE: integer (nullable = true)
 |-- Munic√≠pio: string (nullable = true)
 |-- UF: string (nullable = true)
 |-- √çndice de Progresso Social: double (nullable = true)

+-----------+--------------------------+---+--------------------------+
|C√≥digo IBGE|Munic√≠pio                 |UF |√çndice de Progresso Social|
+-----------+--------------------------+---+--------------------------+
|1100015    |Alta Floresta D'Oeste (RO)|RO |50.94710852687823         |
|1100023    |Ariquemes (RO)            |RO |55.97475391330499         |
|1100031    |Cabixi (RO)               |RO |51.36453973053614         |
|1100049    |Cacoal (RO)               |RO |61.84526595721548         |
|1100056    |Cerejeiras (RO)           |RO |58.70878800673873         |
+-----------+--------------------------+---+--------------------------+
only showing top 5 rows


In [3]:
# === Iceberg: DDL (Cria√ß√£o da Tabela) ===
print("\n=== Iceberg: DDL ===")
table_name = "iceberg_catalog.default.iceberg_ips"

# Limpa se existir
spark.sql(f"DROP TABLE IF EXISTS {table_name}")

# Cria tabela Iceberg a partir do DF
df.write.format("iceberg").mode("overwrite").saveAsTable(table_name)

print("Tabela Iceberg criada com sucesso! (79 colunas com nomes especiais suportados).")
print(f"Total de linhas na tabela: {spark.sql(f'SELECT COUNT(*) FROM {table_name}').collect()[0][0]}")

# Verifica√ß√£o: Top 5 por IPS
spark.sql(f"SELECT `C√≥digo IBGE`, `Munic√≠pio`, `UF`, `√çndice de Progresso Social` FROM {table_name} ORDER BY `√çndice de Progresso Social` DESC LIMIT 5").show(truncate=False)


=== Iceberg: DDL ===


25/10/02 20:00:15 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
25/10/02 20:00:18 WARN HadoopTableOperations: Error reading version hint file file:/tmp/iceberg/default/iceberg_ips/metadata/version-hint.text
java.io.FileNotFoundException: File file:/tmp/iceberg/default/iceberg_ips/metadata/version-hint.text does not exist
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:779)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1100)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:769)
	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:462)
	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:160)
	at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:372)
	at 

Tabela Iceberg criada com sucesso! (79 colunas com nomes especiais suportados).
Total de linhas na tabela: 5570


[Stage 13:>                                                         (0 + 1) / 1]

+-----------+-------------------+---+--------------------------+
|C√≥digo IBGE|Munic√≠pio          |UF |√çndice de Progresso Social|
+-----------+-------------------+---+--------------------------+
|3516853    |Gavi√£o Peixoto (SP)|SP |74.49282395600983         |
|5300108    |Bras√≠lia (DF)      |DF |71.25189747327438         |
|3548906    |S√£o Carlos (SP)    |SP |70.96059545595044         |
|5208707    |Goi√¢nia (GO)       |GO |70.49282747376537         |
|3533601    |Nuporanga (SP)     |SP |70.4719550872065          |
+-----------+-------------------+---+--------------------------+



                                                                                

In [4]:
# === Iceberg: INSERT (Adi√ß√£o de Dados) - VERS√ÉO FINAL CORRIGIDA ===
print("\n=== Iceberg: INSERT ===")
table_name = "iceberg_catalog.default.iceberg_ips"

# Verifica√ß√£o inicial: Confirma colunas chave (baseado no seu df.columns)
print(f"Total de colunas no DF original: {len(df.columns)}")
key_columns = ["C√≥digo IBGE", "Munic√≠pio", "UF", "√çndice de Progresso Social", "Popula√ß√£o 2022"]
for col in key_columns:
    exists = col in df.columns
    print(f"Coluna '{col}' existe? {exists}")
    if not exists:
        print(f"AVISO: Coluna '{col}' n√£o encontrada! Colunas similares: {[c for c in df.columns if col.lower() in c.lower()]}")

# Cria DataFrame fict√≠cio com schema EXATO do original
from pyspark.sql import Row
data_dict = {col: None for col in df.columns}  # 79 chaves, todas NULL

# Define s√≥ colunas que existem (evita erros)
if "C√≥digo IBGE" in df.columns:
    data_dict["C√≥digo IBGE"] = 9999999  # Fict√≠cio (int)
if "Munic√≠pio" in df.columns:
    data_dict["Munic√≠pio"] = "Exemplo Fict√≠cio"  # String
if "UF" in df.columns:
    data_dict["UF"] = "XX"  # String
if "√çndice de Progresso Social" in df.columns:
    data_dict["√çndice de Progresso Social"] = 99.9  # Double alto para topo
if "Popula√ß√£o 2022" in df.columns:
    data_dict["Popula√ß√£o 2022"] = 100000  # Int/long fict√≠cio (usa coluna real do seu dataset)

# Confirma: Deve ter exatamente 79 chaves
print(f"Chaves no data_dict: {len(data_dict)} (deve ser {len(df.columns)})")

# Cria Row e DataFrame com SCHEMA EXPL√çCITO (resolve infer√™ncia com NULLs)
fictitious_row = Row(**data_dict)
fictitious_df = spark.createDataFrame([fictitious_row], df.schema)  # <- CHAVE: Usa df.schema para tipos exatos
print(f"DataFrame fict√≠cio criado: {fictitious_df.count()} linha(s), {len(fictitious_df.columns)} colunas.")
print("Schema fict√≠cio (primeiras 5 colunas):")
fictitious_df.printSchema()  # Mostra tipos inferidos/for√ßados (ex: double para IPS)

# INSERT via append (adiciona ao cat√°logo Iceberg - snapshot novo criado)
fictitious_df.write.format("iceberg").mode("append").saveAsTable(table_name)  # Use saveAsTable para compatibilidade, ou insertInto

print("INSERT: Linha fict√≠cia adicionada com sucesso! (Nova snapshot criada no Iceberg).")

# Verifica√ß√£o: Top 5 linhas ordenadas por IPS (deve incluir a nova no topo)
spark.sql(f"SELECT `C√≥digo IBGE`, `Munic√≠pio`, `UF`, `√çndice de Progresso Social` FROM {table_name} ORDER BY `√çndice de Progresso Social` DESC LIMIT 5").show(truncate=False)

# Conta total (deve ser original +1 = ~5566)
total_rows = spark.sql(f'SELECT COUNT(*) FROM {table_name}').collect()[0][0]
print(f"Total de linhas ap√≥s INSERT: {total_rows}")

# Verifica√ß√£o extra: Mostra a linha fict√≠cia inserida (com Popula√ß√£o 2022)
spark.sql(f"SELECT `C√≥digo IBGE`, `Munic√≠pio`, `UF`, `√çndice de Progresso Social`, `Popula√ß√£o 2022` FROM {table_name} WHERE `C√≥digo IBGE` = 9999999").show(truncate=False)


=== Iceberg: INSERT ===
Total de colunas no DF original: 79
Coluna 'C√≥digo IBGE' existe? True
Coluna 'Munic√≠pio' existe? True
Coluna 'UF' existe? True
Coluna '√çndice de Progresso Social' existe? True
Coluna 'Popula√ß√£o 2022' existe? True
Chaves no data_dict: 79 (deve ser 79)


                                                                                

DataFrame fict√≠cio criado: 1 linha(s), 79 colunas.
Schema fict√≠cio (primeiras 5 colunas):
root
 |-- C√≥digo IBGE: integer (nullable = true)
 |-- Munic√≠pio: string (nullable = true)
 |-- UF: string (nullable = true)
 |-- √Årea (km¬≤): double (nullable = true)
 |-- Popula√ß√£o 2022: integer (nullable = true)
 |-- PIB per capita 2021: double (nullable = true)
 |-- √çndice de Progresso Social: double (nullable = true)
 |-- Necessidades Humanas B√°sicas: double (nullable = true)
 |-- Fundamentos do Bem-estar: double (nullable = true)
 |-- Oportunidades: double (nullable = true)
 |-- Nutri√ß√£o e Cuidados M√©dicos B√°sicos: double (nullable = true)
 |-- √Ågua e Saneamento: double (nullable = true)
 |-- Moradia: double (nullable = true)
 |-- Seguran√ßa Pessoal: double (nullable = true)
 |-- Acesso ao Conhecimento B√°sico: double (nullable = true)
 |-- Acesso √† Informa√ß√£o e Comunica√ß√£o: double (nullable = true)
 |-- Sa√∫de e Bem-estar: double (nullable = true)
 |-- Qualidade do Meio Am

                                                                                

INSERT: Linha fict√≠cia adicionada com sucesso! (Nova snapshot criada no Iceberg).
+-----------+-------------------+---+--------------------------+
|C√≥digo IBGE|Munic√≠pio          |UF |√çndice de Progresso Social|
+-----------+-------------------+---+--------------------------+
|9999999    |Exemplo Fict√≠cio   |XX |99.9                      |
|3516853    |Gavi√£o Peixoto (SP)|SP |74.49282395600983         |
|5300108    |Bras√≠lia (DF)      |DF |71.25189747327438         |
|3548906    |S√£o Carlos (SP)    |SP |70.96059545595044         |
|5208707    |Goi√¢nia (GO)       |GO |70.49282747376537         |
+-----------+-------------------+---+--------------------------+

Total de linhas ap√≥s INSERT: 5571
+-----------+----------------+---+--------------------------+--------------+
|C√≥digo IBGE|Munic√≠pio       |UF |√çndice de Progresso Social|Popula√ß√£o 2022|
+-----------+----------------+---+--------------------------+--------------+
|9999999    |Exemplo Fict√≠cio|XX |99.9             

In [5]:
# Verifica√ß√£o de snapshot ap√≥s INSERT
snapshots_df = spark.sql(f"SELECT snapshot_id, operation, committed_at FROM {table_name}.snapshots ORDER BY committed_at DESC LIMIT 2")
snapshots_df.show(truncate=False)

+-------------------+---------+-----------------------+
|snapshot_id        |operation|committed_at           |
+-------------------+---------+-----------------------+
|1260496121539592792|append   |2025-10-02 20:00:39.085|
|3870061069435861093|overwrite|2025-10-02 20:00:18.681|
+-------------------+---------+-----------------------+



In [6]:
# === Iceberg: UPDATE (Atualiza√ß√£o de Dados) - VERS√ÉO CORRIGIDA ===
print("\n=== Iceberg: UPDATE ===")
table_name = "iceberg_catalog.default.iceberg_ips"

# Verifica√ß√£o: Confirma se C√≥digo IBGE 3550308 (S√£o Paulo) existe no dataset
print("Verificando C√≥digo IBGE de S√£o Paulo...")
saopaulo_check = spark.sql(f"SELECT `C√≥digo IBGE`, `Munic√≠pio`, `UF`, `√çndice de Progresso Social` FROM {table_name} WHERE `Munic√≠pio` LIKE '%S√£o Paulo%' LIMIT 1")
saopaulo_check.show(truncate=False)
if saopaulo_check.count() == 0:
    print("AVISO: S√£o Paulo n√£o encontrado! Use outro munic√≠pio. Exemplo: Pegue o primeiro da tabela.")
    # Fallback: Use o primeiro munic√≠pio da tabela como exemplo
    first_city = spark.sql(f"SELECT `C√≥digo IBGE`, `Munic√≠pio`, `UF` FROM {table_name} LIMIT 1").collect()[0]
    codigo_ibge = first_city[0]
    municipio = first_city[1]
    uf = first_city[2]
    print(f"Fallback: Usando {municipio} (C√≥digo {codigo_ibge}) para UPDATE.")
else:
    row = saopaulo_check.collect()[0]
    codigo_ibge = row[0]
    municipio = row[1]
    uf = row[2]
    print(f"S√£o Paulo encontrado: C√≥digo {codigo_ibge}, {municipio}, {uf}")

# DataFrame para UPDATE: Usa o c√≥digo real encontrado e atualiza IPS para 70.5 (fict√≠cio)
update_data = [(codigo_ibge, municipio, uf, 70.5)]  # Novo IPS mais alto

# Schema program√°tico com StructType (resolve acentos - sem string DDL)
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
update_schema = StructType([
    StructField("C√≥digo IBGE", IntegerType(), True),  # int, nullable
    StructField("Munic√≠pio", StringType(), True),     # string
    StructField("UF", StringType(), True),            # string
    StructField("√çndice de Progresso Social", DoubleType(), True)  # double (com acento OK!)
])
update_df = spark.createDataFrame(update_data, update_schema)
print("DataFrame de UPDATE criado com sucesso (schema program√°tico):")
update_df.show(truncate=False)  # Mostra a linha de update
print(f"Schema: {len(update_df.columns)} colunas")

# Cria temp view para o MERGE
update_df.createOrReplaceTempView("update_temp")

# UPDATE via MERGE INTO (ACID do Iceberg - atualiza baseado em C√≥digo IBGE)
spark.sql(f"""
    MERGE INTO {table_name} AS target
    USING update_temp AS source
    ON target.`C√≥digo IBGE` = source.`C√≥digo IBGE`
    WHEN MATCHED THEN 
        UPDATE SET 
            target.`√çndice de Progresso Social` = source.`√çndice de Progresso Social`,
            target.`Munic√≠pio` = source.`Munic√≠pio`,
            target.`UF` = source.`UF`
""")

print(f"UPDATE: IPS de {municipio} atualizado para 70.5! (MERGE ACID executado - nova snapshot criada).")

# Verifica√ß√£o: Mostra a linha atualizada (deve ter IPS = 70.5)
spark.sql(f"SELECT `C√≥digo IBGE`, `Munic√≠pio`, `UF`, `√çndice de Progresso Social` FROM {table_name} WHERE `C√≥digo IBGE` = {codigo_ibge}").show(truncate=False)

# Top 5 atualizado (o munic√≠pio deve subir no ranking)
spark.sql(f"SELECT `C√≥digo IBGE`, `Munic√≠pio`, `UF`, `√çndice de Progresso Social` FROM {table_name} ORDER BY `√çndice de Progresso Social` DESC LIMIT 5").show(truncate=False)

# Conta total (deve ser o mesmo, s√≥ atualizado)
total_rows = spark.sql(f'SELECT COUNT(*) FROM {table_name}').collect()[0][0]
print(f"Total de linhas ap√≥s UPDATE: {total_rows} (mesmo total, s√≥ atualizado)")


=== Iceberg: UPDATE ===
Verificando C√≥digo IBGE de S√£o Paulo...
+-----------+--------------------------+---+--------------------------+
|C√≥digo IBGE|Munic√≠pio                 |UF |√çndice de Progresso Social|
+-----------+--------------------------+---+--------------------------+
|1303908    |S√£o Paulo de Oliven√ßa (AM)|AM |48.6949936570276          |
+-----------+--------------------------+---+--------------------------+

S√£o Paulo encontrado: C√≥digo 1303908, S√£o Paulo de Oliven√ßa (AM), AM
DataFrame de UPDATE criado com sucesso (schema program√°tico):
+-----------+--------------------------+---+--------------------------+
|C√≥digo IBGE|Munic√≠pio                 |UF |√çndice de Progresso Social|
+-----------+--------------------------+---+--------------------------+
|1303908    |S√£o Paulo de Oliven√ßa (AM)|AM |70.5                      |
+-----------+--------------------------+---+--------------------------+

Schema: 4 colunas


                                                                                

UPDATE: IPS de S√£o Paulo de Oliven√ßa (AM) atualizado para 70.5! (MERGE ACID executado - nova snapshot criada).
+-----------+--------------------------+---+--------------------------+
|C√≥digo IBGE|Munic√≠pio                 |UF |√çndice de Progresso Social|
+-----------+--------------------------+---+--------------------------+
|1303908    |S√£o Paulo de Oliven√ßa (AM)|AM |70.5                      |
+-----------+--------------------------+---+--------------------------+

+-----------+--------------------------+---+--------------------------+
|C√≥digo IBGE|Munic√≠pio                 |UF |√çndice de Progresso Social|
+-----------+--------------------------+---+--------------------------+
|9999999    |Exemplo Fict√≠cio          |XX |99.9                      |
|3516853    |Gavi√£o Peixoto (SP)       |SP |74.49282395600983         |
|5300108    |Bras√≠lia (DF)             |DF |71.25189747327438         |
|3548906    |S√£o Carlos (SP)           |SP |70.96059545595044         |
|1303908  

In [7]:
# === Iceberg: Time Travel (Snapshots e Hist√≥rico) - VERS√ÉO CORRIGIDA ===
print("\n=== Iceberg: Time Travel ===")
table_name = "iceberg_catalog.default.iceberg_ips"

# Lista TODOS os snapshots (hist√≥rico de mudan√ßas - deve ter 3+: DDL, INSERT, UPDATE)
snapshots_df = spark.sql(f"SELECT * FROM {table_name}.snapshots ORDER BY committed_at DESC")
#print(f"Total de snapshots encontrados: {snapshots_df.count()} (esperado: 3+ ap√≥s DDL/INSERT/UPDATE)")
#print("Snapshots recentes (hist√≥rico completo):")
snapshots_df.show(truncate=False, n=10)  # Mostra todas as colunas e linhas (√∫til para debug)

# Select customizado com NOMES CORRETOS de colunas (parent_id, n√£o parent_snapshot_id)
#print("\nSnapshots selecionados (colunas chave):")
snapshots_df.select("snapshot_id", "parent_id", "operation", "manifest_list", "summary").show(truncate=False, n=5)

# Time Travel: Consulta vers√£o ANTES do INSERT/UPDATE (snapshot mais antigo, se existir m√∫ltiplos)
if snapshots_df.count() > 1:
    # Pega o snapshot mais antigo (√∫ltima linha ap√≥s ORDER BY DESC)
    snapshots_list = snapshots_df.collect()  # Coleta todos (poucos snapshots, OK)
    old_snapshot_id = snapshots_list[-1][1]  # snapshot_id √© a 2¬™ coluna (√≠ndice 1: committed_at=0, snapshot_id=1)
    old_operation = snapshots_list[-1][3] if len(snapshots_list[-1]) > 3 else "unknown"  # operation
    print(f"\nVers√£o ANTES das mudan√ßas (snapshot mais antigo ID {old_snapshot_id}, opera√ß√£o: {old_operation}):")
    spark.sql(f"SELECT `C√≥digo IBGE`, `Munic√≠pio`, `UF`, `√çndice de Progresso Social` FROM {table_name} VERSION AS OF {old_snapshot_id} ORDER BY `√çndice de Progresso Social` DESC LIMIT 3").show(truncate=False)
    old_count = spark.sql(f"SELECT COUNT(*) FROM {table_name} VERSION AS OF {old_snapshot_id}").collect()[0][0]
    print(f"Linhas na vers√£o antiga: {old_count} (sem INSERT/UPDATE - s√≥ dados originais do DDL)")
else:
    print("\nApenas 1 snapshot (s√≥ DDL) - pulando Time Travel antigo. Rode mais DMLs para hist√≥rico.")

# Time Travel: Vers√£o ATUAL (ap√≥s todos os DML - deve incluir INSERT fict√≠cio e UPDATE em S√£o Paulo)
print(f"\nVers√£o ATUAL (snapshot mais recente, ap√≥s DDL/INSERT/UPDATE):")
current_snapshot_id = snapshots_df.collect()[0][1]  # Primeiro da lista (mais recente)
spark.sql(f"SELECT `C√≥digo IBGE`, `Munic√≠pio`, `UF`, `√çndice de Progresso Social` FROM {table_name} VERSION AS OF {current_snapshot_id} ORDER BY `√çndice de Progresso Social` DESC LIMIT 3").show(truncate=False)
current_count = spark.sql(f'SELECT COUNT(*) FROM {table_name}').collect()[0][0]
print(f"Linhas na vers√£o atual: {current_count} (com INSERT + UPDATE aplicado)")

# Diferen√ßa: Compara contagens (deve mostrar impacto do INSERT)
if snapshots_df.count() > 1:
    diff = current_count - old_count
    print(f"Diferen√ßa de linhas (atual vs. antigo): +{diff} (devido ao INSERT; UPDATE n√£o altera contagem)")

# Opcional: Rollback para snapshot anterior (ex: desfaz UPDATE/INSERT - COMENTADO por seguran√ßa)
# if old_snapshot_id:
#     spark.sql(f"CALL iceberg_system.rollback_to_snapshot('{table_name}', {old_snapshot_id})")
#     print(f"Rollback executado para snapshot {old_snapshot_id}! (Desfaz mudan√ßas - verifique com SELECT COUNT(*))")

print("\nTime Travel completo! Iceberg mant√©m hist√≥rico imut√°vel para auditoria e rollback.")


=== Iceberg: Time Travel ===
+-----------------------+-------------------+-------------------+---------+-------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|committed_at           |snapshot_id        |parent_id          |operation|manifest_list                                                                                                      |summary                                                                                                                                                                                                      

In [8]:
# === Iceberg: DELETE (Remo√ß√£o de Dados) ===
print("\n=== Iceberg: DELETE ===")
table_name = "iceberg_catalog.default.iceberg_ips"

# DELETE via SQL (remove a linha fict√≠cia INSERTed e, opcionalmente, outra condi√ß√£o)
spark.sql(f"DELETE FROM {table_name} WHERE `C√≥digo IBGE` = 9999999")  # Remove a fict√≠cia
# Opcional: spark.sql(f"DELETE FROM {table_name} WHERE `UF` = 'XX'")  # Remove todas de UF fict√≠cia

print("DELETE: Linha fict√≠cia removida! (DELETE ACID executado - nova snapshot).")

# Verifica√ß√£o: Busca a linha deletada (deve retornar vazio)
spark.sql(f"SELECT `C√≥digo IBGE`, `Munic√≠pio`, `UF`, `√çndice de Progresso Social` FROM {table_name} WHERE `C√≥digo IBGE` = 9999999").show(truncate=False)

# Top 5 atualizado (fict√≠cia sumiu, S√£o Paulo ainda no topo)
spark.sql(f"SELECT `C√≥digo IBGE`, `Munic√≠pio`, `UF`, `√çndice de Progresso Social` FROM {table_name} ORDER BY `√çndice de Progresso Social` DESC LIMIT 5").show(truncate=False)

total_rows = spark.sql(f'SELECT COUNT(*) FROM {table_name}').collect()[0][0]
print(f"Total de linhas ap√≥s DELETE: {total_rows} (original -1 = ~5565)")


=== Iceberg: DELETE ===
DELETE: Linha fict√≠cia removida! (DELETE ACID executado - nova snapshot).
+-----------+---------+---+--------------------------+
|C√≥digo IBGE|Munic√≠pio|UF |√çndice de Progresso Social|
+-----------+---------+---+--------------------------+
+-----------+---------+---+--------------------------+

+-----------+--------------------------+---+--------------------------+
|C√≥digo IBGE|Munic√≠pio                 |UF |√çndice de Progresso Social|
+-----------+--------------------------+---+--------------------------+
|3516853    |Gavi√£o Peixoto (SP)       |SP |74.49282395600983         |
|5300108    |Bras√≠lia (DF)             |DF |71.25189747327438         |
|3548906    |S√£o Carlos (SP)           |SP |70.96059545595044         |
|1303908    |S√£o Paulo de Oliven√ßa (AM)|AM |70.5                      |
|5208707    |Goi√¢nia (GO)              |GO |70.49282747376537         |
+-----------+--------------------------+---+--------------------------+

Total de linhas ap

In [12]:
# === Iceberg: Time Travel (Completo: Antes/Depois de INSERT, UPDATE e DELETE) ===
print("\n=== Iceberg: Time Travel ===")
table_name = "iceberg_catalog.default.iceberg_ips"

# Lista TODOS os snapshots (hist√≥rico de mudan√ßas - ORDER BY DESC para mais recente no topo)
snapshots_df = spark.sql(f"SELECT snapshot_id, parent_id, committed_at, operation, summary, manifest_list FROM {table_name}.snapshots ORDER BY committed_at DESC")
print(f"Total de snapshots encontrados: {snapshots_df.count()} (esperado: 4+ ap√≥s DDL/INSERT/UPDATE/DELETE)")
print("Snapshots recentes (hist√≥rico completo - operation e summary chave):")
snapshots_df.show(truncate=False, n=10)  # Mostra colunas chave (√∫til para debug)

if snapshots_df.count() < 2:
    print("‚ö†Ô∏è Poucos snapshots - rode mais DMLs (INSERT/UPDATE/DELETE) para demo completa.")
else:
    # Coleta snapshots para indexa√ß√£o (poucos, OK para .collect())
    snapshots_list = snapshots_df.collect()
    
    # IDs chave (baseado em ordem DESC: [0]=mais recente (ap√≥s DELETE), [-1]=mais antigo (inicial))
    current_snapshot_id = snapshots_list[0][0]  # snapshot_id da posi√ß√£o 0 (coluna 0)
    v1_insert_id = snapshots_list[1][0] if len(snapshots_list) > 1 else None  # Ap√≥s INSERT
    v2_update_id = snapshots_list[2][0] if len(snapshots_list) > 2 else None  # Ap√≥s UPDATE (antes DELETE)
    old_snapshot_id = snapshots_list[-1][0]  # Mais antigo (inicial, DDL)
    
    # Fun√ß√£o auxiliar para query em snapshot espec√≠fico (foco em fict√≠cia + S√£o Paulo)
    def query_snapshot(snapshot_id, limit=2):
        return spark.sql(f"""
            SELECT `C√≥digo IBGE`, `Munic√≠pio`, `UF`, `√çndice de Progresso Social` 
            FROM {table_name} VERSION AS OF {snapshot_id} 
            WHERE `C√≥digo IBGE` IN (9999999, 3550308)  -- Fict√≠cia + S√£o Paulo (ajuste 3550308 se outro)
            ORDER BY `√çndice de Progresso Social` DESC
        """)

    # 1. Vers√£o Inicial (mais antigo: Ap√≥s DDL, antes INSERT/UPDATE/DELETE)
    print(f"\n1. Vers√£o INICIAL (snapshot {old_snapshot_id} - ap√≥s DDL, dados originais):")
    try:
        initial_df = query_snapshot(old_snapshot_id)
        initial_df.show(truncate=False)
        initial_count = spark.sql(f"SELECT COUNT(*) FROM {table_name} VERSION AS OF {old_snapshot_id}").collect()[0][0]
        print(f"   Linhas totais: {initial_count} (CSV original)")
        print("   Observa√ß√£o: Sem fict√≠cia (9999999); IPS original de S√£o Paulo (~66)")
    except Exception as e:
        print(f"   Erro na query inicial: {e} (snapshot pode n√£o existir)")

    # 2. Ap√≥s INSERT (com fict√≠cia adicionada, antes UPDATE/DELETE)
    if v1_insert_id:
        print(f"\n2. Ap√≥s INSERT (snapshot {v1_insert_id} - fict√≠cia adicionada):")
        try:
            insert_df = query_snapshot(v1_insert_id)
            insert_df.show(truncate=False)
            insert_count = spark.sql(f"SELECT COUNT(*) FROM {table_name} VERSION AS OF {v1_insert_id}").collect()[0][0]
            print(f"   Linhas totais: {insert_count} (+1 da fict√≠cia 9999999 com IPS 99.9 no topo)")
        except Exception as e:
            print(f"   Erro na query INSERT: {e}")
    else:
        print("\n2. Pulando ap√≥s INSERT (snapshot n√£o encontrado)")

    # 3. Ap√≥s UPDATE, Antes DELETE (fict√≠cia ainda presente, S√£o Paulo atualizado)
    if v2_update_id:
        print(f"\n3. Ap√≥s UPDATE, ANTES DELETE (snapshot {v2_update_id} - UPDATE aplicado):")
        try:
            update_df = query_snapshot(v2_update_id)
            update_df.show(truncate=False)
            update_count = spark.sql(f"SELECT COUNT(*) FROM {table_name} VERSION AS OF {v2_update_id}").collect()[0][0]
            print(f"   Linhas totais: {update_count} (mesmo que ap√≥s INSERT; IPS de S√£o Paulo = 70.5)")
            print("   Observa√ß√£o: Fict√≠cia (9999999) ainda existe; S√£o Paulo atualizado")
        except Exception as e:
            print(f"   Erro na query UPDATE: {e}")
    else:
        print("\n3. Pulando ap√≥s UPDATE (snapshot n√£o encontrado)")

    # 4. Ap√≥s DELETE (atual: fict√≠cia removida, UPDATE preservado)
    print(f"\n4. Ap√≥s DELETE (snapshot {current_snapshot_id} - atual, fict√≠cia removida):")
    try:
        delete_df = query_snapshot(current_snapshot_id)
        delete_df.show(truncate=False)
        current_count = spark.sql(f'SELECT COUNT(*) FROM {table_name}').collect()[0][0]
        print(f"   Linhas totais: {current_count} (-1 da fict√≠cia; volta ao inicial)")
        print("   Observa√ß√£o: Fict√≠cia (9999999) sumiu; S√£o Paulo mant√©m IPS 70.5 (UPDATE intacto)")
    except Exception as e:
        print(f"   Erro na query DELETE: {e}")

    # Resumo de Mudan√ßas (se snapshots suficientes)
    if len(snapshots_list) >= 4:
        print(f"\nResumo de Mudan√ßas:")
        print(f"   Inicial (snapshot {old_snapshot_id}): {initial_count} linhas")
        print(f"   Ap√≥s INSERT (snapshot {v1_insert_id}): {insert_count} linhas (+1)")
        print(f"   Ap√≥s UPDATE (snapshot {v2_update_id}): {update_count} linhas (sem mudan√ßa)")
        print(f"   Ap√≥s DELETE (snapshot {current_snapshot_id}): {current_count} linhas (-1)")

# Opcional: Rollback para snapshot antes do DELETE (ex: v2 - desfaz DELETE, fict√≠cia volta - CUIDADO!)
# if v2_update_id:
#     spark.sql(f"CALL iceberg_system.rollback_to_snapshot('{table_name}', {v2_update_id})")
#     print(f"Rollback executado para snapshot {v2_update_id}! Fict√≠cia voltou - COUNT(*): {spark.sql(f'SELECT COUNT(*) FROM {table_name}').collect()[0][0]}")
#     # Re-execute DELETE para fixar se quiser

print("\nTime Travel Iceberg completo! Hist√≥rico imut√°vel - INSERT/UPDATE/DELETE audit√°veis em snapshots espec√≠ficos.")


=== Iceberg: Time Travel ===
Total de snapshots encontrados: 4 (esperado: 4+ ap√≥s DDL/INSERT/UPDATE/DELETE)
Snapshots recentes (hist√≥rico completo - operation e summary chave):
+-------------------+-------------------+-----------------------+---------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------+
|snapshot_id        |parent_id          |committed_at           |operation|summary                                                                                                                                                                    