Turn on task persistence by default (#14102)
desertaxle committed Jun 18, 2024
1 parent 2d8cca9 commit ee7924b
Showing 15 changed files with 180 additions and 118 deletions.
64 changes: 33 additions & 31 deletions src/prefect/results.py
@@ -38,7 +38,6 @@
     PREFECT_RESULTS_DEFAULT_SERIALIZER,
     PREFECT_RESULTS_PERSIST_BY_DEFAULT,
     PREFECT_TASK_SCHEDULING_DEFAULT_STORAGE_BLOCK,
-    default_result_storage_block_name,
 )
 from prefect.utilities.annotations import NotSet
 from prefect.utilities.asyncutils import sync_compatible
@@ -62,35 +61,15 @@ def DEFAULT_STORAGE_KEY_FN():
 P = ParamSpec("P")
 R = TypeVar("R")
 
 
-@sync_compatible
-async def get_default_result_storage() -> ResultStorage:
-    """
-    Generate a default file system for result storage.
-    """
-    try:
-        return await Block.load(PREFECT_DEFAULT_RESULT_STORAGE_BLOCK.value())
-    except ValueError as e:
-        if "Unable to find" not in str(e):
-            raise e
-        elif (
-            PREFECT_DEFAULT_RESULT_STORAGE_BLOCK.value()
-            == default_result_storage_block_name()
-        ):
-            return LocalFileSystem(basepath=PREFECT_LOCAL_STORAGE_PATH.value())
-        else:
-            raise
+_default_storages: Dict[Tuple[str, str], WritableFileSystem] = {}
 
 
-_default_task_scheduling_storages: Dict[Tuple[str, str], WritableFileSystem] = {}
-
-
-async def get_or_create_default_task_scheduling_storage() -> ResultStorage:
+async def _get_or_create_default_storage(block_document_slug: str) -> ResultStorage:
     """
-    Generate a default file system for background task parameter/result storage.
+    Generate a default file system for storage.
     """
     default_storage_name, storage_path = cache_key = (
-        PREFECT_TASK_SCHEDULING_DEFAULT_STORAGE_BLOCK.value(),
+        block_document_slug,
         PREFECT_LOCAL_STORAGE_PATH.value(),
     )

@@ -105,8 +84,8 @@ async def get_storage() -> WritableFileSystem:
         if block_type_slug == "local-file-system":
             block = LocalFileSystem(basepath=storage_path)
         else:
-            raise Exception(
-                "The default task storage block does not exist, but it is of type "
+            raise ValueError(
+                "The default storage block does not exist, but it is of type "
                 f"'{block_type_slug}' which cannot be created implicitly. Please create "
                 "the block manually."
             )
@@ -123,13 +102,32 @@ async def get_storage() -> WritableFileSystem:
         return block
 
     try:
-        return _default_task_scheduling_storages[cache_key]
+        return _default_storages[cache_key]
     except KeyError:
         storage = await get_storage()
-        _default_task_scheduling_storages[cache_key] = storage
+        _default_storages[cache_key] = storage
         return storage
 
 
+@sync_compatible
+async def get_or_create_default_result_storage() -> ResultStorage:
+    """
+    Generate a default file system for result storage.
+    """
+    return await _get_or_create_default_storage(
+        PREFECT_DEFAULT_RESULT_STORAGE_BLOCK.value()
+    )
+
+
+async def get_or_create_default_task_scheduling_storage() -> ResultStorage:
+    """
+    Generate a default file system for background task parameter/result storage.
+    """
+    return await _get_or_create_default_storage(
+        PREFECT_TASK_SCHEDULING_DEFAULT_STORAGE_BLOCK.value()
+    )
+
+
 def get_default_result_serializer() -> ResultSerializer:
     """
     Generate a default file system for result storage.
@@ -210,7 +208,9 @@ async def default_factory(cls, client: "PrefectClient" = None, **kwargs):
             kwargs.pop(key)
 
         # Apply defaults
-        kwargs.setdefault("result_storage", await get_default_result_storage())
+        kwargs.setdefault(
+            "result_storage", await get_or_create_default_result_storage()
+        )
         kwargs.setdefault("result_serializer", get_default_result_serializer())
         kwargs.setdefault("persist_result", get_default_persist_setting())
         kwargs.setdefault("cache_result_in_memory", True)
@@ -280,7 +280,9 @@ async def from_task(
         """
         Create a new result factory for a task.
         """
-        return await cls._from_task(task, get_default_result_storage, client=client)
+        return await cls._from_task(
+            task, get_or_create_default_result_storage, client=client
+        )
 
     @classmethod
     @inject_client
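For orientation, a minimal sketch (not part of the commit) of how the two wrappers introduced above are expected to be used. The imported functions come from this diff; the `main` wrapper and the use of asyncio are illustrative, and the call hits whatever Prefect API is configured, falling back to a LocalFileSystem block per the code above.

import asyncio

from prefect.results import (
    get_or_create_default_result_storage,
    get_or_create_default_task_scheduling_storage,
)


async def main():
    # Both wrappers delegate to _get_or_create_default_storage, which caches the
    # resolved filesystem in _default_storages keyed by
    # (block document slug, local storage path), so repeated calls reuse one block.
    result_storage = await get_or_create_default_result_storage()
    scheduling_storage = await get_or_create_default_task_scheduling_storage()
    return result_storage, scheduling_storage


asyncio.run(main())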
4 changes: 1 addition & 3 deletions src/prefect/task_engine.py
@@ -417,9 +417,7 @@ def setup_run_context(self, client: Optional[SyncPrefectClient] = None):
                     log_prints=log_prints,
                     task_run=self.task_run,
                     parameters=self.parameters,
-                    result_factory=run_coro_as_sync(
-                        ResultFactory.from_autonomous_task(self.task)
-                    ),  # type: ignore
+                    result_factory=run_coro_as_sync(ResultFactory.from_task(self.task)),  # type: ignore
                     client=client,
                 )
             )
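A small sketch (not part of the diff) of building a result factory the way the engine now does. `my_task` is an illustrative task; `ResultFactory`, `run_coro_as_sync`, and the call shape mirror names that appear in this commit.

from prefect import task
from prefect.results import ResultFactory
from prefect.utilities.asyncutils import run_coro_as_sync


@task
def my_task() -> int:
    return 1


# ResultFactory.from_task now resolves storage through
# get_or_create_default_result_storage, so this factory is backed by the
# cached default storage block.
result_factory = run_coro_as_sync(ResultFactory.from_task(my_task))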
28 changes: 15 additions & 13 deletions src/prefect/tasks.py
@@ -44,7 +44,7 @@
 )
 from prefect.futures import PrefectDistributedFuture, PrefectFuture
 from prefect.logging.loggers import get_logger
-from prefect.records.cache_policies import DEFAULT, CachePolicy
+from prefect.records.cache_policies import DEFAULT, NONE, CachePolicy
 from prefect.results import ResultFactory, ResultSerializer, ResultStorage
 from prefect.settings import (
     PREFECT_TASK_DEFAULT_RETRIES,
@@ -218,10 +218,8 @@ class Task(Generic[P, R]):
             cannot exceed 50.
         retry_jitter_factor: An optional factor that defines the factor to which a retry
             can be jittered in order to avoid a "thundering herd".
-        persist_result: An optional toggle indicating whether the result of this task
-            should be persisted to result storage. Defaults to `None`, which indicates
-            that Prefect should choose whether the result should be persisted depending on
-            the features being used.
+        persist_result: A toggle indicating whether the result of this task
+            should be persisted to result storage. Defaults to `True`.
         result_storage: An optional block to use to persist the result of this task.
             Defaults to the value set in the flow the task is called in.
         result_storage_key: An optional key to store the result in storage at when persisted.
Expand Down Expand Up @@ -273,7 +271,7 @@ def __init__(
]
] = None,
retry_jitter_factor: Optional[float] = None,
persist_result: Optional[bool] = None,
persist_result: bool = True,
result_storage: Optional[ResultStorage] = None,
result_serializer: Optional[ResultSerializer] = None,
result_storage_key: Optional[str] = None,
@@ -381,7 +379,13 @@ def __init__(
         self.cache_expiration = cache_expiration
         self.refresh_cache = refresh_cache
 
-        if cache_policy is NotSet and result_storage_key is None:
+        if not persist_result:
+            self.cache_policy = None if cache_policy is None else NONE
+            if cache_policy and cache_policy is not NotSet and cache_policy != NONE:
+                logger.warning(
+                    "Ignoring `cache_policy` because `persist_result` is False"
+                )
+        elif cache_policy is NotSet and result_storage_key is None:
             self.cache_policy = DEFAULT
         elif result_storage_key:
             # TODO: handle this situation with double storage
Expand Down Expand Up @@ -1330,7 +1334,7 @@ def task(
Callable[[int], List[float]],
] = 0,
retry_jitter_factor: Optional[float] = None,
persist_result: Optional[bool] = None,
persist_result: bool = True,
result_storage: Optional[ResultStorage] = None,
result_storage_key: Optional[str] = None,
result_serializer: Optional[ResultSerializer] = None,
Expand Down Expand Up @@ -1362,7 +1366,7 @@ def task(
float, int, List[float], Callable[[int], List[float]], None
] = None,
retry_jitter_factor: Optional[float] = None,
persist_result: Optional[bool] = None,
persist_result: bool = True,
result_storage: Optional[ResultStorage] = None,
result_storage_key: Optional[str] = None,
result_serializer: Optional[ResultSerializer] = None,
Expand Down Expand Up @@ -1408,10 +1412,8 @@ def task(
cannot exceed 50.
retry_jitter_factor: An optional factor that defines the factor to which a retry
can be jittered in order to avoid a "thundering herd".
persist_result: An optional toggle indicating whether the result of this task
should be persisted to result storage. Defaults to `None`, which indicates
that Prefect should choose whether the result should be persisted depending on
the features being used.
persist_result: An toggle indicating whether the result of this task
should be persisted to result storage. Defaults to `True`.
result_storage: An optional block to use to persist the result of this task.
Defaults to the value set in the flow the task is called in.
result_storage_key: An optional key to store the result in storage at when persisted.
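A minimal sketch (not part of this commit) of how the new default plays out at the decorator level. The task names and return values are illustrative, and it assumes `@task` forwards `persist_result` and `cache_policy` to `Task.__init__` as shown above.

from prefect import task
from prefect.records.cache_policies import DEFAULT


@task
def persisted() -> int:
    # persist_result now defaults to True, so the DEFAULT cache policy applies
    return 42


# Assumption: the decorator forwards cache_policy to Task.__init__. Per the new
# branch above, this combination keeps caching disabled and logs
# "Ignoring `cache_policy` because `persist_result` is False".
@task(persist_result=False, cache_policy=DEFAULT)
def unpersisted() -> int:
    return 42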
14 changes: 6 additions & 8 deletions src/prefect/transactions.py
@@ -15,8 +15,11 @@
 from prefect.context import ContextModel, FlowRunContext, TaskRunContext
 from prefect.records import RecordStore
 from prefect.records.result_store import ResultFactoryStore
-from prefect.results import BaseResult, ResultFactory, get_default_result_storage
-from prefect.settings import PREFECT_DEFAULT_RESULT_STORAGE_BLOCK
+from prefect.results import (
+    BaseResult,
+    ResultFactory,
+    get_or_create_default_result_storage,
+)
 from prefect.utilities.asyncutils import run_coro_as_sync
 from prefect.utilities.collections import AutoEnum
 
@@ -265,12 +268,7 @@ def transaction(
             }
         )
     else:
-        default_storage = get_default_result_storage(_sync=True)
-        if not default_storage._block_document_id:
-            default_name = PREFECT_DEFAULT_RESULT_STORAGE_BLOCK.value().split("/")[
-                -1
-            ]
-            default_storage.save(default_name, overwrite=True, _sync=True)
+        default_storage = get_or_create_default_result_storage(_sync=True)
         if existing_factory:
             new_factory = existing_factory.model_copy(
                 update={
@@ -29,11 +29,16 @@ def happy_path():
 
     await asserting_events_worker.drain()
     assert isinstance(asserting_events_worker._client, AssertingEventsClient)
-    assert len(task_run_states) == len(asserting_events_worker._client.events) == 3
+    events = [
+        event
+        for event in asserting_events_worker._client.events
+        if event.event.startswith("prefect.task-run.")
+    ]
+    assert len(task_run_states) == len(events) == 3
 
     last_state = None
     for i, task_run_state in enumerate(task_run_states):
-        event = asserting_events_worker._client.events[i]
+        event = events[i]
 
         assert event.id == task_run_state.id
         assert event.occurred == task_run_state.timestamp
@@ -93,11 +98,16 @@ def happy_path():
 
     await asserting_events_worker.drain()
     assert isinstance(asserting_events_worker._client, AssertingEventsClient)
-    assert len(task_run_states) == len(asserting_events_worker._client.events) == 3
+    events = [
+        event
+        for event in asserting_events_worker._client.events
+        if event.event.startswith("prefect.task-run.")
+    ]
+    assert len(task_run_states) == len(events) == 3
 
     last_state = None
     for i, task_run_state in enumerate(task_run_states):
-        event = asserting_events_worker._client.events[i]
+        event = events[i]
 
         assert event.id == task_run_state.id
         assert event.occurred == task_run_state.timestamp
@@ -160,6 +170,7 @@ def foo():
     await asserting_events_worker.drain()
 
     events = sorted(asserting_events_worker._client.events, key=lambda e: e.occurred)
+    events = [e for e in events if e.event.startswith("prefect.task-run.")]
 
     assert len(task_run_states) == len(events) == 4
 
8 changes: 8 additions & 0 deletions tests/fixtures/storage.py
@@ -12,6 +12,7 @@
 from fastapi import Body, FastAPI, status
 from fastapi.exceptions import RequestValidationError
 
+import prefect.results
 from prefect.filesystems import LocalFileSystem
 from prefect.server.api.server import validation_exception_handler
 
@@ -29,6 +30,13 @@ async def local_filesystem(tmp_path):
     return block
 
 
+@pytest.fixture(autouse=True)
+async def clear_cached_filesystems():
+    prefect.results._default_storages.clear()
+    yield
+    prefect.results._default_storages.clear()
+
+
 # Key-value storage API ----------------------------------------------------------------
 
 kv_api_app = FastAPI(
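A short sketch (not part of the diff) of why the autouse fixture above clears the cache: the module-level `_default_storages` dict would otherwise leak state between tests. The names come from this commit; the `_sync=True` call style mirrors its use in transactions.py.

import prefect.results
from prefect.results import get_or_create_default_result_storage

first = get_or_create_default_result_storage(_sync=True)
second = get_or_create_default_result_storage(_sync=True)
assert first is second  # served from the prefect.results._default_storages cache

prefect.results._default_storages.clear()  # what the fixture does around each test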
6 changes: 4 additions & 2 deletions tests/results/test_flow_results.py
@@ -439,11 +439,13 @@ async def foo():
     with temporary_settings(
         {
             PREFECT_RESULTS_PERSIST_BY_DEFAULT: True,
-            PREFECT_DEFAULT_RESULT_STORAGE_BLOCK: "local-file-system/my-result-storage",
+            PREFECT_DEFAULT_RESULT_STORAGE_BLOCK: "fake-block-type-slug/my-result-storage",
         }
     ):
         with pytest.raises(
-            ValueError, match="Unable to find block document named my-result-storage"
+            ValueError,
+            match="The default storage block does not exist, but it is of type"
+            " 'fake-block-type-slug' which cannot be created implicitly",
         ):
            await foo()
 
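A sketch (not part of the diff) of the error path the updated test exercises, using the same setting value as the test above; the assertion text is taken from the new ValueError message in results.py.

import pytest

from prefect.results import get_or_create_default_result_storage
from prefect.settings import (
    PREFECT_DEFAULT_RESULT_STORAGE_BLOCK,
    temporary_settings,
)

with temporary_settings(
    {PREFECT_DEFAULT_RESULT_STORAGE_BLOCK: "fake-block-type-slug/my-result-storage"}
):
    # The block does not exist and its type slug is not "local-file-system",
    # so _get_or_create_default_storage raises instead of creating it implicitly.
    with pytest.raises(ValueError, match="cannot be created implicitly"):
        get_or_create_default_result_storage(_sync=True)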