# Benchmarking ETL & Accelerators

This notebook benchmarks:
1. **Pre-aggregate once, cache forever** (parquet vs recompute)
2. **Parallel summarization** using ProcessPoolExecutor
3. **Dask** array summarization
4. **DuckDB** querying Parquet vs pandas filtering
5. **Packbits** compression of masks to bytes

In [1]:
import contextlib
import os
import pathlib
import time
from concurrent.futures import ThreadPoolExecutor

import h5py
import numpy as np
import pandas as pd
from tqdm.auto import tqdm

try:
    import dask.array as da
    DASK_AVAILABLE = True
except ImportError:
    DASK_AVAILABLE = False
try:
    import duckdb
    DUCKDB_AVAILABLE = True
except ImportError:
    DUCKDB_AVAILABLE = False

# Paths
H5 = pathlib.Path("../data/2021-07-13 TCR Phase 1 Build 1.hdf5")
OUT_DIR = pathlib.Path("../data/precomputed")
OUT_DIR.mkdir(exist_ok=True)
PARQUET = OUT_DIR / "layer_summary.parquet"

CLS_STREAK, CLS_SPATTER = 3, 8
EDGE_FRAC = 0.10

In [2]:

@contextlib.contextmanager
def timed(label):
    start = time.perf_counter()
    yield
    end = time.perf_counter()
    print(f"{label}: {end-start:.2f} s")


## 1. Pre-aggregate once, cache forever

Define and time:
- `summarise_layers_vec()` to build and write Parquet
- reading from Parquet with `pd.read_parquet()`


In [3]:
def summarise_layers_vec(batch=128):
    with h5py.File(H5, "r") as h5:
        seg       = h5["slices/segmentation_results"]
        ds        = seg[str(CLS_STREAK)]
        nL, ny, nx = ds.shape
        edge      = int(nx * EDGE_FRAC)
        out       = np.zeros((nL, 4), np.int64)

        for i in tqdm(range(0, nL, batch), desc="Vectorised batches"):
            sl   = slice(i, min(i+batch, nL))
            st   = ds[sl]
            out[sl,0] = st.sum(axis=(1,2))
            out[sl,1] = st[:,:, :edge].sum(axis=(1,2))
            out[sl,2] = st[:,:, -edge:].sum(axis=(1,2))
            sp   = seg[str(CLS_SPATTER)][sl]
            out[sl,3] = sp.sum(axis=(1,2))

    df = pd.DataFrame(out,
                      columns=["streak_px","streak_left","streak_right","spatter_px"])
    df.to_parquet(PARQUET)
    return df


In [4]:
# Benchmark
if PARQUET.exists():
    PARQUET.unlink()

with timed("Vectorised build parquet"):
    df_build = summarise_layers_vec()

with timed("Read Parquet"):
    df_read = pd.read_parquet(PARQUET)

Vectorised batches:   0%|          | 0/28 [00:00<?, ?it/s]

Vectorised build parquet: 100.63 s
Read Parquet: 0.02 s


Notes: 
- Have to seperate ETL from analysis: Run vectorised summary once.
- Might have to guard analysis notebooks, e.g.:

		```python
		P = Path("../data/precomputed/layer_summary.parquet")
		if P.exists():
			layer_df = pd.read_parquet(P)
		else:
			layer_df = summarise_layers_vec()
		```
- Version Parquet with DVC, so we only recompute when needed.

## 2. Parallel summarization with small-batch ThreadPoolExecutor

Split into small batches to cap per-thread memory

In [5]:
batch = 32
with h5py.File(H5, "r") as h5:
    total_layers = h5["slices/segmentation_results"][str(CLS_STREAK)].shape[0]

ranges = [
    slice(i, min(i + batch, total_layers))
    for i in range(0, total_layers, batch)
]

Chunk‐processing function (reopens file per chunk)

In [6]:
def summarise_chunk(slc):
    with h5py.File(H5, "r") as h5:
        seg = h5["slices/segmentation_results"]
        st  = seg[str(CLS_STREAK)][slc]
        sp  = seg[str(CLS_SPATTER)][slc]
        edge = int(st.shape[2] * EDGE_FRAC)
        out = np.zeros((slc.stop - slc.start, 4), dtype=np.int64)
        out[:,0] = st.sum(axis=(1,2))
        out[:,1] = st[:,:, :edge].sum(axis=(1,2))
        out[:,2] = st[:,:, -edge:].sum(axis=(1,2))
        out[:,3] = sp.sum(axis=(1,2))
    return slc.start, out

Run in parallel with threads + progress bar

In [7]:
n_workers = os.cpu_count()  # Set n_workers to the number of CPU cores

results = []
with timed("Parallel summarise"), ThreadPoolExecutor(max_workers=n_workers) as executor:
    for start, out in tqdm(
        executor.map(summarise_chunk, ranges),
        total=len(ranges),
        desc="Parallel chunks"
    ):
        results.append((start, out))

Parallel chunks:   0%|          | 0/112 [00:00<?, ?it/s]

Parallel summarise: 169.90 s


In [8]:
results.sort(key=lambda x: x[0])
df_list = [
    pd.DataFrame(
        chunk,
        columns=["streak_px", "streak_left", "streak_right", "spatter_px"],
        index=range(start, start + chunk.shape[0])
    )
    for start, chunk in results
]
df_par = pd.concat(df_list)


## 3. Dask array summarization

If Dask is available, benchmark array computation using dask.


In [None]:

if DASK_AVAILABLE:
    with h5py.File(H5, "r") as h5:
        ds = h5["slices/segmentation_results"][str(CLS_STREAK)]
        darr = da.from_array(ds, chunks=(128, ds.shape[1], ds.shape[2]))
        with timed("Dask sum streak_px"):
            streak_sum = darr.sum(axis=(1, 2)).compute()
        print("First 5 sums:", streak_sum[:5])
else:
    print("Dask not installed")

Dask sum streak_px: 41.84 s
First 5 sums: [0 0 0 0 0]



## 4. DuckDB querying Parquet vs pandas filtering

Benchmark filter of layers where streak_px > median using DuckDB SQL vs pandas.


In [11]:

median_val = df_read['streak_px'].median()

if DUCKDB_AVAILABLE:
    with timed("DuckDB query"):
        con = duckdb.connect()
        res = con.execute(f"SELECT * FROM '{PARQUET}' WHERE streak_px > {median_val}").fetchall()
    with timed("Pandas filter"):
        df_filt = df_read[df_read['streak_px'] > median_val]
else:
    print("DuckDB not installed")


DuckDB query: 0.09 s
Pandas filter: 0.00 s



## 5. Packbits compression of a mask slice

Demonstrate memory and time savings using `np.packbits` on a boolean mask.


In [12]:

with h5py.File(H5,"r") as h5:
    mask = h5["slices/segmentation_results"][str(CLS_STREAK)][0] > 0

arr = mask.astype(bool)

with timed("np.packbits"):
    packed = np.packbits(arr, axis=None)

with timed("np.unpackbits"):
    unpacked = np.unpackbits(packed, count=arr.size).reshape(arr.shape)

print("Original size:", arr.nbytes, "Packed size:", packed.nbytes)


np.packbits: 0.00 s
np.unpackbits: 0.00 s
Original size: 3392964 Packed size: 424121


## DVC Stage Integration
To automate ETL with DVC:
```bash
dvc stage add -n build_layer_summary \
  -d data/2021-07-13\ TCR\ Phase\ 1\ Build\ 1.hdf5 \
  -o data/precomputed/layer_summary.parquet \
  "python notebooks/benchmark_etl_refined.ipynb"
```