Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ChildWorker module #8527

Merged
merged 4 commits into from
Jun 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 96 additions & 0 deletions src/python/TaskWorker/ChildWorker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
"""
Fork a process to run a function inside a child process.
This prevents the worker process from getting stuck or dying without the
master noticing. It leverages concurrent.futures.ProcessPoolExecutor to fork a
single child process, and returns the value back or propagates the exception
from the child process to the caller.

The startChildWorker() can handle coredump, timeout, and generic exception.

Original issue: https://github.com/dmwm/CRABServer/issues/8428

Note about the `logger` object. This works out of the box because:
- We spawn the child process with `fork`.
- The worker process stops and does nothing, waiting until `work()` finishes.
This makes the child-worker and worker processes share the same log file fd, but
only one of them writes logs to the file at a time. Not sure if there is any
risk of deadlock. Needs more testing in production.

See more: https://github.com/python/cpython/issues/84559
Possible solution (but need a lot of code change): https://stackoverflow.com/a/32065395
"""

from concurrent.futures import ProcessPoolExecutor
from concurrent.futures.process import BrokenProcessPool
import multiprocessing as mp
import signal
from TaskWorker.WorkerExceptions import ChildUnexpectedExitException, ChildTimeoutException


def startChildWorker(config, work, workArgs, logger):
    """
    Public function to run any function in a child-worker process.

    A ProcessPoolExecutor with a single worker is created using the `fork`
    start method, `work(*workArgs)` is submitted through `_runChildWorker()`
    and the caller blocks until the result (or an exception) comes back.

    :param config: crab configuration object; the timeout in seconds is read
        from `config.FeatureFlags.childWorkerTimeout`
    :type config: WMCore.Configuration.ConfigurationEx
    :param work: a function that needs to run in the child process
    :type work: function
    :param workArgs: tuple of arguments of `work()`
    :type workArgs: tuple
    :param logger: log object
    :type logger: logging.Logger

    :returns: return value from `work()`
    :rtype: any
    :raises ChildUnexpectedExitException: when the pool reports
        BrokenProcessPool (e.g. the child died abruptly)
    :raises ChildTimeoutException: when a timeout error is raised, either by
        the child or while waiting for the result in the parent
    Any other exception raised by `work()` propagates unchanged.
    """
    # Before Python 3.11 `concurrent.futures.TimeoutError` (raised by
    # `future.result(timeout=...)`) is NOT the builtin `TimeoutError`, so both
    # must be caught to map the parent-side timeout correctly. From 3.11 on
    # the two names refer to the same class and the tuple is merely redundant.
    from concurrent.futures import TimeoutError as FutureTimeoutError  # pylint: disable=import-outside-toplevel

    procTimeout = config.FeatureFlags.childWorkerTimeout
    # Force start method to 'fork' to inherit logging setting. Otherwise logs
    # from child-worker will not go to log files in process/tasks or propagate
    # back to MasterWorker process.
    # NOTE(review): leaving the `with` block shuts the executor down and waits
    # for the child; if the child never reacts to SIGALRM this may still
    # block -- confirm whether an explicit kill is needed.
    with ProcessPoolExecutor(max_workers=1, mp_context=mp.get_context('fork')) as executor:
        future = executor.submit(_runChildWorker, work, workArgs, procTimeout, logger)
        try:
            # One extra second so that the SIGALRM armed inside the child
            # normally fires before the parent-side wait gives up.
            return future.result(timeout=procTimeout + 1)
        except BrokenProcessPool as e:
            raise ChildUnexpectedExitException('Child process exited unexpectedly.') from e
        except (TimeoutError, FutureTimeoutError) as e:
            raise ChildTimeoutException(f'Child process timeout reached (timeout {procTimeout} seconds).') from e

def _signalHandler(signum, frame):
"""
Simply raise timeout exception and let ProcessPoolExecutor propagate error
back to parent process.
Boilerplate come from https://docs.python.org/3/library/signal.html#examples
"""
raise TimeoutError("The process reached timeout.")

def _runChildWorker(work, workArgs, timeout, logger):
    """
    The wrapper function that runs `work()` on the child-worker. It installs
    a SIGALRM handler and arms an alarm of `timeout` seconds, so that the
    current work is interrupted with TimeoutError when the timeout is reached.
    The alarm is always cancelled before returning, whether `work()` succeeds
    or raises.

    :param work: a function that needs to run in the child process
    :type work: function
    :param workArgs: tuple of arguments of `work()`
    :type workArgs: tuple
    :param timeout: function call timeout in seconds
    :type timeout: int
    :param logger: log object
    :type logger: logging.Logger

    :returns: return value from `work()`
    :rtype: any
    :raises TimeoutError: raised by the SIGALRM handler when `timeout` expires
    """
    logger.debug(f'Installing SIGALARM with timeout {timeout} seconds.')
    signal.signal(signal.SIGALRM, _signalHandler)
    signal.alarm(timeout)
    try:
        return work(*workArgs)
    finally:
        # Cancel the pending alarm first, also on the error path, so that it
        # cannot fire later in this process once `work()` is no longer running.
        signal.alarm(0)
        logger.debug('Uninstalling SIGALARM.')
3 changes: 3 additions & 0 deletions src/python/TaskWorker/SequentialWorker.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,8 @@
import pdb
pdb.set_trace()

# no childWorker when running with pdb
config.FeatureFlags.childWorker = False

mc = MasterWorker(config=config, logWarning=False, logDebug=True, sequential=True, console=True)
mc.algorithm()
20 changes: 17 additions & 3 deletions src/python/TaskWorker/Worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@

from RESTInteractions import CRABRest
from TaskWorker.DataObjects.Result import Result
from ServerUtilities import truncateError, executeCommand
from TaskWorker.WorkerExceptions import WorkerHandlerException, TapeDatasetException
from ServerUtilities import truncateError, executeCommand, FEEDBACKMAIL
from TaskWorker.WorkerExceptions import WorkerHandlerException, TapeDatasetException, ChildUnexpectedExitException, ChildTimeoutException
from TaskWorker.ChildWorker import startChildWorker


## Creating configuration globals to avoid passing these around at every request
Expand Down Expand Up @@ -96,12 +97,25 @@ def processWorkerLoop(inputs, results, resthost, dbInstance, procnum, logger, lo
logger.debug("%s: Starting %s on %s", procName, str(work), task['tm_taskname'])
try:
msg = None
outputs = work(resthost, dbInstance, WORKER_CONFIG, task, procnum, inputargs)
if hasattr(WORKER_CONFIG, 'FeatureFlags') and \
hasattr(WORKER_CONFIG.FeatureFlags, 'childWorker') and \
WORKER_CONFIG.FeatureFlags.childWorker:
logger.debug(f'Run {work.__name__} in childWorker.')
args = (resthost, dbInstance, WORKER_CONFIG, task, procnum, inputargs)
outputs = startChildWorker(WORKER_CONFIG, work, args, logger)
else:
outputs = work(resthost, dbInstance, WORKER_CONFIG, task, procnum, inputargs)
except TapeDatasetException as tde:
outputs = Result(task=task, err=str(tde))
except WorkerHandlerException as we:
outputs = Result(task=task, err=str(we))
msg = str(we)
except (ChildUnexpectedExitException, ChildTimeoutException) as e:
# custom message
outputs = Result(task=task, err=str(e))
msg = f"Server-side failed with an error: {str(e)}"
msg += "\n This could be a temporary glitch. Please try again later."
msg += f"\n If the error persists, please send an e-mail to {FEEDBACKMAIL}."
except Exception as exc: #pylint: disable=broad-except
outputs = Result(task=task, err=str(exc))
msg = "%s: I just had a failure for %s" % (procName, str(exc))
Expand Down
8 changes: 7 additions & 1 deletion src/python/TaskWorker/WorkerExceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,10 @@ class TapeDatasetException(TaskWorkerException):


class CannotMigrateException(TaskWorkerException):
    """Used by Publisher in case DBS server refuses to migrate"""

class ChildUnexpectedExitException(TaskWorkerException):
    """Used by ChildWorker simply to rename BrokenProcessPool to a more understandable name"""

class ChildTimeoutException(TaskWorkerException):
    """Used by ChildWorker to rename the built-in TimeoutError (raised by the SIGALRM signal handler) to a more understandable name"""
58 changes: 58 additions & 0 deletions test/python/TaskWorker/test_ChildWorker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import time
import pytest
import ctypes
from argparse import Namespace
from unittest.mock import Mock

from WMCore.Configuration import ConfigurationEx
from TaskWorker.ChildWorker import startChildWorker
from TaskWorker.WorkerExceptions import ChildUnexpectedExitException, ChildTimeoutException



@pytest.fixture
def config_ChildWorker():
    """
    Build a minimal configuration for the ChildWorker tests: a
    `FeatureFlags` section whose `childWorkerTimeout` is 1.
    """
    childWorkerConfig = ConfigurationEx()
    childWorkerConfig.section_("FeatureFlags")
    featureFlags = childWorkerConfig.FeatureFlags
    featureFlags.childWorkerTimeout = 1
    return childWorkerConfig

@pytest.fixture
def mock_logger():
    """
    Provide a Mock object standing in for a logger, with its `name`
    attribute set to the string '1'.
    """
    fakeLogger = Mock()
    setattr(fakeLogger, 'name', '1')
    return fakeLogger

def fn(n, timeSleep=0, mode='any'):
    """
    Helper executed through startChildWorker; `mode` selects one of four
    behaviors:
    - 'exception': raise a TypeError, which should reach the parent as is
    - 'timeout': sleep for `timeSleep` seconds before returning
    - 'coredump': read from an invalid address through ctypes
    - anything else: do nothing special
    Unless an exception is raised (or the process dies), `n*5` is returned.
    """
    print(f'executing function with n={n},timeSleep={timeSleep},mode={mode}')
    if mode == 'exception':
        raise TypeError('simulate raise generic exception')
    if mode == 'timeout':
        time.sleep(timeSleep)
    elif mode == 'coredump':
        # https://codegolf.stackexchange.com/a/22383
        ctypes.string_at(1)
    result = n*5
    return result

# Each entry: (n, timeSleep, mode, expected exception class or None).
testList = [
    (17, 0, 'any', None),
    (17, 0, 'exception', TypeError),
    (17, 5, 'timeout', ChildTimeoutException),
    (17, 0, 'coredump', ChildUnexpectedExitException),
]


@pytest.mark.parametrize("n, timeSleep, mode, exceptionObj", testList)
def test_executeTapeRecallPolicy_allow(n, timeSleep, mode, exceptionObj, config_ChildWorker, mock_logger):
    """
    Run `fn` through startChildWorker for every entry of `testList`.

    When no exception is expected, the value returned through the child
    process must equal what `fn` computes (`n*5`); otherwise the expected
    exception type must be raised in the calling process.
    """
    workArgs = (n, timeSleep, mode)
    if exceptionObj is None:
        # Also verify the return value, not only that the call completes.
        assert startChildWorker(config_ChildWorker, fn, workArgs, mock_logger) == n*5
    else:
        with pytest.raises(exceptionObj):
            startChildWorker(config_ChildWorker, fn, workArgs, mock_logger)