Make annotations available in the thread_state #6932

Open

averri opened this issue Aug 22, 2022 · 4 comments

Comments

averri commented Aug 22, 2022

Dask distributed provides a useful mechanism to get the key of a task from inside the task's code:

from distributed.worker import thread_state
task_key = thread_state.key
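
For illustration, a minimal sketch of how this is typically used from inside a submitted task (the function and cluster setup here are only for the example):

from distributed import Client
from distributed.worker import thread_state

def my_task(x):
    # thread_state.key holds the key of the task currently executing in this worker thread.
    print("running task:", thread_state.key)
    return x + 1

client = Client()  # local cluster, for illustration only
result = client.submit(my_task, 1).result()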

I would like to suggest making the annotations available using the same mechanism:

from distributed.worker import thread_state
# Get the annotations, if defined.
annotations = thread_state.annotations 

This is very important to facilitate the implementation of distributed tracing and log correlation using unique request ids provided via annotations.
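
As a sketch of how the requested behaviour could look end to end (thread_state.annotations does not exist today; the getattr fallback only keeps the example from raising), annotations set client-side via dask.annotate would become readable inside the task:

import uuid

import dask
from distributed import Client
from distributed.worker import thread_state

@dask.delayed
def traced_task(x):
    # Proposed behaviour: annotations attached to this task would be readable here.
    annotations = getattr(thread_state, "annotations", {})
    print("request_id:", annotations.get("request_id"))
    return x + 1

client = Client()
with dask.annotate(request_id=str(uuid.uuid4())):
    traced_task(1).compute()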

averri changed the title from "Make annotations availble in the thread_state" to "Make annotations available in the thread_state" on Aug 22, 2022
jrbourbeau transferred this issue from dask/dask on Aug 22, 2022
jrbourbeau (Member) commented Aug 22, 2022

Thanks for the issue @averri. I've transferred this over to the dask/distributed repo as it's specifically about thread_state (which lives in distributed). Though this request is interesting as it touches both dask and distributed.

Without digging too deeply, this seems not unreasonable as the worker (and worker state machine) TaskState objects already have an .annotations attribute storing this information. That said, I do wonder how public thread_state is intended to be.

cc @gjoseph92 @fjetter as you may find this interesting

averri (Author) commented Aug 22, 2022

Hi @jrbourbeau, thank you so much for your reply. I agree with you: TaskState has a good amount of useful information, including the annotations. It would be extremely useful to make more of that information easily accessible from the task's code. I have looked into the source code of the distributed/worker.py module; here is the notable part:

[screenshot of the relevant code in distributed/worker.py]

Passing the annotations would require a change to the signature of apply_function, as well as to the other functions that invoke it.
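
For illustration only, a heavily simplified sketch of the kind of change meant here (the real apply_function in distributed/worker.py has a different signature and does considerably more):

def apply_function(function, args, kwargs, execution_state, key, annotations=None):
    # Simplified sketch; thread_state is the module-level thread local in distributed.worker.
    thread_state.execution_state = execution_state
    thread_state.key = key
    # Proposed addition: expose the task's annotations to the task code.
    thread_state.annotations = annotations or {}
    return function(*args, **kwargs)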

fjetter (Member) commented Aug 23, 2022

I'm a bit on the fence here.

  1. If we wanted to expose anything like this, I think we should go to the effort of introducing a function that does what we're looking for, basically a wrapper similar to get_worker (see the sketch after this list). A function is easier to support in terms of lifecycle, deprecation, etc. than a plain thread local; a thread local might be how it is implemented, but it would not be the official API.
  2. Theoretically we could expose the entire TaskState object, but I do not want the entire TaskState object to be considered public API. It is very low-level, and mutating it would very likely deadlock the cluster. Exposing just the annotations would be possible, but I'm not sure how many applications would benefit from it.
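
A rough sketch of what such a wrapper could look like (the name get_annotations is hypothetical, and thread_state.annotations does not exist today):

from distributed.worker import thread_state

def get_annotations() -> dict:
    """Return the annotations of the currently running task, or {} if none."""
    # Hypothetical public accessor, analogous in spirit to distributed.get_worker().
    return getattr(thread_state, "annotations", {})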

I also see the problem that annotations are used to set various other things, like resources, retries, etc. We'd need to very explicitly state that this kind of behavior is not possible.

This is very important to facilitate the implementation of distributed tracing and log correlation using unique request ids provided via annotations.

This is interesting and I'd love to learn more about your use case and how we can help beyond annotations. We're also tracing some internal calls with a unique ID, and I have wanted to improve/rework our logging system for a very long time now; see #4762.

averri (Author) commented Aug 23, 2022

Hi @fjetter, I agree with your points (1) and (2). For point (2), maybe it should use a name other than annotations... I thought about a generic name like get_context, which could be invoked from inside the task code. This get_context could be useful for propagating any kind of cross-cutting concern, like logging, security, etc. Maybe this is not useful for users doing ad-hoc manipulations of large datasets, but it is extremely useful for developers using Dask to scale web applications.

So, get_context could be used in workflows generated with something like:

# Here the context_data will be made available to all tasks submitted
# inside this (hypothetical) context manager.
with distributed.context(**context_data):
    with Client(**params) as cli:
        # Submit the workload, using map/submit/get, for example:
        futures = cli.map(process_item, items)
        results = cli.gather(futures)

From any task's code, developers could then use:

context = get_context(default={})
if "user_roles" in context and "admin" in context["user_roles"]:
    # Do something that only admins can do.
    ...

A custom logging formatter can inspect the context and extract information to enrich the log records:

from logging import Formatter, LogRecord

class DaskLogContextFormatter(Formatter):

    def format(self, record: LogRecord):
        # get_context from Dask distributed (the proposed API).
        context = get_context()
        if context and "request_id" in context:
            record.request_id = context["request_id"]
        return super().format(record)
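
Wiring this up on the worker side could then look roughly like the following (assuming, for the sake of the example, that every record is emitted from inside a context that carries a request_id):

import logging

handler = logging.StreamHandler()
handler.setFormatter(
    DaskLogContextFormatter("%(asctime)s %(levelname)s [request_id=%(request_id)s] %(message)s")
)
logging.getLogger("myapp").addHandler(handler)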

With the Python logging formatter above configured and the request_id provided via the context manager, all log records from the entire stack can be correlated by the request_id, which is a UUID. This is just a simple example. I believe the logs should stay on the worker node, to be collected by any log collector that is part of a logging stack (ELK, etc.).

More powerful features, like the ones provided by Openmonitoring, could also be implemented on top of this.
