Skip to content

Commit

Permalink
added test and rebased on master changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Rose Yemelyanova committed May 3, 2023
1 parent a3a3891 commit e82f4d4
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 5 deletions.
11 changes: 6 additions & 5 deletions src/blueapi/worker/reworker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import uuid
from dataclasses import dataclass
from functools import partial
from queue import Queue
from queue import Full, Queue
from threading import Event, RLock
from typing import Any, Dict, Iterable, List, Mapping, Optional, Union

Expand All @@ -15,6 +15,7 @@
EventStream,
WatchableStatus,
)
from blueapi.worker.worker_busy_error import WorkerBusyError

from .event import (
ProgressEvent,
Expand Down Expand Up @@ -85,11 +86,11 @@ def __init__(
def submit_task(self, name: str, task: Task) -> None:
active_task = ActiveTask(name, task)
LOGGER.info(f"Submitting: {active_task}")
if self._task_queue.qsize() != 0:
try:
self._task_queue.put_nowait(active_task)
except Full:
LOGGER.error("Cannot submit task while another is running")
raise Exception("Cannot submit task while another is running")

self._task_queue.put(active_task)
raise WorkerBusyError("Cannot submit task while another is running")

def start(self) -> None:
if self._started.is_set():
Expand Down
3 changes: 3 additions & 0 deletions src/blueapi/worker/worker_busy_error.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
class WorkerBusyError(Exception):
def __init__(self, message):
super().__init__(message)
10 changes: 10 additions & 0 deletions tests/worker/test_reworker.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
WorkerEvent,
WorkerState,
)
from blueapi.worker.worker_busy_error import WorkerBusyError


@pytest.fixture
Expand Down Expand Up @@ -125,3 +126,12 @@ def on_event(event: E, event_id: Optional[str]) -> None:
sub = stream.subscribe(on_event)
future.add_done_callback(lambda _: stream.unsubscribe(sub))
return future


def test_worker_only_accepts_one_task_on_queue(worker: Worker, timeout: float = 5.0):
worker.start()
task: Task = RunPlan(name="sleep", params={"time": 1.0})

worker.submit_task("first_task", task)
with pytest.raises(WorkerBusyError):
worker.submit_task("second_task", task)

0 comments on commit e82f4d4

Please sign in to comment.