This notebook is a demonstration of how to handle GDPR export and delete requests in an Iceberg Lakehouse to make it GDPR-compliant.

In [1]:
from datetime import datetime, timedelta
import os
from pathlib import Path
import random
import sys
import time
import uuid

from pyspark import SparkConf
from pyspark.sql import functions, Row, SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BooleanType

In [2]:
# Make the "modules" directory that sits next to sys.path[0] importable
# (presumably where data_faker and stats live - confirm against the repo layout).
modules_dir = Path(sys.path[0]).resolve().parent / "modules"
sys.path.append(str(modules_dir))

from data_faker import generate_users_dataframe, generate_events_dataframe, generate_gdpr_request_dataframe
from stats import get_users_export_stats

### Create Spark Session

In [3]:
# Local Spark master; every other setting is applied in order through setAll().
conf = SparkConf().setMaster("local[*]")
conf.setAll([
    # Iceberg runtime + AWS bundle jars and the Iceberg SQL extensions.
    ("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.0,org.apache.iceberg:iceberg-aws-bundle:1.6.0"),
    ("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"),
    # Built-in session catalog wrapped by Iceberg, configured with type "hive".
    ("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog"),
    ("spark.sql.catalog.spark_catalog.type", "hive"),
    # "lakehouse" catalog: Iceberg REST catalog served at http://catalog:8181.
    ("spark.sql.catalog.lakehouse", "org.apache.iceberg.spark.SparkCatalog"),
    ("spark.sql.catalog.lakehouse.type", "rest"),
    ("spark.sql.catalog.lakehouse.uri", "http://catalog:8181"),
    # Table files go through S3FileIO against the endpoint at http://minio:9000.
    ("spark.sql.catalog.lakehouse.io-impl", "org.apache.iceberg.aws.s3.S3FileIO"),
    ("spark.sql.catalog.lakehouse.s3.endpoint", "http://minio:9000"),
    ("spark.sql.catalog.lakehouse.s3.path-style-access", "true"),
    # Credentials and region come from the environment; a missing variable raises KeyError.
    ("spark.sql.catalog.lakehouse.s3.access-key-id", os.environ["AWS_ACCESS_KEY_ID"]),
    ("spark.sql.catalog.lakehouse.s3.secret-access-key", os.environ["AWS_SECRET_ACCESS_KEY"]),
    ("spark.sql.catalog.lakehouse.client.region", os.environ["AWS_REGION"]),
])
spark = SparkSession.builder.config(conf=conf).getOrCreate()

### Clean and create `demo` namespace in lakehouse catalog

In [4]:
# Start from a clean slate: if the demo namespace already exists, drop each of
# its tables and then the namespace itself.
demo_db = "lakehouse.demo"
if spark.catalog.databaseExists(demo_db):
    existing_table_names = [tbl.name for tbl in spark.catalog.listTables(dbName=demo_db)]
    for table_name in existing_table_names:
        spark.sql(f"DROP TABLE {demo_db}.{table_name}")
    spark.sql(f"DROP NAMESPACE {demo_db}")

In [5]:
# Create the demo namespace in the lakehouse catalog (the previous cell dropped it if it existed).
spark.sql(f"CREATE NAMESPACE {demo_db}")

DataFrame[]

In [6]:
# List the namespaces of the lakehouse catalog to confirm that `demo` is there.
spark.sql("SHOW NAMESPACES IN lakehouse").show(10, truncate=False)

+---------+
|namespace|
+---------+
|demo     |
+---------+



### Generate random Users, Events and GDPR requests data and create the tables

In [7]:
# Fully qualified names (catalog.namespace.table) of the three demo tables.
users_table_name = f"{demo_db}.users"
events_table_name = f"{demo_db}.events"
gdpr_requests_table_name = f"{demo_db}.gdpr_requests"

In [8]:
# Generate 100k users and write them to the users table, creating it or
# replacing any existing one.
users_df = generate_users_dataframe(spark, 100000)
users_df.writeTo(users_table_name).createOrReplace()
# The data is read back from the table from here on, so the local reference can go.
del users_df

In [9]:
# Sanity check: number of rows in the users table.
spark.table(users_table_name).count()

100000

In [10]:
# Peek at two user rows without truncating the column values.
spark.table(users_table_name).show(2, truncate=False)

+------------------------------------+--------------------------+------------+------------------------+
|user_id                             |created_at                |name        |email                   |
+------------------------------------+--------------------------+------------+------------------------+
|ff0f450c-dc9c-49bc-9174-8db700cb7eb5|2024-08-26 18:48:44.607596|User38456842|user56165866@example.com|
|bb89b19c-347b-4a73-a76b-2427478edf3e|2024-10-21 17:22:38.607856|User91917059|user51690866@example.com|
+------------------------------------+--------------------------+------------+------------------------+
only showing top 2 rows



In [11]:
# Generate 10M events in 10 batches of 1M rows each to avoid memory issues.
# The first batch creates (or replaces) the events table, which is left
# unpartitioned; the nine remaining batches are appended to it.
events_batch_size = 1000000
events_batch_count = 10
for batch_index in range(events_batch_count):
    if batch_index > 0:
        # Report the current table size before appending the next batch.
        print(f"events_table size: {spark.table(events_table_name).count()}")
    events_df = generate_events_dataframe(spark, spark.table(users_table_name), events_batch_size)
    if batch_index == 0:
        events_df.writeTo(events_table_name).createOrReplace()
    else:
        events_df.writeTo(events_table_name).append()
del events_df

events_table size: 1000000
events_table size: 2000000
events_table size: 3000000
events_table size: 4000000
events_table size: 5000000
events_table size: 6000000
events_table size: 7000000
events_table size: 8000000
events_table size: 9000000


In [12]:
# Sanity check: total number of rows in the events table after all batches.
spark.table(events_table_name).count()

10000000

In [13]:
# Peek at two event rows, one field per line, without truncating the values.
spark.table(events_table_name).show(2, truncate=False, vertical=True)

-RECORD 0-----------------------------------------------
 event_id        | 27115cac-e447-4cde-b235-48531f1e2a7b 
 user_id         | 2a636d96-9c67-414c-b1f3-6f9198152771 
 event_type      | purchase                             
 event_timestamp | 2024-10-26 00:53:55.038357           
 page_url        | NULL                                 
 product_id      | product_61                           
 event_details   | Purchased product_61                 
-RECORD 1-----------------------------------------------
 event_id        | a24e96ce-b296-4497-a815-73bb010df0f0 
 user_id         | 8374fb66-6a6b-4acd-81c1-a2e1ff5f26d3 
 event_type      | purchase                             
 event_timestamp | 2024-11-10 14:14:03.038363           
 page_url        | NULL                                 
 product_id      | product_92                           
 event_details   | Purchased product_92                 
only showing top 2 rows



In [14]:
# Generate 100 delete requests and 1000 export requests for users taken from
# the users table, then store them in the gdpr_requests table.
registered_users_df = spark.table(users_table_name)
gdpr_requests = generate_gdpr_request_dataframe(spark, registered_users_df, 100, 1000)
gdpr_requests.writeTo(gdpr_requests_table_name).createOrReplace()

In [15]:
# Peek at the first ten GDPR requests without truncating the column values.
spark.table(gdpr_requests_table_name).show(10, truncate=False)

+------------------------------------+------------+
|user_id                             |request_type|
+------------------------------------+------------+
|cffc431f-9ff2-424f-b420-70e3113fea4e|delete      |
|ea3d97b5-4567-406a-89de-3df72eb11411|delete      |
|4f495cf7-e15e-4683-a322-b0c5b7d63181|delete      |
|41a9f8bf-e782-4a6f-8805-921033a46836|delete      |
|9bd5b479-a026-48f2-8a36-d7ddb6850d8e|delete      |
|4e7d01f9-ae1e-4194-961d-3cdd64f71396|delete      |
|606da03b-b96e-4746-a493-6942f7ae0232|delete      |
|7869b5dc-88b0-4744-a1e0-2ac268ae47b5|delete      |
|153bbdfb-d1f8-47ba-a938-633fa2805ae2|delete      |
|7121fee3-bcb7-44d1-bbdd-169a0ea57f56|delete      |
+------------------------------------+------------+
only showing top 10 rows



### Join events table with gdpr_requests table to get the expected rows to delete

In [16]:
# Count the event rows that belong to users with a 'delete' request; this is
# the number of rows the deletion below is expected to remove.
events_to_delete_sql = f"""
SELECT *
FROM {events_table_name} events
INNER JOIN {gdpr_requests_table_name} gdpr_requests
ON gdpr_requests.request_type = 'delete'
    AND events.user_id = gdpr_requests.user_id
"""
spark.sql(events_to_delete_sql).count()

9992

### Delete events generated by deleted users

In [17]:
# Delete every event whose user_id has a 'delete' request, using a correlated
# EXISTS subquery against the gdpr_requests table.
delete_with_exists_sql = f"""
DELETE FROM {events_table_name} AS t
WHERE EXISTS (
    SELECT user_id
    FROM {gdpr_requests_table_name}
    WHERE request_type = 'delete'
        AND t.user_id = user_id
)
"""
spark.sql(delete_with_exists_sql)

DataFrame[]

In [18]:
# Rank the table's history entries from newest to oldest and pick the second
# one, i.e. the snapshot that was current right before the latest commit.
previous_snapshot_sql = f"""
WITH events_table_history AS (
    SELECT
        snapshot_id,
        ROW_NUMBER() OVER (ORDER BY made_current_at DESC) AS row_number
    FROM {events_table_name}.history
)
SELECT snapshot_id
FROM events_table_history
WHERE row_number = 2;
"""
# collect()[0] raises IndexError when the history has fewer than two entries.
before_last_snapshot_id = spark.sql(previous_snapshot_sql).collect()[0]["snapshot_id"]

In [19]:
# Roll the events table back to the snapshot stored in before_last_snapshot_id,
# i.e. the table state from before the most recent commit. Named procedure
# arguments are used, as in the rewrite_data_files call further down.
spark.sql(f"""
CALL lakehouse.system.rollback_to_snapshot(
    table => '{events_table_name}',
    snapshot_id => {before_last_snapshot_id}
)
""").show()

+--------------------+-------------------+
|previous_snapshot_id|current_snapshot_id|
+--------------------+-------------------+
| 2106750013844698407|1882728075265419825|
+--------------------+-------------------+



In [20]:
# Same deletion as the DELETE ... WHERE EXISTS statement, this time expressed
# as a MERGE: every event matching a 'delete' request on user_id is removed.
delete_with_merge_sql = f"""
MERGE INTO {events_table_name} t
USING (
    SELECT *
    FROM {gdpr_requests_table_name}
    WHERE request_type = 'delete'
) s
ON t.user_id = s.user_id
WHEN MATCHED THEN DELETE
"""
spark.sql(delete_with_merge_sql)

DataFrame[]

In [21]:
# Number of events left in the table after the deletion.
spark.table(events_table_name).count()

9990008

### Re-join events table with gdpr_requests table to check if there is any match

In [22]:
# Re-run the join against the current table state: no event should match a
# 'delete' request any more.
remaining_events_sql = f"""
SELECT *
FROM {events_table_name} events
INNER JOIN {gdpr_requests_table_name} gdpr_requests
ON gdpr_requests.request_type = 'delete'
    AND events.user_id = gdpr_requests.user_id
"""
spark.sql(remaining_events_sql).count()

0

### Re-join the old snapshot of events table with gdpr_requests table

In [23]:
# Look up the snapshot again: rank the history entries from newest to oldest
# and take the second one, the snapshot that was current before the latest commit.
previous_snapshot_sql = f"""
WITH events_table_history AS (
    SELECT
        snapshot_id,
        ROW_NUMBER() OVER (ORDER BY made_current_at DESC) AS row_number
    FROM {events_table_name}.history
)
SELECT snapshot_id
FROM events_table_history
WHERE row_number = 2;
"""
# collect()[0] raises IndexError when the history has fewer than two entries.
before_last_snapshot_id = spark.sql(previous_snapshot_sql).collect()[0]["snapshot_id"]

In [24]:
# Time-travel to the previous snapshot (VERSION AS OF) and repeat the join:
# any match means the deleted users' events are still readable from that snapshot.
old_snapshot_events_sql = f"""
SELECT *
FROM (
    SELECT *
    FROM {events_table_name}
    VERSION AS OF {before_last_snapshot_id}
) events
INNER JOIN {gdpr_requests_table_name} gdpr_requests
ON gdpr_requests.request_type = 'delete'
    AND events.user_id = gdpr_requests.user_id
"""
spark.sql(old_snapshot_events_sql).count()

9992

### Clean old snapshots

In [25]:
# Expire every snapshot older than "now" while retaining the most recent one.
# The procedure reports how many data files, delete files, manifests and
# manifest lists were deleted along with the expired snapshots.
# Named arguments make the meaning of the timestamp and of `1` explicit, as in
# the rewrite_data_files call further down.
# NOTE(review): datetime.now() is the driver's local time and the TIMESTAMP
# literal is read in the Spark session time zone - confirm they agree.
expire_older_than = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
spark.sql(f"""
CALL lakehouse.system.expire_snapshots(
    table => '{events_table_name}',
    older_than => TIMESTAMP '{expire_older_than}',
    retain_last => 1
)
""").show(truncate=False, vertical=True)

-RECORD 0----------------------------------
 deleted_data_files_count            | 89  
 deleted_position_delete_files_count | 0   
 deleted_equality_delete_files_count | 0   
 deleted_manifest_files_count        | 21  
 deleted_manifest_lists_count        | 11  
 deleted_statistics_files_count      | 0   



In [26]:
# Count the entries left in the events table history after expiring snapshots.
history_sql = f"""
SELECT *
FROM {events_table_name}.history
"""
spark.sql(history_sql).count()

1

### Process export requests

When processing export requests, we typically want to group each user's data so that it can be written to a single file, or join it with other tables to retrieve the original values when pseudonymization techniques are used on columns containing sensitive information.

To simulate these scenarios, we write the result to a Parquet table partitioned by `user_id` to force Spark to shuffle the data using this column.

In [27]:
# Time the export of all events belonging to users with an 'export' request,
# written as Parquet partitioned by user_id (overwriting any previous output).
start_time = time.monotonic()

export_events_df = spark.sql(f"""
SELECT events.*
FROM {events_table_name} events
INNER JOIN {gdpr_requests_table_name} gdpr_requests
ON gdpr_requests.request_type = 'export'
    AND events.user_id = gdpr_requests.user_id
""")
export_events_df.write.mode("overwrite").partitionBy("user_id").parquet("users_export_data1")

elapsed_time_without_optimization = time.monotonic() - start_time
print(f"finished in {elapsed_time_without_optimization}s")

finished in 132.024236561032s


We now optimize the table using the `rewrite_data_files` stored procedure, sorting the data files by `user_id` in ascending order, and then rerun the export query processing.

In [28]:
# Rewrite all data files of the events table sorted by user_id (ascending) and
# measure how long the optimization takes.

start_time = time.monotonic()

rewrite_result_df = spark.sql(f"""
CALL lakehouse.system.rewrite_data_files(
    table => '{events_table_name}',
    strategy => 'sort',
    sort_order => 'user_id ASC',
    options => map('rewrite-all', 'true')
)
""")
rewrite_result_df.show(truncate=False, vertical=True)

elapsed_time_for_optimization = time.monotonic() - start_time
print(f"Optimized the table in {elapsed_time_for_optimization}s")

-RECORD 0-------------------------------
 rewritten_data_files_count | 40        
 added_data_files_count     | 1         
 rewritten_bytes_count      | 479267521 
 failed_data_files_count    | 0         

Optimized the table in 22.89085567597067s


In [29]:
# Re-run the same export query against the optimized data files, writing to a
# second Parquet directory so both results can be compared afterwards.

start_time = time.monotonic()

export_events_df = spark.sql(f"""
SELECT events.*
FROM {events_table_name} events
INNER JOIN {gdpr_requests_table_name} gdpr_requests
ON gdpr_requests.request_type = 'export'
    AND events.user_id = gdpr_requests.user_id
""")
export_events_df.write.mode("overwrite").partitionBy("user_id").parquet("users_export_data2")

elapsed_time_with_optimization = time.monotonic() - start_time
print(f"finished in {elapsed_time_with_optimization}s")

finished in 4.196367917989846s


In [30]:
# Ratio between the export duration before and after optimizing the table.
speedup = elapsed_time_without_optimization / elapsed_time_with_optimization
print(f"It's {speedup:.2f}X faster")

It's 31.46X faster


In [31]:
# Compare the two exports: they must contain exactly the same rows.

export_before_df = spark.read.parquet("users_export_data1")
export_after_df = spark.read.parquet("users_export_data2")

# Register both exports as temp views named after their directories.
export_before_df.createOrReplaceTempView("users_export_data1")
export_after_df.createOrReplaceTempView("users_export_data2")

assert export_before_df.count() == export_after_df.count(), \
    "the two exports do not have the same number of rows"

# exceptAll compares whole rows and keeps duplicates, so it also catches rows
# whose user_id/event_id match but whose other columns differ. Checking both
# directions makes sure neither export holds rows the other one lacks.
assert export_before_df.exceptAll(export_after_df).count() == 0, \
    "users_export_data1 contains rows missing from users_export_data2"
assert export_after_df.exceptAll(export_before_df).count() == 0, \
    "users_export_data2 contains rows missing from users_export_data1"

In [32]:
# Per-user file-count statistics for the export written before the table was optimized.
get_users_export_stats("users_export_data1")

{'max_files_by_user': 40,
 'min_files_by_user': 29,
 'avg_files_by_user': 36.753753753753756}

In [33]:
# Per-user file-count statistics for the export written after the table was optimized.
get_users_export_stats("users_export_data2")

{'max_files_by_user': 1, 'min_files_by_user': 1, 'avg_files_by_user': 1.0}

As you can see, querying the optimized table is not only faster, but also produces significantly fewer files per user, which in turn helps every downstream query that reads the exported data.