
Cachey #502

Merged: 14 commits into dask:master on Aug 12, 2015

Conversation

mrocklin (Member)

Adds opportunistic caching via callbacks and cachey

Example

In [1]: import dask.dataframe as dd

In [2]: from cachey import Cache

In [3]: from dask.diagnostics.cache import cache

In [4]: df = dd.read_csv('data/accounts.*.csv')

In [5]: c = Cache(1000000000)

In [6]: result = df.amount.sum()

In [7]: with cache(c):
   ...:     print result.compute()
   ...:
3575826832

In [8]: with cache(c):
   ...:     print result.compute()
   ...:
3575826832

In [9]: c.data
Out[9]: 
{('reduction-aggregation-6', 0): 3575826832,
 ('reduction-chunk-5', 0): 1187712489,
 ('reduction-chunk-5', 1): 1192801040,
 ('reduction-chunk-5', 2): 1195313303}

Some problems

  1. Badly named keys like ('x', 1000) keep this from being as effective as it could be (see the sketch after this list).
  2. Fused operations hide some of the useful intermediate computations that we could otherwise cache.
  3. I had to add cull into the async scheduler and depend on an impure start callback.
  4. Cachey is still quite immature.
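As a rough illustration of the first point (a hypothetical sketch, not dask internals): a cached result can only be reused if a later graph asks for exactly the same key, so positional names like ('x', 1000) that change from graph to graph defeat the cache, while deterministic hash-based names (as in #510) produce hits.

from cachey import Cache

c = Cache(1e9)                     # 1 GB of available space
c.put(('x', 1000), 42, cost=1.0)   # first run stores under an arbitrary positional name

('x', 1001) in c.data   # False: a renamed-but-identical task never hits the cache
('x', 1000) in c.data   # True (assuming cachey kept it): only an exact key match is reusable

# With hash-based names the key is derived from the computation itself,
# so a rebuilt graph regenerates the same key and finds the cached value.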

@hussainsultan (Contributor)

ping @jefsayshi, this might be of interest to you as you think about speeding up the calculation workflow with dask persistent storage and caching.

@mrocklin (Member, Author) commented Aug 3, 2015

I've removed the WIP label. This is ready for review.

When combined with #510 for dataframe hash naming, we get some nice results:

In [1]: import dask.dataframe as dd
In [2]: from dask.diagnostics import Cache
In [3]: c = Cache(5e7)  # 50 MB
In [4]: c.register()    # globally active

In [5]: df = dd.read_csv('accounts.*.csv')
In [6]: %time len(df)   # normal time to read the csv files
CPU times: user 887 ms, sys: 169 ms, total: 1.06 s
Wall time: 940 ms
Out[6]: 3000000

In [7]: %time len(df)  # other times mostly free
CPU times: user 1.51 ms, sys: 145 µs, total: 1.65 ms
Wall time: 1.44 ms
Out[7]: 3000000

In [8]: c.cache.data  # we only cached the reductions, not the entire dataset
Out[8]: 
{('reduction-aggregation-cf5d8142bdd248cc7c42a746e7108ca7', 0): 3000000,
 ('reduction-chunk-cf5d8142bdd248cc7c42a746e7108ca7', 0): 1000000,
 ('reduction-chunk-cf5d8142bdd248cc7c42a746e7108ca7', 1): 1000000,
 ('reduction-chunk-cf5d8142bdd248cc7c42a746e7108ca7', 2): 1000000}

In [9]: %time df.amount.sum().compute()  # so we don't get speedups
CPU times: user 951 ms, sys: 136 ms, total: 1.09 s
Wall time: 952 ms
Out[9]: 3575826832

In [10]: %time df.amount.mean().compute()  # but df.amount is small enough to keep
CPU times: user 50.2 ms, sys: 9.05 ms, total: 59.2 ms
Wall time: 26.4 ms
Out[10]: 1191.9422773333333

We get to hold on to small, frequently used pieces of the dataset, which makes somewhat new computations (e.g. df.amount.sum() followed by df.amount.mean()) very fast.

@mrocklin mrocklin closed this Aug 3, 2015
@mrocklin mrocklin reopened this Aug 3, 2015
@mrocklin mrocklin force-pushed the cachey branch 3 times, most recently from 2ff2f63 to a816ddb on August 10, 2015 at 21:31
@mrocklin (Member, Author)

This is now backed off of #569

@mrocklin (Member, Author)

I seem to be leaking memory somewhere. Memory usage goes well above the stated limits when using caching.

@mrocklin (Member, Author)

Hrm, I've ruled out most things. I slightly suspect that this is just my OS not flushing things. I'm considering merging this and seeing what happens.

@jcrist (Member) commented Aug 11, 2015

You could try adding a call to gc.collect() somewhere and see if that clears things up. Not as a permanent fix, just to diagnose whether this is a real bug or just memory not being cleaned up immediately.
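For instance (a rough sketch, reusing the registered cache and dataframe from the example above):

import gc

c.register()
len(df)         # run the cached computation once
del df          # drop any obvious references
gc.collect()    # force a full collection
# if process memory falls back toward the cache limit afterwards,
# the growth was just delayed cleanup rather than a genuine leak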

@mrocklin (Member, Author)

I've tried this. gc.collect() doesn't have much effect.

@mrocklin (Member, Author)

  • Clearing out the cache explicitly doesn't have much of an effect.
  • Calling gc.collect() doesn't do much.
  • This only occurs when things are actually put into the cache (e.g. a cache with a small size doesn't produce this problem).
  • It doesn't occur when using cachey manually (e.g. cachey.Cache(...).put(key, value, cost)).
  • It doesn't occur if we use dask but comment out the part that puts things into cachey.

The last two in particular are strange: together they seem to rule out every isolated part of the code.
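For reference, a minimal sketch of the manual-use check (the fourth bullet above), with a rough peak-memory reading via the standard resource module; the array sizes and the peak_rss_mb helper are illustrative only:

import resource
import numpy as np
from cachey import Cache

def peak_rss_mb():
    # ru_maxrss is reported in KB on Linux (bytes on macOS)
    return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024.0

c = Cache(5e7)                     # 50 MB cache, as in the example above
for i in range(100):
    x = np.random.random(1000000)  # roughly 8 MB per array
    c.put(('x', i), x, cost=1.0)   # cachey decides what to keep and evicts to stay near its budget

print(peak_rss_mb())               # expected to stay near the cache budget when cachey is used directly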

@mrocklin (Member, Author)

I'd like to merge this and come back to the leak issue. The troublesome parts sit at the edges of dask (nothing else depends on them), while the PR also includes some fixes to more core parts that keep changing underneath it. I also intend to keep working on this for the foreseeable future.

mrocklin added a commit that referenced this pull request Aug 12, 2015
@mrocklin mrocklin merged commit 3a330c6 into dask:master Aug 12, 2015
@mrocklin mrocklin deleted the cachey branch August 12, 2015 15:41
shoyer added a commit to shoyer/xarray that referenced this pull request Aug 31, 2015
This will allow xray users to take advantage of dask's nascent support for
caching intermediate results (dask/dask#502).

For example:

	In [1]: import xray

	In [2]: from dask.diagnostics.cache import Cache

	In [3]: c = Cache(5e7)

	In [4]: c.register()

	In [5]: ds = xray.open_mfdataset('/Users/shoyer/data/era-interim/2t/2014-*.nc', engine='scipy')

	In [6]: %time ds.sum().load()
	CPU times: user 2.72 s, sys: 2.7 s, total: 5.41 s
	Wall time: 3.85 s
	Out[6]:
	<xray.Dataset>
	Dimensions:  ()
	Coordinates:
	    *empty*
	Data variables:
	    t2m      float64 5.338e+10

	In [7]: %time ds.mean().load()
	CPU times: user 5.31 s, sys: 1.86 s, total: 7.17 s
	Wall time: 1.81 s
	Out[7]:
	<xray.Dataset>
	Dimensions:  ()
	Coordinates:
	    *empty*
	Data variables:
	    t2m      float64 279.0

	In [8]: %time ds.mean().load()
	CPU times: user 7.73 ms, sys: 2.73 ms, total: 10.5 ms
	Wall time: 8.45 ms
	Out[8]:
	<xray.Dataset>
	Dimensions:  ()
	Coordinates:
	    *empty*
	Data variables:
	    t2m      float64 279.0
shoyer added a commit to shoyer/xarray that referenced this pull request Sep 5, 2015