MissingResult State data is missing. #12977

Open
ahuang11 opened this issue Jan 10, 2023 · 7 comments

@ahuang11
Contributor

Occurs when a worker's memory overflows (the with get_dask_client() line is commented out):

import dask.dataframe
import dask.datasets
import dask.distributed
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_dask_client

client = dask.distributed.Client()

@task
def read_data(start: str, end: str) -> dask.dataframe.DataFrame:
    df = dask.datasets.timeseries(start, end, partition_freq="4w")
    return df

@task
def process_data(df) -> dask.dataframe.DataFrame:
    # with get_dask_client():
    df_yearly_avg = df.groupby(df.index.year).mean()
    return df_yearly_avg.compute()

@flow(task_runner=DaskTaskRunner(address=client.scheduler.address))
def dask_flow():
    df = read_data.submit("1988", "2022")
    df_yearly_average = process_data.submit(df)
    return df_yearly_average

dask_flow()
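
For reference, here is what process_data looks like with the commented-out context manager enabled (a minimal sketch of the alternate code path; the failure below comes from the version that calls .compute() without it):

@task
def process_data(df) -> dask.dataframe.DataFrame:
    # With get_dask_client(), the .compute() call is distributed across the
    # Dask cluster the flow runs on instead of being computed entirely by the
    # single worker executing this task.
    with get_dask_client():
        df_yearly_avg = df.groupby(df.index.year).mean()
        return df_yearly_avg.compute()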

Logs:


15:23:47.601 | INFO    | Task run 'read_data-5bc97744-0' - Finished in state Completed()
2023-01-10 15:23:51,373 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:51660 (pid=5119) exceeded 95% memory budget. Restarting...
15:23:51.373 | WARNING | distributed.nanny.memory - Worker tcp://127.0.0.1:51660 (pid=5119) exceeded 95% memory budget. Restarting...
15:23:51.463 | INFO    | distributed.nanny - Worker process 5119 was killed by signal 15
15:23:51.465 | INFO    | distributed.core - Connection to tcp://127.0.0.1:51668 has been closed.
15:23:51.466 | INFO    | distributed.scheduler - Remove worker <WorkerState 'tcp://127.0.0.1:51660', name: 1, status: running, memory: 1, processing: 1>
15:23:51.466 | INFO    | distributed.core - Removing comms to tcp://127.0.0.1:51660
2023-01-10 15:23:51,469 - distributed.nanny - WARNING - Restarting worker
15:23:51.469 | WARNING | distributed.nanny - Restarting worker
15:23:51.890 | INFO    | distributed.scheduler - Register worker <WorkerState 'tcp://127.0.0.1:53762', name: 1, status: init, memory: 0, processing: 0>
15:23:51.892 | INFO    | distributed.scheduler - Starting worker compute stream, tcp://127.0.0.1:53762
15:23:51.893 | INFO    | distributed.core - Starting established connection to tcp://127.0.0.1:53765
15:23:52.804 | INFO    | Task run 'read_data-5bc97744-0' - Task run '9b5215ca-1822-4c6e-930d-34480fca578e' already finished.
15:23:53.025 | ERROR   | Task run 'process_data-090555ba-0' - Crash detected! Execution was interrupted by an unexpected exception: MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.

2023-01-10 15:23:53,499 - distributed.worker - WARNING - Compute Failed
Key:       process_data-090555ba-0-fa34a3b0fd834a6aa2fd9ece3d2c233c-1
Function:  begin_task_run
args:      ()
kwargs:    {'task': <prefect.tasks.Task object at 0x10cc5b6d0>, 'task_run': TaskRun(id=UUID('fa34a3b0-fd83-4a6a-a2fd-9ece3d2c233c'), name='process_data-090555ba-0', flow_run_id=UUID('51a4a2f7-bdbe-4433-b54f-bb0b60b9e95a'), task_key='__main__.process_data', dynamic_key='0', cache_key=None, cache_expiration=None, task_version=None, empirical_policy=TaskRunPolicy(max_retries=0, retry_delay_seconds=0.0, retries=0, retry_delay=0, retry_jitter_factor=None), tags=[], state_id=UUID('3a84e6eb-1a87-487f-8956-7a268a8fb1c3'), task_inputs={'df': [TaskRunResult(input_type='task_run', id=UUID('9b5215ca-1822-4c6e-930d-34480fca578e'))]}, state_type=StateType.PENDING, state_name='Pending', run_count=0, flow_run_run_count=0, expected_start_time=DateTime(2023, 1, 10, 23, 23, 46, 173061, tzinfo=Timezone('+00:00')), next_scheduled_start_time=None, start_time=None, end_time=None, total_run_time=datetime.timedelta(0), estimated_run_time=datetime.timedelta(0), estimated_start_time_delta=datetime.timedelta(microseconds=41
Exception: "MissingResult('State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.')"

15:23:53.499 | WARNING | distributed.worker - Compute Failed
Key:       process_data-090555ba-0-fa34a3b0fd834a6aa2fd9ece3d2c233c-1
Function:  begin_task_run
args:      ()
kwargs:    {'task': <prefect.tasks.Task object at 0x10cc5b6d0>, 'task_run': TaskRun(id=UUID('fa34a3b0-fd83-4a6a-a2fd-9ece3d2c233c'), name='process_data-090555ba-0', flow_run_id=UUID('51a4a2f7-bdbe-4433-b54f-bb0b60b9e95a'), task_key='__main__.process_data', dynamic_key='0', cache_key=None, cache_expiration=None, task_version=None, empirical_policy=TaskRunPolicy(max_retries=0, retry_delay_seconds=0.0, retries=0, retry_delay=0, retry_jitter_factor=None), tags=[], state_id=UUID('3a84e6eb-1a87-487f-8956-7a268a8fb1c3'), task_inputs={'df': [TaskRunResult(input_type='task_run', id=UUID('9b5215ca-1822-4c6e-930d-34480fca578e'))]}, state_type=StateType.PENDING, state_name='Pending', run_count=0, flow_run_run_count=0, expected_start_time=DateTime(2023, 1, 10, 23, 23, 46, 173061, tzinfo=Timezone('+00:00')), next_scheduled_start_time=None, start_time=None, end_time=None, total_run_time=datetime.timedelta(0), estimated_run_time=datetime.timedelta(0), estimated_start_time_delta=datetime.timedelta(microseconds=41
Exception: "MissingResult('State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.')"

Traceback:

---------------------------------------------------------------------------
MissingResult                             Traceback (most recent call last)
Cell In[1], line 24
     21     df_yearly_average = process_data.submit(df)
     22     return df_yearly_average
---> 24 dask_flow()

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/flows.py:448, in Flow.__call__(self, return_state, wait_for, *args, **kwargs)
    444 parameters = get_call_parameters(self.fn, args, kwargs)
    446 return_type = "state" if return_state else "result"
--> 448 return enter_flow_run_engine_from_flow_call(
    449     self,
    450     parameters,
    451     wait_for=wait_for,
    452     return_type=return_type,
    453 )

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/engine.py:161, in enter_flow_run_engine_from_flow_call(flow, parameters, wait_for, return_type)
    157 elif in_async_main_thread():
    158     # An event loop is already running and we must create a blocking portal to
    159     # run async code from this synchronous context
    160     with start_blocking_portal() as portal:
--> 161         return portal.call(begin_run)
    162 else:
    163     # An event loop is not running so we will create one
    164     return anyio.run(begin_run)

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/anyio/from_thread.py:283, in BlockingPortal.call(self, func, *args)
    268 def call(
    269     self,
    270     func: Callable[..., Union[Coroutine[Any, Any, T_Retval], T_Retval]],
    271     *args: object
    272 ) -> T_Retval:
    273     """
    274     Call the given function in the event loop thread.
    275 
   (...)
    281 
    282     """
--> 283     return cast(T_Retval, self.start_task_soon(func, *args).result())

File ~/mambaforge/envs/dask/lib/python3.9/concurrent/futures/_base.py:446, in Future.result(self, timeout)
    444     raise CancelledError()
    445 elif self._state == FINISHED:
--> 446     return self.__get_result()
    447 else:
    448     raise TimeoutError()

File ~/mambaforge/envs/dask/lib/python3.9/concurrent/futures/_base.py:391, in Future.__get_result(self)
    389 if self._exception:
    390     try:
--> 391         raise self._exception
    392     finally:
    393         # Break a reference cycle with the exception in self._exception
    394         self = None

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/anyio/from_thread.py:219, in BlockingPortal._call_func(self, func, args, kwargs, future)
    216             else:
    217                 future.add_done_callback(callback)
--> 219             retval = await retval
    220 except self._cancelled_exc_class:
    221     future.cancel()

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/client/utilities.py:47, in inject_client.<locals>.with_injected_client(*args, **kwargs)
     45 async with client_context as new_client:
     46     kwargs.setdefault("client", new_client or client)
---> 47     return await fn(*args, **kwargs)

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/engine.py:244, in create_then_begin_flow_run(flow, parameters, wait_for, return_type, client)
    242     return state
    243 elif return_type == "result":
--> 244     return await state.result(fetch=True)
    245 else:
    246     raise ValueError(f"Invalid return type for flow engine {return_type!r}.")

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/states.py:89, in _get_state_result(state, raise_on_failure)
     84     raise PausedRun("Run paused.")
     86 if raise_on_failure and (
     87     state.is_crashed() or state.is_failed() or state.is_cancelled()
     88 ):
---> 89     raise await get_state_exception(state)
     91 if isinstance(state.data, DataDocument):
     92     result = result_from_state_with_data_document(
     93         state, raise_on_failure=raise_on_failure
     94     )

File ~/Applications/python/prefect-dask/prefect_dask/task_runners.py:269, in DaskTaskRunner.wait(self, key, timeout)
    267 future = self._get_dask_future(key)
    268 try:
--> 269     return await future.result(timeout=timeout)
    270 except distributed.TimeoutError:
    271     return None

File ~/Applications/python/test/distributed/distributed/client.py:296, in Future._result(self, raiseit)
    294 if raiseit:
    295     typ, exc, tb = exc
--> 296     raise exc.with_traceback(tb)
    297 else:
    298     return exc

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/engine.py:1303, in begin_task_run()
   1297     raise RuntimeError(
   1298         f"Cannot orchestrate task run '{task_run.id}'. "
   1299         f"Failed to connect to API at {client.api_url}."
   1300     ) from connect_error
   1302 try:
-> 1303     state = await orchestrate_task_run(
   1304         task=task,
   1305         task_run=task_run,
   1306         parameters=parameters,
   1307         wait_for=wait_for,
   1308         result_factory=result_factory,
   1309         log_prints=log_prints,
   1310         interruptible=interruptible,
   1311         client=client,
   1312     )
   1314     if not maybe_flow_run_context:
   1315         # When a a task run finishes on a remote worker flush logs to prevent
   1316         # loss if the process exits
   1317         OrionHandler.flush(block=True)

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/engine.py:1378, in orchestrate_task_run()
   1367 partial_task_run_context = PartialModel(
   1368     TaskRunContext,
   1369     task_run=task_run,
   (...)
   1373     log_prints=log_prints,
   1374 )
   1376 try:
   1377     # Resolve futures in parameters into data
-> 1378     resolved_parameters = await resolve_inputs(parameters)
   1379     # Resolve futures in any non-data dependencies to ensure they are ready
   1380     await resolve_inputs(wait_for, return_data=False)

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/engine.py:1671, in resolve_inputs()
   1668     # Only retrieve the result if requested as it may be expensive
   1669     return state.result(raise_on_failure=False, fetch=True) if return_data else None
-> 1671 return await run_sync_in_worker_thread(
   1672     visit_collection,
   1673     parameters,
   1674     visit_fn=resolve_input,
   1675     return_data=return_data,
   1676     max_depth=max_depth,
   1677     remove_annotations=True,
   1678     context={},
   1679 )

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/utilities/asyncutils.py:91, in run_sync_in_worker_thread()
     80 """
     81 Runs a sync function in a new worker thread so that the main thread's event loop
     82 is not blocked
   (...)
     88 thread may continue running — the outcome will just be ignored.
     89 """
     90 call = partial(__fn, *args, **kwargs)
---> 91 return await anyio.to_thread.run_sync(
     92     call, cancellable=True, limiter=get_thread_limiter()
     93 )

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/anyio/to_thread.py:31, in run_sync()
     10 async def run_sync(
     11     func: Callable[..., T_Retval],
     12     *args: object,
     13     cancellable: bool = False,
     14     limiter: Optional[CapacityLimiter] = None
     15 ) -> T_Retval:
     16     """
     17     Call the given function with the given arguments in a worker thread.
     18 
   (...)
     29 
     30     """
---> 31     return await get_asynclib().run_sync_in_worker_thread(
     32         func, *args, cancellable=cancellable, limiter=limiter
     33     )

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/anyio/_backends/_asyncio.py:937, in run_sync_in_worker_thread()
    935 context.run(sniffio.current_async_library_cvar.set, None)
    936 worker.queue.put_nowait((context, func, args, future))
--> 937 return await future

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/anyio/_backends/_asyncio.py:867, in run()
    865 exception: Optional[BaseException] = None
    866 try:
--> 867     result = context.run(func, *args)
    868 except BaseException as exc:
    869     exception = exc

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/utilities/collections.py:318, in visit_collection()
    316 elif typ in (dict, OrderedDict):
    317     assert isinstance(expr, (dict, OrderedDict))  # typecheck assertion
--> 318     items = [(visit_nested(k), visit_nested(v)) for k, v in expr.items()]
    319     result = typ(items) if return_data else None
    321 elif is_dataclass(expr) and not isinstance(expr, type):

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/utilities/collections.py:318, in <listcomp>()
    316 elif typ in (dict, OrderedDict):
    317     assert isinstance(expr, (dict, OrderedDict))  # typecheck assertion
--> 318     items = [(visit_nested(k), visit_nested(v)) for k, v in expr.items()]
    319     result = typ(items) if return_data else None
    321 elif is_dataclass(expr) and not isinstance(expr, type):

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/utilities/collections.py:264, in visit_nested()
    262 def visit_nested(expr):
    263     # Utility for a recursive call, preserving options and updating the depth.
--> 264     return visit_collection(
    265         expr,
    266         visit_fn=visit_fn,
    267         return_data=return_data,
    268         remove_annotations=remove_annotations,
    269         max_depth=max_depth - 1,
    270         # Copy the context on nested calls so it does not "propagate up"
    271         context=context.copy() if context is not None else None,
    272     )

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/utilities/collections.py:281, in visit_collection()
    278         return visit_fn(expr)
    280 # Visit every expression
--> 281 result = visit_expression(expr)
    283 if return_data:
    284     # Only mutate the expression while returning data, otherwise it could be null
    285     expr = result

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/utilities/collections.py:276, in visit_expression()
    274 def visit_expression(expr):
    275     if context is not None:
--> 276         return visit_fn(expr, context)
    277     else:
    278         return visit_fn(expr)

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/engine.py:1669, in resolve_input()
   1664     raise UpstreamTaskError(
   1665         f"Upstream task run '{state.state_details.task_run_id}' did not reach a 'COMPLETED' state."
   1666     )
   1668 # Only retrieve the result if requested as it may be expensive
-> 1669 return state.result(raise_on_failure=False, fetch=True) if return_data else None

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/client/schemas.py:107, in result()
     35 """
     36 Retrieve the result attached to this state.
     37 
   (...)
    103     hello
    104 """
    105 from prefect.states import get_state_result
--> 107 return get_state_result(self, raise_on_failure=raise_on_failure, fetch=fetch)

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/states.py:74, in get_state_result()
     72         return state.data
     73 else:
---> 74     return _get_state_result(state, raise_on_failure=raise_on_failure)

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/utilities/asyncutils.py:226, in coroutine_wrapper()
    222     return async_fn(*args, **kwargs)
    223 elif in_async_worker_thread():
    224     # In a sync context but we can access the event loop thread; send the async
    225     # call to the parent
--> 226     return run_async_from_worker_thread(async_fn, *args, **kwargs)
    227 else:
    228     # In a sync context and there is no event loop; just create an event loop
    229     # to run the async code then tear it down
    230     return run_async_in_new_loop(async_fn, *args, **kwargs)

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/utilities/asyncutils.py:177, in run_async_from_worker_thread()
    172 """
    173 Runs an async function in the main thread's event loop, blocking the worker
    174 thread until completion
    175 """
    176 call = partial(__fn, *args, **kwargs)
--> 177 return anyio.from_thread.run(call)

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/anyio/from_thread.py:49, in run()
     46 except AttributeError:
     47     raise RuntimeError("This function can only be run from an AnyIO worker thread")
---> 49 return asynclib.run_async_from_thread(func, *args)

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/anyio/_backends/_asyncio.py:970, in run_async_from_thread()
    964 def run_async_from_thread(
    965     func: Callable[..., Coroutine[Any, Any, T_Retval]], *args: object
    966 ) -> T_Retval:
    967     f: concurrent.futures.Future[T_Retval] = asyncio.run_coroutine_threadsafe(
    968         func(*args), threadlocals.loop
    969     )
--> 970     return f.result()

File ~/mambaforge/envs/dask/lib/python3.9/concurrent/futures/_base.py:446, in result()
    444     raise CancelledError()
    445 elif self._state == FINISHED:
--> 446     return self.__get_result()
    447 else:
    448     raise TimeoutError()

File ~/mambaforge/envs/dask/lib/python3.9/concurrent/futures/_base.py:391, in __get_result()
    389 if self._exception:
    390     try:
--> 391         raise self._exception
    392     finally:
    393         # Break a reference cycle with the exception in self._exception
    394         self = None

File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/states.py:101, in _get_state_result()
     99         return await get_state_exception(state)
    100     else:
--> 101         raise MissingResult(
    102             "State data is missing. "
    103             "Typically, this occurs when result persistence is disabled and the "
    104             "state has been retrieved from the API."
    105         )
    107 else:
    108     # The result is attached directly
    109     result = state.data

MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.
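
The error message points at result persistence being disabled. One possible mitigation, sketched below and untested against this reproduction, is to persist the upstream task's result so that a rescheduled downstream task can reload it after a worker restart. The storage block and path here are illustrative assumptions, and the path must be reachable by the Dask workers:

import dask.dataframe
import dask.datasets
from prefect import task
from prefect.filesystems import LocalFileSystem

# Illustrative storage location (any Prefect filesystem block the workers can
# read would work just as well).
result_storage = LocalFileSystem(basepath="/tmp/prefect-results")

@task(persist_result=True, result_storage=result_storage)
def read_data(start: str, end: str) -> dask.dataframe.DataFrame:
    # Persisting the result lets Prefect re-fetch it when the downstream task
    # run is resubmitted after the worker holding the in-memory state is killed.
    return dask.datasets.timeseries(start, end, partition_freq="4w")

Whether a lazy Dask DataFrame round-trips cleanly through the default pickle serializer is a separate question; it may be necessary to materialize the data or to keep the compute inside a get_dask_client() context.
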
@AleksandrLiadov

@ahuang11, @madkinsz hello, did you resolve this issue?

I have run into the same issue :(

It seems to be somewhat random; sometimes my flows run without this problem.

@desertaxle desertaxle transferred this issue from PrefectHQ/prefect-dask Apr 26, 2024
@tonal

tonal commented May 3, 2024

I have the same issue with RayTaskRunner. :(

@tonal

tonal commented May 3, 2024

For me, the error is consistently reproduced in the following situation:

@task(tags={'load'}, log_prints=True, retries=3, retry_delay_seconds=5,)
async def load_raw(url:str, auth:Auth|None=None, **kwds) -> bytes:
  content = await _load_raw_inner(url, auth=auth, **kwds)
  return content

_load_raw = load_raw.with_options(
  name=f'load-raw-{PROV_NAME.lower()}', tags=load_raw.tags | {PROV_NAME},
  cache_key_fn=task_input_hash, persist_result=True,
  cache_expiration=_cache_expiration, retries=3, retry_delay_seconds=5,)

@flow
async def epool_rem_gr_active_group_csv():
  cont = await _load_raw(URL, verify=False, timeout=15.)
  ...

Fixed it by moving the caching parameters to the task definition:

@task(
  tags={'load'}, log_prints=True,
  cache_key_fn=task_input_hash, persist_result=True, retries=3, retry_delay_seconds=5,)
async def load_raw(url:str, auth:Auth|None=None, **kwds) -> bytes:
  content = await _load_raw_inner(url, auth=auth, **kwds)
  return content

_load_raw = load_raw.with_options(
  name=f'load-raw-{PROV_NAME.lower()}', tags=load_raw.tags | {PROV_NAME},
  cache_expiration=_cache_expiration,)

@flow
async def epool_rem_gr_active_group_csv():
  cont = await _load_raw(URL, verify=False, timeout=15.)
  ...

@tonal

tonal commented May 4, 2024

The fix does not work. :(

@tonal

tonal commented May 4, 2024

see also #8228

@tonal

tonal commented May 4, 2024

and #8415

@tonal

tonal commented May 7, 2024

I downgraded prefect to 2.16.9 and the error has almost disappeared.
