Skip to content

Commit

Permalink
No sense in repeating unchanging information
Browse files Browse the repository at this point in the history
The much more interesting information is when data _changes_.
  • Loading branch information
khk-globus committed Jul 22, 2022
1 parent 339a8c3 commit a9d70f1
Showing 1 changed file with 23 additions and 5 deletions.
28 changes: 23 additions & 5 deletions funcx_endpoint/funcx_endpoint/strategies/simple.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import logging
import math
import time
Expand Down Expand Up @@ -36,6 +38,11 @@ def __init__(self, *args, threshold=20, interval=1, max_idletime=60):
self.max_idletime = max_idletime
self.executors = {"idle_since": None}

# caching vars; for log noise reduction
self._prev_info = None
self._prev_task_breakdown = None
self._prev_status = None

def strategize(self, *args, **kwargs):
try:
self._strategize(*args, **kwargs)
Expand All @@ -45,7 +52,9 @@ def strategize(self, *args, **kwargs):

def _strategize(self, *args, **kwargs):
task_breakdown = self.interchange.get_outstanding_breakdown()
log.debug(f"Task breakdown {task_breakdown}")
if task_breakdown != self._prev_task_breakdown:
self._prev_task_breakdown = task_breakdown
log.debug(f"Task breakdown {task_breakdown}")

min_blocks = self.interchange.provider.min_blocks
max_blocks = self.interchange.provider.max_blocks
Expand All @@ -61,21 +70,30 @@ def _strategize(self, *args, **kwargs):

active_tasks = sum(self.interchange.get_total_tasks_outstanding().values())
status = self.interchange.provider_status()
log.debug(f"Provider status : {status}")

running = sum(1 for x in status if x.state == JobState.RUNNING)
pending = sum(1 for x in status if x.state == JobState.PENDING)
active_blocks = running + pending
active_slots = active_blocks * tasks_per_node * nodes_per_block

log.debug(
"Endpoint has %s active tasks, %s/%s running/pending blocks, "
"and %s connected workers",
status = str(status)
if status != self._prev_status:
self._prev_status = status
log.debug(f"Provider status: {status}")

cur_info = (
active_tasks,
running,
pending,
self.interchange.get_total_live_workers(),
)
if cur_info != self._prev_info:
self._prev_info = cur_info
log.debug(
"Endpoint has %s active tasks, %s/%s running/pending blocks, "
"and %s connected workers",
*cur_info,
)

# reset kill timer if executor has active tasks
if active_tasks > 0 and self.executors["idle_since"]:
Expand Down

0 comments on commit a9d70f1

Please sign in to comment.