
Local distributed scheduler way slower than local processes scheduler #2327

@aberres

Description

I am currently investigating the overhead of Dask to get a feeling for whether it might make sense for my workload (medium-sized data frames with short operations).

I came up with a (dumb) example where scheduler='processes' outperforms scheduler='single-threaded', as expected.
I am constructing a pipeline that does nothing more than route a dataframe through minimal manipulations. As the pipeline has no dependencies between tasks, I would expect no worker-to-worker communication; all data could stay local on the worker.
The larger the dataframes are, the slower the process scheduler becomes, though, as expected.

As the next step I tried the very same example with the local distributed scheduler. After all, the documentation tells me:

the distributed scheduler described a couple sections below is often a better choice today.

The results were not very pleasing. Compared to the process scheduler, the distributed scheduler is up to 10x slower.
Is this expected, and does the distributed scheduler simply not make sense in this case?
Am I doing something wrong?
Without knowing the internals of the scheduler, I expected it to perform relatively well (and comparably to the process scheduler) in the given case, as the data (in my understanding) never needs to leave a worker.

import dask
import time
import pandas as pd
import numpy as np
from distributed import Client

def process1(data):
    return data

def process2(inp):
    time.sleep(0.001)
    return inp + 1

def process3(inp):
    return inp

# 20 independent chains of ~100 sequential tasks each; no cross-chain dependencies
tasks = []
for i in range(20):
    data = pd.DataFrame(np.zeros((1000, 100)))
    processed = dask.delayed(process1)(data)
    for _ in range(100):
        processed = dask.delayed(process2)(processed)

    processed2 = dask.delayed(process3)(processed)

    tasks.append(processed2)

%time r0 = dask.compute(tasks, scheduler='processes')
# Wall time: 1.67 s

%time r1 = dask.compute(tasks, scheduler='single-threaded')
# Wall time: 4.57 s

client = Client()
%time r2 = dask.compute(tasks)
# Wall time: 14.4 s
