Unify task/asset state storage between Core API and Execution API#67547

Open
amoghrajesh wants to merge 2 commits into
apache:mainfrom
astronomer:aip-103-make-core-api-and-task-execution-usable-interchangeably

Conversation

@amoghrajesh
Contributor


Was generative AI tooling used to co-author this PR?
  • Yes: claude sonnet 4.6

What problem are we solving?

Task and asset state written by workers (via the Execution API) is stored as JSON-encoded strings in the DB — e.g., the integer 42 is stored as "42", and the string "hello" as '"hello"'. The core API read path returned the raw DB string without decoding it, so callers saw '"hello"' instead of "hello" and "42" (a string) instead of 42 (an integer).

The write path had the inverse bug: values posted to the core API were stored without json.dumps, so a worker reading them back via the execution API would hit a JSONDecodeError.
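A minimal sketch of the write-path mismatch, using only the standard library. The `db` dict and the three helper functions are hypothetical stand-ins for the real storage layer and routes, not Airflow APIs.

```python
import json

# Hypothetical stand-in for the state table: key -> stored string.
db: dict[str, str] = {}


def core_api_put_old(key: str, value: str) -> None:
    """Old Core API behaviour: store the value as-is, without json.dumps."""
    db[key] = value


def core_api_put_new(key: str, value) -> None:
    """Proposed behaviour: JSON-encode before storing, like the Execution API."""
    db[key] = json.dumps(value)


def worker_get(key: str):
    """Worker-side read: always decodes the stored string as JSON."""
    return json.loads(db[key])


core_api_put_old("job_id", "hello")
try:
    worker_get("job_id")
    old_write_readable = True
except json.JSONDecodeError:
    old_write_readable = False  # the bare text hello is not a JSON document

core_api_put_new("job_id", "hello")
new_write_value = worker_get("job_id")  # decodes back to the original string
```

With the encoded write, the stored column holds the quoted form and both read paths can decode it the same way.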

Current behaviour

  • GET /dags/{dag_id}/dagRuns/{run_id}/taskInstances/{task_id}/states and the equivalent asset state endpoints return raw JSON-encoded strings in the value field (e.g. "\"hello\"" for the string hello, and "42" as a string for the integer 42).
  • PUT on these endpoints stores the value as-is without JSON-encoding, so values written via the Core API are unreadable by workers.
  • The value field on request/response models is typed as str, with no null or size validation.

For example, suppose I run this task:

    @task
    def my_task(**context: Context):
        task_state = context["task_state"]

        task_state.set("job_id", "12345")
        task_state.set("secret-dict", {"key": "value"})
        task_state.set("int_value", 42)

        print("Fetching task states I stored earlier")

        print("job_id:", task_state.get("job_id"))
        print("secret-dict:", task_state.get("secret-dict"))
        print("int_value:", task_state.get("int_value"))

    my_task()

Core API call to get all task states returned this:

{
    "task_states": [
        {
            "key": "int_value",
            "value": "42",
            "updated_at": "2026-05-26T10:07:01.872139Z",
            "expires_at": "2026-06-25T10:07:01.868443Z"
        },
        {
            "key": "job_id",
            "value": "\"12345\"",
            "updated_at": "2026-05-26T10:07:01.855922Z",
            "expires_at": "2026-06-25T10:07:01.851283Z"
        },
        {
            "key": "secret-dict",
            "value": "{\"key\": \"value\"}",
            "updated_at": "2026-05-26T10:07:01.864312Z",
            "expires_at": "2026-06-25T10:07:01.860810Z"
        }
    ],
    "total_entries": 3
}
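
To illustrate what a caller currently has to do with that payload, the sketch below decodes each value by hand with json.loads. The `raw_values` mapping simply copies the three value strings from the response above; nothing else is assumed.

```python
import json

# The "value" strings exactly as returned in the Core API response above.
raw_values = {
    "int_value": "42",
    "job_id": "\"12345\"",
    "secret-dict": "{\"key\": \"value\"}",
}

# Decoding once recovers the native types the task originally stored.
decoded = {key: json.loads(raw) for key, raw in raw_values.items()}

for key, value in decoded.items():
    print(key, type(value).__name__, value)
```

After decoding, int_value is an int, job_id is a str without the extra quotes, and secret-dict is a dict.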

Proposed change

  • value: str widened to value: JsonValue on all four core API datamodels (TaskStateBody, TaskStateResponse, AssetStateBody, AssetStateResponse).
  • Write paths now call json.dumps(body.value) before passing to the backend, matching the Execution API behaviour introduced in #67418 (Simplifing authoring of task and asset states by allowing JSON types).
  • Read paths now call json.loads(r.value) when constructing responses, so callers receive native types (int, dict, list, bool, str).
  • Body models now have validators rejecting null, non-finite floats (NaN, Inf, -Inf), and values whose serialized form exceeds 65535 bytes (preserving the old max_length constraint).
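
The null and size rules can be sketched as a plain function without Pydantic. `check_state_value`, `is_accepted`, and `MAX_SERIALIZED_BYTES` are illustrative names, not the PR's actual code. One detail worth noting: with the default ensure_ascii=True, json.dumps emits pure ASCII, so the character count of the result equals its byte count.

```python
import json

MAX_SERIALIZED_BYTES = 65535  # limit quoted in the PR description


def check_state_value(value):
    """Reject null and oversized values; return the value unchanged otherwise."""
    if value is None:
        raise ValueError("value cannot be null")
    # ensure_ascii=True (the default) escapes non-ASCII characters, so the
    # serialized string is ASCII and len() is also its size in bytes.
    serialized = json.dumps(value)
    if len(serialized) > MAX_SERIALIZED_BYTES:
        raise ValueError(f"serialized value exceeds {MAX_SERIALIZED_BYTES} bytes")
    return value


def is_accepted(value) -> bool:
    """Convenience wrapper: True if the value passes the checks above."""
    try:
        check_state_value(value)
    except ValueError:
        return False
    return True
```

Note that the limit applies to the serialized form, so a string value has two bytes less headroom than the raw limit because of the surrounding quotes.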

NOTE: The UI PR (#67292) sends raw values as entered by the user, so it is not affected.

Testing

Test 1: Values set via task execution should come back with the same type whether read via the Core API or via task execution

[Two screenshots omitted]

Test 2: Values set via the Core API should come back with the same type whether read via task execution or via the Core API

Set values using curl for this task:

    @task
    def my_task(**context: Context):
        task_state = context["task_state"]

        print("Fetching task states that are set by API")

        print("str_value:", task_state.get("str_value"), type(task_state.get("str_value")))
        print("dict-value:", task_state.get("dict-value"), type(task_state.get("dict-value")))
        print("int_value:", task_state.get("int_value"), type(task_state.get("int_value")))

    my_task()

[Two screenshots omitted]


  entries = [
-     TaskStateResponse(key=r.key, value=r.value, updated_at=r.updated_at, expires_at=r.expires_at)
+     TaskStateResponse(
+         key=r.key, value=json.loads(r.value), updated_at=r.updated_at, expires_at=r.expires_at
Member

json.loads(r.value) will raise JSONDecodeError on any row whose value column is not valid JSON, and FastAPI surfaces that as a generic 500. Two ways this hits prod:

  1. Rows written by the execution API between #66073 (AIP-103: Add Execution API endpoints for task and asset states, 2026-05-04) and #67418 (Simplifing authoring of task and asset states by allowing JSON types, 2026-05-25) stored raw strings; #67418 switched to json.dumps(...) but didn't migrate existing rows. Anyone who ran a pre-#67418 build now has rows that poison reads.
  2. The BaseStateBackend interface only requires set(scope, key, value: str, ...). A custom backend that stores its own value format (e.g., escaped/quoted, msgpack-decoded-to-str, etc.) would have worked under the old value: str contract and now breaks.

In a list endpoint this is worse than in get: one bad row poisons the whole page, so users can't even paginate past it.

Suggest wrapping the decode in a try/except (skip+log, or return a sentinel) so a single legacy/odd row doesn't 500 the whole listing. Same pattern needed at get_task_state (line 133), and in asset_state.py at lines 90 and 121.
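
One possible shape for the skip-and-log variant, as a sketch only. `Row` and `decode_rows` are hypothetical; the real routes build TaskStateResponse objects from SQLAlchemy rows.

```python
import json
import logging
from typing import Any, NamedTuple

log = logging.getLogger(__name__)


class Row(NamedTuple):
    """Hypothetical stand-in for a state row read from the database."""

    key: str
    value: str


def decode_rows(rows: list[Row]) -> list[tuple[str, Any]]:
    """Decode each row's JSON value, skipping and logging rows that fail."""
    entries = []
    for row in rows:
        try:
            entries.append((row.key, json.loads(row.value)))
        except json.JSONDecodeError:
            # A single legacy or odd row should not fail the whole listing.
            log.warning("Skipping state key %r: value is not valid JSON", row.key)
    return entries


rows = [Row("int_value", "42"), Row("legacy", "raw string"), Row("job_id", '"12345"')]
entries = decode_rows(rows)
```

Skipping rows means the page can contain fewer entries than total_entries reports; returning a sentinel for undecodable rows would keep the counts consistent, at the cost of a less clean response type.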

Contributor Author

3.3 hasn't shipped, so the pre-#67418 row scenario cannot exist in any deployed cluster.

For the custom backend concern: the core API currently calls get_state_backend(), but that is a known issue I have already flagged in the backlog and will fix in follow-up PRs. The core API routes should be DB-direct (same as XCom) and never route through a custom backend. Once that is fixed, the core API only ever reads rows that the execution API wrote into the database (either the data itself or a reference), and those are always stored as json.dumps(value), so json.loads on read is always safe. The JSONDecodeError risk goes away entirely at that point.

Contributor Author

@amoghrajesh amoghrajesh May 27, 2026

Both write paths (task execution and the Core REST API) guarantee the DB always contains valid JSON: the execution API does json.dumps(value) before storing, and the Core API PUT does json.dumps(body.value) before storing. So json.loads on the read path is always safe.

  )
  return TaskStateResponse(
-     key=row.key, value=row.value, updated_at=row.updated_at, expires_at=row.expires_at
+     key=row.key, value=json.loads(row.value), updated_at=row.updated_at, expires_at=row.expires_at
Member

Same json.loads 500 risk as the list endpoint above (see comment on line 92). A row written by an execution-API build between #66073 and #67418, or by a custom BaseStateBackend, will turn a GET into a 500 instead of returning the value. Worth catching JSONDecodeError explicitly here too.

Contributor Author

Same as above. No such rows can exist since both write paths (execution API and Core API PUT) always store json.dumps(...) before writing to the DB. json.loads on the read path is always safe.

  rows = session.execute(paginated).all()
- entries = [AssetStateResponse(key=r.key, value=r.value, updated_at=r.updated_at) for r in rows]
+ entries = [
+     AssetStateResponse(key=r.key, value=json.loads(r.value), updated_at=r.updated_at) for r in rows
Member

Same json.loads 500 risk noted on task_state.py:92 -- one bad row in the asset's state table will 500 the whole list endpoint. Worth defensive decoding here, especially since asset state is meant to be a long-lived watermark (so legacy rows from pre-#67418 are more likely to still be around than transient task state).

Contributor Author

Same as task state — both write paths always store json.dumps(...), so no non-JSON rows can exist. The long-lived watermark concern doesn't apply since 3.3 hasn't shipped yet.

      detail=f"Asset state key {key!r} not found",
  )
- return AssetStateResponse(key=row.key, value=row.value, updated_at=row.updated_at)
+ return AssetStateResponse(key=row.key, value=json.loads(row.value), updated_at=row.updated_at)
Member

Same json.loads 500 risk as the list endpoint (line 90). A legacy or custom-backend row turns GET into a 500.

Contributor Author

@amoghrajesh amoghrajesh May 27, 2026

Same as above — no non-JSON rows can exist since both write paths always do json.dumps(...) before storing.

raise ValueError("value cannot be null")
if isinstance(v, float) and not math.isfinite(v):
raise ValueError("value must be a finite number; NaN and Inf are not JSON representable")
if len(json.dumps(v)) > _MAX_SERIALIZED_BYTES:
Member

The non-finite check on line 56 only catches top-level floats. Nested non-finite values pass through:

TaskStateBody.model_validate({"value": {"a": float("nan")}})  # passes
# then json.dumps({"a": float("nan")}) -> '{"a": NaN}'  -- not valid JSON

Simpler + complete fix: drop the isinstance(v, float) and not math.isfinite(v) branch and pass allow_nan=False to the size check:

try:
    serialized = json.dumps(v, allow_nan=False)
except ValueError as e:
    raise ValueError("value contains non-finite numbers; NaN and Inf are not JSON representable") from e
if len(serialized) > _MAX_SERIALIZED_BYTES:
    raise ValueError(...)

That catches NaN/Inf anywhere in the structure with a single pass and the existing top-level guard becomes redundant. Same fix needed in asset_state.py:57 -- the two validators are line-for-line identical.
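
The difference between the two checks can be shown with the standard library alone. The helper names below are illustrative and do not appear in the PR.

```python
import json
import math


def top_level_check(value) -> bool:
    """Mirror of the original guard: only inspects the outermost value."""
    return not (isinstance(value, float) and not math.isfinite(value))


def strict_dumps_check(value) -> bool:
    """allow_nan=False makes json.dumps reject NaN/Inf at any depth."""
    try:
        json.dumps(value, allow_nan=False)
    except ValueError:
        return False
    return True


# A non-finite float buried inside a dict and a list.
nested = {"a": [1.0, float("nan")]}
```

Here `top_level_check(nested)` accepts the value because the outermost object is a dict, while `strict_dumps_check(nested)` rejects it.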

Contributor Author

Good catch, making the change

Contributor Author

Handled in 1e8e0ea


@pytest.mark.parametrize("bad_float", [float("nan"), float("inf"), float("-inf")])
def test_non_finite_float_returns_422(self, test_client, bad_float):
    with pytest.raises(ValueError, match="Out of range float values are not JSON compliant"):
Member

This test doesn't actually verify the validator behavior. The matched message "Out of range float values are not JSON compliant" is CPython's stdlib json error message (raised by FastAPI's request-body serializer with allow_nan=False before the request ever reaches the route), not the validator's "value must be a finite number; NaN and Inf are not JSON representable".

Consequence: the test passes even if the validator is removed entirely, so it gives false confidence in the new check. Two ways to fix:

  1. To actually exercise the validator, you have to bypass FastAPI's JSON parsing -- e.g. directly construct TaskStateBody(value=float("nan")) and pytest.raises(ValidationError, match="value must be a finite number").
  2. Or, if the intent is to confirm the endpoint rejects non-finite floats (regardless of layer), assert response.status_code == 422 instead of expecting ValueError to propagate.

Same issue in test_asset_state.py:185.
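
A quick way to see where the matched message comes from: the standard library raises it on its own, with no validator involved. This sketch only exercises json.dumps; it does not reproduce the FastAPI or test-client stack, and `stdlib_nan_error` is a hypothetical helper.

```python
import json


def stdlib_nan_error() -> str:
    """Return the message json.dumps raises for NaN when allow_nan=False."""
    try:
        json.dumps(float("nan"), allow_nan=False)
    except ValueError as exc:
        return str(exc)
    return ""


message = stdlib_nan_error()

# The validator's own message, as quoted in the review comment above.
validator_message = "value must be a finite number; NaN and Inf are not JSON representable"
```

The stdlib message contains the text the test matches on and never contains the validator's message, which is why the test cannot distinguish the two layers.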

Contributor Author

Made changes.

  • Endpoint tests now assert on the response status code instead of catching ValueError
  • Added a new test_non_finite_float_rejected_by_validator which tests the validator directly with both top-level and nested non-finite floats

Contributor Author

Handled in 1e8e0ea

@amoghrajesh amoghrajesh moved this from In progress to In review in AIP-103: Task State Management May 28, 2026

Labels

area:airflow-ctl, area:API, area:UI, backport-to-airflow-ctl/v0-1-test

Projects

Status: In review

2 participants