Backend Server Caching #2343

Closed
nathan5280 opened this issue Apr 16, 2020 · 1 comment · Fixed by #2353
Labels
bug Something isn't working

Comments

@nathan5280

Description

When caching multiple tasks, the second task's cache seems to get confused with the first task's cache, and the second task isn't run. Running the flow without caching and a LocalResultHandler generates the correct output.

Expected Behavior

In the flow below I expected to get output from the write_data task with the data transformed.

 {'col_1': [6, 4, 2, 0], 'col_2': ['a', 'b', 'c', 'd']}

but I get the untransformed data.

{'col_1': [3, 2, 1, 0], 'col_2': ['a', 'b', 'c', 'd']}
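As a quick sanity check on the expected value, the transformation can be applied by hand outside of Prefect. This is only a minimal sketch that copies the doubling logic from the xform_data task in the reproduction below; it does not involve any caching or the Prefect backend.

```python
import copy

# Same data that load_data returns in the reproduction.
raw = {"col_1": [3, 2, 1, 0], "col_2": ["a", "b", "c", "d"]}

# Same logic as xform_data: deep-copy the input, then double col_1.
xformed = copy.deepcopy(raw)
xformed["col_1"] = [v * 2 for v in xformed["col_1"]]

print(xformed)
```

If write_data receives the raw dictionary instead of this result, xform_data was skipped rather than run.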

The log shows that a cached state was found for the task even though this is the first time the flow has been run, where "first run" means a run against a fresh local server.

[2020-04-16 14:40:03,832] INFO - prefect.CloudTaskRunner | Task 'xform_data': Starting task run...
[2020-04-16 14:40:03,855] DEBUG - prefect.CloudTaskRunner | Task 'xform_data': 1 candidate cached states were found
[2020-04-16 14:40:03,855] DEBUG - prefect.LocalResultHandler | Starting to read result from /home/nate/projects/ed/ed-prefect/trial/cache-checkpoint/data/int/prefect-result-2020-04-16t14-40-03-648495-00-00...
[2020-04-16 14:40:03,856] DEBUG - prefect.LocalResultHandler | Finished reading result from /home/nate/projects/ed/ed-prefect/trial/cache-checkpoint/data/int/prefect-result-2020-04-16t14-40-03-648495-00-00...
[2020-04-16 14:40:03,857] DEBUG - prefect.CloudTaskRunner | Task 'xform_data': Handling state change from Pending to Cached
[2020-04-16 14:40:03,913] DEBUG - prefect.CloudTaskRunner | Task 'xform_data': can't set state to Running because it isn't Pending; ending run.
[2020-04-16 14:40:03,921] INFO - prefect.CloudTaskRunner | Task 'xform_data': finished task run for task with final state: 'Cached'

Workaround:
Per Slack conversation with Josh, setting the cache_key on the xform_data task resolves the problem.
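One way to picture why a distinct cache_key might help is a toy model in plain Python. This is a sketch under a loud assumption: it supposes that tasks without an explicit key end up looking up the same cache entry. The issue does not state the root cause, and `CACHE` and `run_with_cache` are hypothetical names, not part of Prefect.

```python
from typing import Any, Callable, Dict, Optional

# Toy in-memory cache; a hypothetical stand-in, not Prefect's backend.
CACHE: Dict[str, Any] = {}


def run_with_cache(fn: Callable[..., Any], *args: Any,
                   cache_key: Optional[str] = None) -> Any:
    """Return the cached value for the key if present, else run fn.

    ASSUMPTION: tasks without an explicit key fall back to one shared
    key, which is how this sketch reproduces the symptom.
    """
    key = cache_key if cache_key is not None else "default"
    if key in CACHE:
        return CACHE[key]  # cache hit: fn is skipped entirely
    result = fn(*args)
    CACHE[key] = result
    return result


def load_data() -> Dict[str, list]:
    return {"col_1": [3, 2, 1, 0], "col_2": ["a", "b", "c", "d"]}


def xform_data(data: Dict[str, list]) -> Dict[str, list]:
    xformed = dict(data)
    xformed["col_1"] = [v * 2 for v in data["col_1"]]
    return xformed


# Without explicit keys, xform_data hits load_data's entry and is skipped.
raw = run_with_cache(load_data)
collided = run_with_cache(xform_data, raw)

# With distinct keys, each task has its own entry and xform_data runs.
CACHE.clear()
raw = run_with_cache(load_data, cache_key="load_data")
fixed = run_with_cache(xform_data, raw, cache_key="xform_data")
```

In this model, `collided` holds the untransformed data while `fixed` holds the doubled values, which matches the observed and expected outputs above. Whether the backend actually behaves this way is not confirmed by the issue.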

Reproduction

import copy
import json
import time
from datetime import timedelta
from pathlib import Path
from typing import Union, Dict

import prefect
from prefect import task, Flow, Parameter
from prefect.engine.result_handlers import LocalResultHandler

STR_PARAM = Union[Parameter, str]


# @task(
#     result_handler=LocalResultHandler("/tmp/inter"), cache_for=timedelta(seconds=60),
# )
@task(cache_for=timedelta(seconds=60),)
# @task
def load_data() -> Dict:
    time.sleep(10)
    data = {"col_1": [3, 2, 1, 0], "col_2": ["a", "b", "c", "d"]}
    return data


# @task(
#     result_handler=LocalResultHandler("/tmp/inter"), cache_for=timedelta(seconds=60),
# )
@task(cache_for=timedelta(seconds=60),)
# @task
def xform_data(data: Dict) -> Dict:
    logger = prefect.context["logger"]
    logger.info("xform_data: sleep")
    time.sleep(10)
    logger.info("xform_data: xform")
    xformed = copy.deepcopy(data)
    xformed["col_1"] = [v * 2 for v in xformed["col_1"]]
    logger.info(xformed)
    return xformed


@task
def write_data(xformed_data: Dict, filename: STR_PARAM):
    logger = prefect.context["logger"]
    logger.info(xformed_data)
    with Path(filename).open("wt") as fp:
        json.dump(xformed_data, fp)


with Flow("cache", result_handler=LocalResultHandler("data/int")) as flow:
    dst = Parameter("dst_filename", default="data/dst/0_dst.json")

    raw_data = load_data()
    xformed_value = xform_data(raw_data)
    write_data(xformed_value, dst)

# flow.run()
flow.register(project_name="cache_checkpoint")

Environment

Ubuntu: 18.04
Prefect: 0.10.2
Backend: Server

@joshmeek

Thanks for opening, @nathan5280. We will look into it!

This issue was closed.