From cdf529d43016ee64984ee8f5fe12cde5423de540 Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Mon, 27 Apr 2026 23:30:32 +0200 Subject: [PATCH] Apply reserved-key check to XCom update payload MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit XComCreateBody (POST /xcomEntries) rejects payloads containing reserved serialization keys (__classname__, __type, __var, __data__, …) via a field_validator that walks the value recursively. XComUpdateBody (PATCH /xcomEntries/{key}) was missing the same validator, so a payload that POST correctly rejects with 422 was accepted on PATCH and stored as-is. Extracts the recursive walker to a module-level _check_forbidden_xcom_keys helper and has both XComCreateBody and XComUpdateBody delegate to it, so create and update apply the same payload-key check from a single source. A parametrized test mirroring the existing test_create_xcom_entry_blocks_forbidden_keys covers the PATCH path. --- .../api_fastapi/core_api/datamodels/xcom.py | 47 +++++++++++-------- .../core_api/routes/public/test_xcom.py | 20 ++++++++ 2 files changed, 48 insertions(+), 19 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/xcom.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/xcom.py index 05cbb47c36ca4..b42cc176f01c7 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/xcom.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/xcom.py @@ -83,6 +83,28 @@ class XComCollectionResponse(BaseModel): total_entries: int +def _check_forbidden_xcom_keys(value: Any) -> Any: + """Recursively reject forbidden deserialization keys in user-provided XCom data.""" + from airflow._shared.serialization import FORBIDDEN_XCOM_KEYS + + def _walk(obj: Any, path: str = "value") -> None: + if isinstance(obj, dict): + found = FORBIDDEN_XCOM_KEYS & obj.keys() + if found: + raise ValueError( + f"XCom {path} contains reserved serialization keys: {', '.join(sorted(found))}. " + f"These keys are reserved for internal use." + ) + for k, v in obj.items(): + _walk(v, f"{path}.{k}") + elif isinstance(obj, (list, tuple)): + for i, item in enumerate(obj): + _walk(item, f"{path}[{i}]") + + _walk(value) + return value + + class XComCreateBody(StrictBaseModel): """Payload serializer for creating an XCom entry.""" @@ -93,25 +115,7 @@ class XComCreateBody(StrictBaseModel): @field_validator("value") @classmethod def _check_forbidden_keys(cls, value: Any) -> Any: - """Recursively check for forbidden deserialization keys in user-provided XCom data.""" - from airflow._shared.serialization import FORBIDDEN_XCOM_KEYS - - def _walk_forbidden_keys(obj: Any, path: str = "value") -> None: - if isinstance(obj, dict): - found = FORBIDDEN_XCOM_KEYS & obj.keys() - if found: - raise ValueError( - f"XCom {path} contains reserved serialization keys: {', '.join(sorted(found))}. " - f"These keys are reserved for internal use." - ) - for k, v in obj.items(): - _walk_forbidden_keys(v, f"{path}.{k}") - elif isinstance(obj, (list, tuple)): - for i, item in enumerate(obj): - _walk_forbidden_keys(item, f"{path}[{i}]") - - _walk_forbidden_keys(value) - return value + return _check_forbidden_xcom_keys(value) class XComUpdateBody(StrictBaseModel): @@ -119,3 +123,8 @@ class XComUpdateBody(StrictBaseModel): value: Any map_index: int = -1 + + @field_validator("value") + @classmethod + def _check_forbidden_keys(cls, value: Any) -> Any: + return _check_forbidden_xcom_keys(value) diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py index 97d3bb9c5527b..07dae5ef6bbfb 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py @@ -865,6 +865,26 @@ def test_patch_xcom_entry_with_slash_key(self, test_client, session): assert response.json()["value"] == new_value check_last_log(session, dag_id=TEST_DAG_ID, event="update_xcom_entry", logical_date=None) + @pytest.mark.parametrize( + ("key", "value"), + [ + ("__classname__", {"__classname__": "airflow.sdk.definitions.connection.Connection"}), + ("__type", {"__type": "airflow.sdk.definitions.connection.Connection", "__var": {}}), + ("__data__", {"nested": {"__data__": "malicious"}}), + ], + ) + def test_patch_xcom_entry_blocks_forbidden_keys(self, test_client, key, value): + """Test that XCom update blocks deserialization metadata keys.""" + self._create_xcom(TEST_XCOM_KEY, TEST_XCOM_VALUE) + response = test_client.patch( + f"/dags/{TEST_DAG_ID}/dagRuns/{run_id}/taskInstances/{TEST_TASK_ID}/xcomEntries/{TEST_XCOM_KEY}", + json={"value": value, "map_index": -1}, + ) + assert response.status_code == 422 + detail = str(response.json()["detail"]) + assert "reserved serialization keys" in detail + assert key in detail + def test_patch_xcom_preserves_int_type(self, test_client, session): """Test scenario described in #59032: if existing XCom value type is int, after patching with different value, it should still be int in the API response.