From 39ba8ddfa21bde2c7a8fec2ff1470c9571b80236 Mon Sep 17 00:00:00 2001 From: openhands Date: Thu, 13 Nov 2025 20:23:14 +0000 Subject: [PATCH 01/10] Stream LLM completion logs via WebSocket events for RemoteConversation Implements Solution 1 from issue #1158 to make LLM completion logs accessible when using RemoteConversation with DockerWorkspace. Changes: - Add LLMCompletionLogEvent to stream log data from server to client - Add log_callback mechanism to Telemetry class - Configure EventService to emit LLMCompletionLogEvent when logging enabled - Handle LLMCompletionLogEvent in RemoteConversation to write logs client-side - Add tests for LLMCompletionLogEvent serialization When log_completions=True in remote execution context, logs are now sent as events through the WebSocket connection and written to the client filesystem instead of being trapped in the Docker container. Co-authored-by: openhands --- .../openhands/agent_server/event_service.py | 29 ++++++++ .../conversation/impl/remote_conversation.py | 56 ++++++++++++++ openhands-sdk/openhands/sdk/event/__init__.py | 2 + .../openhands/sdk/event/llm_completion_log.py | 32 ++++++++ .../openhands/sdk/llm/utils/telemetry.py | 46 ++++++++---- .../event/test_llm_completion_log_event.py | 74 +++++++++++++++++++ 6 files changed, 225 insertions(+), 14 deletions(-) create mode 100644 openhands-sdk/openhands/sdk/event/llm_completion_log.py create mode 100644 tests/sdk/event/test_llm_completion_log_event.py diff --git a/openhands-agent-server/openhands/agent_server/event_service.py b/openhands-agent-server/openhands/agent_server/event_service.py index 054ae18ef8..f5674adf9a 100644 --- a/openhands-agent-server/openhands/agent_server/event_service.py +++ b/openhands-agent-server/openhands/agent_server/event_service.py @@ -20,6 +20,7 @@ ConversationState, ) from openhands.sdk.event.conversation_state import ConversationStateUpdateEvent +from openhands.sdk.event.llm_completion_log import LLMCompletionLogEvent from openhands.sdk.security.analyzer import SecurityAnalyzerBase from openhands.sdk.security.confirmation_policy import ConfirmationPolicyBase from openhands.sdk.utils.async_utils import AsyncCallbackWrapper @@ -231,6 +232,30 @@ async def subscribe_to_events(self, subscriber: Subscriber[Event]) -> UUID: async def unsubscribe_from_events(self, subscriber_id: UUID) -> bool: return self._pub_sub.unsubscribe(subscriber_id) + def _setup_llm_log_streaming(self, agent: Agent) -> None: + """Configure LLM log callbacks to stream logs via events.""" + + def log_callback(filename: str, log_data: str) -> None: + """Callback to emit LLM completion logs as events.""" + # Extract model name from filename (format: model__timestamp_uuid.json) + model_name = filename.split("-")[0].replace("__", "/") + event = LLMCompletionLogEvent( + filename=filename, + log_data=log_data, + model_name=model_name, + ) + # Publish to all subscribers - schedule in the main event loop + if self._main_loop and self._main_loop.is_running(): + asyncio.run_coroutine_threadsafe(self._pub_sub(event), self._main_loop) + + # Set callback for all LLMs in the agent that have logging enabled + for llm in agent.get_all_llms(): + if llm.log_completions: + # Access telemetry safely + telemetry = getattr(llm, "telemetry", None) + if telemetry is not None: + telemetry.set_log_callback(log_callback) + async def start(self): # Store the main event loop for cross-thread communication self._main_loop: asyncio.AbstractEventLoop = asyncio.get_running_loop() @@ -241,6 +266,10 @@ async def start(self): assert isinstance(workspace, LocalWorkspace) Path(workspace.working_dir).mkdir(parents=True, exist_ok=True) agent = Agent.model_validate(self.stored.agent.model_dump()) + + # Setup LLM log streaming for remote execution + self._setup_llm_log_streaming(agent) + conversation = LocalConversation( agent=agent, workspace=workspace, diff --git a/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py b/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py index 3b0cb2d113..4ed9d3978b 100644 --- a/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py +++ b/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py @@ -26,6 +26,7 @@ FULL_STATE_KEY, ConversationStateUpdateEvent, ) +from openhands.sdk.event.llm_completion_log import LLMCompletionLogEvent from openhands.sdk.llm import LLM, Message, TextContent from openhands.sdk.logger import get_logger from openhands.sdk.observability.laminar import observe @@ -423,6 +424,7 @@ class RemoteConversation(BaseConversation): max_iteration_per_run: int workspace: RemoteWorkspace _client: httpx.Client + _log_completion_folders: dict[str, str] def __init__( self, @@ -461,6 +463,13 @@ def __init__( self.workspace = workspace self._client = workspace.client + # Build map of log directories for all LLMs in the agent + self._log_completion_folders = {} + for llm in agent.get_all_llms(): + if llm.log_completions: + # Map usage_id to log folder + self._log_completion_folders[llm.usage_id] = llm.log_completions_folder + if conversation_id is None: payload = { "agent": agent.model_dump( @@ -502,6 +511,11 @@ def __init__( state_update_callback = self._state.create_state_update_callback() self._callbacks.append(state_update_callback) + # Add callback to handle LLM completion logs + if self._log_completion_folders: + llm_log_callback = self._create_llm_completion_log_callback() + self._callbacks.append(llm_log_callback) + # Handle visualization configuration if isinstance(visualizer, ConversationVisualizerBase): # Use custom visualizer instance @@ -541,6 +555,48 @@ def __init__( self._start_observability_span(str(self._id)) + def _create_llm_completion_log_callback(self) -> ConversationCallbackType: + """Create a callback that writes LLM completion logs to client filesystem.""" + import os + + def callback(event: Event) -> None: + if not isinstance(event, LLMCompletionLogEvent): + return + + # Try to find the appropriate log directory based on model name + # The model_name might have been extracted from filename, so we need + # to match it against configured LLMs + log_dir = None + + # First, try to match by usage_id if we can determine it + # Since we don't have usage_id in the event, we'll use the first + # matching log folder, or fall back to a default + if self._log_completion_folders: + # Use the first configured log folder (typically there's only one) + log_dir = next(iter(self._log_completion_folders.values())) + + if not log_dir: + # No LLMs with logging enabled, skip + logger.debug( + "Received LLMCompletionLogEvent but no log directory configured" + ) + return + + try: + # Create log directory if it doesn't exist + os.makedirs(log_dir, exist_ok=True) + + # Write the log file + log_path = os.path.join(log_dir, event.filename) + with open(log_path, "w") as f: + f.write(event.log_data) + + logger.debug(f"Wrote LLM completion log to {log_path}") + except Exception as e: + logger.warning(f"Failed to write LLM completion log: {e}") + + return callback + @property def id(self) -> ConversationID: return self._id diff --git a/openhands-sdk/openhands/sdk/event/__init__.py b/openhands-sdk/openhands/sdk/event/__init__.py index 9e4346e1dc..f2ae2ea5e3 100644 --- a/openhands-sdk/openhands/sdk/event/__init__.py +++ b/openhands-sdk/openhands/sdk/event/__init__.py @@ -5,6 +5,7 @@ CondensationSummaryEvent, ) from openhands.sdk.event.conversation_state import ConversationStateUpdateEvent +from openhands.sdk.event.llm_completion_log import LLMCompletionLogEvent from openhands.sdk.event.llm_convertible import ( ActionEvent, AgentErrorEvent, @@ -35,6 +36,7 @@ "CondensationRequest", "CondensationSummaryEvent", "ConversationStateUpdateEvent", + "LLMCompletionLogEvent", "EventID", "ToolCallID", ] diff --git a/openhands-sdk/openhands/sdk/event/llm_completion_log.py b/openhands-sdk/openhands/sdk/event/llm_completion_log.py new file mode 100644 index 0000000000..95323ccdce --- /dev/null +++ b/openhands-sdk/openhands/sdk/event/llm_completion_log.py @@ -0,0 +1,32 @@ +"""Event for streaming LLM completion logs from remote agents to clients.""" + +from pydantic import Field + +from openhands.sdk.event.base import Event +from openhands.sdk.event.types import SourceType + + +class LLMCompletionLogEvent(Event): + """Event containing LLM completion log data. + + When an LLM is configured with log_completions=True in a remote conversation, + this event streams the completion log data back to the client through WebSocket + instead of writing it to a file inside the Docker container. + """ + + source: SourceType = "environment" + filename: str = Field( + ..., + description="The intended filename for this log (relative to log directory)", + ) + log_data: str = Field( + ..., + description="The JSON-encoded log data to be written to the file", + ) + model_name: str = Field( + default="unknown", + description="The model name for context", + ) + + def __str__(self) -> str: + return f"LLMCompletionLog(model={self.model_name}, file={self.filename})" diff --git a/openhands-sdk/openhands/sdk/llm/utils/telemetry.py b/openhands-sdk/openhands/sdk/llm/utils/telemetry.py index 2e6b1ac785..3734ca6ebe 100644 --- a/openhands-sdk/openhands/sdk/llm/utils/telemetry.py +++ b/openhands-sdk/openhands/sdk/llm/utils/telemetry.py @@ -3,6 +3,7 @@ import time import uuid import warnings +from collections.abc import Callable from typing import Any, ClassVar from litellm.cost_calculator import completion_cost as litellm_completion_cost @@ -42,12 +43,22 @@ class Telemetry(BaseModel): _req_start: float = PrivateAttr(default=0.0) _req_ctx: dict[str, Any] = PrivateAttr(default_factory=dict) _last_latency: float = PrivateAttr(default=0.0) + _log_callback: Callable[[str, str], None] | None = PrivateAttr(default=None) model_config: ClassVar[ConfigDict] = ConfigDict( extra="forbid", arbitrary_types_allowed=True ) # ---------- Lifecycle ---------- + def set_log_callback(self, callback: Callable[[str, str], None] | None) -> None: + """Set a callback function for logging instead of writing to file. + + Args: + callback: A function that takes (filename, log_data) and handles the log. + Used for streaming logs in remote execution contexts. + """ + self._log_callback = callback + def on_request(self, log_ctx: dict | None) -> None: self._req_start = time.time() self._req_ctx = log_ctx or {} @@ -221,19 +232,13 @@ def log_llm_call( if not self.log_dir: return try: - # Create log directory if it doesn't exist - os.makedirs(self.log_dir, exist_ok=True) - if not os.access(self.log_dir, os.W_OK): - raise PermissionError(f"log_dir is not writable: {self.log_dir}") - - fname = os.path.join( - self.log_dir, - ( - f"{self.model_name.replace('/', '__')}-" - f"{time.time():.3f}-" - f"{uuid.uuid4().hex[:4]}.json" - ), + # Prepare filename and log data + filename = ( + f"{self.model_name.replace('/', '__')}-" + f"{time.time():.3f}-" + f"{uuid.uuid4().hex[:4]}.json" ) + data = self._req_ctx.copy() data["response"] = ( resp # ModelResponse | ResponsesAPIResponse; @@ -297,8 +302,21 @@ def log_llm_call( and "tools" in data["kwargs"] ): data["kwargs"].pop("tools") - with open(fname, "w") as f: - f.write(json.dumps(data, default=_safe_json)) + + log_data = json.dumps(data, default=_safe_json) + + # Use callback if set (for remote execution), otherwise write to file + if self._log_callback: + self._log_callback(filename, log_data) + else: + # Create log directory if it doesn't exist + os.makedirs(self.log_dir, exist_ok=True) + if not os.access(self.log_dir, os.W_OK): + raise PermissionError(f"log_dir is not writable: {self.log_dir}") + + fname = os.path.join(self.log_dir, filename) + with open(fname, "w") as f: + f.write(log_data) except Exception as e: warnings.warn(f"Telemetry logging failed: {e}") diff --git a/tests/sdk/event/test_llm_completion_log_event.py b/tests/sdk/event/test_llm_completion_log_event.py new file mode 100644 index 0000000000..35bc8cb161 --- /dev/null +++ b/tests/sdk/event/test_llm_completion_log_event.py @@ -0,0 +1,74 @@ +"""Tests for LLMCompletionLogEvent serialization and functionality.""" + +import json + +from openhands.sdk.event import Event, LLMCompletionLogEvent + + +def test_llm_completion_log_event_creation() -> None: + """Test creating an LLMCompletionLogEvent.""" + event = LLMCompletionLogEvent( + filename="test_model__1234567890.123-abcd.json", + log_data='{"test": "data"}', + model_name="test_model", + ) + + assert event.filename == "test_model__1234567890.123-abcd.json" + assert event.log_data == '{"test": "data"}' + assert event.model_name == "test_model" + assert event.source == "environment" + + +def test_llm_completion_log_event_serialization() -> None: + """Test LLMCompletionLogEvent serialization/deserialization.""" + log_data = json.dumps( + { + "response": {"id": "response_123", "model": "test_model"}, + "cost": 0.0001, + "timestamp": 1234567890.123, + } + ) + + event = LLMCompletionLogEvent( + filename="anthropic__claude-sonnet__1234567890.123-abcd.json", + log_data=log_data, + model_name="anthropic/claude-sonnet", + ) + + # Serialize + json_str = event.model_dump_json() + deserialized = LLMCompletionLogEvent.model_validate_json(json_str) + + assert deserialized == event + assert deserialized.filename == event.filename + assert deserialized.log_data == event.log_data + assert deserialized.model_name == event.model_name + + +def test_llm_completion_log_event_as_base_event() -> None: + """Test that LLMCompletionLogEvent can be deserialized as base Event.""" + event = LLMCompletionLogEvent( + filename="test_model__1234567890.123-abcd.json", + log_data='{"test": "data"}', + model_name="test_model", + ) + + # Serialize and deserialize as base Event + json_str = event.model_dump_json() + deserialized = Event.model_validate_json(json_str) + + assert isinstance(deserialized, LLMCompletionLogEvent) + assert deserialized == event + + +def test_llm_completion_log_event_str() -> None: + """Test string representation of LLMCompletionLogEvent.""" + event = LLMCompletionLogEvent( + filename="test_model__1234567890.123-abcd.json", + log_data='{"test": "data"}', + model_name="test_model", + ) + + str_repr = str(event) + assert "test_model" in str_repr + assert "test_model__1234567890.123-abcd.json" in str_repr From 5d2d65d7f824e39548fc2bb4e3637a1583a7f204 Mon Sep 17 00:00:00 2001 From: openhands Date: Thu, 13 Nov 2025 21:12:35 +0000 Subject: [PATCH 02/10] Add stats streaming to RemoteConversation via callback mechanism This commit extends the streaming solution to fix issue #1087 where stats updates were not visible during RemoteConversation execution. Changes: - Added stats_update_callback mechanism to Telemetry class - Telemetry now triggers callback after metrics updates in on_response() - EventService sets up callback to emit ConversationStateUpdateEvent - Updated _publish_state_update to support selective field updates - Added comprehensive tests for callback functionality This follows the same pattern as LLM log streaming (issue #1158), providing a unified solution for streaming server-side data to RemoteConversation clients. Co-authored-by: openhands --- .../openhands/agent_server/event_service.py | 48 ++++++++++++-- .../openhands/sdk/llm/utils/telemetry.py | 17 +++++ tests/sdk/llm/test_llm_telemetry.py | 63 +++++++++++++++++++ 3 files changed, 122 insertions(+), 6 deletions(-) diff --git a/openhands-agent-server/openhands/agent_server/event_service.py b/openhands-agent-server/openhands/agent_server/event_service.py index f5674adf9a..b40f77ffd6 100644 --- a/openhands-agent-server/openhands/agent_server/event_service.py +++ b/openhands-agent-server/openhands/agent_server/event_service.py @@ -256,6 +256,23 @@ def log_callback(filename: str, log_data: str) -> None: if telemetry is not None: telemetry.set_log_callback(log_callback) + def _setup_stats_streaming(self, agent: Agent) -> None: + """Configure stats update callbacks to stream stats changes via events.""" + + def stats_update_callback() -> None: + """Callback to emit stats updates as ConversationStateUpdateEvent.""" + # Schedule state update in the main event loop + if self._main_loop and self._main_loop.is_running(): + asyncio.run_coroutine_threadsafe( + self._publish_state_update(keys=["stats"]), self._main_loop + ) + + # Set callback for all LLMs in the agent + for llm in agent.get_all_llms(): + telemetry = getattr(llm, "telemetry", None) + if telemetry is not None: + telemetry.set_stats_update_callback(stats_update_callback) + async def start(self): # Store the main event loop for cross-thread communication self._main_loop: asyncio.AbstractEventLoop = asyncio.get_running_loop() @@ -270,6 +287,9 @@ async def start(self): # Setup LLM log streaming for remote execution self._setup_llm_log_streaming(agent) + # Setup stats streaming for remote execution + self._setup_stats_streaming(agent) + conversation = LocalConversation( agent=agent, workspace=workspace, @@ -380,17 +400,33 @@ async def get_state(self) -> ConversationState: raise ValueError("inactive_service") return self._conversation._state - async def _publish_state_update(self): - """Publish a ConversationStateUpdateEvent with the current state.""" + async def _publish_state_update(self, keys: list[str] | None = None): + """Publish a ConversationStateUpdateEvent with the current state. + + Args: + keys: Optional list of field keys to include in the update. + If None, publishes the full state. If provided, only publishes + the specified fields. + """ if not self._conversation: return state = self._conversation._state with state: - # Create state update event with current state information - state_update_event = ConversationStateUpdateEvent.from_conversation_state( - state - ) + if keys is None: + # Full state update + state_update_event = ( + ConversationStateUpdateEvent.from_conversation_state(state) + ) + else: + # Selective field update - create events for each key + for key in keys: + value = getattr(state, key, None) + state_update_event = ConversationStateUpdateEvent( + key=key, value=value + ) + await self._pub_sub(state_update_event) + return # Publish the state update event await self._pub_sub(state_update_event) diff --git a/openhands-sdk/openhands/sdk/llm/utils/telemetry.py b/openhands-sdk/openhands/sdk/llm/utils/telemetry.py index 3734ca6ebe..78ba233c93 100644 --- a/openhands-sdk/openhands/sdk/llm/utils/telemetry.py +++ b/openhands-sdk/openhands/sdk/llm/utils/telemetry.py @@ -44,6 +44,7 @@ class Telemetry(BaseModel): _req_ctx: dict[str, Any] = PrivateAttr(default_factory=dict) _last_latency: float = PrivateAttr(default=0.0) _log_callback: Callable[[str, str], None] | None = PrivateAttr(default=None) + _stats_update_callback: Callable[[], None] | None = PrivateAttr(default=None) model_config: ClassVar[ConfigDict] = ConfigDict( extra="forbid", arbitrary_types_allowed=True @@ -59,6 +60,15 @@ def set_log_callback(self, callback: Callable[[str, str], None] | None) -> None: """ self._log_callback = callback + def set_stats_update_callback(self, callback: Callable[[], None] | None) -> None: + """Set a callback function to be notified when stats are updated. + + Args: + callback: A function called whenever metrics are updated. + Used for streaming stats updates in remote execution contexts. + """ + self._stats_update_callback = callback + def on_request(self, log_ctx: dict | None) -> None: self._req_start = time.time() self._req_ctx = log_ctx or {} @@ -97,6 +107,13 @@ def on_response( if self.log_enabled: self.log_llm_call(resp, cost, raw_resp=raw_resp) + # 5) notify about stats update + if self._stats_update_callback is not None: + try: + self._stats_update_callback() + except Exception: + logger.exception("Stats update callback failed", exc_info=True) + return self.metrics.deep_copy() def on_error(self, _err: Exception) -> None: diff --git a/tests/sdk/llm/test_llm_telemetry.py b/tests/sdk/llm/test_llm_telemetry.py index b69eda805a..c0a70ddc70 100644 --- a/tests/sdk/llm/test_llm_telemetry.py +++ b/tests/sdk/llm/test_llm_telemetry.py @@ -779,3 +779,66 @@ def test_cost_calculation_with_zero_cost(self, basic_telemetry, mock_response): assert metrics.accumulated_cost == 0.0 # Should NOT add zero cost to costs list (0.0 is falsy) assert len(basic_telemetry.metrics.costs) == 0 + + +class TestTelemetryCallbacks: + """Test callback functionality for log streaming and stats updates.""" + + def test_set_log_callback(self, basic_telemetry): + """Test setting log callback.""" + callback_called = [] + + def log_callback(filename: str, log_data: str): + callback_called.append((filename, log_data)) + + basic_telemetry.set_log_callback(log_callback) + assert basic_telemetry._log_callback == log_callback + + # Clear callback + basic_telemetry.set_log_callback(None) + assert basic_telemetry._log_callback is None + + def test_set_stats_update_callback(self, basic_telemetry): + """Test setting stats update callback.""" + callback_called = [] + + def stats_callback(): + callback_called.append(True) + + basic_telemetry.set_stats_update_callback(stats_callback) + assert basic_telemetry._stats_update_callback == stats_callback + + # Clear callback + basic_telemetry.set_stats_update_callback(None) + assert basic_telemetry._stats_update_callback is None + + def test_stats_update_callback_triggered_on_response( + self, basic_telemetry, mock_response + ): + """Test that stats update callback is triggered on response.""" + callback_called = [] + + def stats_callback(): + callback_called.append(True) + + basic_telemetry.set_stats_update_callback(stats_callback) + basic_telemetry.on_request(None) + basic_telemetry.on_response(mock_response) + + # Callback should be triggered once after response + assert len(callback_called) == 1 + + def test_stats_update_callback_exception_handling( + self, basic_telemetry, mock_response + ): + """Test that exceptions in stats callback don't break on_response.""" + + def failing_callback(): + raise Exception("Callback failed") + + basic_telemetry.set_stats_update_callback(failing_callback) + basic_telemetry.on_request(None) + + # Should not raise exception even if callback fails + metrics = basic_telemetry.on_response(mock_response) + assert isinstance(metrics, Metrics) From b3576a57df02ea7aa0f9ea453e0a9be8642cc538 Mon Sep 17 00:00:00 2001 From: Engel Nyst Date: Fri, 14 Nov 2025 17:54:40 +0100 Subject: [PATCH 03/10] Include usage_id in LLM log events to preserve per-usage_id folders (#1161) Co-authored-by: openhands --- .../openhands/agent_server/event_service.py | 32 +++++++++++-------- .../conversation/impl/remote_conversation.py | 11 ++++--- .../openhands/sdk/event/llm_completion_log.py | 9 +++++- 3 files changed, 33 insertions(+), 19 deletions(-) diff --git a/openhands-agent-server/openhands/agent_server/event_service.py b/openhands-agent-server/openhands/agent_server/event_service.py index b40f77ffd6..57c1e7eb9a 100644 --- a/openhands-agent-server/openhands/agent_server/event_service.py +++ b/openhands-agent-server/openhands/agent_server/event_service.py @@ -235,18 +235,24 @@ async def unsubscribe_from_events(self, subscriber_id: UUID) -> bool: def _setup_llm_log_streaming(self, agent: Agent) -> None: """Configure LLM log callbacks to stream logs via events.""" - def log_callback(filename: str, log_data: str) -> None: - """Callback to emit LLM completion logs as events.""" - # Extract model name from filename (format: model__timestamp_uuid.json) - model_name = filename.split("-")[0].replace("__", "/") - event = LLMCompletionLogEvent( - filename=filename, - log_data=log_data, - model_name=model_name, - ) - # Publish to all subscribers - schedule in the main event loop - if self._main_loop and self._main_loop.is_running(): - asyncio.run_coroutine_threadsafe(self._pub_sub(event), self._main_loop) + def make_log_callback(usage_id: str): + def log_callback(filename: str, log_data: str) -> None: + """Callback to emit LLM completion logs as events.""" + # Extract model name from filename (format: model__timestamp_uuid.json) + model_name = filename.split("-")[0].replace("__", "/") + event = LLMCompletionLogEvent( + filename=filename, + log_data=log_data, + model_name=model_name, + usage_id=usage_id, + ) + # Publish to all subscribers - schedule in the main event loop + if self._main_loop and self._main_loop.is_running(): + asyncio.run_coroutine_threadsafe( + self._pub_sub(event), self._main_loop + ) + + return log_callback # Set callback for all LLMs in the agent that have logging enabled for llm in agent.get_all_llms(): @@ -254,7 +260,7 @@ def log_callback(filename: str, log_data: str) -> None: # Access telemetry safely telemetry = getattr(llm, "telemetry", None) if telemetry is not None: - telemetry.set_log_callback(log_callback) + telemetry.set_log_callback(make_log_callback(llm.usage_id)) def _setup_stats_streaming(self, agent: Agent) -> None: """Configure stats update callbacks to stream stats changes via events.""" diff --git a/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py b/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py index 4ed9d3978b..90a60b537b 100644 --- a/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py +++ b/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py @@ -568,11 +568,12 @@ def callback(event: Event) -> None: # to match it against configured LLMs log_dir = None - # First, try to match by usage_id if we can determine it - # Since we don't have usage_id in the event, we'll use the first - # matching log folder, or fall back to a default - if self._log_completion_folders: - # Use the first configured log folder (typically there's only one) + # Prefer usage_id provided by the event to select the correct folder + if isinstance(event, LLMCompletionLogEvent) and event.usage_id: + log_dir = self._log_completion_folders.get(event.usage_id) + + # Fallback to the first configured log folder if usage_id unavailable + if not log_dir and self._log_completion_folders: log_dir = next(iter(self._log_completion_folders.values())) if not log_dir: diff --git a/openhands-sdk/openhands/sdk/event/llm_completion_log.py b/openhands-sdk/openhands/sdk/event/llm_completion_log.py index 95323ccdce..47207bcf0f 100644 --- a/openhands-sdk/openhands/sdk/event/llm_completion_log.py +++ b/openhands-sdk/openhands/sdk/event/llm_completion_log.py @@ -27,6 +27,13 @@ class LLMCompletionLogEvent(Event): default="unknown", description="The model name for context", ) + usage_id: str = Field( + default="default", + description="The LLM usage_id that produced this log", + ) def __str__(self) -> str: - return f"LLMCompletionLog(model={self.model_name}, file={self.filename})" + return ( + f"LLMCompletionLog(usage_id={self.usage_id}, model={self.model_name}, " + f"file={self.filename})" + ) From 78565bc2eb40c7ee2aa29dfec2541688574d1ef8 Mon Sep 17 00:00:00 2001 From: openhands Date: Sat, 15 Nov 2025 03:43:29 +0000 Subject: [PATCH 04/10] Fix critical issues in LLM telemetry streaming Address code review feedback on PR #1159: 1. Expose public LLM.telemetry property - Add telemetry property to LLM class for public access - Remove getattr() calls in EventService, use llm.telemetry directly - This fixes the issue where callbacks were never registered 2. Fix model name parsing - Pass model_name directly from LLM to event callback - Removes brittle filename parsing that breaks hyphenated model names - Model names like 'claude-3-5-sonnet-latest' now work correctly 3. Fix logging short-circuit when log_dir is None - Change guard in log_llm_call to check log_dir OR _log_callback - Allows callback-only streaming without file logging - Essential for remote execution scenarios 4. Move inline import to top-level - Move 'import os' from callback to module top-level imports - Follows code style guidelines All pre-commit checks and tests pass. Co-authored-by: openhands --- .../openhands/agent_server/event_service.py | 15 +++++---------- .../sdk/conversation/impl/remote_conversation.py | 2 +- openhands-sdk/openhands/sdk/llm/llm.py | 15 +++++++++++++++ .../openhands/sdk/llm/utils/telemetry.py | 5 +++-- 4 files changed, 24 insertions(+), 13 deletions(-) diff --git a/openhands-agent-server/openhands/agent_server/event_service.py b/openhands-agent-server/openhands/agent_server/event_service.py index 57c1e7eb9a..cf5c56c478 100644 --- a/openhands-agent-server/openhands/agent_server/event_service.py +++ b/openhands-agent-server/openhands/agent_server/event_service.py @@ -235,11 +235,9 @@ async def unsubscribe_from_events(self, subscriber_id: UUID) -> bool: def _setup_llm_log_streaming(self, agent: Agent) -> None: """Configure LLM log callbacks to stream logs via events.""" - def make_log_callback(usage_id: str): + def make_log_callback(usage_id: str, model_name: str): def log_callback(filename: str, log_data: str) -> None: """Callback to emit LLM completion logs as events.""" - # Extract model name from filename (format: model__timestamp_uuid.json) - model_name = filename.split("-")[0].replace("__", "/") event = LLMCompletionLogEvent( filename=filename, log_data=log_data, @@ -257,10 +255,9 @@ def log_callback(filename: str, log_data: str) -> None: # Set callback for all LLMs in the agent that have logging enabled for llm in agent.get_all_llms(): if llm.log_completions: - # Access telemetry safely - telemetry = getattr(llm, "telemetry", None) - if telemetry is not None: - telemetry.set_log_callback(make_log_callback(llm.usage_id)) + llm.telemetry.set_log_callback( + make_log_callback(llm.usage_id, llm.model) + ) def _setup_stats_streaming(self, agent: Agent) -> None: """Configure stats update callbacks to stream stats changes via events.""" @@ -275,9 +272,7 @@ def stats_update_callback() -> None: # Set callback for all LLMs in the agent for llm in agent.get_all_llms(): - telemetry = getattr(llm, "telemetry", None) - if telemetry is not None: - telemetry.set_stats_update_callback(stats_update_callback) + llm.telemetry.set_stats_update_callback(stats_update_callback) async def start(self): # Store the main event loop for cross-thread communication diff --git a/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py b/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py index 90a60b537b..e90db29252 100644 --- a/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py +++ b/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py @@ -1,5 +1,6 @@ import asyncio import json +import os import threading import uuid from collections.abc import Mapping @@ -557,7 +558,6 @@ def __init__( def _create_llm_completion_log_callback(self) -> ConversationCallbackType: """Create a callback that writes LLM completion logs to client filesystem.""" - import os def callback(event: Event) -> None: if not isinstance(event, LLMCompletionLogEvent): diff --git a/openhands-sdk/openhands/sdk/llm/llm.py b/openhands-sdk/openhands/sdk/llm/llm.py index c30e7a54e4..265121e5ee 100644 --- a/openhands-sdk/openhands/sdk/llm/llm.py +++ b/openhands-sdk/openhands/sdk/llm/llm.py @@ -438,6 +438,21 @@ def metrics(self) -> Metrics: ) return self._metrics + @property + def telemetry(self) -> Telemetry: + """Get telemetry handler for this LLM instance. + + Returns: + Telemetry object for managing logging and metrics callbacks. + + Example: + >>> llm.telemetry.set_log_callback(my_callback) + """ + assert self._telemetry is not None, ( + "Telemetry should be initialized after model validation" + ) + return self._telemetry + def restore_metrics(self, metrics: Metrics) -> None: # Only used by ConversationStats to seed metrics self._metrics = metrics diff --git a/openhands-sdk/openhands/sdk/llm/utils/telemetry.py b/openhands-sdk/openhands/sdk/llm/utils/telemetry.py index 78ba233c93..06d4eb6098 100644 --- a/openhands-sdk/openhands/sdk/llm/utils/telemetry.py +++ b/openhands-sdk/openhands/sdk/llm/utils/telemetry.py @@ -246,7 +246,8 @@ def log_llm_call( cost: float | None, raw_resp: ModelResponse | ResponsesAPIResponse | None = None, ) -> None: - if not self.log_dir: + # Skip if neither file logging nor callback is configured + if not self.log_dir and not self._log_callback: return try: # Prepare filename and log data @@ -325,7 +326,7 @@ def log_llm_call( # Use callback if set (for remote execution), otherwise write to file if self._log_callback: self._log_callback(filename, log_data) - else: + elif self.log_dir: # Create log directory if it doesn't exist os.makedirs(self.log_dir, exist_ok=True) if not os.access(self.log_dir, os.W_OK): From d6c53d3a539a6dec259114b3010dd2aa166f1efc Mon Sep 17 00:00:00 2001 From: openhands Date: Wed, 19 Nov 2025 23:24:41 +0000 Subject: [PATCH 05/10] Refactor: Simplify event streaming implementation This refactoring improves code clarity while maintaining the same functionality: 1. EventService refactoring: - Extract _emit_event_from_thread() helper to eliminate duplication - Simplify _setup_llm_log_streaming() by flattening nested closures - Inline stats event creation in _setup_stats_streaming() - Restore _publish_state_update() to its original simple form 2. RemoteConversation simplification: - Simplify _create_llm_completion_log_callback() by removing unnecessary fallback logic and redundant type checks - Remove verbose comments, keep code self-documenting Key improvements: - Reduced nesting: Eliminated nested closure factory pattern - Better separation: Stats updates handled directly in callback, not via dual-mode _publish_state_update() method - Less code: Removed ~30 lines while preserving functionality - Better readability: Clearer flow with helper method All tests pass with no functional changes. Co-authored-by: openhands --- .../openhands/agent_server/event_service.py | 88 ++++++++----------- .../conversation/impl/remote_conversation.py | 22 +---- 2 files changed, 41 insertions(+), 69 deletions(-) diff --git a/openhands-agent-server/openhands/agent_server/event_service.py b/openhands-agent-server/openhands/agent_server/event_service.py index cf5c56c478..2d51a2a88e 100644 --- a/openhands-agent-server/openhands/agent_server/event_service.py +++ b/openhands-agent-server/openhands/agent_server/event_service.py @@ -232,47 +232,54 @@ async def subscribe_to_events(self, subscriber: Subscriber[Event]) -> UUID: async def unsubscribe_from_events(self, subscriber_id: UUID) -> bool: return self._pub_sub.unsubscribe(subscriber_id) + def _emit_event_from_thread(self, event: Event) -> None: + """Helper to safely emit events from non-async contexts (e.g., callbacks). + + This schedules event emission in the main event loop, making it safe to call + from callbacks that may run in different threads. + """ + if self._main_loop and self._main_loop.is_running(): + asyncio.run_coroutine_threadsafe(self._pub_sub(event), self._main_loop) + def _setup_llm_log_streaming(self, agent: Agent) -> None: """Configure LLM log callbacks to stream logs via events.""" + for llm in agent.get_all_llms(): + if not llm.log_completions: + continue + + # Capture variables for closure + usage_id = llm.usage_id + model_name = llm.model - def make_log_callback(usage_id: str, model_name: str): - def log_callback(filename: str, log_data: str) -> None: + def log_callback( + filename: str, log_data: str, uid=usage_id, model=model_name + ) -> None: """Callback to emit LLM completion logs as events.""" event = LLMCompletionLogEvent( filename=filename, log_data=log_data, - model_name=model_name, - usage_id=usage_id, + model_name=model, + usage_id=uid, ) - # Publish to all subscribers - schedule in the main event loop - if self._main_loop and self._main_loop.is_running(): - asyncio.run_coroutine_threadsafe( - self._pub_sub(event), self._main_loop - ) - - return log_callback + self._emit_event_from_thread(event) - # Set callback for all LLMs in the agent that have logging enabled - for llm in agent.get_all_llms(): - if llm.log_completions: - llm.telemetry.set_log_callback( - make_log_callback(llm.usage_id, llm.model) - ) + llm.telemetry.set_log_callback(log_callback) def _setup_stats_streaming(self, agent: Agent) -> None: """Configure stats update callbacks to stream stats changes via events.""" - def stats_update_callback() -> None: - """Callback to emit stats updates as ConversationStateUpdateEvent.""" - # Schedule state update in the main event loop - if self._main_loop and self._main_loop.is_running(): - asyncio.run_coroutine_threadsafe( - self._publish_state_update(keys=["stats"]), self._main_loop - ) + def stats_callback() -> None: + """Callback to emit stats updates.""" + # Publish only the stats field to avoid sending entire state + if not self._conversation: + return + state = self._conversation._state + with state: + event = ConversationStateUpdateEvent(key="stats", value=state.stats) + self._emit_event_from_thread(event) - # Set callback for all LLMs in the agent for llm in agent.get_all_llms(): - llm.telemetry.set_stats_update_callback(stats_update_callback) + llm.telemetry.set_stats_update_callback(stats_callback) async def start(self): # Store the main event loop for cross-thread communication @@ -401,35 +408,16 @@ async def get_state(self) -> ConversationState: raise ValueError("inactive_service") return self._conversation._state - async def _publish_state_update(self, keys: list[str] | None = None): - """Publish a ConversationStateUpdateEvent with the current state. - - Args: - keys: Optional list of field keys to include in the update. - If None, publishes the full state. If provided, only publishes - the specified fields. - """ + async def _publish_state_update(self): + """Publish a ConversationStateUpdateEvent with the current state.""" if not self._conversation: return state = self._conversation._state with state: - if keys is None: - # Full state update - state_update_event = ( - ConversationStateUpdateEvent.from_conversation_state(state) - ) - else: - # Selective field update - create events for each key - for key in keys: - value = getattr(state, key, None) - state_update_event = ConversationStateUpdateEvent( - key=key, value=value - ) - await self._pub_sub(state_update_event) - return - - # Publish the state update event + state_update_event = ConversationStateUpdateEvent.from_conversation_state( + state + ) await self._pub_sub(state_update_event) async def __aenter__(self): diff --git a/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py b/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py index 8881725e8e..01d048892e 100644 --- a/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py +++ b/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py @@ -563,35 +563,19 @@ def callback(event: Event) -> None: if not isinstance(event, LLMCompletionLogEvent): return - # Try to find the appropriate log directory based on model name - # The model_name might have been extracted from filename, so we need - # to match it against configured LLMs - log_dir = None - - # Prefer usage_id provided by the event to select the correct folder - if isinstance(event, LLMCompletionLogEvent) and event.usage_id: - log_dir = self._log_completion_folders.get(event.usage_id) - - # Fallback to the first configured log folder if usage_id unavailable - if not log_dir and self._log_completion_folders: - log_dir = next(iter(self._log_completion_folders.values())) - + # Get the log directory for this LLM's usage_id + log_dir = self._log_completion_folders.get(event.usage_id) if not log_dir: - # No LLMs with logging enabled, skip logger.debug( - "Received LLMCompletionLogEvent but no log directory configured" + f"No log directory configured for usage_id={event.usage_id}" ) return try: - # Create log directory if it doesn't exist os.makedirs(log_dir, exist_ok=True) - - # Write the log file log_path = os.path.join(log_dir, event.filename) with open(log_path, "w") as f: f.write(event.log_data) - logger.debug(f"Wrote LLM completion log to {log_path}") except Exception as e: logger.warning(f"Failed to write LLM completion log: {e}") From ba12154ee7bdd17815c3170798c11fc607b8c086 Mon Sep 17 00:00:00 2001 From: openhands Date: Fri, 21 Nov 2025 19:47:58 +0000 Subject: [PATCH 06/10] Fix event persistence for LLM completion logs and stats updates The LLMCompletionLogEvent and stats update events were being dispatched via self._pub_sub directly, which sent them to WebSocket subscribers but did NOT persist them to the event log. This meant they would not be available when the UI loads events through WebSocket. In LocalConversation, there's a default callback that persists all events by appending them to self._state.events. By emitting events through self._conversation._on_event instead of self._pub_sub, we ensure that: 1. Events go through all callbacks including the default persistence callback 2. Events are sent to WebSocket subscribers via the AsyncCallbackWrapper 3. Events are persisted to self._state.events for later retrieval Co-authored-by: openhands --- .../openhands/agent_server/event_service.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/openhands-agent-server/openhands/agent_server/event_service.py b/openhands-agent-server/openhands/agent_server/event_service.py index 2d51a2a88e..b3ed712ede 100644 --- a/openhands-agent-server/openhands/agent_server/event_service.py +++ b/openhands-agent-server/openhands/agent_server/event_service.py @@ -236,10 +236,13 @@ def _emit_event_from_thread(self, event: Event) -> None: """Helper to safely emit events from non-async contexts (e.g., callbacks). This schedules event emission in the main event loop, making it safe to call - from callbacks that may run in different threads. + from callbacks that may run in different threads. Events are emitted through + the conversation's normal event flow to ensure they are persisted. """ - if self._main_loop and self._main_loop.is_running(): - asyncio.run_coroutine_threadsafe(self._pub_sub(event), self._main_loop) + if self._main_loop and self._main_loop.is_running() and self._conversation: + # Run the synchronous _on_event callback in an executor to ensure + # the event is both persisted and sent to WebSocket subscribers + self._main_loop.run_in_executor(None, self._conversation._on_event, event) def _setup_llm_log_streaming(self, agent: Agent) -> None: """Configure LLM log callbacks to stream logs via events.""" From ef703b1b438a7497d5478107f7d7c2ffa79d0986 Mon Sep 17 00:00:00 2001 From: openhands Date: Mon, 24 Nov 2025 14:56:51 +0000 Subject: [PATCH 07/10] Remove _log_completion_folders from RemoteConversation The server should not be aware of log_completions_folder - it only needs to stream LLM completion log events back to the client. The client can then decide where to write these logs based on its own LLM configuration. This change: - Removes the _log_completion_folders dict that pre-cached folder paths - Updates the callback to look up the LLM directly from agent config - Accesses log_completions_folder from the LLM at write time - Makes the log folder path purely a client-side concern Co-authored-by: openhands --- .../conversation/impl/remote_conversation.py | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py b/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py index 01d048892e..4e3e50416b 100644 --- a/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py +++ b/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py @@ -425,7 +425,6 @@ class RemoteConversation(BaseConversation): max_iteration_per_run: int workspace: RemoteWorkspace _client: httpx.Client - _log_completion_folders: dict[str, str] def __init__( self, @@ -464,13 +463,6 @@ def __init__( self.workspace = workspace self._client = workspace.client - # Build map of log directories for all LLMs in the agent - self._log_completion_folders = {} - for llm in agent.get_all_llms(): - if llm.log_completions: - # Map usage_id to log folder - self._log_completion_folders[llm.usage_id] = llm.log_completions_folder - if conversation_id is None: payload = { "agent": agent.model_dump( @@ -513,7 +505,8 @@ def __init__( self._callbacks.append(state_update_callback) # Add callback to handle LLM completion logs - if self._log_completion_folders: + # Register callback if any LLM has log_completions enabled + if any(llm.log_completions for llm in agent.get_all_llms()): llm_log_callback = self._create_llm_completion_log_callback() self._callbacks.append(llm_log_callback) @@ -563,15 +556,22 @@ def callback(event: Event) -> None: if not isinstance(event, LLMCompletionLogEvent): return - # Get the log directory for this LLM's usage_id - log_dir = self._log_completion_folders.get(event.usage_id) - if not log_dir: + # Find the LLM with matching usage_id + target_llm = None + for llm in self.agent.get_all_llms(): + if llm.usage_id == event.usage_id: + target_llm = llm + break + + if not target_llm or not target_llm.log_completions: logger.debug( - f"No log directory configured for usage_id={event.usage_id}" + f"No LLM with log_completions enabled found " + f"for usage_id={event.usage_id}" ) return try: + log_dir = target_llm.log_completions_folder os.makedirs(log_dir, exist_ok=True) log_path = os.path.join(log_dir, event.filename) with open(log_path, "w") as f: From 823aea522e44e71e831006e46592d8e1f3c25889 Mon Sep 17 00:00:00 2001 From: hieptl Date: Tue, 25 Nov 2025 20:18:13 +0700 Subject: [PATCH 08/10] fix: set up callbacks --- .../openhands/agent_server/event_service.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/openhands-agent-server/openhands/agent_server/event_service.py b/openhands-agent-server/openhands/agent_server/event_service.py index 6fe3aea316..c3844f704f 100644 --- a/openhands-agent-server/openhands/agent_server/event_service.py +++ b/openhands-agent-server/openhands/agent_server/event_service.py @@ -12,7 +12,7 @@ ) from openhands.agent_server.pub_sub import PubSub, Subscriber from openhands.agent_server.utils import utc_now -from openhands.sdk import LLM, Agent, Event, Message, get_logger +from openhands.sdk import LLM, Agent, AgentBase, Event, Message, get_logger from openhands.sdk.conversation.impl.local_conversation import LocalConversation from openhands.sdk.conversation.secret_registry import SecretValue from openhands.sdk.conversation.state import ( @@ -292,7 +292,7 @@ def _emit_event_from_thread(self, event: Event) -> None: # the event is both persisted and sent to WebSocket subscribers self._main_loop.run_in_executor(None, self._conversation._on_event, event) - def _setup_llm_log_streaming(self, agent: Agent) -> None: + def _setup_llm_log_streaming(self, agent: AgentBase) -> None: """Configure LLM log callbacks to stream logs via events.""" for llm in agent.get_all_llms(): if not llm.log_completions: @@ -316,7 +316,7 @@ def log_callback( llm.telemetry.set_log_callback(log_callback) - def _setup_stats_streaming(self, agent: Agent) -> None: + def _setup_stats_streaming(self, agent: AgentBase) -> None: """Configure stats update callbacks to stream stats changes via events.""" def stats_callback() -> None: @@ -343,12 +343,6 @@ async def start(self): Path(workspace.working_dir).mkdir(parents=True, exist_ok=True) agent = Agent.model_validate(self.stored.agent.model_dump()) - # Setup LLM log streaming for remote execution - self._setup_llm_log_streaming(agent) - - # Setup stats streaming for remote execution - self._setup_stats_streaming(agent) - conversation = LocalConversation( agent=agent, workspace=workspace, @@ -370,6 +364,12 @@ async def start(self): # Register state change callback to automatically publish updates self._conversation._state.set_on_state_change(self._conversation._on_event) + # Setup LLM log streaming for remote execution + self._setup_llm_log_streaming(self._conversation.agent) + + # Setup stats streaming for remote execution + self._setup_stats_streaming(self._conversation.agent) + # Publish initial state update await self._publish_state_update() From 298982cacfac86654b817f288b89cd7e2b95de04 Mon Sep 17 00:00:00 2001 From: hieptl Date: Tue, 25 Nov 2025 21:20:22 +0700 Subject: [PATCH 09/10] refactor: _emit_event_from_thread --- .../openhands/agent_server/event_service.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/openhands-agent-server/openhands/agent_server/event_service.py b/openhands-agent-server/openhands/agent_server/event_service.py index c3844f704f..3e5671be40 100644 --- a/openhands-agent-server/openhands/agent_server/event_service.py +++ b/openhands-agent-server/openhands/agent_server/event_service.py @@ -288,9 +288,18 @@ def _emit_event_from_thread(self, event: Event) -> None: the conversation's normal event flow to ensure they are persisted. """ if self._main_loop and self._main_loop.is_running() and self._conversation: - # Run the synchronous _on_event callback in an executor to ensure - # the event is both persisted and sent to WebSocket subscribers - self._main_loop.run_in_executor(None, self._conversation._on_event, event) + # Capture conversation reference for closure + conversation = self._conversation + + # Wrap _on_event with lock acquisition to ensure thread-safe access + # to conversation state and event log during concurrent operations + def locked_on_event(): + with conversation._state: + conversation._on_event(event) + + # Run the locked callback in an executor to ensure the event is + # both persisted and sent to WebSocket subscribers + self._main_loop.run_in_executor(None, locked_on_event) def _setup_llm_log_streaming(self, agent: AgentBase) -> None: """Configure LLM log callbacks to stream logs via events.""" From decfd82c13aa2fca8bdf3e00e68f2a7fbc925b2e Mon Sep 17 00:00:00 2001 From: openhands Date: Tue, 25 Nov 2025 15:48:54 +0000 Subject: [PATCH 10/10] Rename set_log_callback to set_log_completions_callback This change makes the naming more explicit and descriptive: - Renamed _log_callback to _log_completions_callback - Renamed set_log_callback() to set_log_completions_callback() - Updated all references across the codebase - Updated tests to use the new naming Co-authored-by: openhands --- .../openhands/agent_server/event_service.py | 2 +- openhands-sdk/openhands/sdk/llm/llm.py | 2 +- .../openhands/sdk/llm/utils/telemetry.py | 16 ++++++++++------ tests/sdk/llm/test_llm_telemetry.py | 8 ++++---- 4 files changed, 16 insertions(+), 12 deletions(-) diff --git a/openhands-agent-server/openhands/agent_server/event_service.py b/openhands-agent-server/openhands/agent_server/event_service.py index 3e5671be40..1fbbb5d301 100644 --- a/openhands-agent-server/openhands/agent_server/event_service.py +++ b/openhands-agent-server/openhands/agent_server/event_service.py @@ -323,7 +323,7 @@ def log_callback( ) self._emit_event_from_thread(event) - llm.telemetry.set_log_callback(log_callback) + llm.telemetry.set_log_completions_callback(log_callback) def _setup_stats_streaming(self, agent: AgentBase) -> None: """Configure stats update callbacks to stream stats changes via events.""" diff --git a/openhands-sdk/openhands/sdk/llm/llm.py b/openhands-sdk/openhands/sdk/llm/llm.py index a36ab38851..7127cb841c 100644 --- a/openhands-sdk/openhands/sdk/llm/llm.py +++ b/openhands-sdk/openhands/sdk/llm/llm.py @@ -464,7 +464,7 @@ def telemetry(self) -> Telemetry: Telemetry object for managing logging and metrics callbacks. Example: - >>> llm.telemetry.set_log_callback(my_callback) + >>> llm.telemetry.set_log_completions_callback(my_callback) """ assert self._telemetry is not None, ( "Telemetry should be initialized after model validation" diff --git a/openhands-sdk/openhands/sdk/llm/utils/telemetry.py b/openhands-sdk/openhands/sdk/llm/utils/telemetry.py index f3e7b7e2fb..a931fbcbfb 100644 --- a/openhands-sdk/openhands/sdk/llm/utils/telemetry.py +++ b/openhands-sdk/openhands/sdk/llm/utils/telemetry.py @@ -43,7 +43,9 @@ class Telemetry(BaseModel): _req_start: float = PrivateAttr(default=0.0) _req_ctx: dict[str, Any] = PrivateAttr(default_factory=dict) _last_latency: float = PrivateAttr(default=0.0) - _log_callback: Callable[[str, str], None] | None = PrivateAttr(default=None) + _log_completions_callback: Callable[[str, str], None] | None = PrivateAttr( + default=None + ) _stats_update_callback: Callable[[], None] | None = PrivateAttr(default=None) model_config: ClassVar[ConfigDict] = ConfigDict( @@ -51,14 +53,16 @@ class Telemetry(BaseModel): ) # ---------- Lifecycle ---------- - def set_log_callback(self, callback: Callable[[str, str], None] | None) -> None: + def set_log_completions_callback( + self, callback: Callable[[str, str], None] | None + ) -> None: """Set a callback function for logging instead of writing to file. Args: callback: A function that takes (filename, log_data) and handles the log. Used for streaming logs in remote execution contexts. """ - self._log_callback = callback + self._log_completions_callback = callback def set_stats_update_callback(self, callback: Callable[[], None] | None) -> None: """Set a callback function to be notified when stats are updated. @@ -247,7 +251,7 @@ def log_llm_call( raw_resp: ModelResponse | ResponsesAPIResponse | None = None, ) -> None: # Skip if neither file logging nor callback is configured - if not self.log_dir and not self._log_callback: + if not self.log_dir and not self._log_completions_callback: return try: # Prepare filename and log data @@ -324,8 +328,8 @@ def log_llm_call( log_data = json.dumps(data, default=_safe_json, ensure_ascii=False) # Use callback if set (for remote execution), otherwise write to file - if self._log_callback: - self._log_callback(filename, log_data) + if self._log_completions_callback: + self._log_completions_callback(filename, log_data) elif self.log_dir: # Create log directory if it doesn't exist os.makedirs(self.log_dir, exist_ok=True) diff --git a/tests/sdk/llm/test_llm_telemetry.py b/tests/sdk/llm/test_llm_telemetry.py index c0a70ddc70..dde6ec4904 100644 --- a/tests/sdk/llm/test_llm_telemetry.py +++ b/tests/sdk/llm/test_llm_telemetry.py @@ -791,12 +791,12 @@ def test_set_log_callback(self, basic_telemetry): def log_callback(filename: str, log_data: str): callback_called.append((filename, log_data)) - basic_telemetry.set_log_callback(log_callback) - assert basic_telemetry._log_callback == log_callback + basic_telemetry.set_log_completions_callback(log_callback) + assert basic_telemetry._log_completions_callback == log_callback # Clear callback - basic_telemetry.set_log_callback(None) - assert basic_telemetry._log_callback is None + basic_telemetry.set_log_completions_callback(None) + assert basic_telemetry._log_completions_callback is None def test_set_stats_update_callback(self, basic_telemetry): """Test setting stats update callback."""