# Parallel Processing with ProcessPoolExecutor

This notebook demonstrates how to use `concurrent.futures.ProcessPoolExecutor` (stdlib) to
compute Minkowski functionals on many meshes in parallel using pykarambola.

In [None]:
import time
import os
from concurrent.futures import ProcessPoolExecutor
from pathlib import Path

import numpy as np
import pykarambola

## Helper functions

Functions passed to `ProcessPoolExecutor` must be **top-level** (module-scope) functions,
because Python pickles them to send across processes. Lambdas and closures won't work.

In [None]:
def arrays_from_triangulation(tri):
    """Convert a Triangulation object into plain numpy arrays.

    The object is queried for its vertex and triangle counts, then every
    vertex position and the three vertex indices of every triangle are read
    through its accessor methods.

    Returns a ``(verts, faces)`` tuple: ``verts`` is a float64 array with one
    entry per vertex, ``faces`` an int64 array with one row of three vertex
    indices per triangle.
    """
    vertex_count = tri.n_vertices()
    triangle_count = tri.n_triangles()

    positions = list(map(tri.get_pos_of_vertex, range(vertex_count)))
    verts = np.array(positions, dtype=np.float64)

    index_rows = []
    for t in range(triangle_count):
        # Accessor takes (triangle index, corner index).
        index_rows.append([tri.ith_vertex_of_triangle(t, corner) for corner in (0, 1, 2)])
    faces = np.array(index_rows, dtype=np.int64)

    return verts, faces


def compute_from_arrays(args):
    """Run ``pykarambola.minkowski_functionals`` on one packed mesh.

    ``args`` is a two-element ``(verts, faces)`` sequence. Packing both arrays
    into a single argument lets the function be used directly with
    ``pool.map``; defining it at module scope keeps it picklable.
    """
    vertices, triangles = args
    functionals = pykarambola.minkowski_functionals(vertices, triangles)
    return functionals


def compute_from_arrays_with_options(args):
    """Run ``pykarambola.minkowski_functionals`` with extra keyword options.

    ``args`` is a four-element sequence ``(verts, faces, compute, center)``.
    The last two entries are forwarded unchanged as the ``compute=`` and
    ``center=`` keyword arguments.
    """
    vertices, triangles, requested, centering = args
    options = {"compute": requested, "center": centering}
    return pykarambola.minkowski_functionals(vertices, triangles, **options)

## 1. Generate synthetic meshes

We create a batch of randomly-sized boxes to simulate a realistic workload.

In [None]:
def make_box(a, b, c):
    """Build a triangulated axis-aligned box of size a x b x c centred at the origin.

    Returns ``(verts, faces)``: ``verts`` is an (8, 3) float64 array of corner
    coordinates, ``faces`` a (12, 3) int64 array of vertex indices (two
    triangles per side of the box).
    """
    half_extents = np.array([a / 2, b / 2, c / 2], dtype=np.float64)

    # Corner layout: 0-3 lie in the z = -c/2 plane, 4-7 in the z = +c/2 plane.
    corner_signs = np.array([
        [-1, -1, -1], [1, -1, -1], [1, 1, -1], [-1, 1, -1],
        [-1, -1, 1], [1, -1, 1], [1, 1, 1], [-1, 1, 1],
    ], dtype=np.float64)
    verts = corner_signs * half_extents

    # One quad per side of the box; each is split into two triangles that
    # share the quad's first corner.
    quads = (
        (0, 3, 2, 1),  # z = -c/2
        (4, 5, 6, 7),  # z = +c/2
        (0, 1, 5, 4),  # y = -b/2
        (2, 3, 7, 6),  # y = +b/2
        (0, 4, 7, 3),  # x = -a/2
        (1, 2, 6, 5),  # x = +a/2
    )
    triangles = []
    for p, q, r, s in quads:
        triangles.append([p, q, r])
        triangles.append([p, r, s])
    faces = np.array(triangles, dtype=np.int64)

    return verts, faces


# Fixed seed so the same batch of boxes is generated on every run.
rng = np.random.default_rng(42)
N_MESHES = 200

# Draw three edge lengths per box (one uniform(1, 10) call per mesh), then
# turn each triple into a (verts, faces) mesh.
box_sizes = [rng.uniform(1, 10, size=3) for _ in range(N_MESHES)]
meshes = [make_box(a, b, c) for a, b, c in box_sizes]

print(f"Generated {N_MESHES} random boxes")

## 2. Sequential baseline

In [None]:
# Single-process reference run: process every mesh one after another and
# record the wall-clock time in dt_seq.
t0 = time.perf_counter()
results_seq = list(map(compute_from_arrays, meshes))
dt_seq = time.perf_counter() - t0

print(f"Sequential: {dt_seq:.3f}s for {N_MESHES} meshes ({dt_seq/N_MESHES*1000:.1f} ms/mesh)")

## 3. Parallel with ProcessPoolExecutor

By default, `ProcessPoolExecutor()` uses all available CPU cores.

In [None]:
n_cores = os.cpu_count()
print(f"Available CPU cores: {n_cores}")

# The timed region covers pool start-up, the mapped work and pool shutdown,
# because the clock is read only after the `with` block has exited.
t0 = time.perf_counter()
with ProcessPoolExecutor() as pool:
    # pool.map yields results in input order; materialise them into a list.
    results_par = [*pool.map(compute_from_arrays, meshes)]
dt_par = time.perf_counter() - t0

print(f"Parallel:   {dt_par:.3f}s for {N_MESHES} meshes ({dt_par/N_MESHES*1000:.1f} ms/mesh)")
print(f"Speedup:    {dt_seq/dt_par:.2f}x")

## 4. Verify correctness

Results should be identical regardless of sequential vs parallel execution.

In [None]:
# zip() stops at the shorter input, so a missing result would otherwise go
# unnoticed; check the lengths explicitly first.
assert len(results_seq) == len(results_par), (
    f"Result count differs: {len(results_seq)} sequential vs {len(results_par)} parallel"
)

for i, (r_seq, r_par) in enumerate(zip(results_seq, results_par)):
    # Both runs must report the same set of functionals for each mesh.
    assert r_seq.keys() == r_par.keys(), f"Key mismatch at mesh {i}"
    for key in r_seq:
        # equal_nan=True: a NaN produced identically by both runs is a match,
        # not a discrepancy.
        assert np.allclose(r_seq[key], r_par[key], equal_nan=True), f"Mismatch at mesh {i}, key {key}"

print("All results match between sequential and parallel execution.")

## 5. Controlling the number of workers

You can limit worker count to leave cores free for other tasks.

In [None]:
# os.cpu_count() may return None when the core count cannot be determined;
# fall back to 1 in that case. Using a set avoids timing the same
# configuration twice on machines with 1, 2 or 4 cores.
worker_counts = sorted({1, 2, 4, os.cpu_count() or 1})

for n_workers in worker_counts:
    t0 = time.perf_counter()
    with ProcessPoolExecutor(max_workers=n_workers) as pool:
        results = list(pool.map(compute_from_arrays, meshes))
    dt = time.perf_counter() - t0
    print(f"  {n_workers} workers: {dt:.3f}s  (speedup {dt_seq/dt:.2f}x)")

## 6. Passing options (compute subset, center)

To pass extra arguments, pack them into tuples and use a wrapper function.

In [None]:
# Request just 'w000' and 'w100' (printed below as volume and area), with
# centroid centering. Every task tuple carries its own options so the
# single-argument wrapper can unpack them in the worker.
tasks = []
for mesh_verts, mesh_faces in meshes:
    tasks.append((mesh_verts, mesh_faces, ['w000', 'w100'], 'centroid'))

with ProcessPoolExecutor() as pool:
    results_subset = [*pool.map(compute_from_arrays_with_options, tasks)]

print(f"First 5 volumes:  {[r['w000'] for r in results_subset[:5]]}")
print(f"First 5 areas:    {[round(r['w100'], 2) for r in results_subset[:5]]}")
print(f"Keys per result:  {sorted(results_subset[0].keys())}")

## 7. Processing mesh files in parallel

In [None]:
def compute_from_poly_file(filepath):
    """Load a .poly file and compute its Minkowski functionals.

    Defined at module scope so it can be pickled and sent to worker processes.

    Parameters
    ----------
    filepath : str or os.PathLike
        Location of the .poly file.

    Returns
    -------
    dict
        ``'file'`` (base name of the path), ``'n_verts'`` and ``'n_faces'``
        (counts reported by the parsed triangulation) and ``'functionals'``
        (the result of ``pykarambola.minkowski_functionals``).
    """
    # Normalise to Path so plain string paths work too (``.name`` is used below).
    filepath = Path(filepath)
    tri = pykarambola.parse_poly_file(str(filepath))

    # Reuse the shared helper instead of repeating the extraction loops here.
    verts, faces = arrays_from_triangulation(tri)

    return {
        'file': filepath.name,
        'n_verts': tri.n_vertices(),
        'n_faces': tri.n_triangles(),
        'functionals': pykarambola.minkowski_functionals(verts, faces),
    }


# Collect the .poly inputs in sorted order so the report below is stable.
input_dir = Path('../test_suite/inputs')
poly_files = sorted(input_dir.glob('*.poly'))
print(f"Found {len(poly_files)} .poly files")

# One task per file; pool.map returns the results in the same order as poly_files.
with ProcessPoolExecutor() as pool:
    file_results = [*pool.map(compute_from_poly_file, poly_files)]

# One summary line per file; 'w000' is printed as NaN if it is missing.
for r in file_results:
    vol = r['functionals'].get('w000', float('nan'))
    print(f"  {r['file']:<50s}  V={r['n_verts']:>4d}  F={r['n_faces']:>4d}  vol={vol:.4f}")

## 8. Writing results to CSV

All results are collected in the main process before writing, so a plain pandas `DataFrame` is safe — no locking needed.

In [None]:
import pandas as pd

# Flatten each per-file result into one table row: the file metadata first,
# followed by one column per entry of its 'functionals' dict.
rows = []
for entry in file_results:
    row = {
        "file": entry["file"],
        "n_verts": entry["n_verts"],
        "n_faces": entry["n_faces"],
    }
    row.update(entry["functionals"])
    rows.append(row)

df = pd.DataFrame(rows)
df.to_csv("results.csv", index=False)
print(df.to_string())

## 9. Processing label images in parallel

In [None]:
def compute_from_label_image(args):
    """Run ``pykarambola.minkowski_functionals_from_label_image`` on one packed task.

    ``args`` is a two-element ``(label_image, spacing)`` sequence. The spacing
    is forwarded as ``spacing=`` and the call always uses ``center='centroid'``.
    Defined at module scope so it can be pickled for worker processes.
    """
    image, voxel_spacing = args
    result = pykarambola.minkowski_functionals_from_label_image(
        image, center='centroid', spacing=voxel_spacing,
    )
    return result


# Generate synthetic label images: spheres of varying radii
shape = (64, 64, 64)
z, y, x = np.mgrid[:shape[0], :shape[1], :shape[2]]
center = np.array(shape) / 2

# Distance of every voxel from the volume centre. It does not depend on the
# sphere radius, so it is computed once rather than once per image.
dist = np.sqrt((x - center[2])**2 + (y - center[1])**2 + (z - center[0])**2)

images = []
radii = [10, 15, 20, 25]
for r in radii:
    # Label 1 marks voxels inside the sphere; everything else stays 0.
    img = np.zeros(shape, dtype=int)
    img[dist <= r] = 1
    # Each task is an (image, spacing) pair, matching compute_from_label_image.
    images.append((img, (1.0, 1.0, 1.0)))

print(f"Processing {len(images)} label images...")

with ProcessPoolExecutor() as pool:
    img_results = list(pool.map(compute_from_label_image, images))

# Compare the 'w000' value reported for label 1 with the analytic sphere volume.
for r, radius in zip(img_results, radii):
    vol = r[1]['w000']
    expected = 4/3 * np.pi * radius**3
    print(f"  radius={radius:>2d}  vol={vol:>10.1f}  expected={expected:>10.1f}  err={abs(vol-expected)/expected:.2%}")

## Tips

- **Process, not thread**: `ProcessPoolExecutor` spawns separate processes, bypassing the GIL.
  `ThreadPoolExecutor` would give no speedup for CPU-bound work like this.
- **Picklability**: All functions passed to the pool must be top-level (not lambdas or closures).
  Inputs/outputs must be picklable (numpy arrays, dicts, floats are all fine).
- **Chunk size**: For many small tasks, pass `chunksize=` to `pool.map()` to reduce IPC overhead.
- **Memory**: Each worker gets a copy of the data. For very large meshes, monitor memory usage.
- **Error handling**: Exceptions in workers are re-raised in the main process when you iterate results.