Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

key value store #4499

Merged
merged 30 commits into from Jun 1, 2021
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
9d19258
Add kv client and interface
zangell44 Apr 26, 2021
0a18a53
Merge branch 'master' of https://github.com/PrefectHQ/prefect into kv…
zangell44 May 18, 2021
44b90ba
Add backend and tests
zangell44 May 19, 2021
2b1831b
Merge branch 'master' of https://github.com/PrefectHQ/prefect into kv…
zangell44 May 19, 2021
1196952
Add cli tests
zangell44 May 20, 2021
2f209b8
Correct cli test
zangell44 May 20, 2021
2335684
Add changelog
zangell44 May 20, 2021
a1ad4fd
Document kv store
zangell44 May 21, 2021
8d748a6
Merge branch 'master' of https://github.com/PrefectHQ/prefect into kv…
zangell44 May 21, 2021
2da78ce
Update docs naming convention
zangell44 May 21, 2021
f0c2227
Expose kv functions as top level backend imports
zangell44 May 21, 2021
c8f8ba2
Format
zangell44 May 21, 2021
1925733
Sort keys client side
zangell44 May 21, 2021
34810ba
Feed the formatter
zangell44 May 21, 2021
1b5361c
Refactor backend tests
zangell44 May 21, 2021
a4cbe2d
Refactor cli error handling
zangell44 May 21, 2021
6a588a1
Merge branch 'master' of https://github.com/PrefectHQ/prefect into kv…
zangell44 May 21, 2021
858e25e
Remove duplicate concepts entries
zangell44 May 21, 2021
ef917e1
Update docs
zangell44 May 22, 2021
346f34d
Add kv store flow example
zangell44 May 22, 2021
a31bb98
Improve docs
zangell44 May 24, 2021
0031fd6
Tweak doc capitalization
zangell44 May 24, 2021
b4d0f8e
Cleanup non-cloud backend error message
zangell44 May 24, 2021
e431502
Format
zangell44 May 24, 2021
a9edc74
Cleanup cli error handling and messaging
zangell44 May 24, 2021
9c8dec0
Clarify test mocks
zangell44 May 24, 2021
bdf1110
Make changelog friendly
zangell44 May 24, 2021
b0042d9
Make changelog even more friendly
zangell44 May 24, 2021
82ec4c1
Update docs
zangell44 May 27, 2021
4a92456
Correct docs typo
zangell44 May 27, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions changes/pr4499.yaml
@@ -0,0 +1,2 @@
feature:
- "Add support for new KV Store feature - [#4499](https://github.com/PrefectHQ/prefect/pull/4499)"
6 changes: 6 additions & 0 deletions docs/.vuepress/config.js
Expand Up @@ -111,6 +111,11 @@ module.exports = {
collapsable: true,
children: ['triggers']
},
{
title: 'prefect.backend',
collapsable: true,
children: getChildren('docs/api/latest', 'backend')
},
{
title: 'prefect.client',
collapsable: true,
Expand Down Expand Up @@ -199,6 +204,7 @@ module.exports = {
'concepts/projects',
'concepts/flows',
'concepts/flow_runs',
'concepts/kv_store',
'concepts/secrets',
'concepts/api_keys',
'concepts/roles',
Expand Down
161 changes: 161 additions & 0 deletions docs/orchestration/concepts/kv_store.md
@@ -0,0 +1,161 @@
# KV Store <Badge text="Cloud"/>

Key Value Store is a managed metadata database for Prefect Cloud.

**Keys** are strings. **Values** are JSON blobs.

The number of key value pairs allowed is limited by license, starting with 50 pairs on the Free tier. Values are limited to 1 MB in size.
ThatGalNatalie marked this conversation as resolved.
Show resolved Hide resolved

Key value pairs can be configured via the Prefect CLI, Python library, API, and UI.

## UI

You can view, update, and delete key value pairs on the [KV Store page](https://cloud.prefect.io/team/kv) of the UI.

## Setting key value pairs

Setting a key value pair will overwrite the existing value if the key exists.

:::: tabs
::: tab Prefect library
```python
from prefect.backend import set_key_value
key_value_uuid = set_key_value(key="foo", value="bar")
```
:::
::: tab CLI
```bash
$ prefect kv set foo bar
Key value pair set successfully
```
:::
::: tab GraphQL API
```graphql
mutation {
set_key_value(input: { key : "foo", value: "\"bar\"" }) {
id
}
}
```
:::
::::

## Getting the value of a key

:::: tabs
::: tab Prefect library
```python
from prefect.backend import get_key_value
value = get_key_value(key="foo")
```
:::
::: tab CLI
```bash
$ prefect kv get foo
Key 'foo' has value 'bar'
```
:::
::: tab GraphQL API
```graphql
query {
key_value (where: {key: {_eq: "foo"}}) {
value
}
}
```
:::
::::

## Deleting key value pairs

:::: tabs
::: tab Prefect library
```python
from prefect.backend import delete_key
success = delete_key(key="foo")
```
:::
::: tab CLI
```bash
$ prefect kv delete foo
Key 'foo' has been deleted
```
:::
::: tab GraphQL API
```graphql
mutation {
delete_key_value(input: { key_value_id : "35c8cabb-ab30-41c2-b464-6c2ed39f0d5b" }) {
success
}
}
```
:::
::::

## Listing keys

:::: tabs
::: tab Prefect library
```python
from prefect.backend import list_keys
my_keys = list_keys()
```
:::
::: tab CLI
```bash
$ prefect kv list
foo
my-other-key
another-key
```
:::
::: tab GraphQL API
```graphql
query {
key_value {
id
key
value
}
}
```
:::
::::

## Using key value pairs in flows

To interact with the KV Store from a flow, call the Prefect library functions in a task.

For example, let's say we wanted to track the last date a flow has been executed and pick up from that date.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sweet example :)


```python
from datetime import datetime, timedelta
import prefect
from prefect import task, Flow
from prefect.backend import set_key_value, get_key_value

LAST_EXECUTED_KEY = 'my-flow-last-executed'

@task
def get_last_execution_date():
last_executed = get_key_value(LAST_EXECUTED_KEY)
return datetime.strptime(last_executed, "%Y-%m-%d")

@task
def run_etl(start_date):
logger = prefect.context.get("logger")
while start_date <= datetime.today():
logger.info(f"Running ETL for date {start_date.strftime('%Y-%m-%d')}")
# do some etl
start_date += timedelta(days=1)
return start_date.strftime('%Y-%m-%d')

@task
def set_last_execution_date(date):
set_key_value(key=LAST_EXECUTED_KEY, value=date)

with Flow('my-flow') as flow:
last_executed_date = get_last_execution_date()
final_execution_date = run_etl(last_executed_date)
set_last_execution_date(final_execution_date)
```
10 changes: 10 additions & 0 deletions docs/outline.toml
Expand Up @@ -18,6 +18,11 @@ functions = [
"some_successful",
]

[pages.backend.kv_store]
title = "KV Store"
module = "prefect.backend.kv_store"
functions = ["set_key_value", "get_key_value", "delete_key", "list_keys"]

[pages.client.client]
title = "Client"
module = "prefect.client"
Expand Down Expand Up @@ -68,6 +73,11 @@ title = "build"
module = "prefect.cli.build_register"
commands = ["build"]

[pages.cli.kv_store]
title = "kv_store"
module = "prefect.cli.kv_store"
commands = ["set_command", "get_command", "delete_command", "list_command"]

[pages.cli.run]
title = "run"
module = "prefect.cli.run"
Expand Down
1 change: 1 addition & 0 deletions src/prefect/backend/__init__.py
@@ -0,0 +1 @@
from prefect.backend.kv_store import set_key_value, get_key_value, delete_key, list_keys
126 changes: 126 additions & 0 deletions src/prefect/backend/kv_store.py
@@ -0,0 +1,126 @@
from typing import Any, List

import prefect
from prefect.utilities.graphql import with_args
from prefect.client import Client
from prefect.utilities.exceptions import ClientError


NON_CLOUD_BACKEND_ERROR_MESSAGE = (
"KV Store operations are only supported while using Prefect Cloud as a backend."
)


def set_key_value(key: str, value: Any) -> str:
"""
Set key value pair, overwriting values for existing key

Args:
- key (str): the name of the key
- value (Any): A json compatible value

Returns:
- id (str): the id of the key value pair

Raises:
- ClientError: if using Prefect Server instead of Cloud
"""
if prefect.config.backend != "cloud":
zangell44 marked this conversation as resolved.
Show resolved Hide resolved
raise ClientError(NON_CLOUD_BACKEND_ERROR_MESSAGE)

mutation = {
"mutation($input: set_key_value_input!)": {
"set_key_value(input: $input)": {"id"}
}
}

client = Client()
result = client.graphql(
query=mutation, variables=dict(input=dict(key=key, value=value))
)

return result.data.set_key_value.id


def get_key_value(key: str) -> Any:
"""
Get the value for a key

Args:
- key (str): the name of the key

Returns:
- value (Any): A json compatible value

Raises:
- ValueError: if the specified key does not exist
- ClientError: if using Prefect Server instead of Cloud
"""
if prefect.config.backend != "cloud":
raise ClientError(NON_CLOUD_BACKEND_ERROR_MESSAGE)

query = {
"query": {with_args("key_value", {"where": {"key": {"_eq": key}}}): {"value"}}
}
client = Client()
result = client.graphql(query) # type: Any
if len(result.data.key_value) == 0:
raise ValueError(f"No value found for key: {key}")
return result.data.key_value[0].value


def delete_key(key: str) -> bool:
"""
Delete a key value pair

Args:
- key (str): the name of the key

Returns:
- success (bool): Whether or not deleting the key succeeded

Raises:
- ValueError: if the specified key does not exist
- ClientError: if using Prefect Server instead of Cloud
"""
if prefect.config.backend != "cloud":
raise ClientError(NON_CLOUD_BACKEND_ERROR_MESSAGE)

query = {
"query": {with_args("key_value", {"where": {"key": {"_eq": key}}}): {"id"}}
}
mutation = {
"mutation($input: delete_key_value_input!)": {
"delete_key_value(input: $input)": {"success"}
}
}

client = Client()
key_value_id_query = client.graphql(query=query)
if len(key_value_id_query.data.key_value) == 0:
raise ValueError(f"No key {key} found to delete")
result = client.graphql(
query=mutation,
variables=dict(
input=dict(key_value_id=key_value_id_query.data.key_value[0].id)
),
)

return result.data.delete_key_value.success


def list_keys() -> List[str]:
"""
List all keys

Returns:
- keys (list): A list of keys

Raises:
- ClientError: if using Prefect Server instead of Cloud
"""
if prefect.config.backend != "cloud":
raise ClientError(NON_CLOUD_BACKEND_ERROR_MESSAGE)
client = Client()
result = client.graphql({"query": {"key_value": {"key"}}}) # type: ignore
return sorted([res["key"] for res in result.data.key_value])
2 changes: 2 additions & 0 deletions src/prefect/cli/__init__.py
Expand Up @@ -17,6 +17,7 @@
from .server import server as _server
from .heartbeat import heartbeat as _heartbeat
from .build_register import register as _register, build as _build
from .kv_store import kv as _kv


CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])
Expand Down Expand Up @@ -73,6 +74,7 @@ def cli():
cli.add_command(_heartbeat)
cli.add_command(_register)
cli.add_command(_build)
cli.add_command(_kv)


# Miscellaneous Commands
Expand Down