Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions operator_use/agent/tools/builtin/control_center.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import json
import logging
import os
import subprocess
import sys
from typing import Optional

Expand Down Expand Up @@ -131,7 +132,7 @@ async def _do_restart(graceful_fn=None) -> None:
``os._exit(75)`` which skips cleanup but guarantees the process terminates.
"""
global _requested_exit_code
os.system("cls" if os.name == "nt" else "clear")
subprocess.run(["cls"] if os.name == "nt" else ["clear"], check=False)
frames = ["↑", "↗", "→", "↘", "↓", "↙", "←", "↖"]
for i in range(20):
sys.stdout.write(f"\r {frames[i % len(frames)]} Restarting Operator...")
Expand Down Expand Up @@ -288,7 +289,8 @@ async def control_center(
if callable(on_restart):
asyncio.ensure_future(on_restart())
else:
asyncio.ensure_future(_do_restart(graceful_fn=None)) # fallback: no gateway wired
graceful_fn = kwargs.get("_graceful_restart_fn")
asyncio.ensure_future(_do_restart(graceful_fn=graceful_fn))
return ToolResult.success_result(f"{msg}\nRestart initiated.", metadata={"stop_loop": True})

return ToolResult.success_result(msg)
2 changes: 1 addition & 1 deletion operator_use/agent/tools/builtin/mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ async def mcp(
for s in servers:
status = "connected" if s["connected"] else "disconnected"
tool_info = f" ({s['tool_count']} tools)" if s["connected"] else ""
agent_status = f" [you: connected]" if s["agent_connected"] else f" [you: disconnected]"
agent_status = " [you: connected]" if s["agent_connected"] else " [you: disconnected]"
shared_info = f" [shared: {s['connection_count']} agent(s)]" if s["connection_count"] > 1 else ""
lines.append(f" • {s['name']} [{status}]{tool_info}{agent_status}{shared_info}")
return ToolResult.success_result("\n".join(lines))
Expand Down
3 changes: 1 addition & 2 deletions operator_use/agent/tools/builtin/skill.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
"""Skill tool: load and invoke procedural skills from workspace."""

from pathlib import Path
import yaml
from operator_use.tools.service import Tool, ToolResult
from operator_use.config.paths import get_named_workspace_dir
Expand Down Expand Up @@ -89,7 +88,7 @@ async def skill(
response_parts.append("")

if args:
response_parts.append(f"## Arguments")
response_parts.append("## Arguments")
response_parts.append(f"{args}")
response_parts.append("")

Expand Down
2 changes: 0 additions & 2 deletions operator_use/cli/mcp_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

import json
from typing import Optional
from pathlib import Path
from operator_use.config.service import MCPServerConfig
from operator_use.cli.tui import (
clear_screen, print_start, select, text_input, confirm, console
)
Expand Down
7 changes: 6 additions & 1 deletion operator_use/cli/start.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
"""Run Operator with channels and agents."""

from __future__ import annotations

import asyncio
import os
import shutil
from pathlib import Path
from typing import TYPE_CHECKING

from dotenv import load_dotenv
import logging
from rich.console import Console

if TYPE_CHECKING:
from operator_use.mcp import MCPManager

load_dotenv()

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -72,7 +78,6 @@ def setup_logging(userdata_dir: Path, verbose: bool = False) -> None:
from operator_use.config import Config, load_config, AgentDefinition
from operator_use.config.paths import get_named_workspace_dir
from typing import Optional
from pathlib import Path

LLM_CLASS_MAP = {
"openai": "ChatOpenAI",
Expand Down
6 changes: 4 additions & 2 deletions operator_use/computer/macos/desktop/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,10 @@ def draw_annotation(label: int, node: TreeElementNode) -> None:
else:
x1, y1 = _logical_to_pixel(box.left, box.top)
x2, y2 = _logical_to_pixel(box.right, box.bottom)
x1 += padding; y1 += padding
x2 += padding; y2 += padding
x1 += padding
y1 += padding
x2 += padding
y2 += padding

# Deterministic color per label
random.seed(label)
Expand Down
2 changes: 2 additions & 0 deletions operator_use/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
ACPAgentEntry,
ACPServerSettings,
HeartbeatConfig,
SessionConfig,
ToolsConfig,
RetryConfig,
SubagentConfig,
Expand Down Expand Up @@ -50,6 +51,7 @@
"ACPAgentEntry",
"ACPServerSettings",
"HeartbeatConfig",
"SessionConfig",
"ToolsConfig",
"RetryConfig",
"SubagentConfig",
Expand Down
7 changes: 7 additions & 0 deletions operator_use/config/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,12 @@ class HeartbeatConfig(Base):
llm_config: Optional[LLMConfig] = None # Dedicated LLM for heartbeat tasks


class SessionConfig(Base):
    """Session lifecycle configuration.

    Controls how long an idle session is retained before it is
    considered expired.
    """

    # Session idle timeout in hours (default: 24h).
    ttl_hours: float = 24.0


class Config(BaseSettings):
"""Root configuration for Operator."""

Expand All @@ -297,6 +303,7 @@ class Config(BaseSettings):
search: SearchConfig = Field(default_factory=SearchConfig)
providers: ProvidersConfig = Field(default_factory=ProvidersConfig)
heartbeat: HeartbeatConfig = Field(default_factory=HeartbeatConfig)
session: SessionConfig = Field(default_factory=SessionConfig)
# Named registry of pre-approved remote ACP agents.
# The LLM can only call agents listed here — it never supplies raw URLs.
acp_agents: Dict[str, ACPAgentEntry] = Field(default_factory=dict)
Expand Down
2 changes: 1 addition & 1 deletion operator_use/mcp/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import logging
from contextlib import AsyncExitStack
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING

from operator_use.mcp.tool import MCPTool

Expand Down
2 changes: 1 addition & 1 deletion operator_use/mcp/tool.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""MCP Tool — a Tool backed by a remote MCP server."""

from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING
from operator_use.tools.service import Tool, ToolResult

if TYPE_CHECKING:
Expand Down
2 changes: 0 additions & 2 deletions operator_use/providers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@
# Image generation providers
from operator_use.providers.openai import ImageOpenAI
from operator_use.providers.google import ImageGoogle
from operator_use.providers.xai import ImageXai

try:
from operator_use.providers.together import ImageTogether
Expand All @@ -90,7 +89,6 @@
except ImportError:
pass
from operator_use.providers.groq import TTSGroq
from operator_use.providers.xai import TTSXai

try:
from operator_use.providers.elevenlabs import TTSElevenLabs
Expand Down
2 changes: 1 addition & 1 deletion operator_use/providers/zai/llm.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os
import json
import logging
from typing import Iterator, AsyncIterator, List, Optional, Any, overload
from typing import Iterator, AsyncIterator, List, Optional, overload
from pydantic import BaseModel
import httpx
from operator_use.providers.base import BaseChatLLM
Expand Down
150 changes: 134 additions & 16 deletions operator_use/session/service.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,33 @@
"""Session store service."""
"""Session store service."""

import json
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any
from typing import Any, Optional

from operator_use.messages.service import BaseMessage
from operator_use.utils.helper import ensure_directory
from operator_use.session.views import Session
from operator_use.session.views import Session, DEFAULT_SESSION_TTL


class SessionStore:
"""Store for sessions, keyed by session id. Persists to JSONL files."""
"""Store for sessions, keyed by session id. Persists to JSONL files.

def __init__(self, workspace: Path):
When *encryption_key* is provided (a URL-safe base-64 Fernet key), session
files are written as a single encrypted blob instead of plain JSONL lines.
The key can be generated with ``cryptography.fernet.Fernet.generate_key()``.
"""

    def __init__(self, workspace: Path, encryption_key: Optional[str | bytes] = None):
        """Create a session store rooted at ``<workspace>/sessions``.

        Args:
            workspace: Base directory; session files are persisted under
                a ``sessions`` subdirectory (created if missing).
            encryption_key: Optional URL-safe base-64 Fernet key. When
                provided, session files are written as encrypted blobs.
                Accepts ``str`` (encoded to bytes) or raw ``bytes``.
        """
        self.workspace = Path(workspace)
        self.sessions_dir = ensure_directory(self.workspace / "sessions")
        # In-memory cache of live sessions, keyed by the ORIGINAL session id
        # (not the filesystem-safe filename stem).
        self._sessions: dict[str, Session] = {}
        self._fernet = None
        if encryption_key:
            # Lazy import: `cryptography` is only required when encryption is enabled.
            from cryptography.fernet import Fernet
            key_bytes = encryption_key.encode() if isinstance(encryption_key, str) else encryption_key
            self._fernet = Fernet(key_bytes)

def _session_id_to_filename(self, session_id: str) -> str:
"""Make session_id filesystem-safe (e.g. `:` invalid on Windows)."""
Expand All @@ -26,10 +36,20 @@ def _session_id_to_filename(self, session_id: str) -> str:
def _sessions_path(self, session_id: str) -> Path:
return self.sessions_dir / f"{self._session_id_to_filename(session_id)}.jsonl"

def load(self, session_id: str) -> Session | None:
def load(self, session_id: str, ttl: float = DEFAULT_SESSION_TTL) -> Session | None:
path = self._sessions_path(session_id)
if not path.exists():
return None

if self._fernet:
return self._load_encrypted(session_id, path, ttl)

raw = path.read_bytes()
if raw.startswith(b"gAAAAA") and self._fernet is None:
raise ValueError(
f"Session file for '{session_id}' is Fernet-encrypted but no encryption_key was provided."
)

messages: list[BaseMessage] = []
created_at = datetime.now()
updated_at = datetime.now()
Expand All @@ -49,16 +69,53 @@ def load(self, session_id: str) -> Session | None:
continue
if "role" in obj:
messages.append(BaseMessage.from_dict(obj))
return Session(

return Session._from_persisted(
id=session_id,
messages=messages,
created_at=created_at,
updated_at=updated_at,
metadata=metadata,
ttl=ttl,
)

def _load_encrypted(self, session_id: str, path: Path, ttl: float) -> Session | None:
"""Load and decrypt a session file written by _save_encrypted()."""
from cryptography.fernet import InvalidToken

if self._fernet is None:
raise ValueError(
f"Session {session_id!r} appears to be encrypted but no encryption_key was provided."
)
raw = path.read_bytes()
try:
decrypted = self._fernet.decrypt(raw)
except InvalidToken as exc:
raise ValueError(
f"Failed to decrypt session '{session_id}': wrong key or corrupted data."
) from exc

payload = json.loads(decrypted.decode())
created_at = datetime.fromisoformat(payload.get("created_at", datetime.now().isoformat()))
updated_at = datetime.fromisoformat(payload.get("updated_at", datetime.now().isoformat()))
metadata = payload.get("metadata", {})
messages = [BaseMessage.from_dict(m) for m in payload.get("messages", [])]
return Session._from_persisted(
id=session_id,
messages=messages,
created_at=created_at,
updated_at=updated_at,
metadata=metadata,
ttl=ttl,
)

def save(self, session: Session) -> None:
path = self._sessions_path(session.id)

if self._fernet:
self._save_encrypted(session, path)
return

with open(path, "w", encoding="utf-8") as f:
meta = {
"type": "metadata",
Expand All @@ -71,15 +128,45 @@ def save(self, session: Session) -> None:
for msg in session.messages:
f.write(json.dumps(msg.to_dict()) + "\n")

def get_or_create(self, session_id: str | None = None) -> Session:
"""Get a session by id, or create and store a new one. Loads from JSONL if exists."""
def _save_encrypted(self, session: Session, path: Path) -> None:
"""Serialize the session to JSON and write as a Fernet-encrypted blob."""
payload = {
"id": session.id,
"created_at": session.created_at.isoformat(),
"updated_at": session.updated_at.isoformat(),
"metadata": session.metadata,
"messages": [msg.to_dict() for msg in session.messages],
}
token = self._fernet.encrypt(json.dumps(payload).encode())
path.write_bytes(token)

def get_or_create(
self,
session_id: Optional[str] = None,
ttl: float = DEFAULT_SESSION_TTL,
) -> Session:
"""Get a session by id, or create and store a new one.

Loads from JSONL if exists. If the loaded session is expired (based on
real idle time derived from *updated_at*), it is deleted and a fresh
session is returned instead.
"""
id = session_id or str(uuid.uuid4())
if session := self._sessions.get(id):
return session
if session := self.load(id):
self._sessions[id] = session
return session
session = Session(id=id)

if cached := self._sessions.get(id):
if not cached.is_expired():
return cached
# In-memory session has expired — evict and fall through to create
del self._sessions[id]

if session := self.load(id, ttl=ttl):
if session.is_expired():
self.delete(id)
else:
self._sessions[id] = session
return session

session = Session(id=id, ttl=ttl)
self._sessions[id] = session
return session

Expand Down Expand Up @@ -108,6 +195,38 @@ def archive(self, session_id: str) -> bool:
return True
return False

def cleanup(self, ttl: float = DEFAULT_SESSION_TTL) -> list[str]:
"""Delete all sessions whose idle time (since *updated_at*) exceeds *ttl*.

Returns the list of session IDs that were removed.
Archived session files are skipped.
"""
# Build a reverse map: filesystem stem -> original session_id (in-memory key).
# Sessions with `:` in their IDs are stored under the original ID in
# self._sessions but their filename stem uses `_` as a replacement.
stem_to_original: dict[str, str] = {
self._session_id_to_filename(sid): sid for sid in self._sessions
}

removed: list[str] = []
for path in self.sessions_dir.glob("*.jsonl"):
# Skip archived sessions
if "_archived_" in path.stem:
continue
session_id_fs = path.stem
session = self.load(session_id_fs, ttl=ttl)
if session is None:
continue
if session.is_expired():
path.unlink()
# Evict from in-memory cache using the original session ID if known,
# otherwise fall back to the filesystem-safe stem.
original_id = stem_to_original.get(session_id_fs, session_id_fs)
if original_id in self._sessions:
del self._sessions[original_id]
removed.append(original_id)
return removed

def list_sessions(self) -> list[dict[str, Any]]:
"""Load sessions from the sessions directory. Returns list of dicts with id, created_at, updated_at, path."""
result: list[dict[str, Any]] = []
Expand All @@ -132,4 +251,3 @@ def list_sessions(self) -> list[dict[str, Any]]:
"path": str(path),
})
return result

Loading
Loading