
Feature Request: Pre-post task hooks #4789

Closed
crepererum opened this issue May 10, 2019 · 12 comments
@crepererum
Contributor

Abstract

I would like to request that dask (and therefore distributed) support pre-/post-task hooks. These are functions that are executed before/after a node of the computation graph runs on the target system (subprocess, distributed worker).

Applications

  • logging
  • debugging
  • interpreter state clean-up / preparation
  • sending out prometheus data via push protocol

Technical Details

  • hooks should be installed by the client
  • both pre and post hooks can be installed as:
    • elementary hooks: use the computation graph before optimizations such as fusing; helpful for detailed debugging/logging
    • block hooks: use the computation graph after optimizations such as fusing; helpful for interpreter setup/cleanup and prometheus data push
  • hooks should get the node they're currently working on as a parameter
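To make the proposal concrete, here is a minimal sketch of what such a client-installed hook registry might look like. All names here are hypothetical; nothing in this block is an existing dask API.

```python
# Hypothetical sketch of a client-side hook registry; none of these
# names exist in dask -- they only illustrate the proposed interface.

class TaskHooks:
    def __init__(self):
        self.pre = []   # run before a graph node executes on the worker
        self.post = []  # run after it finishes

    def add_pre(self, fn):
        self.pre.append(fn)

    def add_post(self, fn):
        self.post.append(fn)

    def run(self, node, func, *args):
        # the node key is passed to every hook, as proposed above
        for hook in self.pre:
            hook(node)
        try:
            return func(*args)
        finally:
            for hook in self.post:
                hook(node)


hooks = TaskHooks()
seen = []
hooks.add_pre(lambda node: seen.append(("pre", node)))
hooks.add_post(lambda node: seen.append(("post", node)))
result = hooks.run("x-1", lambda a, b: a + b, 1, 2)
```

Whether the hooks run per elementary node or per fused block would then depend on which graph (pre- or post-optimization) feeds `run`.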

Alternatives

This should currently be possible by writing a custom "optimization" pass, but it kinda feels like this would abuse the system. On the other hand, it is a generic system and we could just extend the documentation to mention that such a "bending" would be possible.

@jcrist
Member

jcrist commented May 10, 2019

This is already doable with scheduler plugins, which provide a general way to hook into the scheduler for custom feedback. I suspect the information exposed in the transition method would be sufficient for your use case.

@crepererum
Contributor Author

@jcrist scheduler plugins only run on the scheduler, not on the worker, or did I misunderstand that? The use cases listed above (e.g. prometheus metric push and interpreter cleanup) would require the code to be executed on the worker.

@jcrist
Member

jcrist commented May 13, 2019

I'm not sure what you mean by prometheus metric push, but if you're just trying to push a log every time a task completes, a scheduler plugin may be sufficient. For running code actually on the workers you'd currently need to implement this by wrapping a task. You'd have two options here:

  • Implement this as an optimization pass that transforms all tasks to whatever you need. Here's a simple implementation:
from dask.core import istask

def wrapper(func, *args):
    print("Before task")
    out = func(*args)
    print("After task")
    return out

def wrap_tasks(dsk):
    # Note that this only handles top-level tasks, and not nested tasks
    # Rewrites (func, a, b, c) -> (wrapper, func, a, b, c)
    return {k: (wrapper,) + v if istask(v) else v
            for k, v in dsk.items()}
  • Or implement update_graph for a scheduler plugin: https://distributed.dask.org/en/latest/plugins.html. This is called every time a new graph is sent to the scheduler. This wouldn't require the client to change anything, but would be trickier as the scheduler has to handle more things here.
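The wrapping pass above can be exercised without a cluster. Here is a self-contained run that uses a minimal stand-in for `dask.core.istask` (in dask, a task is a tuple whose first element is callable):

```python
from operator import add

# Minimal stand-in for dask.core.istask, so this runs without dask.
def istask(x):
    return isinstance(x, tuple) and bool(x) and callable(x[0])

def wrapper(func, *args):
    print("Before task")
    out = func(*args)
    print("After task")
    return out

def wrap_tasks(dsk):
    # Rewrites (func, a, b, c) -> (wrapper, func, a, b, c) for top-level tasks
    return {k: (wrapper,) + v if istask(v) else v
            for k, v in dsk.items()}

dsk = {"a": 1, "b": (add, "a", 2)}
wrapped = wrap_tasks(dsk)
# literals are left alone; tasks get the wrapper prepended
```

Registering `wrap_tasks` as an optimization pass would then apply it to every submitted graph.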

Worker plugins could be added, but we'd want to determine what plugin points were needed. If you think this is something you'd be interested in, it would be useful to better understand your use cases. In particular, why do you need to run code on the worker pre/post each task? Can you elaborate on your applications listed above?

@crepererum
Contributor Author

First of all, thanks for the very detailed response and helpful implementation ideas.

I'm not sure what you mean by prometheus metric push, but if you're just trying to push a log every time a task completes, a scheduler plugin may be sufficient.

Worker plugins could be added, but we'd want to determine what plugin points were needed. If you think this is something you'd be interested in, it would be useful to better understand your use cases. In particular, why do you need to run code on the worker pre/post each task? Can you elaborate on your applications listed above?

Fair. Let me elaborate a bit:

Prometheus Metric Push

I'm talking about this feature. It harvests the metrics registered in the interpreter-wide global registry and pushes them to a pushgateway. The metrics I'm thinking about are registered on the worker, e.g.:

  • blob storage IO metrics
  • DB connection latency measurements
  • GC metrics

So you could now argue that the distributed worker already exposes Prometheus metrics and that these can be harvested. The problem here is the Prometheus data model, which works a bit like Hollywood: prometheus calls you, and you cannot influence when you're called. Since we use our distributed cluster for a diverse set of tasks, we would like to know these metrics on a per-task level (ideally on the per-graph-node level) so we can better track the mentioned metrics on production systems. This requires us to push the metrics to a pushgateway after a node has finished.
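A sketch of what such a per-node push could look like. The `push_to_gateway` stub stands in for `prometheus_client.push_to_gateway`; the helper names and the hook itself are hypothetical, not an existing dask feature.

```python
# Sketch: push per-node metrics right after a task finishes. The stub
# `push_to_gateway` stands in for prometheus_client.push_to_gateway.
import time

pushed = []  # stand-in for the pushgateway

def push_to_gateway(job, metrics):
    pushed.append((job, metrics))

def run_with_metric_push(node, func, *args):
    start = time.monotonic()
    out = func(*args)
    # in a real setup this would harvest the interpreter-wide registry
    push_to_gateway(job=node, metrics={"duration_seconds": time.monotonic() - start})
    return out

result = run_with_metric_push("load-blob-1", sum, [1, 2, 3])
```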

Logging

It would be helpful if we could set up logging before a graph node starts, so we could add tags to the logging messages depending on who submitted the computation to the distributed scheduler. These submissions can overlap, and multiple diverse computations might be active/running at the same time. Tagging the logging messages with "who submitted this and what is this computation called" is essential for debugging, and even for finding the right logging messages in systems like Graylog.
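A pre-task hook could bind such tags via a stdlib `logging.Filter`, so every record emitted while the node runs carries them. The tag names and values here are illustrative:

```python
# Sketch: attach "who submitted this" tags to every log record via a
# logging.Filter that a pre-task hook would install (names illustrative).
import io
import logging

class SubmitterFilter(logging.Filter):
    def __init__(self, submitter, computation):
        super().__init__()
        self.submitter = submitter
        self.computation = computation

    def filter(self, record):
        record.submitter = self.submitter
        record.computation = self.computation
        return True

stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("%(submitter)s %(computation)s %(message)s"))
handler.addFilter(SubmitterFilter("alice", "nightly-etl"))

log = logging.getLogger("task")
log.addHandler(handler)
log.setLevel(logging.INFO)
log.info("node started")
```

A post-task hook would then remove the filter (or swap the tags) before the next node runs.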

Interpreter Setup

Sometimes you want to wipe/clean the interpreter state before a graph node runs, e.g. for debugging or for determinism. The state may include:

  • random seeds used by certain libs
  • caches (numba JIT cache and others)
  • open connections (like requests sessions)
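A pre-task "reset" hook for determinism could look like the following sketch; the numba and requests steps are left as comments so the example stays stdlib-only, and all helper names are illustrative:

```python
# Sketch of a pre-task reset hook: reseed RNGs and clear caches before
# a node runs (illustrative names; not an existing dask hook).
import functools
import random

@functools.lru_cache(maxsize=None)
def expensive(x):
    return x * x

def reset_interpreter_state(seed=0):
    random.seed(seed)         # deterministic random sequence per node
    expensive.cache_clear()   # stand-in for e.g. clearing a numba JIT cache
    # a real hook might also close pooled connections, e.g. requests sessions

reset_interpreter_state(seed=42)
first = random.random()
reset_interpreter_state(seed=42)
second = random.random()
# same seed, same sequence: first == second
```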

@TomAugspurger
Member

prometheus calls you, and you cannot influence when you're called. Since we use our distributed cluster for a diverse set of tasks, we would like to know these metrics on a per-task level (ideally on the per-graph-node level) so we can better track the mentioned metrics on production systems. This requires us to push the metrics to a pushgateway after a node has finished.

I don't quite understand this, and how it relates to the issue. Taking the title, "pre-post task hooks" literally, a scheduler plugin should suffice. @crepererum could you try to clear up my misunderstanding?

FYI, worker plugins were recently added to distributed: dask/distributed#2453

@mrocklin
Member

I think he's looking for something more like Scheduler Plugins (which have hooks for a variety of events) but on the Worker. The recently added Worker Plugins only support hooks for setup/teardown currently, but more could be added for things like task transitions and other events.

@crepererum
Contributor Author

@TomAugspurger sure. I think I kinda missed the two modes that Prometheus offers. So the metrics are present on the worker and have to be gathered/pushed/collected from there. You have two ways of doing so:

  1. default prometheus mode: Prometheus fetches the data from the worker node. You have no influence over when this happens, since it is configured on the prometheus server ("scraping" and "scrape interval"). Metrics that usually obey this model are "python version", "current memory consumption", "current CPU usage" etc. per node / (sub)interpreter. Distributed already implements this for worker and scheduler.
  2. pushgateway: For batch computations, where you might want to collect metrics per job / graph node (not to be confused with the cluster/worker node), scraping does not work, since you cannot influence the scrape granularity. Metric labels can help, but to obey the prometheus data model you must not alter the metrics based on the scraping itself, which means you have to keep the metrics of a certain job around for a long time. This is kinda pointless, since the job is already finished. Also, it does not map too well to any of the prometheus metric types and their best practices. This is where the pushgateway mode comes in. It is a special server to which (in contrast to the main prometheus system) you can actually push metrics, which is perfect for batch jobs. See the official docs for more details. Here, you must have some kind of hook that runs right after the job finishes on the worker (since this is where the data is), so a scheduler plugin doesn't work.
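This kind of per-task, on-worker hook can be sketched as a worker plugin whose transition method pushes metrics once a task reaches its final state. The class is shown standalone for illustration; a real plugin would subclass `distributed.WorkerPlugin` and be registered with the client, and `push_metrics` is a hypothetical stand-in for a pushgateway call.

```python
# Standalone sketch; a real plugin would subclass distributed.WorkerPlugin
# and be registered via the client. `push_metrics` is a hypothetical
# stand-in for a pushgateway client call.
pushed = []

def push_metrics(key, state):
    pushed.append((key, state))

class MetricsPushPlugin:
    def setup(self, worker=None):
        self.worker = worker

    def transition(self, key, start, finish, **kwargs):
        # push once the task has produced its result on this worker
        if finish == "memory":
            push_metrics(key, finish)

plugin = MetricsPushPlugin()
plugin.setup()
plugin.transition("x-1", "executing", "memory")
plugin.transition("y-1", "waiting", "executing")  # not final, no push
```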

@jakirkham
Member

Does this help clarify things, @TomAugspurger?

@mrocklin
Member

mrocklin commented Jun 5, 2019

The recently added Worker Plugins only support hooks for for setup/teardown currently, but more could be added for things like task transitions and other events.

@crepererum is this something that you would be interested in doing?

@crepererum
Contributor Author

Sure. Might take a week or two.

@marco-neumann-by

I think we can close this since dask/distributed#2994 was merged which basically covers the use cases I had in mind via the transition hook.

@mrocklin
Member

Great. Closing. Thanks @marco-neumann-jdas.

@crepererum if you disagree, please speak up and we'll reopen
