Rename PolledExecutorFacade methods that overlap BlockProviderExecutor (#3351)

This is in preparation for moving all the PolledExecutorFacade code into
BlockProviderExecutor, in an upcoming PR.

There should now be no overlap between the attributes or methods of
PolledExecutorFacade and BlockProviderExecutor, among the methods that
will move. __init__ is an exception, because it will merge with the
__init__ of BlockProviderExecutor.

The new names are slightly awkward, because I have not found a short
adjective that adequately describes the difference between
BlockProviderExecutor and PolledExecutorFacade behaviours.

Future untangling work in this area will necessarily involve quite a lot
of rearranging, and picking appropriate names for whatever remains is part
of that future work - so no bikeshedding over adjectives here.

This PR should only rename methods. It should not change any behaviour.
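
For reference, the renames in this commit are: poll -> poll_facade, status -> status_facade, scale_in -> scale_in_facade and scale_out -> scale_out_facade. The sketch below illustrates only that renamed surface; it is not code from the PR, and the placeholder classes and elided bodies are assumptions standing in for the real parsl types and implementations.

from typing import Dict, List, Optional


class JobStatus: ...              # placeholder for parsl's JobStatus
class BlockProviderExecutor: ...  # placeholder for the real executor base class


class PolledExecutorFacade:
    """Illustrative sketch of the facade after this commit: same behaviour,
    new method names. Bodies are elided; only the renamed surface is shown."""

    def __init__(self, executor: BlockProviderExecutor) -> None:
        self._executor = executor
        self._status: Dict[str, JobStatus] = {}

    def poll_facade(self) -> None:                      # was: poll()
        ...

    @property
    def status_facade(self) -> Dict[str, JobStatus]:    # was: the status property
        return self._status

    def scale_in_facade(self, n: int, max_idletime: Optional[float] = None) -> List[str]:  # was: scale_in()
        ...

    def scale_out_facade(self, n: int) -> List[str]:    # was: scale_out()
        ...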
benclifford committed Apr 11, 2024
1 parent 5c1ffd4 commit ceda7ef
Showing 2 changed files with 15 additions and 15 deletions.
16 changes: 8 additions & 8 deletions parsl/jobs/job_status_poller.py
@@ -20,7 +20,7 @@ def __init__(self, executor: BlockProviderExecutor):
         self._last_poll_time = 0.0
         self._status = {}  # type: Dict[str, JobStatus]
 
-    def poll(self) -> None:
+    def poll_facade(self) -> None:
         now = time.time()
         if now >= self._last_poll_time + self._executor.status_polling_interval:
             previous_status = self._status
@@ -36,7 +36,7 @@ def poll(self) -> None:
                 self._executor.send_monitoring_info(delta_status)
 
     @property
-    def status(self) -> Dict[str, JobStatus]:
+    def status_facade(self) -> Dict[str, JobStatus]:
         """Return the status of all jobs/blocks of the executor of this poller.
 
         :return: a dictionary mapping block ids (in string) to job status
@@ -47,7 +47,7 @@ def executor(self) -> BlockProviderExecutor:
     def executor(self) -> BlockProviderExecutor:
         return self._executor
 
-    def scale_in(self, n: int, max_idletime: Optional[float] = None) -> List[str]:
+    def scale_in_facade(self, n: int, max_idletime: Optional[float] = None) -> List[str]:
 
         if max_idletime is None:
             block_ids = self._executor.scale_in(n)
@@ -66,7 +66,7 @@ def scale_in(self, n: int, max_idletime: Optional[float] = None) -> List[str]:
         self._executor.send_monitoring_info(new_status)
         return block_ids
 
-    def scale_out(self, n: int) -> List[str]:
+    def scale_out_facade(self, n: int) -> List[str]:
         block_ids = self._executor.scale_out(n)
         if block_ids is not None:
             new_status = {}
@@ -93,11 +93,11 @@ def poll(self) -> None:
 
     def _run_error_handlers(self, status: List[PolledExecutorFacade]) -> None:
         for es in status:
-            es.executor.handle_errors(es.status)
+            es.executor.handle_errors(es.status_facade)
 
     def _update_state(self) -> None:
         for item in self._executor_facades:
-            item.poll()
+            item.poll_facade()
 
     def add_executors(self, executors: Sequence[BlockProviderExecutor]) -> None:
         for executor in executors:
@@ -116,8 +116,8 @@ def close(self, timeout: Optional[float] = None) -> None:
                 # cancelling, but it is safe to be more, as the scaling
                 # code will cope with being asked to cancel more blocks
                 # than exist.
-                block_count = len(ef.status)
-                ef.scale_in(block_count)
+                block_count = len(ef.status_facade)
+                ef.scale_in_facade(block_count)
 
             else:  # and bad_state_is_set
                 logger.warning(f"Not scaling in executor {ef.executor.label} because it is in bad state")
14 changes: 7 additions & 7 deletions parsl/jobs/strategy.py
@@ -156,7 +156,7 @@ def _strategy_init_only(self, executor_facades: List[jsp.PolledExecutorFacade])
             executor = ef.executor
             if self.executors[executor.label]['first']:
                 logger.debug(f"strategy_init_only: scaling out {executor.provider.init_blocks} initial blocks for {executor.label}")
-                ef.scale_out(executor.provider.init_blocks)
+                ef.scale_out_facade(executor.provider.init_blocks)
                 self.executors[executor.label]['first'] = False
             else:
                 logger.debug("strategy_init_only: doing nothing")
@@ -193,13 +193,13 @@ def _general_strategy(self, executor_facades: List[jsp.PolledExecutorFacade], *,
             if self.executors[label]['first']:
                 executor = ef.executor
                 logger.debug(f"Scaling out {executor.provider.init_blocks} initial blocks for {label}")
-                ef.scale_out(executor.provider.init_blocks)
+                ef.scale_out_facade(executor.provider.init_blocks)
                 self.executors[label]['first'] = False
 
             # Tasks that are either pending completion
             active_tasks = executor.outstanding
 
-            status = ef.status
+            status = ef.status_facade
 
             # FIXME we need to handle case where provider does not define these
             # FIXME probably more of this logic should be moved to the provider
@@ -257,7 +257,7 @@ def _general_strategy(self, executor_facades: List[jsp.PolledExecutorFacade], *,
                         # We have resources idle for the max duration,
                         # we have to scale_in now.
                         logger.debug(f"Idle time has reached {self.max_idletime}s for executor {label}; scaling in")
-                        ef.scale_in(active_blocks - min_blocks)
+                        ef.scale_in_facade(active_blocks - min_blocks)
 
                     else:
                         logger.debug(
@@ -281,7 +281,7 @@ def _general_strategy(self, executor_facades: List[jsp.PolledExecutorFacade], *,
                     excess_blocks = math.ceil(float(excess_slots) / (tasks_per_node * nodes_per_block))
                     excess_blocks = min(excess_blocks, max_blocks - active_blocks)
                     logger.debug(f"Requesting {excess_blocks} more blocks")
-                    ef.scale_out(excess_blocks)
+                    ef.scale_out_facade(excess_blocks)
 
             elif active_slots == 0 and active_tasks > 0:
                 logger.debug("Strategy case 4a: No active slots but some active tasks - could scale out by a single block")
@@ -290,7 +290,7 @@ def _general_strategy(self, executor_facades: List[jsp.PolledExecutorFacade], *,
                 if active_blocks < max_blocks:
                     logger.debug("Requesting single block")
 
-                    ef.scale_out(1)
+                    ef.scale_out_facade(1)
                 else:
                     logger.debug("Not requesting single block, because at maxblocks already")
 
@@ -306,7 +306,7 @@ def _general_strategy(self, executor_facades: List[jsp.PolledExecutorFacade], *,
                         excess_blocks = math.ceil(float(excess_slots) / (tasks_per_node * nodes_per_block))
                         excess_blocks = min(excess_blocks, active_blocks - min_blocks)
                         logger.debug(f"Requesting scaling in by {excess_blocks} blocks with idle time {self.max_idletime}s")
-                        ef.scale_in(excess_blocks, max_idletime=self.max_idletime)
+                        ef.scale_in_facade(excess_blocks, max_idletime=self.max_idletime)
                 else:
                     logger.error("This strategy does not support scaling in except for HighThroughputExecutor - taking no action")
             else:

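As a usage note, not part of this commit: callers such as the scaling strategy and the poller's close path now go through the *_facade names. A hypothetical helper mirroring the close() change above might look like this; the function name and its facades argument are illustrative, and only the renamed methods come from the diff.

def scale_in_all(facades) -> None:
    # Ask each executor, via its facade, to cancel every block it currently knows about.
    for ef in facades:
        block_count = len(ef.status_facade)   # was: len(ef.status)
        ef.scale_in_facade(block_count)       # was: ef.scale_in(block_count)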