In [1]:
from distributed import Client
import dask.array as da
import os
import dask
from dask_jobqueue import LSFCluster
import pprint

# Global dask configuration for this notebook, applied in a single call.
dask.config.set({
    # ensure that the lsf launch script works properly
    "jobqueue.lsf.use-stdin": True,
    # worker memory behavior as per matt rocklin's suggestion:
    # `target` and `spill` are switched off (False); workers pause at 70% of
    # their memory limit and are terminated at 95%.
    "distributed.worker.memory.target": False,
    "distributed.worker.memory.spill": False,
    "distributed.worker.memory.pause": 0.70,
    "distributed.worker.memory.terminate": 0.95,
})

# Echo the effective `distributed` configuration so it is recorded alongside the results.
pprint.pprint(dask.config.get('distributed'))

def get_jobqueue_cluster(
    walltime="1:00",
    ncpus=1,
    cores=1,
    local_directory=None,
    memory="16GB",
    env_extra='single-threaded',
    queue="normal",
    job_extra=None,
    **kwargs
):
    """
    Instantiate a dask_jobqueue cluster using the LSF scheduler on the Janelia Research Campus compute cluster.
    This function wraps the class dask_jobqueue.LSFCluster and instantiates this class with some sensible defaults.
    Extra kwargs added to this function will be passed to LSFCluster().
    The full API for the LSFCluster object can be found here:
    https://jobqueue.dask.org/en/latest/generated/dask_jobqueue.LSFCluster.html#dask_jobqueue.LSFCluster

    Parameters
    ----------
    walltime : str
        Forwarded to LSFCluster as ``walltime``.
    ncpus : int
        Forwarded to LSFCluster as ``ncpus``.
    cores : int
        Forwarded to LSFCluster as ``cores``.
    local_directory : str or None
        Forwarded to LSFCluster as ``local_directory``. When None, defaults to
        ``/scratch/<user>/`` where ``<user>`` is taken from the ``USER``
        environment variable, falling back to ``getpass.getuser()``.
    memory : str
        Forwarded to LSFCluster as ``memory``.
    env_extra : str or list of str
        Shell lines forwarded to LSFCluster as ``env_extra``. The special value
        ``'single-threaded'`` expands to exports that pin the common numerical
        threading libraries (MKL, OpenBLAS, OpenMP) to one thread.
    queue : str
        LSF queue to submit to. Defaults to ``"normal"``.
    job_extra : list of str or None
        Extra submission options forwarded to LSFCluster as ``job_extra``.
        When None, defaults to ``["-o /dev/null"]``.
    **kwargs
        Any further keyword arguments are passed to LSFCluster unchanged.

    Returns
    -------
    The LSFCluster instance created with the resolved arguments.
    """

    if env_extra == 'single-threaded':
        env_extra = [
            # MKL reads MKL_NUM_THREADS (not NUM_MKL_THREADS).
            "export MKL_NUM_THREADS=1",
            "export OPENBLAS_NUM_THREADS=1",
            # OPENMP_NUM_THREADS is not a standard OpenMP variable; it is
            # retained only so the generated job script keeps exporting it.
            "export OPENMP_NUM_THREADS=1",
            "export OMP_NUM_THREADS=1",
        ]

    if local_directory is None:
        user = os.environ.get("USER")
        if not user:
            # USER may be unset (e.g. under some schedulers or containers);
            # fall back to the standard-library lookup instead of a KeyError.
            import getpass

            user = getpass.getuser()
        local_directory = "/scratch/" + user + "/"

    if job_extra is None:
        # Build a fresh list on every call so callers never share a mutable default.
        job_extra = ["-o /dev/null"]

    cluster = LSFCluster(
        queue=queue,
        walltime=walltime,
        cores=cores,
        ncpus=ncpus,
        local_directory=local_directory,
        memory=memory,
        env_extra=env_extra,
        job_extra=job_extra,
        **kwargs
    )
    return cluster

# Build two identically-configured LSF clusters, each wrapped in its own Client:
# the first is scaled to a fixed size, the second is left to adapt.
client_static, client_adaptive = (Client(get_jobqueue_cluster()) for _ in range(2))

# `floor` is both the fixed worker count of the static cluster and the
# minimum number of jobs the adaptive cluster may shrink to.
floor = 10
client_static.cluster.scale(floor)
client_adaptive.cluster.adapt(minimum_jobs=floor)

# Report where each cluster's dashboard is being served.
print(f'static: {client_static.cluster.dashboard_link}\nadaptive {client_adaptive.cluster.dashboard_link}')

# Synthetic all-zeros volume standing in for the real dataset.
shape = (11087, 2500, 10000) # based on my actual data
dtype = 'uint16'

# Two chunkings of the same array: thick slabs ("easy") versus
# single-plane chunks along axis 0 ("hard", i.e. many more tasks).
chunks_easy = (40, 1024, 1024)
chunks_hard = (1, 1024, 1024)

dummy_easy, dummy_hard = (
    da.zeros(shape=shape, chunks=chunks, dtype=dtype)
    for chunks in (chunks_easy, chunks_hard)
)

{'admin': {'log-format': '%(name)s - %(levelname)s - %(message)s',
           'log-length': 10000,
           'max-error-length': 10000,
           'pdb-on-err': False,
           'tick': {'interval': '20ms', 'limit': 3000}},
 'client': {'heartbeat': '5s'},
 'comm': {'compression': 'auto',
          'default-scheme': 'tcp',
          'offload': '10MiB',
          'recent-messages-log-length': 0,
          'require-encryption': False,
          'retry': {'count': 0, 'delay': {'max': '20s', 'min': '1s'}},
          'socket-backlog': 2048,
          'timeouts': {'connect': '5s', 'tcp': '30s'},
          'tls': {'ca-file': None,
                  'ciphers': None,
                  'client': {'cert': None, 'key': None},
                  'scheduler': {'cert': None, 'key': None},
                  'worker': {'cert': None, 'key': None}},
          'zstd': {'level': 3, 'threads': 0}},
 'dashboard': {'export-tool': False, 'link': 'http://{host}:{port}/status'},
 'deploy': {'lost-worker-timeout'

Port 8787 is already in use. 
Perhaps you already have a cluster running?
Hosting the diagnostics dashboard on a random port instead.


static: http://10.36.111.12:8787/status
adaptive http://10.36.111.12:46139/status


In [2]:
# The easy workload is the full array with (40, 1024, 1024) chunks:
# ceil(11087/40) * ceil(2500/1024) * ceil(10000/1024) = 278 * 3 * 10 = 8,340 chunks.
sample_easy = dummy_easy
# even the static cluster chokes if I use the entire dataset with tiny chunks, so for the hard sample
# I only take the first 1000 elements of the data
# (with (1, 1024, 1024) chunks this is still 1000 * 3 * 10 = 30,000 chunks).
sample_hard = dummy_hard[:1000]

In [9]:
# Static cluster: fixed-size worker pool (scaled once to `floor` workers)

In [4]:
%%time
# Easy workload on the static cluster: min and max over ALL axes of the full array,
# computed together and blocking until done (sync=True).
# NOTE(review): the adaptive-cluster cells reduce along axis 0 only (`.min(0)` / `.max(0)`),
# so these timings are not a like-for-like comparison — confirm which reduction is intended.
result = client_static.compute([sample_easy.min(), sample_easy.max()], sync=True)

CPU times: user 58.4 s, sys: 1.92 s, total: 1min
Wall time: 1min 15s


In [5]:
%%time
# Hard workload (single-plane chunks, first 1000 planes) on the static cluster:
# min and max over ALL axes, blocking until done (sync=True).
# NOTE(review): the adaptive-cluster counterpart uses `.min(0)` / `.max(0)` instead —
# confirm whether the differing reductions are intentional.
result = client_static.compute([sample_hard.min(), sample_hard.max()], sync=True)

CPU times: user 1min 18s, sys: 2.77 s, total: 1min 21s
Wall time: 1min 21s


In [6]:
# Adaptive cluster: worker count managed by adapt(), never fewer than `floor` jobs

In [7]:
%%time
# Easy workload on the adaptive cluster: min and max along axis 0 only,
# blocking until done (sync=True).
# NOTE(review): the static-cluster cells call `.min()` / `.max()` with no axis (full reduction),
# so this is not the same computation — confirm which reduction is intended.
result = client_adaptive.compute([sample_easy.min(0), sample_easy.max(0)], sync=True)

CPU times: user 38.1 s, sys: 2.17 s, total: 40.3 s
Wall time: 42.9 s


In [8]:
%%time 
# Hard workload (single-plane chunks, first 1000 planes) on the adaptive cluster:
# min and max along axis 0, blocking until done (sync=True).
# dashboard is unresponsive, a lot of worker-errors are reported, and this never completes.
# killing execution with a keyboard interrupt doesn't stop zombie tasks from running on workers.
# NOTE(review): the static-cluster counterpart uses `.min()` / `.max()` (all axes), so the two
# runs differ in the reduction as well as in the cluster type — confirm before comparing.
result = client_adaptive.compute([sample_hard.min(0), sample_hard.max(0)], sync=True)

KilledWorker: ("('zeros-getitem-8c2e645ab441159d64b8ad576aa516f2', 245, 0, 9)", <Worker 'tcp://10.36.111.34:39592', memory: 0, processing: 95>)