Improve SpectralClustering performance #152

Merged: 27 commits into dask:master from TomAugspurger:kmeans-perf on Jun 29, 2018

Conversation

3 participants
@TomAugspurger
Member

TomAugspurger commented Mar 20, 2018

An intermediate computation helps avoid excessive memory usage in a sample
problem.

Still having issues for larger problems (hence the WIP).

Closes #151

TomAugspurger added some commits Feb 27, 2018

PERF: Some scalability fixes
Computing the upper-left block was taking a while since we didn't persist
the small block before doing pairwise distances.
Alleviate memory pressure in SpectralClustering
An intermediate computation helps avoid excessive memory usage in a sample
problem.

Closes #151
@@ -216,7 +220,9 @@ def fit(self, X, y=None):
        # compute the exact blocks
        # these are done in parallel for dask arrays
        if isinstance(X, da.Array):
            logger.info("Starting small rechunk-persist")
            X_keep = X[keep].rechunk(self.n_components).persist()


@mrocklin

mrocklin Mar 20, 2018

Member

Note that rechunk takes in a chunk size, not a desired number of chunks


@TomAugspurger

TomAugspurger Mar 20, 2018

Member

Yes, in this case, n_components is small. We want this to be a single block w/ n_components chunksize. IIRC, I needed this to help with performance in the pairwise_kernels later on.


@mrocklin

mrocklin Mar 20, 2018

Member

OK, the name here just threw me off is all. I was afraid that you might have mistaken the intent of the argument. You might want at least .rechunk({0: self.n_components}) to ensure that you don't accidentally rechunk the second axis down to chunks of size n_components as well (presumably this would only be an issue if you had very many columns)
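To make that concrete, here is a small sketch (with made-up shapes, not the PR code) of the difference the dict form makes:

import numpy as np
import dask.array as da

n_components = 100
X = da.random.uniform(size=(10_000, 500), chunks=(1_000, 500))
keep = np.sort(np.random.choice(10_000, n_components, replace=False))

# A bare integer is a chunk size for *every* axis, so the 500 columns
# also get split into chunks of 100.
both_axes = X[keep].rechunk(n_components)

# The dict form restricts the rechunk to axis 0 and leaves the column
# chunking alone.
rows_only = X[keep].rechunk({0: n_components})

print(both_axes.chunks)  # ((100,), (100, 100, 100, 100, 100))
print(rows_only.chunks)  # ((100,), (500,))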

@TomAugspurger


Member

TomAugspurger commented Mar 21, 2018

I think I've narrowed things down a bit.

setup:

import numpy as np
import dask.array as da
from distributed import Client
from sklearn.datasets import make_blobs

from dask_ml.metrics.pairwise import rbf_kernel

N = 100
c = N // 8
n_components = 10

X, y = make_blobs(n_samples=N, n_features=1)
X = da.from_array(X.reshape(-1, 1), chunks=c).persist()

keep = X[:n_components]
rest = X[n_components:]

A = rbf_kernel(keep)
B = rbf_kernel(keep, rest)

bb = da.random.uniform(size=B.shape, chunks=B.chunks)
cc = np.random.uniform(size=B.shape[1],)

r = (B * cc).sum(1)
s = (bb * cc).sum(1)

For larger problems, the computation of r leads to excessive memory usage, while the computation of s is fine.

Here's the graph for r:

[task graph screenshot: "bad"]

And here's the graph for s:

[task graph screenshot: "ok"]

Two things:

1. Computing just B.sum(1).compute(), i.e. r without multiplying by a NumPy array, is fine. This makes me think that something can be improved within dask, though perhaps not.

2. For now, I'm hopeful we'll be able to rewrite r to fuse the kernel function and the multiplication into one operation within dask-ml (a rough sketch of that idea follows below).
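A rough sketch of what that fusion could look like, using sklearn's rbf_kernel inside da.blockwise (da.atop in 2018-era dask). fused_weighted_sum and _kernel_times_weights are illustrative names, not dask-ml functions:

import numpy as np
import dask.array as da
from sklearn.metrics.pairwise import rbf_kernel as sk_rbf_kernel

def _kernel_times_weights(keep_block, rest_block, weights_block):
    # Build one block of the kernel and immediately scale it by the
    # matching slice of the weight vector, so a bare kernel block never
    # exists as its own node in the graph.
    K = sk_rbf_kernel(keep_block, rest_block)
    return K * weights_block

def fused_weighted_sum(keep, rest, cc):
    # cc is a NumPy vector of length rest.shape[0]; chunk it to line up
    # with the row chunks of rest so each task sees the matching slice.
    cc_d = da.from_array(cc, chunks=(rest.chunks[0],))
    weighted = da.blockwise(
        _kernel_times_weights, "ij",
        keep, "ik",
        rest, "jk",
        cc_d, "j",
        dtype=keep.dtype,
        concatenate=True,
    )
    return weighted.sum(axis=1)

With the setup above, fused_weighted_sum(keep, rest, cc) should compute the same thing as r = (B * cc).sum(1), but each task builds its kernel block and applies the weights in one step.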

@TomAugspurger


Member

TomAugspurger commented Mar 21, 2018

Hmm, that last post may not be entirely accurate. For larger values

N = 4_000_000
c = N // 8
n_components = 100

I'm seeing the memory usage computing s (the good case) grow as well.

%memit s.compute()
peak memory: 7319.70 MiB, increment: 5721.94 MiB
%memit r.compute()
peak memory: 7053.45 MiB, increment: 4311.25 MiB
@mrocklin


Member

mrocklin commented Mar 21, 2018

@mrocklin


Member

mrocklin commented Mar 21, 2018

Looking at the graph page during computation I can convince myself that it's doing the right thing.

@TomAugspurger


Member

TomAugspurger commented Mar 22, 2018

Yes, I think I see the same. Something that confuses me though is why these two have different memory behavior:

In both cases, we multiply a NumPy array by the large dask array.

import numpy as np

import dask.array as da
from distributed import Client
from sklearn.datasets import make_classification
from dask_ml.cluster import SpectralClustering
from dask_ml.metrics.pairwise import rbf_kernel
client = Client(processes=False)

N = 1_000_000
C = 100_000
n_components=100

X, y = make_classification(n_samples=N, n_features=9)

X = (X - X.mean(0)) / X.std(0)
X = da.from_array(X.reshape(-1, 1), chunks=C).persist()


X_keep = X[:n_components]
X_rest = X[n_components:]

B = rbf_kernel(X_keep, X_rest)

a = np.random.uniform(size=len(X_keep))   # small array to pre-multiply
b = np.random.uniform(size=B.shape[1])    # large array to post-multiply

This is OK

%memit (a.reshape(-1, 1) * B).sum(1).compute();
peak memory: 2375.54 MiB, increment: 1933.70 MiB

This is not

%memit (B * b).sum(1).compute();

/Users/taugspurger/Envs/dask-dev/lib/python3.6/site-packages/distributed/distributed/worker.py:741: UserWarning: Large object of size 72.00 MB detected in task graph: 
  (array([0.73740599, 0.29811969, 0.59019085, ..., 0 ... 99900, None),))
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and 
keep data on workers

    future = client.submit(func, big_data)    # bad

    big_future = client.scatter(big_data)     # good
    future = client.submit(func, big_future)  # good
  % (format_bytes(len(b)), s))
distributed.core - WARNING - Event loop was unresponsive in Scheduler for 12.42s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - WARNING - Event loop was unresponsive in Worker for 12.43s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - WARNING - Event loop was unresponsive in Scheduler for 1.15s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - WARNING - Event loop was unresponsive in Worker for 1.16s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - WARNING - Event loop was unresponsive in Scheduler for 1.15s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.

peak memory: 8595.58 MiB, increment: 7368.24 MiB

distributed.core - WARNING - Event loop was unresponsive in Worker for 1.15s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.

In my system activity monitor, I see the python process peaking at ~13GB.

I assumed it was just that b is so much larger. However, that's not the sole cause. I'm unable to reproduce the memory spike with a large dask array of the same shape.

N = 100
P = 9000000
X = da.random.uniform(size=(N, P), chunks=(100, 100000))
a = np.random.uniform(size=N,)
b = np.random.uniform(size=P,)

%memit (X * b).sum(1).compute()
peak memory: 1869.46 MiB, increment: 1426.89 MiB

So it's likely something within rbf_kernel (which is doing some moderately complex operations) that causes the issue when combined with the large multiply (B.sum(1) is just fine).

I'll keep looking; this is a good debugging exercise.

@mrocklin


Member

mrocklin commented Mar 22, 2018

My guess is that the numpy array b gets included in every task and so is multiplied a bit throughout the scheduler. You might try wrapping it in da.from_array explicitly to see if that makes a difference.

At a low level I would be curious to know if we include b in each of the tasks like (add, B._keys()[0], b) or if we include it in the graph once and then refer to it like {'b': b, ('out', 0): (add, B._keys()[0], 'b')}
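To make the two encodings concrete, here is a toy, runnable illustration; the add function and the ("B", i) blocks are stand-ins, not the actual graph produced here:

import numpy as np
import dask

b = np.random.uniform(size=5)

def add(x, y):
    return x + y

# Form 1: the raw array is embedded directly in every task, so each
# serialized task carries its own copy of b.
graph_embedded = {
    ("B", 0): np.ones(5),
    ("B", 1): np.ones(5),
    ("out", 0): (add, ("B", 0), b),
    ("out", 1): (add, ("B", 1), b),
}

# Form 2: the array appears once under its own key and the tasks refer
# to it by that key, so only one copy has to move through the scheduler.
graph_by_key = {
    "b": b,
    ("B", 0): np.ones(5),
    ("B", 1): np.ones(5),
    ("out", 0): (add, ("B", 0), "b"),
    ("out", 1): (add, ("B", 1), "b"),
}

# Both graphs compute the same result with the local scheduler.
print(dask.get(graph_embedded, [("out", 0), ("out", 1)]))
print(dask.get(graph_by_key, [("out", 0), ("out", 1)]))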

@TomAugspurger


Member

TomAugspurger commented Mar 22, 2018

every task and so is multiplied a bit throughout the scheduler

Ah, I suspected we might get one copy per block, but stopped looking when that didn't account for nearly enough of the memory used. B has ~10x more tasks than X.

da.from_array(b, chunks=B.chunks[1]) didn't seem to help.

I'll take a look at the keys in a bit.

@ebo


ebo commented Apr 3, 2018

any updates?

@TomAugspurger


Member

TomAugspurger commented Apr 3, 2018

@ebo


ebo commented Apr 3, 2018

@ebo


ebo commented Apr 17, 2018

I have modified your original example to read/write GeoTIFFs (a moderate sized LANDSAT mosaic):

https://github.com/ebo/pangeo-tutorials/blob/master/geo-spectral-geotiff-da.ipynb

To stress test this I will work on constraining the number of workers and cores, and watch the memory. I also plan to make a new version that uses xarray, and will probably modify both to work out of core.

@TomAugspurger TomAugspurger changed the title from [WIP] Alleviate memory pressure in SpectralClustering to Alleviate memory pressure in SpectralClustering Jun 25, 2018

TomAugspurger added some commits Jun 25, 2018

# now the approximation of C
a = A.sum(0) # (l,)
b1 = B.sum(1) # (l,)
b2 = B.sum(0) # (m,)
# TODO: I think we have some unnecessary delayed wrapping of A here.
A_inv = da.from_delayed(delayed(pinv)(A), A.shape, A.dtype)


@mrocklin

mrocklin Jun 25, 2018

Member

A is a dask array here or a numpy array?

If it's a dask array then what is your objective here? Do you want to collapse it into a single chunk and then apply pinv?


@TomAugspurger

TomAugspurger Jun 25, 2018

Member

A is a dask array here or a numpy array?

Could be either depending on whether the original X is a dask or numpy array (I think).
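For reference, a minimal sketch of how both cases could be handled explicitly. pseudo_inverse is an illustrative name, not the dask-ml helper, and this assumes pinv here is scipy.linalg.pinv, as in the snippet above:

import numpy as np
import dask.array as da
from dask import delayed
from scipy.linalg import pinv

def pseudo_inverse(A):
    # A may be a dask array (when X was a dask array) or a plain NumPy
    # array; only the dask case needs the from_delayed wrapping.
    if isinstance(A, da.Array):
        # delayed(pinv)(A) folds A's graph in and applies pinv to the
        # assembled (small) matrix in a single task.
        return da.from_delayed(delayed(pinv)(A), shape=A.shape, dtype=A.dtype)
    return pinv(A)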

@@ -116,7 +116,7 @@ def check_pairwise_arrays(X, Y, precomputed=False):
 @doc_wraps(metrics.pairwise.linear_kernel)
 def linear_kernel(X, Y=None):
     X, Y = check_pairwise_arrays(X, Y)
-    return X.dot(Y.T)
+    return da.dot(X, Y.T)


@mrocklin

mrocklin Jun 25, 2018

Member

Why changes like this?


@TomAugspurger

TomAugspurger Jun 25, 2018

Member

I think I had an intermediate version where we precomputed A, so when we got down here X was a NumPy array.

Now it should always be a dask array, but I'm not sure.
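For context, a quick check (with made-up shapes) of the behavior that change relies on: da.dot coerces its inputs to dask arrays, so the same expression works whichever type reaches this point.

import numpy as np
import dask.array as da

X_np = np.random.uniform(size=(6, 3))
X_da = da.from_array(X_np, chunks=(3, 3))

# da.dot works for both NumPy and dask inputs and returns a dask array,
# whereas X.dot(Y.T) relies on whatever .dot the current type provides.
print(da.dot(X_np, X_np.T).compute().shape)  # (6, 6)
print(da.dot(X_da, X_da.T).compute().shape)  # (6, 6)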

return '%0.2f MB' % (n / 1e6)
if n > 1e3:
return '%0.2f kB' % (n / 1000)
return '%d B' % n


@mrocklin

mrocklin Jun 25, 2018

Member

We might move this from distributed to dask instead.

@TomAugspurger


Member

TomAugspurger commented Jun 25, 2018

OK, picked this up again.

I'm able to successfully cluster 24GB of data on my laptop w/ 16GB of ram, with a peak memory usage of 282 MiB according to memit, in 1 min 29s.

I haven't decomposed the improvement into changes in dask/dask vs. changes here yet. Will hopefully have time later.

@TomAugspurger


Member

TomAugspurger commented Jun 26, 2018

#228 has the non-memory-impacting changes.

Overall, with just #228, things look really good. I'm able to cluster larger-than-memory datasets in low memory. This makes the changes here to manage memory & chunk size less important.

One potential remaining slowdown is how we handle reordering the large X. The approximation algorithm requires us to reorder X into two "blocks" (not dask blocks): a small, exactly computed block and a large, approximated block:

[
    X_small,
    X_large,
]

Currently, this induces a slice with a large array of integers (slicing with such arrays has also been improved recently in dask). But we're getting some performance warnings because the slice isn't entirely sorted: there are n_components elements out of place (something like 5-100 elements out of the much larger n_samples). Looking into ways to improve this.

@mrocklin


Member

mrocklin commented Jun 26, 2018

Can you say more about the array of integers, and why it might not be entirely sorted?

@TomAugspurger


Member

TomAugspurger commented Jun 26, 2018

In the code, it's https://github.com/dask/dask-ml/blob/master/dask_ml/cluster/spectral.py#L280

I'll have a small example shortly. I had hoped

import numpy as np
import dask.array as da

X = da.arange(0, 1000, chunks=100).reshape(-1, 1)

ind = np.arange(len(X))
keep = np.random.choice(ind, 10, replace=False)
print('keep', keep)
rest = ~np.isin(ind, keep)

blocked = da.concatenate([X[keep].rechunk(-1, -1), X[rest]])
blocked[np.argsort(np.concatenate([keep, ind[rest]]))].compute();

would produce the warning, but apparently it doesn't meet the condition.

@TomAugspurger


Member

TomAugspurger commented Jun 26, 2018

import dask.array as da
import numpy as np
from dask.array.utils import assert_eq


X = da.arange(0, 1000, chunks=500).reshape(-1, 1)

ind = np.arange(len(X))
keep = np.random.choice(ind, 20, replace=False)
rest = ~np.isin(ind, keep)

blocked = da.concatenate([X[keep].rechunk(-1, -1), X[rest]])
result = blocked[np.argsort(np.concatenate([keep, ind[rest]]))].compute()

assert_eq(result, X)

gets the warning.

perf.py:13: PerformanceWarning: Slicing with an out-of-order index is generating 14 times more chunks
  blocked[np.argsort(np.concatenate([keep, ind[rest]]))].compute()
@mrocklin


Member

mrocklin commented Jun 26, 2018

Is it OK to sort the keep array?

@TomAugspurger


Member

TomAugspurger commented Jun 26, 2018

Yeah (and I think it is sorted in dask-ml).

I think the only constraint from the algorithm is that keep comes first when we do the approximation. The relative ordering within there shouldn't matter.

@TomAugspurger


Member

TomAugspurger commented Jun 26, 2018

My current plan is to partition the final index of integers, np.argsort(np.concatenate([keep, ind[rest]])), into sections that are each sorted. It'll be

[
    block_before_first_component,
    first_component,  # single element
    block_before_second_component,
    second_component,
    ...
]

and then concatenate those. We'll have 2 * n_components slices to concatenate, which seems fine.

@TomAugspurger


Member

TomAugspurger commented Jun 26, 2018

import numpy as np
import dask.array as da
import pytest
from dask.array.utils import assert_eq
from toolz import sliding_window

def test_slice():
    X = da.arange(0, 1000, chunks=500).reshape(-1, 1)

    ind = np.arange(len(X))
    keep = np.random.choice(ind, 20, replace=False)
    keep.sort()
    rest = ~np.isin(ind, keep)

    blocked = da.concatenate([X[keep].rechunk(-1, -1), X[rest]])
    idx = np.argsort(np.concatenate([keep, ind[rest]]))

    slices = [slice(None, keep[0]), [keep[0]]]
    windows = list(sliding_window(2, keep))

    for l, r in windows:
        slices.append(slice(l + 1, r))
        slices.append([r])

    slices.append(slice(keep[-1] + 1, None))

    with pytest.warns(None) as w:
        result = da.concatenate([blocked[idx[slice_]] for slice_ in slices])

    assert len(w) == 0

    assert_eq(result, X)

seems to do it (aside from not handling edge cases properly). I think this is too specific a use case for inclusion in dask.array.slicing, so I'll just keep it here for now, unless you think otherwise.

@TomAugspurger TomAugspurger changed the title from Alleviate memory pressure in SpectralClustering to Improve SpectralClustering performance Jun 26, 2018

@TomAugspurger


Member

TomAugspurger commented Jun 26, 2018

This is now just the slicing changes.

I've removed the proposed new hyperparameters, as they didn't seem to be beneficial, and just added complexity.

I could maybe see an argument for a rechunk_strategy parameter, which would allow the user to specify the chunks to use for the two stages, but I'd rather hold off on it for now, since I think it'll be difficult to use correctly.

@TomAugspurger TomAugspurger merged commit ef7bd43 into dask:master Jun 29, 2018

3 checks passed

ci/circleci: py27 Your tests passed on CircleCI!
ci/circleci: py36 Your tests passed on CircleCI!
ci/circleci: sklearn_dev Your tests passed on CircleCI!

@TomAugspurger TomAugspurger deleted the TomAugspurger:kmeans-perf branch Jun 29, 2018
