
Cache across task/flow runs in different process with dask #2636

Closed
nicolls1 opened this issue May 22, 2020 · 5 comments
Labels: enhancement (An improvement of an existing feature), status:stale (This may not be relevant anymore)

Comments

@nicolls1

nicolls1 commented May 22, 2020

Current behavior

The docs here: https://docs.prefect.io/core/examples/cached_task.html document how to cache within a single flow. I am wondering if it's possible to extend this to flows run from multiple processes.

Here in Output Caching (https://docs.prefect.io/core/concepts/persistence.html#output-caching) it mentions:

The cache is stored in context

Note that when running Prefect Core locally, your Tasks' cached states will be stored in memory within prefect.context.

I assume the dask executor still runs Prefect Core locally?

Proposed behavior

Store cached outputs in dask with a distributed Variable (https://distributed.dask.org/en/latest/api.html?highlight=queue#distributed.Variable) so that when you run the example script below twice, the cached value is used.
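To make the idea concrete, here is a minimal sketch of the get-or-compute-with-TTL logic such a shared store would need. A real distributed.Variable requires a running dask cluster, so this stand-alone sketch uses a plain dict as a stand-in for the cluster-wide store; the CachedValue and get_or_compute names are hypothetical, not Prefect or dask API.

```python
import time
from dataclasses import dataclass
from typing import Any, Callable, Dict


@dataclass
class CachedValue:
    value: Any
    expires_at: float  # monotonic timestamp after which the entry is stale


# Stand-in for a dask distributed store (e.g. one distributed.Variable per
# cache key), which would make entries visible to every worker/process.
_shared_store: Dict[str, CachedValue] = {}


def get_or_compute(cache_key: str, compute: Callable[[], Any],
                   cache_for_seconds: float) -> Any:
    """Return the cached value for cache_key if still valid, else recompute."""
    entry = _shared_store.get(cache_key)
    if entry is not None and entry.expires_at > time.monotonic():
        return entry.value  # cache hit: reuse the stored result
    value = compute()
    _shared_store[cache_key] = CachedValue(
        value, time.monotonic() + cache_for_seconds)
    return value
```

With the in-memory dict replaced by a dask-backed store, two separate flow-run processes hitting the same key within the TTL would see the same value, which is the behavior requested above.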

Example

This modified docs example doesn't return the same random number when run twice in a row:

import datetime
import random

from prefect import task, Flow
from prefect.engine.executors import DaskExecutor


@task(cache_for=datetime.timedelta(seconds=30), cache_key='key')
def return_random_number():
    return random.random()


@task
def print_number(num):
    print("=" * 50)
    print("Value: {}".format(num))
    print("=" * 50)


with Flow("cached-task") as flow:
    result = print_number(return_random_number)


executor = DaskExecutor(address="tcp://localhost:8786")
flow.run(executor=executor)

Maybe this is already possible, but I'm not sure how I would do it from the docs. I would be happy to document it if that is the case.

nicolls1 added the enhancement label on May 22, 2020
@joshmeek

Hey @nicolls1, instead of using cache_for and cache_key, I think you should use the newer Results / Targets interface to cache the results of your local flow runs somewhere like on disk: https://docs.prefect.io/core/concepts/persistence.html#output-caching-based-on-a-file-target
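For readers unfamiliar with the target mechanism linked above, the core idea can be sketched in plain Python. This is a conceptual stand-in, not Prefect's actual implementation, and the run_with_target helper is hypothetical: if the target file already exists, its persisted contents are loaded instead of re-running the task, which is exactly what lets the cache survive across separate processes.

```python
import pickle
from pathlib import Path
from typing import Any, Callable


def run_with_target(target: Path, fn: Callable[[], Any]) -> Any:
    """Sketch of target-based caching: reuse the persisted result if the
    target file exists, otherwise compute the value and persist it."""
    if target.exists():
        with target.open("rb") as f:
            return pickle.load(f)  # cache hit: skip running the task
    value = fn()
    target.parent.mkdir(parents=True, exist_ok=True)
    with target.open("wb") as f:
        pickle.dump(value, f)  # persist so later processes can reuse it
    return value
```

Because the cached result lives on disk rather than in prefect.context, a second flow.run() in a fresh process finds the file and returns the same value, unlike the in-memory cache described in the issue.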


jcrist commented May 22, 2020

Yeah, using the Result interface will be much more robust than trying to hack something together to store things in the dask cluster. #2619 about integrating results with cache_for/cache_validator may also interest you.

@nicolls1 (Author)

Thanks for the feedback! Looking into the Result interface now, and I'll see if I can make a new class that will keep a task's output in dask's distributed memory.

@github-actions (bot)

This issue is stale because it has been open for 30 days with no activity. To keep this issue open, remove the stale label or comment.

github-actions bot added the status:stale label on Nov 18, 2022

github-actions bot commented Dec 3, 2022

This issue was closed because it has been stale for 14 days with no activity. If this issue is important or you have more to add, feel free to re-open it.

github-actions bot closed this as not planned (won't fix, can't repro, duplicate, stale) on Dec 3, 2022