Skip to content
Permalink
Browse files
[webkitcorey] Gracefully handle CNTRL-C in TaskPool
https://bugs.webkit.org/show_bug.cgi?id=226238
<rdar://problem/78472148>

Reviewed by Dewei Zhu.

* Scripts/libraries/webkitcorepy/setup.py: Bump version.
* Scripts/libraries/webkitcorepy/webkitcorepy/__init__.py: Ditto.
* Scripts/libraries/webkitcorepy/webkitcorepy/task_pool.py:
(_BiDirectionalQueue.close): Tear-down queue objects while suppressing any logging.
(_Process.handler): Both SIGTERM and SIGINT signals should stop worker processes.
(_Process.main): Add SIGINT handler, explicitly close queue.
(TaskPool.__init__): Defer worker and queue construction to context manager so that we
do not have an instantiated but invalid queue if pipes are broken by children processes.
(TaskPool.__enter__): Construct worker processes.
(TaskPool.do): Only catch Queue.Empty exception.
(TaskPool.__exit__): Explicitly close queue, reset workers and queue.


Canonical link: https://commits.webkit.org/238230@main
git-svn-id: https://svn.webkit.org/repository/webkit/trunk@278187 268f45cc-cd09-0410-ab3c-d52691b4dbfc
  • Loading branch information
JonWBedard committed May 28, 2021
1 parent bc14815 commit 35efa1b950875c9513810777b1a140209c83c948
Showing 4 changed files with 61 additions and 14 deletions.
@@ -1,3 +1,23 @@
2021-05-27 Jonathan Bedard <jbedard@apple.com>

[webkitcorey] Gracefully handle CNTRL-C in TaskPool
https://bugs.webkit.org/show_bug.cgi?id=226238
<rdar://problem/78472148>

Reviewed by Dewei Zhu.

* Scripts/libraries/webkitcorepy/setup.py: Bump version.
* Scripts/libraries/webkitcorepy/webkitcorepy/__init__.py: Ditto.
* Scripts/libraries/webkitcorepy/webkitcorepy/task_pool.py:
(_BiDirectionalQueue.close): Tear-down queue objects while suppressing any logging.
(_Process.handler): Both SIGTERM and SIGINT signals should stop worker processes.
(_Process.main): Add SIGINT handler, explicitly close queue.
(TaskPool.__init__): Defer worker and queue construction to context manager so that we
do not have an instantiated but invalid queue if pipes are broken by children processes.
(TaskPool.__enter__): Construct worker processes.
(TaskPool.do): Only catch Queue.Empty exception.
(TaskPool.__exit__): Explicitly close queue, reset workers and queue.

2021-05-27 Darin Adler <darin@apple.com>

Next step toward using std::optional directly instead of through WTF::Optional typedef
@@ -30,7 +30,7 @@ def readme():

setup(
name='webkitcorepy',
version='0.5.14',
version='0.5.15',
description='Library containing various Python support classes and functions.',
long_description=readme(),
classifiers=[
@@ -37,7 +37,7 @@
from webkitcorepy.task_pool import TaskPool
from webkitcorepy.credentials import credentials

version = Version(0, 5, 14)
version = Version(0, 5, 15)

from webkitcorepy.autoinstall import Package, AutoInstall
if sys.version_info > (3, 0):
@@ -27,6 +27,11 @@
import signal
import sys

if sys.version_info < (3, 0):
import Queue
else:
import queue as Queue

from webkitcorepy import OutputCapture, Timeout, log


@@ -146,6 +151,13 @@ def receive(self, blocking=True):
return self.incoming.get(timeout=difference)
return self.incoming.get()

def close(self):
with OutputCapture():
self.outgoing.close()
self.incoming.close()
self.outgoing.join_thread()
self.incoming.join_thread()


class _Process(object):
name = None
@@ -228,7 +240,7 @@ def writable(self):

@classmethod
def handler(cls, value, _):
if value == getattr(signal, 'SIGTERM'):
if value in (getattr(signal, 'SIGTERM'), getattr(signal, 'SIGINT')):
cls.working = False

@classmethod
@@ -246,6 +258,8 @@ def main(cls, name, loglevel, setup, setupargs, setupkwargs, queue, teardown, te

if getattr(signal, 'SIGTERM'):
signal.signal(signal.SIGTERM, cls.handler)
if getattr(signal, 'SIGINT'):
signal.signal(signal.SIGINT, cls.handler)

logger = logging.getLogger()
for handler in logger.handlers:
@@ -280,6 +294,7 @@ def main(cls, name, loglevel, setup, setupargs, setupkwargs, queue, teardown, te
sys.stdout.flush()
sys.stderr.flush()
queue.send(_State(_State.STOPPING))
cls.queue.close()
cls.queue = None


@@ -309,21 +324,18 @@ def __init__(
name = name or 'worker'
if name == self.Process.name:
raise ValueError("Parent process is already named {}".format(name))
self.name = name

if workers < 1:
raise ValueError('TaskPool requires positive number of workers')

self.queue = self.BiDirectionalQueue()
self.queue = None
self.workers = []

self._setup_args = (setup, setupargs, setupkwargs)
self._teardown_args = (teardown, teardownargs, teardownkwargs)
self._num_workers = int(workers)

self.workers = [multiprocessing.Process(
target=self.Process.main,
args=(
'{}/{}'.format(name, count), logging.getLogger().getEffectiveLevel(),
setup, setupargs, setupkwargs,
self.BiDirectionalQueue(outgoing=self.queue.incoming, incoming=self.queue.outgoing),
teardown, teardownargs, teardownkwargs,
),
) for count in range(workers)]
self._started = 0

self.callbacks = {}
@@ -334,6 +346,17 @@ def __init__(
def __enter__(self):
from mock import patch

self.queue = self.BiDirectionalQueue()
self.workers = [multiprocessing.Process(
target=self.Process.main,
args=(
'{}/{}'.format(self.name, count), logging.getLogger().getEffectiveLevel(),
self._setup_args[0], self._setup_args[1], self._setup_args[2],
self.BiDirectionalQueue(outgoing=self.queue.incoming, incoming=self.queue.outgoing),
self._teardown_args[0], self._teardown_args[1], self._teardown_args[2],
),
) for count in range(self._num_workers)]

with Timeout(seconds=10, patch=False, handler=self.Exception('Failed to start all workers')):
for worker in self.workers:
worker.start()
@@ -353,7 +376,7 @@ def do(self, function, *args, **kwargs):
while True:
try:
self.queue.receive(blocking=False)(self)
except Exception:
except Queue.Empty:
break

def wait(self):
@@ -393,3 +416,7 @@ def __exit__(self, *args, **kwargs):
os.kill(worker.pid, signal.SIGKILL)
else:
worker.terminate()

self.queue.close()
self.queue = None
self.workers = []

0 comments on commit 35efa1b

Please sign in to comment.