In [1]:
import os
from datetime import datetime, timedelta
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [2]:
current_dir = os.getcwd()
print(current_dir)

/home/apolo/Dropbox/programacao/Udemy/2024/engenharia_de_dados_com_apache_iceberg_e_spark/00-scripts_apolo


In [3]:
dir_warehouse = f"{current_dir}/warehouse"
dir_jars = f"{current_dir}/spark-3.3.0-bin-hadoop3/jars"

In [4]:
spark = SparkSession.builder \
    .appName("IcebergWithSpark") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.hadoop_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.hadoop_catalog.type", "hadoop") \
    .config("spark.sql.catalog.hadoop_catalog.warehouse", dir_warehouse) \
    .config("spark.sql.default.catalog", "hadoop_catalog") \
    .getOrCreate()

25/01/06 21:21:19 WARN Utils: Your hostname, dell resolves to a loopback address: 127.0.1.1; using 192.168.15.6 instead (on interface wlp0s20f3)
25/01/06 21:21:19 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


25/01/06 21:21:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
# Criamos a tabela vendas usando Iceberg
spark.sql("""
CREATE TABLE IF NOT EXISTS hadoop_catalog.default.vendas (
    id INT,
    produto STRING,
    quantidade INT,
    preco DOUBLE,
    data_venda DATE
)
USING iceberg
""")

DataFrame[]

In [6]:
# Dados Iniciais
data_initial = [
    (1, "Produto A", 10, 15.5, "2023-11-01"),
    (2, "Produto B", 5, 22.0, "2023-11-02"),
    (3, "Produto C", 8, 30.0, "2023-11-03")
]
columns = ["id", "produto", "quantidade", "preco", "data_venda"]

df_initial = spark.createDataFrame(data_initial, columns)
df_initial = df_initial.withColumn("data_venda", F.to_date(F.col("data_venda"), "yyyy-MM-dd"))

In [7]:
# Gravamos os dados
df_initial.writeTo("hadoop_catalog.default.vendas").append()

                                                                                

In [8]:
# Visualização dos dados
spark.sql("SELECT * FROM hadoop_catalog.default.vendas").show()

+---+---------+----------+-----+----------+
| id|  produto|quantidade|preco|data_venda|
+---+---------+----------+-----+----------+
|  1|Produto A|        10| 15.5|2023-11-01|
|  2|Produto B|         5| 22.0|2023-11-02|
|  3|Produto C|         8| 30.0|2023-11-03|
+---+---------+----------+-----+----------+



In [9]:
# Lista de snapshots atuais da tabela vendas
snapshots_df = spark.sql("SELECT * FROM hadoop_catalog.default.vendas.snapshots")

snapshots_df.select("snapshot_id", "committed_at", "operation").show(truncate=False)

+-------------------+-----------------------+---------+
|snapshot_id        |committed_at           |operation|
+-------------------+-----------------------+---------+
|5502038290185584988|2025-01-06 21:37:09.735|append   |
+-------------------+-----------------------+---------+



In [10]:
# Incluimos mais dados
data_additional = [
    (4, "Produto D", 12, 25.0, "2023-11-04"),
    (5, "Produto E", 7, 18.5, "2023-11-05")
]
df_additional = spark.createDataFrame(data_additional, columns)
df_additional = df_additional.withColumn("data_venda", F.to_date(F.col("data_venda"), "yyyy-MM-dd"))
df_additional.writeTo("hadoop_catalog.default.vendas").append()

In [11]:
# Visualização dos dados
spark.sql("SELECT * FROM hadoop_catalog.default.vendas order by id").show()

+---+---------+----------+-----+----------+
| id|  produto|quantidade|preco|data_venda|
+---+---------+----------+-----+----------+
|  1|Produto A|        10| 15.5|2023-11-01|
|  2|Produto B|         5| 22.0|2023-11-02|
|  3|Produto C|         8| 30.0|2023-11-03|
|  4|Produto D|        12| 25.0|2023-11-04|
|  5|Produto E|         7| 18.5|2023-11-05|
+---+---------+----------+-----+----------+



In [12]:
# List os snapshots novamente
snapshots_df = spark.sql("SELECT * FROM hadoop_catalog.default.vendas.snapshots")
snapshots_df.select("snapshot_id", "committed_at", "operation").show(truncate=False)

+-------------------+-----------------------+---------+
|snapshot_id        |committed_at           |operation|
+-------------------+-----------------------+---------+
|5502038290185584988|2025-01-06 21:37:09.735|append   |
|3087817526478000336|2025-01-06 21:40:23.168|append   |
+-------------------+-----------------------+---------+



In [13]:
# Atualização de dados
spark.sql("""
UPDATE hadoop_catalog.default.vendas
SET preco = 16.0
WHERE id = 1
""")

DataFrame[]

In [14]:
# Visualização dos dados
spark.sql("SELECT * FROM hadoop_catalog.default.vendas  order by id").show()

+---+---------+----------+-----+----------+
| id|  produto|quantidade|preco|data_venda|
+---+---------+----------+-----+----------+
|  1|Produto A|        10| 16.0|2023-11-01|
|  2|Produto B|         5| 22.0|2023-11-02|
|  3|Produto C|         8| 30.0|2023-11-03|
|  4|Produto D|        12| 25.0|2023-11-04|
|  5|Produto E|         7| 18.5|2023-11-05|
+---+---------+----------+-----+----------+



In [33]:
# List os snapshots novamente
snapshots_df = spark.sql("SELECT * FROM hadoop_catalog.default.vendas.snapshots")
snapshots_df.select("snapshot_id", "committed_at", "operation").orderBy("committed_at").show(truncate=False)

+-------------------+-----------------------+---------+
|snapshot_id        |committed_at           |operation|
+-------------------+-----------------------+---------+
|5502038290185584988|2025-01-06 21:37:09.735|append   |
|3087817526478000336|2025-01-06 21:40:23.168|append   |
|686919947402139456 |2025-01-06 21:42:25.739|overwrite|
|2392243818373974793|2025-01-06 21:43:03.196|delete   |
+-------------------+-----------------------+---------+



In [16]:
# Excluímos dados
spark.sql("""
DELETE FROM hadoop_catalog.default.vendas
WHERE id = 2
""")

DataFrame[]

In [17]:
# Visualização dos dados
spark.sql("SELECT * FROM hadoop_catalog.default.vendas order by id").show()

+---+---------+----------+-----+----------+
| id|  produto|quantidade|preco|data_venda|
+---+---------+----------+-----+----------+
|  1|Produto A|        10| 16.0|2023-11-01|
|  3|Produto C|         8| 30.0|2023-11-03|
|  4|Produto D|        12| 25.0|2023-11-04|
|  5|Produto E|         7| 18.5|2023-11-05|
+---+---------+----------+-----+----------+



In [34]:
# List os snapshots novamente
snapshots_df = spark.sql("SELECT * FROM hadoop_catalog.default.vendas.snapshots")
snapshots_df.select("snapshot_id", "committed_at", "operation").orderBy(F.col("committed_at").asc()).show(truncate=False)

+-------------------+-----------------------+---------+
|snapshot_id        |committed_at           |operation|
+-------------------+-----------------------+---------+
|5502038290185584988|2025-01-06 21:37:09.735|append   |
|3087817526478000336|2025-01-06 21:40:23.168|append   |
|686919947402139456 |2025-01-06 21:42:25.739|overwrite|
|2392243818373974793|2025-01-06 21:43:03.196|delete   |
+-------------------+-----------------------+---------+



### Time Travel por ID do snapshot

In [35]:
# lista de snapshost ordenados pela data de commit
snapshot_ids = spark.sql("""
SELECT snapshot_id
FROM hadoop_catalog.default.vendas.snapshots
ORDER BY committed_at ASC
""").collect()

print(snapshot_ids)

[Row(snapshot_id=5502038290185584988), Row(snapshot_id=3087817526478000336), Row(snapshot_id=686919947402139456), Row(snapshot_id=2392243818373974793)]


In [36]:
# recuperamos o estado no primeiro snapshot
first_snapshot_id = snapshot_ids[0].snapshot_id
print(f"Data at Snapshot ID {first_snapshot_id}:")

spark.read \
    .option("snapshot-id", first_snapshot_id) \
    .table("hadoop_catalog.default.vendas") \
    .show()

Data at Snapshot ID 5502038290185584988:
+---+---------+----------+-----+----------+
| id|  produto|quantidade|preco|data_venda|
+---+---------+----------+-----+----------+
|  1|Produto A|        10| 15.5|2023-11-01|
|  2|Produto B|         5| 22.0|2023-11-02|
|  3|Produto C|         8| 30.0|2023-11-03|
+---+---------+----------+-----+----------+



In [37]:
# recuperamos o estado no segundo snapshot
first_snapshot_id = snapshot_ids[1].snapshot_id
print(f"Data at Snapshot ID {first_snapshot_id}:")

spark.read \
    .option("snapshot-id", first_snapshot_id) \
    .table("hadoop_catalog.default.vendas") \
    .show()

Data at Snapshot ID 3087817526478000336:
+---+---------+----------+-----+----------+
| id|  produto|quantidade|preco|data_venda|
+---+---------+----------+-----+----------+
|  1|Produto A|        10| 15.5|2023-11-01|
|  4|Produto D|        12| 25.0|2023-11-04|
|  2|Produto B|         5| 22.0|2023-11-02|
|  5|Produto E|         7| 18.5|2023-11-05|
|  3|Produto C|         8| 30.0|2023-11-03|
+---+---------+----------+-----+----------+



### Time Travel por timestamp

In [38]:
# Recupera os timestamps de committed
snapshot_timestamps = spark.sql("""
SELECT committed_at
FROM hadoop_catalog.default.vendas.snapshots
ORDER BY committed_at ASC
""").collect()

[print(item) for item in snapshot_timestamps]

Row(committed_at=datetime.datetime(2025, 1, 6, 21, 37, 9, 735000))
Row(committed_at=datetime.datetime(2025, 1, 6, 21, 40, 23, 168000))
Row(committed_at=datetime.datetime(2025, 1, 6, 21, 42, 25, 739000))
Row(committed_at=datetime.datetime(2025, 1, 6, 21, 43, 3, 196000))


[None, None, None, None]

In [39]:
# Verificamos o timestamp do segundo snapshot
second_snapshot_timestamp = snapshot_timestamps[1].committed_at
print(second_snapshot_timestamp)

2025-01-06 21:40:23.168000


In [40]:
# Convertemos o timestamp em milesegundos para recuperar o segundo snapshot
timestamp_ms = int(second_snapshot_timestamp.timestamp() * 1000)
print(f"Data as of {second_snapshot_timestamp}:")

spark.read \
    .option("as-of-timestamp", timestamp_ms) \
    .table("hadoop_catalog.default.vendas") \
    .show()

Data as of 2025-01-06 21:40:23.168000:
+---+---------+----------+-----+----------+
| id|  produto|quantidade|preco|data_venda|
+---+---------+----------+-----+----------+
|  4|Produto D|        12| 25.0|2023-11-04|
|  1|Produto A|        10| 15.5|2023-11-01|
|  2|Produto B|         5| 22.0|2023-11-02|
|  5|Produto E|         7| 18.5|2023-11-05|
|  3|Produto C|         8| 30.0|2023-11-03|
+---+---------+----------+-----+----------+



### Time Travel usando SQL

In [30]:
# Consulta usando ID do snapshot
print(f"Data at Snapshot ID {first_snapshot_id} using SQL:")

spark.sql(f"""
SELECT * FROM hadoop_catalog.default.vendas
VERSION AS OF {first_snapshot_id}
""").show()

Data at Snapshot ID 3087817526478000336 using SQL:
+---+---------+----------+-----+----------+
| id|  produto|quantidade|preco|data_venda|
+---+---------+----------+-----+----------+
|  1|Produto A|        10| 15.5|2023-11-01|
|  4|Produto D|        12| 25.0|2023-11-04|
|  5|Produto E|         7| 18.5|2023-11-05|
|  2|Produto B|         5| 22.0|2023-11-02|
|  3|Produto C|         8| 30.0|2023-11-03|
+---+---------+----------+-----+----------+



In [31]:
# Usando sql com timestamp
timestamp_str = second_snapshot_timestamp.strftime("%Y-%m-%d %H:%M:%S")
print(timestamp_str)
print(f"Data as of '{timestamp_str}' using SQL:")

spark.sql(f"""
SELECT * FROM hadoop_catalog.default.vendas
TIMESTAMP AS OF '{timestamp_str}'
""").show()

2025-01-06 21:40:23
Data as of '2025-01-06 21:40:23' using SQL:
+---+---------+----------+-----+----------+
| id|  produto|quantidade|preco|data_venda|
+---+---------+----------+-----+----------+
|  1|Produto A|        10| 15.5|2023-11-01|
|  2|Produto B|         5| 22.0|2023-11-02|
|  3|Produto C|         8| 30.0|2023-11-03|
+---+---------+----------+-----+----------+



### Definindo Expiração de Snapshots

In [41]:
# Definimos expiração de snapshots para mais de 1 minuto
expire_timestamp = datetime.now() - timedelta(minutes=1)
expire_timestamp_str = expire_timestamp.strftime("%Y-%m-%d %H:%M:%S")

print(expire_timestamp_str)

2025-01-06 22:04:24


In [42]:
# executa expireSnapshots
expiration_result = spark.sql(f"""
CALL hadoop_catalog.system.expire_snapshots(
    table => 'default.vendas',
    older_than => TIMESTAMP '{expire_timestamp_str}',
    retain_last => 1
)
""")

# Exibe os resultaods
expiration_result.show(truncate=False)

ANTLR Tool version 4.9.3 used for code generation does not match the current runtime version 4.8ANTLR Runtime version 4.9.3 used for parser compilation does not match the current runtime version 4.8ANTLR Tool version 4.9.3 used for code generation does not match the current runtime version 4.8ANTLR Runtime version 4.9.3 used for parser compilation does not match the current runtime version 4.8

+------------------------+-----------------------------------+-----------------------------------+----------------------------+----------------------------+------------------------------+
|deleted_data_files_count|deleted_position_delete_files_count|deleted_equality_delete_files_count|deleted_manifest_files_count|deleted_manifest_lists_count|deleted_statistics_files_count|
+------------------------+-----------------------------------+-----------------------------------+----------------------------+----------------------------+------------------------------+
|2                       |0                                  |0                                  |2                           |3                           |0                             |
+------------------------+-----------------------------------+-----------------------------------+----------------------------+----------------------------+------------------------------+



In [43]:
# mostra snapshots ativos
snapshots_df = spark.sql("SELECT * FROM hadoop_catalog.default.vendas.snapshots")
snapshots_df.select("snapshot_id", "committed_at", "operation").show(truncate=False)

+-------------------+-----------------------+---------+
|snapshot_id        |committed_at           |operation|
+-------------------+-----------------------+---------+
|2392243818373974793|2025-01-06 21:43:03.196|delete   |
+-------------------+-----------------------+---------+



In [44]:
# Exclui tabela
spark.sql("DROP TABLE hadoop_catalog.default.vendas")

DataFrame[]