Eagerly update aggregate statistics for TaskPrefix instead of calculating them on-demand #8681
Conversation
Unit Test Results: see the test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

29 files ±0, 29 suites ±0, 11h 3m 47s ⏱️ (+1h 19m 38s)

For more details on these failures, see this check. Results for commit c84d42d; comparison against base commit 5708bdf. This pull request removes 14 tests and adds 9 (renamed tests count towards both).

♻️ This comment has been updated with latest results.
like ``{"memory": 10, "processing": 3, "released": 4, ...}`` | ||
""" | ||
return merge_with(sum, [tg.states for tg in self.groups]) | ||
@_deprecated(use_instead="groups") # type: ignore[misc] |
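For context, `merge_with` comes from toolz and recomputes the aggregate by walking every group's counters each time the property is read; this on-demand computation is what the PR replaces with eagerly maintained counters. A quick illustration of the standard toolz behavior (not project code):

```python
from tlz import merge_with  # toolz/cytoolz shim used by distributed

# Each dict plays the role of one TaskGroup's per-state counters.
group_states = [
    {"memory": 2, "released": 1},
    {"memory": 3, "processing": 4},
]

# merge_with(sum, ...) sums the values key-wise across all dicts.
print(merge_with(sum, group_states))
# {'memory': 5, 'released': 1, 'processing': 4}
```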
The fact that we remove groups from prefixes once all their tasks have been forgotten, in distributed/scheduler.py, lines 2031 to 2035 in 5708bdf:

```python
if ts.state == "forgotten" and tg.name in self.task_groups:
    # Remove TaskGroup if all tasks are in the forgotten state
    if all(v == 0 or k == "forgotten" for k, v in tg.states.items()):
        ts.prefix.groups.remove(tg)
        del self.task_groups[tg.name]
```

means that the additional filtering applied to `active` in distributed/scheduler.py, lines 995 to 1001 in 5708bdf:

```python
@property
def active(self) -> list[TaskGroup]:
    return [
        tg
        for tg in self.groups
        if any(k != "forgotten" and v != 0 for k, v in tg.states.items())
    ]
```

should effectively be redundant.
distributed/scheduler.py (Outdated)

```python
class TaskPrefix(TaskCollection):
    """Collection tracking all tasks within a prefix

    # FIXME: This comment belongs to the TaskGroup
```
TODO: I need to adjust this
LGTM modulo minor nits
distributed/scheduler.py (Outdated)

```python
group.add(self)
group.prefix.add(self)
```
Intuitively, I would expect `group.add(self)` to be sufficient and not require the caller to also call `group.prefix.add(self)`. Why can't `TaskGroup.add` be extended to call `TaskPrefix.add`? Same question for `transition` below.
Fair point, this is now aligned.
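For illustration, a minimal sketch of the aligned design (hypothetical stubs, not the actual scheduler classes): the group's `add` forwards to the prefix, so callers only touch the group.

```python
from __future__ import annotations


class TaskCollection:
    """Stub standing in for the shared bookkeeping base class."""

    def add(self, ts: TaskState) -> None:
        ...  # update this collection's aggregate statistics


class TaskPrefix(TaskCollection):
    pass


class TaskGroup(TaskCollection):
    prefix: TaskPrefix

    def add(self, ts: TaskState) -> None:
        super().add(ts)      # update the group's own aggregates
        self.prefix.add(ts)  # keep the prefix in sync automatically


class TaskState:
    """Stub; the real class carries much more state."""
```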
distributed/scheduler.py
Outdated
self._groups.pop(tg) | ||
for state, count in tg.states.items(): | ||
self.states[state] -= count | ||
self.duration -= tg.duration |
I guess we've had this problem before, but I strongly suspect we'll have to deal with floating-point arithmetic precision here. Historically, we've occasionally encountered negative occupancy issues, and maybe those go back to this diff. To avoid the issue entirely, we may want to track duration as an integer with `ms` accuracy; that would require a bit of refactoring and is likely out of scope for this PR. A quick fix would be to add `self.duration = max(self.duration, 0)` below this.
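To see the concern concretely (plain float behavior, nothing project-specific):

```python
# Subtracting previously added float durations does not always return
# to exactly zero; the residue can be slightly negative:
total = 0.3 - 0.2 - 0.1
print(total)  # -2.7755575615628914e-17

# The suggested quick fix clamps the aggregate after each subtraction:
total = max(total, 0)  # i.e. self.duration = max(self.duration, 0)
print(total)  # 0
```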
I think the duration tracking is only loosely coupled with other durations in the system, so internally tracking this at ms resolution and exposing a seconds variant should be pretty quick to do without far-reaching consequences.
2306ae7 stores the duration in microseconds internally. (I think that's the coarsest precision we can get away with; milliseconds are too coarse-grained and cause test failures.)
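A rough sketch of that approach (hypothetical names; the actual change is in 2306ae7): keep an integer microsecond counter internally and expose seconds as a float property.

```python
class TaskCollection:
    def __init__(self) -> None:
        self._duration_us: int = 0  # exact integer arithmetic, no drift

    @property
    def duration(self) -> float:
        """Duration in seconds, for external consumers."""
        return self._duration_us / 1e6

    def add_duration(self, seconds: float) -> None:
        self._duration_us += round(seconds * 1e6)

    def subtract_duration(self, seconds: float) -> None:
        # Rounds identically to add_duration, so adding and then
        # subtracting the same value cancels exactly.
        self._duration_us -= round(seconds * 1e6)
```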
distributed/scheduler.py (Outdated)

```python
self.group.transition(self._state, value)
self.prefix.transition(self._state, value)
self._state = value
self.prefix.state_counts[value] += 1
```
Something obvious: I think this is the only truly performance-critical section, since it is effectively called on every task state transition. The duration and `nbytes` updates are only triggered when a task completes, so at least five times more rarely. Since this essentially performs the same operations as before (plus a couple of additional function indirections/calls), it should perform similarly to the old version.
Similarly, the `nbytes` and duration updates should now be twice as expensive, but that's a constant factor. Given that we previously iterated over everything and performed this computation every 100ms, I assume this is a net positive.
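A rough sketch of the hot path under discussion (a simplification, not the actual scheduler code): every transition becomes two O(1) counter updates per collection, instead of re-merging all groups' counters on demand.

```python
from collections import defaultdict


class TaskCollection:
    def __init__(self) -> None:
        self.states: dict[str, int] = defaultdict(int)

    def transition(self, old: str, new: str) -> None:
        # O(1) bookkeeping, executed on every task state transition.
        self.states[old] -= 1
        self.states[new] += 1


# Per the diff above, the TaskState.state setter updates both levels,
# so group- and prefix-level aggregates are always current:
#
#     self.group.transition(self._state, value)
#     self.prefix.transition(self._state, value)
#     self._state = value
```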
if ts.state == "forgotten" and tg.name in self.task_groups: | ||
# Remove TaskGroup if all tasks are in the forgotten state | ||
if all(v == 0 or k == "forgotten" for k, v in tg.states.items()): | ||
ts.prefix.groups.remove(tg) | ||
ts.prefix.remove_group(tg) | ||
del self.task_groups[tg.name] | ||
|
I wonder if this cannot be encapsulated in the `TaskState.transition` method.
Well, I guess not entirely. The `del self.task_groups[tg.name]` would still have to be done here. Never mind, then.
Force-pushed from 1114b9e to 2306ae7.
Closes #8680
`pre-commit run --all-files`