Skip to content

Commit

Permalink
Merge pull request #2502 from PrefectHQ/context_cache
Browse files Browse the repository at this point in the history
Fix runners using context.caches that isn't present on individual runner runs
  • Loading branch information
cicdw committed May 6, 2020
2 parents 8c9d7ee + 229d0a3 commit c31e944
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 4 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ These changes are available in the [master branch](https://github.com/PrefectHQ/
### Enhancements

- Agents now support an optional HTTP health check, for use by their backing orchestration layer (e.g. k8s, docker, supervisord, ...) - [#2406](https://github.com/PrefectHQ/prefect/pull/2406)
- Sets dask scheduler default to "threads" on LocalDaskExecutor to provide parallelism - [#2494](https://github.com/PrefectHQ/prefect/pull/2494)
- Enhance agent verbose logs to include provided kwargs at start - [#2486](https://github.com/PrefectHQ/prefect/issues/2486)
- Add `no_cloud_logs` option to all Agent classes for an easier way to disable sending logs to backend - [#2484](https://github.com/PrefectHQ/prefect/issues/2484)
- Add option to set flow run environment variables on Kubernetes agent install - [#2424](https://github.com/PrefectHQ/prefect/issues/2424)
- Sets dask scheduler default to "threads" on LocalDaskExecutor to provide parallelism [#2494](https://github.com/PrefectHQ/prefect/pull/2494)

### Task Library

Expand All @@ -23,6 +23,7 @@ These changes are available in the [master branch](https://github.com/PrefectHQ/
### Fixes

- Give a better error for non-serializable callables when registering with cloud/server - [#2491](https://github.com/PrefectHQ/prefect/pull/2491)
- Fix runners retrieving invalid `context.caches` on runs started directly from a flow runner - [#2403](https://github.com/PrefectHQ/prefect/issues/2403)

### Deprecations

Expand Down
8 changes: 5 additions & 3 deletions src/prefect/engine/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -637,9 +637,11 @@ def check_task_is_cached(self, state: State, inputs: Dict[str, Result]) -> State
state = Pending("Cache was invalid; ready to run.")

if self.task.cache_for is not None:
candidate_states = prefect.context.caches.get(
self.task.cache_key or self.task.name, []
)
candidate_states = []
if prefect.context.get("caches"):
candidate_states = prefect.context.caches.get(
self.task.cache_key or self.task.name, []
)
sanitized_inputs = {key: res.value for key, res in inputs.items()}
for candidate in candidate_states:
if self.task.cache_validator(
Expand Down
18 changes: 18 additions & 0 deletions tests/engine/test_flow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,24 @@ def run(self, x, s):
assert flow_state.result[y].result == 100


class TestCachingFromContext:
def test_caches_do_not_persist_across_flow_runner_runs(self):
@prefect.task(cache_for=datetime.timedelta(seconds=10))
def test_task():
return random.random()

with Flow("test_cache") as flow:
t = test_task()

flow_state = FlowRunner(flow=flow).run(return_tasks=[t])
first_result = flow_state.result[t].result

flow_state = FlowRunner(flow=flow).run(return_tasks=[t])
second_result = flow_state.result[t].result

assert first_result != second_result


class TestInitializeRun:
def test_initialize_sets_none_to_pending(self):
result = FlowRunner(Flow(name="test")).initialize_run(
Expand Down

0 comments on commit c31e944

Please sign in to comment.