Skip to content

Commit

Permalink
Fixed broken process pool executor issue
Browse files Browse the repository at this point in the history
Fixes #362.
  • Loading branch information
agronholm committed Jan 17, 2021
1 parent a114d56 commit 616b2e6
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 2 deletions.
9 changes: 8 additions & 1 deletion apscheduler/executors/pool.py
@@ -1,4 +1,5 @@
from abc import abstractmethod
from concurrent.futures.process import BrokenProcessPool
import concurrent.futures

from apscheduler.executors.base import BaseExecutor, run_job
Expand All @@ -19,7 +20,13 @@ def callback(f):
else:
self._run_job_success(job.id, f.result())

f = self._pool.submit(run_job, job, job._jobstore_alias, run_times, self._logger.name)
try:
f = self._pool.submit(run_job, job, job._jobstore_alias, run_times, self._logger.name)
except BrokenProcessPool:
self._logger.warning('Process pool is broken; replacing pool with a fresh instance')
self._pool = self._pool.__class__(self._pool._max_workers)
f = self._pool.submit(run_job, job, job._jobstore_alias, run_times, self._logger.name)

f.add_done_callback(callback)

def shutdown(self, wait=True):
Expand Down
2 changes: 2 additions & 0 deletions docs/versionhistory.rst
Expand Up @@ -29,6 +29,8 @@ APScheduler, see the :doc:`migration section <migration>`.
one search condition
* Fixed a problem where bound methods added as jobs via textual references were called with an
unwanted extra ``self`` argument (PR by Pengjie Song)
* Fixed ``BrokenPoolError`` in ``ProcessPoolExecutor`` so that it will automatically replace the
broken pool with a fresh instance


3.6.3
Expand Down
34 changes: 33 additions & 1 deletion tests/test_executors.py
Expand Up @@ -2,14 +2,18 @@
from threading import Event
from types import TracebackType
import gc
import os
import signal
import time

import pytest
from pytz import UTC

from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_MISSED, EVENT_JOB_EXECUTED
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED, EVENT_JOB_MISSED
from apscheduler.executors.base import MaxInstancesReachedError, run_job
from apscheduler.executors.pool import ProcessPoolExecutor
from apscheduler.job import Job
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.base import BaseScheduler

try:
Expand Down Expand Up @@ -144,3 +148,31 @@ def func():

foos = [x for x in gc.get_objects() if type(x) is FooBar]
assert len(foos) == 0


def test_broken_pool():
def listener(evt):
nonlocal pid
pid = evt.retval
event.set()

pid = None
event = Event()
scheduler = BackgroundScheduler(executors={'default': ProcessPoolExecutor(1)})
scheduler.add_listener(listener, EVENT_JOB_EXECUTED)
scheduler.add_job(os.getpid, 'date', run_date=datetime.now(UTC))
scheduler.start()

event.wait(3)
killed_pid = pid
os.kill(pid, signal.SIGTERM)
try:
os.waitpid(pid, 0)
except OSError:
pass

event.clear()
scheduler.add_job(os.getpid, 'date', run_date=datetime.now(UTC))
event.wait(3)
assert pid != killed_pid
scheduler.shutdown(True)

0 comments on commit 616b2e6

Please sign in to comment.