This notebook is based on the benchmark by Matthew Rocklin (https://matthewrocklin.com/blog/work/2017/07/03/scaling), but expanded to also look at the difference between cores and threads.

In [1]:
from dask.distributed import Client, LocalCluster, wait
import dask.dataframe as dd

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import math

import time

# Test functions

# Operations

In [2]:
def run(func, client):

    client = client or default_client()
    client.restart()
    
    workers = len(client.ncores())
    threads = list(client.ncores().values())[0]
    n = sum(client.ncores().values())
    print(f"Running with {workers} workers, {threads} threads and {n} cores")
    
    coroutine = func(n)

    name, unit, numerator = next(coroutine)
    out = []
    while True:
        start = time.time()
        try:
            next_name, next_unit, next_numerator = next(coroutine)
        except StopIteration:
            break
        finally:
            end = time.time()
            record = {'name': name, 
                      'duration': end - start, 
                      'unit': unit + '/s', 
                      'rate': numerator / (end - start), 
                      'n': n,
                      'workers': workers,
                      'threads': threads,
                      'collection': func.__name__}
            out.append(record)
        name = next_name
        unit = next_unit
        numerator = next_numerator
    return pd.DataFrame(out)

# Basic

In [3]:
import operator
import time

def slowinc(x, delay=0.1):
    time.sleep(delay)
    return x + 1

def slowadd(x, y, delay=0.1):
    time.sleep(delay)
    return x + y

def slowsum(L, delay=0.1):
    time.sleep(delay)
    return sum(L)

def inc(x):
    return x + 1


def tasks(n):
    yield 'task map fast tasks', 'tasks', n * 200
    
    futures = client.map(inc, range(n * 200))
    wait(futures)
    
    yield 'task map 100ms tasks', 'tasks', n * 100

    futures = client.map(slowinc, range(100 * n))
    wait(futures)
        
    yield 'task map 1s tasks', 'tasks', n * 4

    futures = client.map(slowinc, range(4 * n), delay=1)
    wait(futures)

    yield 'tree reduction fast tasks', 'tasks', 2**7 * n
    
    from dask import delayed

    L = range(2**7 * n)
    while len(L) > 1:
        L = list(map(delayed(operator.add), L[0::2], L[1::2]))

    L[0].compute()
    
    yield 'tree reduction 100ms tasks', 'tasks', 2**6 * n * 2
    
    from dask import delayed

    L = range(2**6 * n)
    while len(L) > 1:
        L = list(map(delayed(slowadd), L[0::2], L[1::2]))

    L[0].compute()
    
    yield 'sequential', 'tasks', 100

    x = 1

    for i in range(100):
        x = delayed(inc)(x)
        
    x.compute()
    
    yield 'dynamic tree reduction fast tasks', 'tasks', 100 * n
    
    from dask.distributed import as_completed
    futures = client.map(inc, range(n * 100))
    
    pool = as_completed(futures)
    batches = pool.batches()
    
    while True:
        try:
            batch = next(batches)
            if len(batch) == 1:
                batch += next(batches)
        except StopIteration:
            break
        future = client.submit(sum, batch)
        pool.add(future)
        
    yield 'dynamic tree reduction 100ms tasks', 'tasks', 100 * n
    
    from dask.distributed import as_completed
    futures = client.map(slowinc, range(n * 20))
    
    pool = as_completed(futures)
    batches = pool.batches()
    
    while True:
        try:
            batch = next(batches)
            if len(batch) == 1:
                batch += next(batches)
        except StopIteration:
            break
        future = client.submit(slowsum, batch)
        pool.add(future)

        
    yield 'nearest neighbor fast tasks', 'tasks', 100 * n * 2
    
    L = range(100 * n)
    L = client.map(operator.add, L[:-1], L[1:])
    L = client.map(operator.add, L[:-1], L[1:])
    wait(L)
    
    yield 'nearest neighbor 100ms tasks', 'tasks', 20 * n * 2
    
    L = range(20 * n)
    L = client.map(slowadd, L[:-1], L[1:])
    L = client.map(slowadd, L[:-1], L[1:])
    wait(L)

In [4]:
def arrays(n):
    import dask.array as da
    N = int(5000 * math.sqrt(n))
    x = da.random.randint(0, 10000, size=(N, N), chunks=(2000, 2000))
    
    yield 'create random', 'MB', x.nbytes / 1e6
    
    x = x.persist()
    wait(x)
    
    yield 'blockwise 100ms tasks', 'MB', x.nbytes / 1e6
    
    y = x.map_blocks(slowinc, dtype=x.dtype).persist()
    wait(y)
    
    yield 'random access', 'bytes', 8
    
    x[1234, 4567].compute()
   
    yield 'reduction', 'MB', x.nbytes / 1e6
    
    x.std().compute()
    
    yield 'reduction along axis', 'MB', x.nbytes / 1e6
    
    x.std(axis=0).compute()
    
    yield 'elementwise computation', 'MB', x.nbytes / 1e6
    
    y = da.sin(x) ** 2 + da.cos(x) ** 2
    y = y.persist()
    wait(y)    
    
    yield 'rechunk small', 'MB', x.nbytes / 1e6
    
    y = x.rechunk((20000, 200)).persist()
    wait(y)
    
    yield 'rechunk large', 'MB', x.nbytes / 1e6
    
    y = y.rechunk((200, 20000)).persist()
    wait(y)
    
    yield 'transpose addition', 'MB', x.nbytes / 1e6
    y = x + x.T
    y = y.persist()
    wait(y)
    
    yield 'nearest neighbor fast tasks', 'MB', x.nbytes / 1e6
    
    y = x.map_overlap(inc, depth=1).persist()
    wait(y)   
        
    yield 'nearest neighbor 100ms tasks', 'MB', x.nbytes / 1e6
    
    y = x.map_overlap(slowinc, depth=1, delay=0.1).persist()
    wait(y)

In [5]:
def dataframes(n):
    import dask.array as da
    import dask.dataframe as dd
    N = 2000000 * n
    
    x = da.random.randint(0, 10000, size=(N, 10), chunks=(1000000, 10))

    
    yield 'create random', 'MB', x.nbytes / 1e6
    
    df = dd.from_dask_array(x).persist()
    wait(df)
    
    yield 'blockwise 100ms tasks', 'MB', x.nbytes / 1e6
    
    wait(df.map_partitions(slowinc, meta=df).persist())
    
    yield 'arithmetic', 'MB', x.nbytes / 1e6
    
    y = (df[0] + 1 + 2 + 3 + 4 + 5 + 6 + 7 + 8 + 9 + 10).persist()
    wait(y)
    
    yield 'random access', 'bytes', 8
    
    df.loc[123456].compute()
    
    yield 'dataframe reduction', 'MB', x.nbytes / 1e6
    
    df.std().compute()
    
    yield 'series reduction', 'MB', x.nbytes / 1e6 / 10
    
    df[3].std().compute()
    
    yield 'groupby reduction', 'MB', x.nbytes / 1e6
    
    df.groupby(0)[1].mean().compute()
    
    yield 'groupby apply (full shuffle)', 'MB', x.nbytes / 1e6
    
    df.groupby(0).apply(len).compute()
    
    yield 'set index (full shuffle)', 'MB', x.nbytes / 1e6
    
    wait(df.set_index(1).persist())
    
    yield 'rolling aggregations', 'MB', x.nbytes / 1e6
    
    wait(df.rolling(5).mean().persist())

# Client

In [45]:
%%time

cores_list = [1, 2, 6, 4, 12, 24, 48]
thread_list = [1, 2, 4, 6, 12, 24, 48, 96]

L = []
for cores in cores_list:
    for threads in cores_list:

        if threads > cores:
            break

        if (cores / threads).is_integer():
            workers = int(cores / threads)
        else:
            break

        cluster = LocalCluster(
            n_workers=workers,
            threads_per_worker=threads,
        )

        print(f"Started cluster with {workers} workers with {threads} threads")
        print(f"Dashboard: {cluster.dashboard_link}")
        client = Client(cluster)

        for i in range(3):
            for func in [tasks, arrays, dataframes]:
                print(f"Iteration {i+1}: {func.__name__}")
                df = run(func, client=client)
                L.append(df)
        print('Computation complete! Stopping workers...')
        client.close()

1
2
1
6
3
1
4
2
12
6
2
3
1
24
12
4
6
2
1
48
24
8
12
4
2
1
96
48
16
24
8
4
2
1
Wall time: 1e+03 µs


In [30]:
ddf = pd.concat(L)

ValueError: No objects to concatenate

In [8]:
df = ddf.groupby(['collection', 'name', 'n', 'workers', 'threads', 'unit']).median()
df

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,Unnamed: 3_level_0,Unnamed: 4_level_0,Unnamed: 5_level_0,duration,rate
collection,name,n,workers,threads,unit,Unnamed: 6_level_1,Unnamed: 7_level_1
arrays,blockwise 100ms tasks,12,1,12,MB/s,1.044292,1149.035981
arrays,blockwise 100ms tasks,12,2,6,MB/s,0.994721,1206.297434
arrays,blockwise 100ms tasks,12,3,4,MB/s,1.013629,1183.795696
arrays,blockwise 100ms tasks,12,6,2,MB/s,0.978687,1226.060869
arrays,blockwise 100ms tasks,12,12,1,MB/s,0.978109,1226.785002
...,...,...,...,...,...,...,...
tasks,tree reduction fast tasks,12,1,12,tasks/s,0.601722,2552.671795
tasks,tree reduction fast tasks,12,2,6,tasks/s,0.539680,2846.132276
tasks,tree reduction fast tasks,12,3,4,tasks/s,0.510999,3005.876934
tasks,tree reduction fast tasks,12,6,2,tasks/s,0.552835,2778.403519


In [22]:
df.to_csv('scaling-data-cores.csv')

In [10]:
from bokeh.plotting import figure, show, output_notebook
from bokeh.layouts import Row, Column, gridplot
output_notebook()

In [11]:
def comparison_plot(part, axis_type='linear'):
    row = part.iloc[0]
    title = (row['collection'] + ': ' + row['name']).title().replace('0M', '0m').replace('1S', '1s')

    fig = figure(title=title, sizing_mode='scale_width', x_axis_type=axis_type, y_axis_type=axis_type)
    fig.line(x=part.threads, y=part.rate)
    fig.circle(x=part.threads, y=part.rate)
    fig.xaxis.axis_label = 'threads'
    fig.yaxis.axis_label = row['unit']
    fig.x_range.start = 0
    fig.y_range.start = 0

    # Add in perfect scaling line
#     y_end = fig.y_range.end
#     mn = part.n.min()
#     mx = part.n.max()
#     slope = part[part.n == mn].iloc[0]['rate'] / mn
#     fig.line(x=[0, mx], y=[0, slope * mx], color='gray', line_dash='dashed')
#     fig.y_range.end = part.rate.max()
    
    fig.xaxis.ticker = part.threads

    return fig

In [12]:
def scaling_plot(part, axis_type='linear'):
    row = part.iloc[0]
    title = (row['collection'] + ': ' + row['name']).title().replace('0M', '0m').replace('1S', '1s')

    fig = figure(title=title, sizing_mode='scale_width', x_axis_type=axis_type, y_axis_type=axis_type)
    fig.line(x=part.n, y=part.rate)
    fig.circle(x=part.n, y=part.rate)
    fig.xaxis.axis_label = 'cores'
    fig.yaxis.axis_label = row['unit']
    fig.x_range.start = 0
    fig.y_range.start = 0

    # Add in perfect scaling line
    y_end = fig.y_range.end
    mn = part.n.min()
    mx = part.n.max()
    slope = part[part.n == mn].iloc[0]['rate'] / mn
    fig.line(x=[0, mx], y=[0, slope * mx], color='gray', line_dash='dashed')
    fig.y_range.end = part.rate.max()
    
    fig.xaxis.ticker = part.n

    return fig

In [13]:
df = pd.read_csv('scaling-data.csv')

In [14]:
df.head()

Unnamed: 0,collection,name,n,workers,threads,unit,duration,rate
0,arrays,blockwise 100ms tasks,12,1,12,MB/s,1.044292,1149.035981
1,arrays,blockwise 100ms tasks,12,2,6,MB/s,0.994721,1206.297434
2,arrays,blockwise 100ms tasks,12,3,4,MB/s,1.013629,1183.795696
3,arrays,blockwise 100ms tasks,12,6,2,MB/s,0.978687,1226.060869
4,arrays,blockwise 100ms tasks,12,12,1,MB/s,0.978109,1226.785002


In [15]:
df2 = df.groupby(['collection', 'name'])['collection', 'name', 'n', 'rate', 'unit', 'threads'].apply(comparison_plot)
df2

collection  name                              
arrays      blockwise 100ms tasks                 Figure(id='37160', ...)
            create random                         Figure(id='37206', ...)
            elementwise computation               Figure(id='37252', ...)
            nearest neighbor 100ms tasks          Figure(id='37298', ...)
            nearest neighbor fast tasks           Figure(id='37344', ...)
            random access                         Figure(id='37390', ...)
            rechunk large                         Figure(id='37436', ...)
            rechunk small                         Figure(id='37482', ...)
            reduction                             Figure(id='37528', ...)
            reduction along axis                  Figure(id='37574', ...)
            transpose addition                    Figure(id='37620', ...)
dataframes  arithmetic                            Figure(id='37666', ...)
            blockwise 100ms tasks                 Figure(id='3771

In [16]:
names = ['task map 1s tasks', 'task map 100ms tasks', 'task map fast tasks',
         'tree reduction 100ms tasks', 'tree reduction fast tasks', 'sequential',
         'nearest neighbor 100ms tasks', 'nearest neighbor fast tasks', 
         'dynamic tree reduction 100ms tasks', 'dynamic tree reduction fast tasks']
from toolz import partition_all
L = df2.loc['tasks'].loc[names].values.tolist()
grid = list(partition_all(3, L))
# show(Column(*[Row(*g, sizing_mode='scale_width') for g in grid], sizing_mode='scale_width'))
show(gridplot(grid, sizing_mode='scale_width'))

In [17]:
names = ['create random', 'blockwise 100ms tasks', 'elementwise computation', 'reduction', 
         'reduction along axis', 'random access', 'transpose addition', 'rechunk large', 
         'nearest neighbor fast tasks', 'nearest neighbor 100ms tasks']
from toolz import partition_all
L = df2.loc['arrays'].loc[names].values.tolist()
grid = list(partition_all(3, L))
# show(Column(*[Row(*g, sizing_mode='scale_width') for g in grid], sizing_mode='scale_width'))
show(gridplot(grid, sizing_mode='scale_width'))

In [21]:
names = ['task map 1s tasks', 'task map 100ms tasks', 'task map fast tasks',
         'tree reduction 100ms tasks', 'tree reduction fast tasks', 'sequential',
         'nearest neighbor 100ms tasks', 'nearest neighbor fast tasks', 
         'dynamic tree reduction 100ms tasks', 'dynamic tree reduction fast tasks']
L = df2.loc['dataframes'].values.tolist()
grid = list(partition_all(3, L))
# show(Column(*[Row(*g, sizing_mode='scale_width') for g in grid], sizing_mode='scale_width'))
show(gridplot(grid, sizing_mode='scale_width'))

In [None]:
# Clipboard

In [None]:
def load_df(file, method):
    if method == 'dask':
        t = %timeit -o -n 1 -r 2 dd.read_csv(file + ".csv")
        return t.timings
    elif method == 'dask_compute':
        t = %timeit -o -n 1 -r 2 dd.read_csv(file + ".csv").compute()
        return t.timings
    elif method == 'pandas':
        t = %timeit -o -n 1 -r 2 pd.read_csv(file + ".csv")
        return t.timings
    else:
        return 'Choose an available method'

methods = ['dask', 'dask_compute', 'pandas']
datasets = ['random', 'random_0.5', 'random_0.1']

# benchmark
result = {method: [load_df(dataset,method) for dataset in datasets] for method in methods}

# Make dataframe
df = pd.DataFrame.from_dict(result, orient='index', columns=datasets).reset_index()
df.columns = ['method', 'random', 'random_0.5', 'random_0.1']

# Make separate columns into one row
df = df.melt(id_vars='method', var_name='dataset', value_vars=datasets, value_name='average')

# Explode every measurement into separate rows
df = df.explode('average')

# Plot that shit
sns.catplot(data=df, x='dataset', y='average', hue='method', kind='bar')