Skip to content

Commit

Permalink
Don't cast int metrics to float
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Nov 18, 2023
1 parent a923909 commit 1c4df7a
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 8 deletions.
11 changes: 6 additions & 5 deletions distributed/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -478,11 +478,12 @@ def stop() -> bool:
except ImportError:
self.digests = None

# In case crick is not installed, also log cumulative totals (reset at server
# restart) and local maximums (reset by prometheus poll)
self.digests_total = defaultdict(float)
self.digests_total_since_heartbeat = defaultdict(float)
self.digests_max = defaultdict(float)
# Also log cumulative totals (reset at server restart)
# and local maximums (reset by prometheus poll).
# Default to int (0) rather than float (0.0), so that int metrics are not
# cast to float.
self.digests_total = defaultdict(int)
self.digests_total_since_heartbeat = defaultdict(int)
self.digests_max = defaultdict(int)

Check warning on line 486 in distributed/core.py

View check run for this annotation

Codecov / codecov/patch

distributed/core.py#L484-L486

Added lines #L484 - L486 were not covered by tests

self.counters = defaultdict(Counter)
pc = PeriodicCallback(self._shift_counters, 5000)
Expand Down
3 changes: 2 additions & 1 deletion distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3576,7 +3576,8 @@ def __init__(
self.bandwidth_workers = defaultdict(float)
self.bandwidth_types = defaultdict(float)

self.cumulative_worker_metrics = defaultdict(float)
# Don't cast int metrics to float
self.cumulative_worker_metrics = defaultdict(int)

Check warning on line 3580 in distributed/scheduler.py

View check run for this annotation

Codecov / codecov/patch

distributed/scheduler.py#L3580

Added line #L3580 was not covered by tests

if not preload:
preload = dask.config.get("distributed.scheduler.preload")
Expand Down
5 changes: 4 additions & 1 deletion distributed/spans.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,10 @@ def __init__(
self.children = []
self.groups = set()
self._code = {}
self._cumulative_worker_metrics = defaultdict(float)

# Don't cast int metrics to float
self._cumulative_worker_metrics = defaultdict(int)

Check warning on line 145 in distributed/spans.py

View check run for this annotation

Codecov / codecov/patch

distributed/spans.py#L145

Added line #L145 was not covered by tests

assert len(total_nthreads_history) > 0
self._total_nthreads_history = total_nthreads_history
self._total_nthreads_offset = len(total_nthreads_history) - 1
Expand Down
22 changes: 22 additions & 0 deletions distributed/tests/test_worker_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -622,3 +622,25 @@ def f(_):

out = await c.gather(c.map(f, range(1000)))
assert max(out) < 10


@gen_cluster(client=True, nthreads=[("", 1)])
async def test_int_metrics(c, s, a):
    """Integer-valued metrics must remain ``int`` and never be cast to float.

    A task records a single integer sample for metric ``foo`` (unit ``u``);
    after a worker heartbeat the value is checked in the worker's digests,
    in the scheduler's cumulative metrics and in the default span.
    """

    def record_sample():
        context_meter.digest_metric("foo", 1, "u")

    await c.submit(record_sample, key="x")
    await a.heartbeat()

    span = s.extensions["spans"].spans_search_by_name["default",][0]

    def check_is_int_one(mapping, key):
        value = mapping[key]
        assert value == 1
        assert isinstance(value, int)

    # Worker-side keys include the span id; the scheduler and span keys omit it.
    worker_key = ("execute", span.id, "x", "foo", "u")
    squashed_key = ("execute", "x", "foo", "u")

    check_is_int_one(a.digests_total, worker_key)
    check_is_int_one(a.digests_max, worker_key)
    check_is_int_one(s.cumulative_worker_metrics, squashed_key)
    check_is_int_one(span.cumulative_worker_metrics, squashed_key)
3 changes: 2 additions & 1 deletion distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1042,7 +1042,8 @@ async def get_metrics(self) -> dict:
spans_ext.collect_digests()

# Send metrics with squashed span_id
digests: defaultdict[Hashable, float] = defaultdict(float)
# Don't cast int metrics to float
digests: defaultdict[Hashable, float] = defaultdict(int)

Check warning on line 1046 in distributed/worker.py

View check run for this annotation

Codecov / codecov/patch

distributed/worker.py#L1046

Added line #L1046 was not covered by tests
for k, v in self.digests_total_since_heartbeat.items():
if isinstance(k, tuple) and k[0] == "execute":
k = k[:1] + k[2:]
Expand Down

0 comments on commit 1c4df7a

Please sign in to comment.