# POC: Incremental reads on Iceberg without Hive metastore

Using a local metastore with incremental features

# Use cases
Change Data Feed is not enabled by default. The following use cases should drive when you enable the change data feed.

1. Silver and Gold tables: Improve Delta performance by processing only row-level changes following initial MERGE, UPDATE, or DELETE operations to accelerate and simplify ETL and ELT operations.

2. Transmit changes: Send a change data feed to downstream systems such as Kafka or an RDBMS, which can use it to process data incrementally in later stages of data pipelines.

3. Audit trail table: Capturing the change data feed as a Delta table provides perpetual storage and efficient query capability to see all changes over time, including when deletes occur and what updates were made.



# Known constraints:

- Snapshots are also subject to retention policies. If you run the optimization (maintenance) commands that expire snapshots, the change data for the expired snapshots is deleted as well. The default maximum snapshot age is 5 days according to some documentation. [Reference](https://www.tabular.io/apache-iceberg-cookbook/data-operations-snapshot-expiration/)
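
If the changelog needs to outlive the default retention window, the retention can be raised per table. The sketch below only builds the `ALTER TABLE` statement; it assumes the Iceberg table properties `history.expire.max-snapshot-age-ms` and `history.expire.min-snapshots-to-keep`, and the helper name `retention_ddl` is hypothetical.

```python
def retention_ddl(table: str, max_age_days: int = 5, min_snapshots: int = 1) -> str:
    """Build an ALTER TABLE statement that sets Iceberg snapshot-retention properties."""
    max_age_ms = max_age_days * 24 * 60 * 60 * 1000  # days -> milliseconds
    return (
        f"ALTER TABLE {table} SET TBLPROPERTIES ("
        f"'history.expire.max-snapshot-age-ms' = '{max_age_ms}', "
        f"'history.expire.min-snapshots-to-keep' = '{min_snapshots}')"
    )


# Keep 30 days of snapshots for the table used in this notebook
stmt = retention_ddl("iceberg.accounts", max_age_days=30)
print(stmt)
# With the Spark session created below: spark.sql(stmt)
```

Longer retention keeps more metadata and data files around, so the value is a trade-off between how far back increments can be read and storage cost.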

## Before running

Check that Spark 3.5.3 and Hadoop are installed,
as well as the pyiceberg>=0.8.1 and pyspark>=3.5.3 libraries.

Remove the /warehouse/ folder in this directory if it exists.
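
These checks can be scripted. The following is a minimal sketch using only the standard library; the helper names are hypothetical, the version comparison only looks at the first three numeric components, and the warehouse path in the final comment is an assumption to adjust to your checkout.

```python
import re
import shutil
from importlib import metadata
from pathlib import Path


def installed_version(package: str):
    """Return the installed version string, or None if the package is missing."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


def version_tuple(version: str) -> tuple:
    """Extract up to three leading numeric components, e.g. '3.5.3' -> (3, 5, 3)."""
    return tuple(int(n) for n in re.findall(r"\d+", version)[:3])


def meets_minimum(found: str, minimum: str) -> bool:
    """True when the found version is at least the minimum version."""
    return version_tuple(found) >= version_tuple(minimum)


def reset_warehouse(path: str) -> bool:
    """Delete the warehouse folder if present; return True when something was removed."""
    folder = Path(path)
    if folder.is_dir():
        shutil.rmtree(folder)
        return True
    return False


for package, minimum in {"pyspark": "3.5.3", "pyiceberg": "0.8.1"}.items():
    found = installed_version(package)
    ok = found is not None and meets_minimum(found, minimum)
    print(f"{package}: found={found}, required>={minimum}, ok={ok}")

# reset_warehouse("warehouse")  # assumed relative path to the warehouse folder
```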

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month, dayofmonth

### Spark Setup

In [2]:
spark_jar_packages = ",".join([
    "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.7.1",
])

In [3]:
LOCAL_WAREHOUSE_CATALOG = "file:///home/baptvit/Documents/github/lakehouse-labs/notebooks/warehouse/iceberg/"

In [4]:
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("iceberg-hive-playground")
    .config("spark.jars.packages", spark_jar_packages)

    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
    
    .config("spark.sql.catalog.local.type", "hadoop")   # Use Hadoop catalog
    .config("spark.sql.catalog.local.warehouse", LOCAL_WAREHOUSE_CATALOG)   # Path to store metadata
    .config("spark.sql.warehouse.dir", LOCAL_WAREHOUSE_CATALOG)   # Path to store metadata
    .config("spark.sql.defaultCatalog", "local")
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
    .getOrCreate()
)

25/01/13 19:43:06 WARN Utils: Your hostname, baptvit resolves to a loopback address: 127.0.1.1; using 192.168.2.129 instead (on interface wlp4s0)
25/01/13 19:43:06 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /home/baptvit/.ivy2/cache
The jars for the packages stored in: /home/baptvit/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.5_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-894c97ed-c25d-4ad4-84dc-1deb9924a45d;1.0
	confs: [default]


:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.7.1 in central
:: resolution report :: resolve 72ms :: artifacts dl 3ms
	:: modules in use:
	org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.7.1 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-894c97ed-c25d-4ad4-84dc-1deb9924a45d
	confs: [default]
	0 artifacts copied, 1 already retrieved (0kB/3ms)
25/01/13 19:43:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".


## Creating a fake database

In [5]:
import random
from faker import Faker

In [6]:
def generate_entry(faker: Faker, country_codes: list):
    return {
        "id": faker.unique.uuid4(),
        "name":  faker.name(),
        "email": faker.email(),
        "passport": faker.passport_number(),
        "country_code": random.choice(country_codes),
        "iban": faker.iban(),
        "swift": faker.swift11(),
        "created_at": faker.past_date(start_date='-90d').strftime('%Y-%m-%d')
    }

In [7]:
def generate_dataset(num: int, seed: int):
    country_codes = ['US', 'CA', 'JP', 'KR', 'FR', 'GE', 'UK', 'BR', 'AR']
    Faker.seed(seed)
    faker = Faker()
    return [generate_entry(faker, country_codes) for _ in range(num)]

In [8]:
dataset = generate_dataset(num=100, seed=739)

In [9]:
df = spark.createDataFrame(dataset)\
        .withColumn("year", year(col("created_at")))\
        .withColumn("month", month(col("created_at")))\
        .withColumn("day", dayofmonth(col("created_at")))

In [10]:
df.show(1)

+------------+----------+--------------------+--------------------+--------------------+-----------+---------+-----------+----+-----+---+
|country_code|created_at|               email|                iban|                  id|       name| passport|      swift|year|month|day|
+------------+----------+--------------------+--------------------+--------------------+-----------+---------+-----------+----+-----+---+
|          US|2024-10-20|powelljason@examp...|GB77AKMZ560580635...|5a424412-b127-4f8...|Cody Taylor|895549199|INSEGB5PR6S|2024|   10| 20|
+------------+----------+--------------------+--------------------+--------------------+-----------+---------+-----------+----+-----+---+
only showing top 1 row



In [11]:
df.count()

100

In [12]:
df.createOrReplaceTempView("accounts")

## Save using the default database

In [17]:
spark.sql("""
    CREATE DATABASE IF NOT EXISTS iceberg;
""")

DataFrame[]

Ideally we would specify the table schema explicitly to avoid relying on **overwriteSchema=true**:

````
from pyspark.sql.types import DoubleType, FloatType, LongType, StructType,StructField, StringType
schema = StructType([
  StructField("vendor_id", LongType(), True),
  StructField("trip_id", LongType(), True),
  StructField("trip_distance", FloatType(), True),
  StructField("fare_amount", DoubleType(), True),
  StructField("store_and_fwd_flag", StringType(), True)
])

df = spark.createDataFrame([], schema)
df.writeTo("demo.nyc.taxis").create()
````

In [18]:
spark.sql(
    """
    CREATE TABLE IF NOT EXISTS iceberg.accounts
        USING iceberg
    """
)

DataFrame[]

In [19]:
df.writeTo("iceberg.accounts")\
    .tableProperty("changelog.enabled", "true")\
    .tableProperty("overwriteSchema", "true")\
    .partitionedBy("year", "month")\
    .createOrReplace()  # First run: create or replace the table

## Reading from local direct from file

In [20]:
LOCAL_ACCOUNT_TABLE = LOCAL_WAREHOUSE_CATALOG + "iceberg/accounts"

In [21]:
# Load the table directly from its path in the warehouse (reads the current snapshot)
df_read = spark.read.format("iceberg") \
  .load(LOCAL_ACCOUNT_TABLE)

In [22]:
df_read.count()

100

## Reading the table history from local folder

Local folder and Spark SQL

In [23]:
spark.sql("""
    CREATE DATABASE IF NOT EXISTS iceberg;
""")

DataFrame[]

In [24]:
spark.sql(
    f"""
    CREATE TABLE IF NOT EXISTS iceberg.accounts
        USING iceberg
        OPTIONS (
          path = '{LOCAL_ACCOUNT_TABLE}'
        );
    """
)

DataFrame[]

In [25]:
spark.sql("""
    SELECT *
    FROM iceberg.accounts.history
""").show(truncate=False)


+-----------------------+-------------------+---------+-------------------+
|made_current_at        |snapshot_id        |parent_id|is_current_ancestor|
+-----------------------+-------------------+---------+-------------------+
|2025-01-13 19:44:00.205|8107394393973872139|NULL     |true               |
+-----------------------+-------------------+---------+-------------------+



## Using CDC and table history to identify the increments

In [22]:
### Local folder with describe history

In [26]:
latest_snapshot_id = spark.sql("""
    SELECT snapshot_id
    FROM iceberg.accounts.snapshots
    ORDER BY committed_at DESC
    LIMIT 1;
""").collect()[0][0]

latest_snapshot_id

8107394393973872139

### Reading just the last table version using local catalog

In [28]:
latest_changes_df = spark.read.format("iceberg") \
    .option("snapshot-id", latest_snapshot_id) \
    .load("iceberg.accounts")

latest_changes_df.count()

100

In [29]:
latest_changes_df.show(5)

+------------+----------+--------------------+--------------------+--------------------+----------------+---------+-----------+----+-----+---+
|country_code|created_at|               email|                iban|                  id|            name| passport|      swift|year|month|day|
+------------+----------+--------------------+--------------------+--------------------+----------------+---------+-----------+----+-----+---+
|          GE|2024-12-08|brightthomas@exam...|GB56NVYS608859440...|189b84f0-9527-45e...|  Brittany Heath|H58091059|BEBQGBVOSLL|2024|   12|  8|
|          KR|2024-12-15|donaldpierce@exam...|GB55LFTZ500270831...|b7e33adb-9bfe-465...|        Ann Cruz|T22953641|HMAQGBCSXE8|2024|   12| 15|
|          GE|2024-12-08|harrisondeanna@ex...|GB96EZYO163067760...|bef2df38-4a7a-4b7...| Destiny Jimenez|F75210547|WSHOGBQ55I9|2024|   12|  8|
|          GE|2024-12-09|derrick15@example...|GB14AYNQ551881503...|0daad7bc-25b6-446...|Cassidy Jones MD|595954695|VTHYGBZMNOI|2024|   12|  9|

## Creating the upsert

### Upsert Dataset

Editing 4 existing records and adding 4 new records

In [30]:
entries = [
    # Existing entries
    dataset[2], 
    dataset[4], 
    dataset[7],
    dataset[11],
    # New entries
    *generate_dataset(4, seed=1037)
]

In [110]:
entries = [
    # Existing entries
    dataset[3], 
    dataset[5], 
    dataset[8],
    dataset[12],
    # New entries
    *generate_dataset(4, seed=11037)
]

In [111]:
for entry in entries:
    username = entry['name'].lower().replace(" ", ".")
    entry['email'] = f"{username}@domain.com"

In [112]:
upsert_df = spark.createDataFrame(entries)\
        .withColumn("year", year(col("created_at")))\
        .withColumn("month", month(col("created_at")))\
        .withColumn("day", dayofmonth(col("created_at")))

In [113]:
upsert_df.show(8, truncate=False)

+------------+----------+-----------------------------+----------------------+------------------------------------+------------------+---------+-----------+----+-----+---+
|country_code|created_at|email                        |iban                  |id                                  |name              |passport |swift      |year|month|day|
+------------+----------+-----------------------------+----------------------+------------------------------------+------------------+---------+-----------+----+-----+---+
|GE          |2024-12-08|destiny.jimenez@domain.com   |GB96EZYO16306776005871|bef2df38-4a7a-4b73-904d-c949dc8140c6|Destiny Jimenez   |F75210547|WSHOGBQ55I9|2024|12   |8  |
|BR          |2024-10-30|evan.jordan@domain.com       |GB17EPUZ66764518981680|4709d52f-517d-4907-b01b-2620c70146e1|Evan Jordan       |D83053040|HASPGB7OGUX|2024|10   |30 |
|JP          |2024-11-09|christine.crawford@domain.com|GB72XSCN85943044334322|019aea9a-35bf-4a7c-be65-a783924affd0|Christine Crawford|G48667

In [114]:
upsert_df.createOrReplaceTempView("upsert_data")

# Upsert Strategy

## Slowly Changing Dimension (SCD) Type 1

In SCD Type 1, the existing records are overwritten with new data when there is a match, and new records are inserted when there is no match. This approach does not preserve historical changes; it simply updates the records with the latest data.


### Using upsert with SQL-like syntax

In [115]:
spark.sql("""
    MERGE INTO iceberg.accounts AS target
    USING upsert_data AS source ON 
        target.id = source.id
    WHEN MATCHED THEN 
        UPDATE SET
            target.country_code = source.country_code,
            target.email = source.email,
            target.name = source.name,
            target.iban = source.iban,
            target.swift = source.swift,
            target.passport = source.passport
    WHEN NOT MATCHED THEN 
        INSERT *
""")

DataFrame[]

**However, this triggers the creation of a new snapshot with SCD Type 1 semantics, so the increments cannot be isolated natively.**

## Slowly Changing Dimension (SCD) Type 2

Slowly Changing Dimension Type 2 (SCD Type 2) is a method used in data warehousing to track historical changes in dimension data over time. Unlike SCD Type 1, which overwrites old data with new data, SCD Type 2 preserves the full history of changes by creating new records for each change. This allows you to analyze how data has evolved over time.
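
To make the difference from SCD Type 1 concrete, here is a plain-Python sketch of the Type 2 bookkeeping on a list of dictionaries. It is not how Iceberg stores data; the column names `valid_from`, `valid_to`, and `is_current` and the function `apply_scd2` are assumptions chosen for illustration.

```python
from datetime import date

SCD_COLUMNS = ("valid_from", "valid_to", "is_current")


def apply_scd2(history: list, incoming: dict, key: str, today: date) -> list:
    """Close the current version of a changed record and append the new version."""
    result = []
    needs_new_version = True
    for row in history:
        if row[key] == incoming[key] and row["is_current"]:
            tracked = {k: v for k, v in row.items() if k not in SCD_COLUMNS}
            if tracked == incoming:
                needs_new_version = False  # nothing changed: keep the row as-is
                result.append(row)
            else:
                # Close the old version instead of overwriting it (unlike SCD Type 1)
                result.append({**row, "valid_to": today, "is_current": False})
        else:
            result.append(row)
    if needs_new_version:
        result.append({**incoming, "valid_from": today, "valid_to": None, "is_current": True})
    return result


history = [{"id": "a1", "email": "old@example.com",
            "valid_from": date(2024, 12, 1), "valid_to": None, "is_current": True}]
history = apply_scd2(history, {"id": "a1", "email": "new@domain.com"}, "id", date(2025, 1, 13))
for row in history:
    print(row)
```

After the call, the old email is still present as a closed row and the new email is the current row, which is the history that SCD Type 1 discards.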



### Using append with the Python API

Not available

In [None]:
upsert_df.writeTo("iceberg_db.accounts")\
    .tableProperty("changelog.enabled", "true")\
    .tableProperty("overwriteSchema", "true")\
    .partitionedBy("year", "month")\
    .append() ## 

In [None]:
spark.sql("""
    SELECT *
    FROM iceberg_db.accounts.history
""").show(vertical=True, truncate=False)

In [None]:
spark.sql("""
    SELECT *
    FROM iceberg_db.accounts.snapshots
""").show(vertical=True, truncate=False)

## Reading incrementals

Iceberg does not keep the increments isolated. Each snapshot is a consolidated view of the most recent version of the table.

In [116]:
spark.sql("""
    SELECT snapshot_id, parent_id
    FROM iceberg.accounts.snapshots
    ORDER BY committed_at DESC;
""").show()

+-------------------+-------------------+
|        snapshot_id|          parent_id|
+-------------------+-------------------+
|1292336618355848037|5319772249300913325|
|5319772249300913325|1766843850123313138|
|1766843850123313138|8107394393973872139|
|8107394393973872139|               NULL|
+-------------------+-------------------+
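
The `parent_id` column links each snapshot to its predecessor, so the lineage can be walked to derive the snapshot ranges between consecutive commits. A small plain-Python sketch follows, using the IDs from the output above; the function name `lineage` is hypothetical.

```python
def lineage(snapshots: list, snapshot_id: int) -> list:
    """Return snapshot IDs from the given snapshot back to the root, newest first."""
    parent_of = dict(snapshots)
    chain = []
    current = snapshot_id
    while current is not None:
        chain.append(current)
        current = parent_of.get(current)  # None at the root or if the parent was expired
    return chain


# (snapshot_id, parent_id) pairs copied from the output above
snapshots = [
    (1292336618355848037, 5319772249300913325),
    (5319772249300913325, 1766843850123313138),
    (1766843850123313138, 8107394393973872139),
    (8107394393973872139, None),
]

chain = lineage(snapshots, 1292336618355848037)
oldest_first = chain[::-1]
# Consecutive (start, end) pairs, oldest first, usable as snapshot ranges
ranges = list(zip(oldest_first, oldest_first[1:]))
print(ranges)
```

With a live session, the pairs could instead be collected from the `iceberg.accounts.snapshots` metadata table queried above.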



### Reading the last changes

In [117]:
latest_snapshot_id, parent_snapshot_id = spark.sql("""
    SELECT snapshot_id, parent_id
    FROM iceberg.accounts.snapshots
    ORDER BY committed_at DESC
    LIMIT 1;
""").collect()[0]

latest_snapshot_id

1292336618355848037

In [118]:
parent_snapshot_id

5319772249300913325

In [119]:
last_snapshot = spark.read.format("iceberg") \
    .option("snapshot-id", latest_snapshot_id) \
    .load("iceberg.accounts")

last_snapshot.count()

108

In [122]:
increment = spark.read.format("iceberg") \
    .option("start-snapshot-id", parent_snapshot_id) \
    .option("end-snapshot-id", latest_snapshot_id) \
    .load("iceberg.accounts")

increment.count()

0

The count is 0 most likely because reads with `start-snapshot-id` and `end-snapshot-id` only return data added by append snapshots, while the `MERGE INTO` above commits an overwrite snapshot.

### Create a changelog view for the Iceberg table (SCD 2)

To identify the increments between different snapshot IDs, we need to create a changelog view.

In [130]:
# First snapshot of the table, taken from the history output above
first_snapshot_id = 8107394393973872139

spark.sql(f"""
CALL system.create_changelog_view(
  table => 'iceberg.accounts',
  options => map('start-snapshot-id', '{first_snapshot_id}', 'end-snapshot-id', '{latest_snapshot_id}'),
  changelog_view => 'accounts_cdc'
)""")

DataFrame[changelog_view: string]

In [131]:
spark.sql(f"""
    SELECT * FROM accounts_cdc
    WHERE _commit_snapshot_id == {latest_snapshot_id} 
""").show()

+------------+----------+--------------------+--------------------+--------------------+------------------+---------+-----------+----+-----+---+------------+---------------+-------------------+
|country_code|created_at|               email|                iban|                  id|              name| passport|      swift|year|month|day|_change_type|_change_ordinal|_commit_snapshot_id|
+------------+----------+--------------------+--------------------+--------------------+------------------+---------+-----------+----+-----+---+------------+---------------+-------------------+
|          BR|2024-10-30|dalebrown@example...|GB17EPUZ667645189...|4709d52f-517d-490...|       Evan Jordan|D83053040|HASPGB7OGUX|2024|   10| 30|      DELETE|              2|1292336618355848037|
|          BR|2024-10-30|evan.jordan@domai...|GB17EPUZ667645189...|4709d52f-517d-490...|       Evan Jordan|D83053040|HASPGB7OGUX|2024|   10| 30|      INSERT|              2|1292336618355848037|
|          BR|2024-11-01|cheye

In [132]:
spark.sql(f"""
    SELECT * FROM accounts_cdc
    WHERE _change_type = 'INSERT'
""").count()

16

### Isolate the increments 

In [129]:
spark.sql(f"""
    SELECT * FROM accounts_cdc
    WHERE _commit_snapshot_id == {latest_snapshot_id} AND _change_type = 'INSERT'
""").count()

8
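
The two steps above (create the changelog view, then filter it) can be wrapped in one helper. This is a sketch, not part of the original notebook: `read_increment` and the default view name `increment_cdc` are hypothetical, and it assumes the start snapshot is exclusive, as the Iceberg procedure documentation describes, so a `(parent, latest]` range covers exactly one commit.

```python
def read_increment(spark, table: str, start_id: int, end_id: int, view: str = "increment_cdc"):
    """Create a changelog view for the snapshot range and return its INSERT rows."""
    # Step 1: register a temporary view with the row-level changes in the range
    spark.sql(f"""
        CALL system.create_changelog_view(
          table => '{table}',
          options => map('start-snapshot-id', '{start_id}', 'end-snapshot-id', '{end_id}'),
          changelog_view => '{view}'
        )""")
    # Step 2: keep only the inserted rows (new records and new versions of updated ones)
    return spark.sql(f"SELECT * FROM {view} WHERE _change_type = 'INSERT'")


# Usage with the session and IDs from this notebook:
# read_increment(spark, "iceberg.accounts", parent_snapshot_id, latest_snapshot_id).count()
```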

### Create a changelog view for the Iceberg table (SCD 2) with UPDATE_BEFORE and UPDATE_AFTER

This feature can help achieve SCD Type 3 in Iceberg tables.
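
Conceptually, `compute_updates` relabels a `DELETE` and an `INSERT` that share the same identifier columns as an `UPDATE_BEFORE`/`UPDATE_AFTER` pair. The plain-Python sketch below only illustrates that idea; it is not Iceberg's implementation, and the function name, IDs, and emails are hypothetical.

```python
def pair_updates(changes: list, key: str = "id") -> list:
    """Relabel DELETE/INSERT rows that share a key as UPDATE_BEFORE/UPDATE_AFTER."""
    deleted = {row[key] for row in changes if row["_change_type"] == "DELETE"}
    inserted = {row[key] for row in changes if row["_change_type"] == "INSERT"}
    updated = deleted & inserted  # keys that were both removed and re-added
    relabel = {"DELETE": "UPDATE_BEFORE", "INSERT": "UPDATE_AFTER"}
    return [
        {**row, "_change_type": relabel.get(row["_change_type"], row["_change_type"])}
        if row[key] in updated else row
        for row in changes
    ]


# Hypothetical changelog rows: one updated record ("a1") and one new record ("b2")
changes = [
    {"id": "a1", "email": "old@example.com", "_change_type": "DELETE"},
    {"id": "a1", "email": "new@domain.com", "_change_type": "INSERT"},
    {"id": "b2", "email": "other@domain.com", "_change_type": "INSERT"},
]

result = pair_updates(changes)
for row in result:
    print(row["id"], row["_change_type"], row["email"])
```

The `UPDATE_BEFORE` row carries the previous values and the `UPDATE_AFTER` row the current ones, which is what an SCD Type 3 "previous value" column needs.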

In [108]:
spark.sql(f"""
CALL system.create_changelog_view(
  table => 'iceberg.accounts',
  options => map('start-snapshot-id', '{parent_snapshot_id}', 'end-snapshot-id', '{latest_snapshot_id}'),
  changelog_view => 'accounts_cdc_sdc3',
  compute_updates => true,
  identifier_columns => array('id')
)""")

DataFrame[changelog_view: string]

In [109]:
spark.sql(f"""
    SELECT * FROM accounts_cdc_sdc3
    WHERE _commit_snapshot_id == {latest_snapshot_id}
""").show()

+------------+----------+-----+----+---+----+--------+-----+----+-----+---+------------+---------------+-------------------+
|country_code|created_at|email|iban| id|name|passport|swift|year|month|day|_change_type|_change_ordinal|_commit_snapshot_id|
+------------+----------+-----+----+---+----+--------+-----+----+-----+---+------------+---------------+-------------------+
+------------+----------+-----+----+---+----+--------+-----+----+-----+---+------------+---------------+-------------------+



In [80]:
from pyspark.sql.functions import col

cdc_data = spark.sql(f"""
    SELECT * FROM accounts_cdc_sdc3
    WHERE _commit_snapshot_id == {latest_snapshot_id}
""")

# UPDATE_BEFORE rows hold the values prior to the change: keep them as previous_* columns
previous_values = cdc_data.filter(col("_change_type") == "UPDATE_BEFORE").select(
    col("id"),
    col("email").alias("previous_email"),
    col("country_code").alias("previous_country_code"),
)

# Drop UPDATE_BEFORE rows (only the latest state is needed) and attach the previous values;
# rows without a matching UPDATE_BEFORE keep NULL in the previous_* columns
scd3_data = cdc_data.filter(col("_change_type") != "UPDATE_BEFORE") \
                    .join(previous_values, on="id", how="left")

# Show the SCD Type 3 result
scd3_data.select(
    "id", "name", "email", "previous_email", "country_code", "previous_country_code",
    "created_at", "iban", "passport", "swift", "year", "month", "day"
).show()

+---+----+-----+--------------+------------+---------------------+----------+----+--------+-----+----+-----+---+
| id|name|email|previous_email|country_code|previous_country_code|created_at|iban|passport|swift|year|month|day|
+---+----+-----+--------------+------------+---------------------+----------+----+--------+-----+----+-----+---+
+---+----+-----+--------------+------------+---------------------+----------+----+--------+-----+----+-----+---+



## TODO: Optimize commands

## TODO: Miscellaneous on Iceberg