TypeError Traceback (most recent call last)
/tmp/ipykernel_1124063/2279001398.py in <module>
----> 1 df.memory_usage().sum().compute()
~/workspace/dask/dask/base.py in compute(self, **kwargs)
290 dask.base.compute
291 """
--> 292 (result,) = compute(self, traverse=False, **kwargs)
293 return result
294
~/workspace/dask/dask/base.py in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
573 postcomputes.append(x.__dask_postcompute__())
574
--> 575 results = schedule(dsk, keys, **kwargs)
576 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
577
~/workspace/distributed/distributed/client.py in get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
3008 should_rejoin = False
3009 try:
-> 3010 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
3011 finally:
3012 for f in futures.values():
~/workspace/distributed/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
2160 else:
2161 local_worker = None
-> 2162 return self.sync(
2163 self._gather,
2164 futures,
~/workspace/distributed/distributed/utils.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
309 return future
310 else:
--> 311 return sync(
312 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
313 )
~/workspace/distributed/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
376 if error:
377 typ, exc, tb = error
--> 378 raise exc.with_traceback(tb)
379 else:
380 return result
~/workspace/distributed/distributed/utils.py in f()
349 future = asyncio.wait_for(future, callback_timeout)
350 future = asyncio.ensure_future(future)
--> 351 result = yield future
352 except Exception:
353 error = sys.exc_info()
~/mambaforge/lib/python3.9/site-packages/tornado/gen.py in run(self)
760
761 try:
--> 762 value = future.result()
763 except Exception:
764 exc_info = sys.exc_info()
~/workspace/distributed/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
2023 exc = CancelledError(key)
2024 else:
-> 2025 raise exception.with_traceback(traceback)
2026 raise exc
2027 if errors == "skip":
~/workspace/dask/dask/optimization.py in __call__()
967 if not len(args) == len(self.inkeys):
968 raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 969 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
970
971 def __reduce__(self):
~/workspace/dask/dask/core.py in get()
147 for key in toposort(dsk):
148 task = dsk[key]
--> 149 result = _execute_task(task, cache)
150 cache[key] = result
151 result = _execute_task(out, cache)
~/workspace/dask/dask/core.py in _execute_task()
117 # temporaries by their reference count and can execute certain
118 # operations in-place.
--> 119 return func(*(_execute_task(a, cache) for a in args))
120 elif not ishashable(arg):
121 return arg
~/workspace/dask/dask/core.py in <genexpr>()
117 # temporaries by their reference count and can execute certain
118 # operations in-place.
--> 119 return func(*(_execute_task(a, cache) for a in args))
120 elif not ishashable(arg):
121 return arg
~/workspace/dask/dask/core.py in _execute_task()
111 """
112 if isinstance(arg, list):
--> 113 return [_execute_task(a, cache) for a in arg]
114 elif istask(arg):
115 func, args = arg[0], arg[1:]
~/workspace/dask/dask/core.py in <listcomp>()
111 """
112 if isinstance(arg, list):
--> 113 return [_execute_task(a, cache) for a in arg]
114 elif istask(arg):
115 func, args = arg[0], arg[1:]
~/workspace/dask/dask/core.py in _execute_task()
117 # temporaries by their reference count and can execute certain
118 # operations in-place.
--> 119 return func(*(_execute_task(a, cache) for a in args))
120 elif not ishashable(arg):
121 return arg
~/workspace/dask/dask/dataframe/io/demo.py in __call__()
91
92 def __call__(self, part):
---> 93 divisions, state_data = part
94 if isinstance(state_data, int):
95 state_data = random_state_data(1, state_data)
TypeError: cannot unpack non-iterable Serialize object
2022-03-13 10:36:26,512 - distributed.worker - WARNING - Compute Failed
Key: ('series-groupby-sum-chunk-392a540a6809308bf6f7844bff67de62-eb05d589544154f29d67386652a6d3cc', 13)
Function: subgraph_callable-44728467-2d88-420b-84ca-ea7e36a4
args: (<Serialize: ([Timestamp('2000-01-14 00:00:00', freq='D'), Timestamp('2000-01-15 00:00:00', freq='D')], 1660101422)>)
kwargs: {}
Exception: "TypeError('cannot unpack non-iterable Serialize object')"
I get the following error when running on `main`. It does not appear to occur when using multiple processes.
Traceback
TypeError Traceback (most recent call last) /tmp/ipykernel_1124063/2279001398.py in <module> ----> 1 df.memory_usage().sum().compute() ~/workspace/dask/dask/base.py in compute(self, **kwargs) 290 dask.base.compute 291 """ --> 292 (result,) = compute(self, traverse=False, **kwargs) 293 return result 294 ~/workspace/dask/dask/base.py in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs) 573 postcomputes.append(x.__dask_postcompute__()) 574 --> 575 results = schedule(dsk, keys, **kwargs) 576 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)]) 577 ~/workspace/distributed/distributed/client.py in get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs) 3008 should_rejoin = False 3009 try: -> 3010 results = self.gather(packed, asynchronous=asynchronous, direct=direct) 3011 finally: 3012 for f in futures.values(): ~/workspace/distributed/distributed/client.py in gather(self, futures, errors, direct, asynchronous) 2160 else: 2161 local_worker = None -> 2162 return self.sync( 2163 self._gather, 2164 futures, ~/workspace/distributed/distributed/utils.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs) 309 return future 310 else: --> 311 return sync( 312 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs 313 ) ~/workspace/distributed/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs) 376 if error: 377 typ, exc, tb = error --> 378 raise exc.with_traceback(tb) 379 else: 380 return result ~/workspace/distributed/distributed/utils.py in f() 349 future = asyncio.wait_for(future, callback_timeout) 350 future = asyncio.ensure_future(future) --> 351 result = yield future 352 except Exception: 353 error = sys.exc_info() ~/mambaforge/lib/python3.9/site-packages/tornado/gen.py in run(self) 760 761 try: --> 762 value = future.result() 763 except Exception: 764 exc_info = sys.exc_info() 
~/workspace/distributed/distributed/client.py in _gather(self, futures, errors, direct, local_worker) 2023 exc = CancelledError(key) 2024 else: -> 2025 raise exception.with_traceback(traceback) 2026 raise exc 2027 if errors == "skip": ~/workspace/dask/dask/optimization.py in __call__() 967 if not len(args) == len(self.inkeys): 968 raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args))) --> 969 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args))) 970 971 def __reduce__(self): ~/workspace/dask/dask/core.py in get() 147 for key in toposort(dsk): 148 task = dsk[key] --> 149 result = _execute_task(task, cache) 150 cache[key] = result 151 result = _execute_task(out, cache) ~/workspace/dask/dask/core.py in _execute_task() 117 # temporaries by their reference count and can execute certain 118 # operations in-place. --> 119 return func(*(_execute_task(a, cache) for a in args)) 120 elif not ishashable(arg): 121 return arg ~/workspace/dask/dask/core.py in <genexpr>() 117 # temporaries by their reference count and can execute certain 118 # operations in-place. --> 119 return func(*(_execute_task(a, cache) for a in args)) 120 elif not ishashable(arg): 121 return arg ~/workspace/dask/dask/core.py in _execute_task() 111 """ 112 if isinstance(arg, list): --> 113 return [_execute_task(a, cache) for a in arg] 114 elif istask(arg): 115 func, args = arg[0], arg[1:] ~/workspace/dask/dask/core.py in <listcomp>() 111 """ 112 if isinstance(arg, list): --> 113 return [_execute_task(a, cache) for a in arg] 114 elif istask(arg): 115 func, args = arg[0], arg[1:] ~/workspace/dask/dask/core.py in _execute_task() 117 # temporaries by their reference count and can execute certain 118 # operations in-place. 
--> 119 return func(*(_execute_task(a, cache) for a in args)) 120 elif not ishashable(arg): 121 return arg ~/workspace/dask/dask/dataframe/io/demo.py in __call__() 91 92 def __call__(self, part): ---> 93 divisions, state_data = part 94 if isinstance(state_data, int): 95 state_data = random_state_data(1, state_data) TypeError: cannot unpack non-iterable Serialize object