k-means Memory usage #39 (Closed)

TomAugspurger opened this issue Oct 16, 2017 · 9 comments

Debugging a memory-usage issue I'm seeing with k-means initialization. The issue is in this loop. A small example:

import numpy as np
import dask.array as da
from dask import delayed
from distributed import Client, wait

from sklearn.datasets import make_classification


if __name__ == '__main__':

    c = Client()
    s = c.cluster.scheduler
    N_SAMPLES = 1_000_000
    N_BLOCKS = 24

    def mem():
        # Total bytes currently held in memory across all workers.
        print("{:.2f}".format(sum(s.worker_bytes.values()) / 10**9), "GB")

    def make_block(n_samples):
        # One (n_samples, 20) block of random classification data.
        X, y = make_classification(n_samples=n_samples)
        return X

    blocks = [delayed(make_block)(N_SAMPLES) for _ in range(N_BLOCKS)]

    arrays = [da.from_delayed(block, dtype='f8', shape=(N_SAMPLES, 20))
              for block in blocks]
    stacked = da.vstack(arrays)

    print(stacked.nbytes / 10**9, "GB")
    X = c.persist(stacked)
    wait(X)

    # Repeatedly fancy-index five random rows; each result is a tiny
    # (5, 20) NumPy array.
    for i in range(5):
        idx = np.random.randint(0, len(X), size=5)
        centers = X[idx].compute()
        mem()

This outputs:

3.84 GB
4.32 GB
4.80 GB
5.12 GB
5.60 GB
6.08 GB
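For scale, the returned centers themselves are tiny; a rough check after the loop above (shapes as in the script):

# Each computed slice is a (5, 20) float64 array, i.e. 800 bytes,
# nowhere near the hundreds of MB the cluster grows by per iteration.
print(centers.shape, centers.nbytes, "bytes")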

@mrocklin is that increasing memory usage expected? Inside that for loop, the result is always going to be a small NumPy array.

(Side note: that also generates some exceptions, which I've left out of the output above):

distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "/Users/taugspurger/.virtualenvs/dask-dev/lib/python3.6/site-packages/distributed/distributed/protocol/core.py", line 122, in loads
    value = _deserialize(head, fs)
  File "/Users/taugspurger/.virtualenvs/dask-dev/lib/python3.6/site-packages/distributed/distributed/protocol/serialize.py", line 160, in deserialize
    f = deserializers[header.get('type')]
KeyError: 'numpy.ndarray'

Looking into those now.

@TomAugspurger commented Oct 16, 2017

@mrocklin I don't see the "failed to deserialize" error every time. Adding

diff --git a/distributed/protocol/serialize.py b/distributed/protocol/serialize.py
index d657dab..b7b492e 100644
--- a/distributed/protocol/serialize.py
+++ b/distributed/protocol/serialize.py
@@ -87,6 +87,8 @@ def typename(typ):
 def _find_lazy_registration(typename):
     toplevel, _, _ = typename.partition('.')
     if toplevel in lazy_registrations:
+        import time
+        time.sleep(.1)
         lazy_registrations.pop(toplevel)()
         return True
     else:

to serialize.py makes the failure consistent. Perhaps a race condition in the lazy importer? I'll see if I can narrow the example down further.
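If it is a race, one (untested) sketch of a guard, reusing the names from the diff above and assuming lazy_registrations is a plain dict shared between threads:

import threading

_lazy_lock = threading.Lock()

def _find_lazy_registration(typename):
    toplevel, _, _ = typename.partition('.')
    with _lazy_lock:  # only one thread performs a given lazy registration
        if toplevel in lazy_registrations:
            lazy_registrations.pop(toplevel)()
            return True
        return False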

@TomAugspurger

Collecting some debugging observations:

With an indexer of length 1, the getitem normally takes ~0.01 s and there's no increase in memory usage. When things slow down, tasks take ~0.5 s, and the slower tasks include a disk-read-getitem:

[screenshot: task stream comparison (slow on the left, normal on the right)]

When slicing with multiple elements (e.g. size=2), the task graphs sometimes look different.

[task graph: fast case]

[task graph: slow case]

Not sure whether this is meaningful, but I suspect it is. I assumed the order of operations would be

getitem on each worker -> concatenate the small results

but if it's instead

transfer whole blocks to a single worker -> getitem

that would explain both the slowdown and the memory increase.
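One way to check which ordering dask actually generates is to look at the task graph for the slice on a small stand-in array (a rough sketch, not the persisted 3.84 GB array):

import numpy as np
import dask.array as da

# Small stand-in for the persisted array above.
X_small = da.random.random((1_000, 20), chunks=(100, 20))
idx = np.random.randint(0, len(X_small), size=5)

sliced = X_small[idx]
# Per-block getitem tasks followed by a concatenate suggest the first
# ordering; a single getitem depending on many whole blocks moved to
# one worker would suggest the second.
for key in dict(sliced.__dask_graph__()):
    print(key)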

@mrocklin

disk-read-* time blocks are due to getting elements out of worker.data. This could mean that there are many elements in worker.data held in memory, or, more likely, that a few elements have been spilled to disk. This also corresponds to the colored bars in the upper-left memory-use plot on the scheduler page.
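A quick way to check is to count the keys each worker is holding (a rough sketch; c is the Client from the script above):

# Number of keys in each worker's worker.data; a count that keeps growing
# across iterations points at accumulating, and possibly spilled, results.
def n_keys(dask_worker):
    return len(dask_worker.data)

print(c.run(n_keys))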

@mrocklin

If your getitem-then-transfer vs. transfer-then-getitem question refers to the code above:

for i in range(5):
    idx = np.random.randint(0, len(X), size=5)
    centers = X[idx].compute()
    mem()

then you should be fine; dask.array definitely does the intelligent thing here (getitem on each block, then concatenate the small results).

@TomAugspurger

Interestingly, whether or not the indexer is sorted seems to matter. Adding sorted(idx) before indexing:

centers = c.compute(X[sorted(idx)])

outputs:

3.84 GB
3.84 GB
3.84 GB
3.84 GB
3.84 GB

Trying it out in k_init now.
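For reference, the repro loop with the sort applied (same setup as the script above):

# Sorting the index before slicing keeps memory flat at ~3.84 GB.
for i in range(5):
    idx = np.sort(np.random.randint(0, len(X), size=5))
    centers = X[idx].compute()
    mem()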

@mrocklin

There is a special fast path within dask/array/slicing.py for when the index is sorted. You might search that file for issorted to see what the non-fast-path is doing.
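Roughly the kind of check involved (an illustrative sketch, not the actual dask source):

import numpy as np

def is_sorted(index):
    # True if the fancy index is monotonically non-decreasing.
    index = np.asarray(index)
    return bool((index[:-1] <= index[1:]).all())

idx = np.random.randint(0, 1_000_000, size=5)
print(is_sorted(idx), is_sorted(np.sort(idx)))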

@mrocklin

Or just always sort

@TomAugspurger

Just sorting works for me here. I'll still take a look at slicing.py to see if anything weird is going on.

@TomAugspurger

Thanks!

TomAugspurger added a commit to TomAugspurger/dask-ml that referenced this issue Oct 17, 2017:

    Seems to help the task scheduler in some situations
    Closes dask#39

TomAugspurger added a commit that referenced this issue Oct 17, 2017:

    * PERF: Sort indexes before slicing
      Seems to help the task scheduler in some situations
      Closes #39
    * BUG: Pass through random_state to sample points