Make HTEX scale-down be aware of unstarted blocks (#3353)
Prior to this PR, the scale_in code for the HighThroughputExecutor would not scale in any block that had not yet had at least one manager register with the interchange, because it retrieved its list of blocks from the interchange. This is documented in issue #3232.

This PR makes the htex scale-in code also pay attention to blocks in the status_facade list, which includes blocks that have been submitted and blocks that have been reported by the provider mechanism.
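As a rough illustration of the merged bookkeeping (not the executor's verbatim code: the state strings, status map, and manager-dict shape below are simplified stand-ins):

from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List

# Simplified stand-in for parsl.jobs.states.TERMINAL_STATES.
TERMINAL_STATES = {"CANCELLED", "COMPLETED", "FAILED"}


@dataclass
class BlockInfo:
    tasks: int   # sum of tasks in this block
    idle: float  # shortest idle time of any manager in this block


def merged_block_info(status: Dict[str, str],
                      managers: List[dict]) -> Dict[str, BlockInfo]:
    """Combine blocks known from submit/provider status with blocks
    whose managers have registered with the interchange."""
    block_info: Dict[str, BlockInfo] = defaultdict(
        lambda: BlockInfo(tasks=0, idle=float('inf')))

    # Source 1: every non-terminal block from the status facade, even if
    # no manager has registered yet. Such blocks keep tasks=0, idle=inf.
    for block_id, state in status.items():
        if state not in TERMINAL_STATES:
            block_info[block_id] = BlockInfo(tasks=0, idle=float('inf'))

    # Source 2: blocks whose managers registered with the interchange.
    for manager in managers:
        if not manager['active']:
            continue
        info = block_info[manager['block_id']]
        info.tasks += manager['tasks']
        info.idle = min(info.idle, manager['idle_duration'])

    return block_info


# Block "1" was submitted but never registered a manager; it still
# appears in the result, with tasks=0 and idle=inf.
print(merged_block_info(
    {"0": "RUNNING", "1": "RUNNING", "2": "FAILED"},
    [{"block_id": "0", "active": True, "tasks": 3, "idle_duration": 0.2}],
))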
benclifford committed Apr 12, 2024
1 parent e155bd1 commit e4b3c3b
Showing 2 changed files with 88 additions and 2 deletions.
16 changes: 14 additions & 2 deletions parsl/executors/high_throughput/executor.py
@@ -17,7 +17,7 @@
 from parsl.serialize import pack_res_spec_apply_message, deserialize
 from parsl.serialize.errors import SerializationError, DeserializationError
 from parsl.app.errors import RemoteExceptionWrapper
-from parsl.jobs.states import JobStatus, JobState
+from parsl.jobs.states import JobStatus, JobState, TERMINAL_STATES
 from parsl.executors.high_throughput import zmq_pipes
 from parsl.executors.high_throughput import interchange
 from parsl.executors.errors import (
@@ -713,8 +713,20 @@ class BlockInfo:
             tasks: int  # sum of tasks in this block
             idle: float  # shortest idle time of any manager in this block
 
+        # block_info will be populated from two sources:
+        # the Job Status Poller mutable block list, and the list of blocks
+        # which have connected to the interchange.
+
+        def new_block_info():
+            return BlockInfo(tasks=0, idle=float('inf'))
+
+        block_info: Dict[str, BlockInfo] = defaultdict(new_block_info)
+
+        for block_id, job_status in self._status.items():
+            if job_status.state not in TERMINAL_STATES:
+                block_info[block_id] = new_block_info()
+
         managers = self.connected_managers()
-        block_info: Dict[str, BlockInfo] = defaultdict(lambda: BlockInfo(tasks=0, idle=float('inf')))
         for manager in managers:
             if not manager['active']:
                 continue
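The practical effect: a block that was submitted but never registered a manager now appears with tasks=0 and idle=inf, which any idleness-based victim ranking will put first. A hedged sketch of such a ranking (illustrative only; not necessarily the exact ordering scale_in uses):

from collections import namedtuple

BlockInfo = namedtuple('BlockInfo', ['tasks', 'idle'])


def pick_scale_in_victims(block_info, n):
    """Rank blocks for termination: fewest tasks first, then longest idle."""
    ranked = sorted(block_info.items(),
                    key=lambda kv: (kv[1].tasks, -kv[1].idle))
    return [block_id for block_id, _ in ranked[:n]]


# A never-registered block (tasks=0, idle=inf) outranks a busy one.
info = {"0": BlockInfo(tasks=3, idle=0.5),
        "1": BlockInfo(tasks=0, idle=float('inf'))}
print(pick_scale_in_victims(info, 1))  # -> ['1']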
74 changes: 74 additions & 0 deletions parsl/tests/test_scaling/test_scale_down_htex_unregistered.py
@@ -0,0 +1,74 @@
import logging
import time

import pytest

import parsl

from parsl import File, python_app
from parsl.jobs.states import JobState, TERMINAL_STATES
from parsl.providers import LocalProvider
from parsl.channels import LocalChannel
from parsl.launchers import SingleNodeLauncher
from parsl.config import Config
from parsl.executors import HighThroughputExecutor

logger = logging.getLogger(__name__)

_max_blocks = 1
_min_blocks = 0


def local_config():
    return Config(
        executors=[
            HighThroughputExecutor(
                heartbeat_period=1,
                heartbeat_threshold=2,
                poll_period=100,
                label="htex_local",
                address="127.0.0.1",
                max_workers=1,
                encrypted=True,
                # replace the worker pool launch command, so that no
                # manager ever starts or registers with the interchange
                launch_cmd="sleep inf",
                provider=LocalProvider(
                    channel=LocalChannel(),
                    init_blocks=1,
                    max_blocks=_max_blocks,
                    min_blocks=_min_blocks,
                    launcher=SingleNodeLauncher(),
                ),
            )
        ],
        max_idletime=0.5,
        strategy='htex_auto_scale',
        strategy_period=0.1
    )


# see issue #1885 for details of failures of this test.
# at the time of issue #1885 this test was failing frequently
# in CI.
@pytest.mark.local
def test_scaledown_with_register(try_assert):
    dfk = parsl.dfk()
    htex = dfk.executors['htex_local']

    num_managers = len(htex.connected_managers())
    assert num_managers == 0, "Expected 0 managers at start"

    try_assert(lambda: len(htex.status()),
               fail_msg="Expected 1 block at start")

    s = htex.status()
    assert s['0'].state == JobState.RUNNING, "Expected block to be in RUNNING"

    def check_zero_blocks():
        # the block is still reported after scale-down, but in a
        # terminal state
        s = htex.status()
        return len(s) == 1 and s['0'].state in TERMINAL_STATES

    try_assert(
        check_zero_blocks,
        fail_msg="Expected 0 blocks after idle scaledown",
        timeout_ms=15000,
    )
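A note on the try_assert fixture used above: it comes from Parsl's pytest test suite and repeatedly evaluates a predicate until it returns truthy or a timeout expires. A hypothetical re-implementation of that polling pattern, for readers outside the Parsl tree (the default values here are invented):

import time


def try_assert(predicate, fail_msg: str, timeout_ms: float = 5000,
               check_period_ms: float = 20):
    """Poll `predicate` until it returns truthy or `timeout_ms` elapses."""
    deadline = time.monotonic() + timeout_ms / 1000.0
    while time.monotonic() < deadline:
        if predicate():
            return
        time.sleep(check_period_ms / 1000.0)
    raise AssertionError(fail_msg)


# e.g. try_assert(lambda: len(htex.status()) == 1, fail_msg="no block appeared")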
