From 7b01f047315c1e262bea0e97a497429081f51493 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Fri, 31 Jul 2015 09:13:45 -0700 Subject: [PATCH 1/4] start callbacks happen before start_state generation --- dask/async.py | 5 +++-- dask/diagnostics/profile.py | 2 +- dask/diagnostics/progress.py | 2 +- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/dask/async.py b/dask/async.py index 30234e47d8e3..0d23c6b47da6 100644 --- a/dask/async.py +++ b/dask/async.py @@ -431,11 +431,12 @@ def get_async(apply_async, num_workers, dsk, result, cache=None, result_flat = set([result]) results = set(result_flat) + for f in start_cbs: + f(dsk) + keyorder = order(dsk) state = start_state_from_dask(dsk, cache=cache, sortkey=keyorder.get) - for f in start_cbs: - f(dsk, state) if rerun_exceptions_locally is None: rerun_exceptions_locally = _globals.get('rerun_exceptions_locally', False) diff --git a/dask/diagnostics/profile.py b/dask/diagnostics/profile.py index db4e993d3e81..8196f2d7acd7 100644 --- a/dask/diagnostics/profile.py +++ b/dask/diagnostics/profile.py @@ -45,7 +45,7 @@ def __init__(self): self._results = {} self._dsk = {} - def _start(self, dsk, state): + def _start(self, dsk): self.clear() self._dsk = dsk.copy() diff --git a/dask/diagnostics/progress.py b/dask/diagnostics/progress.py index d79f54763002..4babe8654292 100644 --- a/dask/diagnostics/progress.py +++ b/dask/diagnostics/progress.py @@ -42,7 +42,7 @@ def __init__(self, width=40, dt=0.1): self._width = width self._dt = dt - def _start(self, dsk, state): + def _start(self, dsk): self._ntasks = len([k for (k, v) in dsk.items() if istask(v)]) self._ndone = 0 self._update_rate = max(1, self._ntasks // self._width) From d9dee3cde94aeee3254f23053e3e424e4e37a9d5 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Fri, 31 Jul 2015 09:40:33 -0700 Subject: [PATCH 2/4] add cache profiler --- .travis.yml | 1 + dask/async.py | 3 +++ dask/diagnostics/cache.py | 29 ++++++++++++++++++++++++++++ dask/diagnostics/tests/test_cache.py | 29 ++++++++++++++++++++++++++++ 4 files changed, 62 insertions(+) create mode 100644 dask/diagnostics/cache.py create mode 100644 dask/diagnostics/tests/test_cache.py diff --git a/.travis.yml b/.travis.yml index bb43d98fc48c..1053f8527b96 100644 --- a/.travis.yml +++ b/.travis.yml @@ -26,6 +26,7 @@ install: - if [[ $TRAVIS_PYTHON_VERSION != '2.6' ]]; then conda install bokeh; fi - if [[ $TRAVIS_PYTHON_VERSION == '2.6' ]]; then conda install unittest2; fi - pip install git+https://github.com/mrocklin/partd --upgrade + - pip install git+https://github.com/mrocklin/cachey --upgrade - pip install blosc --upgrade - pip install graphviz - if [[ $TRAVIS_PYTHON_VERSION < '3' ]]; then pip install git+https://github.com/Blosc/castra; fi diff --git a/dask/async.py b/dask/async.py index 0d23c6b47da6..b85e00da9fc0 100644 --- a/dask/async.py +++ b/dask/async.py @@ -123,6 +123,7 @@ from .context import _globals from .order import order from .callbacks import unpack_callbacks +from .optimize import cull def inc(x): return x + 1 @@ -434,6 +435,8 @@ def get_async(apply_async, num_workers, dsk, result, cache=None, for f in start_cbs: f(dsk) + dsk = cull(dsk, list(results)) + keyorder = order(dsk) state = start_state_from_dask(dsk, cache=cache, sortkey=keyorder.get) diff --git a/dask/diagnostics/cache.py b/dask/diagnostics/cache.py new file mode 100644 index 000000000000..aa334d55312b --- /dev/null +++ b/dask/diagnostics/cache.py @@ -0,0 +1,29 @@ +from .core import Diagnostic +from timeit import default_timer +from cachey import Cache, nbytes + + +class cache(Diagnostic): + """ Use cache for computation + + """ + + def __init__(self, cache): + self.cache = cache + self.starttimes = dict() + + def _start(self, dsk): + overlap = set(dsk) & set(self.cache.data) + for key in overlap: + dsk[key] = self.cache.data[key] + + def _pretask(self, key, dsk, state): + self.starttimes[key] = default_timer() + + def _posttask(self, key, value, dsk, state, id): + duration = default_timer() - self.starttimes[key] + nb = nbytes(value) + self.cache.put(key, value, cost=duration / nb / 1e9, nbytes=nb) + + def _finish(self, dsk, state, errored): + pass diff --git a/dask/diagnostics/tests/test_cache.py b/dask/diagnostics/tests/test_cache.py new file mode 100644 index 000000000000..bf5b228512da --- /dev/null +++ b/dask/diagnostics/tests/test_cache.py @@ -0,0 +1,29 @@ +from dask.diagnostics.cache import cache +from cachey import Cache +from dask.threaded import get +from operator import add + + +flag = [] + +def inc(x): + flag.append(x) + return x + 1 + + +def test_cache(): + c = Cache(10000) + + with cache(c): + assert get({'x': (inc, 1)}, 'x') == 2 + + assert flag == [1] + assert c.data['x'] == 2 + + while flag: + flag.pop() + dsk = {'x': (inc, 1), 'y': (inc, 2), 'z': (add, 'x', 'y')} + with cache(c): + assert get(dsk, 'z') == 5 + + assert flag == [2] # no x present From 551db6bf006a225f7d6e98d99061e294c0c4bf65 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Fri, 31 Jul 2015 17:14:41 -0700 Subject: [PATCH 3/4] read_csv fails on no filenames --- dask/dataframe/io.py | 5 ++++- dask/dataframe/tests/test_io.py | 8 ++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/dask/dataframe/io.py b/dask/dataframe/io.py index 36a03f83b7df..a21149496106 100644 --- a/dask/dataframe/io.py +++ b/dask/dataframe/io.py @@ -53,7 +53,10 @@ def fill_kwargs(fn, args, kwargs): # Let pandas infer on the first 100 rows if '*' in fn: - fn = sorted(glob(fn))[0] + filenames = sorted(glob(fn)) + if not filenames: + raise ValueError("No files found matching name %s" % fn) + fn = filenames[0] if 'names' not in kwargs: kwargs['names'] = csv_names(fn, **kwargs) diff --git a/dask/dataframe/tests/test_io.py b/dask/dataframe/tests/test_io.py index a5daae43362d..cc08a8a40723 100644 --- a/dask/dataframe/tests/test_io.py +++ b/dask/dataframe/tests/test_io.py @@ -482,3 +482,11 @@ def test_read_csv_with_nrows(): assert list(f.columns) == ['name', 'amount'] assert f.npartitions == 1 assert eq(read_csv(fn, nrows=3), pd.read_csv(fn, nrows=3)) + + +def test_read_csv_raises_on_no_files(): + try: + dd.read_csv('21hflkhfisfshf.*.csv') + assert False + except Exception as e: + assert "21hflkhfisfshf.*.csv" in str(e) From 821c9728341e13f229b0fd8f41b5d553dcad1ff7 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Mon, 3 Aug 2015 12:36:54 -0700 Subject: [PATCH 4/4] rename cache->Cache, support direct inputs --- dask/diagnostics/cache.py | 22 ++++++++++++++++++---- dask/diagnostics/tests/test_cache.py | 17 ++++++++++++----- 2 files changed, 30 insertions(+), 9 deletions(-) diff --git a/dask/diagnostics/cache.py b/dask/diagnostics/cache.py index aa334d55312b..dc633a1251de 100644 --- a/dask/diagnostics/cache.py +++ b/dask/diagnostics/cache.py @@ -1,14 +1,28 @@ from .core import Diagnostic from timeit import default_timer -from cachey import Cache, nbytes +import cachey +from numbers import Number -class cache(Diagnostic): +class Cache(Diagnostic): """ Use cache for computation + Example + ------- + + >>> cache = Cache(1e9) # available bytes + + >>> with cache: # use as a context manager around get/compute calls + ... result = x.compute() + + >>> cache.register() # or use globally """ - def __init__(self, cache): + def __init__(self, cache, *args, **kwargs): + if isinstance(cache, Number): + cache = cachey.Cache(cache, *args, **kwargs) + else: + assert not args and not kwargs self.cache = cache self.starttimes = dict() @@ -22,7 +36,7 @@ def _pretask(self, key, dsk, state): def _posttask(self, key, value, dsk, state, id): duration = default_timer() - self.starttimes[key] - nb = nbytes(value) + nb = cachey.nbytes(value) self.cache.put(key, value, cost=duration / nb / 1e9, nbytes=nb) def _finish(self, dsk, state, errored): diff --git a/dask/diagnostics/tests/test_cache.py b/dask/diagnostics/tests/test_cache.py index bf5b228512da..95b62ef3829a 100644 --- a/dask/diagnostics/tests/test_cache.py +++ b/dask/diagnostics/tests/test_cache.py @@ -1,5 +1,5 @@ -from dask.diagnostics.cache import cache -from cachey import Cache +from dask.diagnostics.cache import Cache +import cachey from dask.threaded import get from operator import add @@ -12,9 +12,9 @@ def inc(x): def test_cache(): - c = Cache(10000) + c = cachey.Cache(10000) - with cache(c): + with Cache(c): assert get({'x': (inc, 1)}, 'x') == 2 assert flag == [1] @@ -23,7 +23,14 @@ def test_cache(): while flag: flag.pop() dsk = {'x': (inc, 1), 'y': (inc, 2), 'z': (add, 'x', 'y')} - with cache(c): + with Cache(c): assert get(dsk, 'z') == 5 assert flag == [2] # no x present + + +def test_cache_with_number(): + c = Cache(10000, limit=1) + assert isinstance(c.cache, cachey.Cache) + assert c.cache.available_bytes == 10000 + assert c.cache.limit == 1