H5py and Dask Distributed #2787
Comments
You appear to be mixing the APIs of dask.array and futures. What you wanted was:
However, h5py file objects are tricky at the best of times, so you are well advised to split your reading and writing operations:
|
Thank you for the quick response. I do want to use Client(); this was just a simplified way of expressing how I am trying to implement Dask. I am trying to add it to some functions of an already completed Python package. The user of the package defines the _unit_computation() for each unit in an n-dim dataset. The goal is to map the unit computation to every unit in the dataset and automate the parallel execution for usability. I prefer client.map() over writing a for-loop and then calling compute(), which I found did not work well with my program; using the client has been the most useful. Should I not turn the HDF into a Dask array? If so, what data frame should I use? |
Right, so there are two issues here:
I hope that this example helps:

from dask.distributed import Client
client = Client()

import h5py
import numpy as np

with h5py.File('foo.h5', 'w') as f:
    dset = f.create_dataset('x', shape=(10, 10), dtype=float, chunks=(5, 5))
    dset[:] = 1

f = h5py.File('foo.h5', 'r')
dset = f['x']

import dask.array as da
x = da.from_array(dset, chunks=dset.chunks)

def inc(x):
    return x + 1

y = x.map_blocks(inc)

>>> y.compute()
array([[2., 2., 2., 2., 2., 2., 2., 2., 2., 2.],
       [2., 2., 2., 2., 2., 2., 2., 2., 2., 2.],
       [2., 2., 2., 2., 2., 2., 2., 2., 2., 2.],
       [2., 2., 2., 2., 2., 2., 2., 2., 2., 2.],
       [2., 2., 2., 2., 2., 2., 2., 2., 2., 2.],
       [2., 2., 2., 2., 2., 2., 2., 2., 2., 2.],
       [2., 2., 2., 2., 2., 2., 2., 2., 2., 2.],
       [2., 2., 2., 2., 2., 2., 2., 2., 2., 2.],
       [2., 2., 2., 2., 2., 2., 2., 2., 2., 2.],
       [2., 2., 2., 2., 2., 2., 2., 2., 2., 2.]])

You may also want to look at the Xarray project, which uses Dask array extensively, and has lots of experience dealing with HDF5 files. |
@emilyjcosta5 does the answer above help? |
@mrocklin Unfortunately, I am still running into trouble with pickling. The data already exists, and I need to be able to write the results back to the file so getting the main dataset looks something like this:
then I proceed to create the Dask array: then I process the data:
The _unit_compute() is a signal filter for chunks of data. Simplified, looks like this:
I don't believe the _unit_compute() function to be the issue. When I make self.data a random Dask array, it works. |
Try things in read only mode. |
I did and I have the same error.

TypeError Traceback (most recent call last)
/anaconda3/lib/python3.6/site-packages/zict/lru.py in __getitem__(self, key)
TypeError: unhashable type: 'SubgraphCallable'

During handling of the above exception, another exception occurred:

TypeError Traceback (most recent call last)
TypeError: can't pickle _thread._local objects

During handling of the above exception, another exception occurred:

TypeError Traceback (most recent call last)
~/Downloads/daskUSID/daskUSID/signal_filter/dask_process.py in compute(self, override, *args, **kwargs)
/anaconda3/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
/anaconda3/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
/anaconda3/lib/python3.6/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
/anaconda3/lib/python3.6/site-packages/distributed/client.py in _graph_to_futures(self, dsk, keys, restrictions, loose_restrictions, priority, user_priority, resources, retries, fifo_timeout, actors)
/anaconda3/lib/python3.6/site-packages/cytoolz/dicttoolz.pyx in cytoolz.dicttoolz.valmap()
/anaconda3/lib/python3.6/site-packages/cytoolz/dicttoolz.pyx in cytoolz.dicttoolz.valmap()
/anaconda3/lib/python3.6/site-packages/distributed/worker.py in dumps_task(task)
/anaconda3/lib/python3.6/site-packages/distributed/worker.py in dumps_function(func)
/anaconda3/lib/python3.6/site-packages/distributed/protocol/pickle.py in dumps(x)
/anaconda3/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in dumps(obj, protocol)
/anaconda3/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in dump(self, obj)
/anaconda3/lib/python3.6/pickle.py in dump(self, obj)
/anaconda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
/anaconda3/lib/python3.6/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
/anaconda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
/anaconda3/lib/python3.6/pickle.py in save_tuple(self, obj)
/anaconda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
/anaconda3/lib/python3.6/pickle.py in save_dict(self, obj)
/anaconda3/lib/python3.6/pickle.py in _batch_setitems(self, items)
/anaconda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
/anaconda3/lib/python3.6/pickle.py in save_tuple(self, obj)
/anaconda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
/anaconda3/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in save_instancemethod(self, obj)
/anaconda3/lib/python3.6/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
/anaconda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
/anaconda3/lib/python3.6/pickle.py in save_tuple(self, obj)
/anaconda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
/anaconda3/lib/python3.6/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
/anaconda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
/anaconda3/lib/python3.6/pickle.py in save_dict(self, obj)
/anaconda3/lib/python3.6/pickle.py in _batch_setitems(self, items)
/anaconda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
/anaconda3/lib/python3.6/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
/anaconda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
/anaconda3/lib/python3.6/pickle.py in save_dict(self, obj)
/anaconda3/lib/python3.6/pickle.py in _batch_setitems(self, items)
/anaconda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
TypeError: can't pickle _thread._local objects |
Interesting. I don't know then. Perhaps work from my example (verify that
it works first) and then change things towards yours until something
breaks? Maybe that helps to narrow things down?
…On Mon, Jul 1, 2019 at 7:20 PM Emily Costa ***@***.***> wrote:
I did and I have the same error.
TypeError Traceback (most recent call last)
/anaconda3/lib/python3.6/site-packages/distributed/worker.py in
dumps_function(func)
3039 try:
-> 3040 result = cache[func]
3041 except KeyError:
/anaconda3/lib/python3.6/site-packages/zict/lru.py in __getitem__(self, key)
47 def __getitem__(self, key):
---> 48 result = self.d[key]
49 self.i += 1
TypeError: unhashable type: 'SubgraphCallable'
During handling of the above exception, another exception occurred:
TypeError Traceback (most recent call last)
/anaconda3/lib/python3.6/site-packages/distributed/protocol/pickle.py in
dumps(x)
39 try:
---> 40 result = pickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
41 if len(result) < 1000:
TypeError: can't pickle _thread._local objects
During handling of the above exception, another exception occurred:
TypeError Traceback (most recent call last)
in
----> 1 h5_filt_grp = sig_filt.compute(override=True)
~/Downloads/daskUSID/daskUSID/signal_filter/dask_process.py in
compute(self, override, *args, **kwargs)
415 data = self.data
416 results = data.map_blocks(self._unit_computation, dtype=data.dtype,
*args, **kwargs)
--> 417 data = results.compute()
418 self.data = data
419 #self._write_results_chunk()
/anaconda3/lib/python3.6/site-packages/dask/base.py in compute(self,
**kwargs)
154 dask.base.compute
155 """
--> 156 (result,) = compute(self, traverse=False, **kwargs)
157 return result
158
/anaconda3/lib/python3.6/site-packages/dask/base.py in compute(*args,
**kwargs)
396 keys = [x.__dask_keys__() for x in collections]
397 postcomputes = [x.__dask_postcompute__() for x in collections]
--> 398 results = schedule(dsk, keys, **kwargs)
399 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
400
/anaconda3/lib/python3.6/site-packages/distributed/client.py in get(self,
dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous,
direct, retries, priority, fifo_timeout, actors, **kwargs)
2555 retries=retries,
2556 user_priority=priority,
-> 2557 actors=actors,
2558 )
2559 packed = pack_data(keys, futures)
/anaconda3/lib/python3.6/site-packages/distributed/client.py in
_graph_to_futures(self, dsk, keys, restrictions, loose_restrictions,
priority, user_priority, resources, retries, fifo_timeout, actors)
2482 {
2483 "op": "update-graph",
-> 2484 "tasks": valmap(dumps_task, dsk3),
2485 "dependencies": dependencies,
2486 "keys": list(flatkeys),
/anaconda3/lib/python3.6/site-packages/cytoolz/dicttoolz.pyx in
cytoolz.dicttoolz.valmap()
/anaconda3/lib/python3.6/site-packages/cytoolz/dicttoolz.pyx in
cytoolz.dicttoolz.valmap()
/anaconda3/lib/python3.6/site-packages/distributed/worker.py in
dumps_task(task)
3075 return d
3076 elif not any(map(_maybe_complex, task[1:])):
-> 3077 return {"function": dumps_function(task[0]), "args":
warn_dumps(task[1:])}
3078 return to_serialize(task)
3079
/anaconda3/lib/python3.6/site-packages/distributed/worker.py in
dumps_function(func)
3044 cache[func] = result
3045 except TypeError:
-> 3046 result = pickle.dumps(func)
3047 return result
3048
/anaconda3/lib/python3.6/site-packages/distributed/protocol/pickle.py in
dumps(x)
51 except Exception:
52 try:
---> 53 return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
54 except Exception as e:
55 logger.info("Failed to serialize %s. Exception: %s", x, e)
/anaconda3/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in
dumps(obj, protocol)
1095 try:
1096 cp = CloudPickler(file, protocol=protocol)
-> 1097 cp.dump(obj)
1098 return file.getvalue()
1099 finally:
/anaconda3/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in
dump(self, obj)
355 self.inject_addons()
356 try:
--> 357 return Pickler.dump(self, obj)
358 except RuntimeError as e:
359 if 'recursion' in e.args[0]:
/anaconda3/lib/python3.6/pickle.py in dump(self, obj)
407 if self.proto >= 4:
408 self.framer.start_framing()
--> 409 self.save(obj)
410 self.write(STOP)
411 self.framer.end_framing()
/anaconda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
519
520 # Save the reduce() output and finally memoize the object
--> 521 self.save_reduce(obj=obj, *rv)
522
523 def persistent_id(self, obj):
/anaconda3/lib/python3.6/pickle.py in save_reduce(self, func, args, state,
listitems, dictitems, obj)
608 else:
609 save(func)
--> 610 save(args)
611 write(REDUCE)
612
/anaconda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478
/anaconda3/lib/python3.6/pickle.py in save_tuple(self, obj)
749 write(MARK)
750 for element in obj:
--> 751 save(element)
752
753 if id(obj) in memo:
/anaconda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478
/anaconda3/lib/python3.6/pickle.py in save_dict(self, obj)
819
820 self.memoize(obj)
--> 821 self._batch_setitems(obj.items())
822
823 dispatch[dict] = save_dict
/anaconda3/lib/python3.6/pickle.py in _batch_setitems(self, items)
850 k, v = tmp[0]
851 save(k)
--> 852 save(v)
853 write(SETITEM)
854 # else tmp is empty, and we're done
/anaconda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478
/anaconda3/lib/python3.6/pickle.py in save_tuple(self, obj)
734 if n <= 3 and self.proto >= 2:
735 for element in obj:
--> 736 save(element)
737 # Subtle. Same as in the big comment below.
738 if id(obj) in memo:
/anaconda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478
/anaconda3/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in
save_instancemethod(self, obj)
861 else:
862 if PY3: # pragma: no branch
--> 863 self.save_reduce(types.MethodType, (obj.__func__, obj.__self__),
obj=obj)
864 else:
865 self.save_reduce(types.MethodType, (obj.__func__, obj.__self__, obj.__self__.__class__),
/anaconda3/lib/python3.6/pickle.py in save_reduce(self, func, args, state,
listitems, dictitems, obj)
608 else:
609 save(func)
--> 610 save(args)
611 write(REDUCE)
612
/anaconda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478
/anaconda3/lib/python3.6/pickle.py in save_tuple(self, obj)
734 if n <= 3 and self.proto >= 2:
735 for element in obj:
--> 736 save(element)
737 # Subtle. Same as in the big comment below.
738 if id(obj) in memo:
/anaconda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
519
520 # Save the reduce() output and finally memoize the object
--> 521 self.save_reduce(obj=obj, *rv)
522
523 def persistent_id(self, obj):
/anaconda3/lib/python3.6/pickle.py in save_reduce(self, func, args, state,
listitems, dictitems, obj)
632
633 if state is not None:
--> 634 save(state)
635 write(BUILD)
636
/anaconda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478
/anaconda3/lib/python3.6/pickle.py in save_dict(self, obj)
819
820 self.memoize(obj)
--> 821 self._batch_setitems(obj.items())
822
823 dispatch[dict] = save_dict
/anaconda3/lib/python3.6/pickle.py in _batch_setitems(self, items)
845 for k, v in tmp:
846 save(k)
--> 847 save(v)
848 write(SETITEMS)
849 elif n:
/anaconda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
519
520 # Save the reduce() output and finally memoize the object
--> 521 self.save_reduce(obj=obj, *rv)
522
523 def persistent_id(self, obj):
/anaconda3/lib/python3.6/pickle.py in save_reduce(self, func, args, state,
listitems, dictitems, obj)
632
633 if state is not None:
--> 634 save(state)
635 write(BUILD)
636
/anaconda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478
/anaconda3/lib/python3.6/pickle.py in save_dict(self, obj)
819
820 self.memoize(obj)
--> 821 self._batch_setitems(obj.items())
822
823 dispatch[dict] = save_dict
/anaconda3/lib/python3.6/pickle.py in _batch_setitems(self, items)
845 for k, v in tmp:
846 save(k)
--> 847 save(v)
848 write(SETITEMS)
849 elif n:
/anaconda3/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
494 reduce = getattr(obj, "__reduce_ex__", None)
495 if reduce is not None:
--> 496 rv = reduce(self.proto)
497 else:
498 reduce = getattr(obj, "__reduce__", None)
TypeError: can't pickle _thread._local objects
|
I'm sorry, I don't know what sig_filt is. You have something that isn't serializable. I'm not sure what else I can do to help. Good luck! |
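The traceback above passes through cloudpickle's save_instancemethod, which suggests that a bound method (here self._unit_computation) is what gets serialized. Pickling a bound method also pickles the instance it belongs to, so any unpicklable attribute on that instance can cause the failure. The following is a minimal standard-library sketch of that effect, not the actual pyUSID code: SignalFilter, unit_computation and can_pickle are hypothetical names, and threading.local() is only a stand-in for whatever unpicklable state the real object holds.

```python
import pickle
import threading


class SignalFilter:
    """Hypothetical stand-in for a class that owns an open file handle."""

    def __init__(self, path):
        self.path = path
        # Assumption: threading.local() plays the role of the unpicklable
        # state that an open file handle may carry.
        self._handle = threading.local()

    def unit_computation(self, block):
        return block


def can_pickle(obj):
    """Return True if obj survives pickle.dumps, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False


sig_filt = SignalFilter("foo.h5")

# The bound method drags the whole instance, including _handle, into the pickle.
print(can_pickle(sig_filt.unit_computation))

# A plain file path and dataset name contain nothing unpicklable.
print(can_pickle(("foo.h5", "x")))
```

If the first check fails and the second succeeds, that points toward passing plain data (paths, dataset names, slices) to tasks rather than methods of an object that holds the open file.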
Okay, thank you. |
I still recommend the following approach: #2787 (comment)
In general, lots of things can make a function not serializable. Certainly depending on an open file handle such as you have in your example would be one reasonable cause. I think that making h5py objects serializable is outside of the scope of Dask. There are a variety of workarounds such as the example I showed, opening and closing the file every time in a task, using a nicer format for serialization, like Zarr, or using a project like Xarray. I'm going to go ahead and close this now. Again, good luck! |
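As a rough sketch of the "opening and closing the file every time in a task" workaround mentioned above: each task receives only a file path, a dataset name and slices, and opens the file itself. The helper load_and_inc, the temporary file and the 5x5 block layout are assumptions made for illustration; the synchronous scheduler is used only to keep the sketch self-contained.

```python
import os
import tempfile

import dask
import h5py
import numpy as np

# Create a small example file (hypothetical data, mirroring the example above).
path = os.path.join(tempfile.mkdtemp(), "foo.h5")
with h5py.File(path, "w") as f:
    f.create_dataset("x", data=np.ones((10, 10)), chunks=(5, 5))


def load_and_inc(path, name, rows, cols):
    """Open the file inside the task, read one block, and close it again."""
    with h5py.File(path, "r") as f:
        block = f[name][rows, cols]
    return block + 1


# Only picklable arguments (strings and slices) are attached to each task.
tasks = [
    dask.delayed(load_and_inc)(path, "x", slice(i, i + 5), slice(j, j + 5))
    for i in range(0, 10, 5)
    for j in range(0, 10, 5)
]

blocks = dask.compute(*tasks, scheduler="synchronous")

# Reassemble the four 5x5 blocks into the full 10x10 array.
result = np.block([[blocks[0], blocks[1]], [blocks[2], blocks[3]]])
print(result.shape)
```

Because no open h5py object is captured by the task function, nothing file-related needs to be serialized; the cost is one open/close per block.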
When I use Client to map a function to a Dask array made from an HDF5, the following error appears:
TypeError: can't pickle _thread._local objects
Here is a simplified version of what I am trying to do:
After some testing, I believe the issue lies with using HDF in a lazy Dask array function, which perhaps is not picklable when used in the map() function.
I am trying to implement Dask into the pyUSID Python package, which is built on h5py, for spectroscopy and imaging computation. Therefore, I need to use Dask with HDF.
I am using Python version 3.7.3 on a MacBook Air with a 1.8 GHz Intel Core i7 (4-core) processor and 4 GB of RAM.
Here is the traceback: