In [1]:
# global imports
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.types as T
import pyspark.sql.functions as F
from delta import *

## Funções para facilitar os exemplos

In [2]:
def get_spark_session():
    """
    Create and configure a SparkSession with Delta Lake support.
    
    This function initializes a local Spark session with Delta Lake extensions enabled,
    which allows working with Delta tables. The session is configured to use all
    available cores on the local machine.

    The configuration includes:
    - Delta Lake SQL extensions
    - Delta Lake catalog implementation
    - Automatic handling of Delta Lake dependencies via Maven

    Returns:
    --------
    pyspark.sql.SparkSession
        A configured SparkSession object with Delta Lake support enabled.

    Examples:
    ---------
    >>> spark = get_spark_session()
    >>> df = spark.range(10)  # Create a test DataFrame
    >>> df.show()
    
    Notes:
    ------
    - Requires PySpark and delta-spark packages to be installed
    - Uses local mode with all available cores ('local[*]')
    - The session should be stopped using spark.stop() when no longer needed
    """
    builder = (
        pyspark.sql.SparkSession.builder.master("local[*]")
            .config(
                "spark.sql.extensions", 
                "io.delta.sql.DeltaSparkSessionExtension"
            )
            .config(
                "spark.sql.catalog.spark_catalog", 
                "org.apache.spark.sql.delta.catalog.DeltaCatalog"
            )
    )

    spark = configure_spark_with_delta_pip(builder).getOrCreate()

    return spark

In [3]:
def get_products(spark, data=None, total_rows=2000, total_customers=20, total_years=5, start_year=2020):
    """
    Generate a PySpark DataFrame with synthetic sales data.
    
    This function creates a realistic sales dataset with products, customers, and transaction details,
    which can be used for testing, demonstrations, or data analysis exercises.

    Parameters:
    -----------
    spark : pyspark.sql.SparkSession
        The active SparkSession instance required to create DataFrames.
        
    data : dict, optional
        Dictionary containing custom data definitions with the following keys:
        - 'products': Nested dictionary of product categories and items
        - 'cities': List of cities
        - 'payment_types': List of payment methods
        If None (default), uses built-in sample data.
        
    total_rows : int, optional
        Total number of sales records to generate (default: 2000).
        
    total_customers : int, optional
        Number of distinct customers to generate (default: 20).
        
    total_years : int, optional
        Number of years to spread the sales dates across (default: 5).
        
    start_year : int, optional
        Base year for sales date generation (default: 2020).

    Returns:
    --------
    pyspark.sql.DataFrame
        A DataFrame with the following schema:
        - id_sale: Integer (unique sale ID)
        - customer: String (customer name)
        - product: String (product name)
        - category: String (product category)
        - quantity: Integer (quantity sold)
        - price: Double (unit price)
        - date: Date (sale date)
        - city: String (sale city)
        - payment: String (payment method)

    Examples:
    ---------
    >>> from pyspark.sql import SparkSession
    >>> spark = SparkSession.builder.getOrCreate()
    >>> sales_df = get_products(spark, total_rows=1000)
    >>> sales_df.show(5)
    
    >>> custom_data = {
    ...     "products": {"Food": ["Apple", "Bread"], "Drinks": ["Water"]},
    ...     "cities": ["New York", "Chicago"],
    ...     "payment_types": ["Cash", "Credit"]
    ... }
    >>> custom_sales = get_products(spark, data=custom_data)
    """
    import random
    from datetime import datetime, timedelta
    import pyspark.sql.types as T
    
    # Initialize default data if none provided
    if data is None:
        products = {
            "Eletrônicos": ["Notebook", "Smartphone", "TV", "Fone de Ouvido", "Monitor"],
            "Móveis": ["Mesa", "Cadeira"],
            "Informática": ["Teclado", "Mouse"]
        }
        cities = ["São Paulo", "Rio de Janeiro", "Belo Horizonte", "Porto Alegre", 
                 "Curitiba", "Recife", "Salvador", "Fortaleza"]
        payment_types = ["Cartão de Crédito", "Boleto", "PIX", "Cartão de Débito"]
    else:
        products = data["products"]
        cities = data["cities"]
        payment_types = data["payment_types"]

    # Flatten product categories into item:category dictionary
    products = {item: category for category, items in products.items() for item in items}

    # Define DataFrame schema
    schema = T.StructType([
        T.StructField("id_sale", T.IntegerType(), nullable=False),
        T.StructField("customer", T.StringType(), nullable=True),
        T.StructField("product", T.StringType(), nullable=True),
        T.StructField("category", T.StringType(), nullable=True),
        T.StructField("qty", T.IntegerType(), nullable=True),
        T.StructField("price", T.DoubleType(), nullable=True),
        T.StructField("date", T.DateType(), nullable=True),
        T.StructField("city", T.StringType(), nullable=True),
        T.StructField("payment", T.StringType(), nullable=True)
    ])
    
    # Generate customer IDs
    customers = [f"customer_{i}" for i in range(1, total_customers + 1)]  
    
    # Generate synthetic sales data
    data = []
    for i in range(1, total_rows + 1):
        product = random.choice(list(products.keys()))
        product_category = products[product]
        price = round(random.uniform(100.0, 5000.0), 2)
        qty = random.randint(1, 5)
        days = total_years * 365
        sale_date = datetime(start_year, 1, 1) + timedelta(days=random.randint(0, days))
        customer = random.choice(customers)
        city = random.choice(cities)
        payment = random.choice(payment_types)
        
        data.append((
            i, customer, product, product_category, qty, price, 
            sale_date.date(), city, payment
        ))
    
    # Create and return Spark DataFrame
    df_sales = spark.createDataFrame(data, schema=schema)
    return df_sales

## ***Definições***

O **Delta Lake** é uma camada de armazenamento que traz ACID, versionamento e otimizações para data lakes (ex: em cima do S3, HDFS ou Azure Data Lake). 

[Documentação oficial](https://docs.delta.io/latest/index.html)


---

### Principais características
1 - ***ACID Transactions***

- Garantem que operações de leitura e escrita sejam atomicamente consistentes, mesmo em pipelines concorrentes.

2 - ***Time Travel (Viagem no Tempo)***

- Permite acessar versões anteriores dos dados usando timestamps ou version numbers.

3 - ***Schema Enforcement & Evolution***

- Valida o schema dos dados durante a escrita (evitando corrupção).

- Permite evoluir o schema sem quebrar pipelines.

4 - ***Upsert, Delete e Merge (Operações DML)***

- Suporte a operações como MERGE INTO, DELETE e UPDATE, que não são nativas em Parquet/ORC.

5 - ***Armazenamento Eficiente***

- Usa Parquet como formato físico, com otimizações como:

- Compactação (para reduzir custo de armazenamento).

- Particionamento automático (para consultas mais rápidas).

6 - ***Open Format***

 - Compatível com Spark, Pandas, Trino, Presto, Flink e outras ferramentas.

In [4]:
# Create a spark session with delta table
spark = get_spark_session()

df = get_products(spark, total_rows=5)


df.show(truncate=False)

:: loading settings :: url = jar:file:/home/alberto/.cache/pypoetry/virtualenvs/pyspark-delta-Km3JpzXx-py3.13/lib/python3.13/site-packages/pyspark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/alberto/.ivy2.5.2/cache
The jars for the packages stored in: /home/alberto/.ivy2.5.2/jars
io.delta#delta-spark_2.13 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-b18f890f-4074-45ce-83e0-a1c083770289;1.0
	confs: [default]
	found io.delta#delta-spark_2.13;4.0.0 in central
	found io.delta#delta-storage;4.0.0 in central
	found org.antlr#antlr4-runtime;4.13.1 in central
:: resolution report :: resolve 171ms :: artifacts dl 7ms
	:: modules in use:
	io.delta#delta-spark_2.13;4.0.0 from central in [default]
	io.delta#delta-storage;4.0.0 from central in [default]
	org.antlr#antlr4-runtime;4.13.1 from central in [default]
	---------------------------------------------------------------------
	|                  |  

+-------+-----------+--------------+-----------+---+-------+----------+------------+-----------------+
|id_sale|customer   |product       |category   |qty|price  |date      |city        |payment          |
+-------+-----------+--------------+-----------+---+-------+----------+------------+-----------------+
|1      |customer_6 |Fone de Ouvido|Eletrônicos|2  |3347.86|2021-12-26|Curitiba    |Cartão de Débito |
|2      |customer_20|Mesa          |Móveis     |3  |4463.29|2020-08-10|Porto Alegre|PIX              |
|3      |customer_18|TV            |Eletrônicos|3  |4403.15|2021-11-03|Recife      |Boleto           |
|4      |customer_9 |Mouse         |Informática|4  |2040.97|2022-12-10|São Paulo   |Cartão de Débito |
|5      |customer_2 |Monitor       |Eletrônicos|5  |4213.25|2020-12-27|Porto Alegre|Cartão de Crédito|
+-------+-----------+--------------+-----------+---+-------+----------+------------+-----------------+



## Leitura e escrita de uma delta table

- A escrita podemos usar diretamente os métodos **write** passando **delta** como formato de arquivo
- A leitura é feita através do método load, passando o **delta** como formato

***Observação importante***

Os dados são salvos com o formato **parquet** e isso pode induzir ao erro de fazer a sua leitura apenas com esse formato, o que pode **acarretar erros** na integridade dos dados, como as delta tables possuim a funcionalidade de time travel, os dados podem não ser deletados, mesmo utilizando a instrução delete o dado ainda permanece localmente (ainda é possível realmente apagar dados não utilizados com a funcão **vacuum** do delta table), suas alterações são registradas em transações dentro da pasta delta_log, e por isso a **leitura apenas do parquet sem os metadados** pode acarretar em valores incorretos.

---

### Estrutura de um diretório escrito com delta table

![](./images/escrita_delta.png)

- Gera uma tabela com metadados avançados, suporte a ACID e versionamento (armazenada como arquivos Parquet + log de transações).

### Descrições dos arquivos
| **Termo**         | **Descrição**                                                                  |
| ----------------- | ------------------------------------------------------------------------------ |
| **Parquet Files** | Arquivos de dados (armazenam os dados efetivos, otimizados para consultas).    |
| **Delta Log**     | Pasta `_delta_log/` com registros de transações (arquivos JSON/Parquet).       |
| **Checkpoint**    | Snapshot periódico do transaction log (em Parquet) para acelerar recuperações. |

In [5]:
# create a delta table
# save on BRONZE/sales folder
df.write.format("delta").mode("overwrite").save("./BRONZE/sales")

25/07/06 04:30:13 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [6]:
# read a delta table from path
spark.read.format("delta").load("./BRONZE/sales").show()

+-------+-----------+--------------+-----------+---+-------+----------+------------+-----------------+
|id_sale|   customer|       product|   category|qty|  price|      date|        city|          payment|
+-------+-----------+--------------+-----------+---+-------+----------+------------+-----------------+
|      1| customer_6|Fone de Ouvido|Eletrônicos|  2|3347.86|2021-12-26|    Curitiba| Cartão de Débito|
|      5| customer_2|       Monitor|Eletrônicos|  5|4213.25|2020-12-27|Porto Alegre|Cartão de Crédito|
|      4| customer_9|         Mouse|Informática|  4|2040.97|2022-12-10|   São Paulo| Cartão de Débito|
|      2|customer_20|          Mesa|     Móveis|  3|4463.29|2020-08-10|Porto Alegre|              PIX|
|      3|customer_18|            TV|Eletrônicos|  3|4403.15|2021-11-03|      Recife|           Boleto|
+-------+-----------+--------------+-----------+---+-------+----------+------------+-----------------+



## `Classe DeltaTable`

Além da leitura com o formato *delta*, podemos usar a classe `DeltaTable` para representar uma tabela Delta em memória.  
Com ela podemos ter acessos a algumas funcionalidades que apenas a leitura como Dataframe não permite.

---
**Principais Funcionalidades**
1. ***Operações de Modificação***
- **MERGE**: Upsert (atualiza ou insere dados).
- **UPDATE**: Modifica registros existentes.
- **DELETE**: Remove registros.

2. ***Controle de Versões***
- **history()**: Mostra o histórico de alterações.
- **timeTravel**: Acessa versões antigas.

3. ***Otimizações***
- **vacuum()**: Limpa versões não utilizadas.
- **optimize()**: Compacta arquivos pequenos.


---

### **Principais Diferenças entre DeltaTable e DataFrame**

| **Recurso**               | **`DeltaTable`**                          | **`DataFrame` (leitura delta)**          |
|---------------------------|------------------------------------------|------------------------------------------|
| **Operações DML**         | ✅ Suporta `MERGE`, `UPDATE`, `DELETE`   | ❌ Apenas leitura                        |
| **Time Travel**           | ✅ Via `timeTravel` ou `history()`       | ✅ Via `.option("versionAsOf", N)`       |
| **Histórico de Versões**  | ✅ `delta_table.history()`               | ❌ Não mostra histórico diretamente       |
| **Otimizações**           | ✅ `vacuum()`, `optimize()`              | ❌ Não aplicável                         |
| **Esquema**              | ✅ Pode forçar evolução de esquema       | ❌ Só lê o esquema existente             |
| **Uso em Queries SQL**    | ❌ Não pode ser usado em `spark.sql()`   | ✅ Pode ser registrado como tabela SQL    |

---

- Ambos usam o **mesmo formato físico** (arquivos Parquet + log de transações Delta).
- `DeltaTable` é uma **camada superior** que expõe operações de modificação.
- `DataFrame` é uma **visão imutável** dos dados, mas é mais flexível para consultas.

In [7]:
# create from path
delta_table = DeltaTable.forPath(spark, "./BRONZE/sales")

print(type(delta_table))

<class 'delta.tables.DeltaTable'>


In [8]:
# Convert to DataFrame
delta_table.toDF().show()

+-------+-----------+--------------+-----------+---+-------+----------+------------+-----------------+
|id_sale|   customer|       product|   category|qty|  price|      date|        city|          payment|
+-------+-----------+--------------+-----------+---+-------+----------+------------+-----------------+
|      1| customer_6|Fone de Ouvido|Eletrônicos|  2|3347.86|2021-12-26|    Curitiba| Cartão de Débito|
|      5| customer_2|       Monitor|Eletrônicos|  5|4213.25|2020-12-27|Porto Alegre|Cartão de Crédito|
|      4| customer_9|         Mouse|Informática|  4|2040.97|2022-12-10|   São Paulo| Cartão de Débito|
|      2|customer_20|          Mesa|     Móveis|  3|4463.29|2020-08-10|Porto Alegre|              PIX|
|      3|customer_18|            TV|Eletrônicos|  3|4403.15|2021-11-03|      Recife|           Boleto|
+-------+-----------+--------------+-----------+---+-------+----------+------------+-----------------+



## Upset(update + insert)/Merge

Podemos atualizar os dados com a delta table usando uma chave de refência para os registros e os métodos:
- **whenMatchedUpdadeAll()** - Atualiza a linha onde acontecer o match das chaves
- **whenNotMatchedInsertAll()** - Insere os registros que não deram match entre as chaves.

In [9]:
# current version
delta_table = DeltaTable.forPath(spark, "./BRONZE/sales")
delta_table.toDF().show()

+-------+-----------+--------------+-----------+---+-------+----------+------------+-----------------+
|id_sale|   customer|       product|   category|qty|  price|      date|        city|          payment|
+-------+-----------+--------------+-----------+---+-------+----------+------------+-----------------+
|      1| customer_6|Fone de Ouvido|Eletrônicos|  2|3347.86|2021-12-26|    Curitiba| Cartão de Débito|
|      5| customer_2|       Monitor|Eletrônicos|  5|4213.25|2020-12-27|Porto Alegre|Cartão de Crédito|
|      4| customer_9|         Mouse|Informática|  4|2040.97|2022-12-10|   São Paulo| Cartão de Débito|
|      2|customer_20|          Mesa|     Móveis|  3|4463.29|2020-08-10|Porto Alegre|              PIX|
|      3|customer_18|            TV|Eletrônicos|  3|4403.15|2021-11-03|      Recife|           Boleto|
+-------+-----------+--------------+-----------+---+-------+----------+------------+-----------------+



In [10]:
new_df = get_products(spark, total_rows=10)
new_df.show()

+-------+-----------+--------------+-----------+---+-------+----------+--------------+----------------+
|id_sale|   customer|       product|   category|qty|  price|      date|          city|         payment|
+-------+-----------+--------------+-----------+---+-------+----------+--------------+----------------+
|      1| customer_1|      Notebook|Eletrônicos|  1|4381.02|2021-08-20|     São Paulo|             PIX|
|      2| customer_9|    Smartphone|Eletrônicos|  4|2311.84|2021-02-03|Belo Horizonte|Cartão de Débito|
|      3| customer_9|       Cadeira|     Móveis|  3|3774.18|2023-04-12|        Recife|Cartão de Débito|
|      4|customer_16|Fone de Ouvido|Eletrônicos|  3|1448.87|2023-09-03|Belo Horizonte|          Boleto|
|      5| customer_6|          Mesa|     Móveis|  1| 753.71|2020-05-03|        Recife|Cartão de Débito|
|      6|customer_13|       Teclado|Informática|  4|3564.91|2021-06-09|     São Paulo|Cartão de Débito|
|      7|customer_18|      Notebook|Eletrônicos|  5|3524.76|2021

Para o exemplo a seguir será usando apenas as vendas com ID`s maiores que 4 para que ocorra os seguintes processos com o Upsert:
- A linha com ID 5 será atualizada, pois ele já existe na tabela
- As linhas com 6 a 10, como não existem na ultima versão da tabela, serão inseridas

In [11]:
new_df = new_df.filter(F.col("id_sale") > 4)
new_df.show()

+-------+-----------+--------+-----------+---+-------+----------+--------------+----------------+
|id_sale|   customer| product|   category|qty|  price|      date|          city|         payment|
+-------+-----------+--------+-----------+---+-------+----------+--------------+----------------+
|      5| customer_6|    Mesa|     Móveis|  1| 753.71|2020-05-03|        Recife|Cartão de Débito|
|      6|customer_13| Teclado|Informática|  4|3564.91|2021-06-09|     São Paulo|Cartão de Débito|
|      7|customer_18|Notebook|Eletrônicos|  5|3524.76|2021-04-26|      Salvador|             PIX|
|      8|customer_14| Cadeira|     Móveis|  4|4391.35|2021-07-28|     Fortaleza|Cartão de Débito|
|      9| customer_7| Teclado|Informática|  5|4882.49|2020-09-05|        Recife|             PIX|
|     10| customer_7|   Mouse|Informática|  3| 3880.5|2023-10-08|Rio de Janeiro|Cartão de Débito|
+-------+-----------+--------+-----------+---+-------+----------+--------------+----------------+



In [12]:
(
    delta_table.alias("old_data")
    .merge(
        new_df.alias("new_data"),
        "old_data.id_sale = new_data.id_sale"
    )
    .whenMatchedUpdateAll() # update rows
    .whenNotMatchedInsertAll() # insert new rows
    .execute()
)

25/07/06 04:30:52 WARN MapPartitionsRDD: RDD 67 was locally checkpointed, its lineage has been truncated and cannot be recomputed after unpersisting


DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

In [13]:
# new version 
spark.read.format("delta").load("./BRONZE/sales").show()

+-------+-----------+--------------+-----------+---+-------+----------+--------------+----------------+
|id_sale|   customer|       product|   category|qty|  price|      date|          city|         payment|
+-------+-----------+--------------+-----------+---+-------+----------+--------------+----------------+
|      5| customer_6|          Mesa|     Móveis|  1| 753.71|2020-05-03|        Recife|Cartão de Débito|
|      6|customer_13|       Teclado|Informática|  4|3564.91|2021-06-09|     São Paulo|Cartão de Débito|
|      7|customer_18|      Notebook|Eletrônicos|  5|3524.76|2021-04-26|      Salvador|             PIX|
|      8|customer_14|       Cadeira|     Móveis|  4|4391.35|2021-07-28|     Fortaleza|Cartão de Débito|
|      9| customer_7|       Teclado|Informática|  5|4882.49|2020-09-05|        Recife|             PIX|
|     10| customer_7|         Mouse|Informática|  3| 3880.5|2023-10-08|Rio de Janeiro|Cartão de Débito|
|      1| customer_6|Fone de Ouvido|Eletrônicos|  2|3347.86|2021

## Delete

In [14]:
(
    delta_table
    .delete("payment =  'Boleto'")
)

25/07/06 04:31:31 WARN DeleteCommand: Could not validate number of records due to missing statistics.


In [15]:
# df from path
spark.read.format("delta").load("./BRONZE/sales").show()

+-------+-----------+--------------+-----------+---+-------+----------+--------------+----------------+
|id_sale|   customer|       product|   category|qty|  price|      date|          city|         payment|
+-------+-----------+--------------+-----------+---+-------+----------+--------------+----------------+
|      5| customer_6|          Mesa|     Móveis|  1| 753.71|2020-05-03|        Recife|Cartão de Débito|
|      6|customer_13|       Teclado|Informática|  4|3564.91|2021-06-09|     São Paulo|Cartão de Débito|
|      7|customer_18|      Notebook|Eletrônicos|  5|3524.76|2021-04-26|      Salvador|             PIX|
|      8|customer_14|       Cadeira|     Móveis|  4|4391.35|2021-07-28|     Fortaleza|Cartão de Débito|
|      9| customer_7|       Teclado|Informática|  5|4882.49|2020-09-05|        Recife|             PIX|
|     10| customer_7|         Mouse|Informática|  3| 3880.5|2023-10-08|Rio de Janeiro|Cartão de Débito|
|      1| customer_6|Fone de Ouvido|Eletrônicos|  2|3347.86|2021

## History e Time Travel

Com o history podemos ver todas as alterações feitas na tabela e registradas nos metadados no delta_log

In [16]:
(
    delta_table
    .history()
    .show()
)

+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|      2|2025-07-06 04:31:...|  NULL|    NULL|   DELETE|{predicate -> ["(...|NULL|    NULL|     NULL|          1|  Serializable|        false|{numRemovedFiles ...|        NULL|Apache-Spark/4.0....|
|      1|2025-07-06 04:30:...|  NULL|    NULL|    MERGE|{predicate -> ["(...|NULL|    NULL|     NULL|          0|  Serializable|        false|{numTargetRowsCop...|        NULL|Apache-Spark/4.0....|
|      0|2

## Time Travel

- É possível voltar versões das tabelas usando o número da sua versão ou o seu timestamp
- A leitura da tabela vai apontar para a ultima versão registrada

In [17]:
(
    delta_table
        .history()
        .select("version", "timestamp")
        .show(truncate=False)
)

+-------+-----------------------+
|version|timestamp              |
+-------+-----------------------+
|2      |2025-07-06 04:31:31.792|
|1      |2025-07-06 04:30:51.647|
|0      |2025-07-06 04:30:12.745|
+-------+-----------------------+



In [18]:
# default - Current version
spark.read.format("delta").load("./BRONZE/sales").show()

+-------+-----------+--------------+-----------+---+-------+----------+--------------+----------------+
|id_sale|   customer|       product|   category|qty|  price|      date|          city|         payment|
+-------+-----------+--------------+-----------+---+-------+----------+--------------+----------------+
|      5| customer_6|          Mesa|     Móveis|  1| 753.71|2020-05-03|        Recife|Cartão de Débito|
|      6|customer_13|       Teclado|Informática|  4|3564.91|2021-06-09|     São Paulo|Cartão de Débito|
|      7|customer_18|      Notebook|Eletrônicos|  5|3524.76|2021-04-26|      Salvador|             PIX|
|      8|customer_14|       Cadeira|     Móveis|  4|4391.35|2021-07-28|     Fortaleza|Cartão de Débito|
|      9| customer_7|       Teclado|Informática|  5|4882.49|2020-09-05|        Recife|             PIX|
|     10| customer_7|         Mouse|Informática|  3| 3880.5|2023-10-08|Rio de Janeiro|Cartão de Débito|
|      1| customer_6|Fone de Ouvido|Eletrônicos|  2|3347.86|2021

In [19]:
# using date
(
    spark
        .read.format("delta")
        .option("timestampAsOf","2025-07-06 04:30:12.745")
        .load("./BRONZE/sales")
        .show()
)

+-------+-----------+--------------+-----------+---+-------+----------+------------+-----------------+
|id_sale|   customer|       product|   category|qty|  price|      date|        city|          payment|
+-------+-----------+--------------+-----------+---+-------+----------+------------+-----------------+
|      1| customer_6|Fone de Ouvido|Eletrônicos|  2|3347.86|2021-12-26|    Curitiba| Cartão de Débito|
|      5| customer_2|       Monitor|Eletrônicos|  5|4213.25|2020-12-27|Porto Alegre|Cartão de Crédito|
|      4| customer_9|         Mouse|Informática|  4|2040.97|2022-12-10|   São Paulo| Cartão de Débito|
|      2|customer_20|          Mesa|     Móveis|  3|4463.29|2020-08-10|Porto Alegre|              PIX|
|      3|customer_18|            TV|Eletrônicos|  3|4403.15|2021-11-03|      Recife|           Boleto|
+-------+-----------+--------------+-----------+---+-------+----------+------------+-----------------+



In [20]:
# using version
(
    spark
        .read.format("delta")
        .option("versionAsOf",1)
        .load("./BRONZE/sales")
        .show()
)

+-------+-----------+--------------+-----------+---+-------+----------+--------------+----------------+
|id_sale|   customer|       product|   category|qty|  price|      date|          city|         payment|
+-------+-----------+--------------+-----------+---+-------+----------+--------------+----------------+
|      5| customer_6|          Mesa|     Móveis|  1| 753.71|2020-05-03|        Recife|Cartão de Débito|
|      6|customer_13|       Teclado|Informática|  4|3564.91|2021-06-09|     São Paulo|Cartão de Débito|
|      7|customer_18|      Notebook|Eletrônicos|  5|3524.76|2021-04-26|      Salvador|             PIX|
|      8|customer_14|       Cadeira|     Móveis|  4|4391.35|2021-07-28|     Fortaleza|Cartão de Débito|
|      9| customer_7|       Teclado|Informática|  5|4882.49|2020-09-05|        Recife|             PIX|
|     10| customer_7|         Mouse|Informática|  3| 3880.5|2023-10-08|Rio de Janeiro|Cartão de Débito|
|      1| customer_6|Fone de Ouvido|Eletrônicos|  2|3347.86|2021

## Restore

As visões anteriores apenas exibiram os dados da tabela ao longo do tempo, mas caso seja necessário retornar a tebela para uma versão anterior o delta table também tem esse tipo de funcionalidades
- restoreToVersion - Restaura a tabela a partir de uma versão
- restoreToTimestamp - Restaura a tabela a partir de uma data

In [21]:
# Current version
spark.read.format("delta").load("./BRONZE/sales").show()

+-------+-----------+--------------+-----------+---+-------+----------+--------------+----------------+
|id_sale|   customer|       product|   category|qty|  price|      date|          city|         payment|
+-------+-----------+--------------+-----------+---+-------+----------+--------------+----------------+
|      5| customer_6|          Mesa|     Móveis|  1| 753.71|2020-05-03|        Recife|Cartão de Débito|
|      6|customer_13|       Teclado|Informática|  4|3564.91|2021-06-09|     São Paulo|Cartão de Débito|
|      7|customer_18|      Notebook|Eletrônicos|  5|3524.76|2021-04-26|      Salvador|             PIX|
|      8|customer_14|       Cadeira|     Móveis|  4|4391.35|2021-07-28|     Fortaleza|Cartão de Débito|
|      9| customer_7|       Teclado|Informática|  5|4882.49|2020-09-05|        Recife|             PIX|
|     10| customer_7|         Mouse|Informática|  3| 3880.5|2023-10-08|Rio de Janeiro|Cartão de Débito|
|      1| customer_6|Fone de Ouvido|Eletrônicos|  2|3347.86|2021

In [22]:
delta_table.restoreToVersion(1)

25/07/06 04:32:47 WARN DAGScheduler: Broadcasting large task binary with size 1087.4 KiB


DataFrame[table_size_after_restore: bigint, num_of_files_after_restore: bigint, num_removed_files: bigint, num_restored_files: bigint, removed_files_size: bigint, restored_files_size: bigint]

In [23]:
# current dataframe
spark.read.format("delta").load("./BRONZE/sales").show()

+-------+-----------+--------------+-----------+---+-------+----------+--------------+----------------+
|id_sale|   customer|       product|   category|qty|  price|      date|          city|         payment|
+-------+-----------+--------------+-----------+---+-------+----------+--------------+----------------+
|      5| customer_6|          Mesa|     Móveis|  1| 753.71|2020-05-03|        Recife|Cartão de Débito|
|      6|customer_13|       Teclado|Informática|  4|3564.91|2021-06-09|     São Paulo|Cartão de Débito|
|      7|customer_18|      Notebook|Eletrônicos|  5|3524.76|2021-04-26|      Salvador|             PIX|
|      8|customer_14|       Cadeira|     Móveis|  4|4391.35|2021-07-28|     Fortaleza|Cartão de Débito|
|      9| customer_7|       Teclado|Informática|  5|4882.49|2020-09-05|        Recife|             PIX|
|     10| customer_7|         Mouse|Informática|  3| 3880.5|2023-10-08|Rio de Janeiro|Cartão de Débito|
|      1| customer_6|Fone de Ouvido|Eletrônicos|  2|3347.86|2021

In [24]:
# restore row added 
delta_table.history().show()

+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|      3|2025-07-06 04:32:...|  NULL|    NULL|  RESTORE|{version -> 1, ti...|NULL|    NULL|     NULL|          2|  Serializable|        false|{numRestoredFiles...|        NULL|Apache-Spark/4.0....|
|      2|2025-07-06 04:31:...|  NULL|    NULL|   DELETE|{predicate -> ["(...|NULL|    NULL|     NULL|          1|  Serializable|        false|{numRemovedFiles ...|        NULL|Apache-Spark/4.0....|
|      1|2

## Vacuum

- Remove arquivos de versões antigas que não são mais utilizadas
- Limpa arquivos não referenciados (útil após operações de DELETE ou OVERWRITE)
- Reduz o armazenamento utilizado
- Default de 7 dias de retenção
- Pode afetar o time travel

```python
# modo de uso
delta_table.vacuum(24) # recebe horas como parâmetro

```

In [25]:
delta_table.vacuum()

                                                                                

Deleted 0 files and directories in a total of 1 directories.


DataFrame[]