
Modifying a task via scheduler plugin #1384

Closed
limx0 opened this issue Sep 11, 2017 · 4 comments



limx0 commented Sep 11, 2017

I am trying to modify a task by using a scheduler plugin, but I am unsure whether this is possible. @mrocklin, you mentioned in dask/dask#2119 (comment) that this could be done, although that was for the dask scheduler, not distributed.

My use case is this: I would like to combine the distributed scheduler with joblib.Memory to build a data pipeline with smart caching. Joblib does this well by saving a copy of the source code of the function as well as its inputs. I would like to extend this notion by invalidating any child node whose parent is to be recomputed.

Now, my first thought would be to do something like:

import os

import cloudpickle
from distributed.diagnostics.plugin import SchedulerPlugin


class MemoryCachePlugin(SchedulerPlugin):

    def update_graph(self, scheduler, dsk=None, keys=None, restrictions=None, **kwargs):
        tasks = kwargs['tasks']

        def node_invalidated(node):
            # is_memoize_func and cache_is_valid are joblib-cache helpers (not shown)
            func = cloudpickle.loads(tasks[node]['function'])
            args = cloudpickle.loads(tasks[node]['args'])
            return not (is_memoize_func(func) and cache_is_valid(func, *args))

        for key, task in tasks.items():
            # Find parents of this task by checking for task keys among its args
            parent_nodes = [arg for arg in cloudpickle.loads(task['args']) if arg in tasks]

            if any(node_invalidated(node) for node in parent_nodes):
                # A parent of this node needs to be recomputed, so invalidate
                # this node's cache by removing its joblib cache file.
                task_func = cloudpickle.loads(task['function'])
                task_args = cloudpickle.loads(task['args'])
                cache_file = get_cache_file(task_func, *task_args)  # helper, not shown
                os.remove(cache_file)

I would like to simply remove the cache file that joblib uses; however, at the update_graph stage the task_args are only references to other tasks, not the actual result values.

My next thought was that I could mark the function with some sort of force_recompute flag, but it appears (please correct me if I am wrong) that modifying the functions in kwargs has no effect on the tasks that are sent to the workers. Is this conclusion correct?
edit: this turned out to be an issue with cloudpickle hard-coding which attributes to pickle

What is the most suitable way for me to achieve the above?

@mrocklin (Member)

Modifying tasks as they arrive would be tricky. You would have to rearrange update_graph to run the update_graph plugin callbacks before taking any actions, though this seems possible.

Another option would be to do this on the client side by overriding collections_to_dsk. This is the last point of modification before the tasks make it up to the scheduler. It's a nice choke-point to modify things.
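For illustration, a minimal sketch of that client-side override (the rewrite_task hook is hypothetical, and the exact collections_to_dsk signature may vary between versions):

from distributed import Client


class CachingClient(Client):

    def collections_to_dsk(self, collections, *args, **kwargs):
        # Build the low-level graph as the base Client would, then
        # rewrite it before the tasks are sent up to the scheduler.
        dsk = super().collections_to_dsk(collections, *args, **kwargs)
        return {key: rewrite_task(key, task) for key, task in dsk.items()}


def rewrite_task(key, task):
    # Hypothetical hook: inspect the task and invalidate any stale
    # joblib cache entries (e.g. remove their cache files) here.
    return task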

jakirkham (Member) commented Feb 22, 2018

Given that PR ( dask/dask#2748 ) adds dask.base.collections_to_dsk and PR ( #930 ) reduces Client.collections_to_dsk to calling dask.base.collections_to_dsk, I wonder whether it would make sense to allow collections_to_dsk to be overridden using something like dask.set_options, to make this easier to tap into.

Edit: Maybe PR ( dask/dask#3196 ) would make this possible.

@jakirkham (Member)

This can be done pretty easily these days by adding to the "optimizations" in dask.config.
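For reference, a minimal sketch of that route (invalidate_stale_cache is a hypothetical placeholder; registered optimizations are called with the graph and the requested keys and must return a graph):

import dask


def invalidate_stale_cache(dsk, keys, **kwargs):
    # Hypothetical rewriting pass: walk the tasks, detect stale joblib
    # cache entries, and remove their cache files before execution.
    return dsk


# Optimizations registered in the config run whenever a collection's
# graph is built, i.e. before tasks reach the scheduler.
dask.config.set(optimizations=[invalidate_stale_cache])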


lorenzolucido commented Nov 12, 2018

I have the exact same use case. @jakirkham, do you mind sharing an example of how this can be achieved on a simple custom graph (using dask.config)? Thanks!

Edit: In fact, I am wondering whether @mrocklin's streamz library would nowadays be a good fit for this, with the parameters simply being sources, i.e. when you change a parameter node you simply emit the new value.
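For what it's worth, a toy sketch of that streamz idea, where a parameter is a source and downstream nodes recompute whenever a new value is emitted (the doubling step is just a stand-in for a real pipeline):

from streamz import Stream

param = Stream()                      # a parameter node is simply a source
result = param.map(lambda x: x * 2)   # stand-in for the real computation
result.sink(print)

param.emit(10)   # prints 20
param.emit(21)   # re-emitting recomputes downstream nodes; prints 42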
