Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
3fd1184
Switch Lock from threading to multiprocessing
jsiirola Apr 24, 2026
0264d1c
Do not raise threading Lock errors when Python is shutting down
jsiirola Apr 24, 2026
c98bc8b
Split the deadlock timeout parameter
jsiirola Apr 24, 2026
b72d5e7
Minor code cleanup
jsiirola Apr 24, 2026
bb31b5f
Merge branch 'main' into capture-output-deadlock
jsiirola Apr 27, 2026
9352266
Avoid Lock deadlock in @timeout() decorator
jsiirola Apr 27, 2026
09b6e52
multiprocessing is now an automatic import
jsiirola Apr 27, 2026
96e3aff
NFC: typo
jsiirola Apr 27, 2026
4042736
Debugging
jsiirola Apr 27, 2026
e1c7edc
Update parser for new output
jsiirola Apr 27, 2026
c0081ef
Fix logic error
jsiirola Apr 27, 2026
155c1f8
Attempt to make multiprocessing import more deterministic
jsiirola Apr 28, 2026
30994a3
Attempt to make multiprocessing import more deterministic (try 2)
jsiirola Apr 28, 2026
5dc09be
Update importtime tester to handle multiprocessing, improve output
jsiirola Apr 28, 2026
01be5ce
NFC: apply black
jsiirola Apr 28, 2026
00164ee
Debugging
jsiirola Apr 28, 2026
ce60ebc
Remove debugging; increase delay for GHA
jsiirola Apr 28, 2026
4c5d032
Clean up multiprocessing imports
jsiirola Apr 28, 2026
63635b1
Remove required import of multiprocessing
jsiirola May 18, 2026
1ea96e8
Merge branch 'main' into capture-output-deadlock
jsiirola May 18, 2026
61ffa30
Use a threading.Lock until multiprocessing is imported
jsiirola May 18, 2026
5e7ad72
NFC: fix typo
jsiirola May 18, 2026
8afd437
Update references to capture_output_lock
jsiirola May 19, 2026
90094a3
Merge branch 'main' into capture-output-deadlock
jsiirola May 19, 2026
f31df34
Add debugging to environ summary
jsiirola May 19, 2026
b057923
Disable coverage when testing importtime
jsiirola May 19, 2026
ba4d83b
Remove unused dependency
jsiirola May 19, 2026
63974f8
Importtime tests: prevent import of site pth files
jsiirola May 19, 2026
6089c53
Convert commented test to 'documentation'
jsiirola May 19, 2026
4c64b30
Preserve PYTHONPATH when running importtime test
jsiirola May 20, 2026
b8a1608
Add global flag to (partially) disable capture_output
jsiirola May 21, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 32 additions & 6 deletions pyomo/common/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import importlib.util
import logging
import sys
import threading
import warnings

from collections.abc import Mapping
Expand Down Expand Up @@ -945,6 +946,11 @@ def __exit__(self, exc_type, exc_value, traceback):
# Common optional dependencies used throughout Pyomo
#

#: lock for deconflicting access to capturing the process file
#: descriptors. This starts as a threading.Lock, unless the environment
#: imports multiprocessing, in which case, it is upgraded to a
#: multiprocessing lock.
capture_output_lock = threading.Lock()
yaml_load_args = {}


Expand All @@ -961,6 +967,18 @@ def _finalize_ctypes(module, available):
import ctypes.util


def _finalize_multiprocessing(module, available):
# Note: multiprocessing is very slow to import, but we need to make
# sure that the capture_output_lock Lock is created *before* the
# user spawns any subprocesses. tee.capture_output will look here
# for the lock, which will start out as a "dummy" lock, and then
# will be updated to a multiprocessing.Lock when the first module
# triggers the multiprocessing import.

global capture_output_lock
capture_output_lock = module.Lock()


def _finalize_scipy(module, available):
if available:
# Import key subpackages that we will want to assume are present
Expand Down Expand Up @@ -1081,12 +1099,25 @@ def _pyutilib_importer():


with declare_modules_as_importable(globals()):
# Standard libraries that we will unconditionally import. We are
# importing it here so that import timing is better reported from
# pyomo.environ.tests.test_environ (hence the imports are not
# necessarily alphebetical)
#
# Pickle is used by Pyomo and by multiprocessing
try:
import cPickle as pickle
except ImportError:
import pickle

# Standard libraries that are slower to import and not strictly required
# on all platforms / situations.
ctypes, _ = attempt_import(
'ctypes', deferred_submodules=['util'], callback=_finalize_ctypes
)
multiprocessing, _ = attempt_import('multiprocessing')
multiprocessing, _ = attempt_import(
'multiprocessing', callback=_finalize_multiprocessing
)
random, _ = attempt_import('random')

# Necessary for minimum version checking for other optional dependencies
Expand Down Expand Up @@ -1127,8 +1158,3 @@ def _pyutilib_importer():
deferred_submodules=['pyplot', 'pylab', 'backends'],
catch_exceptions=(ImportError, RuntimeError),
)

try:
import cPickle as pickle
except ImportError:
import pickle
27 changes: 27 additions & 0 deletions pyomo/common/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,3 +249,30 @@ class SolverAPIVersion(NamedIntEnum):

minimize = ObjectiveSense.minimize
maximize = ObjectiveSense.maximize


class CaptureOutputMode(IntEnum):
"""Enum to override the default behavior of the :class:`capture_output`
context manager.

This enum provides options for overriding the behavior of the
:class:`capture_output` context manager through the
:attr:`~pyomo.common.tee.OVERRIDE_CAPTURE_OUTPUT` flag.

"""

#: Setting this mode will cause :class:`capture_output` to be a noop
DISABLE = 0
#: :class:`capture_output` will capture the standard ``sys.stdout`` /
#: ``sys.stderr`` streams
ENABLE_STREAM_CAPTURE = 1
#: :class:`capture_output` will capture the file descriptors that
#: underlie the standard ``sys.stdout`` / ``sys.stderr`` streams
ENABLE_FD_CAPTURE = 2
#: This is :class:`capture_output`'s normal operation (all capturing is enabled)
NORMAL = 3
#: :class:`capture_output` will capture the standard ``sys.stdout`` /
#: ``sys.stderr`` streams, but will not attempt to capture their raw
#: file descriptors (regardless of the value of `capture_fd`) [alias
#: of ENABLE_STREAM_CAPTURE]
DISABLE_FD_CAPTURE = 1
2 changes: 1 addition & 1 deletion pyomo/common/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

import os

from .dependencies import ctypes, multiprocessing
from pyomo.common.dependencies import ctypes, multiprocessing


def _as_bytes(val):
Expand Down
38 changes: 24 additions & 14 deletions pyomo/common/tee.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@
import threading
import time

import pyomo.common.dependencies as dependencies
from pyomo.common.enums import CaptureOutputMode
from pyomo.common.errors import DeveloperError
from pyomo.common.log import LoggingIntercept, LogStream
from pyomo.common.shutdown import python_is_shutting_down

_poll_interval = 0.0001
_poll_rampup_limit = 0.099
Expand All @@ -35,6 +38,7 @@
# ~(13.1 * #threads) seconds
_poll_timeout = 1 # 14 rounds: 0.0001 * 2**14 == 1.6384
_poll_timeout_deadlock = 100 # seconds
_threading_deadlock = 200 # seconds; should be longer than _poll_timeout_deadlock
_pipe_buffersize = 1 << 16 # 65536
_noop = lambda: None
_mswindows = sys.platform.startswith('win')
Expand All @@ -54,6 +58,8 @@

logger = logging.getLogger(__name__)

OVERRIDE_CAPTURE_OUTPUT = CaptureOutputMode.NORMAL


class _SignalFlush:
def __init__(self, ostream, handle):
Expand Down Expand Up @@ -297,14 +303,14 @@ class capture_output:

"""

startup_shutdown = threading.Lock()

def __init__(self, output=None, capture_fd=False):
self.output = output
self.output_stream = None
self.old = None
self.tee = None
self.capture_fd = capture_fd
self.capture_fd = capture_fd and (
OVERRIDE_CAPTURE_OUTPUT & CaptureOutputMode.ENABLE_FD_CAPTURE
)
self.context_stack = []
Comment thread
mrmundt marked this conversation as resolved.

def _enter_context(self, cm, prior_to=None):
Expand Down Expand Up @@ -339,13 +345,12 @@ def _exit_context_stack(self, et, ev, tb):
cm.__exit__(et, ev, tb)
except:
_stack = self.context_stack
FAIL.append(
f"{sys.exc_info()[0].__name__}: {sys.exc_info()[1]} ({len(_stack)+1}: {cm}@{id(cm):x})"
)
_et, _e, _tb = sys.exc_info()
FAIL.append(f"{_et.__name__}: {_e} ({len(_stack)+1}: {cm}@{id(cm):x})")
return FAIL

def __enter__(self):
if not capture_output.startup_shutdown.acquire(timeout=_poll_timeout_deadlock):
if not dependencies.capture_output_lock.acquire(timeout=_threading_deadlock):
# This situation *shouldn't* happen. If it does, it is
# unlikely that the user can fix it (or even debug it).
# Instead they should report it back to us.
Expand All @@ -358,20 +363,22 @@ def __enter__(self):
# was trying to start up / run (so the other solver held
# the lock, but the GC interrupted that thread and
# wouldn't let go).
raise DeveloperError("Deadlock starting capture_output")
if not python_is_shutting_down():
raise DeveloperError("Deadlock starting capture_output")
try:
return self._enter_impl()
finally:
capture_output.startup_shutdown.release()
dependencies.capture_output_lock.release()

def __exit__(self, et, ev, tb):
if not capture_output.startup_shutdown.acquire(timeout=_poll_timeout_deadlock):
if not dependencies.capture_output_lock.acquire(timeout=_threading_deadlock):
# See comments & breadcrumbs in __enter__() above.
raise DeveloperError("Deadlock closing capture_output")
if not python_is_shutting_down():
raise DeveloperError("Deadlock closing capture_output")
try:
return self._exit_impl(et, ev, tb)
finally:
capture_output.startup_shutdown.release()
dependencies.capture_output_lock.release()

def _enter_impl(self):
self.old = (sys.stdout, sys.stderr)
Expand Down Expand Up @@ -486,8 +493,11 @@ def _enter_impl(self):
# exception.
self._exit_context_stack(*sys.exc_info())
raise
sys.stdout = self.tee.STDOUT
sys.stderr = self.tee.STDERR
if OVERRIDE_CAPTURE_OUTPUT & CaptureOutputMode.ENABLE_STREAM_CAPTURE:
sys.stdout = self.tee.STDOUT
sys.stderr = self.tee.STDERR
else:
self.old = None
buf = self.tee.ostreams
if len(buf) == 1:
buf = buf[0]
Expand Down
15 changes: 8 additions & 7 deletions pyomo/common/tests/test_multithread.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,16 @@
# ____________________________________________________________________________________

import threading
import pyomo.common.unittest as unittest
from pyomo.common.multithread import *
from threading import Thread
from multiprocessing.dummy import Pool as ThreadPool

import pyomo.common.unittest as unittest

from pyomo.common.multithread import MultiThreadWrapper, MultiThreadWrapperWithMain
from pyomo.opt.base.solvers import check_available_solvers

import pyomo.environ as pyo


class Dummy:
"""asdfg"""
Expand Down Expand Up @@ -103,10 +108,6 @@ def thread_func():
)
def test_solve(self):
# Based on the minimal example in https://github.com/Pyomo/pyomo/issues/2475
import pyomo.environ as pyo
from pyomo.opt import SolverFactory
from multiprocessing.dummy import Pool as ThreadPool

model = pyo.ConcreteModel()
model.nVars = pyo.Param(initialize=4)
model.N = pyo.RangeSet(model.nVars)
Expand All @@ -115,7 +116,7 @@ def test_solve(self):
model.cuts = pyo.ConstraintList()

def test(model):
opt = SolverFactory('glpk')
opt = pyo.SolverFactory('glpk')
opt.solve(model)

# Iterate, adding a cut to exclude the previously found solution
Expand Down
45 changes: 37 additions & 8 deletions pyomo/common/tests/test_tee.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from pyomo.common.errors import DeveloperError
from pyomo.common.log import LoggingIntercept, LogStream
from pyomo.common.tempfiles import TempfileManager
import pyomo.common.dependencies as deps
import pyomo.common.tee as tee
import pyomo.common.unittest as unittest

Expand Down Expand Up @@ -581,28 +582,40 @@ def test_capture_output_stack_error(self):
logging.getLogger('pyomo.common.tee').handlers.clear()

def test_atomic_deadlock(self):
save_poll = tee._poll_timeout_deadlock
tee._poll_timeout_deadlock = 0.01
save_poll = tee._threading_deadlock
tee._threading_deadlock = 0.01

# Ensure multiprocessing is loaded:
deps.multiprocessing.Lock

co = tee.capture_output()
try:
tee.capture_output.startup_shutdown.acquire()
deps.capture_output_lock.acquire()
with self.assertRaisesRegex(
DeveloperError, "Deadlock starting capture_output"
):
with tee.capture_output():
pass
tee.capture_output.startup_shutdown.release()
deps.capture_output_lock.release()

with self.assertRaisesRegex(
DeveloperError, "Deadlock closing capture_output"
):
with co:
tee.capture_output.startup_shutdown.acquire()
deps.capture_output_lock.acquire()
finally:
tee._poll_timeout_deadlock = save_poll
if tee.capture_output.startup_shutdown.locked():
tee.capture_output.startup_shutdown.release()
tee._threading_deadlock = save_poll
# We would like to just test if out Lock was acquired and
# then release it if necessary. Unfortunately,
# multiprocessing.Lock doesn't support locked(), so we will
# just catch and eat the error for releasing an unlocked
# lock.
#
## if deps.capture_output_lock.locked():
Comment thread
mrmundt marked this conversation as resolved.
try:
deps.capture_output_lock.release()
except ValueError:
pass
co.reset()

def test_capture_output_invalid_ostream(self):
Expand Down Expand Up @@ -710,6 +723,22 @@ def flush(self):
finally:
tee._poll_timeout, tee._poll_timeout_deadlock = _save

def test_capture_output_override(self):
capture1 = tee.capture_output(capture_fd=True)
self.assertTrue(capture1.capture_fd)
with capture1 as OUT1:
try:
orig = tee.OVERRIDE_CAPTURE_OUTPUT
tee.OVERRIDE_CAPTURE_OUTPUT = tee.CaptureOutputMode.DISABLE
capture2 = tee.capture_output(capture_fd=True)
with capture2 as OUT2:
self.assertFalse(capture2.capture_fd)
print("Hello, World")
self.assertEqual(OUT2.getvalue(), "")
finally:
tee.OVERRIDE_CAPTURE_OUTPUT = orig
self.assertEqual(OUT1.getvalue(), "Hello, World\n")


class BufferTester:
def setUp(self):
Expand Down
27 changes: 24 additions & 3 deletions pyomo/common/tests/test_unittest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@
# ____________________________________________________________________________________

import datetime
import multiprocessing
import os
import pickle
import time

import pyomo.common.unittest as unittest
import pyomo.common.dependencies as deps
from pyomo.common.dependencies import multiprocessing
from pyomo.common.log import LoggingIntercept
from pyomo.common.tee import capture_output
from pyomo.common.tempfiles import TempfileManager
Expand Down Expand Up @@ -171,8 +172,28 @@ def test_assertStructuredAlmostEqual_numericvalue(self):

def test_timeout_fcn_call(self):
self.assertEqual(short_sleep(), 42)
with self.assertRaisesRegex(TimeoutError, 'test timed out after 0.01 seconds'):
long_sleep()
with LoggingIntercept() as LOG:
with self.assertRaisesRegex(
TimeoutError, 'test timed out after 0.01 seconds'
):
long_sleep()
self.assertEqual(LOG.getvalue(), "")
deps.capture_output_lock.acquire()
save = unittest._timeout_terminate_timeout
unittest._timeout_terminate_timeout = 0.01
try:
with self.assertRaisesRegex(
TimeoutError, 'test timed out after 0.01 seconds'
):
long_sleep()
finally:
unittest._timeout_terminate_timeout = save
deps.capture_output_lock.release()
self.assertEqual(
LOG.getvalue(),
"Failed to acquire capture_output_lock Lock before "
"terminating subprocess on timeout: process deadlock is likely.\n",
)
with self.assertRaisesRegex(
NameError, r"name 'foo' is not defined\s+Original traceback:"
):
Expand Down
Loading
Loading