![iceberg-logo](https://www.apache.org/logos/res/iceberg/iceberg.png)

### [Integrated Audits: Streamlined Data Observability with Apache Iceberg](https://tabular.io/blog/integrated-audits/)

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Jupyter").getOrCreate()

spark

25/06/16 16:15:09 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


To be able to rerun the notebook several times, let's create the `nyc` database if it doesn't exist yet and drop the `permits` table if it exists to start fresh.

In [2]:
%%sql

CREATE DATABASE IF NOT EXISTS nyc

In [3]:
%%sql

DROP TABLE IF EXISTS nyc.permits

# Load NYC Film Permits Data

For this demo, we will use the [New York City Film Permits dataset](https://data.cityofnewyork.us/City-Government/Film-Permits/tg4x-b46p) available as part of the NYC Open Data initiative. We're using a locally saved copy of a 1,000-record sample, but feel free to download the entire dataset to use in this notebook!

We'll save the sample dataset into an Iceberg table called `permits`.

In [4]:
df = spark.read.option("inferSchema","true").option("multiline","true").json("/home/iceberg/data/nyc_film_permits.json")
df.write.saveAsTable("nyc.permits")

                                                                                

In [37]:

df1 = spark.read.option("inferSchema","true").option("multiline","false").json("/home/iceberg/warehouse/esm.jsonl")

from pyspark.sql import functions as F


In [73]:
df1.printSchema()


df1.select(df1._source.customer, 
           df1._source.source.id.alias('source_id'),
           df1._source.metric.value.alias('metric_value'),
           df1._source.metric['name'].alias('metric_name'),
          ).printSchema()


root
 |-- _id: string (nullable = true)
 |-- _index: string (nullable = true)
 |-- _score: long (nullable = true)
 |-- _source: struct (nullable = true)
 |    |-- @timestamp: string (nullable = true)
 |    |-- @version: string (nullable = true)
 |    |-- collectorId: string (nullable = true)
 |    |-- customer: string (nullable = true)
 |    |-- domain: string (nullable = true)
 |    |-- environment: string (nullable = true)
 |    |-- eventId: string (nullable = true)
 |    |-- eventTime: string (nullable = true)
 |    |-- eventType: string (nullable = true)
 |    |-- metric: struct (nullable = true)
 |    |    |-- labels: struct (nullable = true)
 |    |    |    |-- __name__: string (nullable = true)
 |    |    |    |-- job: string (nullable = true)
 |    |    |    |-- kpi_uid: string (nullable = true)
 |    |    |    |-- metricPeriod: string (nullable = true)
 |    |    |    |-- modelUid: string (nullable = true)
 |    |    |    |-- storage_type: string (nullable = true)
 |    |    |

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType, TimestampNTZType, MapType

schema = StructType([
    StructField("_source",StructType([
        StructField("customer",StringType()),
        StructField("eventId",StringType()),
        StructField("eventTime",TimestampType()),
        StructField("timeReceived",TimestampType()),        
        StructField("domain",StringType()),
        StructField("source",StructType([
            StructField("id",StringType()),
            StructField("labels",MapType(StringType(),StringType()))
            ])),
        StructField("metric",StructType([
            StructField("name",StringType()),
            StructField("value",DoubleType()),
            StructField("labels",MapType(StringType(),StringType()))
            ]))
        ]))
    ])

#df2 = spark.read.schema(schema).json("/home/iceberg/warehouse/esm-mpn-logstash-oss-performance_data.jsonl.gz")
df2 = spark.read.schema(schema).json("/home/iceberg/datahouse/vf/cisco-vmanage-pm-metrics_20250601a.jsonl.gz")
#df2 = spark.read.schema(schema).json("/home/iceberg/datahouse/vf/cisco-vmanage-pm-metrics_20250601b.jsonl.gz")
#df2.printSchema()
df3 = df2.select(df2._source.customer.alias("customer"), 
           df2._source.eventId.alias("eventId"),
           df2._source.eventTime.alias("eventTime"),           
           df2._source.timeReceived.alias("timeReceived"),
           df2._source.source.id.alias("source_id"),
           df2._source.source.labels.alias("source_labels"),
           df2._source.metric["name"].alias("metric_name"),
           df2._source.metric.value.alias("metric_value"),
           df2._source.metric.labels.alias("metric_labels")
          )
df3.count()
df3.head()
df3.write.saveAsTable("nyc.esm_metric_06")
#df3.write.insertInto("nyc.esm_metric")

#df3.coalesce(1).write.save("/tmp/nyc.esm_metric_01")




In [100]:
%%sql

DROP TABLE IF EXISTS nyc.esm_metric_01;
DROP TABLE IF EXISTS nyc.esm_metric_02;


In [None]:
df = spark.read.parquet("s3a://ingress/yellow_tripdata_2022-02.parquet")
df.count()
df.show()


Taking a quick peek at the data, you can see that there are a number of permits for different boroughs in New York.

In [None]:
spark.read \
    .format("iceberg") \
    .load("nyc.permits") \
    .groupBy("borough") \
    .count() \
    .show()

# Generate an ID for an Integrated Audit Session

An integrated audit session is a single cycle of:
1. Staging changes to a table
2. Auditing the staged changes
3. Committing the changes (optional)

Each of these sessions must be represented with an ID. You can use any convention that makes sense in your environment but in this demo we'll simply use a UUID.

In [None]:
import uuid
ia_session_id = uuid.uuid4().hex
ia_session_id
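
If a bare UUID is hard to trace back to a pipeline run, a more descriptive convention may help. The sketch below is only one possibility: the helper name `make_session_id` and the `<pipeline>-<UTC timestamp>-<suffix>` layout are assumptions for illustration, not something Iceberg requires.

```python
import uuid
from datetime import datetime, timezone


def make_session_id(pipeline, now=None):
    """Build a readable session ID: <pipeline>-<UTC timestamp>-<random suffix>.

    Hypothetical convention; Iceberg accepts any string as a WAP ID.
    """
    now = now or datetime.now(timezone.utc)
    stamp = now.strftime("%Y%m%dT%H%M%SZ")
    suffix = uuid.uuid4().hex[:8]  # keeps IDs distinct for runs in the same second
    return f"{pipeline}-{stamp}-{suffix}"
```

Because the ID is later interpolated into a quoted SQL string, the pipeline name should not contain single quotes.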

# The Setup

By default, tables are not configured to allow integrated audits. The first step is therefore to enable them by setting the `write.wap.enabled` table property to `true`.

In [None]:
%%sql

ALTER TABLE nyc.permits
SET TBLPROPERTIES (
    'write.wap.enabled'='true'
)

Next, the `spark.wap.id` property of your Spark session configuration must be set to the integrated audit session ID.

In [None]:
spark.conf.set('spark.wap.id', ia_session_id)

With a `spark.wap.id` value set, you can now safely write **directly to the permits table**--don't worry, these changes will only be staged, not committed!
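
One thing to keep in mind is that `spark.wap.id` stays set until you change or remove it, so later writes to WAP-enabled tables in the same Spark session would also be staged. A minimal sketch of one way to scope the setting, assuming a hypothetical helper named `wap_session` (it is not part of Spark or Iceberg):

```python
from contextlib import contextmanager


@contextmanager
def wap_session(spark, session_id):
    """Set spark.wap.id for the duration of a `with` block, then unset it.

    `wap_session` is a hypothetical helper, not a Spark or Iceberg API.
    """
    spark.conf.set("spark.wap.id", session_id)
    try:
        yield session_id
    finally:
        # Remove the ID so writes after the block are committed normally again.
        spark.conf.unset("spark.wap.id")
```

It could be used as `with wap_session(spark, ia_session_id): ...`, with the staging writes placed inside the block.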

# Staging The Changes

To stage the changes, you simply write directly to the `permits` table. This is awesome in situations where you're working with a large and complex data ingestion pipeline.
Instead of hard-coding logic in your pipeline to switch between an "audit mode" and a "production mode", with integrated audits you simply run your
production code!

For this demo, let's use a simple query that deletes all records for film permits in the Manhattan borough.

In [None]:
%%sql

DELETE FROM nyc.permits
WHERE borough='Manhattan'

As described, even though the query was executed against the production table, these changes are only staged and not committed since we are within an integrated audit session. Let's confirm this by verifying that a count by borough still includes the Manhattan records.

In [None]:
%%sql

SELECT borough, count(*) permit_cnt
FROM nyc.permits
GROUP BY borough

# The Audit

Once the changes for this session are staged, you can perform all of your audits to validate the data. The first step is to retrieve the snapshot ID generated by the changes and tagged with this integrated audit session ID.

In [None]:
query = f"""
SELECT snapshot_id
FROM nyc.permits.snapshots
WHERE summary['wap.id'] = '{ia_session_id}'
"""

ia_session_snapshot = spark.sql(query).head().snapshot_id

In [None]:
ia_session_snapshot
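
Note that `head()` returns `None` when no snapshot carries the session ID, in which case the `.snapshot_id` access above fails with an unhelpful `AttributeError`. A more defensive lookup might look like the sketch below; the function name `find_staged_snapshot` and the "exactly one match" rule are assumptions made for this example.

```python
def find_staged_snapshot(spark, table, session_id):
    """Return the ID of the single snapshot tagged with `session_id`.

    Hypothetical helper; raises LookupError unless exactly one snapshot matches.
    """
    query = f"""
    SELECT snapshot_id
    FROM {table}.snapshots
    WHERE summary['wap.id'] = '{session_id}'
    """
    rows = spark.sql(query).collect()
    if len(rows) != 1:
        raise LookupError(
            f"Expected exactly one snapshot for session {session_id}, found {len(rows)}"
        )
    return rows[0]["snapshot_id"]
```

With it, the lookup becomes `ia_session_snapshot = find_staged_snapshot(spark, "nyc.permits", ia_session_id)`.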

This snapshot includes the staged (but not committed) changes to your production table. Once you have this snapshot ID, you can use Iceberg's Time Travel feature to query it!

In [None]:
spark.read \
    .option("snapshot-id", ia_session_snapshot) \
    .format("iceberg") \
    .load("nyc.permits") \
    .groupBy("borough") \
    .count() \
    .show()

At this point, you can use any auditing tool or technique to validate your changes. For this demo, we'll do a simple audit that confirms that the only remaining boroughs are Queens, Brooklyn, Bronx, and Staten Island. If any of these boroughs is missing or any additional boroughs are found, we'll raise an exception.

In [None]:
expected_boroughs = {"Queens", "Brooklyn", "Bronx", "Staten Island"}
distinct_boroughs = spark.read \
    .option("snapshot-id", ia_session_snapshot) \
    .format("iceberg") \
    .load("nyc.permits") \
    .select("borough") \
    .distinct() \
    .toLocalIterator()
boroughs = {row[0] for row in distinct_boroughs}

In [None]:
# `boroughs` and `expected_boroughs` are both sets (unordered collections of distinct items),
# so a direct equality check confirms that no expected borough is missing
# and no unexpected borough is present.
if boroughs != expected_boroughs:
    raise ValueError(f"Audit failed, borough set does not match expected boroughs: {boroughs} != {expected_boroughs}")

If the above check does not fail, we can go ahead and commit our staged data to publish our changes!
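
When the audit does fail, it helps to know which boroughs caused the failure. A small sketch using set differences, where the helper name `diff_boroughs` is made up for this example:

```python
def diff_boroughs(found, expected):
    """Return (missing, unexpected) borough sets; both empty means the audit passes."""
    found, expected = set(found), set(expected)
    missing = expected - found      # expected boroughs absent from the staged data
    unexpected = found - expected   # boroughs in the staged data that should not be there
    return missing, unexpected
```

Calling `diff_boroughs(boroughs, expected_boroughs)` returns two sets that can be included in the exception message.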

# The Publish

After the audits are completed, publishing the data is as simple as running a `cherrypick_snapshot` stored procedure.

In [None]:
publish_query = f"CALL system.cherrypick_snapshot('nyc.permits', {ia_session_snapshot})"
%sql $publish_query

That's it! Publishing the changes from this integrated audit session is a simple metadata-only operation that instantly makes the changes live for all downstream consumers querying the `permits` table! Query results will now include the commit that removed all Manhattan records.

In [None]:
spark.read \
    .format("iceberg") \
    .load("nyc.permits") \
    .groupBy("borough") \
    .count() \
    .show()
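
If the publish step runs inside a pipeline rather than a notebook, the `CALL` statement can be built by a small function. This is a sketch under assumptions: `cherrypick_call` is a hypothetical name, the catalog is addressed as `system` as in the cell above, and the named-argument form of the procedure is used.

```python
def cherrypick_call(table, snapshot_id):
    """Build the CALL statement that publishes a staged snapshot.

    Hypothetical helper; rejects non-integer IDs before they reach the SQL string.
    """
    if isinstance(snapshot_id, bool) or not isinstance(snapshot_id, int):
        raise TypeError(f"snapshot_id must be an integer, got {snapshot_id!r}")
    return (
        f"CALL system.cherrypick_snapshot("
        f"table => '{table}', snapshot_id => {snapshot_id})"
    )
```

The result can be passed to `spark.sql(...)` once the audits have passed.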

# What Happens When The Audits Fail?

What about when your audits fail? What happens to the snapshots generated? How about the data and metadata files?

One of the best parts of Iceberg's integrated audits is that the cleanup of "*staged-yet-not-committed-data*" is part of the normal snapshot cleanup process of a typical Iceberg warehouse. To be more specific, let's say a daily snapshot expiration is performed on the data warehouse (using the [expire_snapshots](https://iceberg.apache.org/docs/latest/spark-procedures/#expire_snapshots) procedure) and all snapshots older than 7 days are expired. That means once your staged snapshot reaches 7 days in age, it will be expired.

Additionally, since the changes were never committed, the underlying data files for the snapshot will be removed since they're not referenced by any other snapshots in the linear history of the table.
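
To make the 7-day example concrete, the sketch below computes the cutoff and builds the matching `expire_snapshots` call. The function name `expire_call` is hypothetical, and the timestamp literal is formatted in UTC, which assumes the Spark session time zone is also UTC.

```python
from datetime import datetime, timedelta, timezone


def expire_call(table, retention_days, retain_last, now=None):
    """Build a CALL that expires snapshots older than `retention_days` days.

    Hypothetical helper; assumes the Spark session time zone is UTC.
    """
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(days=retention_days)
    literal = cutoff.strftime("%Y-%m-%d %H:%M:%S.000")
    return (
        f"CALL system.expire_snapshots("
        f"table => '{table}', "
        f"older_than => TIMESTAMP '{literal}', "
        f"retain_last => {retain_last})"
    )
```

For a daily job with a 7-day retention, `expire_call("nyc.permits", 7, 100)` would produce the statement to run.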

Let's see this in action. First, start a new integrated audit session and stage a commit by inserting a single record.

In [None]:
ia_session_id = uuid.uuid4().hex
ia_session_id

In [None]:
spark.conf.set('spark.wap.id', ia_session_id)

In [None]:
%%sql

INSERT INTO nyc.permits
VALUES (
    'Hoboken',
    'Television',
    '1',
    'United States of America',
    '2021-11-24T23:00:00.000',
    '2021-11-23T09:38:17.000',
    'Mayor\'s Office of Film, Theatre & Broadcasting',
    '613322',
    'Shooting Permit',
    'WASHINGTON STREET',
    '100',
    '2021-11-24T07:00:00.000',
    'Episodic series',
    '07030'
)

Next, let's identify the snapshot that was tagged with the integrated audit session ID.

In [None]:
%%sql

SELECT snapshot_id
FROM nyc.permits.snapshots

In [None]:
query = f"""
SELECT snapshot_id
FROM nyc.permits.snapshots
WHERE summary['wap.id'] = '{ia_session_id}'
"""

ia_session_snapshot = spark.sql(query).head().snapshot_id

In [None]:
ia_session_snapshot

A quick check of the history table shows that this snapshot is not included as part of the current history of the table since it has not been published yet.

In [None]:
%%sql

SELECT *
FROM nyc.permits.history

In a scenario where the audits fail and this change is not published, the `expire_snapshots` procedure will clean up the snapshot **and** the data files. Let's demonstrate this by calling the `expire_snapshots` procedure for all snapshots older than the current timestamp.

In [None]:
import time
%sql CALL system.expire_snapshots('nyc.permits', {round(time.time() * 1000)}, 100)

The output from the `expire_snapshots` procedure shows that a data file, a manifest file, and a manifest list file were deleted. Furthermore, the snapshot no longer appears in the `permits` table's `snapshots` metadata table.

In [None]:
%%sql

SELECT *
FROM nyc.permits.snapshots