From 9fd6b19282e8a1e04a47dbad962a753c647211fa Mon Sep 17 00:00:00 2001 From: Andy Jost Date: Wed, 3 Dec 2025 13:27:38 -0800 Subject: [PATCH 1/3] Warn when multiprocessing start method is 'fork' CUDA does not support the fork() system call. Forked subprocesses exhibit undefined behavior, including failure to initialize CUDA contexts and devices. Add warning checks in multiprocessing reduction functions for IPC objects (DeviceMemoryResource, IPCAllocationHandle, Event) that warn when the start method is 'fork'. The warning is emitted once per process when IPC objects are serialized. Fixes #1136 --- cuda_core/cuda/core/experimental/_event.pyx | 2 + .../cuda/core/experimental/_memory/_ipc.pyx | 3 + .../core/experimental/_utils/cuda_utils.pyx | 39 +++++ .../tests/test_multiprocessing_warning.py | 158 ++++++++++++++++++ 4 files changed, 202 insertions(+) create mode 100644 cuda_core/tests/test_multiprocessing_warning.py diff --git a/cuda_core/cuda/core/experimental/_event.pyx b/cuda_core/cuda/core/experimental/_event.pyx index 98a45d0043..f580e32425 100644 --- a/cuda_core/cuda/core/experimental/_event.pyx +++ b/cuda_core/cuda/core/experimental/_event.pyx @@ -21,6 +21,7 @@ from typing import TYPE_CHECKING, Optional from cuda.core.experimental._context import Context from cuda.core.experimental._utils.cuda_utils import ( CUDAError, + _check_multiprocessing_start_method, driver, ) if TYPE_CHECKING: @@ -300,6 +301,7 @@ cdef class IPCEventDescriptor: def _reduce_event(event): + _check_multiprocessing_start_method() return event.from_ipc_descriptor, (event.get_ipc_descriptor(),) multiprocessing.reduction.register(Event, _reduce_event) diff --git a/cuda_core/cuda/core/experimental/_memory/_ipc.pyx b/cuda_core/cuda/core/experimental/_memory/_ipc.pyx index 22be23d9ea..efa87b77d9 100644 --- a/cuda_core/cuda/core/experimental/_memory/_ipc.pyx +++ b/cuda_core/cuda/core/experimental/_memory/_ipc.pyx @@ -10,6 +10,7 @@ from cuda.bindings cimport cydriver from cuda.core.experimental._memory._buffer cimport Buffer from cuda.core.experimental._stream cimport default_stream from cuda.core.experimental._utils.cuda_utils cimport HANDLE_RETURN +from cuda.core.experimental._utils.cuda_utils import _check_multiprocessing_start_method import multiprocessing import os @@ -129,6 +130,7 @@ cdef class IPCAllocationHandle: def _reduce_allocation_handle(alloc_handle): + _check_multiprocessing_start_method() df = multiprocessing.reduction.DupFd(alloc_handle.handle) return _reconstruct_allocation_handle, (type(alloc_handle), df, alloc_handle.uuid) @@ -141,6 +143,7 @@ multiprocessing.reduction.register(IPCAllocationHandle, _reduce_allocation_handl def _deep_reduce_device_memory_resource(mr): + _check_multiprocessing_start_method() from .._device import Device device = Device(mr.device_id) alloc_handle = mr.get_allocation_handle() diff --git a/cuda_core/cuda/core/experimental/_utils/cuda_utils.pyx b/cuda_core/cuda/core/experimental/_utils/cuda_utils.pyx index d57a777537..2dee276667 100644 --- a/cuda_core/cuda/core/experimental/_utils/cuda_utils.pyx +++ b/cuda_core/cuda/core/experimental/_utils/cuda_utils.pyx @@ -5,6 +5,9 @@ import functools from functools import partial import importlib.metadata +import multiprocessing +import platform +import warnings from collections import namedtuple from collections.abc import Sequence from contextlib import ExitStack @@ -283,3 +286,39 @@ class Transaction: """ # pop_all() empties this stack so no callbacks are triggered on exit. self._stack.pop_all() + + +# Track whether we've already warned about fork method +_fork_warning_emitted = False + + +def _check_multiprocessing_start_method(): + """Check if multiprocessing start method is 'fork' and warn if so.""" + global _fork_warning_emitted + if _fork_warning_emitted: + return + + # Common warning message parts + common_message = ( + "CUDA does not support. Forked subprocesses exhibit undefined behavior, " + "including failure to initialize CUDA contexts and devices. Set the start method " + "to 'spawn' before creating processes that use CUDA. " + "Use: multiprocessing.set_start_method('spawn')" + ) + + try: + start_method = multiprocessing.get_start_method() + if start_method == "fork": + message = f"multiprocessing start method is 'fork', which {common_message}" + warnings.warn(message, UserWarning, stacklevel=3) + _fork_warning_emitted = True + except RuntimeError: + # get_start_method() can raise RuntimeError if start method hasn't been set + # In this case, default is 'fork' on Linux, so we should warn + if platform.system() == "Linux": + message = ( + f"multiprocessing start method is not set and defaults to 'fork' on Linux, " + f"which {common_message}" + ) + warnings.warn(message, UserWarning, stacklevel=3) + _fork_warning_emitted = True diff --git a/cuda_core/tests/test_multiprocessing_warning.py b/cuda_core/tests/test_multiprocessing_warning.py new file mode 100644 index 0000000000..852a92886d --- /dev/null +++ b/cuda_core/tests/test_multiprocessing_warning.py @@ -0,0 +1,158 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Test that warnings are emitted when multiprocessing start method is 'fork' +and IPC objects are serialized. + +These tests use mocking to simulate the 'fork' start method without actually +using fork, avoiding the need for subprocess isolation. +""" + +import warnings +from unittest.mock import patch + +from cuda.core.experimental import DeviceMemoryResource, DeviceMemoryResourceOptions, EventOptions +from cuda.core.experimental._event import _reduce_event +from cuda.core.experimental._memory._ipc import ( + _deep_reduce_device_memory_resource, + _reduce_allocation_handle, +) + + +def test_warn_on_fork_method_device_memory_resource(mempool_device): + """Test that warning is emitted when DeviceMemoryResource is pickled with fork method.""" + device = mempool_device + device.set_current() + options = DeviceMemoryResourceOptions(max_size=2097152, ipc_enabled=True) + mr = DeviceMemoryResource(device, options=options) + + with patch("multiprocessing.get_start_method", return_value="fork"), warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + + # Reset the warning flag to allow testing + from cuda.core.experimental._utils import cuda_utils + + cuda_utils._fork_warning_emitted = False + + # Trigger the reduction function directly + _deep_reduce_device_memory_resource(mr) + + # Check that warning was emitted + assert len(w) == 1, f"Expected 1 warning, got {len(w)}: {[str(warning.message) for warning in w]}" + warning = w[0] + assert warning.category is UserWarning + assert "fork" in str(warning.message).lower() + assert "spawn" in str(warning.message).lower() + assert "undefined behavior" in str(warning.message).lower() + + mr.close() + + +def test_warn_on_fork_method_allocation_handle(mempool_device): + """Test that warning is emitted when IPCAllocationHandle is pickled with fork method.""" + device = mempool_device + device.set_current() + options = DeviceMemoryResourceOptions(max_size=2097152, ipc_enabled=True) + mr = DeviceMemoryResource(device, options=options) + alloc_handle = mr.get_allocation_handle() + + with patch("multiprocessing.get_start_method", return_value="fork"), warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + + # Reset the warning flag to allow testing + from cuda.core.experimental._utils import cuda_utils + + cuda_utils._fork_warning_emitted = False + + # Trigger the reduction function directly + _reduce_allocation_handle(alloc_handle) + + # Check that warning was emitted + assert len(w) == 1 + warning = w[0] + assert warning.category is UserWarning + assert "fork" in str(warning.message).lower() + + mr.close() + + +def test_warn_on_fork_method_event(mempool_device): + """Test that warning is emitted when Event is pickled with fork method.""" + device = mempool_device + device.set_current() + stream = device.create_stream() + ipc_event_options = EventOptions(ipc_enabled=True) + event = stream.record(options=ipc_event_options) + + with patch("multiprocessing.get_start_method", return_value="fork"), warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + + # Reset the warning flag to allow testing + from cuda.core.experimental._utils import cuda_utils + + cuda_utils._fork_warning_emitted = False + + # Trigger the reduction function directly + _reduce_event(event) + + # Check that warning was emitted + assert len(w) == 1 + warning = w[0] + assert warning.category is UserWarning + assert "fork" in str(warning.message).lower() + + event.close() + + +def test_no_warning_with_spawn_method(mempool_device): + """Test that no warning is emitted when start method is 'spawn'.""" + device = mempool_device + device.set_current() + options = DeviceMemoryResourceOptions(max_size=2097152, ipc_enabled=True) + mr = DeviceMemoryResource(device, options=options) + + with patch("multiprocessing.get_start_method", return_value="spawn"), warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + + # Reset the warning flag to allow testing + from cuda.core.experimental._utils import cuda_utils + + cuda_utils._fork_warning_emitted = False + + # Trigger the reduction function directly + _deep_reduce_device_memory_resource(mr) + + # Check that no fork-related warning was emitted + fork_warnings = [warning for warning in w if "fork" in str(warning.message).lower()] + assert len(fork_warnings) == 0, f"Unexpected warning: {fork_warnings[0].message if fork_warnings else None}" + + mr.close() + + +def test_warning_emitted_only_once(mempool_device): + """Test that warning is only emitted once even when multiple objects are pickled.""" + device = mempool_device + device.set_current() + options = DeviceMemoryResourceOptions(max_size=2097152, ipc_enabled=True) + mr1 = DeviceMemoryResource(device, options=options) + mr2 = DeviceMemoryResource(device, options=options) + + with patch("multiprocessing.get_start_method", return_value="fork"), warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + + # Reset the warning flag to allow testing + from cuda.core.experimental._utils import cuda_utils + + cuda_utils._fork_warning_emitted = False + + # Trigger reduction multiple times + _deep_reduce_device_memory_resource(mr1) + _deep_reduce_device_memory_resource(mr2) + + # Check that warning was emitted only once + fork_warnings = [warning for warning in w if "fork" in str(warning.message).lower()] + assert len(fork_warnings) == 1, f"Expected 1 warning, got {len(fork_warnings)}" + + mr1.close() + mr2.close() From 476459feae57f7e257011cbe531f2a7a16f83bea Mon Sep 17 00:00:00 2001 From: Andy Jost Date: Wed, 3 Dec 2025 17:16:35 -0800 Subject: [PATCH 2/3] Skip multiprocessing warning tests on Windows Change mempool_device to ipc_device fixture for tests that require IPC-enabled memory resources. The ipc_device fixture properly skips on Windows where IPC is not supported. --- cuda_core/tests/test_multiprocessing_warning.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/cuda_core/tests/test_multiprocessing_warning.py b/cuda_core/tests/test_multiprocessing_warning.py index 852a92886d..6eeb14ba20 100644 --- a/cuda_core/tests/test_multiprocessing_warning.py +++ b/cuda_core/tests/test_multiprocessing_warning.py @@ -20,9 +20,9 @@ ) -def test_warn_on_fork_method_device_memory_resource(mempool_device): +def test_warn_on_fork_method_device_memory_resource(ipc_device): """Test that warning is emitted when DeviceMemoryResource is pickled with fork method.""" - device = mempool_device + device = ipc_device device.set_current() options = DeviceMemoryResourceOptions(max_size=2097152, ipc_enabled=True) mr = DeviceMemoryResource(device, options=options) @@ -49,9 +49,9 @@ def test_warn_on_fork_method_device_memory_resource(mempool_device): mr.close() -def test_warn_on_fork_method_allocation_handle(mempool_device): +def test_warn_on_fork_method_allocation_handle(ipc_device): """Test that warning is emitted when IPCAllocationHandle is pickled with fork method.""" - device = mempool_device + device = ipc_device device.set_current() options = DeviceMemoryResourceOptions(max_size=2097152, ipc_enabled=True) mr = DeviceMemoryResource(device, options=options) @@ -105,9 +105,9 @@ def test_warn_on_fork_method_event(mempool_device): event.close() -def test_no_warning_with_spawn_method(mempool_device): +def test_no_warning_with_spawn_method(ipc_device): """Test that no warning is emitted when start method is 'spawn'.""" - device = mempool_device + device = ipc_device device.set_current() options = DeviceMemoryResourceOptions(max_size=2097152, ipc_enabled=True) mr = DeviceMemoryResource(device, options=options) @@ -130,9 +130,9 @@ def test_no_warning_with_spawn_method(mempool_device): mr.close() -def test_warning_emitted_only_once(mempool_device): +def test_warning_emitted_only_once(ipc_device): """Test that warning is only emitted once even when multiple objects are pickled.""" - device = mempool_device + device = ipc_device device.set_current() options = DeviceMemoryResourceOptions(max_size=2097152, ipc_enabled=True) mr1 = DeviceMemoryResource(device, options=options) From fbdd56df7063655b6f06d4dbf1f0ada9f232c5af Mon Sep 17 00:00:00 2001 From: Andy Jost Date: Thu, 4 Dec 2025 06:42:49 -0800 Subject: [PATCH 3/3] Add reset_fork_warning function and rename check_multiprocessing_start_method - Add reset_fork_warning() function for testing purposes - Rename _check_multiprocessing_start_method to check_multiprocessing_start_method (remove leading underscore) - Update all tests to use reset_fork_warning() instead of directly accessing internal flag - Fix trailing whitespace --- cuda_core/cuda/core/experimental/_event.pyx | 4 ++-- .../cuda/core/experimental/_memory/_ipc.pyx | 6 +++--- .../core/experimental/_utils/cuda_utils.pyx | 21 +++++++++++++------ .../tests/test_multiprocessing_warning.py | 21 ++++++------------- 4 files changed, 26 insertions(+), 26 deletions(-) diff --git a/cuda_core/cuda/core/experimental/_event.pyx b/cuda_core/cuda/core/experimental/_event.pyx index f580e32425..149c92b8e1 100644 --- a/cuda_core/cuda/core/experimental/_event.pyx +++ b/cuda_core/cuda/core/experimental/_event.pyx @@ -21,7 +21,7 @@ from typing import TYPE_CHECKING, Optional from cuda.core.experimental._context import Context from cuda.core.experimental._utils.cuda_utils import ( CUDAError, - _check_multiprocessing_start_method, + check_multiprocessing_start_method, driver, ) if TYPE_CHECKING: @@ -301,7 +301,7 @@ cdef class IPCEventDescriptor: def _reduce_event(event): - _check_multiprocessing_start_method() + check_multiprocessing_start_method() return event.from_ipc_descriptor, (event.get_ipc_descriptor(),) multiprocessing.reduction.register(Event, _reduce_event) diff --git a/cuda_core/cuda/core/experimental/_memory/_ipc.pyx b/cuda_core/cuda/core/experimental/_memory/_ipc.pyx index efa87b77d9..c9931855cf 100644 --- a/cuda_core/cuda/core/experimental/_memory/_ipc.pyx +++ b/cuda_core/cuda/core/experimental/_memory/_ipc.pyx @@ -10,7 +10,7 @@ from cuda.bindings cimport cydriver from cuda.core.experimental._memory._buffer cimport Buffer from cuda.core.experimental._stream cimport default_stream from cuda.core.experimental._utils.cuda_utils cimport HANDLE_RETURN -from cuda.core.experimental._utils.cuda_utils import _check_multiprocessing_start_method +from cuda.core.experimental._utils.cuda_utils import check_multiprocessing_start_method import multiprocessing import os @@ -130,7 +130,7 @@ cdef class IPCAllocationHandle: def _reduce_allocation_handle(alloc_handle): - _check_multiprocessing_start_method() + check_multiprocessing_start_method() df = multiprocessing.reduction.DupFd(alloc_handle.handle) return _reconstruct_allocation_handle, (type(alloc_handle), df, alloc_handle.uuid) @@ -143,7 +143,7 @@ multiprocessing.reduction.register(IPCAllocationHandle, _reduce_allocation_handl def _deep_reduce_device_memory_resource(mr): - _check_multiprocessing_start_method() + check_multiprocessing_start_method() from .._device import Device device = Device(mr.device_id) alloc_handle = mr.get_allocation_handle() diff --git a/cuda_core/cuda/core/experimental/_utils/cuda_utils.pyx b/cuda_core/cuda/core/experimental/_utils/cuda_utils.pyx index 2dee276667..4489871747 100644 --- a/cuda_core/cuda/core/experimental/_utils/cuda_utils.pyx +++ b/cuda_core/cuda/core/experimental/_utils/cuda_utils.pyx @@ -289,14 +289,25 @@ class Transaction: # Track whether we've already warned about fork method -_fork_warning_emitted = False +_fork_warning_checked = False -def _check_multiprocessing_start_method(): +def reset_fork_warning(): + """Reset the fork warning check flag for testing purposes. + + This function is intended for use in tests to allow multiple test runs + to check the warning behavior. + """ + global _fork_warning_checked + _fork_warning_checked = False + + +def check_multiprocessing_start_method(): """Check if multiprocessing start method is 'fork' and warn if so.""" - global _fork_warning_emitted - if _fork_warning_emitted: + global _fork_warning_checked + if _fork_warning_checked: return + _fork_warning_checked = True # Common warning message parts common_message = ( @@ -311,7 +322,6 @@ def _check_multiprocessing_start_method(): if start_method == "fork": message = f"multiprocessing start method is 'fork', which {common_message}" warnings.warn(message, UserWarning, stacklevel=3) - _fork_warning_emitted = True except RuntimeError: # get_start_method() can raise RuntimeError if start method hasn't been set # In this case, default is 'fork' on Linux, so we should warn @@ -321,4 +331,3 @@ def _check_multiprocessing_start_method(): f"which {common_message}" ) warnings.warn(message, UserWarning, stacklevel=3) - _fork_warning_emitted = True diff --git a/cuda_core/tests/test_multiprocessing_warning.py b/cuda_core/tests/test_multiprocessing_warning.py index 6eeb14ba20..945ea83964 100644 --- a/cuda_core/tests/test_multiprocessing_warning.py +++ b/cuda_core/tests/test_multiprocessing_warning.py @@ -18,6 +18,7 @@ _deep_reduce_device_memory_resource, _reduce_allocation_handle, ) +from cuda.core.experimental._utils.cuda_utils import reset_fork_warning def test_warn_on_fork_method_device_memory_resource(ipc_device): @@ -31,9 +32,7 @@ def test_warn_on_fork_method_device_memory_resource(ipc_device): warnings.simplefilter("always") # Reset the warning flag to allow testing - from cuda.core.experimental._utils import cuda_utils - - cuda_utils._fork_warning_emitted = False + reset_fork_warning() # Trigger the reduction function directly _deep_reduce_device_memory_resource(mr) @@ -61,9 +60,7 @@ def test_warn_on_fork_method_allocation_handle(ipc_device): warnings.simplefilter("always") # Reset the warning flag to allow testing - from cuda.core.experimental._utils import cuda_utils - - cuda_utils._fork_warning_emitted = False + reset_fork_warning() # Trigger the reduction function directly _reduce_allocation_handle(alloc_handle) @@ -89,9 +86,7 @@ def test_warn_on_fork_method_event(mempool_device): warnings.simplefilter("always") # Reset the warning flag to allow testing - from cuda.core.experimental._utils import cuda_utils - - cuda_utils._fork_warning_emitted = False + reset_fork_warning() # Trigger the reduction function directly _reduce_event(event) @@ -116,9 +111,7 @@ def test_no_warning_with_spawn_method(ipc_device): warnings.simplefilter("always") # Reset the warning flag to allow testing - from cuda.core.experimental._utils import cuda_utils - - cuda_utils._fork_warning_emitted = False + reset_fork_warning() # Trigger the reduction function directly _deep_reduce_device_memory_resource(mr) @@ -142,9 +135,7 @@ def test_warning_emitted_only_once(ipc_device): warnings.simplefilter("always") # Reset the warning flag to allow testing - from cuda.core.experimental._utils import cuda_utils - - cuda_utils._fork_warning_emitted = False + reset_fork_warning() # Trigger reduction multiple times _deep_reduce_device_memory_resource(mr1)