<a href="https://colab.research.google.com/github/CynicDog/data-engineering-lab/blob/main/Data_Engineering_Lab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Introduction to Delta Lake

Delta Lake is an open-source storage layer that brings **ACID transactions** and **scalable metadata handling** to your existing data lake (for example, Parquet files on S3 or local storage) and **unifies streaming and batch data processing**.  

This notebook is designed for **hands-on experimentation** with Delta Lake using **PySpark**. You will be able to:

- Create, read, and write Delta tables  
- Explore Delta table features such as **updates, deletes, and merges**  
- Examine Delta table history and the `_delta_log`  
- Experiment with **time travel** to query older versions of data  

Additionally, this is a space to explore **Databricks features and solutions** in a Colab environment, understand how they work, and implement them yourself.

In [None]:
# Uninstall any existing conflicting packages to avoid version conflicts.
# - pyspark: removes any pre-installed PySpark version
# - delta-spark: removes any previous Delta Lake Python package
# - dataproc-spark-connect: removes Google Colab's built-in Spark connect package
!pip uninstall -y pyspark delta-spark dataproc-spark-connect

# Install compatible versions:
# - PySpark 3.5.1: works with Delta Lake 3.2.0
# - delta-spark 3.2.0: Delta Lake Python library
!pip install -q pyspark==3.5.1 delta-spark==3.2.0

def get_spark():
    """Creates and returns a SparkSession configured for Delta Lake.

    This function sets up a SparkSession with the necessary Delta Lake
    extensions and catalog, ensuring that Delta features such as
    time travel, updates, and deletes are available.

    Returns:
        pyspark.sql.SparkSession: Configured SparkSession for Delta Lake.
    """
    from pyspark.sql import SparkSession
    from delta import configure_spark_with_delta_pip

    # Build the SparkSession with Delta Lake configurations
    builder = (
        SparkSession.builder.appName("DeltaLakeApp")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    )

    # Apply Delta Lake pip configuration and return the SparkSession
    return configure_spark_with_delta_pip(builder).getOrCreate()

spark = get_spark()
spark

Found existing installation: pyspark 4.0.1
Uninstalling pyspark-4.0.1:
  Successfully uninstalled pyspark-4.0.1
Found existing installation: dataproc-spark-connect 1.0.1
Uninstalling dataproc-spark-connect-1.0.1:
  Successfully uninstalled dataproc-spark-connect-1.0.1
  Preparing metadata (setup.py) ... done
  Building wheel for pyspark (setup.py) ... done


## Introduction

### Common Use Cases

Delta Lake is widely used across organizations of all sizes because it provides a reliable and scalable foundation for managing large volumes of data in analytics and AI workflows. One of its most common applications is modernizing existing data lakes: by adding ACID transactions, schema enforcement, and scalable metadata handling on top of open storage formats, Delta Lake helps teams resolve the long-standing issues of unreliable or inconsistent data that often hinder traditional lakes. Many organizations also use Delta Lake as part of a lakehouse architecture to support <u>data warehousing techniques, enabling fast and dependable SQL analytics</u> while still maintaining the <u>flexibility and cost-efficiency of a data lake environment</u>. Because Delta Lake unifies batch and streaming data, it plays a central role in real-time data processing workloads, allowing developers to ingest continuous streams while applying the same transformation logic used for historical batch data.

Beyond analytics, Delta Lake is a key component in machine learning and data science pipelines. It provides teams with a consistent and high-quality source of truth, ensuring that training datasets remain accurate, reproducible, and easy to version. Data engineering teams rely on Delta Lake to build robust pipelines that maintain data quality across ingestion, transformation, and operationalization stages. At the same time, business intelligence users benefit from its SQL accessibility, which makes it simple to query data directly from dashboards and reporting tools. Overall, Delta Lake's emphasis on reliability, performance, and openness makes it an essential platform for data engineers, data scientists, and analysts working across modern big data ecosystems.

### Key Features

Delta Lake introduces a number of core capabilities that form the backbone of the lakehouse paradigm. At its foundation, Delta Lake provides ACID transactions, ensuring that every data modification is executed safely and consistently, even under concurrent workloads or unexpected failures. These transactional guarantees are made possible by Delta Lake's scalable metadata handling, which uses an append-only transaction log to record every change to a table. This design allows Delta Lake to manage very large datasets without suffering from the metadata bottlenecks common in raw data lakes.

One of the most powerful capabilities enabled by the transaction log is time travel, which allows users to query earlier versions of a table by version number or timestamp. This feature is particularly valuable for debugging, validating model inputs, recovering from accidental deletions, or meeting audit and regulatory requirements. Delta Lake also unifies batch and streaming processing, allowing Spark Structured Streaming jobs to operate with the same APIs and logic used for batch workloads, while the underlying storage guarantees preserve correctness and consistency in both modes.

To maintain data quality, Delta Lake enforces schemas on write and supports controlled schema evolution, preventing corrupted or malformed data from entering pipelines while still allowing tables to adapt as requirements change. Delta Lake also tracks a complete audit history of all operations, enabling transparency into who made changes and when. Modern workloads rely heavily on DML operations (such as updates, deletes, and merges), and Delta Lake provides efficient support for these across multiple execution engines and languages. Its open-source nature encourages broad adoption and collaboration, while its performance optimizations ensure that most workloads run efficiently without extensive tuning. Taken together, these features create a storage layer that is both powerful and approachable for a wide range of data professionals.

### Anatomy of a Delta Lake Table

A Delta Lake table is composed of several tightly integrated components that together provide reliable storage, strong transactional guarantees, and efficient performance on large datasets. Each part of the table contributes to how Delta Lake manages data, tracks changes, and scales across distributed environments. Understanding these components makes it easier to reason about Delta Lake's behavior, optimize pipelines, and work more effectively with the lakehouse format.

#### Data Files

At the base of every Delta Lake table are the data files themselves, stored in the Parquet format. These files hold the raw records and are distributed across object stores or file systems such as HDFS, Amazon S3, Azure Data Lake Storage (ADLS), Google Cloud Storage, or MinIO. Parquet is chosen because its columnar layout, compression, and encoding techniques make it highly efficient for analytical queries and large-scale processing. Delta Lake does not alter the Parquet format but enhances its reliability by layering transactional control and metadata management on top of it.

#### Transaction Log

Above the data files sits the transaction log, often referred to as the **_delta_log**, which is the heart of Delta Lake's architecture. This log is a chronological sequence of JSON commit files, each representing a single transaction against the table. Every change, whether adding new data files, removing outdated ones, or modifying table metadata, is written as a new log entry. By recording operations in the log rather than mutating files in place, Delta Lake provides ACID semantics: all changes are atomic, consistent, isolated, and durable. This log is the mechanism that makes time travel, concurrent writes, schema enforcement, and recoverability possible.
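The mutual-exclusion idea behind "every change is a new log entry" can be sketched in plain Python. This toy assumes a local file system, where opening a file with mode `"x"` fails if the file already exists. Each commit is a newline-delimited JSON file named by its zero-padded version, so a second writer racing for the same version gets an error and must retry at the next version. It illustrates the optimistic-concurrency pattern only; `write_commit` is a hypothetical helper, not Delta Lake's own log-store code.

```python
import json
import os
import tempfile


def write_commit(log_dir: str, version: int, actions: list[dict]) -> str:
    """Write one commit as newline-delimited JSON, failing if the version exists.

    Hypothetical helper: mode "x" gives put-if-absent semantics on a local
    file system, the property needed so only one writer can claim a version.
    """
    path = os.path.join(log_dir, f"{version:020d}.json")
    with open(path, "x", encoding="utf-8") as f:
        for action in actions:
            f.write(json.dumps(action) + "\n")
    return path


log_dir = tempfile.mkdtemp()
first = write_commit(log_dir, 0, [{"commitInfo": {"operation": "WRITE"}}])
print(os.path.basename(first))  # 00000000000000000000.json

# A second writer that also expected to create version 0 loses the race.
try:
    write_commit(log_dir, 0, [{"commitInfo": {"operation": "WRITE"}}])
except FileExistsError:
    print("version 0 already committed; retry as version 1")
```

Real log stores also make each commit file appear all at once (for example, by writing a temporary file and renaming it); this toy skips that step.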

#### Metadata

The metadata tracked in the _delta_log describes the structure, layout, and configuration of the table. It includes information such as the table's schema, partition columns, data skipping statistics, and the minimum protocol versions a client must support to read or write the table. Metadata can be accessed programmatically through Spark, SQL, Python, or Rust APIs, giving users full insight into how the table is organized and how it has evolved. This metadata layer enables Delta Lake to optimize queries, enforce structural constraints, and adapt as data grows or workloads change.

#### Schema

A Delta Lake table's schema defines the structure of its data, including column names, data types, and nested fields. The schema is enforced whenever data is written, preventing corrupted or mismatched records from entering the table. Delta Lake also supports schema evolution, allowing new columns to be added or existing structures to change without breaking downstream processes. Because the schema and its modifications are captured in the transaction log, every version of the table retains a complete understanding of how the data was structured at that point in time.
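Schema enforcement and evolution can be illustrated with a toy check over dictionaries that map column names to type names. The sketch assumes a simplified rule set: existing columns must keep their type, new columns are rejected unless `merge_schema=True` (loosely mirroring Spark's `mergeSchema` write option), and columns missing from the incoming data are allowed, since Delta Lake fills those with nulls. Real Delta Lake also handles nullability and nested fields; `check_write_schema` and the `region` column are hypothetical.

```python
def check_write_schema(table: dict[str, str], incoming: dict[str, str],
                       merge_schema: bool = False) -> dict[str, str]:
    """Return the table schema after a write, or raise ValueError on a mismatch.

    Toy rules: existing columns must keep their type; new columns need
    merge_schema=True; columns absent from the incoming data are allowed.
    """
    for name, dtype in incoming.items():
        if name in table and table[name] != dtype:
            raise ValueError(f"type mismatch for {name!r}: {table[name]} vs {dtype}")
    new_columns = [name for name in incoming if name not in table]
    if new_columns and not merge_schema:
        raise ValueError(f"columns not in table schema: {new_columns}")
    return {**table, **{name: incoming[name] for name in new_columns}}


table_schema = {"id": "long", "name": "string", "amount": "long"}
incoming = {**table_schema, "region": "string"}  # "region" is a made-up column

try:
    check_write_schema(table_schema, incoming)
except ValueError as err:
    print("rejected:", err)  # rejected: columns not in table schema: ['region']

print(check_write_schema(table_schema, incoming, merge_schema=True))
```

In PySpark, the corresponding opt-in for an append is `df.write.format("delta").mode("append").option("mergeSchema", "true").save(path)`.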

#### Checkpoints

To improve performance when reading table history, Delta Lake periodically writes **checkpoints**, which are compact Parquet summaries of the table's state as of a particular version. Instead of replaying every JSON log entry from the beginning, readers can load the most recent checkpoint and then apply only the newer transactions that follow it. By default, a checkpoint is generated every ten commits. This optimization significantly speeds up table initialization, reduces metadata overhead, and allows large tables to remain responsive even at massive scale.
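The arithmetic behind this speed-up is easy to sketch. Assuming the default interval of ten and that a checkpoint was written at every non-zero multiple of the interval (real readers instead locate the newest checkpoint through the `_last_checkpoint` file), a reader of `latest_version` loads one checkpoint and replays only the commits after it. `replay_plan` is a hypothetical helper.

```python
from typing import List, Optional, Tuple


def replay_plan(latest_version: int, interval: int = 10) -> Tuple[Optional[int], List[int]]:
    """Return (checkpoint_version, json_versions_to_replay) for a snapshot read.

    Assumes a checkpoint exists at every non-zero multiple of `interval`.
    """
    checkpoint = (latest_version // interval) * interval
    if checkpoint == 0:
        # No checkpoint yet: every commit from version 0 must be replayed.
        return None, list(range(0, latest_version + 1))
    return checkpoint, list(range(checkpoint + 1, latest_version + 1))


print(replay_plan(23))  # (20, [21, 22, 23])
print(replay_plan(7))   # (None, [0, 1, 2, 3, 4, 5, 6, 7])
print(replay_plan(30))  # (30, [])
```

Without the checkpoint, reading version 23 would mean parsing all 24 JSON commits instead of one Parquet file plus three JSON files.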


## Getting Started

### 1. Basic CRUD Operations with Delta Tables

This cell creates a Delta table with sample data in `id`, `name`, and `amount` columns and saves it to `/tmp/delta-crud-table`.  
We use `overwrite` mode to replace any existing data. Delta Lake automatically maintains a transaction log (`_delta_log`) for ACID compliance and time travel.

In [None]:
from pyspark.sql import SparkSession
from delta.tables import DeltaTable

# Create: Create a Delta table from a PySpark DataFrame
data = [(1, "Alice", 100), (2, "Bob", 200), (3, "Charlie", 300)]
columns = ["id", "name", "amount"]

df = spark.createDataFrame(data, columns)
delta_path = "/tmp/delta-crud-table"

# Save the DataFrame as a Delta table (overwrite mode)
df.write.format("delta").mode("overwrite").save(delta_path)

This cell loads the Delta table from the specified path into a PySpark DataFrame. We then display its contents using `show()`.  
Delta Lake ensures we always read a consistent snapshot of the table, even after updates or deletes.

In [None]:
# Read: Load the Delta table into a DataFrame
delta_df = spark.read.format("delta").load(delta_path)
delta_df.show()

+---+-------+------+
| id|   name|amount|
+---+-------+------+
|  2|    Bob|   200|
|  3|Charlie|   300|
|  1|  Alice|   100|
+---+-------+------+



Update Bob's `amount` to 250 using a `DeltaTable` object. The `condition` selects the rows to change, and `set` maps each column to a SQL expression string for its new value (here the literal `"250"`). Delta Lake ensures the update is transactional and consistent.

In [None]:
# Update: Update records in Delta table
delta_table = DeltaTable.forPath(spark, delta_path)

# Update Bob's amount to 250
delta_table.update(
    condition="name = 'Bob'",  # Rows matching this condition will be updated
    set={"amount": "250"}      # Columns to update
)
delta_table.toDF().show()

+---+-------+------+
| id|   name|amount|
+---+-------+------+
|  2|    Bob|   250|
|  3|Charlie|   300|
|  1|  Alice|   100|
+---+-------+------+



Delete Alice's row using the `condition` parameter. Delta Lake ensures the deletion is transactional and preserves table consistency.

In [None]:
# Delete: Delete records from Delta table
# Delete Alice's row
delta_table.delete(condition="name = 'Alice'")
delta_table.toDF().show()

+---+-------+------+
| id|   name|amount|
+---+-------+------+
|  2|    Bob|   250|
|  3|Charlie|   300|
+---+-------+------+



### 2. Merging / Upserting Data

Delta Lake provides a powerful mechanism for upserting data into existing tables through the `DeltaTable` API and the `DeltaMergeBuilder`. A merge operation lets you define how incoming records should interact with existing ones by specifying a matching condition and separate actions for matched and unmatched rows. When the condition identifies a matching record, Delta Lake can update the existing row; when no match is found, a new row is inserted. Both actions are chained onto a single `merge()` call, and because the entire merge is treated as one ACID transaction, the table always remains in a consistent state even under concurrent workloads or failures.

Calling `merge()` on a `DeltaTable` returns a `DeltaMergeBuilder`, and each combination of matching condition and action has its own builder method: `whenMatchedUpdate()` for updates and `whenNotMatchedInsert()` for inserts. Calling `execute()` at the end runs the merge.

In this example, we merge a DataFrame of new records into the Delta table: rows with matching `id`s are updated, while new rows are inserted. Because both actions belong to one `merge()`, the whole upsert commits atomically and the table remains consistent.
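To make the matched/not-matched logic concrete, here is a plain-Python model of the same upsert, assuming rows are dictionaries keyed by `id`. Starting from the table state left by the earlier update and delete cells, it reproduces the result printed by the merge cell below. It is only a mental model: `upsert` is a hypothetical helper, and the real `merge()` runs as a distributed join and commits its changes as one transaction.

```python
def upsert(target: list[dict], source: list[dict], key: str = "id") -> list[dict]:
    """Model whenMatchedUpdate + whenNotMatchedInsert on lists of dicts."""
    merged = {row[key]: dict(row) for row in target}  # copy: inputs stay untouched
    for row in source:
        if row[key] in merged:
            merged[row[key]].update(row)  # matched: update the existing row
        else:
            merged[row[key]] = dict(row)  # not matched: insert a new row
    return sorted(merged.values(), key=lambda r: r[key])


target = [
    {"id": 2, "name": "Bob", "amount": 250},
    {"id": 3, "name": "Charlie", "amount": 300},
]
source = [
    {"id": 3, "name": "Charlie", "amount": 350},
    {"id": 4, "name": "David", "amount": 400},
]

for row in upsert(target, source):
    print(row)
```

One behavioral difference worth knowing: Delta's merge raises an error when several source rows match the same target row, whereas this dictionary model silently keeps the last one.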

In [None]:
from pyspark.sql import SparkSession
from delta.tables import DeltaTable

# New data to merge (upsert) into the Delta table
data = [
    (3, "Charlie", 350),  # Existing ID, will update
    (4, "David", 400)     # New ID, will insert
]
new_df = spark.createDataFrame(data, ["id", "name", "amount"])

# Perform merge (upsert) operation
delta_table.alias("target").merge(
    source=new_df.alias("source"),           # New data
    condition="target.id = source.id"        # Matching condition
).whenMatchedUpdate(
    set={
        "name": "source.name",               # Update existing rows
        "amount": "source.amount"
    }
).whenNotMatchedInsert(
    values={
        "id": "source.id",                   # Insert new rows
        "name": "source.name",
        "amount": "source.amount"
    }
).execute()

# Show table after merge/upsert
delta_table.toDF().show()

+---+-------+------+
| id|   name|amount|
+---+-------+------+
|  2|    Bob|   250|
|  3|Charlie|   350|
|  4|  David|   400|
+---+-------+------+



## ACID in Depth

Delta Lake provides **ACID (Atomicity, Consistency, Isolation, Durability) guarantees** on top of your data lake.  
This ensures that every operation (whether a simple write, update, delete, or merge) is transactional and consistent, even in the presence of concurrent operations or failures.

Delta Lake achieves this using the **_delta_log** folder, which tracks all changes made to the table as a sequence of JSON and checkpoint files.

### The `_delta_log` Folder

The `_delta_log` folder contains:

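- **JSON commit files** (`00000000000000000000.json`, etc.): Each file represents a single transaction and records all actions in that commit, such as `add` and `remove` for data files, `metaData` for schema and configuration changes, and `commitInfo` for audit details.  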
- **Checkpoint Parquet files** (`*.checkpoint.parquet`): Periodically created for faster table recovery and reducing the need to read all JSON files.  

These files together allow Delta Lake to:

- Track table history for **time travel**
- Ensure **atomicity**: either a transaction fully completes or has no effect
- Maintain **consistency**: table state always conforms to schema and constraints
- Provide **isolation**: concurrent operations see consistent snapshots
- Guarantee **durability**: committed changes survive crashes or failures

In [None]:
from delta.tables import DeltaTable

spark = get_spark()

# Create initial DataFrame
data = [(1, "Alice", 100), (2, "Bob", 200)]
columns = ["id", "name", "amount"]
df = spark.createDataFrame(data, columns)

# Define Delta table path
delta_path = "/tmp/acid_demo_table"

# Write DataFrame as Delta table (creates commit 0 on a fresh path)
df.write.format("delta").mode("overwrite").save(delta_path)

# Get a DeltaTable handle; later cells use it to create more commits
delta_table = DeltaTable.forPath(spark, delta_path)

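Running this command lists all the files in the `_delta_log` folder of the Delta table. Each `.json` file corresponds to a single transaction and records all the actions performed in it, such as adding or removing data files. The zero-padded number in each file name is the table version, and Delta Lake uses this sequence to enforce ACID guarantees and enable time travel. A single fresh run of the previous cell produces only `00000000000000000000.json`; the output below shows more commits, most likely because the notebook had already been re-run and the update, delete, and append cell further down had executed against this path. The `_commits` entry is a subdirectory, not a commit file: newer Delta releases create it to support managed (coordinated) commits, and for a plain file-system table like this one it is typically empty.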

In [None]:
# List all files in the Delta table log folder
!ls /tmp/acid_demo_table/_delta_log

00000000000000000000.json  00000000000000000003.json  00000000000000000006.json
00000000000000000001.json  00000000000000000004.json  00000000000000000007.json
00000000000000000002.json  00000000000000000005.json  _commits
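Because commit files are named by their zero-padded version, the version history can be read straight from a directory listing. The helper below is a hypothetical sketch that keeps only names of the form `<20 digits>.json`, so entries such as `_commits`, `_last_checkpoint`, or checkpoint Parquet files are ignored. The demo recreates the listing above in a temporary directory.

```python
import os
import re
import tempfile

# Commit files are named by a 20-digit, zero-padded version number.
COMMIT_FILE = re.compile(r"^(\d{20})\.json$")


def commit_versions(log_dir: str) -> list[int]:
    """Return the sorted commit versions found in a _delta_log directory."""
    versions = []
    for entry in os.listdir(log_dir):
        match = COMMIT_FILE.match(entry)
        if match:  # skips _commits, _last_checkpoint, checkpoint files, ...
            versions.append(int(match.group(1)))
    return sorted(versions)


# Recreate the listing above in a temporary directory.
demo_log = tempfile.mkdtemp()
for version in range(8):
    open(os.path.join(demo_log, f"{version:020d}.json"), "w").close()
os.mkdir(os.path.join(demo_log, "_commits"))

print(commit_versions(demo_log))  # [0, 1, 2, 3, 4, 5, 6, 7]
```

In the notebook itself, `commit_versions("/tmp/acid_demo_table/_delta_log")` should return the same list for the state captured in the output above.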


<h2> Understanding a Delta Lake Commit JSON File </h2>

When you write data to a Delta table, Delta Lake records the operation as a **commit** in the `_delta_log` folder. Each commit is stored as a **JSON file**, such as `00000000000000000000.json`. This file is crucial because it is **the atomic record of the transaction** and contains all the metadata, schema information, and file operations associated with that write.

Let's dissect the structure and fields of this first commit file, `00000000000000000000.json`, which the `cat ... | jq` cell below prints:

<h3> 1. <code>commitInfo</code> Object </h3>

```json
{"commitInfo":{
    "timestamp":1763730403573,
    "operation":"WRITE",
    "operationParameters":{"mode":"Overwrite","partitionBy":"[]"},
    "isolationLevel":"Serializable",
    "isBlindAppend":false,
    "operationMetrics":{"numFiles":"2","numOutputRows":"2","numOutputBytes":"1934"},
    "engineInfo":"Apache-Spark/3.5.1 Delta-Lake/3.2.0",
    "txnId":"1e8029ec-2cf7-4712-8c72-2b18bf4a35e0"
}}
```

**Explanation:**

* **`timestamp`**: The time the commit was made, in milliseconds since the Unix epoch. Useful for auditing and for reasoning about timestamp-based time travel.
* **`operation`**: The type of operation performed (`WRITE`, `UPDATE`, `DELETE`, `MERGE`, etc.). In this case, it's a `WRITE`.
* **`operationParameters`**: Additional information about the operation:

  * `mode`: Write mode (`Overwrite` here).
  * `partitionBy`: Any partition columns used (an empty list here).
* **`isolationLevel`**: The transactional isolation level used (`Serializable` is the strictest level Delta Lake offers).
* **`isBlindAppend`**: `true` when the operation only appends new data without depending on the table's existing contents. An overwrite replaces existing data, so it is `false` here.
* **`operationMetrics`**: Metrics about the commit:

  * `numFiles`: Number of files written (2).
  * `numOutputRows`: Number of rows written (2).
  * `numOutputBytes`: Total size in bytes of the written files (1934).
* **`engineInfo`**: Spark and Delta versions used for the operation.
* **`txnId`**: Unique identifier for this transaction. Every commit has a unique ID to track operations.

**Key insight:** This object gives **a full audit trail of what the transaction did**, including metrics and configuration.
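As a quick arithmetic check, `numOutputBytes` should equal the sum of the `size` fields of the files the commit added. The sketch below assumes commit 0's two files are the 974-byte Alice file from the `add` entry shown later and a second, 960-byte file (the size that commit 1's `numRemovedBytes` reports when it rewrites Bob's row). The second path is a hypothetical placeholder, and `write_metrics` is an illustrative helper rather than a Delta Lake API.

```python
def write_metrics(actions: list[dict]) -> dict[str, int]:
    """Recompute numFiles and numOutputBytes from a commit's add actions."""
    adds = [action["add"] for action in actions if "add" in action]
    return {
        "numFiles": len(adds),
        "numOutputBytes": sum(add["size"] for add in adds),
    }


commit0_adds = [
    {"add": {"path": "part-00000-d40ffd41-6e37-447c-9ec7-e1ec58f31ef1-c000.snappy.parquet",
             "size": 974}},
    # Hypothetical path; the 960-byte size is an assumption (see above).
    {"add": {"path": "second-file.snappy.parquet", "size": 960}},
]

print(write_metrics(commit0_adds))  # {'numFiles': 2, 'numOutputBytes': 1934}
```

Delta stores these metric values as strings (`"1934"`), so convert the logged value with `int(...)` before comparing.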

<h3> 2. <code>metaData</code> Object </h3>

```json
{"metaData":{
    "id":"f4d6ad9d-c095-4c34-bb48-64b4011a43c3",
    "format":{"provider":"parquet","options":{}},
    "schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true},{\"name\":\"name\",\"type\":\"string\",\"nullable\":true},{\"name\":\"amount\",\"type\":\"long\",\"nullable\":true}]}",
    "partitionColumns":[],
    "configuration":{},
    "createdTime":1763730402440
}}
```

**Explanation:**

* **`id`**: A unique identifier for the table itself; it stays the same across commits.
* **`format`**: The storage format of the data files (`parquet`) and any format-specific options.
* **`schemaString`**: The table schema serialized as a JSON string. Here we have three columns:

  * `id` (long, nullable)
  * `name` (string, nullable)
  * `amount` (long, nullable)
* **`partitionColumns`**: Lists any columns used for partitioning (empty here).
* **`configuration`**: Any table-level configuration properties. Empty in this simple example.
* **`createdTime`**: When this metadata was created, in milliseconds since the Unix epoch.

**Key insight:** This object records **the schema of the table** at this commit. Delta uses this to validate writes and enable schema evolution.
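Because `schemaString` is a JSON document stored inside a JSON string (hence the `\"` escapes in the raw file), reading the schema takes a second parse. The sketch below starts from the value after the outer commit line has been parsed; `list_columns` is a hypothetical helper.

```python
import json

# Value of "schemaString" from the metaData action above, after the outer
# JSON has been parsed (so the \" escapes are gone).
schema_string = (
    '{"type":"struct","fields":['
    '{"name":"id","type":"long","nullable":true},'
    '{"name":"name","type":"string","nullable":true},'
    '{"name":"amount","type":"long","nullable":true}]}'
)


def list_columns(schema_json: str) -> list[tuple[str, object, bool]]:
    """Return (name, type, nullable) for each top-level field.

    Primitive types are strings such as "long"; nested types (structs,
    arrays, maps) come back as dictionaries.
    """
    schema = json.loads(schema_json)
    return [(f["name"], f["type"], f["nullable"]) for f in schema["fields"]]


for name, dtype, nullable in list_columns(schema_string):
    print(f"{name}: {dtype} (nullable={nullable})")
```

Inside the notebook, `pyspark.sql.types.StructType.fromJson(json.loads(schema_string))` should turn the same string into a Spark schema object.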

<h3> 3. <code>protocol</code> Object </h3>

```json
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
```

**Explanation:**

* **`minReaderVersion`**: Minimum Delta protocol version a client must support to read this table.
* **`minWriterVersion`**: Minimum Delta protocol version a client must support to write to this table.

**Key insight:** The protocol object ensures **compatibility across different Delta Lake versions**, so older readers/writers know if they can interact with this table safely.
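The compatibility check can be sketched as a pair of integer comparisons. This is a simplification assuming a client that only understands numeric protocol versions; tables at reader version 3 or writer version 7 additionally list individual table features, which a real client must also check. `can_access` is a hypothetical helper.

```python
def can_access(protocol: dict, client_reader: int, client_writer: int) -> tuple[bool, bool]:
    """Return (can_read, can_write) using only the numeric protocol versions."""
    can_read = client_reader >= protocol["minReaderVersion"]
    can_write = client_writer >= protocol["minWriterVersion"]
    return can_read, can_write


protocol = {"minReaderVersion": 1, "minWriterVersion": 2}  # from the commit above
print(can_access(protocol, client_reader=1, client_writer=2))  # (True, True)
print(can_access(protocol, client_reader=1, client_writer=1))  # (True, False)
```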

<h3> 4. <code>add</code> Objects </h3>

```json
{"add":{
    "path":"part-00000-d40ffd41-6e37-447c-9ec7-e1ec58f31ef1-c000.snappy.parquet",
    "partitionValues":{},
    "size":974,
    "modificationTime":1763730402871,
    "dataChange":true,
    "stats":"{\"numRecords\":1,\"minValues\":{\"id\":1,\"name\":\"Alice\",\"amount\":100},\"maxValues\":{\"id\":1,\"name\":\"Alice\",\"amount\":100},\"nullCount\":{\"id\":0,\"name\":0,\"amount\":0}}"
}}
```

* **`path`**: The path of the added Parquet file, relative to the table root.
* **`partitionValues`**: Partition values for the file (empty here since the table isn't partitioned).
* **`size`**: File size in bytes.
* **`modificationTime`**: When the file was written, in milliseconds since the Unix epoch.
* **`dataChange`**: `true` means the file brings new data into the table. `false` marks actions that only rearrange existing data (for example, file compaction), so streaming readers can ignore them.
* **`stats`**: Statistics about this file:

  * `numRecords`: Number of rows in the file.
  * `minValues` / `maxValues`: Min/max per column.
  * `nullCount`: Count of nulls per column.

**Key insight:** Each `add` entry describes **exactly what Parquet files were added** in this transaction. Delta uses this to maintain ACID guarantees and efficiently plan queries.
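The `stats` field is what makes data skipping possible: if a file's min/max range for a column cannot contain the value a query filters on, the file does not need to be read. This sketch handles only equality predicates and uses the stats string from the `add` entry above; real data skipping covers more predicate shapes, nulls, and possibly truncated string statistics. `may_contain` is a hypothetical helper.

```python
import json

# The "stats" string from the add action above.
stats = json.loads(
    '{"numRecords":1,"minValues":{"id":1,"name":"Alice","amount":100},'
    '"maxValues":{"id":1,"name":"Alice","amount":100},'
    '"nullCount":{"id":0,"name":0,"amount":0}}'
)


def may_contain(file_stats: dict, column: str, value) -> bool:
    """False only when min/max stats prove no row has column == value."""
    lo = file_stats["minValues"].get(column)
    hi = file_stats["maxValues"].get(column)
    if lo is None or hi is None:
        return True  # no statistics for this column: the file must be read
    return lo <= value <= hi


print(may_contain(stats, "id", 1))  # True
print(may_contain(stats, "id", 3))  # False: a filter on id = 3 can skip this file
```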

<h3> Putting it all together </h3>

1. **`commitInfo`**: Audit trail of the transaction.
2. **`metaData`**: Table schema and configuration at this commit.
3. **`protocol`**: Ensures version compatibility.
4. **`add` / `remove` / other actions**: Physical file changes for this commit.

Delta Lake builds the table state **by replaying commits** from `_delta_log` (starting from the latest checkpoint, if one exists), combining all `add` and `remove` actions, while ensuring **atomicity, consistency, isolation, and durability**.

This single JSON file is therefore **both a record of the operation and the foundation of Delta Lake's ACID guarantees**.
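The replay step can be sketched in plain Python, assuming each commit has already been parsed into a list of action dictionaries, and ignoring checkpoints, `metaData`, and `protocol` for brevity. The file names are hypothetical, and `live_files` is an illustrative helper rather than Delta's reader.

```python
def live_files(commits: list[list[dict]]) -> set[str]:
    """Replay add/remove actions in version order; return the active file paths."""
    active: set[str] = set()
    for actions in commits:  # commits[v] holds the actions of version v
        for action in actions:
            if "add" in action:
                active.add(action["add"]["path"])
            elif "remove" in action:
                active.discard(action["remove"]["path"])
    return active


commits = [
    # v0: a write that adds two files (hypothetical names)
    [{"add": {"path": "a.parquet"}}, {"add": {"path": "b.parquet"}}],
    # v1: an update that rewrites b.parquet as c.parquet
    [{"remove": {"path": "b.parquet"}}, {"add": {"path": "c.parquet"}}],
]

print(sorted(live_files(commits)))      # ['a.parquet', 'c.parquet']
print(sorted(live_files(commits[:1])))  # ['a.parquet', 'b.parquet']
```

Replaying only the first `n + 1` commits, as in the second call, yields the files of version `n`; that is the core idea behind time travel.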

In [None]:
!cat /tmp/acid_demo_table/_delta_log/00000000000000000000.json | jq

{
  "commitInfo": {
    "timestamp": 1763730403573,
    "operation": "WRITE",
    "operationParameters": {
      "mode": "Overwrite",
      "partitionBy": "[]"
    },
    "isolationLevel": "Serializable",
    "isBlindAppend": false,
    "operationMetrics": {
      "numFiles": "2",
      "numOutputRows": "2",
      "numOutputBytes": "1934"
    },
    "engineInfo": "Apache-Spark/3.5.1 Delta-Lake/3.2.0",
    ...

The cell below updates Bob's `amount` to 250, deletes Alice's row, and appends new rows for Charlie and David.  
Each operation creates its own commit in `_delta_log`, ensuring the table remains consistent and transactional.

In [None]:
# Update Bob's amount
delta_table.update(
    condition="name = 'Bob'",
    set={"amount": "250"}
)

# Delete Alice's row
delta_table.delete(condition="name = 'Alice'")

# Insert new rows (append)
new_data = [(3, "Charlie", 300), (4, "David", 400)]
new_df = spark.createDataFrame(new_data, ["id", "name", "amount"])
new_df.write.format("delta").mode("append").save(delta_path)

When we update a row in a Delta table, Delta creates a commit JSON in `_delta_log` to record the transaction. The `commitInfo` section logs the operation type, timestamp, read version, isolation level, and detailed metrics like the number of rows updated and files added or removed.  

The `remove` entry marks the old Parquet file that contained the outdated data as logically deleted (the file stays on disk, so older versions can still read it, until `VACUUM` cleans it up), while the `add` entry points to a new Parquet file with the updated row. Delta never modifies files in place; instead, it swaps them in a single atomic commit. This design preserves ACID guarantees, allows time travel queries, and keeps a complete, auditable history of all changes to the table.  

By inspecting these commit files, you can see exactly what changed in each operation and how Delta manages consistent snapshots of the table over time.
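The copy-on-write pattern described above can be sketched in plain Python. In this toy, each "file" is an in-memory list of row dictionaries, and the `-v2` suffix is a hypothetical naming scheme. An update never edits a file in place: every file containing a matching row is rewritten in full, and the commit records a `remove` for the old file and an `add` for the new one, while files without matches produce no actions. Deletion vectors, an alternative that avoids rewriting whole files, are not modeled.

```python
from typing import Callable


def copy_on_write_update(files: dict[str, list[dict]],
                         predicate: Callable[[dict], bool],
                         changes: dict) -> tuple[dict[str, list[dict]], list[dict]]:
    """Apply an UPDATE copy-on-write style; return (new_files, log_actions)."""
    new_files = dict(files)  # the caller's mapping (older snapshot) is not modified
    actions: list[dict] = []
    for path, rows in files.items():
        if not any(predicate(row) for row in rows):
            continue  # no matching rows: the file stays as-is and is not logged
        rewritten = [{**row, **changes} if predicate(row) else row for row in rows]
        new_path = path.replace(".parquet", "-v2.parquet")  # hypothetical naming
        del new_files[path]
        new_files[new_path] = rewritten
        actions += [{"remove": {"path": path}}, {"add": {"path": new_path}}]
    return new_files, actions


files = {
    "alice.parquet": [{"id": 1, "name": "Alice", "amount": 100}],
    "bob.parquet": [{"id": 2, "name": "Bob", "amount": 200}],
}
new_files, actions = copy_on_write_update(files, lambda r: r["name"] == "Bob", {"amount": 250})

print(actions)  # [{'remove': {'path': 'bob.parquet'}}, {'add': {'path': 'bob-v2.parquet'}}]
print(new_files["bob-v2.parquet"])  # [{'id': 2, 'name': 'Bob', 'amount': 250}]
print(files["bob.parquet"])  # [{'id': 2, 'name': 'Bob', 'amount': 200}] (old version intact)
```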

In [None]:
!cat /tmp/acid_demo_table/_delta_log/00000000000000000001.json | jq

{
  "commitInfo": {
    "timestamp": 1763730410838,
    "operation": "UPDATE",
    "operationParameters": {
      "predicate": "[\"(name#7709 = Bob)\"]"
    },
    "readVersion": 0,
    "isolationLevel": "Serializable",
    "isBlindAppend": false,
    "operationMetrics": {
      "numRemovedFiles": "1",
      "numRemovedBytes": "960",
      "numCopiedRows": "0",
      "numDeletionVectorsAdded": "0",
      "numDeletionVecto
...

### Time Travel

Time Travel in Delta Lake allows you to query a table **as it existed at a specific point in time or version**.  
By using `.option("versionAsOf", n)` when reading a Delta table, you can access historical data safely and consistently.  
For example, version 0 shows the original write, while later versions reflect updates, deletes, or merges.  
This feature is useful for **auditing changes, recovering previous data, or comparing versions** over time.  
All previous states are reconstructed from the `_delta_log` transaction log, so each historical read sees a consistent snapshot, as long as the data files that version references have not yet been deleted by `VACUUM`.
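Besides `versionAsOf`, Delta readers also accept `.option("timestampAsOf", ...)`. As we understand it, a timestamp is resolved to the latest version committed at or before that time, and a timestamp earlier than the first commit is an error. The sketch below models only that mapping; the commit times are made-up milliseconds, `version_as_of_timestamp` is a hypothetical helper, and edge cases such as timestamps later than the newest commit are not modeled.

```python
import bisect


def version_as_of_timestamp(commit_ts_ms: list[int], ts_ms: int) -> int:
    """Return the latest version committed at or before ts_ms.

    commit_ts_ms[v] is the (hypothetical) commit time of version v, ascending.
    """
    idx = bisect.bisect_right(commit_ts_ms, ts_ms) - 1
    if idx < 0:
        raise ValueError("timestamp is before the first commit")
    return idx


commit_times = [1_000, 2_000, 3_000, 4_000]  # made-up times for versions 0..3
print(version_as_of_timestamp(commit_times, 2_500))  # 1
print(version_as_of_timestamp(commit_times, 3_000))  # 2
```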

In [None]:
import os
from delta.tables import DeltaTable

# Create SparkSession with Delta Lake support
spark = get_spark()

# Create initial Delta table
data = spark.createDataFrame([
    (1, "Alice", 25),
    (2, "Bob", 30),
    (3, "Charlie", 28)
], ["id", "name", "age"])

delta_path = "/tmp/delta-table"
data.write.format("delta").mode("overwrite").save(delta_path)  # Write initial data

# Load Delta table for further operations
deltaTable = DeltaTable.forPath(spark, delta_path)

# Update operation: change name where id=1
deltaTable.update(
    condition="id == 1",
    set={"name": "'Alicia'"}
)

# Delete operation: remove rows where age > 28
deltaTable.delete(condition="age > 28")

# Merge operation: upsert new and updated data
new_data = spark.createDataFrame([
    (2, "Bobby", 31),  # id 2 (Bob) was deleted in version 2, so this row is inserted
    (4, "David", 22)   # Insert new record
], ["id", "name", "age"])

deltaTable.alias("t").merge(
    new_data.alias("s"),
    "t.id = s.id"
).whenMatchedUpdate(set={
    "name": "s.name",
    "age": "s.age"
}).whenNotMatchedInsert(values={
    "id": "s.id",
    "name": "s.name",
    "age": "s.age"
}).execute()

# Time Travel: read previous versions of the table
# Version 0: original write
print("=== Version 0 (original write) ===")
spark.read.format("delta").option("versionAsOf", 0).load(delta_path).show()

# Version 1: after update
print("=== Version 1 (after update) ===")
spark.read.format("delta").option("versionAsOf", 1).load(delta_path).show()

# Version 2: after delete
print("=== Version 2 (after delete) ===")
spark.read.format("delta").option("versionAsOf", 2).load(delta_path).show()

# Version 3: after merge
print("=== Version 3 (after merge) ===")
spark.read.format("delta").option("versionAsOf", 3).load(delta_path).show()

# Inspect Delta table history with metrics
deltaTable.history().select(
    "version",
    "timestamp",
    "operation",
    "operationMetrics"
).show(truncate=False)

# inspect physical files on disk
print("\n--- Delta Table Files ---")
for root, dirs, files in os.walk(delta_path):
    for f in files:
        print(os.path.join(root, f))


=== Version 0 (original write) ===
+---+-------+---+
| id|   name|age|
+---+-------+---+
|  2|    Bob| 30|
|  3|Charlie| 28|
|  1|  Alice| 25|
+---+-------+---+

=== Version 1 (after update) ===
+---+-------+---+
| id|   name|age|
+---+-------+---+
|  2|    Bob| 30|
|  3|Charlie| 28|
|  1| Alicia| 25|
+---+-------+---+

=== Version 2 (after delete) ===
+---+-------+---+
| id|   name|age|
+---+-------+---+
|  3|Charlie| 28|
|  1| Alicia| 25|
+---+-------+---+

=== Version 3 (after merge) ===
+---+-------+---+
| id|   name|age|
+---+-------+---+
|  3|Charlie| 28|
|  1| Alicia| 25|
|  2|  Bobby| 31|
|  4|  David| 22|
+---+-------+---+


# Hands-on with MTA Realtime Data

In [None]:
# Uninstall any existing conflicting packages to avoid version conflicts.
# - pyspark: removes any pre-installed PySpark version
# - delta-spark: removes any previous Delta Lake Python package
# - dataproc-spark-connect: removes Google Colab's built-in Spark connect package
!pip uninstall -y pyspark delta-spark dataproc-spark-connect

# Install compatible versions:
# - PySpark 3.5.1: works with Delta Lake 3.2.0
# - delta-spark 3.2.0: Delta Lake Python library
!pip install -q pyspark==3.5.1 delta-spark==3.2.0

# Force-install a compatible version of the protobuf library
!pip install protobuf==3.20.3 --quiet
!pip install requests --quiet
print("Successfully downgraded protobuf to 3.20.3 for compatibility.")

# Create the required directory structure for the imports
!mkdir -p com/google/transit/realtime
!mkdir -p google/protobuf

def get_spark():
    """Creates and returns a SparkSession configured for Delta Lake.

    This function sets up a SparkSession with the necessary Delta Lake
    extensions and catalog, ensuring that Delta features such as
    time travel, updates, and deletes are available.

    Returns:
        pyspark.sql.SparkSession: Configured SparkSession for Delta Lake.
    """
    from pyspark.sql import SparkSession
    from delta import configure_spark_with_delta_pip

    # Build the SparkSession with Delta Lake configurations
    builder = (
        SparkSession.builder.appName("DeltaLakeApp")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    )

    # Apply Delta Lake pip configuration and return the SparkSession
    return configure_spark_with_delta_pip(builder).getOrCreate()

spark = get_spark()
spark

Found existing installation: pyspark 3.5.1
Uninstalling pyspark-3.5.1:
  Successfully uninstalled pyspark-3.5.1
Found existing installation: delta-spark 3.2.0
Uninstalling delta-spark-3.2.0:
  Successfully uninstalled delta-spark-3.2.0
[?25h[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
ydf 0.13.0 requires protobuf<7.0.0,>=5.29.1, but you have protobuf 3.20.3 which is incompatible.
grpcio-status 1.71.2 requires protobuf<6.0dev,>=5.26.1, but you have protobuf 3.20.3 which is incompatible.
tensorflow-metadata 1.17.2 requires protobuf>=4.25.2; python_version >= "3.11", but you have protobuf 3.20.3 which is incompatible.
opentelemetry-proto 1.37.0 requires proto

In [None]:
# Fetch the core Protobuf descriptor (defines MessageOptions)
!wget -O google/protobuf/descriptor.proto https://raw.githubusercontent.com/protocolbuffers/protobuf/main/src/google/protobuf/descriptor.proto

# Fetch the official base GTFS-Realtime proto
# Note: the MTA extension file imports this as "com/google/transit/realtime/gtfs-realtime.proto",
# so the official file is saved under that path.
!wget -O com/google/transit/realtime/gtfs-realtime.proto https://raw.githubusercontent.com/google/transit/master/gtfs-realtime/proto/gtfs-realtime.proto

--2025-12-13 07:30:15--  https://raw.githubusercontent.com/protocolbuffers/protobuf/main/src/google/protobuf/descriptor.proto
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 58435 (57K) [text/plain]
Saving to: ‘google/protobuf/descriptor.proto’


2025-12-13 07:30:15 (9.65 MB/s) - ‘google/protobuf/descriptor.proto’ saved [58435/58435]

--2025-12-13 07:30:15--  https://raw.githubusercontent.com/google/transit/master/gtfs-realtime/proto/gtfs-realtime.proto
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 64770 (63K) [text/plai

In [None]:
%%writefile gtfs-realtime-NYCT.proto
// Copyright 2012 Metropolitan Transportation Authority
//
// Protocol definition NYCT-specific extensions to GTFS-realtime.
//
// As originally posted at:
//
//   https://groups.google.com/d/msg/gtfs-realtime/KWJhkXH0kBg/8rlRMxiL-ysJ

syntax = "proto2";

import "com/google/transit/realtime/gtfs-realtime.proto";

option java_package = "com.google.transit.realtime";
package transit_realtime;

message TripReplacementPeriod {
  // The replacement period is for this route
  optional string route_id = 1;
  // The start time is omitted, the end time is currently now + 30 minutes for
  // all routes of the A division
  optional transit_realtime.TimeRange replacement_period = 2;
}

// NYCT Subway extensions for the feed header
message NyctFeedHeader {
  // Version of the NYCT Subway extensions
  // The current version is 1.0
  required string nyct_subway_version = 1;
  // For the NYCT Subway, the GTFS-realtime feed replaces any scheduled
  // trip within the trip_replacement_period.
  // This feed is a full dataset, it contains all trips starting
  // in the trip_replacement_period. If a trip from the static GTFS is not
  // found in the GTFS-realtime feed, it should be considered as cancelled.
  // The replacement period can be different for each route, so here is
  // a list of the routes where the trips in the feed replace all
  // scheduled trips within the replacement period.
  repeated TripReplacementPeriod trip_replacement_period = 2;
}

extend FeedHeader {
 optional NyctFeedHeader nyct_feed_header = 1001;
}

// NYCT Subway extensions for the trip descriptor
message NyctTripDescriptor {
  // The nyct_train_id is meant for internal use only. It provides an
  // easy way to associated GTFS-realtime trip identifiers with NYCT rail
  // operations identifier
  //
  // The ATS office system assigns unique train identification (Train ID) to
  // each train operating within or ready to enter the mainline of the
  // monitored territory. An example of this is 06 0123+ PEL/BBR and is decoded
  // as follows:
  //
  // The first character represents the trip type designator. 0 identifies a
  // scheduled revenue trip. Other revenue trip values that are a result of a
  // change to the base schedule include; [= reroute], [/ skip stop], [$ turn
  // train] also known as shortly lined service.
  //
  // The second character 6 represents the trip line i.e. number 6 train The
  // third set of characters identify the decoded origin time. The last
  // character may be blank "on the whole minute" or + "30 seconds"
  //
  // Note: Origin times will not change when there is a trip type change.  This
  // is followed by a three character "Origin Location" / "Destination
  // Location"
  optional string train_id = 1;

  // This trip has been assigned to a physical train. If true, this trip is
  // already underway or most likely will depart shortly.
  //
  // Train Assignment is a function of the Automatic Train Supervision (ATS)
  // office system used by NYCT Rail Operations to monitor and track train
  // movements. ATS provides the ability to "assign" the nyct_train_id
  // attribute when a physical train is at its origin terminal. These assigned
  // trips have the is_assigned field set in the TripDescriptor.
  //
  // When a train is at a terminal but has not been given a work program it is
  // declared unassigned and is tagged as such. Unassigned trains can be moved
  // to a storage location or assigned a nyct_train_id when a determination for
  // service is made.
  optional bool is_assigned = 2;

  // The direction the train is moving.
  enum Direction {
    NORTH = 1;
    EAST = 2;
    SOUTH = 3;
    WEST = 4;
  }
  // Uptown and Bronx-bound trains are moving NORTH.
  // Times Square Shuttle to Grand Central is also northbound.
  //
  // Downtown and Brooklyn-bound trains are moving SOUTH.
  // Times Square Shuttle to Times Square is also southbound.
  //
  // EAST and WEST are not used currently.
  optional Direction direction = 3;
}

extend transit_realtime.TripDescriptor {
  optional NyctTripDescriptor nyct_trip_descriptor = 1001;
}

// NYCT Subway extensions for the stop time update
message NyctStopTimeUpdate {
  // Provides the planned station arrival track. The following is the Manhattan
  // track configurations:
  // 1: southbound local
  // 2: southbound express
  // 3: northbound express
  // 4: northbound local
  //
  // In the Bronx (except Dyre Ave line)
  // M: bi-directional express (in the AM express to Manhattan, in the PM
  // express away).
  //
  // The Dyre Ave line is configured:
  // 1: southbound
  // 2: northbound
  // 3: bi-directional
  optional string scheduled_track = 1;

  // This is the actual track that the train is operating on and can be used to
  // determine if a train is operating according to its current schedule
  // (plan).
  //
  // The actual track is known only shortly before the train reaches a station,
  // typically not before it leaves the previous station. Therefore, the NYCT
  // feed sets this field only for the first station of the remaining trip.
  //
  // Different actual and scheduled track is the result of manually rerouting a
  // train off it scheduled path.  When this occurs, prediction data may become
  // unreliable since the train is no longer operating in accordance to its
  // schedule.  The rules engine for the 'countdown' clocks will remove this
  // train from all schedule stations.
  optional string actual_track = 2;
}

extend transit_realtime.TripUpdate.StopTimeUpdate {
  optional NyctStopTimeUpdate nyct_stop_time_update = 1001;
}

Writing gtfs-realtime-NYCT.proto


In [None]:
# The key is to compile the *base* file and tell the compiler to use the current
# directory as the root (via --proto_path=.), so the output directory structure is correct.

!protoc --python_out=. --proto_path=. com/google/transit/realtime/gtfs-realtime.proto
!protoc --python_out=. --proto_path=. gtfs-realtime-NYCT.proto

In [None]:
# Create the necessary __init__.py files to make the directories Python packages.
!touch com/__init__.py
!touch com/google/__init__.py
!touch com/google/transit/__init__.py
!touch com/google/transit/realtime/__init__.py

print("Created __init__.py files to enable package imports.")

Created __init__.py files to enable package imports.


In [None]:
import requests
import datetime
import sys
import os
from google.protobuf.message import DecodeError

# Make sure the current directory (where protoc wrote the *_pb2.py files) is importable.
if os.getcwd() not in sys.path:
    sys.path.append(os.getcwd())
    print(f"Added current directory to sys.path: {os.getcwd()}")

# Import the generated protobuf classes.
try:
    # NYCT extensions; the module name mirrors the .proto file name (case-sensitive).
    import gtfs_realtime_NYCT_pb2 as nyct_pb2

    # Base GTFS-realtime classes (importable thanks to the __init__.py files created above).
    from com.google.transit.realtime import gtfs_realtime_pb2
    print("Successfully imported Protobuf modules.")

except ImportError as e:
    print(f"FATAL ERROR: Could not import generated Protobuf files: {e}")
    # Re-raise so the notebook stops here instead of failing later with a NameError.
    raise

# Define the FeedMessage object
FeedMessage = gtfs_realtime_pb2.FeedMessage

# --- Configuration ---
MTA_FEED_URL = "https://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/nyct%2Fgtfs-nqrw"
HEADERS = {'Accept': 'application/x-protobuf'}
# --- End Configuration ---


def fetch_and_parse_mta_feed(url, headers=None):
    """Fetches the MTA GTFS-realtime feed and parses the Protocol Buffer data."""
    print(f"\nFetching data from: {url}")
    try:
        response = requests.get(url, headers=headers, timeout=30)
        response.raise_for_status()

        feed = FeedMessage()
        feed.ParseFromString(response.content)

        print("Successfully parsed FeedMessage.")
        return feed

    except requests.exceptions.RequestException as e:
        print(f"Error fetching data: {e}")
        return None
    except DecodeError as e:
        print(f"Error decoding Protocol Buffer data: {e}. Check compilation.")
        return None

def display_feed_info(feed):
    """Iterates through the parsed feed and prints key information about trips."""

    timestamp = feed.header.timestamp
    feed_time = datetime.datetime.fromtimestamp(timestamp)

    print(f"MTA N/Q/R/W Realtime Feed Analysis - Generated At: {feed_time.strftime('%H:%M:%S')}")

    trip_updates_count = 0

    for entity in feed.entity:
        if entity.HasField('trip_update'):
            trip_updates_count += 1
            trip_update = entity.trip_update

            route_id = trip_update.trip.route_id

            # Access the extensions using the imported module (nyct_pb2)
            nyct_trip = trip_update.trip.Extensions[nyct_pb2.nyct_trip_descriptor]
            direction_name = nyct_pb2.NyctTripDescriptor.Direction.Name(nyct_trip.direction)

            print("-" * 70)
            print(f"🚂 Route: {route_id} | Direction: {direction_name} | Assigned: {nyct_trip.is_assigned}")
            print("-" * 70)
            print(entity)


    print(f"\nFinished parsing {trip_updates_count} trip updates.")


# --- Main Execution ---
parsed_feed = fetch_and_parse_mta_feed(MTA_FEED_URL, HEADERS)

if parsed_feed:
    display_feed_info(parsed_feed)

Streaming output truncated to the last 5000 lines.
    departure {
      time: 1765612265
    }
    stop_id: "D29S"
    departure_occupancy_status: NO_DATA_AVAILABLE
    [transit_realtime.nyct_stop_time_update] {
      scheduled_track: "A1"
      actual_track: "A1"
    }
  }
  stop_time_update {
    arrival {
      time: 1765612295
    }
    departure {
      time: 1765612295
    }
    stop_id: "D30S"
    departure_occupancy_status: NO_DATA_AVAILABLE
    [transit_realtime.nyct_stop_time_update] {
      scheduled_track: "A1"
      actual_track: "A1"
    }
  }
  stop_time_update {
    arrival {
      time: 1765612385
    }
    departure {
      time: 1765612385
    }
    stop_id: "D31S"
    departure_occupancy_status: NO_DATA_AVAILABLE
    [transit_realtime.nyct_stop_time_update] {
      scheduled_track: "A1"
      actual_track: "A1"
    }
  }
  stop_time_update {
    arrival {
      time: 1765612475
    }
    departure {
      time: 1765612475
    }
    stop_id: "D32S"
   

# [ Udemy ] - Databricks Certified Data Engineer Professional Certification Preparation

## Modeling Data Management Solutions

### Official Databricks Documentation References
* [Medallion Architecture (Bronze, Silver, Gold)](https://docs.databricks.com/aws/en/lakehouse/medallion)
* [What is Auto Loader (cloudFiles)](https://docs.databricks.com/aws/en/ingestion/cloud-object-storage/auto-loader/)
* [Use foreachBatch with Structured Streaming](https://docs.databricks.com/aws/en/structured-streaming/foreach)
* [Watermarking in Structured Streaming](https://docs.databricks.com/gcp/en/structured-streaming/watermarks)

## Conceptual Foundation: The Role of the Bronze Layer

In the Databricks Medallion Architecture, the **Bronze layer** exists to capture data **exactly as it arrives from source systems**. It is not designed to enforce business rules, remove duplicates, or optimize for analytics. Instead, it preserves **<u>raw, replayable data</u>** with minimal transformation so that downstream layers can evolve independently as requirements change.

Because the Bronze layer functions as the **<u>system of record</u>**, it must support both batch and streaming ingestion, maintain **full historical context**, tolerate schema evolution, and allow reprocessing without loss of fidelity. Any design decision at this layer should favor **durability, traceability, and scalability** over convenience.

## Ingestion Modeling: How Sources Map to Bronze Tables

One of the earliest architectural choices in Bronze ingestion is how source datasets are mapped to tables. In Databricks, this typically resolves into two patterns: **singleplex ingestion** and **multiplex ingestion**.

This decision is not cosmetic. It directly influences **<u>scalability limits, operational overhead, schema management</u>**, and the behavior of streaming workloads over time.

## Singleplex Ingestion: Simplicity First

Singleplex ingestion follows a **one-to-one mapping** between a source and a Bronze table. Each dataset, topic, or file feed has its own ingestion job and its own table. In a bookstore domain, this might result in tables such as **`bronze_customers`**, **`bronze_orders`**, and **`bronze_books`**, each populated by a dedicated pipeline.

This pattern is intuitive and easy to reason about. Schemas are **tightly coupled to their sources**, lineage is immediately clear, and querying raw data is straightforward. These qualities make singleplex ingestion well suited for **batch workloads** or environments with a **small, stable number of sources**.

However, the simplicity comes at a cost. In streaming systems, each source requires its own streaming job, and Databricks enforces limits on the number of **<u>concurrent jobs per workspace</u>**. As the number of sources grows, so does the number of jobs, checkpoints, and schemas that must be operated and monitored. For this reason, **singleplex ingestion does not scale well** for high-volume, event-driven architectures.

## Multiplex Ingestion: Scaling for Streaming Systems

Multiplex ingestion takes the opposite approach. Instead of creating one table per source, **many datasets or topics are ingested into a single Bronze table**. This pattern is common in **Kafka-like systems**, pub/sub architectures, and large-scale file ingestion pipelines.

In a multiplexed Bronze table, records from different sources are distinguished by **metadata rather than table boundaries**. A typical schema includes a **`topic` or source identifier**, a raw **`value` payload** (often JSON), and ingestion or event timestamps. Partitioning columns such as year or month are usually derived from these timestamps.

This design dramatically reduces the number of required streaming jobs, centralizes ingestion logic, and aligns naturally with **<u>event-driven architectures</u>**. For this reason, **multiplex ingestion is the recommended pattern** for large-scale streaming pipelines in Databricks.

The trade-off is intentional complexity. Bronze tables become less human-readable, schemas are no longer isolated per dataset, and downstream logic must explicitly filter and parse records by topic. As a result, the **Silver layer becomes the point where structure and meaning are restored**.

## Promoting Data from Bronze to Silver in a Multiplex Architecture

In a multiplex design, promotion to the **Silver layer** is where raw events are transformed into domain-specific datasets. Silver pipelines typically read from the shared Bronze table, **filter by topic**, parse and validate payloads, enforce schemas, and apply data quality rules.
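The filter-parse-validate sequence can be modeled in plain Python. This is a sketch, not Spark code: the `topic`/`value`/`timestamp` layout follows the multiplex schema described above, while the topic names, the required fields, and the quarantine behavior are hypothetical. In PySpark the same steps would usually be a `filter` on `topic` followed by `from_json` on the `value` column with an explicit schema.

```python
import json


def promote_to_silver(bronze_rows, topic, required):
    """Filter one topic out of a multiplex Bronze table and parse its payloads.

    Returns (valid_rows, quarantined_rows). Rows whose JSON is malformed or
    lacks a required field are quarantined instead of failing the pipeline.
    """
    valid, quarantined = [], []
    for row in bronze_rows:
        if row["topic"] != topic:            # keep only the requested dataset
            continue
        try:
            payload = json.loads(row["value"])   # raw payload -> dict
        except json.JSONDecodeError:
            quarantined.append(row)
            continue
        if not isinstance(payload, dict) or any(f not in payload for f in required):
            quarantined.append(row)          # schema enforcement: missing fields
            continue
        # Project the required fields and carry the ingestion timestamp along.
        valid.append({**{f: payload[f] for f in required}, "ingested_at": row["timestamp"]})
    return valid, quarantined


bronze = [
    {"topic": "orders", "value": '{"order_id": 1, "customer_id": "c1"}', "timestamp": 100},
    {"topic": "customers", "value": '{"customer_id": "c1", "name": "Ann"}', "timestamp": 101},
    {"topic": "orders", "value": '{"order_id": 2}', "timestamp": 102},
    {"topic": "orders", "value": "not json", "timestamp": 103},
]
orders, bad = promote_to_silver(bronze, "orders", ["order_id", "customer_id"])
print(orders)    # [{'order_id': 1, 'customer_id': 'c1', 'ingested_at': 100}]
print(len(bad))  # 2
```

Each Silver table would call the same kind of logic with its own topic and schema, which is what lets Silver tables evolve independently of the shared Bronze table.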

Each Silver table evolves independently, even though all data originates from the same Bronze source. This separation allows ingestion logic to remain stable while business semantics change, which is a **<u>core principle of the Medallion Architecture</u>**.

## Auto Loader as a Streaming Ingestion Engine

Databricks **Auto Loader (`cloudFiles`)** is frequently used as a file-based alternative to Kafka for Bronze ingestion. Rather than scanning directories repeatedly, Auto Loader **incrementally discovers new files**, tracks progress, and guarantees **exactly-once processing semantics**.

Auto Loader supports **schema inference and schema evolution**, making it well suited for semi-structured formats such as JSON. The inferred schema is stored at the location given by `cloudFiles.schemaLocation`. In the default `addNewColumns` mode, when a new column appears in incoming data, Auto Loader merges it into the stored schema and then stops the stream with an `UnknownFieldException`; the restarted stream picks up the evolved schema. Keeping the schema in a dedicated location is what makes this restart safe.

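A critical behavioral detail is that **automatic schema evolution is not allowed when a schema is explicitly provided**: supplying a schema changes the default evolution mode to `none`, and `addNewColumns` is rejected. Providing known column types as schema hints (`cloudFiles.schemaHints`) instead keeps evolution available. This distinction is subtle, frequently tested in certification exams, and a common source of production failures.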
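A minimal sketch of the reader configuration, assuming a Databricks runtime: the `cloudFiles` source is not part of open-source Spark, so this will not run in the Colab session set up earlier. The option keys are Auto Loader options; the schema location path and the column names in the hints are hypothetical.

```python
# Auto Loader options for inferring a schema and letting it evolve.
# The path and the hinted columns are hypothetical placeholders.
EVOLVING_OPTIONS = {
    "cloudFiles.format": "json",
    "cloudFiles.schemaLocation": "/tmp/bronze/_schemas",
    # Default when no schema is supplied: stop on new columns, add them, restart.
    "cloudFiles.schemaEvolutionMode": "addNewColumns",
    # Pin types for known columns without switching evolution off.
    "cloudFiles.schemaHints": "order_id BIGINT, quantity INT",
}


def read_bronze_json(spark, source_path, options=None):
    """Return an Auto Loader streaming DataFrame (needs a Databricks runtime)."""
    reader = spark.readStream.format("cloudFiles")
    for key, value in (options or EVOLVING_OPTIONS).items():
        reader = reader.option(key, value)
    # Deliberately no reader.schema(...): with an explicit schema the default
    # mode becomes "none", and "addNewColumns" would be rejected.
    return reader.load(source_path)
```

Using hints rather than `.schema(...)` is the design choice that keeps both type control and evolution.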

## Managing Time and Partitioning in the Bronze Layer

Streaming timestamps often arrive as Unix epoch values or source-specific formats. In the Bronze layer, these are typically normalized into **human-readable timestamps**, from which partitioning columns such as year and month are derived.

Partitioning by time, and often by topic in multiplex tables, improves **query performance**, enables **efficient incremental processing**, and reduces data scanning. A common strategy is partitioning by **`(topic, year_month)`**, which balances flexibility and performance.
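The timestamp normalization and partition derivation can be sketched in plain Python, assuming UTC. In PySpark the equivalent columns would typically come from `from_unixtime` and `date_format(..., "yyyy-MM")` in `pyspark.sql.functions` (which use the session time zone); the function name and output keys below are hypothetical, and the sample value is a departure time copied from the MTA feed output above.

```python
from datetime import datetime, timezone


def partition_columns(epoch_seconds):
    """Derive a readable UTC timestamp and a year_month partition value."""
    ts = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)
    return {"event_ts": ts.isoformat(), "year_month": ts.strftime("%Y-%m")}


print(partition_columns(1765612265))
# {'event_ts': '2025-12-13T07:51:05+00:00', 'year_month': '2025-12'}

# Kafka-style millisecond timestamps need to be divided by 1000 first.
print(partition_columns(1765612265000 / 1000)["year_month"])  # 2025-12
```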

## Operational Stability in Auto Loader Pipelines

To keep streaming pipelines reliable, ingestion rate controls are often applied. Limiting the amount of data processed per micro-batch (with `cloudFiles.maxFilesPerTrigger` or `cloudFiles.maxBytesPerTrigger`) helps prevent **memory pressure** and keeps batch durations predictable.

Fault tolerance is equally important. Malformed records, invalid JSON, and schema mismatches are expected in real-world pipelines. By configuring a **bad records path** (the `badRecordsPath` option), these problematic records can be quarantined without failing the stream, preserving both **availability and auditability**.

Selective ingestion is also common when directories contain multiple file types. Path-based filtering, for example with the `pathGlobFilter` option, ensures that only relevant files are processed, reducing noise and operational risk.

## Streaming Execution Modes and Backfills

Structured Streaming supports multiple trigger modes. The **`availableNow` trigger** processes all currently available data and then stops automatically. It behaves like a batch job while still using streaming semantics, making it ideal for **backfills, historical loads, and controlled ingestion**.
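The operational settings above can be combined into one backfill job. This is a sketch assuming a Databricks runtime (for `cloudFiles`) and that `badRecordsPath` is honored for the JSON format in this setup; all paths and the target table name are hypothetical.

```python
# Hypothetical paths; option keys are Auto Loader / Databricks file-source options.
INGEST_OPTIONS = {
    "cloudFiles.format": "json",
    "cloudFiles.schemaLocation": "/tmp/bronze/_schemas",
    "cloudFiles.maxFilesPerTrigger": "100",         # cap on files per micro-batch
    "pathGlobFilter": "*.json",                     # skip non-JSON files in the directory
    "badRecordsPath": "/tmp/bronze/_bad_records",   # quarantine malformed records
}


def backfill_bronze(spark, source_path, checkpoint_path, target_table):
    """Ingest every file currently in source_path into a Delta table, then stop."""
    reader = spark.readStream.format("cloudFiles")
    for key, value in INGEST_OPTIONS.items():
        reader = reader.option(key, value)
    query = (
        reader.load(source_path)
        .writeStream
        .format("delta")
        .option("checkpointLocation", checkpoint_path)  # lets restarts resume
        .trigger(availableNow=True)                      # drain the backlog, then stop
        .toTable(target_table)
    )
    query.awaitTermination()  # returns once all available data is processed
    return query
```

Because progress lives in the checkpoint, running the same job again later processes only files that arrived since the previous run.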

## Duplicate Data: An Expected Reality in Streaming

In at-least-once delivery systems such as Kafka, **duplicate records are normal and expected**. Consumer restarts, retries, and offset reprocessing can all lead to the same message being delivered more than once.

For this reason, the Bronze layer **must not attempt deduplication**. Removing duplicates at ingestion time risks **data loss** and breaks replayability. Deduplication is intentionally deferred to the Silver layer, where business keys and semantics are known. This design choice is **<u>frequently tested in Databricks certification exams</u>**.

## Deduplication in Structured Streaming

Structured Streaming provides **`dropDuplicates()`** to remove duplicate records based on key columns. In streaming mode, Spark tracks **state information** so duplicates can be detected within and across micro-batches.

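Without constraints, this state would grow indefinitely. **Watermarking** solves this by defining how late data is allowed to arrive and how long Spark should retain state. Once the watermark passes an event time, Spark evicts the deduplication state for that time and drops records that arrive behind the watermark, so a very late record is discarded whether or not it is a duplicate. This trades absolute correctness for **bounded state and operational stability**.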
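The effect of a watermark on streaming deduplication can be modeled in plain Python. This is a simplified model, not Spark's implementation: the watermark is recomputed at the end of each micro-batch as the maximum event time seen minus a fixed delay, records are `(key, event_time)` tuples, and duplicates share a key. In PySpark the real operation would look like `df.withWatermark("event_time", "10 minutes").dropDuplicates(["id", "event_time"])`, where the column names are hypothetical.

```python
def dedupe_with_watermark(batches, delay):
    """Deduplicate micro-batches of (key, event_time) records with bounded state."""
    state = {}                      # key -> event_time, the "seen" set Spark keeps
    watermark = float("-inf")
    output = []
    for batch in batches:
        emitted = []
        for key, event_time in batch:
            if event_time < watermark:
                continue            # too late: dropped, even if it is not a duplicate
            if key in state:
                continue            # duplicate still covered by retained state
            state[key] = event_time
            emitted.append((key, event_time))
        output.append(emitted)
        if batch:
            # Advance the watermark (it never moves backwards) ...
            watermark = max(watermark, max(t for _, t in batch) - delay)
            # ... and evict state that is now older than it.
            state = {k: t for k, t in state.items() if t >= watermark}
    return output, len(state)


batches = [
    [("a", 10), ("b", 12)],
    [("a", 10), ("c", 30)],   # "a" is caught by retained state
    [("b", 12), ("d", 31)],   # "b" arrives behind the watermark and is dropped
]
result, state_size = dedupe_with_watermark(batches, delay=5)
print(result)      # [[('a', 10), ('b', 12)], [('c', 30)], [('d', 31)]]
print(state_size)  # 2
```

State stays at two entries no matter how long the stream runs, which is the bounded-state property the watermark buys.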

## Deduplicating Against Existing Target Tables

Streaming deduplication must also account for data already written to the target table. Because each micro-batch is processed independently, Spark does not automatically know what already exists.

Delta Lake's **`MERGE INTO`** addresses this gap. By using **insert-only merge semantics**, incoming records are matched against existing ones, and only new records are inserted. This ensures **<u>idempotent writes</u>**, which is a critical property of reliable streaming pipelines.
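Why insert-only MERGE makes writes idempotent can be shown with a small plain-Python model. It mimics only the `WHEN NOT MATCHED THEN INSERT` branch on lists of dictionaries; it is not Delta Lake code, and the `order_id` key is a hypothetical example.

```python
def insert_only_merge(target, batch, key):
    """Model of MERGE ... WHEN NOT MATCHED THEN INSERT on a list of dict rows.

    Matching is against the target only, as in MERGE, so the batch itself
    should already be free of duplicates (e.g. via dropDuplicates).
    """
    existing = {row[key] for row in target}
    return list(target) + [row for row in batch if row[key] not in existing]


target = [{"order_id": 1, "qty": 2}]
batch = [{"order_id": 1, "qty": 2}, {"order_id": 2, "qty": 5}]
once = insert_only_merge(target, batch, "order_id")
twice = insert_only_merge(once, batch, "order_id")   # replay of the same batch
print([row["order_id"] for row in once])  # [1, 2]
print(once == twice)                      # True
```

Replaying the batch changes nothing, which is exactly what makes retries after a failure safe.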

## Using `foreachBatch` for Advanced Streaming Logic

Default streaming sinks do not support complex transactional logic. The **`foreachBatch`** interface allows each micro-batch to be treated as a static DataFrame, enabling custom write behavior such as Delta Lake merges.

Each micro-batch can be written atomically, preserving consistency and allowing sophisticated logic that would otherwise be impossible in streaming mode. This pattern is considered **standard practice** in Databricks streaming design.
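A sketch of the `foreachBatch` pattern, assuming a Delta-enabled session like the one created earlier in this notebook, an existing Delta table named `orders_silver`, and a streaming DataFrame with `order_id` and `order_timestamp` columns; all of these names are hypothetical. The handler registers the micro-batch as a temporary view and runs an insert-only MERGE against the target, while `dropDuplicates` with a watermark handles duplicates inside the stream.

```python
# Insert-only MERGE run once per micro-batch (table/view names are hypothetical).
INSERT_ONLY_MERGE = """
MERGE INTO orders_silver AS t
USING orders_batch AS s
ON t.order_id = s.order_id AND t.order_timestamp = s.order_timestamp
WHEN NOT MATCHED THEN INSERT *
"""


def merge_orders_batch(micro_batch_df, batch_id):
    """foreachBatch handler: micro_batch_df is a regular, static DataFrame."""
    micro_batch_df.createOrReplaceTempView("orders_batch")
    # DataFrame.sparkSession (PySpark 3.3+) is the session that owns the view.
    micro_batch_df.sparkSession.sql(INSERT_ONLY_MERGE)


def start_orders_stream(orders_stream_df, checkpoint_path):
    """Deduplicate within the stream, then against the target table via MERGE."""
    return (
        orders_stream_df
        .withWatermark("order_timestamp", "30 seconds")
        .dropDuplicates(["order_id", "order_timestamp"])
        .writeStream
        .foreachBatch(merge_orders_batch)
        .option("checkpointLocation", checkpoint_path)
        .trigger(availableNow=True)
        .start()
    )
```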

## Slowly Changing Dimensions in the Lakehouse

Slowly Changing Dimensions describe how dimensional data evolves over time. Some dimensions never change, some overwrite previous values, and others require **full historical tracking**.

**SCD Type 2** is the most commonly implemented pattern. It preserves history by inserting new records for changes while marking old records as inactive using effective dates and current-state flags. This approach supports accurate historical analysis and aligns well with **Delta Lake's transactional model**.

Delta Time Travel, while useful for short-term recovery, is **not a replacement** for SCD Type 2. It is affected by retention policies, increases storage costs over time, and lacks business semantics. SCD tables are designed explicitly for **long-term historical correctness**.
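The SCD Type 2 bookkeeping can be sketched in plain Python. This illustrates the row-versioning rules only; it is not a Delta Lake implementation (in a Lakehouse this is typically expressed as a MERGE), and the `start_date`, `end_date`, and `is_current` column names and the customer data are hypothetical.

```python
from datetime import date

TRACKING_COLUMNS = ("start_date", "end_date", "is_current")


def apply_scd2(dimension, changes, key, as_of):
    """Apply SCD Type 2 changes to a list of dict rows and return the new list."""
    result = [dict(row) for row in dimension]            # leave the input untouched
    current = {row[key]: row for row in result if row["is_current"]}
    for change in changes:
        old = current.get(change[key])
        if old is not None:
            attributes = {k: v for k, v in old.items() if k not in TRACKING_COLUMNS}
            if attributes == change:
                continue                                  # nothing changed
            old["end_date"] = as_of                       # expire the old version
            old["is_current"] = False
        new_row = {**change, "start_date": as_of, "end_date": None, "is_current": True}
        result.append(new_row)
        current[change[key]] = new_row
    return result


dimension = [{"customer_id": "c1", "city": "Paris",
              "start_date": date(2024, 1, 1), "end_date": None, "is_current": True}]
changes = [{"customer_id": "c1", "city": "Lyon"}, {"customer_id": "c2", "city": "Nice"}]
history = apply_scd2(dimension, changes, "customer_id", date(2025, 6, 1))
print([(r["customer_id"], r["city"], r["is_current"]) for r in history])
# [('c1', 'Paris', False), ('c1', 'Lyon', True), ('c2', 'Nice', True)]
```

The Paris row survives with an end date, so the history is preserved independently of any Delta retention setting.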

## Closing Perspective

A well-designed Bronze ingestion layer prioritizes **raw data fidelity, replayability, and scalability**. Singleplex ingestion favors simplicity but does not scale for streaming systems, while multiplex ingestion introduces complexity that is intentionally pushed downstream into the Silver layer.

Auto Loader, Structured Streaming, watermarking, `foreachBatch`, and Delta Lake merges together form the **foundation of modern Databricks ingestion architectures**. Understanding not just *how* these components work, but *why they are used this way*, is essential for building production-grade pipelines and succeeding in Databricks certification exams.


## Data Processing

### Official Databricks Documentation References

* [Medallion Architecture (Bronze, Silver, Gold)](https://docs.databricks.com/aws/en/lakehouse/medallion)
* [Delta Lake MERGE INTO](https://docs.databricks.com/aws/en/delta/merge)
* [Use foreachBatch with Structured Streaming](https://docs.databricks.com/aws/en/structured-streaming/foreach)
* [Delta Lake Change Data Feed (CDF)](https://docs.databricks.com/aws/en/delta/delta-change-data-feed)


## Conceptual Foundation: Change Data Capture in the Lakehouse

**Change Data Capture (CDC)** refers to the process of identifying and propagating **row-level changes** (inserts, updates, and deletes) from a source system to a downstream target. Instead of repeatedly moving full datasets, CDC focuses on **what changed**, significantly reducing data movement and enabling near real-time synchronization.

In a Databricks Lakehouse, CDC most commonly appears at the **Silver layer boundary**, where raw events from the Bronze layer are transformed into **current, queryable domain tables**. At this stage, data engineers must reconcile multiple updates, preserve ordering, and apply changes in a way that guarantees **correctness, idempotency, and replayability**.

CDC can originate from **external systems** (such as databases emitting CDC events via Kafka or files) or be **generated internally** by Delta Lake itself using Change Data Feed (CDF). While both approaches deal with changes, they solve different architectural problems and must be used deliberately.

## External CDC Feeds: Applying Source-System Changes

External CDC feeds represent changes captured directly from source systems. Each event typically contains the **full row payload** along with metadata indicating whether the row was **inserted, updated, or deleted**, and a **timestamp or version** describing when the change occurred.

A critical characteristic of CDC feeds is that **multiple changes for the same primary key can arrive** within a short time window. For example, a customer record may be updated several times before downstream processing occurs. Applying these changes naïvely would lead to ambiguity or failure when writing to a target table.

Because of this, CDC ingestion is **not append-only**. It requires reconciliation logic that understands ordering, resolves conflicts, and applies only the **most recent state per key**.

## Delta Lake MERGE as the CDC Application Mechanism

Delta Lake provides the **`MERGE INTO`** command as the primary mechanism for applying CDC changes. MERGE allows insert, update, and delete logic to be expressed declaratively, matching source records to target rows using a business key.

However, MERGE enforces a strict correctness rule: **a single target row cannot be modified by multiple source rows in the same operation**. If a CDC feed contains more than one change for the same key, the merge will fail with an exception.

This constraint is intentional. It prevents nondeterministic updates and forces engineers to make ordering explicit before applying changes. As a result, CDC feeds **must be pre-processed** so that only the **latest change per key** is presented to the MERGE operation.

## Resolving Multiple Updates with Window Functions

To ensure that only the most recent change per key is applied, CDC pipelines commonly use **window functions with ranking**. Records are partitioned by the business key and ordered by the change timestamp in descending order. The newest record within each partition receives rank 1. If two changes for the same key share a timestamp, `rank()` gives both of them rank 1, so `row_number()` is the safer choice when timestamps are not guaranteed to be unique.

By filtering on this rank, the dataset is reduced to **exactly one row per key**, satisfying the MERGE constraint and ensuring deterministic behavior. This approach differs fundamentally from simple deduplication. While `dropDuplicates()` removes identical rows, CDC updates are **not identical**: they represent different states of the same entity.

This ranking logic is therefore essential whenever CDC feeds contain multiple updates for the same key, which is the normal case in real-world systems.
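The ranking step can be modeled in plain Python to show what the filter on rank 1 keeps. This is not Spark code: in PySpark the same logic would typically use `Window.partitionBy(...).orderBy(F.col(...).desc())` with `F.row_number()` or `F.rank()`. The `customer_id`, `row_time`, and `row_status` fields are hypothetical CDC columns, and `check_merge_source` is a hypothetical, deliberately stricter pre-check than MERGE itself: it rejects any repeated key in the source.

```python
def latest_per_key(changes, key, ts):
    """Keep the newest change per key, i.e. the rank == 1 row of each partition.

    Ties on `ts` keep the first row seen, which mirrors row_number() rather than rank().
    """
    latest = {}
    for row in changes:
        k = row[key]
        if k not in latest or row[ts] > latest[k][ts]:
            latest[k] = row
    return list(latest.values())


def check_merge_source(rows, key):
    """Raise if a key occurs more than once in the MERGE source."""
    keys = [row[key] for row in rows]
    if len(keys) != len(set(keys)):
        raise ValueError(f"multiple source rows share a {key!r}; MERGE could fail")


feed = [
    {"customer_id": "c1", "email": "old@example.com", "row_time": 1, "row_status": "update"},
    {"customer_id": "c2", "email": "new@example.com", "row_time": 2, "row_status": "insert"},
    {"customer_id": "c1", "email": "last@example.com", "row_time": 3, "row_status": "update"},
]
latest = latest_per_key(feed, "customer_id", "row_time")
check_merge_source(latest, "customer_id")    # passes: one row per key
print([(r["customer_id"], r["row_time"]) for r in latest])  # [('c1', 3), ('c2', 2)]
```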

## Streaming CDC and the Role of `foreachBatch`

A key operational limitation emerges when CDC feeds are processed using **Structured Streaming**. Window functions that require global ordering, such as rank over a partition, are **not supported directly on streaming DataFrames** due to unbounded state requirements.

Databricks addresses this limitation through **`foreachBatch`**. This interface allows each micro-batch of a streaming query to be treated as a **static DataFrame**, enabling full batch semantics inside a streaming pipeline.

Within `foreachBatch`, CDC records can be ranked, filtered to the latest change per key, and merged atomically into a Delta table. This pattern is considered **standard practice** for applying CDC in streaming architectures, as it combines the scalability of streaming ingestion with the expressiveness and safety of batch operations.
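Putting ranking and MERGE together inside `foreachBatch` might look like the following sketch. It assumes a Delta-enabled session, an existing Delta table `customers_silver` with `customer_id`, `email`, and `updated` columns, and CDC records carrying `customer_id`, `email`, `row_time`, and `row_status`; all of these names are hypothetical. `row_number()` is used instead of `rank()` so that timestamp ties still yield a single row per key.

```python
# Apply the latest change per key; table, view, and column names are hypothetical.
CDC_MERGE = """
MERGE INTO customers_silver AS t
USING latest_changes AS s
ON t.customer_id = s.customer_id
WHEN MATCHED AND s.row_status = 'delete' THEN DELETE
WHEN MATCHED THEN UPDATE SET email = s.email, updated = s.row_time
WHEN NOT MATCHED AND s.row_status != 'delete' THEN
  INSERT (customer_id, email, updated) VALUES (s.customer_id, s.email, s.row_time)
"""


def apply_cdc_batch(micro_batch_df, batch_id):
    """foreachBatch handler: rank changes per key, keep the newest, MERGE it."""
    from pyspark.sql import Window
    from pyspark.sql import functions as F

    # A ranking window is allowed here because the micro-batch is static.
    newest_first = Window.partitionBy("customer_id").orderBy(F.col("row_time").desc())
    latest = (
        micro_batch_df
        .withColumn("rn", F.row_number().over(newest_first))
        .filter(F.col("rn") == 1)
        .drop("rn")
    )
    latest.createOrReplaceTempView("latest_changes")
    latest.sparkSession.sql(CDC_MERGE)
```

The handler would be attached with `cdc_stream_df.writeStream.foreachBatch(apply_cdc_batch)`, plus a checkpoint location and a trigger such as `availableNow=True`. The ordered MATCHED clauses let a delete win over an update for the same key.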

## Enrichment and Idempotency in CDC Pipelines

CDC application often includes **data enrichment**, such as joining reference or lookup tables. These lookup tables are typically small and static, making them ideal candidates for **broadcast joins**, which reduce shuffle costs and improve performance.

Equally important is **idempotency**. CDC pipelines must tolerate retries, restarts, and partial failures without introducing duplicate or inconsistent data. On its own, `foreachBatch` only guarantees at-least-once execution, so a micro-batch may be replayed after a failure. Delta Lake's transactional guarantees, combined with deterministic MERGE logic inside `foreachBatch`, make such a replay harmless, so each micro-batch is applied **exactly once** from a logical perspective.

Trigger modes such as **`availableNow`** are commonly used for CDC backfills and controlled loads. This mode processes all available data and then stops automatically, behaving like a batch job while preserving streaming semantics.

## Delta Lake Change Data Feed: CDC Generated by Delta

While external CDC captures changes **before data enters the Lakehouse**, **Delta Lake Change Data Feed (CDF)** captures changes **after data is written** to a Delta table. CDF automatically records **row-level inserts, updates, and deletes**, along with metadata such as change type, commit version, and commit timestamp.

When CDF is enabled, Delta Lake exposes these changes through the `table_changes` function. Consumers can query changes starting from a specific table version or timestamp, making CDF ideal for **incremental propagation in multi-hop architectures**.

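A defining characteristic of CDF is how it represents updates. Each update produces **two records**: a *pre-image* showing the row state before the change, and a *post-image* showing the state after the change. They are labeled `update_preimage` and `update_postimage` in the `_change_type` column, while inserts and deletes appear as `insert` and `delete`. This provides full visibility into how data evolved over time.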
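To make the pre-image/post-image shape concrete, here is a plain-Python model that derives CDF-style rows by comparing two versions of a small table. Real CDF records changes as they are committed (not by diffing snapshots) and also adds `_commit_version` and `_commit_timestamp`; this model keeps only `_change_type`, and the sample rows are hypothetical.

```python
def change_feed_rows(before, after, key):
    """Return CDF-style rows describing how `before` became `after`."""
    old = {row[key]: row for row in before}
    new = {row[key]: row for row in after}
    changes = []
    for k, row in new.items():
        if k not in old:
            changes.append({**row, "_change_type": "insert"})
        elif row != old[k]:
            # An update yields two rows: the state before and the state after.
            changes.append({**old[k], "_change_type": "update_preimage"})
            changes.append({**row, "_change_type": "update_postimage"})
    for k, row in old.items():
        if k not in new:
            changes.append({**row, "_change_type": "delete"})
    return changes


v1 = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]
v2 = [{"id": 1, "name": "Alicia"}, {"id": 3, "name": "David"}]
for change in change_feed_rows(v1, v2, "id"):
    print(change)
# {'id': 1, 'name': 'Alice', '_change_type': 'update_preimage'}
# {'id': 1, 'name': 'Alicia', '_change_type': 'update_postimage'}
# {'id': 3, 'name': 'David', '_change_type': 'insert'}
# {'id': 2, 'name': 'Bob', '_change_type': 'delete'}
```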

## Enabling and Retaining Change Data Feed

CDF is **not enabled by default**. It must be explicitly activated by setting the table property `delta.enableChangeDataFeed = true`, either at table creation time or via an ALTER TABLE command. It can also be enabled globally for newly created tables using Spark configuration.

CDF data follows the **same retention policy as the underlying Delta table**. When a VACUUM operation removes old files, corresponding CDF data is also deleted. For this reason, CDF should not be treated as a long-term audit log unless retention policies are configured accordingly.
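A minimal sketch of enabling and consuming CDF from PySpark, assuming the Delta-enabled `spark` session created earlier in this notebook and an existing Delta table; the table name and starting version are hypothetical. `readChangeFeed` and `startingVersion` are Delta reader options, and `spark.databricks.delta.properties.defaults.` is the Delta prefix for default table properties. The SQL equivalent of the read is `SELECT * FROM table_changes('customers_silver', 1)`.

```python
# Table name is hypothetical.
TABLE = "customers_silver"

# Enable CDF on an existing table ...
ENABLE_CDF_SQL = f"ALTER TABLE {TABLE} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)"
# ... or make it the default for tables created later in this session.
CDF_DEFAULT_KEY = "spark.databricks.delta.properties.defaults.enableChangeDataFeed"


def enable_cdf(spark):
    spark.sql(ENABLE_CDF_SQL)


def read_changes(spark, start_version):
    """Batch-read row-level changes committed at or after start_version."""
    return (
        spark.read.format("delta")
        .option("readChangeFeed", "true")
        .option("startingVersion", start_version)
        .table(TABLE)
    )

# Usage in the notebook session:
#   spark.conf.set(CDF_DEFAULT_KEY, "true")
#   enable_cdf(spark)
#   read_changes(spark, 1).select("_change_type", "_commit_version").show()
```

Versions written before CDF was enabled carry no change data, so `startingVersion` should not point before the commit that enabled it.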


## When to Use CDC Feeds vs. Delta CDF

External CDC feeds and Delta CDF serve different purposes and should not be conflated. External CDC is appropriate when changes originate **outside the Lakehouse**, such as operational databases. Delta CDF is designed to propagate changes **between Delta tables** within the Lakehouse.

CDF is most effective when **only a small fraction of rows change per batch**, which is typical for CDC-style updates. It should not be used when tables are **frequently overwritten** or when most rows change in every batch, as this negates its efficiency advantages.

For append-only workloads, CDF is unnecessary. Standard streaming reads are simpler and more efficient in such cases.

## Stream-Static Joins: Enriching Streaming Data with Reference Tables

A **stream-static join** is a Structured Streaming pattern used to enrich **append-only streaming data** with attributes from a **static table**. Streaming tables are incremental by definition: data can only be appended, and previously processed records are never revisited. Static tables, by contrast, may be updated, deleted, or fully overwritten, which makes them unsuitable as streaming sources.

In a stream-static join, the **streaming side drives the computation**. Each new record arriving in the stream is joined against the **latest snapshot** of the static table at query time. The result of this join is itself an **incremental table**, emitting rows only when new streaming data appears.

This pattern relies on Delta Lake's guarantee that every query against a Delta table returns the **most recent committed version**. As a result, changes made to the static table are visible to future streaming records, but they do **not retroactively update** previously produced results.
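These semantics can be seen in a small pure-Python model. This is not Spark code, and `products`, `order_id` and `product_id` are made-up names. Each micro-batch is inner-joined against the static table as it exists at that moment. In PySpark, the equivalent is a streaming DataFrame joined with `spark.read.table(...)`.

```python
def stream_static_join(micro_batch, static_snapshot):
    """Inner-join one micro-batch against the static table as it is *right now*."""
    return [
        {**event, **static_snapshot[event["product_id"]]}
        for event in micro_batch
        if event["product_id"] in static_snapshot
    ]


products = {"p1": {"product_name": "Lamp"}}  # static Delta table, latest version
output = []

output += stream_static_join([{"order_id": 1, "product_id": "p1"},
                              {"order_id": 2, "product_id": "p2"}], products)  # p2 unknown: dropped
products["p1"] = {"product_name": "Desk lamp"}  # static table updated
products["p2"] = {"product_name": "Chair"}      # p2 arrives too late for order 2
output += stream_static_join([{"order_id": 3, "product_id": "p1"}], products)

for row in output:
    print(row)
# order 1 keeps "Lamp", order 3 sees "Desk lamp", order 2 never appears
```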

## Limitations of Stream-Static Joins

Stream-static joins are **not stateful**. If a streaming record does not find a match in the static table at the time it is processed, that record is **permanently missed** in the join output. Later inserts or updates to the static table will not trigger reprocessing or backfilling of unmatched streaming records.

Because streaming systems cannot buffer unmatched records indefinitely, there is no mechanism to "wait" for future static data to arrive. This limitation is fundamental to the streaming execution model.

To address this gap, architectures often include a **separate batch reconciliation job** that periodically scans for unmatched records and inserts the missing results. This hybrid approach balances streaming latency with eventual completeness.
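A reconciliation job of this kind can be sketched in pure Python. The function name and column names are hypothetical. In Spark, this is conceptually a left anti join of the source events against the join output, followed by an inner join with the current static table.

```python
def reconcile(stream_events, join_output, static_table, key="product_id", event_id="order_id"):
    """Batch job: emit joined rows for events the stream missed that now have a match.

    Conceptually a left anti join of the events against the join output,
    followed by an inner join with the current static table.
    """
    already_joined = {row[event_id] for row in join_output}
    return [
        {**event, **static_table[event[key]]}
        for event in stream_events
        if event[event_id] not in already_joined and event[key] in static_table
    ]


events = [{"order_id": 1, "product_id": "p1"}, {"order_id": 2, "product_id": "p2"}]
join_output = [{"order_id": 1, "product_id": "p1", "product_name": "Lamp"}]
products_now = {"p1": {"product_name": "Lamp"}, "p2": {"product_name": "Chair"}}
missing = reconcile(events, join_output, products_now)
print(missing)  # [{'order_id': 2, 'product_id': 'p2', 'product_name': 'Chair'}]
```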

## Closing Perspective

Change Data Capture is a **core capability** of modern data platforms, but it introduces complexity that must be handled explicitly. In Databricks, this complexity is addressed through a combination of **Delta Lake MERGE**, **window-based reconciliation**, and **`foreachBatch` execution patterns**.

Delta Lake Change Data Feed complements this model by providing **native CDC generation** for downstream consumers, enabling clean and efficient multi-hop architectures.

A correct CDC design prioritizes **determinism, idempotency, and explicit ordering**. Understanding not just how these mechanisms work, but **why they are structured this way**, is essential for building reliable production pipelines and for succeeding in Databricks certification exams.


## Improving Performance

### Official Databricks Documentation References

* [Optimize Data File Layout (OPTIMIZE, Z-ORDER, Clustering)](https://docs.databricks.com/delta/optimize.html)
* [Predictive Optimization for Unity Catalog Managed Tables](https://docs.databricks.com/optimizations/predictive-optimization.html)
* [Deletion Vectors](https://docs.databricks.com/delta/deletion-vectors)
* [Python UDFs in Databricks](https://docs.databricks.com/udf/python)
* [VACUUM and Transaction Log Retention](https://docs.databricks.com/delta/vacuum)

## Partitioning Delta Lake Tables in the Lakehouse

**Partitioning** is a physical data layout strategy used to optimize query performance on large Delta Lake tables. A partition represents a subset of rows that share the same value for one or more **partitioning columns**, and each partition is stored in a separate directory on disk.

When a table is partitioned, Databricks automatically routes incoming rows to the correct partition directory during writes. During query execution, Spark can skip entire partitions that do not satisfy filter predicates on partition columns, significantly reducing the amount of data scanned. This mechanism improves both **query latency and resource efficiency**, especially for very large tables.

Partitioning is defined at table creation time using the **`PARTITIONED BY`** clause (or `partitionBy` in the DataFrame writer) and can include multiple columns, such as year and month. Delta Lake operations like **OPTIMIZE** can also be applied selectively at the partition level, using a `WHERE` clause on partition columns, enabling targeted maintenance.
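A toy model of the routing and pruning described above. It uses Hive-style `column=value` directories, which is how partitioned Delta tables lay out their files. The function names and sample rows are hypothetical, and the commented PySpark/SQL lines are illustrative equivalents.

```python
from collections import defaultdict

# Rough PySpark / SQL equivalents on a Delta table (names are illustrative):
#   df.write.format("delta").partitionBy("year", "month").saveAsTable("sales")
#   CREATE TABLE sales (...) PARTITIONED BY (year, month)
#   OPTIMIZE sales WHERE year = 2024


def partition_path(row, partition_cols):
    """Hive-style directory for a row, e.g. 'year=2024/month=3'."""
    return "/".join(f"{col}={row[col]}" for col in partition_cols)


def write_partitioned(rows, partition_cols):
    """Route each row to its partition directory, as the writer does."""
    layout = defaultdict(list)
    for row in rows:
        layout[partition_path(row, partition_cols)].append(row)
    return dict(layout)


def prune(layout, **filters):
    """Keep directories whose partition values match the filters; the rest are never read."""
    kept = {}
    for path, rows in layout.items():
        values = dict(part.split("=", 1) for part in path.split("/"))
        if all(values.get(col) == str(val) for col, val in filters.items()):
            kept[path] = rows
    return kept


rows = [
    {"year": 2023, "month": 12, "amount": 10},
    {"year": 2024, "month": 1, "amount": 20},
    {"year": 2024, "month": 2, "amount": 30},
]
layout = write_partitioned(rows, ["year", "month"])
print(sorted(prune(layout, year=2024)))  # ['year=2024/month=1', 'year=2024/month=2']
```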

## Choosing Effective Partitioning Columns

Partitioning decisions must be made carefully, as they have long-term consequences. The most effective partitioning columns are those with **low cardinality** and that frequently appear in query filters. Columns representing measures of time, such as dates or months, are especially well suited, as they naturally bound data growth and support archiving and retention strategies.

Partition size is equally important. As a general guideline, each partition should contain **at least 1 GB of data**, though optimal sizes may vary depending on total table scale. If most partitions are significantly smaller, the table is likely **over-partitioned**, leading to excessive metadata, too many files, and degraded query performance.

Partitioning is not always beneficial. Small or medium-sized tables often perform better without partitions, as Delta Lake's built-in data skipping and file statistics are usually sufficient. Once a table is incorrectly partitioned, correcting it requires a **full rewrite of all data**, making partitioning a decision that should err on the side of restraint.

## Partitioning, Retention, and Streaming Constraints

Partitioning is particularly valuable for **data lifecycle management**. Deleting or archiving old data can be done cleanly along partition boundaries, such as removing an entire year of data, which reduces operational complexity and storage costs.

However, when a partitioned table is used as a **streaming source**, deleting partitions violates the append-only requirement of Structured Streaming. To maintain streamability in such cases, the **`ignoreDeletes`** option must be enabled so that streaming queries can continue despite partition-level deletions. Physical file removal only occurs after running **VACUUM**, preserving Delta Lake‚Äôs transactional guarantees.
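A sketch of the two sides of this pattern as PySpark helpers. It assumes a Delta table partitioned by a hypothetical `year` column, and the helper names are made up. As the Delta option describes, `ignoreDeletes` covers deletes at partition boundaries. Deleting individual rows inside partitions would still break the stream.

```python
def drop_year(spark, table_name: str, year: int) -> None:
    """Delete a whole partition; its files stay on storage until VACUUM runs."""
    spark.sql(f"DELETE FROM {table_name} WHERE year = {year}")


def stream_from_partitioned_table(spark, table_name: str):
    """Streaming read that tolerates commits which delete whole partitions."""
    return (
        spark.readStream.format("delta")
        .option("ignoreDeletes", "true")
        .table(table_name)
    )

# Usage (hypothetical table partitioned by `year`):
#   drop_year(spark, "events", 2019)
#   stream = stream_from_partitioned_table(spark, "events")
```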

## Data File Layout and Data Skipping

The **data file layout** of a Delta table describes how records are distributed across underlying Parquet files. Optimizing this layout is essential for enabling **data skipping**, a core performance feature where Spark avoids reading files that cannot contain relevant data for a query.

Delta Lake leverages multiple layout optimization techniques to reduce file scans and improve performance. These techniques aim to minimize unnecessary I/O while preserving flexibility and transactional correctness.

## Z-Order Indexing: Optimizing Without Partitions

**Z-order indexing** rewrites data files so that rows with similar values in the chosen columns are colocated in the same set of files, improving file-level pruning without introducing partition directories. It is applied by running **OPTIMIZE ... ZORDER BY** on one or more columns.

By clustering values together, Z-ordering produces narrow per-file min/max ranges on those columns, which allows Delta Lake to skip large portions of data when filtering on them. This approach is especially effective for high-cardinality columns that frequently appear in query predicates.

The key limitation of Z-ordering is that it is **not incremental**. Whenever new data is ingested, Z-ordering must be rerun, potentially rewriting large portions of the table. For frequently updated tables, this can become computationally expensive.
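A toy illustration of why Z-ordering helps. It assumes a simplified Morton (Z-order) curve over two small integer columns, and Delta's actual `OPTIMIZE ... ZORDER BY` implementation differs in detail. The 64 rows are written into 8 files, either sorted linearly or in Z-order, and then the number of files surviving min/max pruning is counted for a filter on each column.

```python
def interleave_bits(a: int, b: int, bits: int = 3) -> int:
    """Morton (Z-order) code: alternate the bits of a and b into one sort key."""
    z = 0
    for i in range(bits):
        z |= ((b >> i) & 1) << (2 * i)
        z |= ((a >> i) & 1) << (2 * i + 1)
    return z


def split_into_files(rows, rows_per_file=8):
    return [rows[i:i + rows_per_file] for i in range(0, len(rows), rows_per_file)]


def files_to_read(files, col, value):
    """Count files whose min/max on column `col` could contain `value`."""
    return sum(
        1 for f in files
        if min(r[col] for r in f) <= value <= max(r[col] for r in f)
    )


rows = [(a, b) for a in range(8) for b in range(8)]  # 64 rows, columns a and b
linear = split_into_files(sorted(rows))              # sorted by a, then b
zordered = split_into_files(sorted(rows, key=lambda r: interleave_bits(*r)))

for name, files in (("linear", linear), ("z-order", zordered)):
    print(f"{name}: a == 3 reads {files_to_read(files, 0, 3)} files, "
          f"b == 3 reads {files_to_read(files, 1, 3)} files")
```

Here the linear layout reads 1 file for `a == 3` but all 8 for `b == 3`, while the Z-ordered layout reads 2 and 4. Filters on either column now skip data, at the cost of slightly less skipping on `a`.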

## Liquid Clustering: Incremental and Flexible Optimization

**Liquid clustering** is an evolution of Z-order indexing that provides incremental optimization and greater flexibility. Clustering keys are defined at the table level using **`CLUSTER BY`**, eliminating the need to specify them on every OPTIMIZE run.

When OPTIMIZE is executed on a clustered table, only **new or unoptimized files** are reorganized. Previously clustered files are left untouched, making the operation significantly faster and more efficient. Unlike partitioning, clustering keys can be **redefined over time** without rewriting existing data, allowing the physical layout to evolve alongside changing query patterns.

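Liquid clustering cannot be combined with partitioning or Z-ordering on the same table. An existing unpartitioned table can be switched to clustering in place with `ALTER TABLE ... CLUSTER BY`, but migrating a partitioned table typically involves creating a new clustered table that uses the former partition and Z-order columns as clustering keys.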
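A sketch of the liquid clustering lifecycle as PySpark helpers issuing Databricks SQL. Whether every statement is available depends on the Databricks Runtime or Delta version in use, which is an assumption here. The helper, table and column names are hypothetical.

```python
def _keys(columns):
    return ", ".join(columns)


def create_clustered_table(spark, table_name, columns_ddl, keys):
    """Define clustering keys once, at the table level."""
    spark.sql(f"CREATE TABLE {table_name} ({columns_ddl}) CLUSTER BY ({_keys(keys)})")


def change_clustering_keys(spark, table_name, keys):
    """Redefine keys; existing files are not rewritten by this statement."""
    spark.sql(f"ALTER TABLE {table_name} CLUSTER BY ({_keys(keys)})")


def optimize(spark, table_name):
    """No ZORDER BY needed: OPTIMIZE clusters files that are not yet clustered."""
    spark.sql(f"OPTIMIZE {table_name}")


def migrate_from_partitioned(spark, source_table, target_table, keys):
    """CTAS into a new clustered table, reusing old partition / Z-order columns as keys."""
    spark.sql(f"CREATE TABLE {target_table} CLUSTER BY ({_keys(keys)}) AS SELECT * FROM {source_table}")
```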

## Automatic Liquid Clustering and Predictive Optimization

Choosing optimal clustering keys requires insight into query workloads, which is not always available upfront. **Automatic Liquid Clustering** addresses this challenge by allowing Databricks to select clustering keys automatically based on historical query patterns. This capability is enabled using **`CLUSTER BY AUTO`** and requires **Predictive Optimization** on Unity Catalog managed tables.

Predictive Optimization is an AI-driven service that automatically runs **OPTIMIZE**, **VACUUM**, and **ANALYZE** operations. It incrementally clusters data, updates table statistics for the query optimizer, and removes obsolete files to reduce storage costs. The OPTIMIZE runs it schedules do not apply Z-ordering, as clustering supersedes it.
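A minimal sketch of switching these features on from a notebook. It assumes a Unity Catalog managed table and sufficient privileges. The statements follow Databricks SQL syntax, and the helper names and object names are hypothetical.

```python
def enable_automatic_clustering(spark, table_name):
    """Let Databricks pick clustering keys (needs Predictive Optimization, UC managed table)."""
    spark.sql(f"ALTER TABLE {table_name} CLUSTER BY AUTO")


def enable_predictive_optimization(spark, catalog):
    """Schemas and tables in the catalog inherit the setting unless they override it."""
    spark.sql(f"ALTER CATALOG {catalog} ENABLE PREDICTIVE OPTIMIZATION")
```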

Databricks recommends enabling Predictive Optimization broadly to reduce manual maintenance and improve long-term performance stability.

## Delta Lake Transaction Log and Checkpoints

Every commit to a Delta table is recorded as a JSON file in the **transaction log**, capturing actions such as adding or removing data files. Over time, processing many small log files would become inefficient.

To address this, Databricks periodically writes **checkpoint files** in Parquet format that capture the entire table state at a specific version. Spark can skip directly to the most recent checkpoint and process only newer log entries, dramatically accelerating table state resolution.
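The checkpoint shortcut can be modeled in a few lines. File names follow Delta's log convention: a 20-digit zero-padded version, with `.json` for commits and `.checkpoint.parquet` for checkpoints. Everything else is a simplification. Real checkpoints also carry protocol and metadata actions, and readers locate them through `_last_checkpoint`. `table_state` and the sample commits are hypothetical.

```python
def log_file(version: int, checkpoint: bool = False) -> str:
    """Delta log names: 20-digit zero-padded version plus .json or .checkpoint.parquet."""
    suffix = "checkpoint.parquet" if checkpoint else "json"
    return f"{version:020d}.{suffix}"


def table_state(commits, checkpoints, version):
    """Return (active data files, log files read) to reconstruct the table at `version`.

    commits: {version: {"add": [...], "remove": [...]}}
    checkpoints: {version: set of files active at that version}
    """
    usable = [v for v in checkpoints if v <= version]
    if usable:
        start = max(usable)
        active = set(checkpoints[start])
        files_read = [log_file(start, checkpoint=True)]
    else:
        start, active, files_read = -1, set(), []
    for v in range(start + 1, version + 1):  # replay only commits after the checkpoint
        active |= set(commits[v].get("add", []))
        active -= set(commits[v].get("remove", []))
        files_read.append(log_file(v))
    return active, files_read


commits = {v: {"add": [f"part-{v}.parquet"]} for v in range(13)}
commits[11]["remove"] = ["part-3.parquet"]
checkpoints = {10: {f"part-{v}.parquet" for v in range(11)}}

active, files_read = table_state(commits, checkpoints, 12)
print(files_read)  # the checkpoint for version 10, then only the JSON commits 11 and 12
```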

Checkpointing also enables efficient **time travel**, allowing queries against historical table versions within the configured log retention window.

## File Statistics and Data Skipping

The Delta transaction log stores **file-level statistics** for each data file, including row counts and minimum, maximum, and null counts for the first 32 columns in the schema. These statistics are always leveraged by Delta Lake to enable file skipping.

Because nested fields count toward the first 32 columns, deeply nested schemas can quickly exhaust this limit. High-cardinality string fields, such as free-text columns, often provide little value for statistics and should be placed later in the schema to avoid unnecessary computation.
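A toy version of statistics-based skipping. The stats dictionaries use the field names Delta stores per file (`numRecords`, `minValues`, `maxValues`, `nullCount`), while the functions and sample files are hypothetical. A column without statistics, for example one beyond the indexed columns, can never be skipped. The number of indexed columns is controlled by the `delta.dataSkippingNumIndexedCols` table property.

```python
def can_skip_equals(stats, column, value):
    """True only if the file's min/max prove no row has column == value."""
    if column not in stats["minValues"]:
        return False  # no statistics (e.g. column beyond the indexed ones): must read the file
    return value < stats["minValues"][column] or value > stats["maxValues"][column]


def can_skip_is_null(stats, column):
    """True only if the null count proves the file has no NULLs in `column`."""
    return stats["nullCount"].get(column) == 0


def files_to_scan(file_stats, column, value):
    return [path for path, stats in file_stats.items() if not can_skip_equals(stats, column, value)]


file_stats = {
    "part-0.parquet": {"numRecords": 100, "minValues": {"amount": 1},
                       "maxValues": {"amount": 40}, "nullCount": {"amount": 0}},
    "part-1.parquet": {"numRecords": 100, "minValues": {"amount": 35},
                       "maxValues": {"amount": 90}, "nullCount": {"amount": 3}},
    "part-2.parquet": {"numRecords": 100, "minValues": {}, "maxValues": {}, "nullCount": {}},
}
print(files_to_scan(file_stats, "amount", 50))  # ['part-1.parquet', 'part-2.parquet']
```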

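Log files themselves are retained independently of data files. VACUUM removes unreferenced data files, while transaction log cleanup is governed by **`delta.logRetentionDuration`**, which defaults to 30 days. Time travel to a version needs both its log entries and its data files, so how far back time travel is possible is bounded by the shorter of the log retention and the data-file retention enforced by VACUUM (**`delta.deletedFileRetentionDuration`**, 7 days by default).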

## Auto Optimize and File Size Tuning

**Auto Optimize** reduces the small-file problem by automatically compacting data during and after writes. It consists of two features: **optimized writes**, which aim to write files around 128 MB, and **auto compaction**, which further merges small files opportunistically.

These features can be enabled at the table level or controlled at the session level, allowing fine-grained operational control. Auto compaction does not support Z-ordering, as compaction focuses on file size rather than data locality.
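A sketch of the two levels of control. The table properties and session keys below are the Databricks names for optimized writes and auto compaction. Whether the open-source Delta build used in this notebook honors all of them is not assumed. The helper names and table name are hypothetical.

```python
def enable_auto_optimize(spark, table_name: str) -> None:
    """Table-level: optimized writes plus auto compaction for every writer of this table."""
    spark.sql(
        f"ALTER TABLE {table_name} SET TBLPROPERTIES ("
        "'delta.autoOptimize.optimizeWrite' = 'true', "
        "'delta.autoOptimize.autoCompact' = 'true')"
    )


def enable_auto_optimize_for_session(spark) -> None:
    """Session-level: applies to writes issued from this SparkSession."""
    spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
    spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")
```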

Importantly, Databricks can dynamically adjust file sizes based on workload patterns. For example, tables with frequent MERGE operations may benefit from smaller files to minimize rewrite costs, and Auto Optimize adapts accordingly.

## Deletion Vectors: Efficient Row-Level Modifications

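**Deletion vectors** optimize UPDATE, DELETE, and MERGE operations by avoiding full Parquet file rewrites when only a few rows change. Instead, Delta Lake writes small auxiliary files that mark individual rows of an existing file as deleted. For updates, the new versions of the changed rows are written to new files while the old versions are marked as deleted.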

These changes are applied lazily and become physical only when files are rewritten during OPTIMIZE or auto compaction. Deletion vectors are enabled by default on modern runtimes and can be controlled using the **`delta.enableDeletionVectors`** table property.
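A toy model of the mechanism. Each `DataFile` is immutable and carries a set of deleted row positions standing in for the deletion vector. The class and function names are hypothetical. On Databricks the feature is controlled with `delta.enableDeletionVectors`, and deleted rows can also be purged explicitly with `REORG TABLE ... APPLY (PURGE)`.

```python
class DataFile:
    """An immutable Parquet file plus its deletion vector (positions of deleted rows)."""

    def __init__(self, rows):
        self.rows = list(rows)
        self.deleted = set()

    def live_rows(self):
        return [r for i, r in enumerate(self.rows) if i not in self.deleted]


def delete_where(files, predicate):
    """DELETE: mark matching positions; the Parquet file itself is not rewritten."""
    for f in files:
        for i, r in enumerate(f.rows):
            if i not in f.deleted and predicate(r):
                f.deleted.add(i)


def update_where(files, predicate, changes):
    """UPDATE: mark old versions deleted and write the new versions to a small new file."""
    new_rows = []
    for f in files:
        for i, r in enumerate(f.rows):
            if i not in f.deleted and predicate(r):
                f.deleted.add(i)
                new_rows.append({**r, **changes})
    if new_rows:
        files.append(DataFile(new_rows))


def compact(files):
    """OPTIMIZE / auto compaction: rewrite live rows, which purges the deletion vectors."""
    return [DataFile([r for f in files for r in f.live_rows()])]


files = [DataFile([{"id": 1, "v": "a"}, {"id": 2, "v": "b"}, {"id": 3, "v": "c"}])]
original_rows = files[0].rows
delete_where(files, lambda r: r["id"] == 2)
update_where(files, lambda r: r["id"] == 3, {"v": "z"})
print([r for f in files for r in f.live_rows()])  # ids 1 and 3 (with v="z")
files = compact(files)
```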

## Python UDFs and Their Performance Trade-offs

**Python user-defined functions (UDFs)** allow custom transformations that are not expressible using built-in Spark functions. While flexible, Python UDFs cannot be optimized by Spark's query engine and incur overhead due to data serialization between the JVM and Python runtime.

To mitigate this cost, **Pandas UDFs** use Apache Arrow to transfer data in columnar batches, enabling vectorized execution and significantly better performance. Pandas UDFs can be registered for use in both Python and SQL, combining expressiveness with efficiency.
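A minimal Pandas UDF sketch. It assumes pandas is available locally and, for the Spark part, PySpark with PyArrow. The vectorized function is plain pandas, so it can be checked without Spark. `pandas_udf` infers the UDF type from the `pd.Series -> pd.Series` type hints, and `spark.udf.register` exposes it to SQL. The function and registration names are hypothetical.

```python
import pandas as pd


def to_fahrenheit(celsius: pd.Series) -> pd.Series:
    """Vectorized: operates on a whole Arrow batch as a pandas Series, not row by row."""
    return celsius * 9.0 / 5.0 + 32.0


def register_to_fahrenheit(spark):
    """Wrap as a Pandas UDF and register it so SQL can call to_fahrenheit(col)."""
    from pyspark.sql.functions import pandas_udf  # imported lazily; needs pyspark + pyarrow
    udf = pandas_udf(to_fahrenheit, returnType="double")
    spark.udf.register("to_fahrenheit", udf)
    return udf

# Usage in a Spark session (column and table names are hypothetical):
#   f_udf = register_to_fahrenheit(spark)
#   df.select(f_udf("temp_c").alias("temp_f"))
#   spark.sql("SELECT to_fahrenheit(temp_c) FROM readings")
```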

## Grouped Pandas UDFs with `applyInPandas`

The **`applyInPandas`** API enables group-level operations using Pandas DataFrames while retaining Spark's distributed execution model. Each group is loaded in full and processed independently as a Pandas DataFrame, allowing complex per-group logic such as rolling computations or custom aggregations.

This pattern is widely used in advanced analytics and data science workflows, where Pandas' ecosystem complements Spark's scalability. The output schema must be explicitly defined, ensuring predictable integration with downstream pipelines.
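A sketch of a grouped function for `applyInPandas`. It assumes hypothetical columns `device_id`, `ts` and `reading`. The per-group function is plain pandas and can be tested locally, while the Spark call is shown as a comment. The output schema string must match the columns the function returns.

```python
import pandas as pd

OUTPUT_SCHEMA = "device_id string, ts long, reading double, rolling_avg double"


def add_rolling_avg(pdf: pd.DataFrame) -> pd.DataFrame:
    """Runs once per device group; receives all rows of that group as a pandas DataFrame."""
    pdf = pdf.sort_values("ts")
    pdf = pdf.assign(rolling_avg=pdf["reading"].rolling(window=2, min_periods=1).mean())
    return pdf[["device_id", "ts", "reading", "rolling_avg"]]

# On Spark (schema must match the returned columns):
#   result = df.groupBy("device_id").applyInPandas(add_rolling_avg, schema=OUTPUT_SCHEMA)
```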

## Closing Perspective

Partitioning, clustering, file layout optimization, and transaction log mechanics define the **physical efficiency layer** of the Databricks Lakehouse. Features such as Auto Optimize, Predictive Optimization, and deletion vectors automate many of these concerns, allowing engineers to focus on data modeling rather than constant tuning.

At the same time, UDFs and Pandas integrations provide controlled escape hatches for custom logic when native Spark functions are insufficient. Understanding how these mechanisms interact, and where their limits lie, is essential for building scalable, maintainable, and cost-efficient Delta Lake architectures.


## ETL Pipelines

### Official Databricks Documentation References

* [Lakeflow Spark Declarative Pipelines (overview & concepts)](https://docs.databricks.com/aws/en/ldp/)
* [AUTO CDC INTO (pipelines) SQL reference](https://docs.databricks.com/aws/en/ldp/developer/ldp-sql-ref-apply-changes-into)
* [The AUTO CDC APIs: Simplify change data capture with pipelines](https://docs.databricks.com/aws/en/ldp/cdc)
* [Tutorial: Build an ETL pipeline using change data capture](https://docs.databricks.com/aws/en/dlt/tutorial-pipelines)
* [Lakeflow Spark Declarative Pipelines Python API reference](https://docs.databricks.com/aws/en/ldp/developer/python-ref)

## LakeFlow Declarative Pipelines (LDP) Overview

**LakeFlow Declarative Pipelines (LDP)** is a declarative ETL framework for building **reliable, maintainable, and scalable** data pipelines on Databricks. It is built on **Apache Spark**, but differs fundamentally by allowing users to define **what the pipeline should produce**, not *how* it should be executed.

By abstracting execution details such as orchestration, checkpointing, retries, and optimization, LDP removes much of the operational complexity traditionally associated with large-scale Spark pipelines.

LDP was previously known as **Delta Live Tables (DLT)**. It has since been **open sourced** and integrated into the Apache Spark ecosystem under the name **Spark Declarative Pipelines**, while remaining deeply integrated with the Databricks platform.

## Declarative Model vs Apache Spark

In traditional Apache Spark pipelines, engineers explicitly control execution flow using APIs such as **`readStream`** and **`writeStream`**, manage checkpoints manually, and define job orchestration externally.

In contrast, **LDP uses a declarative model**, where pipeline logic is defined as a graph of tables and views. From this definition, Databricks automatically derives:

* The **execution order**
* **Dependencies** between objects
* **Fault tolerance and retries**
* **Performance optimizations**

This execution graph is automatically visualized in the pipeline UI, providing a clear and always up-to-date view of data lineage.
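How an execution order falls out of declared dependencies can be sketched with the standard library's `graphlib`. This is not LDP's implementation. LDP infers the edges from the queries themselves, whereas here they are written by hand for a hypothetical pipeline.

```python
from graphlib import TopologicalSorter

# Hypothetical pipeline: each dataset maps to the datasets its query reads.
pipeline = {
    "orders_bronze": set(),
    "customers_bronze": set(),
    "orders_silver": {"orders_bronze"},
    "customers_silver": {"customers_bronze"},
    "orders_enriched": {"orders_silver", "customers_silver"},
    "daily_revenue_gold": {"orders_enriched"},
}


def execution_stages(graph):
    """Group datasets into stages; everything in a stage could run in parallel."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # also raises CycleError if the definitions form a cycle
    stages = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        stages.append(ready)
        sorter.done(*ready)
    return stages


for i, stage in enumerate(execution_stages(pipeline), start=1):
    print(i, stage)
```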

## LDP Object Types

LDP supports three object types, all defined declaratively in **SQL or Python**:

### Streaming Tables

**Streaming tables** are **materialized** tables designed for **incremental data processing**. Each pipeline run processes only **new data** added since the previous run.

Streaming tables support append-only sources such as **Kafka**, **Auto Loader**, or append-only Delta tables read via **Spark Structured Streaming**. They enable **near real-time ingestion**, with latencies typically in the **millisecond range**.

### Materialized Views

**Materialized views** are **persisted tables** that are fully recomputed on each pipeline run using a **full refresh** model. They are commonly used for:

* Precomputing **complex joins and aggregations**
* Accelerating **BI dashboards**
* Ingesting data from **non-streamable sources** that include updates, deletes, or overwrites

On **serverless compute**, some materialized views can be **incrementally refreshed**, but their update latency is typically **seconds to minutes**, not milliseconds.

### Temporary Views

**Temporary views** are **non-persisted**, pipeline-scoped objects used for **intermediate transformations** and **data quality checks**. They are not written to the data catalog and do not expose data previews.

Temporary views are ideal for shaping data between pipeline stages without creating permanent datasets.

## SQL and Python Syntax in LDP

In **SQL**, objects are created using:

`CREATE OR REFRESH <object_type> <object_name>`

In **Python**, objects are defined using function decorators that declare the object type. The returned DataFrame determines whether the object is streaming or static.

Unlike Spark Structured Streaming:

* **Streaming tables can be created natively in SQL** using `CREATE STREAMING TABLE`
* No explicit `writeStream` call is required
* **Checkpointing is automatically managed** by LDP

## Execution Model and Pipeline Validation

LDP pipelines are defined as **script files** (`.py` or `.sql`), not notebooks. This enables:

* **Dry runs** to validate pipeline logic without modifying data
* Clear separation of pipeline definition and execution

This replaces the notebook-based validation model used in the legacy DLT framework.

## Data Quality Expectations in LDP

**Expectations** are **data quality constraints** that validate records as they flow through LDP pipelines. They are defined using declarative functions in SQL or Python and are evaluated automatically during pipeline execution.

By default, expectation violations are **tracked and reported in metrics**, but violating records are still written to the target table.

## Expectation Actions and Semantics

LDP supports three expectation behaviors:

* **Expect (warning)**
  Violating records are written to the table, but violation counts are logged in metrics.

* **Expect or Drop**
  Violating records are **removed** before being written, and their count is tracked.

* **Expect or Fail**
  Any violation causes the pipeline flow to **fail and roll back**, requiring manual intervention.

These behaviors resemble Delta Lake CHECK constraints, but they are evaluated as records flow through the pipeline and offer more options than a CHECK constraint, which always rejects a violating write.
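The three actions can be compared side by side in a toy evaluator. The `rules` dictionary mirrors the shape passed to `expect_all`, `expect_all_or_drop` and `expect_all_or_fail`, except that real LDP rules are SQL expression strings rather than Python lambdas. All names here are hypothetical.

```python
class ExpectationViolation(Exception):
    """Raised for 'expect or fail': the whole update is rolled back."""


def apply_expectations(records, rules, action="warn"):
    """Toy evaluation of grouped expectations.

    rules:  {expectation_name: predicate(record) -> bool}
    action: "warn" (expect), "drop" (expect or drop), "fail" (expect or fail)
    Returns (records written to the target, per-expectation metrics).
    """
    metrics = {name: {"passed_records": 0, "failed_records": 0} for name in rules}
    written = []
    for record in records:
        failed = [name for name, check in rules.items() if not check(record)]
        for name in rules:
            metrics[name]["failed_records" if name in failed else "passed_records"] += 1
        if failed and action == "fail":
            raise ExpectationViolation(f"{failed} violated by {record}")
        if failed and action == "drop":
            continue
        written.append(record)
    return written, metrics


rules = {
    "valid_id": lambda r: r["id"] is not None,
    "positive_qty": lambda r: r["qty"] > 0,
}
records = [{"id": 1, "qty": 2}, {"id": None, "qty": 1}, {"id": 3, "qty": 0}]

for action in ("warn", "drop"):
    written, metrics = apply_expectations(records, rules, action)
    print(action, len(written), metrics)
```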

## Defining Expectations in Python and SQL

Expectations can be defined individually or grouped:

* In **SQL**, expectations are defined inline with table or view definitions.
* In **Python**, expectations can be grouped into a dictionary and applied collectively using:

  * **`expect_all`**
  * **`expect_all_or_drop`**
  * **`expect_all_or_fail`**

This enables consistent enforcement of multiple validation rules with a single action policy.

## Quarantine Patterns for Invalid Data

LDP supports multiple patterns for handling invalid records:

* **Separate quarantine table**
  Apply an expectation on the **negation** of the validation rule to isolate invalid records.

* **Flag-based quarantine**
  Add a Boolean column (for example, **`is_quarantined`**) that identifies invalid rows.
  This column can also be used as a **partitioning key** to physically separate invalid data.

Both approaches allow invalid data to be retained for auditing or remediation without disrupting downstream consumers.
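Both patterns rely on a negated version of the validation rules. The runnable part below builds that negated SQL predicate from a dictionary of SQL rules. The commented wiring is a hypothetical pipeline source file using the `pyspark.pipelines` decorators as shown in recent Databricks docs (older code uses `import dlt`). Table names are made up. One caution: if a rule can evaluate to NULL (for example `quantity > 0` when `quantity` is NULL), `NOT(...)` is NULL as well, so such rows may slip past both branches. Wrapping the combined condition in `COALESCE(..., FALSE)` is one way to close that gap.

```python
# Validation rules as SQL expression strings, the shape the expect_all_* functions take.
rules = {
    "valid_id": "id IS NOT NULL",
    "positive_quantity": "quantity > 0",
}


def all_rules(rules: dict) -> str:
    """A single predicate that is true when every rule passes."""
    return " AND ".join(f"({condition})" for condition in rules.values())


def quarantine_rule(rules: dict) -> str:
    """The negation: true when at least one rule fails (for the quarantine table or flag)."""
    return f"NOT({all_rules(rules)})"


print(quarantine_rule(rules))  # NOT((id IS NOT NULL) AND (quantity > 0))

# Hypothetical wiring in a pipeline source file (needs the pipeline runtime, not run here):
#   from pyspark import pipelines as dp
#   from pyspark.sql.functions import expr
#
#   @dp.table
#   @dp.expect_all_or_drop(rules)                              # clean rows only
#   def orders_clean():
#       return spark.readStream.table("orders_bronze")
#
#   @dp.table
#   @dp.expect_or_drop("quarantined", quarantine_rule(rules))  # separate quarantine table
#   def orders_quarantine():
#       return spark.readStream.table("orders_bronze")
#
#   @dp.table(partition_cols=["is_quarantined"])               # flag-based variant
#   def orders_flagged():
#       return spark.readStream.table("orders_bronze").withColumn(
#           "is_quarantined", expr(quarantine_rule(rules)))
```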

## Expectation Metrics and Event Logs

All expectation results are captured in the **pipeline event log table**, which stores execution metadata as structured events.

Expectation metrics are available under:

* Event type: **flow_progress**
* Path: **details → flow_progress → data_quality → expectations**

This allows **programmatic access** to:

* Expectation names
* Affected datasets
* Counts of passed and failed records

These metrics enable automated monitoring, alerting, and governance workflows.
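Programmatic access can be sketched by aggregating event-log rows in Python. It assumes each row has an `event_type` and a JSON `details` string, and that every expectation entry carries `name`, `dataset`, `passed_records` and `failed_records`. That shape is assumed here for illustration. `progress_event` builds sample rows and is hypothetical.

```python
import json
from collections import defaultdict

# In SQL the same fields are reached with a path such as
#   details:flow_progress:data_quality:expectations
# on rows where event_type = 'flow_progress'.


def expectation_metrics(events):
    """Sum passed/failed record counts per (dataset, expectation) from event-log rows."""
    totals = defaultdict(lambda: {"passed_records": 0, "failed_records": 0})
    for event in events:
        if event["event_type"] != "flow_progress":
            continue
        details = json.loads(event["details"])
        data_quality = (details.get("flow_progress") or {}).get("data_quality") or {}
        for exp in data_quality.get("expectations") or []:
            key = (exp["dataset"], exp["name"])
            totals[key]["passed_records"] += exp["passed_records"]
            totals[key]["failed_records"] += exp["failed_records"]
    return dict(totals)


def progress_event(dataset, name, passed, failed):
    """Hypothetical helper that builds one sample event-log row in the assumed shape."""
    expectation = {"name": name, "dataset": dataset, "passed_records": passed, "failed_records": failed}
    details = {"flow_progress": {"data_quality": {"expectations": [expectation]}}}
    return {"event_type": "flow_progress", "details": json.dumps(details)}


events = [
    progress_event("orders_silver", "valid_id", 98, 2),
    progress_event("orders_silver", "valid_id", 50, 0),
    {"event_type": "update_progress", "details": "{}"},
]
print(expectation_metrics(events))
```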

## Auto CDC APIs in LDP

**Auto CDC APIs** simplify **Change Data Capture (CDC)** processing in LDP by eliminating the need for complex `MERGE INTO` logic.

Unlike manual merges, Auto CDC:

* **Automatically handles out-of-sequence records**
* Guarantees correct results without custom ordering logic
* Natively supports **SCD Type 1 and Type 2**
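What SCD Type 2 output looks like, including for out-of-order input, can be sketched with a toy builder. `__START_AT` and `__END_AT` are the column names Auto CDC uses for Type 2 history. The rest (function name, `op` and `seq` fields, sample events) is hypothetical, and the real engine handles far more cases.

```python
def scd_type2(events, key="id", sequence="seq"):
    """Toy SCD Type 2: one row per version, with __START_AT / __END_AT bounds."""
    history, open_rows = [], {}
    for event in sorted(events, key=lambda e: e[sequence]):  # order by sequence, not arrival
        current = open_rows.pop(event[key], None)
        if current is not None:
            current["__END_AT"] = event[sequence]  # close the previous version
        if event["op"] != "DELETE":
            row = {c: v for c, v in event.items() if c not in ("op", sequence)}
            row["__START_AT"] = event[sequence]
            row["__END_AT"] = None  # still current
            history.append(row)
            open_rows[event[key]] = row
    return history


arrivals = [  # out of order: the update arrives before the insert
    {"id": 1, "city": "Lyon", "seq": 2, "op": "UPDATE"},
    {"id": 1, "city": "Paris", "seq": 1, "op": "INSERT"},
    {"id": 1, "city": None, "seq": 3, "op": "DELETE"},
]
for row in scd_type2(arrivals):
    print(row)
```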

## Auto CDC SQL Semantics

Auto CDC is declared using a **flow definition** that applies changes from a CDC source into a **pre-existing streaming table**.

Key components include:

* **Keys**: Define primary key fields
* **Apply as delete**: Deletes records based on a condition
* **Sequence by**: Defines logical ordering for CDC events
* **Columns / Except**: Controls which fields are included or excluded (for example, `COLUMNS * EXCEPT (...)`)
* **Stored as SCD Type 1 or 2** (Type 1 is default)

The **sequence by** clause is critical for resolving late-arriving or out-of-order events.

## Internal CDC Architecture

When Auto CDC is declared, LDP automatically creates:

* An **internal target table** that tracks change events
* A **view** that exposes the latest clean state

The internal table stores metadata such as **tombstone markers** for deleted rows, allowing late-arriving updates to be correctly applied before final state resolution.

For multi-column sequencing, a **struct expression** can be used to enforce deterministic ordering.
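The role of tombstones and struct sequencing can be sketched with a toy SCD Type 1 target. `state` plays the internal table, which keeps a tombstone for deleted keys. The returned dictionary plays the view, which shows only live rows. Python tuple comparison stands in for ordering by `STRUCT(ts, seq)`. Function, field and sample names are hypothetical.

```python
def auto_cdc_scd1(events, key="id", sequence_by=("ts", "seq")):
    """Toy SCD Type 1 target that tolerates out-of-order CDC events.

    `state` plays the internal table: it keeps a tombstone (row=None) for deleted
    keys so a late, older update cannot resurrect them. The return value plays
    the view: only live rows.
    """
    state = {}  # key -> (sequence tuple, row or None)
    for event in events:  # arrival order, possibly out of sequence
        seq = tuple(event[c] for c in sequence_by)  # tuple ordering ~ SEQUENCE BY STRUCT(ts, seq)
        k = event[key]
        if k in state and state[k][0] >= seq:
            continue  # older than what was already applied: ignore
        if event["op"] == "DELETE":
            row = None  # tombstone
        else:
            row = {c: v for c, v in event.items() if c not in ("op", *sequence_by)}
        state[k] = (seq, row)
    return {k: row for k, (_, row) in state.items() if row is not None}


events = [
    {"id": 1, "name": "A1", "ts": 1, "seq": 0, "op": "INSERT"},
    {"id": 1, "name": None, "ts": 5, "seq": 0, "op": "DELETE"},
    {"id": 1, "name": "A-late", "ts": 3, "seq": 0, "op": "UPDATE"},  # late: must not resurrect id 1
    {"id": 2, "name": "B2", "ts": 4, "seq": 2, "op": "UPDATE"},
    {"id": 2, "name": "B1", "ts": 4, "seq": 1, "op": "UPDATE"},      # same ts, lower seq: loses
]
print(auto_cdc_scd1(events))  # {2: {'id': 2, 'name': 'B2'}}
```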

## Python Auto CDC API and Legacy Compatibility

Auto CDC is also supported in **Python** through the `create_auto_cdc_flow()` function of the pipelines API.

The legacy **`APPLY CHANGES`** syntax from the DLT framework remains supported for **backward compatibility** and may still appear in certification exams. However, Databricks **recommends Auto CDC APIs** for all new development.

## Closing Perspective

LakeFlow Declarative Pipelines provide a **higher-level abstraction** for ETL development, combining declarative definitions, automated orchestration, built-in data quality, and native CDC handling.

By shifting focus from execution mechanics to **data intent**, LDP enables teams to build pipelines that are **more resilient, observable, and easier to evolve** at scale.


## Deployment and Testing

### Official Databricks Documentation References

* [Databricks Asset Bundles overview](https://docs.databricks.com/dev-tools/bundles/index.html)
* [CI/CD with Databricks Asset Bundles](https://docs.databricks.com/dev-tools/bundles/ci-cd.html)
* [Databricks Jobs REST API reference](https://docs.databricks.com/api/workspace/jobs)
* [Databricks authentication and personal access tokens](https://docs.databricks.com/dev-tools/auth/pat.html)

## Databricks Asset Bundles (DAB) Overview

**Databricks Asset Bundles (DAB)** is a deployment and packaging mechanism for **Databricks jobs, pipelines, and related assets** that enables consistent promotion across **multiple environments** such as development, staging, and production.

DABs are designed to support **software engineering best practices** for data projects, including version control, testing, repeatable deployments, and **CI/CD automation**.

## Multi-Environment Deployment Model

In real-world data platforms, workloads are typically deployed across multiple isolated environments. Asset Bundles allow the same project to be **parameterized and deployed** to different Databricks workspaces using environment-specific configurations.

Each environment is defined as a **target**, with its own workspace URL, mode (development or production), and optional storage locations.

## Databricks Asset Bundles CLI

Asset Bundles are managed using the **Databricks CLI**, which provides a small set of core commands:

* **`bundle init`**
  Initializes a new bundle project from a default or custom template.

* **`bundle validate`**
  Validates bundle configuration files for correctness and consistency.

* **`bundle deploy -t <target>`**
  Deploys bundle resources to the specified target environment.

* **`bundle run -t <target> <job_name>`**
  Executes a deployed job in the selected environment.

These commands are designed to be **idempotent**, making them well suited for automation.

## Bundle Configuration Structure

The central configuration file of a bundle project is **`databricks.yml`**. This YAML file defines:

* The **bundle name**
* One or more **targets** (environments)
* **Variables** for dynamic configuration (for example, catalog or schema names)
* References to **resource definitions** such as jobs and pipelines

YAML relies on indentation to express hierarchy, making structure and formatting critical.

For maintainability, Databricks recommends defining jobs and pipelines in **separate YAML files** and pulling them into the main configuration through the `include` mapping.

## Resource Definitions: Jobs and Pipelines

Bundle resources typically include:

* **Pipelines** (for example, LakeFlow / DLT pipelines)
* **Jobs** (single-task or multi-task workflows)

Jobs can reference pipelines, notebooks, or scripts located in the bundle's **source directory**. Paths must be updated to reflect bundle-relative locations, and spaces in paths must be quoted.

Job permissions are **not automatically copied** and must be explicitly defined using the `permissions` mapping.

## Variables and Environment-Specific Configuration

Asset Bundles support **variable substitution**, allowing the same codebase to adapt across environments.

Common use cases include:

* Catalog and schema names
* Environment-specific parameters
* Resource naming conventions

Variables are resolved at deployment time based on the selected target.
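The resolution step can be modeled in Python. A dictionary mirrors a small `databricks.yml`, and `${var.<name>}` references are replaced using the variable defaults, overridden by the selected target. The Databricks CLI performs this itself during `bundle deploy`. This toy only illustrates the idea, and all names and values are hypothetical.

```python
import re

# Python mirror of a small databricks.yml (illustrative values):
#   variables:
#     catalog: {default: dev_catalog}
#     schema:  {default: sales}
#   targets:
#     dev:  {mode: development}
#     prod: {mode: production, variables: {catalog: prod_catalog}}
BUNDLE = {
    "variables": {"catalog": {"default": "dev_catalog"}, "schema": {"default": "sales"}},
    "targets": {
        "dev": {"mode": "development"},
        "prod": {"mode": "production", "variables": {"catalog": "prod_catalog"}},
    },
    "resources": {
        "jobs": {
            "nightly_etl": {
                "name": "nightly-etl",
                "parameters": [{"name": "target_table", "default": "${var.catalog}.${var.schema}.orders"}],
            }
        }
    },
}

VAR_REF = re.compile(r"\$\{var\.([A-Za-z_][A-Za-z0-9_]*)\}")


def resolve_for_target(bundle, target):
    """Substitute ${var.<name>} using variable defaults overridden by the target."""
    values = {name: spec["default"] for name, spec in bundle["variables"].items()}
    values.update(bundle["targets"][target].get("variables", {}))

    def substitute(node):
        if isinstance(node, str):
            return VAR_REF.sub(lambda m: values[m.group(1)], node)
        if isinstance(node, dict):
            return {k: substitute(v) for k, v in node.items()}
        if isinstance(node, list):
            return [substitute(v) for v in node]
        return node

    return substitute(bundle["resources"])


for target in ("dev", "prod"):
    job = resolve_for_target(BUNDLE, target)["jobs"]["nightly_etl"]
    print(target, job["parameters"][0]["default"])
```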

## CI/CD Integration with Asset Bundles

Asset Bundles integrate naturally with **Git-based CI/CD workflows**:

1. Code changes are committed and pushed to a feature or development branch.
2. A CI pipeline runs:

   * `bundle validate`
   * `bundle deploy` to the development environment
   * `bundle run` for validation
3. A pull request is merged into the main or production branch.
4. A production CI pipeline deploys and runs the bundle in the production workspace.

This approach ensures **consistent, repeatable, and automated deployments**.
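The CI steps above can be scripted as a small Python runner around the Databricks CLI. It assumes the CLI is installed and already authenticated, and the target and job key are hypothetical. By default it only prints the commands, and `subprocess.run(..., check=True)` stops at the first failing step.

```python
import shlex
import subprocess


def ci_steps(target: str, job_key: str) -> list:
    """The validate / deploy / run sequence, as argv lists."""
    return [
        ["databricks", "bundle", "validate", "-t", target],
        ["databricks", "bundle", "deploy", "-t", target],
        ["databricks", "bundle", "run", "-t", target, job_key],
    ]


def run_ci(target: str, job_key: str, dry_run: bool = True) -> None:
    for cmd in ci_steps(target, job_key):
        print("$", shlex.join(cmd))
        if not dry_run:
            subprocess.run(cmd, check=True)  # raise (and stop) on the first failing step


run_ci("dev", "nightly_etl")  # prints the commands without running them
```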

## Development Tooling and IDE Support

Databricks provides IDE extensions (for example, for **Visual Studio Code**) to simplify working with Asset Bundles. These extensions support:

* Workspace authentication
* Bundle initialization
* Deployment and execution
* Resource inspection

Bundles can be developed entirely locally and deployed without manual UI interaction.

## Binding to Existing Resources

By default, deploying a bundle creates **new resources** in the target workspace. Existing jobs can be linked to bundle-managed resources using the **deployment bind** mechanism, allowing deployments to **override** existing jobs instead of duplicating them.

## Relative Imports vs `%run` in Notebooks

In bundle-based and production-grade projects, **Python modules (`.py` files)** should be preferred over notebook-based reuse.

Key differences:

* `%run` works only with notebooks and injects all symbols into the caller's scope
* Python `import` statements require `.py` files and follow standard Python semantics
* The notebook directory is included in `sys.path` by default
* Additional directories must be manually appended to `sys.path` if needed

Using Python modules enables **cleaner dependency management** and better compatibility with CI/CD.
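A sketch of importing a module from a directory that is not already on `sys.path`, as one might do for a bundle's `src/` folder from a notebook. A temporary directory and a hypothetical `helpers_demo` module stand in for the real project layout.

```python
import importlib
import sys
import tempfile
from pathlib import Path


def import_from(directory, module_name):
    """Append `directory` to sys.path (if missing) and import `module_name` from it."""
    directory = str(Path(directory).resolve())
    if directory not in sys.path:
        sys.path.append(directory)
    importlib.invalidate_caches()  # the module file may have been created just now
    return importlib.import_module(module_name)


# Demo: a temporary folder stands in for something like the bundle's src/ directory.
with tempfile.TemporaryDirectory() as tmp:
    Path(tmp, "helpers_demo.py").write_text("def double(x):\n    return 2 * x\n")
    helpers = import_from(tmp, "helpers_demo")
    print(helpers.double(21))  # 42
```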

## Python Wheels and Dependency Installation

Python dependencies can be packaged as **wheels** and installed using the `%pip install` magic command.

Important behaviors:

* `%pip install` installs packages on **all cluster nodes**
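* On Databricks Runtime 13.0 and above, the Python process is not restarted automatically; call `dbutils.library.restartPython()` after installing or upgrading packages so the new versions are loaded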
* All `%pip` commands should appear at the **top of the notebook**

This makes dependency installation reproducible and cluster-safe.

## Databricks REST API Basics

Databricks exposes a comprehensive **REST API** for programmatic workspace interaction. Common use cases include automating job creation, execution, and monitoring.

Authentication is performed using a **Bearer token**, typically a **personal access token (PAT)** generated from user settings.

## Jobs API Core Operations

The **Jobs API** supports:

* **Create job** (`POST /api/2.1/jobs/create`)
* **Run job** (`POST /api/2.1/jobs/run-now`)
* **Get run status** (`GET /api/2.1/jobs/runs/get`)

Creating a job via API always produces a **new job ID**, even if the configuration is identical. Job runs produce **run IDs**, which uniquely identify each run within the workspace and can be queried or canceled independently.
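A standard-library sketch of the run-and-poll calls. The request builders only construct `urllib` requests, so they can be inspected without a workspace. `send` performs the actual HTTP call. The host, token and IDs are placeholders, and the token should come from a secret store.

```python
import json
import urllib.parse
import urllib.request


def _headers(token):
    return {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}


def run_now_request(host, token, job_id):
    """POST /api/2.1/jobs/run-now for an existing job (request is built, not sent)."""
    return urllib.request.Request(
        url=f"{host.rstrip('/')}/api/2.1/jobs/run-now",
        data=json.dumps({"job_id": job_id}).encode("utf-8"),
        headers=_headers(token),
        method="POST",
    )


def get_run_request(host, token, run_id):
    """GET /api/2.1/jobs/runs/get?run_id=... to poll a run's state."""
    query = urllib.parse.urlencode({"run_id": run_id})
    return urllib.request.Request(
        url=f"{host.rstrip('/')}/api/2.1/jobs/runs/get?{query}",
        headers=_headers(token),
        method="GET",
    )


def send(request):
    """Execute a request and decode the JSON response body."""
    with urllib.request.urlopen(request) as response:
        return json.load(response)


# Usage (placeholders; read the token from a secret store, never hard-code it):
#   run = send(run_now_request("https://<workspace-host>", token, job_id=123))
#   status = send(get_run_request("https://<workspace-host>", token, run["run_id"]))
```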

## Operational Perspective

Databricks Asset Bundles provide a **foundational CI/CD abstraction** for Databricks workloads, aligning data engineering workflows with modern software delivery practices.

When combined with Python modules, structured configuration, and REST APIs, Asset Bundles enable **fully automated, environment-aware, and production-grade deployments** of Databricks jobs and pipelines.


## Data Governance and Sharing

### Official Databricks Documentation References

* [Unity Catalog overview and concepts](https://docs.databricks.com/aws/en/data-governance/unity-catalog/)
* [Row filters and column masks in Unity Catalog](https://docs.databricks.com/aws/en/data-governance/unity-catalog/filters-and-masks/)
* [Delta Sharing overview](https://docs.databricks.com/aws/en/delta-sharing/)
* [Lakehouse Federation (query federation)](https://docs.databricks.com/aws/en/query-federation/)

## Advanced Unity Catalog Capabilities

Unity Catalog is the governance layer of the Databricks Lakehouse, responsible for managing **access control, metadata, data quality, and secure sharing** across workspaces and compute. This section explores advanced Unity Catalog features that go beyond basic catalog and schema management.

## Workspace Catalogs and Default Schemas

When a new Databricks workspace is created with Unity Catalog enabled, Databricks automatically provisions a **workspace catalog** along with a **default schema**. The workspace catalog is typically named after the workspace itself and is scoped to that workspace by default.

Workspace users are granted the **USE CATALOG** privilege on this catalog, and because the catalog is bound to its workspace, it can only be accessed from that workspace. This isolation ensures that workspace-level data remains protected, although the catalog can later be assigned to additional workspaces if cross-workspace sharing is required.

Within the default schema, users are intentionally granted a limited set of privileges. These privileges allow controlled object creation without granting full administrative power, and commonly include the ability to create tables, views, functions, materialized views, models, and volumes, along with the **USE SCHEMA** privilege.

## Ownership, Privileges, and Delegated Administration

Workspace admins are the default owners of the workspace catalog. When a user creates an object, such as a schema or table, they become the **initial owner** of that object. Ownership implies full control, including the ability to modify, drop, and grant privileges.

Unity Catalog allows ownership to be transferred directly from the UI or via SQL. However, in many cases, full ownership transfer is unnecessary. Instead, Databricks provides the **MANAGE** privilege to support delegated administration.

The MANAGE privilege allows a user to:

* View and modify object privileges
* Transfer ownership
* Rename or drop the object

Unlike object ownership, MANAGE does **not automatically grant all privileges** on the object. Users must still have **USE CATALOG** and **USE SCHEMA** privileges to interact with the object. Additionally, MANAGE is intentionally excluded from **ALL PRIVILEGES** to prevent accidental privilege escalation.
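The difference between delegating with MANAGE and transferring ownership can be sketched as SQL statements built in Python. All catalog, schema, table, and group names here are hypothetical. The sketch pairs `MANAGE` with `USE CATALOG` and `USE SCHEMA`, because the paragraph above notes that MANAGE alone does not imply them.

```python
# Sketch: delegated administration with MANAGE versus a full ownership transfer.
# All catalog, schema, table, and principal names are hypothetical.


def quote_ident(name: str) -> str:
    """Backtick-quote an identifier, doubling any embedded backticks."""
    return "`" + name.replace("`", "``") + "`"


def qualified(*parts: str) -> str:
    """Join identifier parts into a quoted dotted name."""
    return ".".join(quote_ident(p) for p in parts)


def delegate_management(catalog: str, schema: str, table: str, principal: str) -> list[str]:
    """Grant MANAGE plus the USE privileges that MANAGE does not imply."""
    who = quote_ident(principal)
    return [
        f"GRANT USE CATALOG ON CATALOG {quote_ident(catalog)} TO {who}",
        f"GRANT USE SCHEMA ON SCHEMA {qualified(catalog, schema)} TO {who}",
        f"GRANT MANAGE ON TABLE {qualified(catalog, schema, table)} TO {who}",
    ]


def transfer_ownership(catalog: str, schema: str, table: str, new_owner: str) -> str:
    """Full ownership transfer: the new owner gets complete control of the table."""
    return f"ALTER TABLE {qualified(catalog, schema, table)} OWNER TO {quote_ident(new_owner)}"


for stmt in delegate_management("sales", "curated", "orders", "data-stewards"):
    print(stmt)
print(transfer_ownership("sales", "curated", "orders", "platform-admins"))
```

Delegation is usually the lighter-touch choice: the original owner stays in place while stewards handle grants and maintenance.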

## Enriching Metadata with Comments and Tags

Unity Catalog supports rich metadata to improve **data discoverability and governance**.

Tables and columns can include descriptive comments, and Databricks now supports **AI-generated comments**. These comments are produced by a large language model that analyzes column names, data types, sample values, and observed data patterns. Generated comments are editable, allowing teams to refine documentation before publishing.

In addition to comments, Unity Catalog supports **tags**, which are key-value attributes used to classify and organize data assets. Tags can be applied to catalogs, schemas, tables, views, volumes, and even individual columns.

Common tagging use cases include:

* Identifying data layers such as bronze, silver, and gold
* Flagging sensitive or regulated data
* Improving search and discovery

Tags can be defined through the UI or using SQL. Values are optional, and multiple tags can be applied at once. Column-level tags are particularly powerful, as they enable **tag-based policies**, such as masking all columns labeled as PII.
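The SQL route for comments and tags can be sketched as a small statement builder. The table `main.crm.customers`, its `email` column, and the tag keys and values are hypothetical examples. String literals are escaped with backslashes, following Spark SQL's default literal rules. For simplicity, the sketch always supplies a tag value, even though values are optional.

```python
# Sketch: build COMMENT and SET TAGS statements for a table and its columns.
# The table, column, and tag names below are hypothetical examples.


def sql_string(value: str) -> str:
    """Quote a Spark SQL string literal, escaping backslashes and single quotes with a backslash."""
    return "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'"


def quote_ident(name: str) -> str:
    """Backtick-quote an identifier, doubling any embedded backticks."""
    return "`" + name.replace("`", "``") + "`"


def tag_clause(tags: dict[str, str]) -> str:
    """Render a tag dict as a SET TAGS (...) clause."""
    pairs = ", ".join(f"{sql_string(k)} = {sql_string(v)}" for k, v in tags.items())
    return f"SET TAGS ({pairs})"


def metadata_statements(
    table: str,
    table_comment: str,
    table_tags: dict[str, str],
    column_comments: dict[str, str],
    column_tags: dict[str, dict[str, str]],
) -> list[str]:
    """Return statements that document and classify one table and its columns.

    `table` is expected to be an already-qualified name such as main.crm.customers.
    """
    stmts = [f"COMMENT ON TABLE {table} IS {sql_string(table_comment)}"]
    if table_tags:
        stmts.append(f"ALTER TABLE {table} {tag_clause(table_tags)}")
    for column, comment in column_comments.items():
        stmts.append(f"ALTER TABLE {table} ALTER COLUMN {quote_ident(column)} COMMENT {sql_string(comment)}")
    for column, tags in column_tags.items():
        stmts.append(f"ALTER TABLE {table} ALTER COLUMN {quote_ident(column)} {tag_clause(tags)}")
    return stmts


statements = metadata_statements(
    table="main.crm.customers",
    table_comment="One row per customer; refreshed nightly.",
    table_tags={"layer": "silver", "domain": "crm"},
    column_comments={"email": "Primary contact address supplied at sign-up."},
    column_tags={"email": {"pii": "email"}},
)
for stmt in statements:
    print(stmt)
```

Tagging the `email` column with a `pii` key is the kind of column-level classification that a tag-based masking policy could later target.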

## Data Quality Monitoring with Lakehouse Monitoring

Unity Catalog integrates with **Lakehouse Monitoring** to provide automatic data quality tracking over time. When monitoring is enabled on a table, Databricks computes profile statistics and drift metrics that reveal changes in data behavior across refreshes.

Users can choose between snapshot-based profiling, which evaluates the entire table on each refresh, or time-series profiling, which tracks quality metrics based on a timestamp column. Monitor refreshes can be manual or scheduled, and the resulting metrics are stored in a user-selected schema.

Once created, Lakehouse Monitoring automatically generates:

* A profile metrics table
* A drift metrics table
* A built-in dashboard for visualizing quality trends

These metrics enable teams to detect issues such as increasing null rates or unexpected value distributions.
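The idea behind profile and drift metrics can be illustrated with a toy computation over two refreshes of a table. This is not how Lakehouse Monitoring computes its metrics. It is a simplified sketch that uses hypothetical data and tracks a single statistic (null rate) per column.

```python
# Toy illustration of profile and drift metrics; Lakehouse Monitoring computes
# far richer statistics and stores them in its generated metric tables.


def null_rate(values: list) -> float:
    """Fraction of None values in a column (0.0 for an empty column)."""
    if not values:
        return 0.0
    return sum(v is None for v in values) / len(values)


def profile(snapshot: dict[str, list]) -> dict[str, float]:
    """Profile one refresh: the null rate of every column."""
    return {column: null_rate(values) for column, values in snapshot.items()}


def drift(previous: dict[str, float], current: dict[str, float]) -> dict[str, float]:
    """Change in each column's null rate between two consecutive refreshes."""
    return {column: current[column] - previous.get(column, 0.0) for column in current}


# Two hypothetical refreshes of the same table.
refresh_1 = {
    "email": ["a@example.com", "b@example.com", None, "d@example.com"],
    "country": ["US", "DE", "FR", "US"],
}
refresh_2 = {
    "email": [None, "b@example.com", None, None],
    "country": ["US", "DE", None, "US"],
}

p1, p2 = profile(refresh_1), profile(refresh_2)
print("profile 1:", p1)
print("profile 2:", p2)
print("drift:    ", drift(p1, p2))
```

Here the `email` null rate rises from 0.25 to 0.75. The resulting drift of +0.5 is the kind of signal a drift metrics table would surface.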

## Alerts Based on Data Quality Metrics

The metrics produced by Lakehouse Monitoring can be used to define **alerts**. For example, a team might configure an alert that triggers when the percentage of null values in a column exceeds a defined threshold.

Alerts support flexible notification behavior. Notifications can be sent once when an issue is detected, repeatedly while the issue persists, or at a defined interval. A schedule controls how often the alert condition is evaluated, enabling proactive data quality monitoring without manual inspection.
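The three notification behaviors can be modeled with a small function that replays a series of scheduled evaluations. This is a hypothetical model written for illustration. The mode names `"once"`, `"each"`, and `"interval"` are not Databricks API values, and the null-percentage readings are made up.

```python
# Hypothetical model of alert evaluation; the mode names are illustrative,
# not Databricks API values.


def notifications(readings: list[float], threshold: float, mode: str, interval: int = 1) -> list[int]:
    """Return the evaluation indices at which a notification would be sent.

    Each reading is the null percentage observed at one scheduled evaluation.
    mode:
      "once"     - notify when the condition becomes true; re-arm after it clears
      "each"     - notify at every evaluation while the condition holds
      "interval" - while the condition holds, notify at most every `interval` evaluations
    """
    if mode not in {"once", "each", "interval"}:
        raise ValueError(f"unknown mode: {mode!r}")
    sent: list[int] = []
    was_triggered = False
    last_sent = None
    for i, pct in enumerate(readings):
        triggered = pct > threshold
        if not triggered:
            last_sent = None  # condition cleared: reset throttling
        elif mode == "each":
            sent.append(i)
        elif mode == "once":
            if not was_triggered:
                sent.append(i)
        elif last_sent is None or i - last_sent >= interval:  # mode == "interval"
            sent.append(i)
            last_sent = i
        was_triggered = triggered
    return sent


null_pct_per_run = [2.0, 6.0, 7.5, 8.0, 3.0, 9.0]  # hypothetical readings
for mode in ("once", "each", "interval"):
    print(mode, notifications(null_pct_per_run, threshold=5.0, mode=mode, interval=2))
```

With a 5% threshold, "once" notifies at evaluations 1 and 5, and "each" notifies at 1, 2, 3, and 5. "interval" with `interval=2` notifies at 1, 3, and 5.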

## System Tables and Billing Visibility

Unity Catalog exposes **system tables** that provide visibility into platform usage and billing. The billing schema records detailed information about billable workloads, including compute type, SKU, usage time, consumption quantity (such as DBUs), and the identity of the user or service principal that ran the workload.

This data can be aggregated to analyze cost trends, track usage per user or team, and build dashboards for FinOps and cost governance. Dashboards can be scheduled to refresh automatically and shared with relevant stakeholders.
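As a sketch of the aggregation described above, the block below pairs a SQL query against `system.billing.usage` with a pure-Python equivalent over sample rows. The query assumes read access to the billing system tables and uses the `identity_metadata.run_as`, `sku_name`, `usage_unit`, `usage_quantity`, and `usage_date` columns. The sample rows and SKU names are hypothetical.

```python
from collections import defaultdict

# Query you might run in a Databricks notebook with spark.sql(...), assuming
# read access to the system.billing.usage table.
DBU_BY_IDENTITY_SQL = """
SELECT
  identity_metadata.run_as AS run_as,
  sku_name,
  SUM(usage_quantity) AS dbus
FROM system.billing.usage
WHERE usage_unit = 'DBU'
  AND usage_date >= date_sub(current_date(), 30)
GROUP BY identity_metadata.run_as, sku_name
ORDER BY dbus DESC
"""


def dbus_by_identity(rows: list[dict]) -> dict[tuple[str, str], float]:
    """Pure-Python equivalent of the query's aggregation, over already-fetched rows."""
    totals: dict[tuple[str, str], float] = defaultdict(float)
    for row in rows:
        if row["usage_unit"] != "DBU":
            continue
        totals[(row["run_as"], row["sku_name"])] += row["usage_quantity"]
    # Highest consumers first, like ORDER BY dbus DESC.
    return dict(sorted(totals.items(), key=lambda item: item[1], reverse=True))


# Hypothetical sample rows; real rows carry many more columns.
sample_rows = [
    {"run_as": "alice@example.com", "sku_name": "JOBS_COMPUTE", "usage_unit": "DBU", "usage_quantity": 1.5},
    {"run_as": "alice@example.com", "sku_name": "JOBS_COMPUTE", "usage_unit": "DBU", "usage_quantity": 2.5},
    {"run_as": "etl-service-principal", "sku_name": "ALL_PURPOSE_COMPUTE", "usage_unit": "DBU", "usage_quantity": 6.0},
]
print(dbus_by_identity(sample_rows))
```

A query of this shape is a natural source dataset for a scheduled FinOps dashboard.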

## Fine-Grained Access Control with Dynamic Views

Dynamic views allow data access policies to be enforced at **query time**, supporting both column-level and row-level security.

For column-level security, dynamic views can redact sensitive fields using conditional logic. Typically, this logic relies on group membership checks, allowing authorized users to see plain-text values while masking data for others.

Row-level security is implemented by adding conditional filters to the view definition. Depending on the user's group membership, different subsets of rows may be visible. Dynamic views can be layered, enabling increasingly restrictive access rules to be applied incrementally.
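The two techniques above can be combined in one dynamic view. The sketch below shows a view definition as a SQL string, plus a pure-Python function that mirrors its logic so the effect can be checked locally. The table, view, column, and group names (`hr_admins`, `emea_analysts`) are hypothetical.

```python
# Sketch of a dynamic view (names hypothetical) and a pure-Python mirror of its logic.
SECURE_VIEW_SQL = """
CREATE OR REPLACE VIEW main.hr.employees_secure AS
SELECT
  id,
  name,
  region,
  CASE
    WHEN is_account_group_member('hr_admins') THEN email
    ELSE 'REDACTED'
  END AS email
FROM main.hr.employees
WHERE is_account_group_member('hr_admins')
   OR (is_account_group_member('emea_analysts') AND region = 'EMEA')
"""


def query_secure_view(rows: list[dict], user_groups: set[str]) -> list[dict]:
    """Return what a user in `user_groups` would see when querying the view."""
    is_admin = "hr_admins" in user_groups
    visible = []
    for row in rows:
        # Row-level security: same predicate as the view's WHERE clause.
        if not (is_admin or ("emea_analysts" in user_groups and row["region"] == "EMEA")):
            continue
        # Column-level security: same logic as the view's CASE expression.
        visible.append({**row, "email": row["email"] if is_admin else "REDACTED"})
    return visible


employees = [
    {"id": 1, "name": "Ana", "region": "EMEA", "email": "ana@example.com"},
    {"id": 2, "name": "Bo", "region": "AMER", "email": "bo@example.com"},
]
print(query_secure_view(employees, {"hr_admins"}))
print(query_secure_view(employees, {"emea_analysts"}))
print(query_secure_view(employees, set()))
```

HR admins see every row in plain text. EMEA analysts see only EMEA rows with emails redacted, and everyone else sees nothing.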

## Row Filters and Column Masks on Tables

While dynamic views are effective, Unity Catalog now supports **row filters and column masks directly on tables**, simplifying governance by removing the need for additional view objects.

Column masks are implemented using user-defined functions that define how values should be masked based on user identity or group membership. Once applied, the masking logic is enforced automatically whenever the table is queried.

Row filters work similarly, using user-defined functions to determine which rows a user is allowed to see. These functions are evaluated dynamically at query time.

For group membership checks, `is_member()` evaluates workspace-level groups, while `is_account_group_member()` evaluates account-level groups.
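The table-level equivalent can be sketched as DDL generated in Python. The statement shapes follow the `SET MASK` and `SET ROW FILTER ... ON (...)` forms used with SQL UDFs in Unity Catalog. The schema, table, column, function, and group names are hypothetical. The sketch uses `is_account_group_member()`, so it checks account-level groups.

```python
# Sketch: Unity Catalog column mask and row filter DDL. Function, table,
# column, and group names are hypothetical.


def mask_and_filter_ddl(schema: str, table: str) -> list[str]:
    """Return DDL that attaches an SSN mask and a region row filter to schema.table."""
    mask_fn = f"{schema}.mask_ssn"
    filter_fn = f"{schema}.region_filter"
    return [
        # Column mask: the UDF receives the column value and returns what the caller may see.
        f"CREATE OR REPLACE FUNCTION {mask_fn}(ssn STRING) "
        "RETURN CASE WHEN is_account_group_member('hr_admins') THEN ssn ELSE '***-**-****' END",
        f"ALTER TABLE {schema}.{table} ALTER COLUMN ssn SET MASK {mask_fn}",
        # Row filter: the UDF returns TRUE for rows the caller is allowed to see.
        f"CREATE OR REPLACE FUNCTION {filter_fn}(region STRING) "
        "RETURN is_account_group_member('hr_admins') OR region = 'EMEA'",
        f"ALTER TABLE {schema}.{table} SET ROW FILTER {filter_fn} ON (region)",
    ]


for stmt in mask_and_filter_ddl("main.hr", "employees"):
    print(stmt)
    # On Databricks, each statement could be executed with spark.sql(stmt).
```

Unlike the dynamic-view approach, consumers keep querying `main.hr.employees` directly, and the policy travels with the table.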

## Delta Sharing

Delta Sharing is an **open protocol** for securely sharing live data with external consumers, regardless of their compute platform. It enables organizations to share large datasets without copying or replicating data.

Delta Sharing involves a data provider, a data recipient, and a Delta Sharing server. When a recipient queries shared data, the server validates access and generates temporary URLs to the underlying cloud storage files. The recipient then reads the data directly from object storage, enabling efficient, large-scale data transfer.

Databricks provides an integrated Delta Sharing server that is governed through Unity Catalog.
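The provider, server, and recipient interaction described above can be modeled with a toy in-memory server. This is a simplified illustration, not the Delta Sharing REST protocol: real servers rely on the cloud provider's pre-signed URLs. The HMAC signing, the `storage.example.com` host, and all share, file, and token names are assumptions made for the sketch.

```python
import hashlib
import hmac
from dataclasses import dataclass


@dataclass
class ToySharingServer:
    """Toy model of the provider-side flow; not the real Delta Sharing REST protocol."""

    signing_key: bytes
    table_files: dict[str, list[str]]  # "share.schema.table" -> data file paths
    grants: dict[str, set[str]]  # recipient bearer token -> shares it may read
    url_ttl_seconds: int = 3600

    def query_table(self, token: str, table: str, now: int) -> list[str]:
        """Validate access, then return short-lived signed URLs to the table's files."""
        share = table.split(".", 1)[0]
        if share not in self.grants.get(token, set()):
            raise PermissionError(f"recipient is not granted share {share!r}")
        expires = now + self.url_ttl_seconds
        return [self._sign(path, expires) for path in self.table_files[table]]

    def _sign(self, path: str, expires: int) -> str:
        message = f"{path}:{expires}".encode()
        signature = hmac.new(self.signing_key, message, hashlib.sha256).hexdigest()
        return f"https://storage.example.com/{path}?expires={expires}&sig={signature}"


server = ToySharingServer(
    signing_key=b"demo-key",
    table_files={"sales_share.retail.orders": ["orders/part-0000.parquet", "orders/part-0001.parquet"]},
    grants={"token-partner-co": {"sales_share"}},
)
# The recipient would now fetch these URLs directly from object storage.
for url in server.query_table("token-partner-co", "sales_share.retail.orders", now=1_700_000_000):
    print(url)
```

The key design point is that the server only authorizes and signs. The bulk data transfer happens between the recipient and object storage.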

### Sharing Models in Databricks

Databricks supports two Delta Sharing modes. **Databricks-to-Databricks sharing** allows data to be shared between Unity Catalog-enabled workspaces using built-in authentication. This mode supports tables, views, volumes, and notebooks, and can optionally include table history for time travel, streaming, and change data feed access.

The **open Delta Sharing protocol** allows sharing with non-Databricks consumers but requires external authentication mechanisms such as bearer tokens or OpenID Connect.
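On the provider side, both modes follow the same steps: create a share, add tables, create a recipient, and grant access. The sketch below builds those statements. The share, table, and recipient names are hypothetical, and the sharing identifier is a placeholder value. Supplying an identifier models a Databricks-to-Databricks recipient, while omitting it models an open-sharing recipient.

```python
from typing import Optional

# Sketch: provider-side share setup. Share, table, and recipient names and the
# sharing identifier are hypothetical placeholders.


def share_setup(
    share: str,
    tables: list[str],
    recipient: str,
    sharing_identifier: Optional[str] = None,
    with_history: bool = False,
) -> list[str]:
    """Return statements that publish `tables` in `share` and grant them to `recipient`."""
    statements = [f"CREATE SHARE IF NOT EXISTS {share}"]
    suffix = " WITH HISTORY" if with_history else ""
    for table in tables:
        statements.append(f"ALTER SHARE {share} ADD TABLE {table}{suffix}")
    if sharing_identifier:
        # Databricks-to-Databricks: the recipient is identified by its metastore's sharing identifier.
        statements.append(f"CREATE RECIPIENT IF NOT EXISTS {recipient} USING ID '{sharing_identifier}'")
    else:
        # Open sharing: Databricks generates a token-based activation link for the recipient.
        statements.append(f"CREATE RECIPIENT IF NOT EXISTS {recipient}")
    statements.append(f"GRANT SELECT ON SHARE {share} TO RECIPIENT {recipient}")
    return statements


d2d = share_setup(
    "sales_share",
    ["main.sales.orders"],
    "partner_ws",
    sharing_identifier="aws:us-west-2:00000000-0000-0000-0000-000000000000",
    with_history=True,
)
open_sharing = share_setup("sales_share", ["main.sales.orders"], "partner_co")
for stmt in d2d + open_sharing:
    print(stmt)
```

`WITH HISTORY` corresponds to the optional table history that enables time travel, streaming, and change data feed reads for Databricks recipients.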

### Costs and Limitations of Delta Sharing

Delta Sharing does not require data replication, so there is no additional storage cost. However, cloud providers may charge **egress fees** when data is transferred across regions or cloud platforms.

Other important limitations include:

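* Shared data is read-only for recipients
* Shared tables must be in Delta Lake format; support for other asset types, such as views, volumes, and notebooks, depends on the sharing mode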

## Lakehouse Federation

Lakehouse Federation addresses scenarios where data ingestion is undesirable or impractical. Instead of copying data into Databricks, Federation enables **direct querying of external systems** such as relational databases.

After creating a connection to an external source, you register a **foreign catalog** in Unity Catalog that mirrors a database in that system. When foreign tables are queried, supported operations such as filters and projections are pushed down to the external system and executed on its compute, so results reflect the source's current data without any ingestion step.

This approach is ideal for live operational access, ad hoc analysis, and proofs of concept. However, because the pushed-down work runs on the external system, it does not benefit from Databricks' distributed compute engine, and query performance is bounded by the source database. For complex analytics, materialized views or ingestion into the lakehouse are often preferred.
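Registering a PostgreSQL database through Lakehouse Federation can be sketched as two DDL statements: a connection and a foreign catalog. The host, database, secret scope, secret key names (`pg-user`, `pg-password`), and object names are all hypothetical. Credentials are referenced through `secret(...)` so they are not hard-coded.

```python
# Sketch: register a PostgreSQL source through Lakehouse Federation.
# Host, database, secret scope/key, and object names are hypothetical.


def federation_setup(connection: str, catalog: str, host: str, database: str, secret_scope: str) -> list[str]:
    """Return the connection and foreign-catalog DDL for a PostgreSQL database."""
    return [
        # Credentials are read from a Databricks secret scope rather than hard-coded.
        f"CREATE CONNECTION IF NOT EXISTS {connection} TYPE postgresql "
        f"OPTIONS (host '{host}', port '5432', "
        f"user secret('{secret_scope}', 'pg-user'), "
        f"password secret('{secret_scope}', 'pg-password'))",
        # The foreign catalog mirrors one database; its tables become catalog.schema.table.
        f"CREATE FOREIGN CATALOG IF NOT EXISTS {catalog} USING CONNECTION {connection} "
        f"OPTIONS (database '{database}')",
    ]


# A query against the foreign catalog; the WHERE filter is a candidate for pushdown.
RECENT_ORDERS_SQL = """
SELECT status, COUNT(*) AS orders
FROM pg_orders.public.orders
WHERE order_date >= date_sub(current_date(), 7)
GROUP BY status
"""

for stmt in federation_setup("pg_conn", "pg_orders", "db.internal.example.com", "appdb", "federation"):
    print(stmt)
```

If a query like `RECENT_ORDERS_SQL` becomes a frequent, heavy workload, that is a signal to consider a materialized view or ingestion instead.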

## Cluster Permissions and Performance Monitoring

Databricks supports both workspace-level and cluster-level permissions. Workspace-level permissions control whether users can create clusters, while cluster-level permissions define what users can do with a specific cluster.

At the cluster level, permissions are hierarchical:

* **Can Attach To** allows attaching notebooks and viewing logs and metrics
* **Can Restart** adds the ability to start, stop, and restart the cluster
* **Can Manage** provides full control, including editing configuration and permissions
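Cluster permissions can also be managed programmatically. The sketch below builds, but does not send, a request for the Databricks Permissions API endpoint `/api/2.0/permissions/clusters/{cluster_id}` using the `CAN_ATTACH_TO`, `CAN_RESTART`, and `CAN_MANAGE` levels. It also encodes the hierarchy listed above. The workspace host, cluster ID, and group names are hypothetical, and authentication is left out.

```python
import json

# Cluster permission levels, ordered from least to most privileged.
CLUSTER_LEVELS = ["CAN_ATTACH_TO", "CAN_RESTART", "CAN_MANAGE"]
PRINCIPAL_FIELDS = {"user": "user_name", "group": "group_name", "service_principal": "service_principal_name"}


def level_allows(granted: str, required: str) -> bool:
    """True if `granted` includes everything `required` permits (the levels are hierarchical)."""
    return CLUSTER_LEVELS.index(granted) >= CLUSTER_LEVELS.index(required)


def cluster_permissions_request(host: str, cluster_id: str, grants: list[tuple[str, str, str]]) -> tuple[str, str, str]:
    """Build (method, url, json_body) for the Permissions API; nothing is sent.

    `grants` holds (principal_kind, principal_name, permission_level) tuples.
    PATCH adds or updates entries; PUT would replace the whole access control list.
    """
    acl = []
    for kind, name, level in grants:
        if level not in CLUSTER_LEVELS:
            raise ValueError(f"unknown cluster permission level: {level}")
        acl.append({PRINCIPAL_FIELDS[kind]: name, "permission_level": level})
    url = f"{host}/api/2.0/permissions/clusters/{cluster_id}"
    return "PATCH", url, json.dumps({"access_control_list": acl})


method, url, body = cluster_permissions_request(
    "https://example.cloud.databricks.com",
    "0123-456789-abcdefgh",
    [("group", "data-analysts", "CAN_ATTACH_TO"), ("group", "data-engineers", "CAN_RESTART")],
)
print(method, url)
print(body)
```

Granting to groups rather than individual users keeps cluster access aligned with the same group structure used elsewhere in Unity Catalog.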

## Cluster Logs and Metrics

Databricks provides multiple tools for monitoring cluster behavior. The **event log** captures lifecycle events such as creation, termination, resizing, and configuration changes, with detailed metadata available in JSON format.
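Because event log entries are JSON, they are easy to summarize in code. The events below are a hypothetical, trimmed sample, loosely shaped like the output of the Clusters API events endpoint (`/api/2.0/clusters/events`). Real entries carry more fields, particularly in `details`. The sketch counts events by type and extracts resize timestamps.

```python
import json
from collections import Counter

# Hypothetical, trimmed events in the general shape returned by the Clusters API
# events endpoint; real events carry more fields in `details`.
SAMPLE_EVENTS_JSON = """
{"events": [
  {"cluster_id": "0123-456789-abcdefgh", "timestamp": 1700000000000, "type": "CREATING", "details": {}},
  {"cluster_id": "0123-456789-abcdefgh", "timestamp": 1700000300000, "type": "RUNNING", "details": {}},
  {"cluster_id": "0123-456789-abcdefgh", "timestamp": 1700000900000, "type": "RESIZING", "details": {}},
  {"cluster_id": "0123-456789-abcdefgh", "timestamp": 1700003600000, "type": "TERMINATING", "details": {}}
]}
"""


def summarize_events(payload: str) -> tuple[Counter, list[int]]:
    """Count events per type and return the timestamps (ms) of resize events."""
    events = json.loads(payload)["events"]
    counts = Counter(event["type"] for event in events)
    resizes = [event["timestamp"] for event in events if event["type"] == "RESIZING"]
    return counts, resizes


counts, resizes = summarize_events(SAMPLE_EVENTS_JSON)
print(dict(counts))
print("resize events at:", resizes)
```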

**Driver logs** collect standard output, standard error, and log4j messages from notebooks, jobs, and libraries. These logs can be viewed or downloaded for troubleshooting.

For performance monitoring, the **Metrics** tab provides visibility into workload, memory usage, CPU utilization, and network activity at both the cluster and node levels. This helps identify bottlenecks, data skew, and resource underutilization. On Databricks Runtime 12.2 LTS and below, these are Ganglia metrics; newer runtimes replace Ganglia with built-in compute metrics.

Live Ganglia metrics are only available while the cluster is running. After termination, Databricks retains snapshot images of key metrics, providing historical visibility into overall cluster health.

## Closing Perspective

Advanced Unity Catalog features extend governance beyond access control into **metadata management, data quality, secure sharing, federation, and operational visibility**. Together, these capabilities enable organizations to operate a lakehouse that is **secure, observable, and scalable**, while supporting both internal analytics and external data collaboration.
