
Ensure log_event of non-msgpack serializable object do not kill servers #7472

Merged
merged 5 commits into from May 11, 2023

Conversation

fjetter (Member) commented Jan 12, 2023

Closes #7471

For the purposes this logging system is intended for, I believe sticking to msgpack-serializable objects is a good choice. There are cases where this could be surprising (as in #7471, where a numpy scalar is encountered), but I believe raising properly without killing the worker is a decent compromise.

I copied the docstrings from Client.log_event.
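The guard described above can be sketched as follows. This is a simplified illustration, not the actual distributed implementation: the real code checks serializability with a helper around msgpack itself, while this stand-in recursively checks for msgpack-representable Python types (so it would not catch every case the real check does, e.g. numpy scalars that subclass builtins).

```python
def is_msgpack_serializable(msg) -> bool:
    """Return True if msg is built only from msgpack-representable types.

    Simplified stand-in for a real msgpack round-trip check.
    """
    if msg is None or isinstance(msg, (bool, int, float, str, bytes)):
        return True
    if isinstance(msg, (list, tuple)):
        return all(is_msgpack_serializable(v) for v in msg)
    if isinstance(msg, dict):
        return all(
            is_msgpack_serializable(k) and is_msgpack_serializable(v)
            for k, v in msg.items()
        )
    return False


def log_event(topic, msg, events):
    """Record an event, raising a TypeError instead of crashing the server
    when the payload cannot be serialized."""
    if not is_msgpack_serializable(msg):
        raise TypeError(
            f"Message must be msgpack serializable. Got {type(msg)=} instead."
        )
    events.append((topic, msg))
```

The key point is that a bad payload now surfaces as an ordinary `TypeError` to the caller rather than killing the worker during transmission.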

fjetter (Member, Author) commented Jan 12, 2023

Stumbled over #7473

github-actions bot (Contributor) commented Jan 12, 2023

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

26 files (+22), 26 suites (+22), 15h 37m 17s ⏱️ (+14h 25m 23s)
3 584 tests (+395): 3 476 passed ✔️ (+501), 105 skipped 💤 (−109), 3 failed (+3)
45 379 runs (+39 086): 43 214 passed ✔️ (+37 512), 2 162 skipped 💤 (+1 571), 3 failed (+3)

For more details on these failures, see this check.

Results for commit 4b5d7c4. ± Comparison against base commit 2ab8cbd.

♻️ This comment has been updated with latest results.

graingert (Member) commented May 3, 2023

This change means that log_event can no longer handle distributed.protocol.serialize.Serialize types, which causes distributed/tests/test_utils_test.py::test_fail_hard to fail:

_______________________________________________________________________________ test_fail_hard[False] ________________________________________________________________________________

self = <FailWorker 'tcp://127.0.0.1:34987', status: closed, stored: 0, running: 0/20, ready: 0, comm: 0, waiting: 0>, args = (), kwargs = {}

    @wraps(method)
    async def wrapper(self, *args: P.args, **kwargs: P.kwargs) -> Any:
        try:
>           return await method(self, *args, **kwargs)  # type: ignore

distributed/worker.py:207: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <FailWorker 'tcp://127.0.0.1:34987', status: closed, stored: 0, running: 0/20, ready: 0, comm: 0, waiting: 0>

    @fail_hard
    async def fail_async(self):
>       raise CustomError()
E       test_utils_test.test_fail_hard.<locals>.CustomError

distributed/tests/test_utils_test.py:855: CustomError

During handling of the above exception, another exception occurred:

sync = False

    @pytest.mark.parametrize("sync", [True, False])
    def test_fail_hard(sync):
        """@fail_hard is a last resort when error handling for everything that we foresaw
        could possibly go wrong failed.
        Instead of trying to force a crash here, we'll write custom methods which do crash.
        """
    
        class CustomError(Exception):
            pass
    
        class FailWorker(Worker):
            @fail_hard
            def fail_sync(self):
                raise CustomError()
    
            @fail_hard
            async def fail_async(self):
                raise CustomError()
    
        test_done = False
    
        @gen_cluster(nthreads=[])
        async def test(s):
            nonlocal test_done
            with captured_logger("distributed.worker") as logger:
                async with FailWorker(s.address) as a:
                    with pytest.raises(CustomError):
                        if sync:
                            a.fail_sync()
                        else:
                            await a.fail_async()
    
                    while a.status != Status.closed:
                        await asyncio.sleep(0.01)
                method_name = "fail_sync" if sync else "fail_async"
                assert f"worker-{method_name}-fail-hard" in logger.getvalue()
    
            test_done = True
    
        with pytest.raises(CustomError):
>           test()

distributed/tests/test_utils_test.py:878: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../anaconda3/envs/dask-distributed/lib/python3.11/contextlib.py:81: in inner
    return func(*args, **kwds)
../../anaconda3/envs/dask-distributed/lib/python3.11/contextlib.py:81: in inner
    return func(*args, **kwds)
distributed/utils_test.py:1098: in test_func
    return _run_and_close_tornado(async_fn_outer)
distributed/utils_test.py:377: in _run_and_close_tornado
    return asyncio.run(inner_fn())
../../anaconda3/envs/dask-distributed/lib/python3.11/asyncio/runners.py:190: in run
    return runner.run(main)
../../anaconda3/envs/dask-distributed/lib/python3.11/asyncio/runners.py:118: in run
    return self._loop.run_until_complete(task)
../../anaconda3/envs/dask-distributed/lib/python3.11/asyncio/base_events.py:653: in run_until_complete
    return future.result()
distributed/utils_test.py:374: in inner_fn
    return await async_fn(*args, **kwargs)
distributed/utils_test.py:1095: in async_fn_outer
    return await utils_wait_for(async_fn(), timeout=timeout * 2)
distributed/utils.py:1873: in wait_for
    return await fut
distributed/utils_test.py:1003: in async_fn
    result = await coro2
distributed/utils.py:1873: in wait_for
    return await fut
distributed/tests/test_utils_test.py:868: in test
    await a.fail_async()
distributed/worker.py:210: in wrapper
    self.log_event("worker-fail-hard", error_message(e))
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <FailWorker 'tcp://127.0.0.1:34987', status: closed, stored: 0, running: 0/20, ready: 0, comm: 0, waiting: 0>, topic = 'worker-fail-hard'
msg = {'exception': <Serialize: b"\x80\x05\x95\xda\x03\x00\x00\x00\x00\x00\x00\x8c\x16tblib.pickling_support\x94\x8c\x12unpi...94K\x00\x8c\x08co_flags\x94K@\x8c\x0eco_firstlineno\x94K\x00ub\x8c\x08f_lineno\x94MW\x03ubMW\x03N\x87\x94R\x94.'>, ...}

    def log_event(self, topic: str | Collection[str], msg: Any) -> None:
        """Log an event under a given topic
    
        Parameters
        ----------
        topic : str, list[str]
            Name of the topic under which to log an event. To log the same
            event under multiple topics, pass a list of topic names.
        msg
            Event message to log. Note this must be msgpack serializable.
    
        See also
        --------
        Client.log_event
        """
        if not _is_msgpack_serializable(msg):
>           raise TypeError(
                f"Message must be msgpack serializable. Got {type(msg)=} instead."
            )
E           TypeError: Message must be msgpack serializable. Got type(msg)=<class 'dict'> instead.

distributed/worker.py:996: TypeError

distributed/tests/test_scheduler.py::test_transition_counter_max_worker fails for the same reason:

2023-05-03 13:55:40,136 - tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOMainLoop object at 0x7ff3469ed050>>, <Task finished name='Task-9' coro=<Worker.handle_scheduler() done, defined at /home/graingert/projects/distributed/distributed/worker.py:204> exception=TypeError("Message must be msgpack serializable. Got type(msg)=<class 'dict'> instead.")>)
Traceback (most recent call last):
  File "/home/graingert/projects/distributed/distributed/worker.py", line 220, in wrapper
    return method(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/graingert/projects/distributed/distributed/worker.py", line 1960, in handle_stimulus
    super().handle_stimulus(*stims)
  File "/home/graingert/projects/distributed/distributed/worker_state_machine.py", line 3706, in handle_stimulus
    instructions = self.state.handle_stimulus(*stims)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/graingert/projects/distributed/distributed/worker_state_machine.py", line 1357, in handle_stimulus
    instructions += self._transitions(recs, stimulus_id=stim.stimulus_id)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/graingert/projects/distributed/distributed/worker_state_machine.py", line 2698, in _transitions
    process_recs(recommendations.copy())
  File "/home/graingert/projects/distributed/distributed/worker_state_machine.py", line 2692, in process_recs
    a_recs, a_instructions = self._transition(
                             ^^^^^^^^^^^^^^^^^
  File "/home/graingert/projects/distributed/distributed/worker_state_machine.py", line 2605, in _transition
    raise TransitionCounterMaxExceeded(ts.key, start, finish, self.story(ts))
distributed.worker_state_machine.TransitionCounterMaxExceeded: TransitionCounterMaxExceeded: inc-deebfb8e8b05bf230e909b88993dc421 :: released->waiting
  Story:
    ('inc-deebfb8e8b05bf230e909b88993dc421', 'compute-task', 'released', 'compute-task-1683118540.0337124', 1683118540.0339403)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/graingert/anaconda3/envs/dask-distributed/lib/python3.11/site-packages/tornado/ioloop.py", line 738, in _run_callback
    ret = callback()
          ^^^^^^^^^^
  File "/home/graingert/anaconda3/envs/dask-distributed/lib/python3.11/site-packages/tornado/ioloop.py", line 762, in _discard_future_result
    future.result()
  File "/home/graingert/projects/distributed/distributed/worker.py", line 207, in wrapper
    return await method(self, *args, **kwargs)  # type: ignore
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/graingert/projects/distributed/distributed/worker.py", line 1302, in handle_scheduler
    await self.handle_stream(comm)
  File "/home/graingert/projects/distributed/distributed/core.py", line 916, in handle_stream
    handler(**merge(extra, msg))
  File "/home/graingert/projects/distributed/distributed/worker.py", line 1945, in _
    self.handle_stimulus(event)
  File "/home/graingert/projects/distributed/distributed/worker.py", line 223, in wrapper
    self.log_event("worker-fail-hard", error_message(e))
  File "/home/graingert/projects/distributed/distributed/worker.py", line 996, in log_event
    raise TypeError(
TypeError: Message must be msgpack serializable. Got type(msg)=<class 'dict'> instead.
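The failures above occur because error_message(e) embeds the pickled exception inside a Serialize wrapper, which is not msgpack-representable, so the new strict check rejects the whole dict. One way to think about the tension (a hypothetical sketch, not the fix that was merged): a strict logger must either reject such payloads or degrade them to something representable before recording them.

```python
def to_loggable(msg):
    """Degrade a payload to msgpack-representable types.

    Hypothetical helper for illustration only: plain scalars and
    containers pass through untouched; anything else (e.g. a Serialize
    wrapper around a pickled exception) is replaced by its repr() so
    the event can still be recorded.
    """
    if msg is None or isinstance(msg, (bool, int, float, str, bytes)):
        return msg
    if isinstance(msg, (list, tuple)):
        return [to_loggable(v) for v in msg]
    if isinstance(msg, dict):
        return {to_loggable(k): to_loggable(v) for k, v in msg.items()}
    return repr(msg)
```

A lossy fallback like this keeps internal callers such as the @fail_hard handler from tripping over their own diagnostic payloads, at the cost of losing the structured exception.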

hendrikmakait (Member) commented

Is this one ready for review?

graingert (Member) commented

@hendrikmakait yep, it's ready for review, thanks!

@hendrikmakait hendrikmakait added the needs review Needs review from a contributor. label May 10, 2023
@@ -7818,6 +7818,20 @@ async def get_worker_logs(self, n=None, workers=None, nanny=False):
return results

def log_event(self, topic: str | Collection[str], msg: Any) -> None:
"""Log an event under a given topic
Member:

Do we need to test this as well?

Member:

It's tested indirectly by test_nanny.py::test_log_event, because that test calls the scheduler's log_event.

I'm happy to add a more direct test if you think it's needed.

Member:

I'd appreciate an explicit test; that would make it easier to narrow down where things break.

graingert (Member) commented May 11, 2023

@hendrikmakait there's already a test that calls this: test_configurable_events_log_length. Also, the only way a user should be calling scheduler.log_event is via RPC, which will have already checked that the message _is_dumpable, so it doesn't need to be checked again.
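The design graingert describes can be sketched as follows: payloads are validated once at the RPC boundary, so the server-side log_event can trust its input. All names here are illustrative stand-ins, not distributed's real API; the real _is_dumpable check is similarly approximated.

```python
class EventServer:
    """Minimal stand-in for a scheduler that records events."""

    def __init__(self):
        self.events = []

    def log_event(self, topic, msg):
        # Trusts its caller: validation already happened at the RPC boundary.
        self.events.append((topic, msg))


def _is_dumpable(msg):
    """Simplified approximation of a msgpack-dumpability check."""
    if msg is None or isinstance(msg, (bool, int, float, str, bytes)):
        return True
    if isinstance(msg, (list, tuple)):
        return all(_is_dumpable(v) for v in msg)
    if isinstance(msg, dict):
        return all(_is_dumpable(k) and _is_dumpable(v) for k, v in msg.items())
    return False


def handle_rpc_log_event(server, topic, msg):
    """The single validation point for user-supplied payloads."""
    if not _is_dumpable(msg):
        raise TypeError(
            f"Message must be msgpack serializable. Got {type(msg)=} instead."
        )
    server.log_event(topic, msg)
```

Validating once at the boundary avoids redundant checks on every internal call while still rejecting bad user input before it reaches the server.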

Member:

Thanks for clarifying!

@hendrikmakait hendrikmakait merged commit b68d71d into dask:main May 11, 2023
29 of 33 checks passed
milesgranger pushed a commit to milesgranger/distributed that referenced this pull request May 15, 2023
…rs (dask#7472)

Co-authored-by: Thomas Grainger <tagrain@gmail.com>
@fjetter fjetter deleted the add_docs_log_event branch June 14, 2023 09:14

Successfully merging this pull request may close these issues.

Logging an object with incompatible type kills Dask worker
3 participants