Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions distributed/tests/test_cancelled_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,15 +425,15 @@ def test_cancelled_resumed_after_flight_with_dependencies_workerstate(ws):
ws2 = "127.0.0.1:2"
instructions = ws.handle_stimulus(
# Create task x and put it in flight from ws2
ComputeTaskEvent.dummy(key="y", who_has={"x": [ws2]}, stimulus_id="s1"),
ComputeTaskEvent.dummy("y", who_has={"x": [ws2]}, stimulus_id="s1"),
# The scheduler realises that ws2 is unresponsive, although ws doesn't know yet.
# Having lost the last surviving replica of x, the scheduler cancels all of its
# dependents. This also cancels x.
FreeKeysEvent(keys=["y"], stimulus_id="s2"),
# The scheduler reschedules x on another worker, which just happens to be one
# that was previously fetching it. This does not generate an Execute
# instruction, because the GatherDep instruction isn't complete yet.
ComputeTaskEvent.dummy(key="x", stimulus_id="s3"),
ComputeTaskEvent.dummy("x", stimulus_id="s3"),
# After ~30s, the TCP socket with ws2 finally times out and collapses.
# This triggers the Execute instruction.
GatherDepNetworkFailureEvent(worker=ws2, total_nbytes=1, stimulus_id="s4"),
Expand Down Expand Up @@ -556,9 +556,7 @@ def test_resume_executing_worker_state(ws_with_running_task):

instructions = ws.handle_stimulus(
FreeKeysEvent(keys=["x"], stimulus_id="s1"),
ComputeTaskEvent.dummy(
key="x", resource_restrictions={"R": 1}, stimulus_id="s2"
),
ComputeTaskEvent.dummy("x", resource_restrictions={"R": 1}, stimulus_id="s2"),
)
assert not instructions
assert ws.tasks["x"] is ts
Expand Down
12 changes: 1 addition & 11 deletions distributed/tests/test_steal.py
Original file line number Diff line number Diff line change
Expand Up @@ -1330,17 +1330,7 @@ def test_steal_worker_state(ws_with_running_task):
assert ws.available_resources == {"R": 0}
assert ws.tasks["x"].state == "cancelled"

instructions = ws.handle_stimulus(
ExecuteSuccessEvent(
key="x",
value=None,
start=0.0,
stop=1.0,
nbytes=8,
type=None,
stimulus_id="s2",
),
)
instructions = ws.handle_stimulus(ExecuteSuccessEvent.dummy("x", stimulus_id="s2"))
assert not instructions
assert "x" not in ws.tasks
assert "x" not in ws.data
Expand Down
34 changes: 32 additions & 2 deletions distributed/tests/test_worker_state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ def test_computetask_to_dict():


def test_computetask_dummy():
ev = ComputeTaskEvent.dummy(key="x", stimulus_id="s")
ev = ComputeTaskEvent.dummy("x", stimulus_id="s")
assert ev == ComputeTaskEvent(
key="x",
who_has={},
Expand All @@ -326,7 +326,7 @@ def test_computetask_dummy():
)

# nbytes is generated from who_has if omitted
ev2 = ComputeTaskEvent.dummy(key="x", who_has={"y": "127.0.0.1:2"}, stimulus_id="s")
ev2 = ComputeTaskEvent.dummy("x", who_has={"y": "127.0.0.1:2"}, stimulus_id="s")
assert ev2.nbytes == {"y": 1}


Expand Down Expand Up @@ -391,6 +391,22 @@ def test_executesuccess_to_dict():
assert ev3.type is None


def test_executesuccess_dummy():
ev = ExecuteSuccessEvent.dummy("x", stimulus_id="s")
assert ev == ExecuteSuccessEvent(
key="x",
value=None,
start=0.0,
stop=1.0,
nbytes=1,
type=None,
stimulus_id="s",
)

ev2 = ExecuteSuccessEvent.dummy("x", 123, stimulus_id="s")
assert ev2.value == 123


def test_executefailure_to_dict():
ev = ExecuteFailureEvent(
stimulus_id="test",
Expand Down Expand Up @@ -431,6 +447,20 @@ def test_executefailure_to_dict():
assert ev3.traceback_text == "tb text"


def test_executefailure_dummy():
ev = ExecuteFailureEvent.dummy("x", stimulus_id="s")
assert ev == ExecuteFailureEvent(
key="x",
start=None,
stop=None,
exception=Serialize(None),
traceback=None,
exception_text="",
traceback_text="",
stimulus_id="s",
)


@gen_cluster(client=True)
async def test_fetch_to_compute(c, s, a, b):
with freeze_data_fetching(b):
Expand Down
43 changes: 42 additions & 1 deletion distributed/worker_state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -695,8 +695,8 @@ def _after_from_dict(self) -> None:

@staticmethod
def dummy(
*,
key: str,
*,
who_has: dict[str, Collection[str]] | None = None,
nbytes: dict[str, int] | None = None,
priority: tuple[int, ...] = (0,),
Expand Down Expand Up @@ -746,6 +746,27 @@ def _after_from_dict(self) -> None:
self.value = None
self.type = None

@staticmethod
def dummy(
key: str,
value: object = None,
*,
nbytes: int = 1,
stimulus_id: str,
) -> ExecuteSuccessEvent:
"""Build a dummy event, with most attributes set to a reasonable default.
This is a convenience method to be used in unit testing only.
"""
return ExecuteSuccessEvent(
key=key,
value=value,
start=0.0,
stop=1.0,
nbytes=nbytes,
type=None,
stimulus_id=stimulus_id,
)


@dataclass
class ExecuteFailureEvent(StateMachineEvent):
Expand Down Expand Up @@ -788,6 +809,26 @@ def from_exception(
stimulus_id=stimulus_id,
)

@staticmethod
def dummy(
key: str,
*,
stimulus_id: str,
) -> ExecuteFailureEvent:
"""Build a dummy event, with most attributes set to a reasonable default.
This is a convenience method to be used in unit testing only.
"""
return ExecuteFailureEvent(
key=key,
start=None,
stop=None,
exception=Serialize(None),
traceback=None,
exception_text="",
traceback_text="",
stimulus_id=stimulus_id,
)


@dataclass
class CancelComputeEvent(StateMachineEvent):
Expand Down