
Add opt-in Presidio input_guard for outbound LLM PII sanitization #65485

Draft
gopidesupavan wants to merge 1 commit into apache:main from gopidesupavan:pii-masking-llm-input

Conversation

@gopidesupavan
Member

Summary

Adds an opt-in input_guard parameter to LLMOperator, AgentOperator, and
LLMFileAnalysisOperator that uses Microsoft Presidio to sanitize PII and
other sensitive entities in text before it is sent to the model.

What it does

  • Sanitizes user prompt text, system instructions, and multi-turn message
    history (via pydantic-ai history_processors) before each model call
  • Sanitizes normalized file content in LLMFileAnalysisOperator
  • Supports four Presidio modes: replace (default), mask, redact, hash
  • Rejects binary multimodal attachments by default when guarding is enabled;
    opt out with attachment_policy="allow_unmodified"
  • Logs sanitization summaries (entity types and counts) at INFO; optionally
    logs a truncated sanitized preview with log_sanitized_text=True

Usage

LLMOperator(
    task_id="safe_call",
    prompt="Customer Jane Doe, jane@example.com, card 4111 1111 1111 1111",
    llm_conn_id="pydanticai_default",
    input_guard={
        "enabled": True,
        "entities": ["EMAIL_ADDRESS", "CREDIT_CARD", "PHONE_NUMBER"],
        "mode": "replace",
    },
)

Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)
  • gpt codex

  • Read the Pull Request Guidelines for more information. Note: commit author/co-author name and email in commits become permanently public when merged.
  • For fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
  • When adding dependency, check compliance with the ASF 3rd Party License Policy.
  • For significant user-facing changes create newsfragment: {pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.

…/common/ai that sanitizes text before it is sent to the model
@gopidesupavan
Member Author

[Screenshot attached: 2026-04-19 at 11.09.32]

Contributor

Copilot AI left a comment


Pull request overview

Adds an opt-in input_guard capability to the common.ai provider so outbound text (prompts, instructions, and message history) can be sanitized with Microsoft Presidio before being sent to an LLM.

Changes:

  • Introduces a new privacy module (InputGuardConfig, Presidio-backed guard, history processors, and related exceptions).
  • Threads input_guard through LLMOperator, AgentOperator, and LLMFileAnalysisOperator via PydanticAIHook.create_agent().
  • Adds documentation, an example DAG, optional presidio extras, and unit tests covering modes/logging/attachment rejection.

Reviewed changes

Copilot reviewed 25 out of 25 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
providers/common/ai/src/airflow/providers/common/ai/privacy/config.py Adds InputGuardConfig normalization and defaults for guard behavior.
providers/common/ai/src/airflow/providers/common/ai/privacy/exceptions.py Defines shared guard exceptions and the binary-attachment rejection message.
providers/common/ai/src/airflow/providers/common/ai/privacy/history.py Adds sanitization helpers and a pydantic-ai history processor for outbound message content.
providers/common/ai/src/airflow/providers/common/ai/privacy/presidio_guard.py Implements the Presidio analyzer/anonymizer integration and privacy-safe logging.
providers/common/ai/src/airflow/providers/common/ai/privacy/init.py Exposes the privacy guard public API surface.
providers/common/ai/src/airflow/providers/common/ai/hooks/pydantic_ai.py Adds input_guard to create_agent() to sanitize instructions and prepend a history processor.
providers/common/ai/src/airflow/providers/common/ai/operators/llm.py Adds input_guard to LLMOperator and centralizes agent creation via _create_agent().
providers/common/ai/src/airflow/providers/common/ai/operators/agent.py Forwards input_guard when building an agent-backed operator.
providers/common/ai/src/airflow/providers/common/ai/operators/llm_file_analysis.py Sanitizes file-analysis user_content when guarding is enabled; switches to _create_agent().
providers/common/ai/src/airflow/providers/common/ai/operators/llm_sql.py Uses _create_agent() to inherit input_guard behavior consistently.
providers/common/ai/src/airflow/providers/common/ai/operators/llm_schema_compare.py Uses _create_agent() to inherit input_guard behavior consistently.
providers/common/ai/src/airflow/providers/common/ai/operators/llm_branch.py Uses _create_agent() to inherit input_guard behavior consistently.
providers/common/ai/tests/unit/common/ai/privacy/test_presidio_guard.py Adds unit coverage for config normalization, modes, caching, logging, and attachment policy behavior.
providers/common/ai/tests/unit/common/ai/hooks/test_pydantic_ai.py Adds tests asserting instructions sanitization + history processor injection when input_guard is enabled.
providers/common/ai/tests/unit/common/ai/operators/test_llm.py Adds a test asserting LLMOperator forwards input_guard to the hook.
providers/common/ai/tests/unit/common/ai/operators/test_agent.py Adds a test asserting AgentOperator forwards input_guard to the hook.
providers/common/ai/tests/unit/common/ai/operators/test_llm_file_analysis.py Adds a test asserting binary attachments are rejected when guarding is enabled.
providers/common/ai/docs/privacy.rst Introduces privacy guard documentation and limitations/attachment policy guidance.
providers/common/ai/docs/index.rst Links the new privacy guard docs into the provider docs index.
providers/common/ai/docs/operators/llm.rst Documents input_guard usage for LLMOperator.
providers/common/ai/docs/operators/agent.rst Documents input_guard usage for AgentOperator.
providers/common/ai/docs/operators/llm_file_analysis.rst Documents input_guard usage and binary attachment behavior for file analysis.
providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_input_guard.py Adds runnable examples for guard modes and preview logging.
providers/common/ai/pyproject.toml Adds presidio optional extra and includes Presidio deps in dev.

try:
    entities = value.get("entities")
    if entities is not None:
        value = {**value, "entities": tuple(entities)}
Comment on lines +95 to +96
from airflow.providers.common.ai.privacy.config import InputGuardConfig as _Config

Comment on lines 185 to 193
def create_agent(
    self,
    output_type: type[Any] = str,
    *,
    instructions: str,
    input_guard: InputGuardConfig | dict[str, Any] | None = None,
    **agent_kwargs,
) -> Agent[None, Any]:
    """
Comment on lines +118 to +121
if self.input_guard is not None:
    request.user_content = sanitize_file_content(
        request.user_content, self.input_guard, logger=self.log
    )
@gopidesupavan gopidesupavan marked this pull request as draft April 19, 2026 10:44
new_parts = []
changed = False
for part_index, part in enumerate(parts):
    content = getattr(part, "content", None)
Member

Tool call args bypass the guard entirely. Real pydantic_ai.messages.ToolCallPart has no content field, just args: str | dict[str, Any] | None, so this getattr returns None and the part falls into the continue branch on line 126. On the next turn the LLM sees the raw tool args it emitted earlier, which routinely include PII (e.g. send_email(to="alice@example.com", ...)). I verified this against pydantic-ai 1.80 with a TestModel + real tool. The test_part_without_content_attr_is_kept test uses a fake ToolCallPart dataclass that doesn't have args, so it confirms the bypass is intentional but hides the production impact. Same gap for BuiltinToolCallPart.args. Consider extending the processor to also sanitize part.args for these parts.
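A sketch of the suggested extension. `sanitize_text` here is a toy stand-in (a simple email regex) for the guard's Presidio-backed method, since `ToolCallPart.args` can arrive either as a raw JSON string or as an already-parsed mapping:

```python
import re
from typing import Any

# Toy stand-in for PresidioInputGuard.sanitize_text; the real guard runs
# Presidio analyze/anonymize here. This stub only masks email addresses.
def sanitize_text(text: str) -> str:
    return re.sub(r"[\w.+-]+@[\w-]+\.[\w.]+", "<EMAIL_ADDRESS>", text)

def sanitize_tool_args(args: Any) -> Any:
    """Sanitize ToolCallPart.args, typed str | dict[str, Any] | None in pydantic-ai."""
    if args is None:
        return None
    if isinstance(args, str):   # args serialized as a raw JSON string
        return sanitize_text(args)
    if isinstance(args, dict):  # args as an already-parsed mapping
        return {k: sanitize_text(v) if isinstance(v, str) else v for k, v in args.items()}
    return args
```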

for key, value in content.items()
}
# Non-text, non-container types (int, float, …) pass through unchanged.
return content
Member

Pydantic BaseModel tool returns bypass this function. ToolReturnPart.content is typed Any, and tools often return BaseModel instances like Customer(email="jane@example.com"). It isn't str/list/tuple/dict, so it falls through here unchanged. Pydantic-ai then calls part.model_response_str() (which JSON-dumps it) when building the model request, so the PII reaches the model. Handling BaseModel via .model_dump() and dataclasses via asdict would close this. Verified in a minimal repro: a tool returning Customer(email="user@example.com") arrives at the model as {"email":"user@example.com"}.
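One way to close this gap is to normalize structured returns into plain containers before the existing str/list/dict walk runs. This sketch detects pydantic v2 models by duck typing on `model_dump()` so it carries no hard pydantic import; the helper name is hypothetical:

```python
import dataclasses
from typing import Any

def normalize_tool_return(content: Any) -> Any:
    """Convert structured tool returns into containers the guard can walk.

    Hypothetical helper: BaseModel instances are detected via their
    model_dump() method (pydantic v2); dataclass instances via asdict().
    Anything else passes through unchanged.
    """
    if hasattr(content, "model_dump"):  # pydantic v2 BaseModel
        return content.model_dump()
    if dataclasses.is_dataclass(content) and not isinstance(content, type):
        return dataclasses.asdict(content)  # dataclass instance, not the class
    return content
```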

"""
config = InputGuardConfig.from_value(input_guard)
if config.enabled:
instructions = PresidioInputGuard(config, logger=self.log).sanitize_text(
Member

This only sanitizes the static instructions string passed to Agent(...). Pydantic-ai merges three sources into the final ModelRequest.instructions: (a) static instructions, (b) @agent.instructions callables, and (c) AbstractToolset.get_instructions(). Only (a) is sanitized here. Because ModelRequest.instructions is a top-level attribute (not in message.parts), the history processor in history.py never inspects it either. So anyone adding dynamic instructions, or using a toolset that injects instructions (SQL toolset, MCP, etc.), gets unsanitized text in every request. Consider sanitizing at a before_model_request hook on request_context.request.instructions so all three sources are covered.

) -> Any:
    """Sanitize supported user-content payloads while respecting attachment policy."""
    if isinstance(content, BinaryContent):
        if config.attachment_policy == "reject":
Member

This rejects BinaryContent regardless of origin. When agent.run_sync is used with a model that returns a FilePart (model generated an image or document, FilePart.content is BinaryContent), the next turn's history processor calls into this function and raises InputGuardAttachmentError even though the user never attached anything. The feature is documented as guarding "user input", so maybe only UserPromptPart content should go through the reject check. A separate knob for model-output attachments would be safer.
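The origin-sensitive check could look like this. The part classes below are simplified stand-ins for `pydantic_ai.messages` types, defined locally so the sketch is self-contained:

```python
from dataclasses import dataclass
from typing import Any

# Simplified stand-ins for pydantic_ai.messages classes.
class BinaryContent:
    def __init__(self, data: bytes, media_type: str = "application/octet-stream"):
        self.data = data
        self.media_type = media_type

@dataclass
class UserPromptPart:  # user-supplied input
    content: Any

@dataclass
class FilePart:  # model-generated file output
    content: Any

class InputGuardAttachmentError(ValueError):
    pass

def check_attachment(part: Any, attachment_policy: str) -> None:
    """Apply the 'reject' policy only to attachments the user supplied,
    letting model-generated FilePart history pass through untouched."""
    if (
        attachment_policy == "reject"
        and isinstance(part, UserPromptPart)
        and isinstance(part.content, BinaryContent)
    ):
        raise InputGuardAttachmentError("binary attachment rejected by input_guard")
```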

entity_type = getattr(result, "entity_type", "PII")
operators[entity_type] = OperatorConfig(
    "replace",
    {"new_value": self.config.replace_value_template.format(entity_type=entity_type)},
Member

replace_value_template is user-supplied and goes through str.format(entity_type=...) with no validation. A typo like "<{entity}>" raises KeyError: 'entity' at sanitize time, after the expensive analyze call has already run. Templates like "{0}" or "{__class__}" work but read unintended attributes. Validating the template once in InputGuardConfig.from_value (render with a dummy entity_type, reject on KeyError) would catch this at DAG-parse time.
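The parse-time validation could be as small as rendering the template once with a dummy entity type. The exception class name below is hypothetical; the PR defines its own guard exceptions:

```python
class InputGuardConfigError(ValueError):
    """Hypothetical exception name; substitute the PR's own guard exception."""

def validate_replace_template(template: str) -> str:
    """Fail fast at DAG-parse time instead of after an expensive analyze() call."""
    try:
        template.format(entity_type="EMAIL_ADDRESS")
    except (KeyError, IndexError) as exc:
        raise InputGuardConfigError(
            f"replace_value_template {template!r} must reference only "
            f"{{entity_type}}: {exc!r}"
        ) from exc
    return template
```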

    sample_rows=self.sample_rows,
)
if self.input_guard is not None:
    request.user_content = sanitize_file_content(
Member

This calls the guard once on request.user_content, then _create_agent wires up the history processor which runs Presidio a second time on the same text once agent.run_sync(request.user_content) fires. For replace mode it's idempotent but wasteful. For hash mode it isn't idempotent: a hex hash can re-match CREDIT_CARD / US_BANK_NUMBER regexes and get re-hashed, producing a different final value than documented. And _log_sanitization emits two INFO lines for the same content. The early binary-reject check is the only piece that has to run before run_sync; the rest could be left to the history processor.
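The non-idempotency of hash mode is easy to see in isolation. The hash function below is a toy stand-in (Presidio's hash operator may use a different algorithm and encoding), but the point holds for any cryptographic hash: a second pass over already-hashed text that re-matches an entity produces a different value.

```python
import hashlib

def hash_entity(value: str) -> str:
    """Toy stand-in for Presidio's hash operator; any one-way hash behaves
    the same way for this argument."""
    return hashlib.sha256(value.encode()).hexdigest()

card = "4111111111111111"
once = hash_entity(card)    # what the docs promise the model sees
twice = hash_entity(once)   # what a second guard pass would produce on a re-match
assert once != twice        # double sanitization changes the final value
```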

if self.input_guard is not None:
    agent_kwargs["input_guard"] = self.input_guard
return self.llm_hook.create_agent(
    output_type=output_type or self.output_type,
Member

output_type or self.output_type is a subtle trap. It works today because classes are truthy, but if a caller ever passes a falsy value, the instance attribute silently takes over. output_type if output_type is not None else self.output_type reads clearer and matches how instructions is handled on the next line.
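A minimal illustration of the difference, using a stripped-down stand-in class rather than the real operator:

```python
# Stand-in demonstrating why `or` and an explicit None-check diverge:
# `or` silently discards any falsy override.
class Op:
    def __init__(self, default_output_type):
        self.output_type = default_output_type

    def resolve_truthy(self, output_type):
        return output_type or self.output_type

    def resolve_explicit(self, output_type):
        return output_type if output_type is not None else self.output_type

op = Op(default_output_type=str)
# A falsy-but-deliberate override (contrived here as 0) is lost with `or`:
assert op.resolve_truthy(0) is str      # caller's value ignored
assert op.resolve_explicit(0) == 0      # caller's value honored
```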

self._log_sanitization(results=results, sanitized_text=sanitized_text, source=source)
return sanitized_text

def sanitize_value(self, value: Any, *, source: str = "value") -> Any:
Member

sanitize_value isn't called anywhere in the PR. history.py::sanitize_user_content covers the same ground (with slightly different semantics, Mapping vs dict). Either wire this up or drop it; two parallel implementations are a maintenance hazard.


def _make_engines(analyzer_results=None, anonymized_text="<REDACTED>"):
    """Return (analyzer_mock, anonymizer_mock) pre-configured for a single analyze call."""
    analyzer = MagicMock()
Member

All tests mock AnalyzerEngine and AnonymizerEngine, so no test exercises actual Presidio behaviour. The mode-specific outputs (what replace / mask / redact / hash actually produce), the KeyError from a bad template, and the missing-language-model case are invisible to CI. One end-to-end test guarded by pytest.importorskip("presidio_analyzer") would catch real regressions without inflating CI cost much.
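A sketch of what that guarded test could look like. It assumes the standard Presidio entry points (`AnalyzerEngine.analyze`, `AnonymizerEngine.anonymize`) and additionally requires a spaCy language model to be installed for the analyzer to initialize; the test skips cleanly when the extras are absent:

```python
import pytest

def test_presidio_replace_mode_end_to_end():
    """One real-Presidio smoke test, skipped when the optional extra
    (and its spaCy language model) isn't installed."""
    pytest.importorskip("presidio_analyzer")
    pytest.importorskip("presidio_anonymizer")
    from presidio_analyzer import AnalyzerEngine
    from presidio_anonymizer import AnonymizerEngine

    text = "Contact jane@example.com for details"
    results = AnalyzerEngine().analyze(
        text=text, entities=["EMAIL_ADDRESS"], language="en"
    )
    out = AnonymizerEngine().anonymize(text=text, analyzer_results=results)
    # The raw address must not survive sanitization.
    assert "jane@example.com" not in out.text
```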

:param logger: Logger to use for sanitization summaries.
:return: Sanitized content, or the original content when guarding is disabled.
"""
from airflow.providers.common.ai.privacy.config import InputGuardConfig as _Config
Member

Inline import. There's no circular dependency here: config.py is already referenced at the top via TYPE_CHECKING, and importing it eagerly is fine. Moving this to the module-level imports matches the provider convention and the global "imports at top" rule.
