2021-03-22T16:47:54.5827441Z ================================== FAILURES ===================================
2021-03-22T16:47:54.5828133Z ________________________________ test_profile _________________________________
2021-03-22T16:47:54.5833735Z
2021-03-22T16:47:54.5834308Z def test_func():
2021-03-22T16:47:54.5834877Z result = None
2021-03-22T16:47:54.5835825Z workers = []
2021-03-22T16:47:54.5836525Z with clean(timeout=active_rpc_timeout, **clean_kwargs) as loop:
2021-03-22T16:47:54.5837831Z
2021-03-22T16:47:54.5844201Z async def coro():
2021-03-22T16:47:54.5844957Z with dask.config.set(config):
2021-03-22T16:47:54.5845485Z s = False
2021-03-22T16:47:54.5846816Z for _ in range(60):
2021-03-22T16:47:54.5847460Z try:
2021-03-22T16:47:54.5851799Z s, ws = await start_cluster(
2021-03-22T16:47:54.5853714Z nthreads,
2021-03-22T16:47:54.5854442Z scheduler,
2021-03-22T16:47:54.5854958Z loop,
2021-03-22T16:47:54.5855590Z security=security,
2021-03-22T16:47:54.5863190Z Worker=Worker,
2021-03-22T16:47:54.5864011Z scheduler_kwargs=scheduler_kwargs,
2021-03-22T16:47:54.5864754Z worker_kwargs=worker_kwargs,
2021-03-22T16:47:54.5865280Z )
2021-03-22T16:47:54.5866867Z except Exception as e:
2021-03-22T16:47:54.5874386Z logger.error(
2021-03-22T16:47:55.5832993Z "Failed to start gen_cluster: "
2021-03-22T16:47:55.5835158Z f"{e.__class__.__name__}: {e}; retrying",
2021-03-22T16:47:55.5835680Z exc_info=True,
2021-03-22T16:47:55.5836123Z )
2021-03-22T16:47:55.5840984Z await asyncio.sleep(1)
2021-03-22T16:47:55.5841853Z else:
2021-03-22T16:47:55.5842264Z workers[:] = ws
2021-03-22T16:47:55.5842784Z args = [s] + workers
2021-03-22T16:47:55.5843194Z break
2021-03-22T16:47:55.5848212Z if s is False:
2021-03-22T16:47:55.5848846Z raise Exception("Could not start cluster")
2021-03-22T16:47:55.5849561Z if client:
2021-03-22T16:47:55.5850021Z c = await Client(
2021-03-22T16:47:55.5850447Z s.address,
2021-03-22T16:47:55.5850942Z loop=loop,
2021-03-22T16:47:55.5856191Z security=security,
2021-03-22T16:47:55.5856778Z asynchronous=True,
2021-03-22T16:47:55.5857255Z **client_kwargs,
2021-03-22T16:47:55.5857684Z )
2021-03-22T16:47:55.5858315Z args = [c] + args
2021-03-22T16:47:55.5858725Z try:
2021-03-22T16:47:55.5865141Z future = func(*args)
2021-03-22T16:47:55.5865592Z if timeout:
2021-03-22T16:47:55.5866179Z future = asyncio.wait_for(future, timeout)
2021-03-22T16:47:55.5866742Z result = await future
2021-03-22T16:47:55.5867184Z if s.validate:
2021-03-22T16:47:55.5872017Z s.validate_state()
2021-03-22T16:47:55.5872574Z finally:
2021-03-22T16:47:55.5873160Z if client and c.status not in ("closing", "closed"):
2021-03-22T16:47:55.5873814Z await c._close(fast=s.status == Status.closed)
2021-03-22T16:47:55.5874478Z await end_cluster(s, workers)
2021-03-22T16:47:55.5875266Z await asyncio.wait_for(cleanup_global_workers(), 1)
2021-03-22T16:47:55.5933092Z
2021-03-22T16:47:55.5938460Z try:
2021-03-22T16:47:55.5939159Z c = await default_client()
2021-03-22T16:47:55.5939934Z except ValueError:
2021-03-22T16:47:55.5940461Z pass
2021-03-22T16:47:55.5941024Z else:
2021-03-22T16:47:55.5941462Z await c._close(fast=True)
2021-03-22T16:47:55.5945615Z
2021-03-22T16:47:55.5946264Z def get_unclosed():
2021-03-22T16:47:55.5948639Z return [c for c in Comm._instances if not c.closed()] + [
2021-03-22T16:47:55.5949823Z c
2021-03-22T16:47:55.5950494Z for c in _global_clients.values()
2021-03-22T16:47:55.5957806Z if c.status != "closed"
2021-03-22T16:47:55.5958422Z ]
2021-03-22T16:47:55.5959679Z
2021-03-22T16:47:55.5960685Z try:
2021-03-22T16:47:55.5961519Z start = time()
2021-03-22T16:47:55.5966920Z while time() < start + 60:
2021-03-22T16:47:55.5967722Z gc.collect()
2021-03-22T16:47:55.5969431Z if not get_unclosed():
2021-03-22T16:47:55.5970063Z break
2021-03-22T16:47:55.5970676Z await asyncio.sleep(0.05)
2021-03-22T16:47:55.5971174Z else:
2021-03-22T16:47:55.5977022Z if allow_unclosed:
2021-03-22T16:47:55.5977804Z print(f"Unclosed Comms: {get_unclosed()}")
2021-03-22T16:47:55.5978687Z else:
2021-03-22T16:47:55.5980602Z raise RuntimeError("Unclosed Comms", get_unclosed())
2021-03-22T16:47:55.5982500Z finally:
2021-03-22T16:47:55.5988000Z Comm._instances.clear()
2021-03-22T16:47:55.5988856Z _global_clients.clear()
2021-03-22T16:47:55.5990047Z
2021-03-22T16:47:55.5991438Z return result
2021-03-22T16:47:55.5992032Z
2021-03-22T16:47:55.5992515Z result = loop.run_sync(
2021-03-22T16:47:55.5998259Z > coro, timeout=timeout * 2 if timeout else timeout
2021-03-22T16:47:55.5998856Z )
2021-03-22T16:47:55.6000848Z
2021-03-22T16:47:55.6001397Z distributed\utils_test.py:955:
2021-03-22T16:47:55.6002081Z _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
2021-03-22T16:47:55.6008432Z C:\Miniconda3\envs\dask-distributed\lib\site-packages\tornado\ioloop.py:576: in run_sync
2021-03-22T16:47:55.6010551Z return future_cell[0].result()
2021-03-22T16:47:55.6011106Z distributed\utils_test.py:913: in coro
2021-03-22T16:47:55.6011654Z result = await future
2021-03-22T16:47:55.6013159Z C:\Miniconda3\envs\dask-distributed\lib\asyncio\tasks.py:442: in wait_for
2021-03-22T16:47:55.6014510Z return fut.result()
2021-03-22T16:47:55.6019088Z _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
2021-03-22T16:47:55.6020311Z
2021-03-22T16:47:55.6020745Z c = <Client: not connected>
2021-03-22T16:47:55.6022371Z s = <Scheduler: "tcp://127.0.0.1:63544" processes: 0 cores: 0>
2021-03-22T16:47:55.6023972Z a = <Worker: 'tcp://127.0.0.1:63545', 0, Status.closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
2021-03-22T16:47:55.6024982Z b = <Worker: 'tcp://127.0.0.1:63547', 1, Status.closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
2021-03-22T16:47:55.6033987Z
2021-03-22T16:47:55.6035589Z @gen_cluster(client=True, worker_kwargs={"profile_cycle_interval": "100ms"})
2021-03-22T16:47:55.6036290Z async def test_profile(c, s, a, b):
2021-03-22T16:47:55.6037511Z futures = c.map(slowinc, range(10), delay=0.05, workers=a.address)
2021-03-22T16:47:55.6038112Z await wait(futures)
2021-03-22T16:47:55.6039366Z
2021-03-22T16:47:55.6045403Z x = await c.profile(start=time() + 10, stop=time() + 20)
2021-03-22T16:47:55.6045945Z assert not x["count"]
2021-03-22T16:47:55.6046366Z
2021-03-22T16:47:55.6047022Z x = await c.profile(start=0, stop=time())
2021-03-22T16:47:55.6047546Z > assert (
2021-03-22T16:47:55.6051549Z x["count"]
2021-03-22T16:47:55.6052201Z == sum(p["count"] for _, p in a.profile_history) + a.profile_recent["count"]
2021-03-22T16:47:55.6054808Z )
2021-03-22T16:47:55.6055220Z E assert 31 == 36
2021-03-22T16:47:55.6065478Z E +31
2021-03-22T16:47:55.6065863Z E -36
2021-03-22T16:47:55.6066112Z
2021-03-22T16:47:55.6066652Z distributed\tests\test_client.py:5233: AssertionError
Over in #4615 we observed
`test_profile` had an intermittent failure. Full traceback: