From 25903aa19c8e1fa0d9013936f7e157d4fc9a988e Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Sat, 30 May 2026 20:25:06 +0100 Subject: [PATCH 1/3] Add Agent Skills support to the Common AI provider Add AgentSkillsToolset, a pydantic-ai toolset that loads agentskills.io SKILL.md bundles from a local directory or a Git repository. Git credentials come from an Airflow git connection (HTTPS token or SSH key) resolved through the Git provider's GitHook: cleartext http and credential-bearing URLs are rejected, interactive credential prompts are disabled, and the token is stripped from the clone's .git/config. Sources are resolved on the worker when the agent enters the toolset, so a token is never baked into the serialized DAG, and clones are removed when the run ends. Pass it via AgentOperator's toolsets=, or use it with a raw pydantic-ai Agent. The framework-agnostic resolve_skills() helper returns local SKILL.md directories for other Agent Skills loaders (LangChain DeepAgents, Strands). --- docs/spelling_wordlist.txt | 1 + providers/common/ai/docs/index.rst | 1 + providers/common/ai/docs/operators/agent.rst | 2 +- providers/common/ai/docs/toolsets.rst | 95 ++++++++ providers/common/ai/pyproject.toml | 15 ++ .../ai/example_dags/example_agent_skills.py | 106 +++++++++ .../skills/sql-reporting/SKILL.md | 41 ++++ .../src/airflow/providers/common/ai/skills.py | 214 ++++++++++++++++++ .../providers/common/ai/toolsets/skills.py | 140 ++++++++++++ .../ai/tests/unit/common/ai/test_skills.py | 155 +++++++++++++ .../unit/common/ai/toolsets/test_skills.py | 140 ++++++++++++ ...eck_providers_subpackages_all_have_init.py | 3 + uv.lock | 30 ++- 13 files changed, 941 insertions(+), 2 deletions(-) create mode 100644 providers/common/ai/src/airflow/providers/common/ai/example_dags/example_agent_skills.py create mode 100644 providers/common/ai/src/airflow/providers/common/ai/example_dags/skills/sql-reporting/SKILL.md create mode 100644 providers/common/ai/src/airflow/providers/common/ai/skills.py create mode 100644 providers/common/ai/src/airflow/providers/common/ai/toolsets/skills.py create mode 100644 providers/common/ai/tests/unit/common/ai/test_skills.py create mode 100644 providers/common/ai/tests/unit/common/ai/toolsets/test_skills.py diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index fe93cebe41ae6..d8837585eed46 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -837,6 +837,7 @@ instanceTemplates InstanceType instanceType instantiation +InstructionPart integrations ints intvl diff --git a/providers/common/ai/docs/index.rst b/providers/common/ai/docs/index.rst index adde6dbe9cc77..590a770d4816d 100644 --- a/providers/common/ai/docs/index.rst +++ b/providers/common/ai/docs/index.rst @@ -129,6 +129,7 @@ Dependent package ================================================================================================================== ================= `apache-airflow-providers-common-compat `_ ``common.compat`` `apache-airflow-providers-common-sql `_ ``common.sql`` +`apache-airflow-providers-git `_ ``git`` `apache-airflow-providers-standard `_ ``standard`` ================================================================================================================== ================= diff --git a/providers/common/ai/docs/operators/agent.rst b/providers/common/ai/docs/operators/agent.rst index f1c13fb4490d3..51e575d139e3d 100644 --- a/providers/common/ai/docs/operators/agent.rst +++ b/providers/common/ai/docs/operators/agent.rst @@ -304,7 +304,7 @@ Parameters - ``output_type``: Expected output type (default: ``str``). Set to a Pydantic ``BaseModel`` for structured output. - ``toolsets``: List of pydantic-ai toolsets (``SQLToolset``, ``HookToolset``, - etc.). + ``AgentSkillsToolset`` for :ref:`agent-skills`, etc.). - ``enable_tool_logging``: Wrap each toolset in :class:`~airflow.providers.common.ai.toolsets.logging.LoggingToolset` so that every tool call is logged in real time. Default ``True``. diff --git a/providers/common/ai/docs/toolsets.rst b/providers/common/ai/docs/toolsets.rst index ec3927946f9e0..33fc04f0e18d1 100644 --- a/providers/common/ai/docs/toolsets.rst +++ b/providers/common/ai/docs/toolsets.rst @@ -313,6 +313,101 @@ This works because PydanticAI's MCP server classes implement code instead of being managed through Airflow connections and secret backends. +.. _agent-skills: + +``AgentSkillsToolset`` +---------------------- + +:class:`~airflow.providers.common.ai.toolsets.skills.AgentSkillsToolset` loads +`Agent Skills `__ -- ``SKILL.md`` bundles (instructions, +and optionally scripts and resources) that the model discovers and loads *on +demand*. Only a compact catalog of skill names and descriptions sits in the +prompt until the model decides it needs one, so a large skill library costs few +tokens until used (progressive disclosure). + +It is backed by the community `pydantic-ai-skills +`__ package (MIT); native +progressive disclosure is in flight upstream in `pydantic/pydantic-ai#5230 +`__. Install the optional +extra to use it: + +.. code-block:: bash + + pip install "apache-airflow-providers-common-ai[skills]" + +Each source is a local directory or a connection-resolved +:class:`~airflow.providers.common.ai.skills.GitSkills`. Sources are resolved when +the agent enters the toolset, on the worker -- never while the DAG processor +parses the file -- so a Git token is never baked into the serialized DAG, and +cloned repositories are removed when the run ends. + +A local directory of ``SKILL.md`` bundles: + +.. exampleinclude:: /../../ai/src/airflow/providers/common/ai/example_dags/example_agent_skills.py + :language: python + :start-after: [START howto_operator_agent_skills_local] + :end-before: [END howto_operator_agent_skills_local] + +A Git repository, with credentials from an Airflow connection: + +.. exampleinclude:: /../../ai/src/airflow/providers/common/ai/example_dags/example_agent_skills.py + :language: python + :start-after: [START howto_operator_agent_skills_git] + :end-before: [END howto_operator_agent_skills_git] + +For a private repository, point ``conn_id`` at a +:doc:`git connection `; credentials +are resolved through the Git provider's ``GitHook`` (an HTTPS token in the +connection password, or an SSH key in the connection's extra). A plain ``http://`` +URL with ``conn_id`` is rejected so a credential is never sent in cleartext, and a +``repo_url`` that embeds a username/password is rejected (use ``conn_id``). After +cloning, the credential is stripped from the checkout's ``.git/config``. As with +any ``git clone``, the worker's own git configuration (credential helpers, SSH +agent) may still apply, so run workers without ambient git credentials if you +need strict isolation. + +.. warning:: + + Skill bundles can contain scripts that the agent may run on the worker via + the ``run_skill_script`` tool. For a remote source, anyone who can modify the + repository can introduce code that executes on your worker, outside DAG + review and versioning. Point ``GitSkills`` at a trusted repository, pin + ``branch`` to a trusted ref, and treat skill contents as code that runs in + your environment. + +Parameters +^^^^^^^^^^ + +- ``sources``: List of skill sources -- local directory paths and/or + :class:`~airflow.providers.common.ai.skills.GitSkills`. +- ``exclude_tools``: Optional set of skill tool names to hide from the agent + (e.g. ``{"run_skill_script"}`` to disable on-worker script execution). + +Using Agent Skills with other frameworks +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +``AgentSkillsToolset`` is a standard pydantic-ai toolset, so it also works with a +plain ``pydantic_ai.Agent`` you build yourself, not just ``AgentOperator``. + +Because Agent Skills is a cross-framework format, the connection handling is also +reusable through :func:`~airflow.providers.common.ai.skills.resolve_skills`, which +resolves sources to local ``SKILL.md`` directories that any loader accepts: + +.. code-block:: python + + from airflow.providers.common.ai.skills import GitSkills, resolve_skills + + sources = ["./skills", GitSkills(repo_url="https://github.com/org/skills", conn_id="github_skills")] + with resolve_skills(sources) as dirs: + # LangChain DeepAgents + agent = create_deep_agent(model="openai:gpt-5.4", skills=dirs) + # ...or Strands + agent = Agent(plugins=[AgentSkills(skills=dirs)]) + +``resolve_skills`` needs the Git provider (for ``GitSkills``) but not pydantic-ai, +and removes any cloned directories when the ``with`` block exits. + + Security -------- diff --git a/providers/common/ai/pyproject.toml b/providers/common/ai/pyproject.toml index 0f684c26941bc..b4ce8c67a857f 100644 --- a/providers/common/ai/pyproject.toml +++ b/providers/common/ai/pyproject.toml @@ -80,6 +80,15 @@ dependencies = [ "google" = ["pydantic-ai-slim[google]"] "openai" = ["pydantic-ai-slim[openai]"] "mcp" = ["pydantic-ai-slim[mcp]"] +# Agent Skills (agentskills.io) support. pydantic-ai-skills provides the toolset +# (pulls in pydantic-ai-slim>=1.74 transitively; the provider base floor stays +# 1.71); the git provider supplies GitHook + GitPython for cloning GitSkills with +# credentials from a `git` connection. Native progressive disclosure is tracked +# upstream in pydantic/pydantic-ai#5230; revisit this extra once that lands. +"skills" = [ + "apache-airflow-providers-git>=0.4.0", + "pydantic-ai-skills>=0.11.0", +] "avro" = [ 'fastavro>=1.10.0; python_version < "3.14"', 'fastavro>=1.12.1; python_version >= "3.14"', @@ -105,6 +114,9 @@ dependencies = [ ] "pdf" = ["pypdf>=4.0.0"] "docx" = ["python-docx>=1.0.0"] +"git" = [ + "apache-airflow-providers-git" +] [dependency-groups] dev = [ @@ -113,10 +125,13 @@ dev = [ "apache-airflow-devel-common", "apache-airflow-providers-common-compat", "apache-airflow-providers-common-sql", + "apache-airflow-providers-git", "apache-airflow-providers-standard", # Additional devel dependencies (do not remove this line and add extra development dependencies) "sqlglot>=30.0.0", "pydantic-ai-slim[mcp]", + "pydantic-ai-skills>=0.11.0", + "apache-airflow-providers-git", "apache-airflow-providers-common-sql[datafusion]", "langchain>=1.0.0", "llama-index-core>=0.13.0", diff --git a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_agent_skills.py b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_agent_skills.py new file mode 100644 index 0000000000000..4608446cf107a --- /dev/null +++ b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_agent_skills.py @@ -0,0 +1,106 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Example DAGs demonstrating Agent Skills with ``AgentOperator``. + +`Agent Skills `__ are ``SKILL.md`` bundles the model +discovers and loads on demand (progressive disclosure). They are passed to the +agent as an ``AgentSkillsToolset`` in the operator's ``toolsets=`` list. Skill +sources are resolved when the task runs, on the worker (not while the DAG +processor parses the file), so a Git token resolved from an Airflow connection +is never baked into the serialized DAG. + +These DAGs need the optional ``skills`` extra:: + + pip install "apache-airflow-providers-common-ai[skills]" +""" + +from __future__ import annotations + +from pathlib import Path + +from airflow.providers.common.ai.operators.agent import AgentOperator +from airflow.providers.common.ai.skills import GitSkills +from airflow.providers.common.ai.toolsets.skills import AgentSkillsToolset +from airflow.providers.common.ai.toolsets.sql import SQLToolset +from airflow.providers.common.compat.sdk import dag + +# Skills ship next to this DAG file; resolve relative to __file__ so the path +# holds regardless of the dag-processor's working directory. +SKILLS_DIR = Path(__file__).parent / "skills" + + +# --------------------------------------------------------------------------- +# 1. Local filesystem skills (a directory of SKILL.md bundles) +# --------------------------------------------------------------------------- + + +# [START howto_operator_agent_skills_local] +@dag(tags=["example"]) +def example_agent_skills_local(): + AgentOperator( + task_id="reporter", + prompt="How many orders did our top 5 customers place last month?", + llm_conn_id="pydanticai_default", + system_prompt="You are a data analyst. Consult your skills before writing SQL.", + toolsets=[ + AgentSkillsToolset(sources=[str(SKILLS_DIR)]), + SQLToolset( + db_conn_id="postgres_default", + allowed_tables=["customers", "orders"], + max_rows=50, + ), + ], + ) + + +# [END howto_operator_agent_skills_local] + +example_agent_skills_local() + + +# --------------------------------------------------------------------------- +# 2. Remote skills from a Git repo, credentials from an Airflow connection +# --------------------------------------------------------------------------- +# ``github_skills`` is a git connection (HTTPS token in the password, or an SSH +# key in the extra). The DAG only references it by id; no credential is inlined. + + +# [START howto_operator_agent_skills_git] +@dag(tags=["example"]) +def example_agent_skills_git(): + AgentOperator( + task_id="support_agent", + prompt="Summarize our refund policy and apply it to order 12345.", + llm_conn_id="pydanticai_default", + system_prompt="You are a support agent. Load the relevant skill before answering.", + toolsets=[ + AgentSkillsToolset( + sources=[ + GitSkills( + repo_url="https://github.com/my-org/agent-skills", + conn_id="github_skills", + path="skills", + ), + ], + ), + ], + ) + + +# [END howto_operator_agent_skills_git] + +example_agent_skills_git() diff --git a/providers/common/ai/src/airflow/providers/common/ai/example_dags/skills/sql-reporting/SKILL.md b/providers/common/ai/src/airflow/providers/common/ai/example_dags/skills/sql-reporting/SKILL.md new file mode 100644 index 0000000000000..b3bef6b2348d8 --- /dev/null +++ b/providers/common/ai/src/airflow/providers/common/ai/example_dags/skills/sql-reporting/SKILL.md @@ -0,0 +1,41 @@ +--- +name: sql-reporting +description: Conventions and review steps for writing analytics SQL against the warehouse. Use whenever the task involves querying tables, building a report, or aggregating metrics. +license: Apache-2.0 +--- + + +# SQL Reporting Skill + +Apply this skill before writing or running any analytics SQL so reports stay +consistent and safe. + +## When to Use This Skill + +Use this skill when the task involves: + +- Querying warehouse tables for a metric, report, or dashboard figure +- Aggregating rows (counts, sums, rolling windows) +- Cross-referencing two or more tables + +## Conventions + +1. Always `SELECT` explicit column names, never `SELECT *`. +2. Filter on a partition/date column first to bound the scan. +3. Alias aggregates with snake_case names (`order_count`, not `count(*)`). +4. Cap exploratory queries with `LIMIT` unless an aggregate already collapses + the result set. +5. Prefer `COUNT(DISTINCT ...)` over a sub-query when de-duplicating. + +## Review Checklist (run before returning an answer) + +- [ ] No `SELECT *`. +- [ ] A date or partition predicate is present. +- [ ] Every aggregate has an explicit alias. +- [ ] The query reads only from tables the task actually needs. + +## Output Format + +Return the final SQL in a fenced ```sql block, then one sentence describing +what the query returns. diff --git a/providers/common/ai/src/airflow/providers/common/ai/skills.py b/providers/common/ai/src/airflow/providers/common/ai/skills.py new file mode 100644 index 0000000000000..9ca782116ad47 --- /dev/null +++ b/providers/common/ai/src/airflow/providers/common/ai/skills.py @@ -0,0 +1,214 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Framework-agnostic `Agent Skills `__ sources for Airflow. + +This module resolves skill *sources* -- a local directory or a ``GitSkills`` +descriptor -- into local ``SKILL.md`` directories, cloning Git repositories with +a token taken from an Airflow connection. The output is a list of directory +paths, the interchange format every Agent Skills implementation consumes +(pydantic-ai-skills, LangChain DeepAgents, Strands), so the same Airflow +credential handling works across frameworks. + +For pydantic-ai use the :class:`~airflow.providers.common.ai.toolsets.skills.AgentSkillsToolset` +binding. For other frameworks, resolve the directories yourself:: + + from airflow.providers.common.ai.skills import GitSkills, resolve_skills + + with resolve_skills(["./skills", GitSkills(repo_url="https://...", conn_id="github")]) as dirs: + # LangChain DeepAgents + agent = create_deep_agent(model=..., skills=dirs) + # ...or Strands + agent = Agent(plugins=[AgentSkills(skills=dirs)]) + +Resolution (connection lookup, clone) happens when ``resolve_skills`` is entered, +so run it inside the task, not at module import / DAG-parse time. The context +manager removes any cloned directories on exit. +""" + +from __future__ import annotations + +import os +import shutil +import tempfile +from collections.abc import Callable, Iterator +from contextlib import contextmanager +from dataclasses import dataclass +from typing import Any, Union +from urllib.parse import urlsplit + +__all__ = ["GitSkills", "SkillSource", "resolve_skills"] + + +@dataclass +class GitSkills: + """ + Agent Skills cloned from a Git repository when resolved. + + :param repo_url: HTTPS or SSH URL of the repository to clone. + :param conn_id: Airflow ``git`` connection used for credentials, resolved + through the Git provider's ``GitHook`` (HTTPS token in the connection + password, or an SSH key in the connection's extra). **Set this for private + repositories.** Plain ``http://`` is rejected when ``conn_id`` is set so a + credential is never sent in cleartext, and a ``repo_url`` with embedded + credentials is rejected (use ``conn_id`` instead). When ``conn_id`` is + ``None`` the clone is unauthenticated; as with any ``git clone``, the + worker's own git configuration (credential helpers, SSH agent) may still + apply, so run workers without ambient git credentials if you need strict + isolation. + :param path: Sub-path inside the repository that holds the skill directories + (e.g. ``"skills"``). Defaults to the repository root. + :param branch: Branch, tag, or ref to check out. Defaults to the + repository's default branch. + + .. warning:: + + Skill bundles can contain scripts an agent may run on the worker. Because + the repository is fetched at run time, anyone who can modify it can + introduce code that runs in your environment, outside DAG review. Point + ``repo_url`` at a trusted repository and pin ``branch`` to a trusted ref. + """ + + repo_url: str + conn_id: str | None = None + path: str = "" + branch: str | None = None + + +SkillSource = Union[str, "os.PathLike[str]", GitSkills] + + +def _clone_git(source: GitSkills) -> tuple[str, str]: + """ + Clone *source* into a fresh temp dir; return (skills_dir, temp_dir_to_remove). + + When ``conn_id`` is set, credentials come from the Airflow ``git`` connection + via ``GitHook`` (HTTPS token or SSH key). The token is stripped from the + clone's ``.git/config`` afterwards so a skill script in the checkout cannot + read it back, interactive credential prompts are disabled, and the temp dir is + removed if the clone fails. As with any ``git clone``, the worker's own git + configuration (credential helpers, SSH agent) may still apply. + """ + try: + from git import Repo + except ImportError as e: + raise ValueError( + "GitSkills requires GitPython. Install the 'skills' extra: " + "pip install 'apache-airflow-providers-common-ai[skills]'." + ) from e + + # Reject credentials embedded directly in the URL: they would be stored in + # the serialized DAG, written back into .git/config by the scrub below, and + # leak into error messages. Credentials must come from ``conn_id`` instead. + split = urlsplit(source.repo_url) + if split.username or split.password: + raise ValueError( + "GitSkills repo_url must not embed a username or password; pass credentials " + "through a git connection via conn_id instead." + ) + if source.conn_id and source.repo_url.startswith("http://"): + raise ValueError( + f"GitSkills refuses to send credentials from conn_id {source.conn_id!r} over plain " + f"http://; use an https:// URL (token) or an ssh URL with a key on the connection." + ) + + clone_kwargs: dict[str, Any] = {"depth": 1} + if source.branch: + clone_kwargs["branch"] = source.branch + # Never drop into an interactive credential prompt on the worker. + base_env = {"GIT_TERMINAL_PROMPT": "0"} + + temp_dir = tempfile.mkdtemp(prefix="airflow_skills_") + hook = None + try: + if source.conn_id: + from airflow.providers.git.hooks.git import GitHook + + hook = GitHook(git_conn_id=source.conn_id, repo_url=source.repo_url) + with hook.configure_hook_env(): + clone_url = hook.repo_url or source.repo_url + repo = Repo.clone_from(clone_url, temp_dir, env={**base_env, **hook.env}, **clone_kwargs) + else: + repo = Repo.clone_from(source.repo_url, temp_dir, env=base_env, **clone_kwargs) + # Strip any embedded credential from .git/config so a skill script in the + # checkout cannot read it back out of the temporary clone. + repo.remote("origin").set_url(source.repo_url) + except Exception as exc: + shutil.rmtree(temp_dir, ignore_errors=True) + # GitPython errors embed the failing command, which may contain the + # token-bearing URL GitHook built -- scrub it before surfacing to logs. + message = str(exc) + if hook is not None and hook.repo_url and hook.repo_url != source.repo_url: + message = message.replace(hook.repo_url, source.repo_url) + raise RuntimeError(f"Failed to clone {source.repo_url}: {message}") from None + except BaseException: + shutil.rmtree(temp_dir, ignore_errors=True) + raise + + skills_dir = os.path.join(temp_dir, source.path) if source.path else temp_dir + return skills_dir, temp_dir + + +def _materialize_skills(sources: list[SkillSource]) -> tuple[list[str], Callable[[], None]]: + """ + Resolve *sources* to local directories; return (directories, cleanup). + + ``cleanup`` removes any directories cloned for ``GitSkills`` sources; local + directory sources are returned untouched and are not removed. + """ + directories: list[str] = [] + temp_dirs: list[str] = [] + try: + for source in sources: + if isinstance(source, GitSkills): + skills_dir, temp_dir = _clone_git(source) + directories.append(skills_dir) + temp_dirs.append(temp_dir) + elif isinstance(source, (str, os.PathLike)): + directories.append(os.fspath(source)) + else: + raise TypeError( + f"Unsupported skill source {type(source).__name__!r}; expected a path or GitSkills." + ) + except BaseException: + # Don't leak partially cloned directories if a later source fails. + for temp_dir in temp_dirs: + shutil.rmtree(temp_dir, ignore_errors=True) + raise + + def cleanup() -> None: + for temp_dir in temp_dirs: + shutil.rmtree(temp_dir, ignore_errors=True) + + return directories, cleanup + + +@contextmanager +def resolve_skills(sources: list[SkillSource]) -> Iterator[list[str]]: + """ + Resolve skill *sources* to local ``SKILL.md`` directories. + + Yields a list of directory paths suitable for any Agent Skills loader + (pydantic-ai-skills, LangChain DeepAgents ``skills=``, Strands + ``AgentSkills(skills=...)``). Cloned repositories are removed on exit, so use + the returned directories inside the ``with`` block. + """ + directories, cleanup = _materialize_skills(sources) + try: + yield directories + finally: + cleanup() diff --git a/providers/common/ai/src/airflow/providers/common/ai/toolsets/skills.py b/providers/common/ai/src/airflow/providers/common/ai/toolsets/skills.py new file mode 100644 index 0000000000000..6df834e2dc20d --- /dev/null +++ b/providers/common/ai/src/airflow/providers/common/ai/toolsets/skills.py @@ -0,0 +1,140 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +A pydantic-ai toolset that loads `Agent Skills `__. + +``AgentSkillsToolset`` is a normal pydantic-ai ``AbstractToolset``: it can be +passed to :class:`~airflow.providers.common.ai.operators.agent.AgentOperator` +via ``toolsets=`` or used directly with a ``pydantic_ai.Agent`` anywhere the +Airflow connection backend is reachable (i.e. inside a worker/task runtime). + +Skill sources are resolved lazily when the agent enters the toolset (run time, +on the worker), never at DAG-parse time, so a Git token resolved from an Airflow +connection is never baked into the serialized DAG. Cloned repositories are +removed when the toolset context exits. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from airflow.providers.common.ai.skills import SkillSource, _materialize_skills + +try: + from pydantic_ai.toolsets.abstract import AbstractToolset +except ImportError: # pragma: no cover - pydantic-ai is a provider dependency + AbstractToolset = object # type: ignore[assignment,misc] + +if TYPE_CHECKING: + from collections.abc import Callable, Sequence + + from pydantic_ai._run_context import RunContext + from pydantic_ai.messages import InstructionPart + from pydantic_ai.toolsets.abstract import ToolsetTool + + +class AgentSkillsToolset(AbstractToolset): + """ + A pydantic-ai toolset that loads Agent Skills, with Git credentials from Airflow connections. + + Sources are local directory paths and/or + :class:`~airflow.providers.common.ai.skills.GitSkills`. + + :param sources: Skill sources -- local directory paths and/or ``GitSkills``. + :param exclude_tools: Optional set of skill tool names to hide from the agent + (e.g. ``{"run_skill_script"}`` to disable on-worker script execution). + + Requires the ``skills`` extra: ``pip install "apache-airflow-providers-common-ai[skills]"``. + """ + + def __init__( + self, + sources: list[SkillSource], + *, + exclude_tools: set[str] | None = None, + ) -> None: + self._sources = list(sources) + self._exclude_tools = exclude_tools + self._inner: Any = None + self._cleanup: Callable[[], None] | None = None + + @property + def id(self) -> str | None: + return None + + async def for_run(self, ctx: RunContext) -> AbstractToolset: + # Per-run isolation: pydantic-ai shares one toolset instance across runs, + # but we hold per-run clone/cleanup state on __aenter__/__aexit__. Hand + # each run its own instance so concurrent runs never clobber each other. + return AgentSkillsToolset(self._sources, exclude_tools=self._exclude_tools) + + async def __aenter__(self) -> AgentSkillsToolset: + # Resolve + clone at run time, on the worker -- not at DAG-parse time. + try: + from pydantic_ai_skills import SkillsToolset + except ImportError as e: + raise ValueError( + "AgentSkillsToolset requires the optional 'skills' extra: " + "pip install 'apache-airflow-providers-common-ai[skills]'." + ) from e + + directories, cleanup = _materialize_skills(self._sources) + self._cleanup = cleanup + try: + kwargs: dict[str, Any] = {"directories": directories} + if self._exclude_tools: + kwargs["exclude_tools"] = self._exclude_tools + self._inner = SkillsToolset(**kwargs) + await self._inner.__aenter__() + except BaseException: + cleanup() + self._inner = None + self._cleanup = None + raise + return self + + async def __aexit__(self, *args: Any) -> bool | None: + try: + if self._inner is not None: + return await self._inner.__aexit__(*args) + return None + finally: + if self._cleanup is not None: + self._cleanup() + self._inner = None + self._cleanup = None + + def _require_inner(self) -> Any: + if self._inner is None: + raise RuntimeError( + "AgentSkillsToolset must be entered via 'async with' (the agent does this " + "during a run) before its tools are used." + ) + return self._inner + + async def get_tools(self, ctx: RunContext) -> dict[str, ToolsetTool]: + return await self._require_inner().get_tools(ctx) + + async def call_tool( + self, name: str, tool_args: dict[str, Any], ctx: RunContext, tool: ToolsetTool + ) -> Any: + return await self._require_inner().call_tool(name, tool_args, ctx, tool) + + async def get_instructions( + self, ctx: RunContext + ) -> str | InstructionPart | Sequence[str | InstructionPart] | None: + return await self._require_inner().get_instructions(ctx) diff --git a/providers/common/ai/tests/unit/common/ai/test_skills.py b/providers/common/ai/tests/unit/common/ai/test_skills.py new file mode 100644 index 0000000000000..efd9b95452487 --- /dev/null +++ b/providers/common/ai/tests/unit/common/ai/test_skills.py @@ -0,0 +1,155 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Tests for the framework-agnostic skill source resolver.""" + +from __future__ import annotations + +import os +from unittest.mock import patch + +import pytest + +from airflow.providers.common.ai.skills import GitSkills, resolve_skills + +pytest.importorskip("git") +pytest.importorskip("airflow.providers.git.hooks.git") + + +class TestGitSkills: + def test_defaults(self): + source = GitSkills(repo_url="https://github.com/org/repo") + assert source.repo_url == "https://github.com/org/repo" + assert source.conn_id is None + assert source.path == "" + assert source.branch is None + + def test_holds_no_secret(self): + source = GitSkills(repo_url="u", conn_id="github_skills") + assert "github_skills" in repr(source) + assert not hasattr(source, "token") + assert not hasattr(source, "password") + + +class TestResolveLocal: + def test_local_dirs_pass_through_untouched(self, tmp_path): + d1, d2 = str(tmp_path / "a"), str(tmp_path / "b") + with resolve_skills([d1, d2]) as dirs: + assert dirs == [d1, d2] + + def test_unsupported_source_raises_typeerror(self): + with pytest.raises(TypeError, match="Unsupported skill source"): + with resolve_skills([123]): + pass + + +class TestResolveGitWithHook: + """Credentials come from the git connection via GitHook; never the environment.""" + + @patch("airflow.providers.git.hooks.git.GitHook") + @patch("git.Repo") + def test_conn_id_uses_githook_and_scrubs_config(self, mock_repo, mock_githook): + mock_githook.return_value.repo_url = "https://user:tok@github.com/org/repo" + mock_githook.return_value.env = {} + src = GitSkills(repo_url="https://github.com/org/repo", conn_id="git_default", path="skills") + + with resolve_skills([src]) as dirs: + assert dirs[0].endswith(os.sep + "skills") + + mock_githook.assert_called_once_with( + git_conn_id="git_default", repo_url="https://github.com/org/repo" + ) + # Cloned via the hook's token-bearing URL... + assert mock_repo.clone_from.call_args.args[0] == "https://user:tok@github.com/org/repo" + # ...with interactive credential prompts disabled... + assert mock_repo.clone_from.call_args.kwargs["env"]["GIT_TERMINAL_PROMPT"] == "0" + # ...then .git/config is scrubbed back to the credential-free URL. + mock_repo.clone_from.return_value.remote.return_value.set_url.assert_called_once_with( + "https://github.com/org/repo" + ) + + @pytest.mark.parametrize( + "repo_url", + ["https://user:tok@github.com/org/repo", "https://tok@github.com/org/repo"], + ) + @patch("airflow.providers.git.hooks.git.GitHook") + @patch("git.Repo") + def test_repo_url_with_embedded_credentials_rejected(self, mock_repo, mock_githook, repo_url): + # Credentials in the URL would land in the serialized DAG and be written + # back to .git/config by the scrub -- reject them outright. + with pytest.raises(ValueError, match="must not embed"): + with resolve_skills([GitSkills(repo_url=repo_url, conn_id="git_default")]): + pass + mock_githook.assert_not_called() + mock_repo.clone_from.assert_not_called() + + @patch("airflow.providers.git.hooks.git.GitHook") + @patch("git.Repo") + def test_http_with_conn_id_is_rejected(self, mock_repo, mock_githook): + with pytest.raises(ValueError, match="http://"): + with resolve_skills([GitSkills(repo_url="http://github.com/org/repo", conn_id="git_default")]): + pass + mock_githook.assert_not_called() + mock_repo.clone_from.assert_not_called() + + @patch("airflow.providers.git.hooks.git.GitHook") + @patch("git.Repo") + def test_no_conn_id_clones_anonymously_without_hook(self, mock_repo, mock_githook): + with resolve_skills([GitSkills(repo_url="https://github.com/org/repo")]): + pass + mock_githook.assert_not_called() + assert mock_repo.clone_from.call_args.args[0] == "https://github.com/org/repo" + + @patch("airflow.providers.git.hooks.git.GitHook") + @patch("git.Repo") + def test_failed_clone_removes_temp_dir(self, mock_repo, mock_githook, tmp_path): + mock_githook.return_value.repo_url = "https://github.com/org/repo" + mock_githook.return_value.env = {} + mock_repo.clone_from.side_effect = RuntimeError("boom") + created = str(tmp_path / "clone") + os.makedirs(created) + with patch("airflow.providers.common.ai.skills.tempfile.mkdtemp", return_value=created): + with pytest.raises(RuntimeError): + with resolve_skills( + [GitSkills(repo_url="https://github.com/org/repo", conn_id="git_default")] + ): + pass + assert not os.path.exists(created) + + @patch("airflow.providers.git.hooks.git.GitHook") + @patch("git.Repo") + def test_clone_error_scrubs_token_from_message(self, mock_repo, mock_githook): + mock_githook.return_value.repo_url = "https://user:ghp_secret@github.com/org/repo" + mock_githook.return_value.env = {} + mock_repo.clone_from.side_effect = Exception( + "fatal: unable to access https://user:ghp_secret@github.com/org/repo" + ) + with pytest.raises(RuntimeError) as exc_info: + with resolve_skills([GitSkills(repo_url="https://github.com/org/repo", conn_id="git_default")]): + pass + assert "ghp_secret" not in str(exc_info.value) + + +class TestResolveGitCleanup: + @patch("airflow.providers.git.hooks.git.GitHook") + @patch("git.Repo") + def test_clone_dir_removed_on_exit(self, mock_repo, mock_githook): + mock_githook.return_value.repo_url = "https://github.com/org/repo" + mock_githook.return_value.env = {} + with resolve_skills([GitSkills(repo_url="https://github.com/org/repo", conn_id="git_default")]): + dest = mock_repo.clone_from.call_args.args[1] + assert os.path.isdir(dest) + assert not os.path.exists(dest) diff --git a/providers/common/ai/tests/unit/common/ai/toolsets/test_skills.py b/providers/common/ai/tests/unit/common/ai/toolsets/test_skills.py new file mode 100644 index 0000000000000..a3bf781f80336 --- /dev/null +++ b/providers/common/ai/tests/unit/common/ai/toolsets/test_skills.py @@ -0,0 +1,140 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Tests for the pydantic-ai AgentSkillsToolset binding. + +Async methods are driven with ``asyncio.run`` to avoid depending on a particular +pytest-asyncio mode. +""" + +from __future__ import annotations + +import asyncio +import sys +from unittest.mock import MagicMock, patch + +import pytest + +from airflow.providers.common.ai.skills import GitSkills +from airflow.providers.common.ai.toolsets.skills import AgentSkillsToolset + +pytest.importorskip("pydantic_ai_skills") + + +class _FakeInner: + """Minimal async-context-manager stand-in for pydantic_ai_skills.SkillsToolset.""" + + def __init__(self, **kwargs): + self.kwargs = kwargs + self.entered = False + self.exited = False + + async def __aenter__(self): + self.entered = True + return self + + async def __aexit__(self, *args): + self.exited = True + return None + + +def _write_skill(directory): + skill_dir = directory / "demo-skill" + skill_dir.mkdir(parents=True) + (skill_dir / "SKILL.md").write_text( + "---\nname: demo-skill\ndescription: A demo skill for tests.\n---\n\n# Demo\n" + ) + + +class TestConstruction: + def test_is_abstract_toolset_and_lazy(self): + from pydantic_ai.toolsets.abstract import AbstractToolset + + toolset = AgentSkillsToolset(sources=["./skills", GitSkills(repo_url="https://x/y", conn_id="c")]) + assert isinstance(toolset, AbstractToolset) + # Nothing resolved or cloned at construction. + assert toolset._inner is None + + def test_get_tools_before_enter_raises(self): + toolset = AgentSkillsToolset(sources=["./skills"]) + with pytest.raises(RuntimeError, match="must be entered"): + asyncio.run(toolset.get_tools(MagicMock())) + + +class TestLifecycle: + def test_enter_builds_inner_and_exit_tears_down(self, tmp_path): + from pydantic_ai_skills import SkillsToolset + + _write_skill(tmp_path) + toolset = AgentSkillsToolset(sources=[str(tmp_path)]) + + async def run(): + async with toolset: + assert isinstance(toolset._inner, SkillsToolset) + assert "demo-skill" in toolset._inner._skills + assert toolset._inner is None + + asyncio.run(run()) + + def test_exclude_tools_passed_to_inner(self): + captured: dict = {} + + def fake_skillstoolset(**kwargs): + captured.update(kwargs) + return _FakeInner(**kwargs) + + toolset = AgentSkillsToolset(sources=["/x"], exclude_tools={"run_skill_script"}) + with patch( + "airflow.providers.common.ai.toolsets.skills._materialize_skills", + return_value=(["/x"], lambda: None), + ): + with patch("pydantic_ai_skills.SkillsToolset", fake_skillstoolset): + asyncio.run(_enter_exit(toolset)) + + assert captured["exclude_tools"] == {"run_skill_script"} + assert captured["directories"] == ["/x"] + + +class TestCleanup: + def test_cleanup_runs_if_inner_construction_fails(self): + cleanup = MagicMock() + + def boom(**kwargs): + raise RuntimeError("inner build failed") + + toolset = AgentSkillsToolset(sources=["/x"]) + with patch( + "airflow.providers.common.ai.toolsets.skills._materialize_skills", + return_value=(["/x"], cleanup), + ): + with patch("pydantic_ai_skills.SkillsToolset", boom): + with pytest.raises(RuntimeError, match="inner build failed"): + asyncio.run(toolset.__aenter__()) + + cleanup.assert_called_once() + + +class TestMissingExtra: + def test_helpful_error_when_package_missing(self): + toolset = AgentSkillsToolset(sources=["./skills"]) + with patch.dict(sys.modules, {"pydantic_ai_skills": None}): + with pytest.raises(ValueError, match=r"\[skills\]"): + asyncio.run(toolset.__aenter__()) + + +async def _enter_exit(toolset): + await toolset.__aenter__() + await toolset.__aexit__(None, None, None) diff --git a/scripts/ci/prek/check_providers_subpackages_all_have_init.py b/scripts/ci/prek/check_providers_subpackages_all_have_init.py index c1511f213193b..a67f914e9829b 100755 --- a/scripts/ci/prek/check_providers_subpackages_all_have_init.py +++ b/scripts/ci/prek/check_providers_subpackages_all_have_init.py @@ -45,6 +45,9 @@ ".pnpm-store", "node_modules", "non_python_src", + # Agent Skills bundles (agentskills.io): a "skills" dir holds SKILL.md files + # and their assets, not a Python package. + "skills", ] IGNORE_DIR_PATTERNS = ["airflow/providers/edge3/plugins", "airflow/providers/common/ai/plugins"] diff --git a/uv.lock b/uv.lock index d80641fe5ca6e..7148faade19aa 100644 --- a/uv.lock +++ b/uv.lock @@ -4264,6 +4264,9 @@ common-sql = [ docx = [ { name = "python-docx" }, ] +git = [ + { name = "apache-airflow-providers-git" }, +] google = [ { name = "pydantic-ai-slim", extra = ["google"] }, ] @@ -4287,6 +4290,10 @@ parquet = [ pdf = [ { name = "pypdf" }, ] +skills = [ + { name = "apache-airflow-providers-git" }, + { name = "pydantic-ai-skills" }, +] sql = [ { name = "apache-airflow-providers-common-sql" }, { name = "sqlglot" }, @@ -4298,12 +4305,14 @@ dev = [ { name = "apache-airflow-devel-common" }, { name = "apache-airflow-providers-common-compat" }, { name = "apache-airflow-providers-common-sql", extra = ["datafusion"] }, + { name = "apache-airflow-providers-git" }, { name = "apache-airflow-providers-standard" }, { name = "apache-airflow-task-sdk" }, { name = "langchain" }, { name = "llama-index-core" }, { name = "llama-index-embeddings-openai" }, { name = "llama-index-llms-openai" }, + { name = "pydantic-ai-skills" }, { name = "pydantic-ai-slim", extra = ["mcp"] }, { name = "sqlglot" }, ] @@ -4317,6 +4326,8 @@ requires-dist = [ { name = "apache-airflow-providers-common-compat", editable = "providers/common/compat" }, { name = "apache-airflow-providers-common-sql", marker = "extra == 'common-sql'", editable = "providers/common/sql" }, { name = "apache-airflow-providers-common-sql", marker = "extra == 'sql'", editable = "providers/common/sql" }, + { name = "apache-airflow-providers-git", marker = "extra == 'git'", editable = "providers/git" }, + { name = "apache-airflow-providers-git", marker = "extra == 'skills'", editable = "providers/git" }, { name = "apache-airflow-providers-standard", editable = "providers/standard" }, { name = "fastavro", marker = "python_full_version >= '3.14' and extra == 'avro'", specifier = ">=1.12.1" }, { name = "fastavro", marker = "python_full_version < '3.14' and extra == 'avro'", specifier = ">=1.10.0" }, @@ -4326,6 +4337,7 @@ requires-dist = [ { name = "llama-index-llms-openai", marker = "extra == 'llamaindex'", specifier = ">=0.6.0" }, { name = "pyarrow", marker = "python_full_version >= '3.14' and extra == 'parquet'", specifier = ">=22.0.0" }, { name = "pyarrow", marker = "python_full_version < '3.14' and extra == 'parquet'", specifier = ">=18.0.0" }, + { name = "pydantic-ai-skills", marker = "extra == 'skills'", specifier = ">=0.11.0" }, { name = "pydantic-ai-slim", specifier = ">=1.71.0" }, { name = "pydantic-ai-slim", extras = ["anthropic"], marker = "extra == 'anthropic'" }, { name = "pydantic-ai-slim", extras = ["bedrock"], marker = "extra == 'bedrock'" }, @@ -4336,7 +4348,7 @@ requires-dist = [ { name = "python-docx", marker = "extra == 'docx'", specifier = ">=1.0.0" }, { name = "sqlglot", marker = "extra == 'sql'", specifier = ">=30.0.0" }, ] -provides-extras = ["anthropic", "bedrock", "google", "openai", "mcp", "avro", "parquet", "sql", "common-sql", "langchain", "llamaindex", "pdf", "docx"] +provides-extras = ["anthropic", "bedrock", "google", "openai", "mcp", "skills", "avro", "parquet", "sql", "common-sql", "langchain", "llamaindex", "pdf", "docx", "git"] [package.metadata.requires-dev] dev = [ @@ -4345,12 +4357,14 @@ dev = [ { name = "apache-airflow-providers-common-compat", editable = "providers/common/compat" }, { name = "apache-airflow-providers-common-sql", editable = "providers/common/sql" }, { name = "apache-airflow-providers-common-sql", extras = ["datafusion"], editable = "providers/common/sql" }, + { name = "apache-airflow-providers-git", editable = "providers/git" }, { name = "apache-airflow-providers-standard", editable = "providers/standard" }, { name = "apache-airflow-task-sdk", editable = "task-sdk" }, { name = "langchain", specifier = ">=1.0.0" }, { name = "llama-index-core", specifier = ">=0.13.0" }, { name = "llama-index-embeddings-openai", specifier = ">=0.6.0" }, { name = "llama-index-llms-openai", specifier = ">=0.6.0" }, + { name = "pydantic-ai-skills", specifier = ">=0.11.0" }, { name = "pydantic-ai-slim", extras = ["mcp"] }, { name = "sqlglot", specifier = ">=30.0.0" }, ] @@ -18748,6 +18762,20 @@ email = [ { name = "email-validator" }, ] +[[package]] +name = "pydantic-ai-skills" +version = "0.11.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "anyio" }, + { name = "pydantic-ai-slim" }, + { name = "pyyaml" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/94/d1/f8fbc1c792ba8d73fc424ddab143ab0e1d95f9e982be5a949aaa3c84d64e/pydantic_ai_skills-0.11.0.tar.gz", hash = "sha256:d4040f0b81da34e25b8f14dac5e1895e59d00390db8896448aac1ed1a4d0cf90", size = 9023711, upload-time = "2026-05-26T01:52:55.375Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/9a/80/d9b4bf0f5a3e8d256d62aa30b1c4589dc492efdb2a2d0da0334b4e71f25c/pydantic_ai_skills-0.11.0-py3-none-any.whl", hash = "sha256:af8d78d451ce192dd2ef33abe86ad900bd51d9fd10c81a11abee82f62e8daf30", size = 61218, upload-time = "2026-05-26T01:52:53.773Z" }, +] + [[package]] name = "pydantic-ai-slim" version = "1.102.0" From 8fc965984961022fc039dd39493cae6ca21b7eae Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Sun, 31 May 2026 00:48:32 +0100 Subject: [PATCH 2/3] Remove duplicate apache-airflow-providers-git dev dependency --- providers/common/ai/pyproject.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/providers/common/ai/pyproject.toml b/providers/common/ai/pyproject.toml index b4ce8c67a857f..4f370d17c3587 100644 --- a/providers/common/ai/pyproject.toml +++ b/providers/common/ai/pyproject.toml @@ -131,7 +131,6 @@ dev = [ "sqlglot>=30.0.0", "pydantic-ai-slim[mcp]", "pydantic-ai-skills>=0.11.0", - "apache-airflow-providers-git", "apache-airflow-providers-common-sql[datafusion]", "langchain>=1.0.0", "llama-index-core>=0.13.0", From 1898233b0f42779935e100683b7ac8b68c75789b Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Sun, 31 May 2026 01:02:09 +0100 Subject: [PATCH 3/3] Reject absolute or traversing GitSkills path --- .../ai/src/airflow/providers/common/ai/skills.py | 10 ++++++++++ .../common/ai/tests/unit/common/ai/test_skills.py | 11 +++++++++++ 2 files changed, 21 insertions(+) diff --git a/providers/common/ai/src/airflow/providers/common/ai/skills.py b/providers/common/ai/src/airflow/providers/common/ai/skills.py index 9ca782116ad47..20238ec85115a 100644 --- a/providers/common/ai/src/airflow/providers/common/ai/skills.py +++ b/providers/common/ai/src/airflow/providers/common/ai/skills.py @@ -126,6 +126,16 @@ def _clone_git(source: GitSkills) -> tuple[str, str]: f"http://; use an https:// URL (token) or an ssh URL with a key on the connection." ) + # ``path`` is joined onto the clone dir; an absolute or upward-traversing + # value would point outside the checkout. Require a relative sub-path so a + # misconfigured value fails fast instead of silently reading elsewhere. + if source.path: + normalized = os.path.normpath(source.path) + if os.path.isabs(source.path) or normalized == ".." or normalized.startswith(".." + os.sep): + raise ValueError( + f"GitSkills path must be a relative sub-path inside the repository; got {source.path!r}." + ) + clone_kwargs: dict[str, Any] = {"depth": 1} if source.branch: clone_kwargs["branch"] = source.branch diff --git a/providers/common/ai/tests/unit/common/ai/test_skills.py b/providers/common/ai/tests/unit/common/ai/test_skills.py index efd9b95452487..cbaddb2787e41 100644 --- a/providers/common/ai/tests/unit/common/ai/test_skills.py +++ b/providers/common/ai/tests/unit/common/ai/test_skills.py @@ -96,6 +96,17 @@ def test_repo_url_with_embedded_credentials_rejected(self, mock_repo, mock_githo mock_githook.assert_not_called() mock_repo.clone_from.assert_not_called() + @pytest.mark.parametrize("bad_path", ["/etc", "../outside", "a/../../b"]) + @patch("airflow.providers.git.hooks.git.GitHook") + @patch("git.Repo") + def test_absolute_or_traversing_path_rejected(self, mock_repo, mock_githook, bad_path): + with pytest.raises(ValueError, match="relative sub-path"): + with resolve_skills( + [GitSkills(repo_url="https://github.com/org/repo", conn_id="git_default", path=bad_path)] + ): + pass + mock_repo.clone_from.assert_not_called() + @patch("airflow.providers.git.hooks.git.GitHook") @patch("git.Repo") def test_http_with_conn_id_is_rejected(self, mock_repo, mock_githook):