Skip to content

Commit

Permalink
Add test for unhandled exception in sync worker
Browse files Browse the repository at this point in the history
  • Loading branch information
thomasst committed Apr 25, 2024
1 parent d6291dd commit b90593e
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 0 deletions.
4 changes: 4 additions & 0 deletions tests/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ def exception_task():
raise Exception("this failed")


def system_exit_task():
raise SystemExit()


@tiger.task(queue="other")
def task_on_other_queue():
pass
Expand Down
26 changes: 26 additions & 0 deletions tests/test_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
long_task_ok,
simple_task,
sleep_task,
system_exit_task,
wait_for_long_task,
)
from .test_base import BaseTestCase
Expand Down Expand Up @@ -238,3 +239,28 @@ def test_heartbeat(self, tiger):
assert task_lock_2 > DELAY

worker.kill()

def test_stop_heartbeat_thread_on_unhandled_exception(
self, tiger, ensure_queues
):
task = Task(tiger, system_exit_task)
task.delay()

# Start a worker and wait until it starts processing.
worker = Process(
target=external_worker,
kwargs={
"worker_kwargs": {
"executor_class": SyncExecutor,
},
},
)
worker.start()

# Ensure process exits and does not hang here.
worker.join()

# Since SystemExit derives from BaseException and is therefore not
# handled by the executor, the task is still active until it times out
# and gets requeued by another worker.
ensure_queues(active={"default": 1})

0 comments on commit b90593e

Please sign in to comment.