Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add default store for transactions #13983

Merged
merged 12 commits into from
Jun 14, 2024
Merged

Conversation

desertaxle
Copy link
Member

@desertaxle desertaxle commented Jun 12, 2024

Add a default store and key for transactions opened via the transaction context manager from inside a flow. The transaction context manager will use a copy of the ResultFactory from the flow or task run context with persist_result set to True. That ResultFactory copy will also use the default result storage block if one was not provided to the flow.

The PREFECT_DEFAULT_RESULT_STORAGE_BLOCK has been updated to default to a LocalFilesystem block with the name containing a slugified version of the hostname of the current machine (to avoid conflicts with other users). This block will be created when a transaction is opened with no store provided.

Example

Run a transaction in flow with default record transaction storage:

from prefect import task, flow
from prefect.transactions import transaction, Transaction, TransactionState


@task
def task1():
    return {"x": 1}


@task1.on_commit
def on_commit_task1(txn: Transaction):
    print(f"Task 1 committed with staged value {txn._staged_value}")


@task
def task2(x):
    return {"y": x["x"] + 3}


@task2.on_commit
def on_commit_task2(txn: Transaction):
    print(f"Task 2 committed with staged value {txn._staged_value}")


@flow
def basic_flow():
    with transaction(key="static") as txn:
        if txn.state == TransactionState.COMMITTED:
            return txn.read()
        x = task1()
        y = task2(x)
        txn.stage(y)
        return y


if __name__ == "__main__":
    print(basic_flow())

First run with log the following:

15:11:19.166 | INFO    | prefect.engine - Created flow run 'strict-firefly' for flow 'basic-flow'
15:11:19.167 | WARNING | prefect.utilities.urls - No URL found for the Prefect UI, and no default base path provided.
15:11:19.283 | INFO    | prefect.engine - Created task run 'task1-0' for task 'task1'
15:11:19.334 | INFO    | Task run 'task1-0' - Finished in state Completed()
15:11:19.346 | INFO    | prefect.engine - Created task run 'task2-0' for task 'task2'
15:11:19.393 | INFO    | Task run 'task2-0' - Finished in state Completed()
Task 1 committed with staged value type='unpersisted' artifact_type='result' artifact_description='Unpersisted result of type `dict`'
Task 2 committed with staged value type='unpersisted' artifact_type='result' artifact_description='Unpersisted result of type `dict`'
15:11:19.403 | INFO    | Flow run 'strict-firefly' - Finished in state Completed()
{'y': 4}

Subsequent runs will log the following (previous transaction value will be used):

15:12:38.479 | INFO    | prefect.engine - Created flow run 'porcelain-husky' for flow 'basic-flow'
15:12:38.479 | WARNING | prefect.utilities.urls - No URL found for the Prefect UI, and no default base path provided.
15:12:38.577 | INFO    | Flow run 'porcelain-husky' - Finished in state Completed()
{'y': 4}

Checklist

  • This pull request references any related issue by including "closes <link to issue>"
    • If no issue exists and your change is not a small fix, please create an issue first.
  • If this pull request adds new functionality, it includes unit tests that cover the changes
  • This pull request includes a label categorizing the change e.g. maintenance, fix, feature, enhancement, docs.

For documentation changes:

  • This pull request includes redirect settings in mint.json for files that are removed or renamed.

For new functions or classes in the Python SDK:

  • This pull request includes helpful docstrings.
  • If a new Python file was added, this pull request contains a stub page in the Python SDK docs and an entry in docs/mint.json navigation.

@desertaxle desertaxle added the enhancement An improvement of an existing feature label Jun 12, 2024
src/prefect/results.py Outdated Show resolved Hide resolved
src/prefect/settings.py Outdated Show resolved Hide resolved
src/prefect/transactions.py Show resolved Hide resolved
src/prefect/transactions.py Show resolved Hide resolved
src/prefect/transactions.py Outdated Show resolved Hide resolved
@desertaxle desertaxle changed the title Default transaction storage Add default store for transactions Jun 13, 2024
@desertaxle desertaxle marked this pull request as ready for review June 13, 2024 21:13
@desertaxle desertaxle requested a review from a team as a code owner June 13, 2024 21:13
@desertaxle desertaxle requested a review from cicdw June 13, 2024 21:13
Copy link
Member

@cicdw cicdw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! one nit about the default name, otherwise LGTM

src/prefect/settings.py Outdated Show resolved Hide resolved
default_name = PREFECT_DEFAULT_RESULT_STORAGE_BLOCK.value().split("/")[
-1
]
default_storage.save(default_name, _sync=True)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this need an overwrite flag?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It shouldn't because we only run this save after confirming it hasn't been loaded from the server, but I'll add overwrite=True to be safe since that should guard against race conditions.

@desertaxle desertaxle merged commit 4199ec2 into main Jun 14, 2024
26 checks passed
@desertaxle desertaxle deleted the default-transaction-storage branch June 14, 2024 15:28
@desertaxle desertaxle linked an issue Jun 17, 2024 that may be closed by this pull request
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement An improvement of an existing feature
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add default local storage persistence for transactions
2 participants