# Lakehousing of HEP data using Apache Iceberg

Jayjeet Chakraborty, University of California, Santa Cruz


## Features of Iceberg:

* Supports transactions
* Hidden Partitioning
* Schema Evolution
* Time Travel and Rollbacks
* Expressive SQL

## Adding Iceberg to Spark

The Iceberg JAR file has to be copied to the Spark installation's JAR directory.

In [144]:
!ls /opt/spark/jars | grep "iceberg"

iceberg-spark-runtime-3.3_2.12-1.1.0.jar


We need to add some configurations options for Spark to pickup Iceberg.

In [145]:
!cat /opt/spark/conf/spark-defaults.conf

spark.sql.extensions                   org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.sql.catalog.demo                 org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.demo.warehouse       warehouse
spark.sql.catalog.demo.type            hadoop
spark.sql.defaultCatalog               demo
spark.eventLog.enabled                 true
spark.eventLog.dir                     /home/iceberg/spark-events
spark.history.fs.logDirectory          /home/iceberg/spark-events
spark.sql.catalogImplementation        in-memory



In [146]:
spark

## Creating an Iceberg table from a Parquet file

We read NanoEvents data out of our Parquet file into a Spark Dataframe

In [234]:
df = spark.read.parquet("Run2012B_SingleMu-1000.parquet")
df

DataFrame[run: int, luminosityBlock: bigint, event: bigint, MET: struct<pt:float,phi:float,sumet:float,significance:float,CovXX:float,CovXY:float,CovYY:float>, HLT: struct<IsoMu24_eta2p1:boolean,IsoMu24:boolean,IsoMu17_eta2p1_LooseIsoPFTau20:boolean>, PV: struct<npvs:int,x:float,y:float,z:float>, Muon: array<struct<pt:float,eta:float,phi:float,mass:float,charge:int,pfRelIso03_all:float,pfRelIso04_all:float,tightId:boolean,softId:boolean,dxy:float,dxyErr:float,dz:float,dzErr:float,jetIdx:int,genPartIdx:int>>, Electron: array<struct<pt:float,eta:float,phi:float,mass:float,charge:int,pfRelIso03_all:float,dxy:float,dxyErr:float,dz:float,dzErr:float,cutBasedId:boolean,pfId:boolean,jetIdx:int,genPartIdx:int>>, Photon: array<struct<pt:float,eta:float,phi:float,mass:float,charge:int,pfRelIso03_all:float,jetIdx:int,genPartIdx:int>>, Jet: array<struct<pt:float,eta:float,phi:float,mass:float,puId:boolean,btag:float>>, Tau: array<struct<pt:float,eta:float,phi:float,mass:float,charge:int,decayMode:

We now create an Iceberg table out of this dataframe partitioned by the `event` field.

In [235]:
%%sql

DROP TABLE IF EXISTS iris_hep;

In [237]:
df = df[df['event']<62362200]
df.writeTo("iris_hep").partitionedBy('event').using("iceberg").create()

## Data, Metadata, WAL, and Snapshots

Data files

In [238]:
%%sql

SELECT file_path, file_size_in_bytes FROM demo.iris_hep.all_data_files;

file_path,file_size_in_bytes
warehouse/iris_hep/data/event=62359790/00000-1499-63eace2d-a3ad-417f-b695-f31c483d4c12-00001.parquet,21439
warehouse/iris_hep/data/event=62361579/00000-1499-63eace2d-a3ad-417f-b695-f31c483d4c12-00002.parquet,21467
warehouse/iris_hep/data/event=62362173/00000-1499-63eace2d-a3ad-417f-b695-f31c483d4c12-00003.parquet,23876


Metadata files

In [165]:
%%sql

SELECT * FROM demo.iris_hep.manifests;

content,path,length,partition_spec_id,added_snapshot_id,added_data_files_count,existing_data_files_count,deleted_data_files_count,added_delete_files_count,existing_delete_files_count,deleted_delete_files_count,partition_summaries
0,warehouse/iris_hep/metadata/dc806fc5-1836-43cb-abc3-42ff202e79a9-m0.avro,12631,0,4553086720717858461,1,0,0,0,0,0,[]


Write-Ahead Log files that enables transactions

In [164]:
%%sql

SELECT * from demo.iris_hep.metadata_log_entries;

timestamp,file,latest_snapshot_id,latest_schema_id,latest_sequence_number
2022-12-16 04:33:53.542000,warehouse/iris_hep/metadata/v1.metadata.json,4553086720717858461,0,0


Snapshot files

In [163]:
%%sql

SELECT snapshot_id, manifest_list FROM demo.iris_hep.snapshots

snapshot_id,manifest_list
4553086720717858461,warehouse/iris_hep/metadata/snap-4553086720717858461-1-dc806fc5-1836-43cb-abc3-42ff202e79a9.avro


## Schema Evolution

Iceberg allows adding, dropping, renaming, updating, and reordering columns with just metadata operations which are fast and do not need rewriting any data files. Altering the schema does not create a new snapshot. We beging by checking the schema of our table.

In [166]:
%%sql

DESCRIBE TABLE demo.iris_hep;

col_name,data_type,comment
run,int,
luminosityBlock,bigint,
event,bigint,
MET,"struct<pt:float,phi:float,sumet:float,significance:float,CovXX:float,CovXY:float,CovYY:float>",
HLT,"struct<IsoMu24_eta2p1:boolean,IsoMu24:boolean,IsoMu17_eta2p1_LooseIsoPFTau20:boolean>",
PV,"struct<npvs:int,x:float,y:float,z:float>",
Muon,"array<struct<pt:float,eta:float,phi:float,mass:float,charge:int,pfRelIso03_all:float,pfRelIso04_all:float,tightId:boolean,softId:boolean,dxy:float,dxyErr:float,dz:float,dzErr:float,jetIdx:int,genPartIdx:int>>",
Electron,"array<struct<pt:float,eta:float,phi:float,mass:float,charge:int,pfRelIso03_all:float,dxy:float,dxyErr:float,dz:float,dzErr:float,cutBasedId:boolean,pfId:boolean,jetIdx:int,genPartIdx:int>>",
Photon,"array<struct<pt:float,eta:float,phi:float,mass:float,charge:int,pfRelIso03_all:float,jetIdx:int,genPartIdx:int>>",
Jet,"array<struct<pt:float,eta:float,phi:float,mass:float,puId:boolean,btag:float>>",


Now we add a new column to the table `mycol` of type `double`.

In [167]:
%%sql

ALTER TABLE demo.iris_hep ADD COLUMN mycol double;

We check the schema again.

In [168]:
%%sql

DESCRIBE TABLE demo.iris_hep;

col_name,data_type,comment
run,int,
luminosityBlock,bigint,
event,bigint,
MET,"struct<pt:float,phi:float,sumet:float,significance:float,CovXX:float,CovXY:float,CovYY:float>",
HLT,"struct<IsoMu24_eta2p1:boolean,IsoMu24:boolean,IsoMu17_eta2p1_LooseIsoPFTau20:boolean>",
PV,"struct<npvs:int,x:float,y:float,z:float>",
Muon,"array<struct<pt:float,eta:float,phi:float,mass:float,charge:int,pfRelIso03_all:float,pfRelIso04_all:float,tightId:boolean,softId:boolean,dxy:float,dxyErr:float,dz:float,dzErr:float,jetIdx:int,genPartIdx:int>>",
Electron,"array<struct<pt:float,eta:float,phi:float,mass:float,charge:int,pfRelIso03_all:float,dxy:float,dxyErr:float,dz:float,dzErr:float,cutBasedId:boolean,pfId:boolean,jetIdx:int,genPartIdx:int>>",
Photon,"array<struct<pt:float,eta:float,phi:float,mass:float,charge:int,pfRelIso03_all:float,jetIdx:int,genPartIdx:int>>",
Jet,"array<struct<pt:float,eta:float,phi:float,mass:float,puId:boolean,btag:float>>",


Since, evolving schema is just a metadata operation, no snapshot is created. We still have the only parent snapshot that we had from the start.

In [169]:
%%sql

SELECT snapshot_id, manifest_list FROM demo.iris_hep.snapshots;

snapshot_id,manifest_list
4553086720717858461,warehouse/iris_hep/metadata/snap-4553086720717858461-1-dc806fc5-1836-43cb-abc3-42ff202e79a9.avro


We have the new column `mycol` in place. No data files have been modified.

In [170]:
%%sql 

SELECT run, event, mycol from demo.iris_hep LIMIT 5;

run,event,mycol
194711,263142897,
194711,263176076,
194711,263180946,
194711,263197228,
194711,263232293,


## Snapshots

Snapshots are created only when the data of the table changes.

In [171]:
%%sql

UPDATE demo.iris_hep SET run = run * 2;

Since, we updated the data, a new snapshot is created.

In [172]:
%%sql

SELECT snapshot_id, manifest_list FROM demo.iris_hep.snapshots

snapshot_id,manifest_list
4553086720717858461,warehouse/iris_hep/metadata/snap-4553086720717858461-1-dc806fc5-1836-43cb-abc3-42ff202e79a9.avro
8370068912865699136,warehouse/iris_hep/metadata/snap-8370068912865699136-1-ef44eccb-9614-438d-87eb-1943393b64fc.avro


Let's do a couple of more updates to the table

In [173]:
%%sql

UPDATE demo.iris_hep SET event = event + 1;

Here is a snippet of the updated version of the table.

In [174]:
%%sql

SELECT run, event FROM demo.iris_hep LIMIT 5;

run,event
389422,263142898
389422,263176077
389422,263180947
389422,263197229
389422,263232294


## Time Travel and Rollback

Iceberg allows switching between different versions as it stores snapshots of data and metadata of every new version of the table. Here, we first look at the version history of the table.

In [175]:
df = spark.sql('SELECT * FROM demo.iris_hep.history')
df.toPandas()

Unnamed: 0,made_current_at,snapshot_id,parent_id,is_current_ancestor
0,2022-12-16 04:33:53.542,4553086720717858461,,True
1,2022-12-16 04:35:05.836,8370068912865699136,4.553087e+18,True
2,2022-12-16 04:35:16.053,2508631590115946818,8.370069e+18,True


Now, we roll back the table to the first version.

In [176]:
spark.sql(f"CALL demo.system.rollback_to_snapshot('iris_hep', {df.head().snapshot_id})")

DataFrame[previous_snapshot_id: bigint, current_snapshot_id: bigint]

We have the initial version of the table.

In [239]:
%%sql 

SELECT run, event FROM demo.iris_hep WHERE LIMIT 5;

run,event
195916,62359790
195916,62361579
195916,62362173


Although, we have rolled back to the initial version, we can still travel in time to use any version of the table we want.

In [179]:
%%sql 

SELECT run, event FROM demo.iris_hep VERSION AS OF 8370068912865699136 WHERE LIMIT 5;

run,event
389422,263142897
389422,263176076
389422,263180946
389422,263197228
389422,263232293


## Running ADL benchmark queries on Iceberg

### Query 1

In [180]:
%%sql

SELECT
  FLOOR((
    CASE
      WHEN MET.pt < 0 THEN -1
      WHEN MET.pt > 2000 THEN 2001
      ELSE MET.pt
    END) / 20) * 20 + 10 AS x,
  COUNT(*) AS y
FROM demo.iris_hep
GROUP BY FLOOR((
    CASE
      WHEN MET.pt < 0 THEN -1
      WHEN MET.pt > 2000 THEN 2001
      ELSE MET.pt
    END) / 20) * 20 + 10
ORDER BY x;

x,y
10,346
30,427
50,196
70,25
90,1
110,4
130,1


### Query 8

In [181]:
%%sql

-- Make the structure of Electrons and Muons uniform, and then union their arrays
WITH uniform_structure_leptons AS (
  SELECT
    event,
    MET,
    array_union(
      transform(
        COALESCE(Muon, ARRAY()),
        x -> CAST( STRUCT(x.pt, x.eta, x.phi, x.mass, x.charge, 'm') AS STRUCT<pt: FLOAT, eta: FLOAT, phi: FLOAT, mass: FLOAT, charge: INT, type: CHAR(256)> )
      ),
      transform(
        COALESCE(Electron, ARRAY()),
        x -> CAST( STRUCT(x.pt, x.eta, x.phi, x.mass, x.charge, 'e') AS STRUCT<pt: FLOAT, eta: FLOAT, phi: FLOAT, mass: FLOAT, charge: INT, type: CHAR(256)> )
      )
    ) AS Leptons
  FROM demo.iris_hep
  WHERE cardinality(Muon) + cardinality(Electron) > 2
),


-- Create the Lepton pairs, transform the leptons using PtEtaPhiM2PxPyPzE and then sum the transformed leptons
lepton_pairs AS (
  SELECT
    *,
    CAST(
      STRUCT(
        l1.pt * cos(l1.phi) + l2.pt * cos(l2.phi),
        l1.pt * sin(l1.phi) + l2.pt * sin(l2.phi),
        l1.pt * ( ( exp(l1.eta) - exp(-l1.eta) ) / 2.0 ) + l2.pt * ( ( exp(l2.eta) - exp(-l2.eta) ) / 2.0 ),
        sqrt(l1.pt * cosh(l1.eta) * l1.pt * cosh(l1.eta) + l1.mass * l1.mass) + sqrt(l2.pt * cosh(l2.eta) * l2.pt * cosh(l2.eta) + l2.mass * l2.mass)
      ) AS
      STRUCT <x: REAL, y: REAL, z: REAL, e: REAL>
    ) AS l,
    idx1 AS l1_idx,
    idx2 AS l2_idx
  FROM uniform_structure_leptons
  LATERAL VIEW POSEXPLODE(Leptons) AS idx1,l1
  LATERAL VIEW POSEXPLODE(Leptons) AS idx2,l2
  WHERE idx1 < idx2 AND l1.type = l2.type AND l1.charge != l2.charge
),


-- Apply the PtEtaPhiM2PxPyPzE transformation on the particle pairs, then retrieve the one with the mass closest to 91.2 for each event
processed_pairs AS (
  SELECT
    event,
    min_by(
      STRUCT(
        l1_idx,
        l2_idx,
        Leptons,
        MET.pt,
        MET.phi
      ),
      abs(91.2 - sqrt(l.e * l.e - l.x * l.x - l.y * l.y - l.z * l.z))
    ) AS system
  FROM lepton_pairs
  GROUP BY event
),


-- For each event get the max pt of the other leptons
other_max_pt AS (
  SELECT event, CAST(max_by(sqrt(2 * system.pt * l.pt * (1.0 - cos((system.phi- l.phi + pi()) % (2 * pi()) - pi()))), l.pt) AS REAL) AS pt
  FROM processed_pairs
  LATERAL VIEW POSEXPLODE(system.Leptons) AS idx,l
  WHERE idx != system.l1_idx AND idx != system.l2_idx
  GROUP BY event
)


-- Compute the histogram
SELECT
  FLOOR((
    CASE
      WHEN pt < 15 THEN 14.99
      WHEN pt > 250 THEN 250.1
      ELSE pt
    END - 0.9) / 2.35) * 2.35 + 2.075 AS x,
  COUNT(*) AS y
FROM other_max_pt
GROUP BY FLOOR((
    CASE
      WHEN pt < 15 THEN 14.99
      WHEN pt > 250 THEN 250.1
      ELSE pt
    END - 0.9) / 2.35) * 2.35 + 2.075
ORDER BY x NULLS LAST;

22/12/16 04:35:58 WARN CharVarcharUtils: The Spark cast operator does not support char/varchar type and simply treats them as string type. Please use string type directly to avoid confusion. Otherwise, you can set spark.sql.legacy.charVarcharAsString to true, so that Spark treat them as string type as same as Spark 3.0 and earlier
22/12/16 04:35:58 WARN CharVarcharUtils: The Spark cast operator does not support char/varchar type and simply treats them as string type. Please use string type directly to avoid confusion. Otherwise, you can set spark.sql.legacy.charVarcharAsString to true, so that Spark treat them as string type as same as Spark 3.0 and earlier


x,y
13.825,30
16.175,2
18.525,2
20.875,3
23.225,4
25.575,2
27.925,2
30.275,2
32.625,1
34.975,5


## Partition Pruning

Since, Iceberg can maintain hive-style partitions transparently, it helps speed up queries by enabing paritition pruning. Here, we just show how it's done.

In [215]:
%%sql

SELECT sum(event) FROM demo.iris_hep WHERE event=62362173;

sum(event)
62362173


In [216]:
%%sql

SELECT event FROM demo.iris_hep WHERE event=62362174;

event


Spark and Iceberg setup is enough to perform data management on HEP data. No special systems such as Presto, BigQuery, or Athena is required.