From d0a93e950d70680c41959c517a68ee6fc0ce2138 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Tue, 8 Feb 2022 09:43:10 +0000 Subject: [PATCH] Fix flaky test_robust_to_bad_sizeof_estimates (#5753) --- distributed/tests/test_worker.py | 34 +++++++++++++++++++------------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 8155492e92..6ad29293cc 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1180,33 +1180,39 @@ async def test_statistical_profiling_2(c, s, a, b): @gen_cluster( - nthreads=[("127.0.0.1", 1)], + nthreads=[("", 1)], client=True, - worker_kwargs={"memory_monitor_interval": 10}, + config={ + "distributed.worker.memory.target": False, + "distributed.worker.memory.spill": 0.7, + }, + worker_kwargs={"memory_monitor_interval": "10ms"}, ) async def test_robust_to_bad_sizeof_estimates(c, s, a): - np = pytest.importorskip("numpy") - memory = psutil.Process().memory_info().rss + """Test that the spill threshold uses the process memory and not the managed memory + reported by sizeof(), which may be inaccurate + """ + memory = s.workers[a.address].memory.process + # Reach 'spill' threshold after 400MB of managed data a.memory_limit = memory / 0.7 + 400e6 class BadAccounting: - def __init__(self, data): - self.data = data + """100 MB process memory, 10 bytes reported managed memory""" + + def __init__(self, *args): + self.data = "x" * int(100e6) def __sizeof__(self): return 10 - def f(n): - x = np.ones(int(n), dtype="u1") - result = BadAccounting(x) - return result + def __reduce__(self): + """Speed up test by writing very little to disk when spilling""" + return BadAccounting, () - futures = c.map(f, [100e6] * 8, pure=False) + futures = c.map(BadAccounting, range(8)) - start = time() while not a.data.disk: - await asyncio.sleep(0.1) - assert time() < start + 5 + await asyncio.sleep(0.01) @pytest.mark.slow