Adding futures into a Queue disables resources argument in submit function. #8860

@OvidiuMircescu

This run script starts 8 workers, each with a different resource (work0, work1, ...).

#!/bin/bash
config_scheduler=scheduler.json

dask-scheduler --scheduler-file $config_scheduler > scheduler.log 2>&1 &
for i in {0..7}
do
dask-worker --scheduler-file $config_scheduler --nworkers 1 --nthreads 1 --resources "work$i=1" > worker_$i.log 2>&1 &
done

python3 test_script.py $config_scheduler
wait

The test script submits 3*8 tasks, using the resources argument to choose the worker.

import dask.distributed
import os
import time

def f(x):
  dask.distributed.print(x, "run on", os.getpid())

def main(scheduler_file):
  client = dask.distributed.Client(scheduler_file=scheduler_file)
  try:
    for it in range(3):
      for wrk in range(8):
        resource = "work" + str(wrk)
        arg = resource + ":" + str(it)
        result = client.submit(f, arg, resources={resource:1})
        dask.distributed.fire_and_forget(result)
        #queue = dask.distributed.Queue("started_tasks")
        #queue.put(result)
  finally:
    time.sleep(2)
    client.shutdown()

if __name__ == '__main__':
  import argparse
  parser = argparse.ArgumentParser(description="Test dask")
  parser.add_argument("scheduler_file", help="File generated by dask-scheduler")
  
  args = parser.parse_args()
  main(args.scheduler_file)

We expect the same PID to be printed for every task that requests the same resource, and that is what we get as long as the two lines that use the Queue object stay commented out.

This is an example of the output when the Queue object is not used:

work2:2 run on 124329
work2:0 run on 124329
work2:1 run on 124329
work6:2 run on 124349
work6:0 run on 124349
work6:1 run on 124349
work4:2 run on 124359
work4:0 run on 124359
work4:1 run on 124359
work0:2 run on 124345
work0:0 run on 124345
work0:1 run on 124345
work5:2 run on 124338
work3:2 run on 124354
work7:2 run on 124364
work5:0 run on 124338
work5:1 run on 124338
work3:0 run on 124354
work3:1 run on 124354
work7:0 run on 124364
work7:1 run on 124364
work1:2 run on 124340
work1:0 run on 124340
work1:1 run on 124340

work2 is always on 124329, work6 always on 124349 and so on. It works fine!

But if we uncomment the two lines so that the Queue object is used, the resources argument seems to be ignored.

An example of output when the Queue object is used:

work0:0 run on 126130
work2:0 run on 126135
work1:0 run on 126130
work3:0 run on 126135
work4:0 run on 126130
work6:0 run on 126135
work5:0 run on 126130
work0:1 run on 126146
work7:0 run on 126135
work1:1 run on 126146
work2:1 run on 126130
work4:1 run on 126162
work6:1 run on 126135
work3:1 run on 126130
work0:2 run on 126146
work5:1 run on 126162
work7:1 run on 126135
work2:2 run on 126130
work1:2 run on 126146
work4:2 run on 126162
work6:2 run on 126135
work3:2 run on 126130
work5:2 run on 126162
work7:2 run on 126135

The work0 tasks run first on 126130 and then twice on 126146, but 126146 is also used twice for work1.
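
To check a longer run without reading the output by eye, a small helper along these lines could be used. It is only a sketch: the names `pids_per_resource` and `spread_resources` are made up for illustration and are not part of dask, and it assumes every printed line has the form `workN:M run on PID` as in the outputs above.

```python
from collections import defaultdict


def pids_per_resource(output):
    """Map each resource name (e.g. "work0") to the set of PIDs that ran its tasks."""
    seen = defaultdict(set)
    for line in output.splitlines():
        # Expected shape (assumption): "work2:1 run on 124329"
        label, sep, pid = line.strip().partition(" run on ")
        if not sep:
            continue  # skip blank or unrelated lines
        resource = label.split(":", 1)[0]
        seen[resource].add(int(pid))
    return dict(seen)


def spread_resources(output):
    """Return the resources whose tasks ran on more than one PID, sorted by name."""
    mapping = pids_per_resource(output)
    return sorted(name for name, pids in mapping.items() if len(pids) > 1)


# A few lines copied from the two runs above.
without_queue = """\
work2:2 run on 124329
work2:0 run on 124329
work6:2 run on 124349
"""
with_queue = """\
work0:0 run on 126130
work0:1 run on 126146
work1:0 run on 126130
"""

print(spread_resources(without_queue))
print(spread_resources(with_queue))
```

An empty list means every resource stayed on a single process. Any name in the list is a resource whose tasks were spread over several workers.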

Environment:

I see the issue with distributed 2024.8.0 on Debian 11, but it works fine with a much older version (2022.2.0 on Debian 10).
