In [1]:
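from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

# Register Delta's SQL extension and catalog so "delta" works as a format
builder = SparkSession.builder \
    .appName("DeltaLakeDemo") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")

# Adds the Delta Lake jars that match the installed delta-spark package
spark = configure_spark_with_delta_pip(builder).getOrCreate()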

# Create sample DataFrame
df = spark.createDataFrame([
    (1, "Alice"),
    (2, "Bob"),
    (3, "Cathy")
], ["id", "name"])

# Write as Delta table
df.write.format("delta").mode("overwrite").save("/data/people")

# Read Delta table
df2 = spark.read.format("delta").load("/data/people")
df2.show()


+---+-----+
| id| name|
+---+-----+
|  3|Cathy|
|  1|Alice|
|  2|  Bob|
+---+-----+



# 🔁 1. ACID Transactions

✅ **What Are ACID Transactions?**  
ACID stands for:

- **Atomicity** → each write is all-or-nothing  
- **Consistency** → data moves from one valid state to another  
- **Isolation** → concurrent writes don’t interfere  
- **Durability** → once a write is committed, it's permanent  

Delta Lake brings ACID to data lakes by using a transaction log — just like a database does — even though the data is stored in flat files (Parquet) in object storage or local disk.

---

🔧 **How Delta Achieves ACID Transactions**  
Let’s break it down into mechanisms.

### 1. Transaction Log (`_delta_log/`)
Every write operation (insert, delete, update, merge, etc.) creates a new log file like:

```logs
/data/people/_delta_log/00000000000000000000.json
/data/people/_delta_log/00000000000000000001.json
```


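Each log file holds one JSON action per line, such as:
- **Operation info** (`commitInfo`)  
- **Metadata** (`metaData`, including the schema)  
- **Protocol version** (`protocol`, in the first commit)  
- **A list of files added or removed** (`add`, `remove` entries)  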

🧠 The data itself lives in immutable Parquet files. Delta decides which files are “active” by replaying the log: every file that was added and not later removed belongs to the current version.
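A minimal sketch of that replay, using only the Python standard library: read the JSON commit files in version order and keep every path that was added and not later removed. It assumes the log has no checkpoints to start from (real readers begin at the latest Parquet checkpoint and then apply later commits), and `active_files` is a hypothetical helper, not part of Delta's API. The optional `version` argument stops the replay early, which is the same idea time travel builds on.

```python
import json
import os


def active_files(log_dir, version=None):
    """Replay _delta_log/*.json commits and return the set of live data files.

    Simplified sketch: ignores checkpoint files, so it assumes every commit
    from version 0 onward is still present in log_dir.
    """
    # Commit files are zero-padded, so name order equals version order
    commits = sorted(
        f for f in os.listdir(log_dir)
        if f.endswith(".json") and f[:-5].isdigit()
    )
    live = set()
    for name in commits:
        if version is not None and int(name[:-5]) > version:
            break  # stop at the requested version (time travel)
        with open(os.path.join(log_dir, name)) as fh:
            for line in fh:
                if not line.strip():
                    continue
                action = json.loads(line)  # one action per line
                if "add" in action:
                    live.add(action["add"]["path"])
                elif "remove" in action:
                    live.discard(action["remove"]["path"])
    return live
```

In the notebook below, `active_files("/data/test-acid/_delta_log")` would list the Parquet files that make up the current table.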

---

### 2. Atomicity
When you write data:

- Delta writes the new Parquet files first.  
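- Only after *all* new files are written successfully does it write the next JSON commit file, listing them as `add` entries.  
- If anything fails before that, no commit file is written and the table state remains unchanged. Any Parquet files already written are orphans that readers ignore (`VACUUM` can remove them later).  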

This is atomic: **either the whole change is visible, or none of it is.**
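The ordering above (data files first, commit file last) can be sketched with plain files. This is an illustration under stated assumptions, not Delta's writer: the `payloads` callables stand in for Parquet writers, the `part-...` file names are hypothetical, and mutual exclusion between concurrent writers is deliberately left out.

```python
import json
import os


def write_then_commit(table_dir, version, payloads):
    """Write every data file first; publish them with one commit file last.

    payloads: list of zero-argument callables returning bytes (stand-ins for
    Parquet writers). If any of them raises, no commit file is created.
    """
    log_dir = os.path.join(table_dir, "_delta_log")
    os.makedirs(log_dir, exist_ok=True)

    # Step 1: write the data files. A failure here leaves only orphan files
    # that no commit references, so readers never see them.
    added = []
    for i, make_bytes in enumerate(payloads):
        name = f"part-{i:05d}-v{version}.parquet"  # hypothetical naming scheme
        with open(os.path.join(table_dir, name), "wb") as fh:
            fh.write(make_bytes())
        added.append(name)

    # Step 2: one commit file makes all files visible at once. Writing to a
    # temp file and renaming means readers never see a half-written commit.
    actions = [{"add": {"path": p, "dataChange": True}} for p in added]
    tmp = os.path.join(log_dir, f".{version:020d}.json.tmp")
    with open(tmp, "w") as fh:
        fh.write("".join(json.dumps(a) + "\n" for a in actions))
    os.replace(tmp, os.path.join(log_dir, f"{version:020d}.json"))
```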

---

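### 3. Isolation via Optimistic Concurrency Control
Delta does not lock the table while a transaction runs. It uses *Optimistic Concurrency Control (OCC)* on every storage system and relies on the storage layer (through its `LogStore` implementation) to guarantee that only one writer can create a given commit file:

- **HDFS (and file systems with the same atomic-rename guarantee):** creating a commit file is atomic and fails if that version already exists  
- **Cloud stores like S3:** historically lacking an atomic "put if absent", the default setup is only safe when all writers go through a single Spark driver; multi-cluster writes need extra coordination (for example, Delta's DynamoDB-backed `LogStore`)  

With OCC:
- A transaction reads the current state (e.g., version 12)  
- It prepares its changes based on that snapshot  
- At commit time, it tries to write version 13  
- If someone else has already committed version 13, Delta checks whether that commit conflicts with what this transaction read or wrote: if not, it retries as version 14; if it does, the transaction fails with a concurrent-modification error  

This keeps concurrent writers isolated: conflicting transactions are rejected instead of silently overwriting each other's changes.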
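A minimal sketch of OCC on a local POSIX file system, assuming `os.link` is atomic and refuses to overwrite an existing name (it plays the role of the storage layer's put-if-absent). Delta's real logic runs in the JVM and its conflict detection is far more detailed; `try_commit`, `commit_with_retry`, and the `conflicts` callback are hypothetical names for illustration.

```python
import json
import os
import tempfile


def try_commit(log_dir, version, actions):
    """Create <version>.json only if it does not exist yet (put-if-absent)."""
    target = os.path.join(log_dir, f"{version:020d}.json")
    fd, tmp = tempfile.mkstemp(dir=log_dir, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as fh:
            fh.write("".join(json.dumps(a) + "\n" for a in actions))
        os.link(tmp, target)  # atomic; raises FileExistsError if taken
        return True
    except FileExistsError:
        return False
    finally:
        os.remove(tmp)


def commit_with_retry(log_dir, read_version, actions, conflicts, max_attempts=10):
    """Optimistically commit at read_version + 1, moving forward on races.

    conflicts(v) is a caller-supplied (hypothetical) check that inspects the
    winning commit v and returns True if it invalidates this transaction.
    """
    version = read_version + 1
    for _ in range(max_attempts):
        if try_commit(log_dir, version, actions):
            return version
        if conflicts(version):
            raise RuntimeError(f"concurrent commit {version} conflicts")
        version += 1  # no conflict: try again on top of the newer version
    raise RuntimeError("gave up after too many retries")
```

Two writers that both read version 12 race for version 13: one `os.link` succeeds, the other gets `FileExistsError` and either moves on to version 14 or, if `conflicts` reports a clash, gives up.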

---

### 4. Durability
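The `_delta_log` is an append-only log. Once a commit file is written:

- It is **never modified** (old commit files are only cleaned up after a checkpoint has summarized them and they are past the log retention period)  
- It is considered the **source of truth**  
- Because log and data files live on durable storage, a committed version survives a cluster crash, and the table's current state can be rebuilt from these logs  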

That’s your **durability guarantee**.

---

⚠️ **Does It Affect Performance?**  
Yes, but in a good way (mostly)!

✅ **Benefits**
- Queries don't need to list the whole folder: the log tells Delta exactly which files to read, and per-file min/max statistics let it skip files that cannot match a filter  
- Appends and updates are efficient and safe  

⚠️ **Trade-offs**
- Many small writes produce many small Parquet files, which slows down reads  
- That's why Delta provides `OPTIMIZE` (compacts small files into larger ones) and Z-ordering (`ZORDER BY`, co-locates related values so file skipping works better)  
- At large scale, writes are *slightly* slower than blindly dumping Parquet files, but you gain **huge advantages** in reliability, auditability, and maintainability.
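Compaction is essentially bin-packing: group small files into batches whose combined size approaches a target, then rewrite each batch as one file. The sketch below is a simplified greedy planner, not Delta's actual algorithm, and `plan_compaction` is a hypothetical name. In Delta itself you run `OPTIMIZE` (optionally with `ZORDER BY`) in SQL or, in recent releases, `DeltaTable.forPath(spark, path).optimize().executeCompaction()` from Python.

```python
def plan_compaction(file_sizes, target_size):
    """Group small files into bins whose total size stays within target_size.

    file_sizes: dict mapping file path -> size in bytes.
    Files already at or above target_size are left alone. Returns a list of
    bins; each bin (2+ files) would be rewritten as one larger file.
    """
    small = sorted(
        (size, path) for path, size in file_sizes.items() if size < target_size
    )
    bins, current, current_size = [], [], 0
    for size, path in small:
        # Close the current bin when the next file would overflow it
        if current and current_size + size > target_size:
            bins.append(current)
            current, current_size = [], 0
        current.append(path)
        current_size += size
    if current:
        bins.append(current)
    # A bin with a single file would just be rewritten as-is, so skip it
    return [b for b in bins if len(b) > 1]
```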


In [6]:
import shutil

# Path to your Delta table
path = "/data/test-acid"

# Delete the entire folder
shutil.rmtree(path, ignore_errors=True)

df = spark.createDataFrame([
    (1, "Alice"),
    (2, "Bob"),
    (3, "Cathy")
], ["id", "name"])

# Write some data
df.write.format("delta").mode("overwrite").save("/data/test-acid")

# Append new row
spark.createDataFrame([(4, "test")], ["id", "name"]) \
    .write.format("delta").mode("append").save("/data/test-acid")

import json
import os
from IPython.display import display, JSON

log_dir = "/data/test-acid/_delta_log"
log_files = sorted([f for f in os.listdir(log_dir) if f.endswith(".json")])

for file in log_files:
    print(f"\n>>> Log version: {file}")
    with open(os.path.join(log_dir, file)) as f:
        for line in f.readlines():
            parsed = json.loads(line)
            display(JSON(parsed))


>>> Log version: 00000000000000000000.json


<IPython.core.display.JSON object>

<IPython.core.display.JSON object>

<IPython.core.display.JSON object>

<IPython.core.display.JSON object>

<IPython.core.display.JSON object>

<IPython.core.display.JSON object>


>>> Log version: 00000000000000000001.json


<IPython.core.display.JSON object>

<IPython.core.display.JSON object>
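In this export, each `display(JSON(...))` call renders only as a placeholder. A plain-text alternative, sketched below, prints the action type of every log line instead; `summarize_log` is a hypothetical helper that relies only on the documented layout of one JSON action per line.

```python
import json
import os


def summarize_log(log_dir):
    """Yield (commit file, action type, short detail) for each log line."""
    for name in sorted(f for f in os.listdir(log_dir) if f.endswith(".json")):
        with open(os.path.join(log_dir, name)) as fh:
            for line in fh:
                if not line.strip():
                    continue
                action = json.loads(line)
                kind = next(iter(action))  # each line holds exactly one action
                body = action[kind]
                if kind == "commitInfo":
                    detail = body.get("operation", "")
                elif kind in ("add", "remove"):
                    detail = body.get("path", "")
                else:
                    detail = ""
                yield name, kind, detail
```

In the notebook, `for row in summarize_log("/data/test-acid/_delta_log"): print(*row)` would list each commit's actions.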

## `_delta_log` Breakdown

### 🧾 `commitInfo` Example Breakdown

This is the metadata for the commit.

- **`timestamp`**:  
  The time when the operation was committed (in milliseconds since Unix epoch).

- **`operation`**:  
  The operation that produced this commit. Here it is `WRITE` (from `df.write`); other values include `DELETE`, `UPDATE`, `MERGE`, and `OPTIMIZE`.

- **`operationParameters`**:
  - **`mode`**: `Overwrite` (the data replaces the table's previous content).  
  - **`partitionBy`**: `[]` (the partitioning scheme; an empty list means the table isn't partitioned).  

- **`isolationLevel`**:  
  `Serializable`, the strongest level Delta offers. Concurrent transactions may run, but the result is equivalent to running them one at a time in some order.

- **`isBlindAppend`**:  
  `False`: this commit is not a blind append (an append that doesn't read the table); it's a full overwrite.

- **`operationMetrics`**:  
  Provides stats for the operation:
  - **`numFiles`**: 4 new files were written.  
  - **`numOutputRows`**: 3 rows of data were written.  
  - **`numOutputBytes`**: 2522 bytes — the total size of the written data.

- **`txnId`**:  
  The transaction ID (unique to the transaction).  
  Used to track this specific operation in Delta’s transaction log.


### 📦 `metaData` Example Breakdown

Contains schema and metadata information for the Delta table.

- **`id`**:  
  A unique ID for the Delta table (used internally for reference).

- **`format`**:  
  The format used for storing data.  
  Here, it's `parquet`, the underlying storage format for Delta.

- **`schemaString`**:  
  The schema of the table, defined as a JSON string. This tells you the structure of the data (columns, types, etc.).  
  In this case:
  - `id`: A `long` field (nullable)  
  - `name`: A `string` field (nullable)

- **`partitionColumns`**:  
  The table isn’t partitioned (`[]`).

- **`configuration`**:  
  Additional configuration (empty here).

- **`createdTime`**:  
  The time the table was created (in milliseconds since Unix epoch).
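`schemaString` is the Spark `StructType` serialized as JSON, so it can be decoded with the standard library. A minimal sketch (`schema_columns` is a hypothetical name): for the table above it should yield the two nullable columns listed, `id` (`long`) and `name` (`string`). Nested types are reported only by their outer kind (`struct`, `array`, `map`).

```python
import json


def schema_columns(schema_string):
    """Return (name, type, nullable) tuples from a metaData.schemaString."""
    schema = json.loads(schema_string)  # Spark StructType serialized as JSON
    columns = []
    for field in schema["fields"]:
        ftype = field["type"]
        # Nested types (struct/array/map) are JSON objects, not plain strings
        type_name = ftype if isinstance(ftype, str) else ftype["type"]
        columns.append((field["name"], type_name, field["nullable"]))
    return columns
```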


### ➕ `add` Entry Breakdown

This entry indicates that a new Parquet file was added to the Delta table.

- **`path`**:  
  The path of the newly added Parquet file, relative to the table root.

- **`partitionValues`**:  
  Empty here since the table isn’t partitioned.

- **`size`**:  
  The size of the Parquet file in bytes (`719` bytes).

- **`modificationTime`**:  
  The timestamp when the file was last modified.

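- **`dataChange`**:  
  `True` means this file carries a logical change to the table's data. Operations that only rearrange existing data, such as `OPTIMIZE` compaction, write `False` so that streaming readers can skip those files.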

- **`stats`**:  
  Statistics for the data in this file, stored as a JSON string:
  - **`numRecords`**: This file contains 1 record.  
  - **`minValues`** and **`maxValues`**:  
    The minimum and maximum value of each column in the file.  
    Delta uses them to skip files whose value range cannot match a query filter (data skipping).  
  - **`nullCount`**:  
    The number of null values in each column.
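The `minValues`/`maxValues` check behind data skipping can be sketched in a few lines. This assumes an equality filter on a single column and treats files without stats as must-read; `files_to_scan` is a hypothetical helper, and Delta's real skipping handles many more predicate shapes.

```python
import json


def files_to_scan(add_actions, column, value):
    """Keep only files whose [min, max] range for `column` can contain `value`.

    add_actions: list of `add` action dicts; their `stats` field is a JSON
    string. Files without usable stats must always be scanned.
    """
    keep = []
    for add in add_actions:
        stats = json.loads(add["stats"]) if add.get("stats") else {}
        lo = stats.get("minValues", {}).get(column)
        hi = stats.get("maxValues", {}).get(column)
        # Skip the file only when its range provably excludes the value
        if lo is None or hi is None or lo <= value <= hi:
            keep.append(add["path"])
    return keep
```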


# ⏳ Time Travel in Delta Lake

Time Travel in Delta Lake allows you to query historical versions of your data, even after updates, deletes, or appends. This feature is possible thanks to Delta Lake’s transaction log.

---

### 🔍 How Time Travel Works

Delta Lake uses the `_delta_log` to track all changes to the table. Each write operation (INSERT, UPDATE, DELETE, etc.) appends a new JSON file to the log, containing metadata about the operation.

#### 🗂 Transaction Log
Each commit is stored as a new JSON file in `_delta_log/`, including:
- The operation type (`WRITE`, `UPDATE`, `DELETE`)
- The schema at that point
- Data files written or removed
- The table version (encoded in the file name: `00000000000000000003.json` is version 3)

#### 🔢 Versions
Each commit increments the table version:
- Version 0 is the initial state
- Version 1 is the next, and so on
- Each commit includes a unique `txnId`

#### 🧾 Snapshotting
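When querying, Delta builds a snapshot by replaying the log: it starts from the most recent checkpoint (a Parquet summary of the table state) and applies every later JSON commit.  
To read an older version, it replays the log only up to that version and rebuilds the table as it was.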

---

### ⚙️ Mechanics Behind Time Travel

- **Immutable Logs**: Once written, logs are never changed.
- **Immutable Data**: Parquet files are also immutable.
- **Versioning**: Delta tracks which files belong to each version.
- **Garbage Collection**: Data files no longer referenced by the current version are deleted by `VACUUM` after a retention period, which also ends time travel to the versions that used them.

---

### 📅 How to Query Historical Versions

#### By Version Number
```python
df = spark.read.format("delta") \
    .option("versionAsOf", 0) \
    .load("path_to_table")
df.show()
```

#### 🕰️ By Timestamp
Use `.option("timestampAsOf", "2025-04-20 15:30:00")` instead of `versionAsOf`. Delta resolves the timestamp to the most recent version committed **at or before** it and returns the snapshot of that version.
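The timestamp-to-version rule is a binary search over commit times. A minimal sketch, assuming you already have `(version, commit_timestamp_ms)` pairs (for example from `history()`, whose `timestamp` column you would convert to milliseconds) and that timestamps increase with version; `version_as_of` is a hypothetical name and ignores Delta's other edge cases.

```python
from bisect import bisect_right


def version_as_of(commits, ts_ms):
    """Return the latest version committed at or before ts_ms.

    commits: list of (version, commit_timestamp_ms) sorted by version,
    with timestamps assumed to increase along with the version.
    """
    timestamps = [ts for _, ts in commits]
    i = bisect_right(timestamps, ts_ms)  # number of commits with ts <= ts_ms
    if i == 0:
        raise ValueError("timestamp is before the first commit")
    return commits[i - 1][0]
```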

---

### 📊 Performance Considerations of Time Travel

#### 🔄 **Read Performance**:
Reading an old version costs about the same as reading the current one, because Delta doesn't re-read all data. Instead, it:
- Rebuilds that version's file list from the transaction log, starting from the nearest earlier checkpoint (written every 10 commits by default).
- Reads only the files that belong to that version, never the whole folder.
- Applies the same min/max statistics from the log to skip files that cannot match the query's filters.

#### 🗑️ **Garbage Collection**:
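- Old data files are not deleted automatically: they stay on disk, keeping old versions queryable, until you run `VACUUM`.
- `VACUUM` deletes data files that the current version no longer references once they are older than the retention threshold (`delta.deletedFileRetentionDuration`, 7 days by default). After that, versions that needed those files can no longer be read.
- Log entries are cleaned up separately, when checkpoints are written, once they are older than `delta.logRetentionDuration` (30 days by default).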
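Which files `VACUUM` may delete can be sketched from the log's `remove` actions, which carry a `deletionTimestamp`. This is a simplified illustration: the real command also lists the table directory and deletes untracked files older than the threshold, and `vacuum_candidates` is a hypothetical name. The 168-hour default mirrors the 7-day `delta.deletedFileRetentionDuration`; from Python the real call is `DeltaTable.forPath(spark, path).vacuum(retentionHours)`.

```python
def vacuum_candidates(live_files, tombstones, now_ms, retention_hours=168):
    """Pick removed files that are safe to delete physically.

    live_files: paths referenced by the current table version.
    tombstones: dict path -> deletionTimestamp (ms) taken from `remove` actions.
    A file qualifies if the current version no longer uses it and it was
    removed longer ago than the retention window (168 h = 7 days).
    """
    cutoff = now_ms - retention_hours * 3600 * 1000
    return sorted(
        path for path, deleted_at in tombstones.items()
        if path not in live_files and deleted_at < cutoff
    )
```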

#### 💾 **Storage Overhead**:
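- Each version keeps the files it replaced, so storage grows with the amount of data rewritten, not just the rows changed: a copy-on-write `UPDATE`, `DELETE`, or `MERGE` can rewrite a whole file to change a single row.
- That extra storage is reclaimed only when `VACUUM` removes the old files; the JSON log files themselves are small.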

---

### 📝 Example Use Case

Suppose you want to go back to a version of the Delta table as it was on **2025-04-20 at 15:30**. Here’s how you would proceed:

1. **Check Table History** to find the closest version:
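```python
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "path_to_table")
history_df = delta_table.history()
history_df.show(truncate=False)
```

2. **Read that version** with `.option("versionAsOf", <version>)`, or skip the lookup and pass the timestamp directly with `.option("timestampAsOf", "2025-04-20 15:30:00")`.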

In [9]:
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "/data/test-acid")
history_df = delta_table.history()
history_df.show(truncate=False)

+-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+
|version|timestamp              |userId|userName|operation|operationParameters                   |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics                                           |userMetadata|engineInfo                         |
+-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+
|1      |2025-04-23 09:19:56.172|null  |null    |WRITE    |{mode -> Append, partitionBy -> []}   |null|null    |null     |0          |Serializable  |true         |{numFiles -> 2, nu

In [12]:
delta_table.delete("id = 1")

In [13]:
history_df = delta_table.history()
history_df.show(truncate=False)

+-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+-----------------------------------+
|version|timestamp              |userId|userName|operation|operationParameters                   |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics                                                                                                                                                                                                           |userMetadata|engineInfo                         |
+-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+

In [14]:
spark.read.format("delta").option("versionAsOf", 1).load("/data/test-acid").filter("id = 1").show()

+---+-----+
| id| name|
+---+-----+
|  1|Alice|
|  1| test|
+---+-----+




In [15]:
spark.read.format("delta").load("/data/test-acid").filter("id = 1").show()

+---+----+
| id|name|
+---+----+
+---+----+



## How Deletes Are Handled in Delta Lake
Delta Lake is append-only under the hood: it never modifies existing data files in place. Instead, it follows a copy-on-write strategy.

### 1. Query Optimization and File Pruning
When you run a `DELETE` or an `UPDATE`, Delta first identifies which Parquet files might contain the affected rows, using the per-file statistics in the `_delta_log` (`minValues`, `maxValues`, and `nullCount`).

Only those candidate files are scanned, and only the ones that actually contain matching rows are rewritten. All other files are untouched.

### 2. Rewrite with Changes
Delta reads the affected Parquet files, applies the delete condition (e.g., `WHERE id = 42`), and writes new files containing the remaining (non-deleted) records.

The old Parquet files are removed only logically: they are not physically deleted right away but are marked as removed in the `_delta_log`.

### 3. Transaction Log Update
The `_delta_log` records a new version containing:

- A `commitInfo` block describing the `DELETE` operation
- A set of `remove` actions, each pointing to a data file that is now obsolete
- A set of `add` actions for the newly written files

So, from a transactional point of view:

- The table instantly reflects the updated state (with the records deleted).
- The old files still exist and can be used to reconstruct previous versions, which is what enables time travel.
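The three steps can be mimicked over in-memory "files" to show which `add`/`remove` actions a delete produces. A sketch under stated assumptions: lists of row dicts stand in for Parquet files, `new_name` is a hypothetical naming function, and the stats-based pruning step is folded into a direct check of each file's rows.

```python
def copy_on_write_delete(files, predicate, new_name):
    """Plan a Delta-style DELETE over in-memory 'files'.

    files: dict path -> list of row dicts (stand-in for Parquet files).
    predicate: function(row) -> True if the row should be deleted.
    new_name: function(old_path) -> path for the rewritten file (hypothetical).
    Returns (actions, new_files): log actions plus contents of new files.
    """
    actions, new_files = [], {}
    for path, rows in files.items():
        if not any(predicate(r) for r in rows):
            continue  # untouched: no add/remove for this file
        actions.append({"remove": {"path": path, "dataChange": True}})
        kept = [r for r in rows if not predicate(r)]
        if kept:  # rewrite the surviving rows into a fresh file
            new_path = new_name(path)
            new_files[new_path] = kept
            actions.append({"add": {"path": new_path, "dataChange": True}})
    actions.insert(0, {"commitInfo": {"operation": "DELETE"}})
    return actions, new_files
```

Deleting `id = 1` from a file holding Alice and Bob removes that file and adds a rewritten one holding only Bob, while a file holding only Cathy gets no action at all.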

In [16]:
import json
import os
from IPython.display import display, JSON

log_dir = "/data/test-acid/_delta_log"
log_files = sorted([f for f in os.listdir(log_dir) if f.endswith(".json")])

for file in log_files:
    print(f"\n>>> Log version: {file}")
    with open(os.path.join(log_dir, file)) as f:
        for line in f.readlines():
            parsed = json.loads(line)
            display(JSON(parsed))


>>> Log version: 00000000000000000000.json


<IPython.core.display.JSON object>

<IPython.core.display.JSON object>

<IPython.core.display.JSON object>

<IPython.core.display.JSON object>

<IPython.core.display.JSON object>

<IPython.core.display.JSON object>


>>> Log version: 00000000000000000001.json


<IPython.core.display.JSON object>

<IPython.core.display.JSON object>


>>> Log version: 00000000000000000002.json


<IPython.core.display.JSON object>

<IPython.core.display.JSON object>

<IPython.core.display.JSON object>