Iterating over ts dataset using zarr driver does not parallelize #99

Closed

iNLyze opened this issue May 9, 2023 · 1 comment

iNLyze commented May 9, 2023

I am trying to wrap a TensorStore dataset in an iterator to do parallel computation on it. The dataset uses the zarr driver and reads from a local file:

```
TensorStore({
  'context': {
    'cache_pool': {},
    'data_copy_concurrency': {},
    'file_io_concurrency': {},
  },
  'driver': 'zarr',
  'dtype': 'float32',
  'kvstore': {
    'driver': 'file',
    'path': '/path/to/file.zarr/',
  },
  'metadata': {
    'chunks': [1, 100, 100, 4],
    'compressor': {
      'blocksize': 0,
      'clevel': 5,
      'cname': 'lz4',
      'id': 'blosc',
      'shuffle': 1,
    },
    'dimension_separator': '.',
    'dtype': '<f4',
    'fill_value': 0.0,
    'filters': None,
    'order': 'C',
    'shape': [1134592, 100, 100, 4],
    'zarr_format': 2,
  },
  'transform': {
    'input_exclusive_max': [[1134592], [100], [100], [4]],
    'input_inclusive_min': [0, 0, 0, 0],
  },
})
```
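
For reference, a store with this spec can be opened along these lines (a minimal sketch; the path is the same redacted placeholder as above):

```python
import tensorstore as ts

# Open an existing zarr array from a local file kvstore.
store = ts.open({
    'driver': 'zarr',
    'kvstore': {'driver': 'file', 'path': '/path/to/file.zarr/'},
}).result()
```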

I wrote this iterator:

```python
import numpy as np

class TensorStoreIterator():
    def __init__(self, store, num_workers=32):
        self.store = store
        self.shape = store.shape
        self.chunk_size = int(np.ceil(self.shape[0]/num_workers))
        self.n_chunks = num_workers
        
    def __iter__(self):
        self.chunk_index = 0
        return self
    
    def __next__(self):
        if self.chunk_index >= self.n_chunks:
            raise StopIteration
        
        # Start and end of chunk slice 
        start = self.chunk_index * self.chunk_size
        end = min((self.chunk_index + 1) * self.chunk_size, self.__len__())
        
        # Get next chunk
        chunk = self.store[start:end]
        
        return chunk
        
    def __len__(self):
        return self.shape[0]
```

Then I want to apply joblib parallelism to it using:

```python
from joblib import Parallel, delayed

results = Parallel(n_jobs=num_workers)(
    delayed(np.var)(chunk)
    for chunk in ts_data_iter
)
```

I am getting:

```
Traceback (most recent call last):
  File "/home/user/base/lib/python3.10/site-packages/joblib/externals/loky/process_executor.py", line 391, in _process_worker
    call_item = call_queue.get(block=True, timeout=timeout)
  File "/usr/local/lib/python3.10/multiprocessing/queues.py", line 122, in get
    return _ForkingPickler.loads(res)
ValueError: Error opening "zarr" driver: Error reading local file "/path/to/file.zarr/": Invalid key: "/path/to/file.zarr/"
```

I thought it might be related to chunk_layout, but was not able to confirm that. Does anyone have another idea?
I did confirm that `np.var(ts_data[0:3])` works, so the path and the zarr loading itself are fine.

laramiel (Collaborator) commented

The main issue is that you are not incrementing your chunk index in your iterator.
Also, you don't actually need to read the tensorstore as part of the iterator; you could instead yield `tensorstore.DimExpression` objects, as in the following:

```python
import numpy as np
import tensorstore as ts

class TensorStoreIterator():
  def __init__(self, shape, num_chunks=32):
    self.shape = shape
    self.i = 0
    self.num_chunks = num_chunks
    self.chunk_size = int(np.ceil(self.shape[0] / num_chunks))

  def __len__(self):
    return self.num_chunks

  def __iter__(self):
    return self

  def __next__(self):
    if self.i >= self.num_chunks:
      raise StopIteration
    chunk_start = self.i * self.chunk_size
    chunk_end = min(chunk_start + self.chunk_size, self.shape[0])
    self.i += 1  # advance the index so the iterator terminates
    # Yield an index expression over dimension 0; no data is read here.
    return ts.d[0][chunk_start:chunk_end]
```

Then, perhaps, something like this will work:

```python
ts_data_iter = TensorStoreIterator(store.shape)

list(ts_data_iter)
```
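
To connect this back to the joblib step: the traceback above fails inside `_ForkingPickler.loads`, i.e. while the worker process unpickles its task and re-opens the zarr driver. A minimal sketch of one way around that, assuming the worker re-opens the store from its JSON spec and that the yielded `DimExpression` objects pickle cleanly under loky (both are assumptions, not verified here):

```python
import numpy as np
import tensorstore as ts
from joblib import Parallel, delayed

def chunk_var(spec_json, expr):
  # Assumption: re-open the store inside the worker from its JSON spec
  # instead of shipping the live TensorStore handle between processes.
  store = ts.open(spec_json).result()
  # Apply the index expression, read that slab, and reduce it.
  return np.var(store[expr].read().result())

spec_json = store.spec().to_json()  # `store` is the already-opened TensorStore
results = Parallel(n_jobs=32)(
  delayed(chunk_var)(spec_json, expr)
  for expr in TensorStoreIterator(store.shape)
)
```

If pickling still gets in the way, `Parallel(..., backend='threading')` may also be worth trying, since TensorStore performs the actual file reads in its own thread pool.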

But you might not want to chunk solely on the "x" dimension. You might also look at the google-research connectomics repository, which uses tensorstore for chunk-based processing:

https://github.com/google-research/connectomics
