
# Multiprocessing with Shared Memory

This notebook shows how to avoid copying a **large dataset** to every worker process by
using `multiprocessing.shared_memory`.

We load data **once**, let each process **attach**
to the shared buffer by **name**, compute on disjoint slices, and aggregate results.

##### Plan
1. Create big array and place it into shared memory
1. Build coarse **chunks**
1. Dispatch with `ProcessPoolExecutor`, collect partial results, verify, clean up

##### Single thread in every process

First, we set single-threaded environment variables (to avoid nested BLAS threads).
NumPy can call into C libraries (BLAS, OpenMP) that run several threads in parallel.
If many processes run in parallel and each of them also starts many threads,
the machine is oversubscribed: there are more runnable threads than cores, and they
all compete for the same computational resources.


In [1]:
import os
import time
import sys
import importlib
import multiprocessing as mp

from multiprocessing import shared_memory
from concurrent.futures import ProcessPoolExecutor, as_completed

# (Optional) Reduce nested threading in BLAS/OpenMP libraries.
# These must be set before NumPy is first imported, including indirectly via mp_tasks.
os.environ.setdefault("OMP_NUM_THREADS", "1")
os.environ.setdefault("MKL_NUM_THREADS", "1")
os.environ.setdefault("OPENBLAS_NUM_THREADS", "1")
os.environ.setdefault("NUMEXPR_NUM_THREADS", "1")

import numpy as np
from mp_tasks import chunk_sum  # imported after the env vars in case mp_tasks imports NumPy


## Step 1 — Create a big array and place it in shared memory

We create a large NumPy array.

We allocate a shared-memory buffer and copy the data **once**.

We retain the **name**, **shape**, and **dtype** so workers can reattach.
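
The worker function `chunk_sum` lives in `mp_tasks.py`, which this notebook does not show. As a minimal sketch of how a worker might reattach by name, assuming the argument order `(name, shape, dtype string, start, stop)` and a float64 accumulator (both are assumptions of this sketch, not the actual `mp_tasks` code):

```python
import numpy as np
from multiprocessing import shared_memory


def chunk_sum(shm_name, shape, dtype_str, start, stop):
    """Hypothetical worker: attach to an existing segment and sum one slice."""
    # Attach by name; no create=True, so no new memory is allocated.
    shm = shared_memory.SharedMemory(name=shm_name)
    try:
        # Zero-copy view over the shared buffer.
        data = np.ndarray(shape, dtype=np.dtype(dtype_str), buffer=shm.buf)
        # Accumulating in float64 is a choice made for this sketch.
        result = float(data[start:stop].sum(dtype=np.float64))
        del data  # drop the view before closing the handle
        return result
    finally:
        # Workers only close their handle; the creator is responsible for unlink().
        shm.close()
```

Only the small tuple of name, shape, dtype, and bounds is pickled per task; the array itself never crosses the process boundary.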


In [2]:
# Simulate a heavy dataset (adjust size as needed)
rng = np.random.default_rng(0)
A = rng.standard_normal(20_000_000, dtype=np.float32)  # 80 MB (~76 MiB)

# Allocate shared memory once and copy data into it
shm = shared_memory.SharedMemory(create=True, size=A.nbytes)
shm_data = np.ndarray(A.shape, dtype=A.dtype, buffer=shm.buf)
shm_data[:] = A  # one-time copy

print("✅ Shared memory created")
print(f"  name   : {shm.name}")
print(f"  shape  : {shm_data.shape}")
print(f"  dtype  : {str(shm_data.dtype)}")
print(f"  bytes  : {A.nbytes:,}")


✅ Shared memory created
  name   : psm_cdeed9db
  shape  : (20000000,)
  dtype  : float32
  bytes  : 80,000,000



## Step 2 — Build coarse-grained chunks

We split the array into a modest number of contiguous slices (2 per worker in the code below).
This keeps each task **coarse**, avoiding excessive pickling/IPC overhead.
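
To see how evenly spaced integer edges turn into disjoint, contiguous slices, here is a small sketch on toy numbers. The helper name `chunk_bounds` is hypothetical; it uses the same `np.linspace(..., dtype=int)` approach as the notebook cell.

```python
import numpy as np


def chunk_bounds(n_items, n_chunks):
    """Return (start, stop) pairs that cover range(n_items) without overlap."""
    # Fractional edges are rounded down when cast to int.
    edges = np.linspace(0, n_items, n_chunks + 1, dtype=int)
    return [(int(edges[i]), int(edges[i + 1])) for i in range(n_chunks)]


bounds = chunk_bounds(10, 4)
print(bounds)  # [(0, 2), (2, 5), (5, 7), (7, 10)]
```

Chunk sizes can differ by one element when `n_items` is not divisible by `n_chunks`, but every index lands in exactly one chunk.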


In [3]:
n_workers = os.cpu_count() or 4
n_chunks = max(n_workers * 2, 1)  # 2 chunks per worker

n_items = shm_data.size
edges = np.linspace(0, n_items, n_chunks + 1, dtype=int)

tasks = [
    (shm.name, shm_data.shape, str(shm_data.dtype), int(edges[i]), int(edges[i+1]))
    for i in range(n_chunks)
]

print(f"✅ Built {len(tasks)} tasks")
for task in tasks:  
    print("task:", task)


✅ Built 24 tasks
task: ('psm_cdeed9db', (20000000,), 'float32', 0, 833333)
task: ('psm_cdeed9db', (20000000,), 'float32', 833333, 1666666)
task: ('psm_cdeed9db', (20000000,), 'float32', 1666666, 2500000)
task: ('psm_cdeed9db', (20000000,), 'float32', 2500000, 3333333)
task: ('psm_cdeed9db', (20000000,), 'float32', 3333333, 4166666)
task: ('psm_cdeed9db', (20000000,), 'float32', 4166666, 5000000)
task: ('psm_cdeed9db', (20000000,), 'float32', 5000000, 5833333)
task: ('psm_cdeed9db', (20000000,), 'float32', 5833333, 6666666)
task: ('psm_cdeed9db', (20000000,), 'float32', 6666666, 7500000)
task: ('psm_cdeed9db', (20000000,), 'float32', 7500000, 8333333)
task: ('psm_cdeed9db', (20000000,), 'float32', 8333333, 9166666)
task: ('psm_cdeed9db', (20000000,), 'float32', 9166666, 10000000)
task: ('psm_cdeed9db', (20000000,), 'float32', 10000000, 10833333)
task: ('psm_cdeed9db', (20000000,), 'float32', 10833333, 11666666)
task: ('psm_cdeed9db', (20000000,), 'float32', 11666666, 12500000)
task: ('p

## Step 3 — Dispatch with `ProcessPoolExecutor`, collect results, verify, clean up

A **`fork`** context on Unix makes for smoother demos in notebooks.

Where it is not available (e.g., Windows), the default **spawn** context applies. The cell below does not pass an explicit context, so it runs with the platform's default start method.

We submit tasks, aggregate partial sums, and verify the result against a single-process NumPy sum.

Finally, we **close** and **unlink** the shared memory segment.
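
If you want to request `fork` explicitly rather than rely on the platform default, one possible sketch is shown here. The helper name `pick_context` is hypothetical; `mp.get_all_start_methods()`, `mp.get_context()`, and the `mp_context` argument of `ProcessPoolExecutor` are standard-library APIs.

```python
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor


def pick_context():
    """Prefer 'fork' where the platform offers it, else use the default context."""
    if "fork" in mp.get_all_start_methods():
        return mp.get_context("fork")
    return mp.get_context()  # platform default (spawn on Windows)


ctx = pick_context()
print("start method:", ctx.get_start_method())

# Usage (not executed here):
# with ProcessPoolExecutor(max_workers=4, mp_context=ctx) as ex:
#     ...
```

With `spawn`, worker functions must be importable from a module, which is one reason to keep `chunk_sum` in `mp_tasks.py` instead of defining it in a notebook cell.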

In [4]:
t0_submit = time.perf_counter()
total = 0.0

with ProcessPoolExecutor(max_workers=n_workers) as ex:
    
    futures = [ex.submit(chunk_sum, *t) for t in tasks]
    
    for k, f in enumerate(as_completed(futures), 1):
        total += f.result()  # re-raises worker exceptions here
        print(f"[progress] {k}/{len(futures)} chunks reduced")

dt_submit = time.perf_counter() - t0_submit

# Verify vs single-process sum
t0_check = time.perf_counter()
check = float(np.sum(shm_data))
dt_check = time.perf_counter() - t0_check

print("\nParallel sum :", total)
print("Single-proc  :", check)
print("Abs error    :", abs(total - check))
print(f"Time check   : {dt_check:.2f}s using 1 worker")
print(f"Time submit : {dt_submit:.2f}s using {n_workers} workers")

# Cleanup: close and unlink shared memory
try:
    shm.close()
    shm.unlink()
    print("✅ Shared memory cleaned up (close + unlink)")
except Exception as e:
    print("Cleanup note:", e)


[progress] 1/24 chunks reduced
[progress] 2/24 chunks reduced
[progress] 3/24 chunks reduced
[progress] 4/24 chunks reduced
[progress] 5/24 chunks reduced
[progress] 6/24 chunks reduced
[progress] 7/24 chunks reduced
[progress] 8/24 chunks reduced
[progress] 9/24 chunks reduced
[progress] 10/24 chunks reduced
[progress] 11/24 chunks reduced
[progress] 12/24 chunks reduced
[progress] 13/24 chunks reduced
[progress] 14/24 chunks reduced
[progress] 15/24 chunks reduced
[progress] 16/24 chunks reduced
[progress] 17/24 chunks reduced
[progress] 18/24 chunks reduced
[progress] 19/24 chunks reduced
[progress] 20/24 chunks reduced
[progress] 21/24 chunks reduced
[progress] 22/24 chunks reduced
[progress] 23/24 chunks reduced
[progress] 24/24 chunks reduced

Parallel sum : -1431.3464050292969
Single-proc  : -1431.345703125
Abs error    : 0.000701904296875
Time check   : 0.02s using 1 worker
Time submit : 0.58s using 12 workers
✅ Shared memory cleaned up (close + unlink)


# Why `np.sum` (1 worker) beats multiprocessing here

**What you measured**
- **Single-proc `np.sum` (0.02s):** one tight C loop over contiguous memory → limited mainly
  by memory bandwidth, minimal Python overhead.
- **Multiprocessing submit (0.58s):** Python orchestrates many processes, and every step adds
  overhead:
  + sending tasks,
  + attaching to shared memory,
  + running many small reductions,
  + collecting futures,
  + all processes competing for the same memory bandwidth.

**Key reasons**
- **Memory-bandwidth bound:** Summing a 76 MiB float32 array is limited by RAM speed, not CPU.
  More processes can’t make memory faster; they only add contention.
- **Overheads dominate:** Pool startup, pickling small task tuples, `as_completed()` handling,
  multiple `np.sum` calls, shared-memory attach/teardown.
- **Chunking penalty:** One big C loop is faster than many small ones.
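
As a rough worked calculation, the timings printed by the run above can be turned into an effective throughput. The numbers are copied from that output and are rounded to two decimals, so treat the results as order-of-magnitude estimates only.

```python
# Values taken from the printed output of the run (timings rounded to 2 decimals).
n_bytes = 80_000_000   # size of the float32 array
t_single = 0.02        # seconds, single-process np.sum
t_parallel = 0.58      # seconds, pool startup + 24 tasks + collection


def effective_gb_per_s(num_bytes, seconds):
    """Bytes processed per second, in GB/s (1 GB = 1e9 bytes)."""
    return num_bytes / seconds / 1e9


single_bw = effective_gb_per_s(n_bytes, t_single)
parallel_bw = effective_gb_per_s(n_bytes, t_parallel)
slowdown = t_parallel / t_single

print(f"single-proc : {single_bw:.2f} GB/s")
print(f"parallel    : {parallel_bw:.2f} GB/s")
print(f"slowdown    : {slowdown:.0f}x")
```

How close the single-process figure comes to the machine's peak bandwidth depends on hardware that this notebook does not describe; the parallel figure mostly reflects fixed overhead, not data movement.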

**Takeaways**
- For pure NumPy reductions/transforms on contiguous arrays → **stay single-process** (or
  use a small number of native threads in BLAS/OpenMP if beneficial).
- Use **multiprocessing** when the per-element work is **non-vectorizable Python** or heavy
  custom code that can’t be expressed as a single NumPy call.
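
For contrast, here is a sketch of the kind of workload where processes tend to pay off: a pure-Python loop per element that NumPy cannot express as one call. The Collatz step count is an illustrative stand-in chosen for this sketch, and all function names are hypothetical.

```python
from concurrent.futures import ProcessPoolExecutor


def collatz_steps(n):
    """Number of Collatz steps needed to reach 1 (pure-Python, branchy loop)."""
    steps = 0
    while n != 1:
        n = n // 2 if n % 2 == 0 else 3 * n + 1
        steps += 1
    return steps


def chunk_total(bounds):
    """Sum the step counts for every n in [start, stop)."""
    start, stop = bounds
    return sum(collatz_steps(n) for n in range(start, stop))


def parallel_total(limit, n_workers=4, n_chunks=8):
    """Split [1, limit) into coarse chunks and reduce them in worker processes."""
    step = max(1, (limit - 1 + n_chunks - 1) // n_chunks)
    bounds = [(lo, min(lo + step, limit)) for lo in range(1, limit, step)]
    with ProcessPoolExecutor(max_workers=n_workers) as ex:
        return sum(ex.map(chunk_total, bounds))

# Usage (from a script, under `if __name__ == "__main__":`):
#     print(parallel_total(200_000))
```

Here each task carries real CPU work and only two integers cross the process boundary, so the orchestration overhead is small relative to the computation. In a notebook, these functions would go in an importable module, as `chunk_sum` does.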


# Further reading

### What is “contention”?

**Definition (in this context):** Multiple workers compete for the **same limited
resource** (memory bandwidth, last-level cache, memory controller queues). The total
“pipe” isn’t wider, so adding workers doesn’t increase throughput.


**What happens when many processes sum the same big array:**
- All processes stream from **the same DRAM channels** → the memory subsystem saturates.
- **LLC/cache thrash** increases: more evictions, more misses, higher latency.
- The OS spends time on **scheduling/context switches** with no net gain.

**Symptoms you’ll see:**
- More workers ⇒ **no speedup** or even **slower**.
- Per-core CPU looks busy, but **wall-time doesn’t drop**.
- Perf counters (if you check) show **high memory bandwidth** and **LLC misses**.

**Mitigations:**
- Prefer **one tight C/NumPy pass** (or a small, tuned thread count) for bandwidth-bound
  reductions.
- Use **fewer, larger chunks**; avoid many small passes over the same data.
- If you must parallelize: split data so workers hit **disjoint memory** (advanced: NUMA
  pinning), and keep **threads × processes ≤ cores**.
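
The **threads × processes ≤ cores** rule can be turned into a small budgeting helper. This is a sketch with a hypothetical function name; note that `os.cpu_count()` reports logical CPUs, which may exceed the number of physical cores.

```python
import os


def max_processes(threads_per_process, n_cores=None):
    """Largest process count such that threads_per_process * processes <= cores."""
    if threads_per_process < 1:
        raise ValueError("threads_per_process must be >= 1")
    cores = n_cores or os.cpu_count() or 1
    # Always allow at least one process, even if it is oversubscribed on its own.
    return max(1, cores // threads_per_process)


print(max_processes(1, n_cores=12))  # 12
print(max_processes(4, n_cores=12))  # 3
```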


### LLC / cache thrash (what and why)

**LLC (Last-Level Cache)**: the shared L3 cache for all cores on a socket.  
**Cache thrash**: multiple workers evict each other’s cache lines so useful data
won’t stay cached → more **LLC misses** → more **DRAM traffic** → slower runs.

**Why it happens here**
- Many processes stream over large arrays (working set ≫ cache size).  
- Shared LLC gets constantly overwritten by neighbors’ streams.  
- Net: memory bandwidth saturates; extra workers add contention, not speed.

**Symptoms**
- High per-core “CPU usage” but little/no wall-time improvement.  
- Perf counters: high LLC miss rate, near-peak memory bandwidth.

**Mitigations**
- Use **fewer workers** (or one tight NumPy/C loop).  
- **Block/tile** data so chunks fit better in cache.  
- Prefer **one process + small OpenMP thread count (e.g., 2–4)** over many processes.  
- On Linux/HPC: **pin** threads/processes and use **NUMA-aware** partitioning
  (`OMP_PROC_BIND=close`, `OMP_PLACES=cores`).
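
The OpenMP variables above control thread placement inside one process. For pinning whole worker processes from Python, one possible sketch uses `os.sched_setaffinity`, which exists only on some platforms (notably Linux). The helper name `pin_current_process` is hypothetical.

```python
import os


def pin_current_process(cpu_ids):
    """Restrict the calling process to the given CPU ids where the OS supports it.

    Returns True if the affinity was set, False if the platform lacks the call.
    """
    if not hasattr(os, "sched_setaffinity"):
        return False  # e.g. macOS or Windows
    os.sched_setaffinity(0, set(cpu_ids))  # pid 0 means "this process"
    return True
```

A function like this could be passed as the `initializer` of a `ProcessPoolExecutor` so that each worker pins itself at startup. Which CPU ids share a cache or a NUMA node is machine-specific and has to be looked up separately.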
