# Iceberg PII Data Deletion Demo


This notebook walks through the process of creating an Iceberg table, adding data, deleting PII, and then permanently removing the history containing the PII.


## 1. Setup


First, we need to import pyspark and set up our Spark session. The configuration for the S3 endpoint and Iceberg catalog is already handled by the `docker-compose.yml` file.


In [1]:
import pyspark
from pyspark.sql import SparkSession

# Configure Spark with S3A filesystem settings
spark = SparkSession.builder \
    .appName("IcebergPIIDemo") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.2,org.apache.hadoop:hadoop-common:3.3.2") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.access.key", "admin") \
    .config("spark.hadoop.fs.s3a.secret.key", "password") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
    .getOrCreate()


25/09/03 11:16:24 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
for k, v in spark.sparkContext.getConf().getAll():
    if "catalog" in k:
        print(k, "=", v)

spark.sql.catalog.demo.s3.endpoint = http://minio:9000
spark.sql.catalogImplementation = in-memory
spark.sql.catalog.demo.warehouse = s3://warehouse/wh/
spark.sql.catalog.demo.io-impl = org.apache.iceberg.aws.s3.S3FileIO
spark.sql.catalog.demo.uri = http://rest:8181
spark.sql.catalog.demo.type = rest
spark.sql.catalog.demo = org.apache.iceberg.spark.SparkCatalog


In [3]:
!curl -X DELETE http://rest:8181/v1/namespaces/default/tables/pii_data

In [4]:
import re
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql import DataFrame
from datetime import datetime

def _format_exception_message(e: Exception) -> str:
    msg = str(e)
    m = re.search(r"(NotFoundException|TABLE_OR_VIEW_NOT_FOUND|AnalysisException|ServiceFailureException)[^\n]*", msg)
    return m.group(0) if m else (msg.splitlines()[0] if msg else "Unknown error")

def summarize_files(input_param: str,
                    operation_name: str,
                    save_path: str = None,
                    run_id: str = None):
    """
    Build minute-bucketed file summaries for Iceberg metadata + data.

    Returns: (meta_pd, data_pd, all_pd)  # now Pandas
      - meta_df columns: prefix,file_type,file_format,created_minute,files_created,run_id,operation
      - data_df columns: prefix,file_type,file_format,created_minute,files_created,run_id,operation
      - all_df  = union(meta_df,data_df)

    If save_path is provided, writes Parquet to:
      {save_path}/summary/  partitioned by run_id
    """
    print(f"--- File Summary ({operation_name}) ---")

    table_name = input_param
    if input_param.startswith("s3a://warehouse/default/pii_data"):
        table_name = "demo.default.pii_data"
        print("Using table name for better reliability...")

    # Give this run a stable id if none provided
    if run_id is None:
        run_id = datetime.utcnow().isoformat(timespec="seconds").replace(":", "-") + "Z"

    meta_df = None
    data_df = None

    # --- Metadata files ---
    try:
        metadata_query = f"""
        WITH manifest_lists AS (
          SELECT manifest_list AS file_path,
                 committed_at  AS created_at,
                 'avro'        AS file_format,
                 'snapshots'   AS file_type
          FROM {table_name}.snapshots
          WHERE manifest_list IS NOT NULL
        ),
        manifests AS (
          SELECT m.path        AS file_path,
                 s.committed_at AS created_at,
                 'avro'         AS file_format,
                 'manifests'    AS file_type
          FROM {table_name}.all_manifests m
          LEFT OUTER JOIN {table_name}.snapshots s
            ON m.added_snapshot_id = s.snapshot_id
          WHERE m.path IS NOT NULL
        ),
        metadata_json AS (
          SELECT file         AS file_path,
                 timestamp    AS created_at,
                 'json'       AS file_format,
                 'metadata_log_entries' AS file_type
          FROM {table_name}.metadata_log_entries
          WHERE file IS NOT NULL
        )
        SELECT
          'metadata'                                 AS prefix,
          file_type,
          file_format,
          date_trunc('minute', created_at)           AS created_minute,
          COUNT(*)                                   AS files_created
        FROM (
          SELECT * FROM manifest_lists
          UNION ALL
          SELECT * FROM manifests
          UNION ALL
          SELECT * FROM metadata_json
        )
        GROUP BY prefix, file_type, file_format, date_trunc('minute', created_at)
        """
        meta_df = spark.sql(metadata_query) \
                       .withColumn("run_id", F.lit(run_id)) \
                       .withColumn("operation", F.lit(operation_name))
        print("\nMetadata file summary:")
        meta_df.orderBy("created_minute","file_type","file_format").show(truncate=False)
    except Exception as e:
        print(f"Metadata file summary unavailable: {_format_exception_message(e)}")

    # --- Data files ---
    try:
        data_query = f"""
        WITH created AS (
          SELECT e.data_file.file_path AS file_path,
                 MIN(s.committed_at)   AS created_at,
                 MIN(e.data_file.content) AS content
          FROM {table_name}.entries e
          JOIN {table_name}.snapshots s USING (snapshot_id)
          GROUP BY e.data_file.file_path
        )
        SELECT
          'data' AS prefix,
          CASE content
            WHEN 0 THEN 'data'
            WHEN 1 THEN 'position_deletes'
            WHEN 2 THEN 'equality_deletes'
            ELSE 'unknown'
          END AS file_type,
          /* file_format for data isn't tracked here; set to parquet for uniform schema */
          'parquet' AS file_format,
          date_trunc('minute', created_at) AS created_minute,
          COUNT(*) AS files_created
        FROM created
        GROUP BY prefix, content, date_trunc('minute', created_at)
        """
        data_df = spark.sql(data_query) \
                       .withColumn("run_id", F.lit(run_id)) \
                       .withColumn("operation", F.lit(operation_name))
        print("\nData file summary:")
        data_df.orderBy("created_minute","file_type").show(truncate=False)
    except Exception as e:
        print(f"Data file summary unavailable: {_format_exception_message(e)}")

    # Union (align schemas if one side is None)
    cols = ["prefix","file_type","file_format","created_minute","files_created","run_id","operation"]
    def _empty_df():
        return spark.createDataFrame([], "prefix string, file_type string, file_format string, created_minute timestamp, files_created long, run_id string, operation string")
    if meta_df is None: meta_df = _empty_df()
    if data_df is None: data_df = _empty_df()
    all_df = meta_df.select(cols).unionByName(data_df.select(cols))

    # Optional save
    if save_path:
        (all_df
         .repartition("run_id")
         .write
         .mode("append")
         .partitionBy("run_id")
         .parquet(f"{save_path.rstrip('/')}/summary"))
        print(f"\nSaved summary to {save_path.rstrip('/')}/summary (partitioned by run_id={run_id})")

    # Return as Pandas (no extra prints)
    return meta_df.toPandas(), data_df.toPandas(), all_df.toPandas()

# Define the base path for our table for easy reuse 
table_base_path = "s3a://warehouse/default/pii_data"


In [5]:
import pandas as pd
import numpy as np

def diff_summaries(df_old: pd.DataFrame, df_new: pd.DataFrame, count_col: str = "files_created") -> pd.DataFrame:
    def norm(df):
        d = df.copy()
        d["_prefix"]      = d["prefix"].astype("string")
        d["_file_type"]   = d["file_type"].astype("string")
        d["_file_format"] = d.get("file_format").astype("string").fillna("__NULL__")
        d["_minute_str"]  = pd.to_datetime(d["created_minute"], errors="coerce").dt.floor("T").dt.strftime("%Y-%m-%d %H:%M:00")
        return d

    keys = ["_prefix","_file_type","_file_format","_minute_str"]

    oldN = norm(df_old)
    newN = norm(df_new)

    # aggregate + keep natural fields on each side
    oldA = (oldN.groupby(keys, dropna=False)
                 .agg(old_count=(count_col,"sum"),
                      prefix=("prefix","first"),
                      file_type=("file_type","first"),
                      file_format=("file_format","first"),
                      created_minute=("created_minute","first"))
                 .reset_index())

    newA = (newN.groupby(keys, dropna=False)
                 .agg(new_count=(count_col,"sum"),
                      prefix=("prefix","first"),
                      file_type=("file_type","first"),
                      file_format=("file_format","first"),
                      created_minute=("created_minute","first"))
                 .reset_index())

    # present in both
    both = newA.merge(oldA[keys + ["old_count"]], on=keys, how="inner")
    both = both.loc[:, ["prefix","file_type","file_format","created_minute","old_count","new_count"]]
    both["delta"] = both["new_count"] - both["old_count"]
    both["status"] = np.where(both["delta"].eq(0), "UNCHANGED", "CHANGED")

    # added (in new only)
    added = newA.merge(oldA[keys], on=keys, how="left", indicator=True)
    added = (added[added["_merge"] == "left_only"]
                  .drop(columns="_merge")
                  .assign(old_count=0)[["prefix","file_type","file_format","created_minute","old_count","new_count"]])
    added["delta"] = added["new_count"]
    added["status"] = "ADDED"

    # removed (in old only)
    removed = oldA.merge(newA[keys], on=keys, how="left", indicator=True)
    removed = (removed[removed["_merge"] == "left_only"]
                    .drop(columns="_merge")
                    .assign(new_count=0)[["prefix","file_type","file_format","created_minute","old_count","new_count"]])
    removed["delta"] = -removed["old_count"]
    removed["status"] = "REMOVED"

    # union + prettify
    diff = pd.concat([both, added, removed], ignore_index=True)
    diff["minute_str"] = pd.to_datetime(diff["created_minute"], errors="coerce").dt.strftime("%Y-%m-%d %H:%M:%S")

    return (diff[["prefix","file_type","file_format","created_minute","minute_str",
                  "old_count","new_count","delta","status"]]
            .sort_values(["prefix","file_type","file_format","created_minute"])
            .reset_index(drop=True))


What each of these metadata file types “tells you”

m*.avro (manifests): rows = data-file entries, with partition info + per-column stats + an entry status (ADDED/DELETED). They’re scoped to a snapshot (you can link them via added_snapshot_id).

snap-*.avro (manifest lists): the index for a snapshot; each row points to one or more m*.avro files used by that snapshot.

0000*-*.metadata.json (table metadata versions): the table’s high-level state over time and which snapshots are current/valid.

Next, we'll create an Iceberg table called `pii_data` in our `demo` catalog. The schema will include the PII columns we want to manage.


With the REST catalog, we need to create a namespace before we can create a table. We'll create a namespace called `default` inside our `demo` catalog.


In [6]:
spark.sql("DROP TABLE IF EXISTS demo.default.pii_data;")
spark.sql("CREATE NAMESPACE IF NOT EXISTS demo.default")
df = spark.sql("SHOW TABLES IN demo.default;")
df.show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
+---------+---------+-----------+



In [7]:
# Check files after namespace creation
_, _, all_previous = summarize_files(table_base_path, "After Table Creation")


--- File Summary (After Table Creation) ---
Using table name for better reliability...
Metadata file summary unavailable: TABLE_OR_VIEW_NOT_FOUND] The table or view `demo`.`default`.`pii_data`.`snapshots` cannot be found. Verify the spelling and correctness of the schema and catalog.
Data file summary unavailable: TABLE_OR_VIEW_NOT_FOUND] The table or view `demo`.`default`.`pii_data`.`entries` cannot be found. Verify the spelling and correctness of the schema and catalog.


In [8]:
spark.sql("""
CREATE TABLE IF NOT EXISTS demo.default.pii_data (
    case_id STRING,
    first_name STRING,
    email_address STRING,
    key_nm STRING,
    secure_txt STRING,
    secure_key STRING,
    update_date DATE
)
USING iceberg
""")


DataFrame[]

In [9]:
# Check files after table creation
_, _, all_previous = summarize_files(table_base_path, "After Table Creation")
all_previous

--- File Summary (After Table Creation) ---
Using table name for better reliability...

Metadata file summary:
+--------+--------------------+-----------+-------------------+-------------+--------------------+--------------------+
|prefix  |file_type           |file_format|created_minute     |files_created|run_id              |operation           |
+--------+--------------------+-----------+-------------------+-------------+--------------------+--------------------+
|metadata|metadata_log_entries|json       |2025-09-03 11:16:00|1            |2025-09-03T11-16-27Z|After Table Creation|
+--------+--------------------+-----------+-------------------+-------------+--------------------+--------------------+


Data file summary:
+------+---------+-----------+--------------+-------------+------+---------+
|prefix|file_type|file_format|created_minute|files_created|run_id|operation|
+------+---------+-----------+--------------+-------------+------+---------+
+------+---------+-----------+-------

Unnamed: 0,prefix,file_type,file_format,created_minute,files_created,run_id,operation
0,metadata,metadata_log_entries,json,2025-09-03 11:16:00,1,2025-09-03T11-16-27Z,After Table Creation


## 3. Seed the Table with Data


Now, let's insert some sample data into our table. We'll add two records, one of which we will target for PII deletion.


In [10]:
spark.sql("""
INSERT INTO demo.default.pii_data VALUES
('case-1', 'John', 'john.doe@example.com', 'key1', 'secret text 1', 'secret_key_1', DATE('2023-01-01')),
('case-2', 'Jane', 'jane.doe@example.com', 'key2', 'secret text 2', 'secret_key_2', DATE('2023-01-02'))
""")


DataFrame[]

In [11]:
# Check files after data insertion
_, _, all_current = summarize_files(table_base_path, "After Data Insertion")
all_current

--- File Summary (After Data Insertion) ---
Using table name for better reliability...

Metadata file summary:
+--------+--------------------+-----------+-------------------+-------------+--------------------+--------------------+
|prefix  |file_type           |file_format|created_minute     |files_created|run_id              |operation           |
+--------+--------------------+-----------+-------------------+-------------+--------------------+--------------------+
|metadata|manifests           |avro       |2025-09-03 11:16:00|1            |2025-09-03T11-16-29Z|After Data Insertion|
|metadata|metadata_log_entries|json       |2025-09-03 11:16:00|2            |2025-09-03T11-16-29Z|After Data Insertion|
|metadata|snapshots           |avro       |2025-09-03 11:16:00|1            |2025-09-03T11-16-29Z|After Data Insertion|
+--------+--------------------+-----------+-------------------+-------------+--------------------+--------------------+


Data file summary:
+------+---------+----------

Unnamed: 0,prefix,file_type,file_format,created_minute,files_created,run_id,operation
0,metadata,snapshots,avro,2025-09-03 11:16:00,1,2025-09-03T11-16-29Z,After Data Insertion
1,metadata,manifests,avro,2025-09-03 11:16:00,1,2025-09-03T11-16-29Z,After Data Insertion
2,metadata,metadata_log_entries,json,2025-09-03 11:16:00,2,2025-09-03T11-16-29Z,After Data Insertion
3,data,data,parquet,2025-09-03 11:16:00,2,2025-09-03T11-16-29Z,After Data Insertion


In [12]:
# Compare
diff = diff_summaries(all_previous, all_current)
diff

  d["_minute_str"]  = pd.to_datetime(d["created_minute"], errors="coerce").dt.floor("T").dt.strftime("%Y-%m-%d %H:%M:00")
  d["_minute_str"]  = pd.to_datetime(d["created_minute"], errors="coerce").dt.floor("T").dt.strftime("%Y-%m-%d %H:%M:00")


Unnamed: 0,prefix,file_type,file_format,created_minute,minute_str,old_count,new_count,delta,status
0,data,data,parquet,2025-09-03 11:16:00,2025-09-03 11:16:00,0,2,2,ADDED
1,metadata,manifests,avro,2025-09-03 11:16:00,2025-09-03 11:16:00,0,1,1,ADDED
2,metadata,metadata_log_entries,json,2025-09-03 11:16:00,2025-09-03 11:16:00,1,2,1,CHANGED
3,metadata,snapshots,avro,2025-09-03 11:16:00,2025-09-03 11:16:00,0,1,1,ADDED


Let's verify the data is there.


In [13]:
spark.table("demo.default.pii_data").show()


+-------+----------+--------------------+------+-------------+------------+-----------+
|case_id|first_name|       email_address|key_nm|   secure_txt|  secure_key|update_date|
+-------+----------+--------------------+------+-------------+------------+-----------+
| case-1|      John|john.doe@example.com|  key1|secret text 1|secret_key_1| 2023-01-01|
| case-2|      Jane|jane.doe@example.com|  key2|secret text 2|secret_key_2| 2023-01-02|
+-------+----------+--------------------+------+-------------+------------+-----------+



We can also inspect the table's history to see the snapshot that was created when we inserted the data.


In [14]:
initial_snapshots = spark.table("demo.default.pii_data.history")
initial_snapshots.show()


+--------------------+-------------------+---------+-------------------+
|     made_current_at|        snapshot_id|parent_id|is_current_ancestor|
+--------------------+-------------------+---------+-------------------+
|2025-09-03 11:16:...|5408540653570735206|     NULL|               true|
+--------------------+-------------------+---------+-------------------+



## 4. Delete PII


Now, we will "delete" the PII for `case-1`. In this context, "deletion" means updating the PII columns to `NULL`. This is a common strategy for retaining the record for referential integrity while removing the sensitive information.


In [15]:
def delete_pii(case_id):
    spark.sql(f"""
    UPDATE demo.default.pii_data
    SET
        first_name = NULL,
        email_address = NULL,
        secure_txt = NULL
    WHERE case_id = '{case_id}'
    """)

delete_pii('case-1')


In [16]:
# Check files after data deletion (updates)
all_previous = all_current.copy(deep=True)
_, _, all_current = summarize_files(table_base_path, "After Data Deletion")
all_current

--- File Summary (After Data Deletion) ---
Using table name for better reliability...

Metadata file summary:
+--------+--------------------+-----------+-------------------+-------------+--------------------+-------------------+
|prefix  |file_type           |file_format|created_minute     |files_created|run_id              |operation          |
+--------+--------------------+-----------+-------------------+-------------+--------------------+-------------------+
|metadata|manifests           |avro       |2025-09-03 11:16:00|1            |2025-09-03T11-17-11Z|After Data Deletion|
|metadata|metadata_log_entries|json       |2025-09-03 11:16:00|2            |2025-09-03T11-17-11Z|After Data Deletion|
|metadata|snapshots           |avro       |2025-09-03 11:16:00|1            |2025-09-03T11-17-11Z|After Data Deletion|
|metadata|manifests           |avro       |2025-09-03 11:17:00|2            |2025-09-03T11-17-11Z|After Data Deletion|
|metadata|metadata_log_entries|json       |2025-09-03 11:

Unnamed: 0,prefix,file_type,file_format,created_minute,files_created,run_id,operation
0,metadata,snapshots,avro,2025-09-03 11:16:00,1,2025-09-03T11-17-11Z,After Data Deletion
1,metadata,snapshots,avro,2025-09-03 11:17:00,1,2025-09-03T11-17-11Z,After Data Deletion
2,metadata,manifests,avro,2025-09-03 11:16:00,1,2025-09-03T11-17-11Z,After Data Deletion
3,metadata,manifests,avro,2025-09-03 11:17:00,2,2025-09-03T11-17-11Z,After Data Deletion
4,metadata,metadata_log_entries,json,2025-09-03 11:17:00,1,2025-09-03T11-17-11Z,After Data Deletion
5,metadata,metadata_log_entries,json,2025-09-03 11:16:00,2,2025-09-03T11-17-11Z,After Data Deletion
6,data,data,parquet,2025-09-03 11:17:00,2,2025-09-03T11-17-11Z,After Data Deletion
7,data,data,parquet,2025-09-03 11:16:00,1,2025-09-03T11-17-11Z,After Data Deletion


In [17]:
# Compare
diff = diff_summaries(all_previous, all_current)
diff

  d["_minute_str"]  = pd.to_datetime(d["created_minute"], errors="coerce").dt.floor("T").dt.strftime("%Y-%m-%d %H:%M:00")
  d["_minute_str"]  = pd.to_datetime(d["created_minute"], errors="coerce").dt.floor("T").dt.strftime("%Y-%m-%d %H:%M:00")


Unnamed: 0,prefix,file_type,file_format,created_minute,minute_str,old_count,new_count,delta,status
0,data,data,parquet,2025-09-03 11:16:00,2025-09-03 11:16:00,2,1,-1,CHANGED
1,data,data,parquet,2025-09-03 11:17:00,2025-09-03 11:17:00,0,2,2,ADDED
2,metadata,manifests,avro,2025-09-03 11:16:00,2025-09-03 11:16:00,1,1,0,UNCHANGED
3,metadata,manifests,avro,2025-09-03 11:17:00,2025-09-03 11:17:00,0,2,2,ADDED
4,metadata,metadata_log_entries,json,2025-09-03 11:16:00,2025-09-03 11:16:00,2,2,0,UNCHANGED
5,metadata,metadata_log_entries,json,2025-09-03 11:17:00,2025-09-03 11:17:00,0,1,1,ADDED
6,metadata,snapshots,avro,2025-09-03 11:16:00,2025-09-03 11:16:00,1,1,0,UNCHANGED
7,metadata,snapshots,avro,2025-09-03 11:17:00,2025-09-03 11:17:00,0,1,1,ADDED


Let's check the data again. We should see that the PII for `case-1` is now gone.


In [18]:
spark.table("demo.default.pii_data").show()


+-------+----------+--------------------+------+-------------+------------+-----------+
|case_id|first_name|       email_address|key_nm|   secure_txt|  secure_key|update_date|
+-------+----------+--------------------+------+-------------+------------+-----------+
| case-1|      NULL|                NULL|  key1|         NULL|secret_key_1| 2023-01-01|
| case-2|      Jane|jane.doe@example.com|  key2|secret text 2|secret_key_2| 2023-01-02|
+-------+----------+--------------------+------+-------------+------------+-----------+



If we look at the table history, we'll see a new snapshot has been added.


In [19]:
spark.table("demo.default.pii_data.history").show()


+--------------------+-------------------+-------------------+-------------------+
|     made_current_at|        snapshot_id|          parent_id|is_current_ancestor|
+--------------------+-------------------+-------------------+-------------------+
|2025-09-03 11:16:...|5408540653570735206|               NULL|               true|
|2025-09-03 11:17:...|2222908239089185913|5408540653570735206|               true|
+--------------------+-------------------+-------------------+-------------------+



## 5. The Problem: Time Travel


Even though we've "deleted" the PII from the current view of the table, the old data still exists in the previous snapshot. Anyone with access can use time travel to see the PII.


In [20]:
first_snapshot_id = initial_snapshots.select("snapshot_id").first()[0]
spark.read.option("snapshot-id", first_snapshot_id).table("demo.default.pii_data").show()


+-------+----------+--------------------+------+-------------+------------+-----------+
|case_id|first_name|       email_address|key_nm|   secure_txt|  secure_key|update_date|
+-------+----------+--------------------+------+-------------+------------+-----------+
| case-1|      John|john.doe@example.com|  key1|secret text 1|secret_key_1| 2023-01-01|
| case-2|      Jane|jane.doe@example.com|  key2|secret text 2|secret_key_2| 2023-01-02|
+-------+----------+--------------------+------+-------------+------------+-----------+



## 6. Permanent Deletion with Maintenance


To permanently remove the PII, we need to perform two maintenance operations:
1.  **Expire Snapshots**: This removes old snapshots from the table's metadata, making time travel to those versions impossible.
2.  **Rewrite Data Files (VACUUM)**: This physically rewrites the data files to remove data that is no longer referenced by any snapshot.


### Expire Old Snapshots


We'll expire all snapshots that are older than the current one. We can get the current timestamp and use that to expire anything older.


In [21]:
all_current

Unnamed: 0,prefix,file_type,file_format,created_minute,files_created,run_id,operation
0,metadata,snapshots,avro,2025-09-03 11:16:00,1,2025-09-03T11-17-11Z,After Data Deletion
1,metadata,snapshots,avro,2025-09-03 11:17:00,1,2025-09-03T11-17-11Z,After Data Deletion
2,metadata,manifests,avro,2025-09-03 11:16:00,1,2025-09-03T11-17-11Z,After Data Deletion
3,metadata,manifests,avro,2025-09-03 11:17:00,2,2025-09-03T11-17-11Z,After Data Deletion
4,metadata,metadata_log_entries,json,2025-09-03 11:17:00,1,2025-09-03T11-17-11Z,After Data Deletion
5,metadata,metadata_log_entries,json,2025-09-03 11:16:00,2,2025-09-03T11-17-11Z,After Data Deletion
6,data,data,parquet,2025-09-03 11:17:00,2,2025-09-03T11-17-11Z,After Data Deletion
7,data,data,parquet,2025-09-03 11:16:00,1,2025-09-03T11-17-11Z,After Data Deletion


In [22]:
from pyspark.sql.functions import current_timestamp

now = spark.sql("SELECT current_timestamp()").collect()[0][0]
print(now)
spark.sql(f"CALL demo.system.expire_snapshots('default.pii_data', TIMESTAMP '{now}')")


2025-09-03 11:19:15.156012


DataFrame[deleted_data_files_count: bigint, deleted_position_delete_files_count: bigint, deleted_equality_delete_files_count: bigint, deleted_manifest_files_count: bigint, deleted_manifest_lists_count: bigint, deleted_statistics_files_count: bigint]

Now, if we look at the history, we should only see the most recent snapshot.


In [23]:
spark.table("demo.default.pii_data.history").show()


+--------------------+-------------------+-------------------+-------------------+
|     made_current_at|        snapshot_id|          parent_id|is_current_ancestor|
+--------------------+-------------------+-------------------+-------------------+
|2025-09-03 11:17:...|2222908239089185913|5408540653570735206|               true|
+--------------------+-------------------+-------------------+-------------------+



In [24]:
all_previous = all_current.copy(deep=True)
_, _, all_current = summarize_files(table_base_path, "After Expire Snapshots")
all_current

--- File Summary (After Expire Snapshots) ---
Using table name for better reliability...

Metadata file summary:
+--------+--------------------+-----------+-------------------+-------------+--------------------+----------------------+
|prefix  |file_type           |file_format|created_minute     |files_created|run_id              |operation             |
+--------+--------------------+-----------+-------------------+-------------+--------------------+----------------------+
|metadata|metadata_log_entries|json       |2025-09-03 11:16:00|2            |2025-09-03T11-19-34Z|After Expire Snapshots|
|metadata|manifests           |avro       |2025-09-03 11:17:00|2            |2025-09-03T11-19-34Z|After Expire Snapshots|
|metadata|metadata_log_entries|json       |2025-09-03 11:17:00|1            |2025-09-03T11-19-34Z|After Expire Snapshots|
|metadata|snapshots           |avro       |2025-09-03 11:17:00|1            |2025-09-03T11-19-34Z|After Expire Snapshots|
|metadata|metadata_log_entries|js

Unnamed: 0,prefix,file_type,file_format,created_minute,files_created,run_id,operation
0,metadata,snapshots,avro,2025-09-03 11:17:00,1,2025-09-03T11-19-34Z,After Expire Snapshots
1,metadata,manifests,avro,2025-09-03 11:17:00,2,2025-09-03T11-19-34Z,After Expire Snapshots
2,metadata,metadata_log_entries,json,2025-09-03 11:17:00,1,2025-09-03T11-19-34Z,After Expire Snapshots
3,metadata,metadata_log_entries,json,2025-09-03 11:19:00,1,2025-09-03T11-19-34Z,After Expire Snapshots
4,metadata,metadata_log_entries,json,2025-09-03 11:16:00,2,2025-09-03T11-19-34Z,After Expire Snapshots
5,data,data,parquet,2025-09-03 11:17:00,2,2025-09-03T11-19-34Z,After Expire Snapshots


In [25]:
# Compare
diff = diff_summaries(all_previous, all_current)
diff

  d["_minute_str"]  = pd.to_datetime(d["created_minute"], errors="coerce").dt.floor("T").dt.strftime("%Y-%m-%d %H:%M:00")
  d["_minute_str"]  = pd.to_datetime(d["created_minute"], errors="coerce").dt.floor("T").dt.strftime("%Y-%m-%d %H:%M:00")


Unnamed: 0,prefix,file_type,file_format,created_minute,minute_str,old_count,new_count,delta,status
0,data,data,parquet,2025-09-03 11:16:00,2025-09-03 11:16:00,1,0,-1,REMOVED
1,data,data,parquet,2025-09-03 11:17:00,2025-09-03 11:17:00,2,2,0,UNCHANGED
2,metadata,manifests,avro,2025-09-03 11:16:00,2025-09-03 11:16:00,1,0,-1,REMOVED
3,metadata,manifests,avro,2025-09-03 11:17:00,2025-09-03 11:17:00,2,2,0,UNCHANGED
4,metadata,metadata_log_entries,json,2025-09-03 11:16:00,2025-09-03 11:16:00,2,2,0,UNCHANGED
5,metadata,metadata_log_entries,json,2025-09-03 11:17:00,2025-09-03 11:17:00,1,1,0,UNCHANGED
6,metadata,metadata_log_entries,json,2025-09-03 11:19:00,2025-09-03 11:19:00,0,1,1,ADDED
7,metadata,snapshots,avro,2025-09-03 11:16:00,2025-09-03 11:16:00,1,0,-1,REMOVED
8,metadata,snapshots,avro,2025-09-03 11:17:00,2025-09-03 11:17:00,1,1,0,UNCHANGED


### Rewrite Data Files (VACUUM)


Even though the snapshots are gone, the underlying Parquet files containing the PII may still exist in S3. The `rewrite_data_files` procedure (similar to VACUUM in other systems) will consolidate data into new files and remove the old, unreferenced ones.


In [26]:
all_current

Unnamed: 0,prefix,file_type,file_format,created_minute,files_created,run_id,operation
0,metadata,snapshots,avro,2025-09-03 11:17:00,1,2025-09-03T11-19-34Z,After Expire Snapshots
1,metadata,manifests,avro,2025-09-03 11:17:00,2,2025-09-03T11-19-34Z,After Expire Snapshots
2,metadata,metadata_log_entries,json,2025-09-03 11:17:00,1,2025-09-03T11-19-34Z,After Expire Snapshots
3,metadata,metadata_log_entries,json,2025-09-03 11:19:00,1,2025-09-03T11-19-34Z,After Expire Snapshots
4,metadata,metadata_log_entries,json,2025-09-03 11:16:00,2,2025-09-03T11-19-34Z,After Expire Snapshots
5,data,data,parquet,2025-09-03 11:17:00,2,2025-09-03T11-19-34Z,After Expire Snapshots


In [27]:
spark.sql("CALL demo.system.rewrite_data_files('default.pii_data')")


DataFrame[rewritten_data_files_count: int, added_data_files_count: int, rewritten_bytes_count: bigint, failed_data_files_count: int]

In [28]:
all_previous = all_current.copy(deep=True)
_, _, all_current = summarize_files(table_base_path, "After Rewrite Data Files")
all_current


--- File Summary (After Rewrite Data Files) ---
Using table name for better reliability...

Metadata file summary:
+--------+--------------------+-----------+-------------------+-------------+--------------------+------------------------+
|prefix  |file_type           |file_format|created_minute     |files_created|run_id              |operation               |
+--------+--------------------+-----------+-------------------+-------------+--------------------+------------------------+
|metadata|metadata_log_entries|json       |2025-09-03 11:16:00|2            |2025-09-03T11-20-00Z|After Rewrite Data Files|
|metadata|manifests           |avro       |2025-09-03 11:17:00|2            |2025-09-03T11-20-00Z|After Rewrite Data Files|
|metadata|metadata_log_entries|json       |2025-09-03 11:17:00|1            |2025-09-03T11-20-00Z|After Rewrite Data Files|
|metadata|snapshots           |avro       |2025-09-03 11:17:00|1            |2025-09-03T11-20-00Z|After Rewrite Data Files|
|metadata|metadat

Unnamed: 0,prefix,file_type,file_format,created_minute,files_created,run_id,operation
0,metadata,snapshots,avro,2025-09-03 11:17:00,1,2025-09-03T11-20-00Z,After Rewrite Data Files
1,metadata,manifests,avro,2025-09-03 11:17:00,2,2025-09-03T11-20-00Z,After Rewrite Data Files
2,metadata,metadata_log_entries,json,2025-09-03 11:17:00,1,2025-09-03T11-20-00Z,After Rewrite Data Files
3,metadata,metadata_log_entries,json,2025-09-03 11:19:00,1,2025-09-03T11-20-00Z,After Rewrite Data Files
4,metadata,metadata_log_entries,json,2025-09-03 11:16:00,2,2025-09-03T11-20-00Z,After Rewrite Data Files
5,data,data,parquet,2025-09-03 11:17:00,2,2025-09-03T11-20-00Z,After Rewrite Data Files


In [29]:
# Compare
diff = diff_summaries(all_previous, all_current)
diff

  d["_minute_str"]  = pd.to_datetime(d["created_minute"], errors="coerce").dt.floor("T").dt.strftime("%Y-%m-%d %H:%M:00")
  d["_minute_str"]  = pd.to_datetime(d["created_minute"], errors="coerce").dt.floor("T").dt.strftime("%Y-%m-%d %H:%M:00")


Unnamed: 0,prefix,file_type,file_format,created_minute,minute_str,old_count,new_count,delta,status
0,data,data,parquet,2025-09-03 11:17:00,2025-09-03 11:17:00,2,2,0,UNCHANGED
1,metadata,manifests,avro,2025-09-03 11:17:00,2025-09-03 11:17:00,2,2,0,UNCHANGED
2,metadata,metadata_log_entries,json,2025-09-03 11:16:00,2025-09-03 11:16:00,2,2,0,UNCHANGED
3,metadata,metadata_log_entries,json,2025-09-03 11:17:00,2025-09-03 11:17:00,1,1,0,UNCHANGED
4,metadata,metadata_log_entries,json,2025-09-03 11:19:00,2025-09-03 11:19:00,1,1,0,UNCHANGED
5,metadata,snapshots,avro,2025-09-03 11:17:00,2025-09-03 11:17:00,1,1,0,UNCHANGED


## 7. Validation


Now, let's try to time travel back to the first snapshot. This should fail because the snapshot no longer exists.


In [30]:
try:
    spark.read.option("snapshot-id", first_snapshot_id).table("demo.default.pii_data").show()
except Exception as e:
    print("Successfully prevented time travel!")
    print(e)


Successfully prevented time travel!
Cannot find snapshot with ID 5408540653570735206


## 8. Manual Parquet File Reader


This section adds a utility to upload a Parquet file from your local computer and display its contents. This is useful for inspecting individual data files, for example, if you download a file from the MinIO bucket to your machine and want to see what's inside to confirm that PII has been physically removed from the file.


In [31]:
# Install necessary libraries for the uploader widget and Parquet reader
%pip install ipywidgets pandas pyarrow


[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.0.1[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [32]:
import ipywidgets as widgets
import pandas as pd
import io
from IPython.display import display, clear_output

# Create a file upload widget
uploader = widgets.FileUpload(
    accept='.parquet',
    description='Upload Parquet File',
    multiple=False
)

# Create an output widget to display the DataFrame
output = widgets.Output()

def on_upload_change(change):
    """
    This function is triggered when a file is uploaded.
    It reads the Parquet file and displays it as a Pandas DataFrame.
    """
    if not uploader.value:
        return
        
    # Get the uploaded file info
    uploaded_file = uploader.value[0]
    file_content = uploaded_file['content']
    
    # Read the Parquet file content into a Pandas DataFrame
    df = pd.read_parquet(io.BytesIO(file_content))
    
    # Display the DataFrame in the output widget
    with output:
        clear_output(wait=True)
        print(f"Contents of {uploaded_file['name']}:")
        display(df)
        
    # Reset the uploader so the same file can be uploaded again if needed
    uploader.value.clear()
    uploader._counter = 0


# Observe changes in the uploader's value
uploader.observe(on_upload_change, names='value')

# Display the uploader and the output area
display(uploader, output)


FileUpload(value=(), accept='.parquet', description='Upload Parquet File')

Output()

This confirms that we have successfully and permanently deleted the PII from our Iceberg table.


In [33]:
result = spark.sql(f"""SELECT * FROM demo.default.pii_data TIMESTAMP AS OF '2026-09-02 09:40:00'""");
result.show()

+-------+----------+--------------------+------+-------------+------------+-----------+
|case_id|first_name|       email_address|key_nm|   secure_txt|  secure_key|update_date|
+-------+----------+--------------------+------+-------------+------------+-----------+
| case-2|      Jane|jane.doe@example.com|  key2|secret text 2|secret_key_2| 2023-01-02|
| case-1|      NULL|                NULL|  key1|         NULL|secret_key_1| 2023-01-01|
+-------+----------+--------------------+------+-------------+------------+-----------+

