# Iceberg Summit 2025 demo

Let's first verify that Marquez is up and running.

In [1]:
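import json
import requests

marquez_url = "http://marquez:9000"  # this may depend on your local setup
try:
    # requests.get raises if Marquez is unreachable, so catch that to reach the error message
    ok = requests.get(f"{marquez_url}/api/v1/namespaces", timeout=5).status_code == 200
except requests.exceptions.RequestException:
    ok = False
print("Marquez is OK." if ok else "Cannot connect to Marquez")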

Marquez is OK.


You can also check it in your browser: http://localhost:3000

Let's now start a Spark session with OpenLineage and Iceberg enabled:

In [2]:
from pyspark.sql import SparkSession

spark = (SparkSession.builder.master('local')
     .appName('sample_spark')
     .config('spark.jars.packages', 'io.openlineage:openlineage-spark_2.12:1.30.0,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1')
     .config('spark.extraListeners', 'io.openlineage.spark.agent.OpenLineageSparkListener')
     .config('spark.openlineage.transport.type', 'http')
     .config('spark.openlineage.transport.url', 'http://marquez:9000/api/v1')
     .config('spark.openlineage.job.owners.team', 'capybaras')
     .config('spark.openlineage.job.owners.email', 'capybaras@myorg.com')
     .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
     .config("spark.sql.catalog.spark_catalog.type", "hadoop")
     .config("spark.sql.catalog.spark_catalog.warehouse", "/tmp/iceberg")
     .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
     .getOrCreate())
spark.sparkContext.setLogLevel("INFO")

## Prepare data
### Download trip data

We download the 2024 NYC yellow taxi trip data, deliberately skipping November:

In [3]:
# data from https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page
import os
os.system("rm -rf taxi_parquet")  # start from a clean download directory
for i in range(1, 13):
    if (i == 11):
        print("deliberately skipping November")
        continue
    print("downloading", str(i).zfill(2))
    os.system(f"wget -c https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-{str(i).zfill(2)}.parquet -P taxi_parquet ")

downloading 01
downloading 02
downloading 03
downloading 04
downloading 05
downloading 06
downloading 07
downloading 08
downloading 09
downloading 10
deliberately skipping November
downloading 12


### Write data as an Iceberg table with a ride_id UUID column added

In [4]:
import uuid
from pyspark.sql.functions import udf, month
from pyspark.sql.types import StringType

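# uuid4() returns a new value on every call, so mark the UDF as nondeterministic
# to keep the optimizer from re-evaluating or reordering it
uuidUdf = udf(lambda: str(uuid.uuid4()), StringType()).asNondeterministic()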

spark \
  .read \
  .parquet("taxi_parquet") \
  .withColumn("ride_id", uuidUdf()) \
  .withColumn("month", month("tpep_dropoff_datetime")) \
  .write \
  .format("iceberg") \
  .mode("overwrite") \
  .partitionBy("month") \
  .saveAsTable("yellow_tripdata")

## Create extra table with users' comments for 1% of the rides

In [5]:
from pyspark.sql.functions import col, lit

spark \
  .read \
  .table("yellow_tripdata") \
  .sample(fraction=0.01) \
  .select(col("ride_id"), col("tpep_pickup_datetime").alias("datetime")) \
  .withColumn("comment", lit("This ride was cool")) \
  .write \
  .format("iceberg") \
  .mode("overwrite") \
  .saveAsTable("comments")

In [6]:
spark.read.table("comments").show(2)

+--------------------+-------------------+------------------+
|             ride_id|           datetime|           comment|
+--------------------+-------------------+------------------+
|29d49cdb-b7b5-4dc...|2024-10-01 00:09:29|This ride was cool|
|55dcb827-0fe9-472...|2024-10-01 00:09:16|This ride was cool|
+--------------------+-------------------+------------------+
only showing top 2 rows



## Common pitfall

Get one month of comments joined with `trip_distance`:


In [7]:
from pyspark.sql.functions import month

comments = spark.read.table("comments")
tripdata = spark.read.table("yellow_tripdata")


tripdata \
  .join(comments, tripdata["ride_id"] == comments["ride_id"]) \
  .filter(month(comments["datetime"]) == 12) \
  .select(comments["comment"], tripdata["trip_distance"]) \
  .write \
  .format("iceberg") \
  .mode("overwrite") \
  .saveAsTable("comment_distance")

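But there is something wrong with the query above: we filter by month on `comments` only, not on the `yellow_tripdata` dataset.

The query still returns proper results, but this is a clear performance bug. `yellow_tripdata` is partitioned by `month`, so without a filter on that column every partition has to be scanned before the join.

You can see the lineage graph here: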
http://localhost:3000/lineage/job/default/sample_spark.atomic_replace_table_as_select.default_comment_distance
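
To make the cost concrete, here is a toy, pure-Python model of partition pruning. It is only an illustration under stated assumptions: the file layout and sizes are made up, and `plan_scan` is a hypothetical helper, not an Iceberg or Spark API. The point is that a table partitioned by `month` can skip every file outside the requested partition, but only when the filter is expressed on that table's partition column.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DataFile:
    path: str
    month: int        # partition value of the file
    size_bytes: int


def plan_scan(files, month=None):
    """Split files into (scanned, skipped) for an optional partition filter."""
    if month is None:
        # no partition filter: nothing can be skipped
        return list(files), []
    scanned = [f for f in files if f.month == month]
    skipped = [f for f in files if f.month != month]
    return scanned, skipped


# hypothetical layout: one 100 MB file per month
files = [DataFile(f"month={m}/data.parquet", m, 100_000_000) for m in range(1, 13)]

for label, month in (("no partition filter", None), ("month=12", 12)):
    scanned, skipped = plan_scan(files, month)
    total = sum(f.size_bytes for f in scanned)
    print(f"{label}: scanned={len(scanned)} skipped={len(skipped)} bytes={total}")
```

In this toy model the unfiltered plan touches all twelve files, while the filtered plan touches one. A scan report with zero skipped files on a large partitioned table is the signal to look for.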

## Fixed query

In [8]:
from pyspark.sql.functions import month

comments = spark.read.table("comments")
tripdata = spark.read.table("yellow_tripdata")


tripdata \
  .filter("month=12") \
  .join(comments, tripdata["ride_id"] == comments["ride_id"]) \
  .filter(month(comments["datetime"]) == 12) \
  .select(comments["comment"], tripdata["trip_distance"]) \
  .write \
  .format("iceberg") \
  .mode("overwrite") \
  .saveAsTable("comment_distance")

## Identify problematic jobs and contact teams 

List all jobs that read more than X bytes without skipping any files, according to their scan metrics.

You can see the lineage graph in [Marquez](http://localhost:3000/lineage/job/default/sample_spark.atomic_replace_table_as_select.default_comment_distance_december).

First, fetch the most recent lineage events from Marquez:

In [11]:
import json
import requests

response = requests.get("{}/api/v1/events/lineage?limit=100".format(marquez_url))
events = json.loads(response.text)

List all jobs that read more than 1 gigabyte of data without skipping any data files:

In [12]:
for event in events['events']:
    jobName = event['job']['name']
    owner = event['job']['facets'].get('ownership')
    for dataset in event['inputs']:
        # skip inputs without facets and jobs without a declared owner
        if dataset.get('inputFacets') is None or owner is None:
            continue
        scan = dataset['inputFacets'].get('icebergScanReport')
        if scan is not None:
            skippedFiles = scan['scanMetrics']['skippedDataFiles']
            bytesRead = scan['scanMetrics']['totalFileSizeInBytes']
            # more than 1 GiB read and no skipped data files (same unit as the printout below)
            if skippedFiles == 0 and bytesRead > (1 << 30):
                print(
                    f"\n\nJob :{jobName}", 
                    f"\nreading '{dataset['name']}'",
                    f"\nowner: {owner.get('owners')}", 
                    f"\nhas read: {bytesRead/(1<<30):.2f}", "gigabyte"
                )



Job :sample_spark.append_data.spark_catalog_default_comment_distance 
reading '/tmp/iceberg/default/yellow_tripdata' 
owner: [{'name': 'capybaras', 'type': 'team'}, {'name': 'capybaras@myorg.com', 'type': 'email'}] 
has read: 1.27 gigabyte


Job :sample_spark.append_data.spark_catalog_default_comments 
reading '/tmp/iceberg/default/yellow_tripdata' 
owner: [{'name': 'capybaras', 'type': 'team'}, {'name': 'capybaras@myorg.com', 'type': 'email'}] 
has read: 1.27 gigabyte
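
Since the goal is to contact the owning teams, the same check can be wrapped in a function that groups findings by owner. This is a minimal sketch rather than a finished tool: `unpruned_scans_by_owner` is a hypothetical helper, the sample event is hand-written with invented values and mirrors only the fields read above, and the 1 GiB default threshold is just a convenient choice.

```python
from collections import defaultdict

GIB = 1 << 30


def unpruned_scans_by_owner(events, min_bytes=GIB):
    """Map owner name -> [(job, dataset, bytes_read)] for large scans that skipped no files."""
    findings = defaultdict(list)
    for event in events:
        ownership = (event.get("job", {}).get("facets") or {}).get("ownership")
        if ownership is None:
            continue  # nobody to contact for this job
        for dataset in event.get("inputs", []):
            scan = (dataset.get("inputFacets") or {}).get("icebergScanReport")
            if scan is None:
                continue
            metrics = scan["scanMetrics"]
            bytes_read = metrics["totalFileSizeInBytes"]
            if metrics["skippedDataFiles"] == 0 and bytes_read > min_bytes:
                for owner in ownership.get("owners", []):
                    findings[owner["name"]].append(
                        (event["job"]["name"], dataset["name"], bytes_read)
                    )
    return dict(findings)


# hand-written sample event (assumption), shaped like the fields used in the loop above
sample_events = [{
    "job": {
        "name": "sample_spark.append_data.spark_catalog_default_comments",
        "facets": {"ownership": {"owners": [{"name": "capybaras", "type": "team"}]}},
    },
    "inputs": [{
        "name": "/tmp/iceberg/default/yellow_tripdata",
        "inputFacets": {"icebergScanReport": {"scanMetrics": {
            "skippedDataFiles": 0, "totalFileSizeInBytes": 2 * GIB}}},
    }],
}]

for owner, scans in unpruned_scans_by_owner(sample_events).items():
    for job, dataset, size in scans:
        print(f"{owner}: {job} read {size / GIB:.2f} GiB from {dataset}")
```

With the real data you would pass `events['events']` instead of `sample_events`.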


## Missing November data

Let's fill the `comment_distance` table for November.

In [13]:
tripdata \
  .filter("month=11") \
  .join(comments, tripdata["ride_id"] == comments["ride_id"]) \
  .filter(month(comments["datetime"]) == 11) \
  .select(comments["comment"], tripdata["trip_distance"]) \
  .write \
  .format("iceberg") \
  .mode("append") \
  .saveAsTable("comment_distance")

Since November was deliberately skipped during the download, this should append next to nothing. Let's now compare the commit reports:

In [14]:
response = requests.get("{}/api/v1/events/lineage?limit=50".format(marquez_url))
events = json.loads(response.text)

In [15]:
for event in events['events']:
    runId = event['run'].get('runId')
    jobName = event['job']['name']
    for dataset in event['outputs']:
        # only look at writes to comment_distance that carry output facets
        if not dataset.get('name').endswith('comment_distance'):
            continue
        if dataset.get('outputFacets') is None:
            continue
        commit = dataset['outputFacets'].get('icebergCommitReport')
        if commit is not None:
            print(
                f"\n\nJob :{jobName}", 
                f"\nwriting '{dataset['name']}'",
                f"\nrunId: {runId}", 
                f"\nhas written: {commit['commitMetrics'].get('addedRecords')}", "records"
            )



Job :sample_spark.append_data.spark_catalog_default_comment_distance 
writing '/tmp/iceberg/default/comment_distance' 
runId: 0195a84b-0ef5-762d-9b02-3175e3a2a9c7 
has written: 1 records


Job :sample_spark.atomic_replace_table_as_select.default_comment_distance 
writing '/tmp/iceberg/default/comment_distance' 
runId: 0195a84a-4012-73cd-bab4-c54db77dbc9d 
has written: 36904 records


Job :sample_spark.append_data.spark_catalog_default_comment_distance 
writing '/tmp/iceberg/default/comment_distance' 
runId: 0195a84a-406f-7685-8ec1-d4ecd0701e4c 
has written: 36907 records


Job :sample_spark.atomic_replace_table_as_select.default_comment_distance 
writing '/tmp/iceberg/default/comment_distance' 
runId: 0195a84a-036c-704c-92e8-3ab7e67e9794 
has written: 36907 records


Job :sample_spark.append_data.spark_catalog_default_comment_distance 
writing '/tmp/iceberg/default/comment_distance' 
runId: 0195a84a-03f5-758e-a074-0130bdeb02d4 
has written: 36907 records


This is interesting: why did one run write just a single record? Let's verify the inputs of the jobs writing to `comment_distance`.

In [16]:
for event in events['events']:
    if len(event['outputs']) == 0:
        continue
    if not event['outputs'][0]['name'].endswith('comment_distance'):
        continue
    jobName = event['job']['name']
    for dataset in event['inputs']:
        facets = dataset.get('inputFacets') or {}
        # only look at inputs that carry an Iceberg scan report
        if facets.get('icebergScanReport') is None:
            continue
        rowCount = (facets.get('inputStatistics') or {}).get('rowCount')
        print(
            f"\n\nJob :{jobName} run at {event['eventTime']}", 
            f"\nInput dataset :{dataset['name']}", 
            f"\nrows read '{rowCount}'"
        )



Job :sample_spark.append_data.spark_catalog_default_comment_distance run at 2025-03-18T08:07:26.385Z 
Input dataset :/tmp/iceberg/default/yellow_tripdata 
rows read '1994'


Job :sample_spark.append_data.spark_catalog_default_comment_distance run at 2025-03-18T08:07:26.385Z 
Input dataset :/tmp/iceberg/default/comments 
rows read '376458'


Job :sample_spark.append_data.spark_catalog_default_comment_distance run at 2025-03-18T08:06:34.763Z 
Input dataset :/tmp/iceberg/default/yellow_tripdata 
rows read '3667755'


Job :sample_spark.append_data.spark_catalog_default_comment_distance run at 2025-03-18T08:06:34.763Z 
Input dataset :/tmp/iceberg/default/comments 
rows read '376458'


Job :sample_spark.append_data.spark_catalog_default_comment_distance run at 2025-03-18T08:06:32.892Z 
Input dataset :/tmp/iceberg/default/yellow_tripdata 
rows read '37523351'


Job :sample_spark.append_data.spark_catalog_default_comment_distance run at 2025-03-18T08:06:32.892Z 
Input dataset :/tmp/ice

This cannot be done with Iceberg metrics alone. The value comes from connecting input metrics with output metrics that belong to the same job run.
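
One way to make that connection explicit is to collect, per run, the rows read from each input next to the records added to each output. The sketch below is an illustration under assumptions: `summarize_runs` is a hypothetical helper, and the sample event is hand-written with invented names and values, using only the fields queried earlier in this notebook.

```python
def summarize_runs(events):
    """Return {runId: {"job": ..., "rows_read": {dataset: n}, "records_added": {dataset: n}}}."""
    runs = {}
    for event in events:
        run_id = event["run"]["runId"]
        summary = runs.setdefault(
            run_id,
            {"job": event["job"]["name"], "rows_read": {}, "records_added": {}},
        )
        # input side: row counts from the inputStatistics facet
        for dataset in event.get("inputs", []):
            stats = (dataset.get("inputFacets") or {}).get("inputStatistics")
            if stats and stats.get("rowCount") is not None:
                summary["rows_read"][dataset["name"]] = stats["rowCount"]
        # output side: added records from the Iceberg commit report
        for dataset in event.get("outputs", []):
            commit = (dataset.get("outputFacets") or {}).get("icebergCommitReport")
            if commit:
                added = commit["commitMetrics"].get("addedRecords")
                summary["records_added"][dataset["name"]] = added
    return runs


# hand-written sample event with invented values (assumption)
sample_events = [{
    "run": {"runId": "run-1"},
    "job": {"name": "demo_job"},
    "inputs": [
        {"name": "yellow_tripdata", "inputFacets": {"inputStatistics": {"rowCount": 1000}}},
        {"name": "comments", "inputFacets": {"inputStatistics": {"rowCount": 500}}},
    ],
    "outputs": [
        {"name": "comment_distance",
         "outputFacets": {"icebergCommitReport": {"commitMetrics": {"addedRecords": 0}}}},
    ],
}]

for run_id, summary in summarize_runs(sample_events).items():
    print(run_id, summary["job"], summary["rows_read"], summary["records_added"])
```

A run that reads plenty of rows but adds almost no records, or one that reads far fewer rows than its siblings, stands out immediately in such a summary.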