<!-- Teste de Time Travel em Delta Lake -->
# <font color='Red'>Delta Lake</font>
## <font color='blue'>Manipulação de dados em Delta Lake</font>
## <font color='blue'>Time Travel</font>
### <font color='Green'>Data Lakehouse Time Travel com Apache Spark e Delta Lake</font>

## Criando a Sessão Spark e a Tabela Delta

In [1]:
# Imports
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip
from delta.tables import DeltaTable
from pyspark.sql.functions import col, to_json, lit, collect_list, size, avg

In [2]:
if SparkSession._instantiatedSession is not None:
    SparkSession._instantiatedSession.stop()

In [3]:
# Configuração do Spark com Delta Lake
builder = SparkSession.builder \
    .appName("deltalake") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.sql.debug.maxToStringFields", "5000")  

In [4]:
    spark = configure_spark_with_delta_pip(builder).getOrCreate()

Picked up JAVA_TOOL_OPTIONS: -Djdk.internal.platform.cgroupv2.enable=false
Picked up JAVA_TOOL_OPTIONS: -Djdk.internal.platform.cgroupv2.enable=false


:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-9f659548-9e4e-4f8e-b4fe-78315a63222b;1.0
	confs: [default]
	found io.delta#delta-spark_2.12;3.2.1 in central
	found io.delta#delta-storage;3.2.1 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 74ms :: artifacts dl 3ms
	:: modules in use:
	io.delta#delta-spark_2.12;3.2.1 from central in [default]
	io.delta#delta-storage;3.2.1 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   0   |   0   |   0

In [5]:
# Define o nível de log
spark.sparkContext.setLogLevel("ERROR")

A tabela Delta, usada no contexto do Delta Lake, é um tipo de tabela baseado no formato de armazenamento Delta, desenvolvido pela Databricks. Esse formato combina as vantagens dos Data Lakes (armazenamento escalável e econômico) com as funcionalidades tradicionais de um Data Warehouse (transações ACID, versionamento e consultas rápidas).

In [6]:
# Caminho da tabela Delta
delta_table_path = "/repositorio/delta-table"

## Armazenamento Inicial no Data Lakehouse

In [7]:
# Nome do arquivo
csv_file_path = "dados_iniciais.csv"

In [8]:
# Ler os dados do arquivo CSV para um DataFrame
print("Carregando dados iniciais do arquivo CSV...")
sil_dados = spark.read.csv(csv_file_path, header = True, inferSchema = True)

Carregando dados iniciais do arquivo CSV...


In [9]:
# Gravar dados na tabela Delta
sil_dados.write.format("delta").mode("overwrite").save(delta_table_path)

## Manipulando os Dados no Data Lakehouse

In [10]:
# Leitura dos dados
print("Dados iniciais:")
spark.read.format("delta").load(delta_table_path).show()

Dados iniciais:
+---+--------+-----+-------------------+
| id|    nome|idade|             funcao|
+---+--------+-----+-------------------+
|  1|   Lucas|   30| Cientista de Dados|
|  2|   Bruno|   18|  Analista de Dados|
|  3| Mariana|   35| Arquiteto de Dados|
|  4|Fernando|   40|Engenheiro de Dados|
|  5| Gabriel|   28|   Engenheiro de IA|
|  6|  Camila|   50|   Engenheiro de ML|
|  7|  Amanda|   29| Engenheiro DataOps|
|  8| Juliano|   43| Arquiteto de Dados|
|  9| Gustavo|   56| Arquiteto de Dados|
| 10| Vanessa|   31|  Analista de Dados|
+---+--------+-----+-------------------+



In [11]:
# Atualizando idade e função de um funcionário
delta_table = DeltaTable.forPath(spark, delta_table_path)
delta_table.update(
    condition = "nome = 'Lucas'",
    set = {"idade": "32", "funcao": "'Gerente de Data Science'"}
)
print("Após atualização de Lucas:")
delta_table.toDF().show(truncate=False)

Após atualização de Lucas:
+---+--------+-----+-----------------------+
|id |nome    |idade|funcao                 |
+---+--------+-----+-----------------------+
|1  |Lucas   |32   |Gerente de Data Science|
|2  |Bruno   |18   |Analista de Dados      |
|3  |Mariana |35   |Arquiteto de Dados     |
|4  |Fernando|40   |Engenheiro de Dados    |
|5  |Gabriel |28   |Engenheiro de IA       |
|6  |Camila  |50   |Engenheiro de ML       |
|7  |Amanda  |29   |Engenheiro DataOps     |
|8  |Juliano |43   |Arquiteto de Dados     |
|9  |Gustavo |56   |Arquiteto de Dados     |
|10 |Vanessa |31   |Analista de Dados      |
+---+--------+-----+-----------------------+



In [12]:
# Removendo registros com idade menor ou igual a 30
delta_table.delete(condition = "idade <= 30")
print("Após remoção de funcionários com idade <= 30:")
delta_table.toDF().show(truncate=False)

Após remoção de funcionários com idade <= 30:
+---+--------+-----+-----------------------+
|id |nome    |idade|funcao                 |
+---+--------+-----+-----------------------+
|1  |Lucas   |32   |Gerente de Data Science|
|3  |Mariana |35   |Arquiteto de Dados     |
|4  |Fernando|40   |Engenheiro de Dados    |
|6  |Camila  |50   |Engenheiro de ML       |
|8  |Juliano |43   |Arquiteto de Dados     |
|9  |Gustavo |56   |Arquiteto de Dados     |
|10 |Vanessa |31   |Analista de Dados      |
+---+--------+-----+-----------------------+



In [13]:
# Inserindo múltiplos novos registros
new_employees = spark.createDataFrame([
    (11, "Leonardo", 27, "Analytics Engineer"),
    (12, "Felipe", 31, "Analytics Engineer"),
    (13, "Paula", 26, "Engenheiro de Dados"),
    (14, "Melissa", 26, "Cientista de Dados")
], ["id", "nome", "idade", "funcao"])

delta_table.alias("existingData").merge(
    new_employees.alias("newData"),
    "existingData.id = newData.id"
).whenNotMatchedInsertAll().execute()

print("Após inserção de novos registros:")
delta_table.toDF().show(truncate=False)

Após inserção de novos registros:
+---+--------+-----+-----------------------+
|id |nome    |idade|funcao                 |
+---+--------+-----+-----------------------+
|1  |Lucas   |32   |Gerente de Data Science|
|3  |Mariana |35   |Arquiteto de Dados     |
|4  |Fernando|40   |Engenheiro de Dados    |
|6  |Camila  |50   |Engenheiro de ML       |
|8  |Juliano |43   |Arquiteto de Dados     |
|9  |Gustavo |56   |Arquiteto de Dados     |
|10 |Vanessa |31   |Analista de Dados      |
|11 |Leonardo|27   |Analytics Engineer     |
|14 |Melissa |26   |Cientista de Dados     |
|12 |Felipe  |31   |Analytics Engineer     |
|13 |Paula   |26   |Engenheiro de Dados    |
+---+--------+-----+-----------------------+



## UPSERT - Inserir ou Atualizar (Merge) Dados Existentes e Novos

In [14]:
# UPSERT - Inserir ou Atualizar (Merge) dados existentes e novos
upsert_data = spark.createDataFrame([
    (3, "Mariana", 36, "Arquiteto de Dados"),  # Atualizar idade da Mariana
    (15, "Tales", 24, "Arquiteto RPA")         # Novo registro
], ["id", "nome", "idade", "funcao"])

delta_table.alias("oldData").merge(
    upsert_data.alias("upsertData"),
    "oldData.id = upsertData.id"
).whenMatchedUpdate(set={
    "idade": "upsertData.idade",
    "funcao": "upsertData.funcao"
}).whenNotMatchedInsertAll().execute()

print("Após upsert (atualização/inserção):")
delta_table.toDF().show(truncate=False)

Após upsert (atualização/inserção):
+---+--------+-----+-----------------------+
|id |nome    |idade|funcao                 |
+---+--------+-----+-----------------------+
|1  |Lucas   |32   |Gerente de Data Science|
|3  |Mariana |36   |Arquiteto de Dados     |
|4  |Fernando|40   |Engenheiro de Dados    |
|6  |Camila  |50   |Engenheiro de ML       |
|8  |Juliano |43   |Arquiteto de Dados     |
|9  |Gustavo |56   |Arquiteto de Dados     |
|10 |Vanessa |31   |Analista de Dados      |
|15 |Tales   |24   |Arquiteto RPA          |
|11 |Leonardo|27   |Analytics Engineer     |
|14 |Melissa |26   |Cientista de Dados     |
|12 |Felipe  |31   |Analytics Engineer     |
|13 |Paula   |26   |Engenheiro de Dados    |
+---+--------+-----+-----------------------+



## Histórico de Alterações (Time Travel)

In [15]:
# Caminho para a tabela Delta
delta_table = DeltaTable.forPath(spark, delta_table_path)

# Obter o histórico completo
history_df = delta_table.history()

# Contar o número de versões
num_versions = history_df.count()
print(f"A tabela tem {num_versions} versões.")

A tabela tem 5 versões.


In [16]:
# Acessar a versão mais antiga da tabela (versão 0)
print("Versão inicial da tabela:")
old_version = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
old_version.show(truncate=False)

Versão inicial da tabela:
+---+--------+-----+-------------------+
|id |nome    |idade|funcao             |
+---+--------+-----+-------------------+
|1  |Lucas   |30   |Cientista de Dados |
|2  |Bruno   |18   |Analista de Dados  |
|3  |Mariana |35   |Arquiteto de Dados |
|4  |Fernando|40   |Engenheiro de Dados|
|5  |Gabriel |28   |Engenheiro de IA   |
|6  |Camila  |50   |Engenheiro de ML   |
|7  |Amanda  |29   |Engenheiro DataOps |
|8  |Juliano |43   |Arquiteto de Dados |
|9  |Gustavo |56   |Arquiteto de Dados |
|10 |Vanessa |31   |Analista de Dados  |
+---+--------+-----+-------------------+



In [17]:
print("Versão 0 da tabela:")
version_0 = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
version_0.show(truncate=False)

print("Versão 1 da tabela:")
version_1 = spark.read.format("delta").option("versionAsOf", 1).load(delta_table_path)
version_1.show(truncate=False)

print("Versão 4 da tabela:")
version_4 = spark.read.format("delta").option("versionAsOf", 4).load(delta_table_path)
version_4.show(truncate=False)


Versão 0 da tabela:
+---+--------+-----+-------------------+
|id |nome    |idade|funcao             |
+---+--------+-----+-------------------+
|1  |Lucas   |30   |Cientista de Dados |
|2  |Bruno   |18   |Analista de Dados  |
|3  |Mariana |35   |Arquiteto de Dados |
|4  |Fernando|40   |Engenheiro de Dados|
|5  |Gabriel |28   |Engenheiro de IA   |
|6  |Camila  |50   |Engenheiro de ML   |
|7  |Amanda  |29   |Engenheiro DataOps |
|8  |Juliano |43   |Arquiteto de Dados |
|9  |Gustavo |56   |Arquiteto de Dados |
|10 |Vanessa |31   |Analista de Dados  |
+---+--------+-----+-------------------+

Versão 1 da tabela:
+---+--------+-----+-----------------------+
|id |nome    |idade|funcao                 |
+---+--------+-----+-----------------------+
|1  |Lucas   |32   |Gerente de Data Science|
|2  |Bruno   |18   |Analista de Dados      |
|3  |Mariana |35   |Arquiteto de Dados     |
|4  |Fernando|40   |Engenheiro de Dados    |
|5  |Gabriel |28   |Engenheiro de IA       |
|6  |Camila  |50   |Engen

In [18]:
# Adicionar uma coluna que identifica a versão
version_0 = version_0.withColumn("versao", lit(0))
version_4 = version_4.withColumn("versao", lit(4))

# Unir as duas versões
changes = version_0.union(version_4)

In [19]:
# Mostrar as diferenças em relação ao nome
print("Alterações entre versões:")
changes.groupBy("id", "nome") \
       .agg(collect_list("versao").alias("versoes")) \
       .filter(size("versoes") == 1) \
       .show(truncate=False)

Alterações entre versões:
+---+--------+-------+
|id |nome    |versoes|
+---+--------+-------+
|7  |Amanda  |[0]    |
|5  |Gabriel |[0]    |
|2  |Bruno   |[0]    |
|15 |Tales   |[4]    |
|11 |Leonardo|[4]    |
|14 |Melissa |[4]    |
|12 |Felipe  |[4]    |
|13 |Paula   |[4]    |
+---+--------+-------+



In [20]:
# Mostrar as diferenças em relação a idade
print("Alterações entre versões:")
changes.groupBy("id", "idade") \
       .agg(collect_list("versao").alias("versoes")) \
       .filter(size("versoes") == 1) \
       .show(truncate=False)

Alterações entre versões:
+---+-----+-------+
|id |idade|versoes|
+---+-----+-------+
|5  |28   |[0]    |
|3  |35   |[0]    |
|2  |18   |[0]    |
|1  |30   |[0]    |
|7  |29   |[0]    |
|15 |24   |[4]    |
|1  |32   |[4]    |
|3  |36   |[4]    |
|11 |27   |[4]    |
|14 |26   |[4]    |
|12 |31   |[4]    |
|13 |26   |[4]    |
+---+-----+-------+



In [21]:
# Calcular a média de idade por versão
avg_ages = version_0.union(version_4) \
    .groupBy("versao") \
    .agg(avg("idade").alias("media_idade"))

# Mostrar a diferença de média de idade entre as versões
print("Diferença de média de idade entre versões:")
avg_ages.show()

Diferença de média de idade entre versões:
+------+------------------+
|versao|       media_idade|
+------+------------------+
|     0|              36.0|
|     4|35.166666666666664|
+------+------------------+



In [22]:
# Carrega a tabela delta
delta_table = DeltaTable.forPath(spark, delta_table_path)

# Carregar o histórico de alterações da tabela Delta
history = delta_table.history()

# Selecionar apenas as colunas relevantes
formatted_history = history.select(
    col("version").alias("Versão"),
    col("operation").alias("Operação"),
    col("operationMetrics").alias("Métricas"),
    col("userMetadata").alias("Metadados do Usuário")
)

# Mostrar as alterações 
print("Histórico de alterações da tabela Delta (formatado):")
formatted_history.show(truncate=False)

Histórico de alterações da tabela Delta (formatado):
+------+--------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------+
|Versão|Operação|Métricas                                                                                                                                                                                                                                                             

In [23]:
# Se você quiser exibir apenas operações específicas (por exemplo, UPDATE), pode usar .filter():
filtered_history = formatted_history.filter(col("Operação") == "UPDATE")
filtered_history.show(truncate=False)

+------+--------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------+
|Versão|Operação|Métricas                                                                                                                                                                                                                                                                                                                    |Metadados do Usuário|
+------+--------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [24]:
# Carregar o histórico de alterações da tabela Delta
history = delta_table.history()

# Selecionar e formatar as colunas
formatted_history = history.select(
    col("version").alias("Versão"),
    col("operation").alias("Operação"),
    to_json(col("operationMetrics")).alias("Métricas"),  # Converter MAP para JSON, para conseguir salvar em CSV
    col("userMetadata").alias("Metadados do Usuário")
)

# Salvar o histórico formatado em CSV
output_path = "output"
formatted_history.write.format("csv").option("header", "true").save(output_path)

print(f"Histórico salvo em: {output_path}")

Histórico salvo em: output


Embora não haja um comando direto de rollback no Delta Lake, você pode sobrescrever uma nova versão com os dados de uma versão anterior sem perder todo o histórico:

In [25]:
# Consultar uma versão antiga (versão 2)
spark.read.format("delta").option("versionAsOf", 2).load(delta_table_path).show(truncate=False)

+---+--------+-----+-----------------------+
|id |nome    |idade|funcao                 |
+---+--------+-----+-----------------------+
|1  |Lucas   |32   |Gerente de Data Science|
|3  |Mariana |35   |Arquiteto de Dados     |
|4  |Fernando|40   |Engenheiro de Dados    |
|6  |Camila  |50   |Engenheiro de ML       |
|8  |Juliano |43   |Arquiteto de Dados     |
|9  |Gustavo |56   |Arquiteto de Dados     |
|10 |Vanessa |31   |Analista de Dados      |
+---+--------+-----+-----------------------+



In [26]:
# Carregar a versão 2
old_version = spark.read.format("delta").option("versionAsOf", 2).load(delta_table_path)

In [27]:
# Sobrescrever a tabela principal com a versão 2
# Isso vai gerar uma cópia da versão 2 que será agora a versão principal. 
old_version.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(delta_table_path)

In [28]:
# Verificar os dados sobrescritos
spark.read.format("delta").load(delta_table_path).show(truncate=False)

+---+--------+-----+-----------------------+
|id |nome    |idade|funcao                 |
+---+--------+-----+-----------------------+
|1  |Lucas   |32   |Gerente de Data Science|
|3  |Mariana |35   |Arquiteto de Dados     |
|4  |Fernando|40   |Engenheiro de Dados    |
|6  |Camila  |50   |Engenheiro de ML       |
|8  |Juliano |43   |Arquiteto de Dados     |
|9  |Gustavo |56   |Arquiteto de Dados     |
|10 |Vanessa |31   |Analista de Dados      |
+---+--------+-----+-----------------------+



In [29]:
# Caminho para a tabela Delta
delta_table = DeltaTable.forPath(spark, delta_table_path)

# Obter o histórico completo
history_df = delta_table.history()

# Contar o número de versões
num_versions = history_df.count()
print(f"A tabela tem {num_versions} versões.")

A tabela tem 6 versões.


In [33]:
print("Versão 0 da tabela:")
version_0 = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
version_0.show(truncate=False)

print("Versão 1 da tabela:")
version_1 = spark.read.format("delta").option("versionAsOf", 1).load(delta_table_path)
version_1.show(truncate=False)

print("Versão 2 da tabela:")
version_2 = spark.read.format("delta").option("versionAsOf", 2).load(delta_table_path)
version_2.show(truncate=False)

print("Versão 3 da tabela:")
version_3 = spark.read.format("delta").option("versionAsOf", 3).load(delta_table_path)
version_3.show(truncate=False)

print("Versão 4 da tabela:")
version_4 = spark.read.format("delta").option("versionAsOf", 4).load(delta_table_path)
version_4.show(truncate=False)

print("Versão 5 da tabela:")
version_5 = spark.read.format("delta").option("versionAsOf", 5).load(delta_table_path)
version_5.show(truncate=False)


Versão 0 da tabela:
+---+--------+-----+-------------------+
|id |nome    |idade|funcao             |
+---+--------+-----+-------------------+
|1  |Lucas   |30   |Cientista de Dados |
|2  |Bruno   |18   |Analista de Dados  |
|3  |Mariana |35   |Arquiteto de Dados |
|4  |Fernando|40   |Engenheiro de Dados|
|5  |Gabriel |28   |Engenheiro de IA   |
|6  |Camila  |50   |Engenheiro de ML   |
|7  |Amanda  |29   |Engenheiro DataOps |
|8  |Juliano |43   |Arquiteto de Dados |
|9  |Gustavo |56   |Arquiteto de Dados |
|10 |Vanessa |31   |Analista de Dados  |
+---+--------+-----+-------------------+

Versão 1 da tabela:
+---+--------+-----+-----------------------+
|id |nome    |idade|funcao                 |
+---+--------+-----+-----------------------+
|1  |Lucas   |32   |Gerente de Data Science|
|2  |Bruno   |18   |Analista de Dados      |
|3  |Mariana |35   |Arquiteto de Dados     |
|4  |Fernando|40   |Engenheiro de Dados    |
|5  |Gabriel |28   |Engenheiro de IA       |
|6  |Camila  |50   |Engen

Considerações:

- A operação de sobrescrita cria uma nova versão na tabela Delta. Dados atuais ainda estarão no histórico, mas os dados sobrescritos substituem a visão principal da tabela.

- Certifique-se de que o esquema da versão antiga é compatível com o esquema atual. Caso contrário, pode ser necessário habilitar a opção overwriteSchema.

- Em ambientes críticos, prefira corrigir os dados com operações como MERGE ou UPDATE em vez de sobrescrever diretamente.

## Aplicando o VACUUM

Por padrão, o Delta Lake impõe uma retenção mínima de 7 dias para garantir que operações como time travel ainda sejam possíveis e para evitar exclusão acidental de dados necessários para transações. Se você quiser reduzir esse período, será necessário modificar a configuração de retenção mínima.

Você não poderá acessar versões anteriores além do período de retenção configurado.

In [34]:
# Desativar temporariamente a proteção para retenção mínima
spark.sql("SET spark.databricks.delta.retentionDurationCheck.enabled = false")

DataFrame[key: string, value: string]

In [36]:
# Executar VACUUM com retenção de 1 dia
print("Executando vacuum com retenção de 1 dia...")
delta_table.vacuum(retentionHours=1)

Executando vacuum com retenção de 1 dia...


                                                                                

Deleted 4 files and directories in a total of 1 directories.


DataFrame[]

In [38]:
# Caminho para a tabela Delta
delta_table = DeltaTable.forPath(spark, delta_table_path)

# Obter o histórico completo
history_df = delta_table.history()

# Contar o número de versões
num_versions = history_df.count()
print(f"A tabela tem {num_versions} versões.")

A tabela tem 10 versões.


In [48]:
# Carrega a tabela delta
delta_table = DeltaTable.forPath(spark, delta_table_path)

# Carregar o histórico de alterações da tabela Delta
history = delta_table.history()

# Selecionar apenas as colunas relevantes
formatted_history = history.select(
    col("version").alias("Versão"),
    col("operation").alias("Operação"),
    col("operationMetrics").alias("Métricas"),
    col("userMetadata").alias("Metadados do Usuário")
)

# Mostrar as alterações 
print("Histórico de alterações da tabela Delta (formatado):")
formatted_history.show(truncate=False)

Histórico de alterações da tabela Delta (formatado):
+------+------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------+
|Versão|Operação    |Métricas                                                                                                                                                                                                                                                     

In [47]:
'''print("Versão 0 da tabela:")
version_0 = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
version_0.show(truncate=False)

print("Versão 1 da tabela:")
version_1 = spark.read.format("delta").option("versionAsOf", 1).load(delta_table_path)
version_1.show(truncate=False)

print("Versão 2 da tabela:")
version_2 = spark.read.format("delta").option("versionAsOf", 2).load(delta_table_path)
version_2.show(truncate=False)

print("Versão 3 da tabela:")
version_3 = spark.read.format("delta").option("versionAsOf", 3).load(delta_table_path)
version_3.show(truncate=False)'''

print("Versão 4 da tabela:")
version_4 = spark.read.format("delta").option("versionAsOf", 4).load(delta_table_path)
version_4.show(truncate=False)

print("Versão 5 da tabela:")
version_5 = spark.read.format("delta").option("versionAsOf", 5).load(delta_table_path)
version_5.show(truncate=False)

print("Versão 6 da tabela:")
version_6 = spark.read.format("delta").option("versionAsOf", 6).load(delta_table_path)
version_6.show(truncate=False)

print("Versão 7 da tabela:")
version_7 = spark.read.format("delta").option("versionAsOf", 7).load(delta_table_path)
version_7.show(truncate=False)

print("Versão 9 da tabela:")
version_9 = spark.read.format("delta").option("versionAsOf", 9).load(delta_table_path)
version_9.show(truncate=False)

Versão 4 da tabela:
+---+--------+-----+-----------------------+
|id |nome    |idade|funcao                 |
+---+--------+-----+-----------------------+
|1  |Lucas   |32   |Gerente de Data Science|
|3  |Mariana |36   |Arquiteto de Dados     |
|4  |Fernando|40   |Engenheiro de Dados    |
|6  |Camila  |50   |Engenheiro de ML       |
|8  |Juliano |43   |Arquiteto de Dados     |
|9  |Gustavo |56   |Arquiteto de Dados     |
|10 |Vanessa |31   |Analista de Dados      |
|15 |Tales   |24   |Arquiteto RPA          |
|11 |Leonardo|27   |Analytics Engineer     |
|14 |Melissa |26   |Cientista de Dados     |
|12 |Felipe  |31   |Analytics Engineer     |
|13 |Paula   |26   |Engenheiro de Dados    |
+---+--------+-----+-----------------------+

Versão 5 da tabela:
+---+--------+-----+-----------------------+
|id |nome    |idade|funcao                 |
+---+--------+-----+-----------------------+
|1  |Lucas   |32   |Gerente de Data Science|
|3  |Mariana |35   |Arquiteto de Dados     |
|4  |Fernando|

In [39]:
# Reativar a proteção para retenção mínima
spark.sql("SET spark.databricks.delta.retentionDurationCheck.enabled = true")

DataFrame[key: string, value: string]

Se o VACUUM foi executado com um período curto de retenção, versões mais antigas podem ter sido excluídas e não estarão disponíveis no histórico.


In [49]:
# Finaliza a sessão Spark
spark.stop()

# Fim