Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions src/google/adk/agents/config_agent_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ def _resolve_tools(self, tool_configs: list[ToolConfig]) -> list[Any]:
obj = getattr(module, tool_config.name)
else:
# User-defined tools
_validate_module_reference(tool_config.name)
module_path, obj_name = tool_config.name.rsplit(".", 1)
module = importlib.import_module(module_path)
obj = getattr(module, obj_name)
Expand Down Expand Up @@ -490,6 +491,63 @@ def _resolve_agent_class(agent_class: str) -> type[Any]:
_BLOCKED_YAML_KEYS = frozenset({"args"})
_ENFORCE_DENYLIST = False

# Modules that must never be imported via YAML agent configuration.
# These provide direct access to the operating system, process execution,
# or dynamic code evaluation and could be abused to achieve arbitrary
# code execution when referenced in callback, tool, schema, or model
# code-reference fields.
_BLOCKED_MODULES = frozenset({
"os",
"subprocess",
"sys",
"builtins",
"importlib",
"shutil",
"socket",
"http",
"urllib",
"ctypes",
"multiprocessing",
"threading",
"signal",
"code",
"codeop",
"compileall",
"runpy",
"webbrowser",
"antigravity",
"pty",
"commands",
"pdb",
"profile",
"tempfile",
"shelve",
"pickle",
"marshal",
})


def _validate_module_reference(fully_qualified_name: str) -> None:
"""Validate that a module reference does not target a blocked module.

Args:
fully_qualified_name: The fully-qualified Python name to validate
(e.g. ``"my_package.my_module.my_func"``).

Raises:
ValueError: If the top-level module is in ``_BLOCKED_MODULES``.
"""
if not _ENFORCE_DENYLIST:
return
# Extract the top-level package from the fully-qualified name.
top_module = fully_qualified_name.split(".")[0]
if top_module in _BLOCKED_MODULES:
raise ValueError(
f"Blocked module reference: {fully_qualified_name!r}. "
f"Importing from the '{top_module}' module is not allowed in "
"agent configurations because it can execute arbitrary code."
)


def _set_enforce_denylist(value: bool) -> None:
global _ENFORCE_DENYLIST
Expand All @@ -516,6 +574,7 @@ def _check_config_for_blocked_keys(node: Any, filename: str) -> None:
def resolve_fully_qualified_name(name: str) -> Any:
try:
module_path, obj_name = name.rsplit(".", 1)
_validate_module_reference(name)
module = importlib.import_module(module_path)
return getattr(module, obj_name)
except Exception as e:
Expand Down Expand Up @@ -596,6 +655,7 @@ def resolve_code_reference(code_config: CodeConfig) -> Any:
if not code_config or not code_config.name:
raise ValueError("Invalid CodeConfig.")

_validate_module_reference(code_config.name)
module_path, obj_name = code_config.name.rsplit(".", 1)
module = importlib.import_module(module_path)
obj = getattr(module, obj_name)
Expand Down
103 changes: 103 additions & 0 deletions tests/unittests/agents/test_agent_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,109 @@ def test_from_config_blocks_args_when_enforced(tmp_path):
config_agent_utils._set_enforce_denylist(False)


@pytest.mark.parametrize(
"blocked_module",
[
"os.system",
"subprocess.call",
"subprocess.Popen",
"builtins.exec",
"builtins.eval",
"importlib.import_module",
"shutil.rmtree",
"socket.socket",
"ctypes.cdll",
"pickle.loads",
"marshal.loads",
],
)
def test_blocked_module_in_callback_raises_when_enforced(
blocked_module: str, tmp_path: Path
):
"""Verify that referencing blocked stdlib modules in callbacks is rejected."""
config_file = tmp_path / "malicious.yaml"
config_file.write_text(f"""\
name: evil_agent
model: gemini-2.0-flash
instruction: "harmless"
before_agent_callbacks:
- name: "{blocked_module}"
""")

config_agent_utils._set_enforce_denylist(True)
try:
with pytest.raises(ValueError, match="Blocked module reference"):
config_agent_utils.from_config(str(config_file))
finally:
config_agent_utils._set_enforce_denylist(False)


@pytest.mark.parametrize(
"blocked_module",
[
"os.system",
"subprocess.run",
"builtins.exec",
],
)
def test_blocked_module_in_tools_raises_when_enforced(
blocked_module: str, tmp_path: Path
):
"""Verify that referencing blocked stdlib modules in tools is rejected."""
config_file = tmp_path / "malicious.yaml"
config_file.write_text(f"""\
name: evil_agent
model: gemini-2.0-flash
instruction: "harmless"
tools:
- name: "{blocked_module}"
""")

config_agent_utils._set_enforce_denylist(True)
try:
with pytest.raises(ValueError, match="Blocked module reference"):
config_agent_utils.from_config(str(config_file))
finally:
config_agent_utils._set_enforce_denylist(False)


def test_resolve_code_reference_blocks_os_when_enforced():
"""Verify resolve_code_reference blocks os module directly."""
from google.adk.agents.common_configs import CodeConfig

config_agent_utils._set_enforce_denylist(True)
try:
with pytest.raises(ValueError, match="Blocked module reference"):
config_agent_utils.resolve_code_reference(
CodeConfig(name="os.system")
)
finally:
config_agent_utils._set_enforce_denylist(False)


def test_resolve_fully_qualified_name_blocks_subprocess_when_enforced():
"""Verify resolve_fully_qualified_name blocks subprocess module."""
config_agent_utils._set_enforce_denylist(True)
try:
with pytest.raises(ValueError, match="Blocked module reference"):
config_agent_utils.resolve_fully_qualified_name("subprocess.Popen")
finally:
config_agent_utils._set_enforce_denylist(False)


def test_allowed_module_passes_when_enforced(tmp_path: Path):
"""Verify that google.adk modules are NOT blocked by the module denylist."""
config_agent_utils._set_enforce_denylist(True)
try:
# This should NOT raise — google.adk modules must remain allowed
result = config_agent_utils.resolve_fully_qualified_name(
"google.adk.agents.llm_agent.LlmAgent"
)
assert result is LlmAgent
finally:
config_agent_utils._set_enforce_denylist(False)


def test_create_workflow_from_yaml(tmp_path: Path):
"""Test creating a Workflow from a YAML file."""
yaml_content = """
Expand Down