Heterogeneous Computing Design  #5201

@madsbk

Description

I would like to discuss how we can make it easier to utilize a cluster of mixed architectures -- focusing on mixing GPU and CPU tasks/workers.

Background

Currently, a common setup is to have one Worker per GPU and nothing else. This is what a typical Dask-CUDA cluster looks like, and it is a simple setup that works reasonably well: Distributed handles memory transfers between GPUs seamlessly, and GPU tasks are never accidentally scheduled on machines without GPUs.

However, it also means that CPU tasks are scheduled on GPU Workers while the CPU cores on those machines go underutilized. E.g., a DGX-1 has 8 GPUs and 80 CPU cores, so most of the time the 80 CPU cores are idling.

Distributed supports resource management, which makes it possible to restrict tasks to specific types of Workers. However, it requires manually annotating operations, which is hard to do efficiently at a fine granularity. It is easy enough to annotate high-level operations on Dask collections, but annotating each chunk individually is hard. Additionally, it is not possible to annotate tasks dynamically based on task outputs.
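For reference, the static mechanism available today looks roughly like this: wrap an operation in `dask.annotate` so the scheduler only places it on workers started with a matching resource (e.g. `dask-worker ... --resources "GPU=1"`). This is the part the proposal below wants to make dynamic:

```python
import dask
from dask import delayed

# Static resource annotation: every task created inside this context
# is restricted to workers advertising a "GPU" resource.
with dask.annotate(resources={"GPU": 1}):
    gpu_task = delayed(lambda x: x + 1)(41)

# On a local scheduler (no resource enforcement) this still computes.
result = gpu_task.compute()
```

Note that the annotation applies to the whole delayed call; there is no way here to decide per-chunk, after the fact, that a task turned out to produce a device object.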

Currently, mixing different datatypes within a Dask collection is not possible. E.g., in a Dask dataframe, every chunk must be either a cuDF or a Pandas dataframe -- not a mix of the two. This limits heterogeneous computing since it forces the user to choose between pure GPU and pure CPU dataframe computation.

Goals

  • Utilize both available CPUs and GPUs.
  • Avoid CPU-tasks taking up GPU slots if CPU slots are available on the Worker.
  • Avoid scheduling GPU-tasks on Workers without GPUs.
  • Make it possible for systems like rebalancing and Active Memory Management to reason about GPU tasks and memory.
  • Support mixing cuDF and Pandas in a Dask dataframe, or NumPy and CuPy in a Dask array.
  • Enable computation on spilled data by converting GPU objects to CPU objects instead of serializing them. E.g., spilling a cuDF dataframe converts it to a Pandas dataframe.

I can think of three projects that could achieve the goals.

Dynamic Annotations and Restrictions

Make it possible for a Worker to update the annotations and restrictions of a task based on the task's output. The user specifies a function that the Worker calls after executing a task. The function returns a dict of annotations and a dict of restrictions (or None for either), which the Worker sends back to the Scheduler as part of the "op": "task-finished" message. The Scheduler then updates the task and its dependent tasks with the new annotations and restrictions.

In order to implement Detect GPU tasks as proposed by @mrocklin and restrict GPU tasks to GPU Workers, such a function could look something like the following, which makes the task's dependents use the Worker's dedicated GPU executor:

```python
from distributed.worker import TaskState


def annotate_gpu_tasks(ts: TaskState, value: object):
    from dask_cuda.is_device_object import is_device_object

    annotations = {}
    restrictions = {}
    if is_device_object(value):
        # The task produced a device object: run its dependents on the
        # Worker's GPU executor, and only on Workers with a GPU resource.
        annotations = {"executor": "gpu"}
        restrictions = {"GPU": 1}
    return annotations, restrictions
```

Mixed Typed Collectives

Make it possible to create Dask collections with mixed-type underlying objects. This works in many cases already! E.g., calling map_partitions() with a function that returns a cuDF or a Pandas dataframe based on the partition_info argument works. The only operation I have found to fail is concat(), which takes multiple chunks as input. We could implement concat() by first concatenating the cuDF dataframes, then the Pandas dataframes, and finally converting everything to cuDF before concatenating the partial results.
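The strategy above can be sketched in backend-agnostic form. This is a minimal illustration, not Dask's actual concat: `CONCAT_BY_TYPE` and the `convert` callable are hypothetical stand-ins for what a dispatch-based design would provide (cuDF would register `cudf.DataFrame: cudf.concat` and use `cudf.from_pandas` as the converter):

```python
from collections import defaultdict

import pandas as pd

# Hypothetical per-type concat registry; a real design would let each
# backend register itself (e.g. cudf.DataFrame -> cudf.concat).
CONCAT_BY_TYPE = {pd.DataFrame: pd.concat}


def mixed_concat(chunks, target_type=pd.DataFrame, convert=lambda df: df):
    """Concatenate chunks of mixed dataframe types.

    First concat each homogeneous group with its own backend, then
    convert the partial results to the target type (``convert`` would
    be e.g. ``cudf.from_pandas``), then do the final concat.
    """
    groups = defaultdict(list)
    for chunk in chunks:
        groups[type(chunk)].append(chunk)

    partials = []
    for typ, group in groups.items():
        part = CONCAT_BY_TYPE[typ](group, ignore_index=True)
        if not isinstance(part, target_type):
            part = convert(part)
        partials.append(part)
    return CONCAT_BY_TYPE[target_type](partials, ignore_index=True)
```

Note that grouping by type before concatenating changes row order relative to the input chunk order, which is acceptable for unordered concat but would need care elsewhere.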

I have experimented with this approach, and it works like a charm, but we probably need a more generic design that makes use of type dispatching and is extensible like all the other backend functions.
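Concretely, such a design could reuse Dask's existing `dask.utils.Dispatch` mechanism, so backends register handlers for their own types just like the other backend functions (make_meta, concat, ...). The `to_host` dispatcher below is a hypothetical example, not an existing Dask API:

```python
from dask.utils import Dispatch

# Hypothetical dispatcher: each backend registers how to move its own
# container types to the host.
to_host = Dispatch("to_host")


@to_host.register(object)
def _default(obj):
    # CPU objects are already host objects; return them unchanged.
    return obj


# A GPU backend such as cuDF would then register its own handler:
#
# @to_host.register(cudf.DataFrame)
# def _(df):
#     return df.to_pandas()
```

Dispatch resolves on the argument's MRO, so anything without a more specific handler falls back to the `object` registration.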

Spilling by Conversion

Make it possible for extensions like Dask-CUDA's DeviceHostFile to annotate tasks. This way, when DeviceHostFile spills GPU memory by converting to Pandas, it can inform the Worker and the Scheduler that the task and its dependents should be handled as CPU tasks -- and the other way around when un-spilling.
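A minimal sketch of such a store, assuming a hypothetical `to_host` conversion (e.g. `cudf.DataFrame.to_pandas`) and an `on_convert` callback standing in for the notification back to the Worker/Scheduler:

```python
class ConvertingStore:
    """Spill by conversion instead of serialization (sketch).

    ``to_host`` converts a device object to its host equivalent;
    ``on_convert`` is where a real implementation would tell the
    Worker/Scheduler to re-annotate the key's task and dependents.
    """

    def __init__(self, to_host, on_convert=lambda key, kind: None):
        self._data = {}
        self._to_host = to_host
        self._on_convert = on_convert

    def __setitem__(self, key, value):
        self._data[key] = value

    def __getitem__(self, key):
        return self._data[key]

    def spill(self, key):
        # Convert in place rather than writing bytes out; the spilled
        # value remains directly computable as a CPU object.
        self._data[key] = self._to_host(self._data[key])
        self._on_convert(key, "cpu")
```

The key property is that spilled data never leaves the "computable" state, so CPU Workers could keep operating on it.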

cc. @dask/gpu
