
Array fully loaded into memory when trying to store into a memmap array #5985

Open
astrofrog opened this issue Mar 6, 2020 · 11 comments
Labels: array, needs info (Needs further information from the user)

astrofrog (Contributor) commented Mar 6, 2020

I am trying to load data from a memory-mapped array, do operations with it in dask, and store the result in another memory-mapped array. Currently this results in the entire array being loaded into memory when calling Array.store. Here is an example:

import numpy as np
from dask.array import from_array


@profile  # provided by memory_profiler when the script is run with "python -m memory_profiler"
def main():

    N = 512

    original_array = np.memmap('original.mmap', dtype=float, mode='w+', shape=(N, N, N))
    for i in range(N):
        original_array[i] = i

    original_array = np.memmap('original.mmap', dtype=float, mode='r', shape=(N, N, N))

    dask_array = from_array(original_array)
    dask_array = dask_array.rechunk('auto')
    print('chunksize', dask_array.chunksize)

    dask_array = np.sqrt(dask_array)

    output_array = np.memmap('final.mmap', dtype=float, mode='w+', shape=(N, N, N))
    dask_array.store(output_array, lock=True, compute=True)


main()

The memory profile obtained with the memory_profiler package is:

Line #    Mem usage    Increment   Line Contents
================================================
    31   56.801 MiB   56.801 MiB   @profile
    32                             def main():
    33                             
    34   56.801 MiB    0.000 MiB       N = 512
    35                             
    36   57.086 MiB    0.285 MiB       original_array = np.memmap('original.mmap', dtype=float, mode='w+', shape=(N, N, N))
    37 1081.070 MiB    0.000 MiB       for i in range(N):
    38 1081.070 MiB    2.000 MiB           original_array[i] = i
    39                             
    40   57.121 MiB    0.000 MiB       original_array = np.memmap('original.mmap', dtype=float, mode='r', shape=(N, N, N))
    41                             
    42   57.121 MiB    0.000 MiB       dask_array = from_array(original_array)
    43   57.121 MiB    0.000 MiB       dask_array = dask_array.rechunk('auto')
    44   57.121 MiB    0.000 MiB       print('chunksize', dask_array.chunksize)
    45                             
    46   57.121 MiB    0.000 MiB       dask_array = np.sqrt(dask_array)
    47                             
    48   57.121 MiB    0.000 MiB       output_array = np.memmap('final.mmap', dtype=float, mode='w+', shape=(N, N, N))
    49 2105.648 MiB 2105.648 MiB       dask_array.store(output_array, lock=True, compute=True)

I would have expected the array to be computed/written chunk by chunk to the output memory-mapped array, resulting in a maximum memory usage of the size of a chunk, not that of the whole array. This seems like a possible bug?

TomAugspurger (Member) commented

> resulting in a maximum memory usage of the size of a chunk, not that of the whole array. This seems like a possible bug?

Are the results computed in parallel, leading to the high memory usage?

astrofrog (Contributor, author) commented

@TomAugspurger - no, I still see the issue if I put

dask.config.set(scheduler='single-threaded')

at the top of the script.

astrofrog (Contributor, author) commented

I think this might just be due to memory_profiler including the cache/buffer size in the memory usage. If I exclude cache size variations I think everything works fine.
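A quick way to sanity-check that (a sketch using psutil, not from the thread; the file name assumes the reproducer above has already been run): clean file-backed memmap pages typically count toward RSS but not USS, so comparing the two separates reclaimable page-cache usage from truly private memory.

import numpy as np
import psutil

proc = psutil.Process()

def report(label):
    info = proc.memory_full_info()
    print(f'{label}: rss={info.rss / 2**20:.0f} MiB  uss={info.uss / 2**20:.0f} MiB')

N = 512
# assumes 'original.mmap' was created by the reproducer above
mm = np.memmap('original.mmap', dtype=float, mode='r', shape=(N, N, N))

report('before touching the memmap')
mm.sum()  # fault every page in
report('after touching the memmap')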

d-v-b (Member) commented Mar 7, 2020

Could it be this? dask/distributed#3032

from_array has confusing behavior with numpy arrays. Every resulting task will have the full original array as a dependency, regardless of how you chunk things. I think the only thing you can do is slice your numpy array, then use da.stack to combine all these sub-arrays into a larger array.
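A minimal sketch of that suggestion (slice the source array, then combine the pieces with da.stack); it reuses the memmap from the original report, and the plane-by-plane split is only illustrative:

import numpy as np
import dask.array as da

N = 512
# assumes 'original.mmap' was created by the reproducer above
mm = np.memmap('original.mmap', dtype=float, mode='r', shape=(N, N, N))

# One dask layer per 2-D plane, so no single task depends on the whole memmap.
planes = [da.from_array(mm[i], chunks=(N, N)) for i in range(N)]
stacked = da.stack(planes, axis=0)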

mrocklin (Member) commented Mar 8, 2020

> Every resulting task will have the full original array as a dependency, regardless of how you chunk things

I believe that this is controllable with some of the keyword arguments to from_array.
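For reference, later dask releases added an inline_array keyword to from_array that changes the graph topology by embedding the source array in every chunk task instead of in one shared dependency; whether that actually helps with a memmap depends on how the memmap gets serialized. A sketch, assuming a dask version that has the keyword and the memmap from the original report:

import numpy as np
import dask.array as da

N = 512
# assumes 'original.mmap' was created by the reproducer above
mm = np.memmap('original.mmap', dtype=float, mode='r', shape=(N, N, N))

# No single shared "original-array" task; each chunk task carries the array itself.
arr = da.from_array(mm, chunks=(64, N, N), inline_array=True)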

d-v-b (Member) commented Mar 8, 2020

Which kwarg? Nothing I tried in the second example over in #5367 made a difference in the topology of the task graph.

mrocklin (Member) commented Mar 8, 2020 via email

TomAugspurger (Member) commented

This seems different from dask/distributed#3032, I think. The size of the memmapped array should be large, right?
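For context (a sketch, not from the thread; it reuses the memmap from the original report): dask's sizeof accounting treats a memmap like any other ndarray, so it reports the full on-disk size even when almost none of it is resident.

import numpy as np
from dask.sizeof import sizeof

N = 512
# assumes 'original.mmap' was created by the reproducer above
mm = np.memmap('original.mmap', dtype=float, mode='r', shape=(N, N, N))

# Both report ~1 GiB (512**3 float64), regardless of how many pages are resident.
print(sizeof(mm), mm.nbytes)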

chanshing commented

What is the status of this? I have seen a related PR #6605, but I'm not sure if memmaps are supported (it seems not, from the discussion).
Sometimes we may have numpy arrays stored on disk that are too big to fit in memory. NumPy handles this quite well with np.load('bigarray.npy', mmap_mode='r'). What is the current behavior of da.from_array(np.load('bigarray.npy', mmap_mode='r'))?

delgadom (Contributor) commented

Current behavior

This question on SO encounters this error, and I see the same behavior on 2022.04.0:
https://stackoverflow.com/questions/72663716/how-to-efficiently-convert-npy-to-xarray-zarr/72666271#72666271

I'm not sure if the full workaround below is necessary, but I'm also not sure how to read the array directly from the .npy file without using map_blocks.

MRE:

import numpy as np
import dask.array as dda
import dask.distributed

# save a large numpy array (~18 GB of float32)
np.save('myarr.npy', np.empty(shape=(47789, 310, 310), dtype=np.float32))

cluster = dask.distributed.LocalCluster()
client = dask.distributed.Client(cluster)

print(cluster.dashboard_link)

# memory-map the array and write it to zarr in chunks
arr = np.load('myarr.npy', mmap_mode='r')
da = dda.from_array(arr).rechunk(chunks=(100, 310, 310))
da.to_zarr('myarr.zarr', mode='w')

This bottlenecks all tasks behind a giant read job, "original-array", which consumes all system memory and then dies.

[dask dashboard screenshot: a single large, high-memory task]

Workaround

I'm not sure if this is a bug or just points to a useful feature. For now, this works well:

import numpy as np
import dask.array as dda
import dask.distributed

def load_npy_chunk(block, fp, block_info=None, mmap_mode='r'):
    """Load a slice of the .npy array, making use of the block_info kwarg"""
    np_mmap = np.load(fp, mmap_mode=mmap_mode)
    # block_info[0]['array-location'] gives the (start, stop) range of this
    # block along each dimension; turn it into a tuple of slices.
    array_location = block_info[0]['array-location']
    dim_slicer = tuple(slice(*x) for x in array_location)
    return np_mmap[dim_slicer]

def dask_read_npy(fp, chunks=None, mmap_mode='r'):
    """Read metadata by opening the mmap, then send the read job to workers"""
    np_mmap = np.load(fp, mmap_mode=mmap_mode)
    # empty_like gives the right shape/dtype/chunks without reading any data
    da = dda.empty_like(np_mmap, chunks=chunks)
    return da.map_blocks(load_npy_chunk, fp=fp, mmap_mode=mmap_mode, meta=da)

# save a large numpy array (~18 GB of float32)
np.save('myarr.npy', np.empty(shape=(47789, 310, 310), dtype=np.float32))

cluster = dask.distributed.LocalCluster()
client = dask.distributed.Client(cluster)

da = dask_read_npy('myarr.npy', chunks=(300, -1, -1), mmap_mode='r')
da.to_zarr('myarr.zarr', mode='w')

delgadom (Contributor) commented Jun 18, 2022

OK, in the above, using threads in the LocalCluster does solve the problem. This works:

import numpy as np
import dask.array as dda
import dask.distributed

# save a large numpy array (~18 GB of float32)
np.save('myarr.npy', np.empty(shape=(47789, 310, 310), dtype=np.float32))

# use threads, not processes, so workers can share the memmap
cluster = dask.distributed.LocalCluster(processes=False)
client = dask.distributed.Client(cluster)

print(cluster.dashboard_link)

arr = np.load('myarr.npy', mmap_mode='r')
da = dda.from_array(arr).rechunk(chunks=(100, 310, 310))
da.to_zarr('myarr.zarr', mode='w')

But it does seem like a read_npy function could be helpful for cases where workers can’t share memory? Happy to throw this into a PR if so, though I’d definitely need a hand getting it across the finish line as I imagine the test setup you’d want for a new reader would be pretty extensive.
