Add PolicyExposureReport to AgentOperator #64433
gopidesupavan wants to merge 1 commit into apache:main
Conversation
```python
ti = context["task_instance"]
llm_exposure = LLMExposure(
    llm_conn_id=self.llm_conn_id,
    connection_type=self.llm_hook.conn_type,
```
self.llm_hook.conn_type triggers the @cached_property, which resolves the Airflow connection and instantiates the hook. If the connection is misconfigured, the outer try/except swallows the error and the task then fails inside _build_agent() with a different traceback. The user sees both a "Failed to build policy exposure report" warning and the actual connection error, which can be confusing.
Consider wrapping just the conn_type access:
```python
try:
    connection_type = self.llm_hook.conn_type
except Exception:
    connection_type = None
```

Or just use self.llm_conn_id here and skip resolving the connection type in the report.
```python
for table_name in sorted(self._allowed_tables or ()):
    resources.append(ResourceExposure(category="table", name=table_name, access_mode="read"))
```
Per-table resources are always access_mode="read", even when self._allow_writes is True. The database-level entry correctly says "read_write", but the individual table entries are inconsistent. Should this be:

```python
access_mode="read_write" if self._allow_writes else "read"
```

```python
"""Return True when a hook method name suggests write-like side effects."""
return any(
    token in method_name.lower()
    for token in ("create", "delete", "drop", "update", "insert", "write", "post", "put", "patch")
```
This misses some common mutating patterns: remove, send, execute, upload, publish. For example, S3Hook.delete_objects would match, but S3Hook.upload_file or SlackHook.send_message would not.
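A minimal sketch of the heuristic with those extra tokens folded in. The added tokens ("remove", "send", "execute", "upload", "publish") are this review's suggestion, not what the PR currently ships:

```python
# Suggested broader token set; the last five entries are additions
# proposed in this review, not part of the current implementation.
MUTATING_TOKENS = (
    "create", "delete", "drop", "update", "insert", "write",
    "post", "put", "patch",
    "remove", "send", "execute", "upload", "publish",
)


def is_mutating_method(method_name: str) -> bool:
    """Return True when a hook method name suggests write-like side effects."""
    lowered = method_name.lower()
    return any(token in lowered for token in MUTATING_TOKENS)
```

With this set, S3Hook.upload_file and SlackHook.send_message would now be flagged alongside S3Hook.delete_objects.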
```python
if runtime_notes:
    return PolicyRiskSummary(level="low", reasons=["configured access includes runtime controls"])
```
The reason "configured access includes runtime controls" fires when runtime_notes has entries like "tool logging enabled" or "durable replay enabled". These are observability features, not governance controls. Something like "no external tool access configured" (same as the no-runtime-notes branch) might be more accurate here, since the risk level is "low" either way.
Pull request overview
Adds a first-pass “Policy Exposure Report” for common.ai’s AgentOperator, persisting a structured, configuration-derived snapshot of LLM/tool access to XCom for governance/observability.
Changes:
- Introduces PolicyExposureReport Pydantic models plus helpers to summarize toolset/resource exposure and deterministically classify risk.
- Writes the policy exposure snapshot to XCom (airflow_common_ai_policy_exposure) at the start of AgentOperator.execute(), best-effort.
- Implements describe_policy_exposure() across common toolsets (SQL/MCP/Hook/DataFusion) with unit tests and docs.
Reviewed changes
Copilot reviewed 15 out of 15 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| providers/common/ai/src/airflow/providers/common/ai/utils/policy_exposure.py | New models/helpers for toolset exposure, report structure, and deterministic risk classification. |
| providers/common/ai/src/airflow/providers/common/ai/operators/agent.py | Builds and pushes the report to XCom at task start. |
| providers/common/ai/src/airflow/providers/common/ai/toolsets/sql.py | Adds exposure description for SQL toolset (db/tables + write flags). |
| providers/common/ai/src/airflow/providers/common/ai/toolsets/mcp.py | Adds exposure description for MCP server + prefix. |
| providers/common/ai/src/airflow/providers/common/ai/toolsets/hook.py | Adds exposure description for hook methods + mutating-method heuristics. |
| providers/common/ai/src/airflow/providers/common/ai/toolsets/datafusion.py | Adds exposure description for datasources/URIs + wildcard flags. |
| providers/common/ai/tests/unit/common/ai/utils/test_policy_exposure.py | New unit tests for unwrap/describe fallback and risk classification logic. |
| providers/common/ai/tests/unit/common/ai/toolsets/test_sql.py | Tests SQLToolset.describe_policy_exposure(). |
| providers/common/ai/tests/unit/common/ai/toolsets/test_mcp.py | Tests MCPToolset.describe_policy_exposure(). |
| providers/common/ai/tests/unit/common/ai/toolsets/test_hook.py | Tests HookToolset.describe_policy_exposure() and mutating flags. |
| providers/common/ai/tests/unit/common/ai/toolsets/test_datafusion.py | Tests DataFusionToolset.describe_policy_exposure() and wildcard flags. |
| providers/common/ai/tests/unit/common/ai/operators/test_agent.py | Extends operator tests to validate report XCom push and runtime notes. |
| providers/common/ai/tests/unit/common/ai/decorators/test_agent.py | Adjusts decorator tests to provide a task_instance context for XCom push. |
| providers/common/ai/docs/operators/agent.rst | Documents the new XCom-backed policy exposure report. |
| providers/common/ai/AGENTS.md | Adds guidance to implement describe_policy_exposure() on toolsets. |
```python
def test_describe_toolset_exposure_uses_base_toolset_for_wrappers():
    wrapped = MagicMock()
```
MagicMock() is created without a spec, which can hide bugs by allowing any attribute/method access. Consider using MagicMock(spec=["describe_policy_exposure", "id"]) (or a concrete toolset class) so the mock matches the expected toolset surface.
```diff
-wrapped = MagicMock()
+wrapped = MagicMock(spec=["describe_policy_exposure", "id"])
```
```python
ti.run_id = "run_1"
ti.task_id = "test"
ti.map_index = map_index
ti.xcom_push = MagicMock()
```
ti is already created with a spec that includes xcom_push, so reassigning ti.xcom_push = MagicMock() is redundant and also creates an unspecced mock. Prefer relying on the existing ti.xcom_push mock created by MagicMock(spec=...), or assign a specced callable mock if you need custom behavior.
```diff
-ti.xcom_push = MagicMock()
```
```python
    ),
    patch("airflow.providers.common.ai.durable.storage._get_base_path"),
    patch("pydantic_ai.models.infer_model", autospec=True, return_value=MagicMock()),
    patch("pydantic_ai.models.wrapper.infer_model", side_effect=lambda model: model),
):
```
The patched infer_model uses return_value=MagicMock() without a spec, which can mask interface mismatches with the real model object. Use a specced mock (or a lightweight fake) that matches the attributes accessed by CachingModel/agent.override in this code path.
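One way to do that, sketched with a hypothetical FakeModel stand-in; the actual attribute surface touched by CachingModel/agent.override would need checking against the real pydantic_ai model class:

```python
from unittest.mock import MagicMock


class FakeModel:
    """Lightweight stand-in for a model object.

    The attributes here are assumptions about what the code path touches,
    not the real pydantic_ai surface.
    """

    model_name = "fake-model"

    def request(self, *args, **kwargs):
        raise NotImplementedError


# spec=FakeModel makes attribute typos fail fast with AttributeError
# instead of silently returning auto-created child mocks.
specced = MagicMock(spec=FakeModel)
```

The same effect can be had with autospec'ing the real model class once it is importable in the test.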
```python
ti.run_id = "run_1"
ti.task_id = "test"
ti.map_index = -1
ti.xcom_push = MagicMock()
```
Similar to the operator tests, ti is created with a spec that already includes xcom_push, so reassigning ti.xcom_push = MagicMock() is redundant and leaves you with an unspecced callable mock. Prefer relying on the existing ti.xcom_push mock from MagicMock(spec=...), or replace it with a specced callable if needed.
```diff
-ti.xcom_push = MagicMock()
```
```python
resources.append(
    ResourceExposure(
        category="uri",
        name=config.uri,
        access_mode="read_write" if self._allow_writes else "read",
        details={"table_name": config.table_name},
    )
```
describe_policy_exposure() currently stores the raw config.uri value into the XCom-backed policy exposure report. URIs can sometimes include sensitive details (e.g. bucket/key names, local paths, or embedded credentials/query params depending on scheme), and XCom values may be broadly visible. Consider redacting or normalizing URIs before persisting (e.g., strip credentials/query params, or only store scheme + bucket/table identifier).
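A possible redaction helper, sketched with urllib.parse; the helper name and exact policy (drop credentials, query string, and fragment; keep scheme, host, and path) are illustrative suggestions, not part of the PR:

```python
from urllib.parse import urlsplit, urlunsplit


def redact_uri(uri: str) -> str:
    """Strip credentials and query parameters from a URI before persisting.

    Hypothetical helper sketching the redaction suggested above; keeps
    scheme + host + path, drops user:password@, query string, and fragment.
    """
    parts = urlsplit(uri)
    # Drop any user:password@ prefix from the netloc, keeping host[:port].
    netloc = parts.netloc.rsplit("@", 1)[-1]
    # Drop the query string and fragment entirely (e.g. presigned params).
    return urlunsplit((parts.scheme, netloc, parts.path, "", ""))
```

The report could then store redact_uri(config.uri) instead of the raw value.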
```python
describe = getattr(base_toolset, "describe_policy_exposure", None)
if callable(describe):
    try:
        exposure = describe()
        if isinstance(exposure, ToolsetExposure):
            return exposure
        return ToolsetExposure(
            toolset_type=type(base_toolset).__name__,
            toolset_id=_get_toolset_id(base_toolset),
            summary="Toolset returned an invalid policy exposure report.",
            risk_flags=["invalid toolset exposure report"],
        )
    except Exception:
        return ToolsetExposure(
            toolset_type=type(base_toolset).__name__,
            toolset_id=_get_toolset_id(base_toolset),
            summary="Toolset exposure details are unavailable because report generation failed.",
            risk_flags=["toolset exposure report failed"],
        )
```
describe_toolset_exposure() swallows exceptions from describe_policy_exposure() and returns a fallback exposure, but it does not log the failure. The docs for the policy exposure report state that toolset report generation failures are logged; consider logging the exception here (or re-raising and logging at the caller) so operators have visibility into why a toolset’s exposure report was unavailable.
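A sketch of the except branch with the failure logged; safe_describe and the dict fallback are illustrative stand-ins for the real helper and Pydantic model:

```python
import logging

log = logging.getLogger(__name__)


def safe_describe(base_toolset):
    """Call describe_policy_exposure() but log failures instead of swallowing them silently."""
    describe = getattr(base_toolset, "describe_policy_exposure", None)
    if not callable(describe):
        return None
    try:
        return describe()
    except Exception:
        # log.exception records the traceback at ERROR level, so operators
        # can see why a toolset's exposure report was unavailable.
        log.exception(
            "Policy exposure report generation failed for toolset %s",
            type(base_toolset).__name__,
        )
        # Stand-in for the ToolsetExposure fallback in the real code.
        return {"risk_flags": ["toolset exposure report failed"]}
```

This keeps the best-effort behavior while matching what the docs promise about logging.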
```python
resources.append(
    ResourceExposure(
        category="uri",
        name=config.uri,
        access_mode="read_write" if self._allow_writes else "read",
        details={"table_name": config.table_name},
    )
)
```
DataSourceConfig.uri is optional for catalog-managed/table-provider formats (e.g. Iceberg) and may be an empty string. This implementation always adds a ResourceExposure(category="uri", name=config.uri, ...), which can yield empty/meaningless URI entries in the report. Consider only emitting a uri resource when config.uri is non-empty (and/or adding a different resource describing the catalog/table identifier from config.options when is_table_provider is true).
```diff
-resources.append(
-    ResourceExposure(
-        category="uri",
-        name=config.uri,
-        access_mode="read_write" if self._allow_writes else "read",
-        details={"table_name": config.table_name},
-    )
-)
+if config.uri:
+    resources.append(
+        ResourceExposure(
+            category="uri",
+            name=config.uri,
+            access_mode="read_write" if self._allow_writes else "read",
+            details={"table_name": config.table_name},
+        )
+    )
```
What
Implemented a first pass of policy exposure reporting for the common.ai AgentOperator.
This adds a structured, XCom-backed PolicyExposureReport snapshot for each task run so users can inspect the configured AI access surface without reading operator code. The report captures task identity, LLM metadata, attached toolset exposure summaries, runtime notes, and a deterministic risk summary.
For each task, the output is stored under the airflow_common_ai_policy_exposure XCom key.
This kind of view is very useful in large enterprises: if anyone is using an LLM, it is an easy way to track exactly which configs a run uses, instead of digging through logs or the DAG's inputs.
e.g. (example report output omitted):
Was generative AI tooling used to co-author this PR?