# Expected failure (wrapper-local defs): asyncio + ProcessPoolExecutor (fork) with notebook-defined worker


In [None]:
# pytest-nb-as-test: must-raise-exception=True
import asyncio
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

# Flag recording whether the exception raised by this cell matched one of the
# expected failure markers. It is updated in the except handler below and
# checked by the assertion in the following cell.
expected_error_seen = False


def cpu_fn(x):
    """Return ``x`` incremented by one.

    This is the worker callable that ``main`` hands to the process pool.
    """
    incremented = x + 1
    return incremented


async def main() -> None:
    """Run ``cpu_fn`` over ``[1, 2, 3]`` in a fork-based process pool.

    The work is submitted through the running event loop's
    ``run_in_executor`` and the results are collected with
    ``asyncio.gather``.

    Raises:
        RuntimeError: If the ``fork`` start method is not offered by
            ``multiprocessing`` on this platform.
        AssertionError: If the gathered results are not ``[2, 3, 4]``.

    NOTE(review): per the notebook title, this call is expected to fail when
    ``cpu_fn`` ends up defined as a wrapper-local function.
    """
    available_methods = mp.get_all_start_methods()
    if "fork" not in available_methods:
        raise RuntimeError("fork start method not available")

    fork_ctx = mp.get_context("fork")
    running_loop = asyncio.get_running_loop()
    inputs = [1, 2, 3]

    with ProcessPoolExecutor(max_workers=2, mp_context=fork_ctx) as pool:
        pending = []
        for value in inputs:
            # Each submission is wrapped in an awaitable by the event loop.
            pending.append(running_loop.run_in_executor(pool, cpu_fn, value))
        results = await asyncio.gather(*pending)

    assert results == [2, 3, 4]


try:
    await main()
    raise RuntimeError("unexpected success")
except Exception as e:
    msg = repr(e) + "\n" + str(e)
    expected_error_seen = any(
        s in msg
        for s in ["Can't pickle", "<locals>", "fork start method not available"]
    )
    raise

In [None]:
# Confirm that the exception raised by the previous cell was one of the
# expected failures rather than an unrelated error or an unexpected success.
assert expected_error_seen is True