diff --git a/openhands-agent-server/openhands/agent_server/event_service.py b/openhands-agent-server/openhands/agent_server/event_service.py index f77e9e07c8..1fbbb5d301 100644 --- a/openhands-agent-server/openhands/agent_server/event_service.py +++ b/openhands-agent-server/openhands/agent_server/event_service.py @@ -12,7 +12,7 @@ ) from openhands.agent_server.pub_sub import PubSub, Subscriber from openhands.agent_server.utils import utc_now -from openhands.sdk import LLM, Agent, Event, Message, get_logger +from openhands.sdk import LLM, Agent, AgentBase, Event, Message, get_logger from openhands.sdk.conversation.impl.local_conversation import LocalConversation from openhands.sdk.conversation.secret_registry import SecretValue from openhands.sdk.conversation.state import ( @@ -20,6 +20,7 @@ ConversationState, ) from openhands.sdk.event.conversation_state import ConversationStateUpdateEvent +from openhands.sdk.event.llm_completion_log import LLMCompletionLogEvent from openhands.sdk.security.analyzer import SecurityAnalyzerBase from openhands.sdk.security.confirmation_policy import ConfirmationPolicyBase from openhands.sdk.utils.async_utils import AsyncCallbackWrapper @@ -279,6 +280,67 @@ async def subscribe_to_events(self, subscriber: Subscriber[Event]) -> UUID: async def unsubscribe_from_events(self, subscriber_id: UUID) -> bool: return self._pub_sub.unsubscribe(subscriber_id) + def _emit_event_from_thread(self, event: Event) -> None: + """Helper to safely emit events from non-async contexts (e.g., callbacks). + + This schedules event emission in the main event loop, making it safe to call + from callbacks that may run in different threads. Events are emitted through + the conversation's normal event flow to ensure they are persisted. + """ + if self._main_loop and self._main_loop.is_running() and self._conversation: + # Capture conversation reference for closure + conversation = self._conversation + + # Wrap _on_event with lock acquisition to ensure thread-safe access + # to conversation state and event log during concurrent operations + def locked_on_event(): + with conversation._state: + conversation._on_event(event) + + # Run the locked callback in an executor to ensure the event is + # both persisted and sent to WebSocket subscribers + self._main_loop.run_in_executor(None, locked_on_event) + + def _setup_llm_log_streaming(self, agent: AgentBase) -> None: + """Configure LLM log callbacks to stream logs via events.""" + for llm in agent.get_all_llms(): + if not llm.log_completions: + continue + + # Capture variables for closure + usage_id = llm.usage_id + model_name = llm.model + + def log_callback( + filename: str, log_data: str, uid=usage_id, model=model_name + ) -> None: + """Callback to emit LLM completion logs as events.""" + event = LLMCompletionLogEvent( + filename=filename, + log_data=log_data, + model_name=model, + usage_id=uid, + ) + self._emit_event_from_thread(event) + + llm.telemetry.set_log_completions_callback(log_callback) + + def _setup_stats_streaming(self, agent: AgentBase) -> None: + """Configure stats update callbacks to stream stats changes via events.""" + + def stats_callback() -> None: + """Callback to emit stats updates.""" + # Publish only the stats field to avoid sending entire state + if not self._conversation: + return + state = self._conversation._state + with state: + event = ConversationStateUpdateEvent(key="stats", value=state.stats) + self._emit_event_from_thread(event) + + for llm in agent.get_all_llms(): + llm.telemetry.set_stats_update_callback(stats_callback) + async def start(self): # Store the main event loop for cross-thread communication self._main_loop: asyncio.AbstractEventLoop = asyncio.get_running_loop() @@ -289,6 +351,7 @@ async def start(self): assert isinstance(workspace, LocalWorkspace) Path(workspace.working_dir).mkdir(parents=True, exist_ok=True) agent = Agent.model_validate(self.stored.agent.model_dump()) + conversation = LocalConversation( agent=agent, workspace=workspace, @@ -310,6 +373,12 @@ async def start(self): # Register state change callback to automatically publish updates self._conversation._state.set_on_state_change(self._conversation._on_event) + # Setup LLM log streaming for remote execution + self._setup_llm_log_streaming(self._conversation.agent) + + # Setup stats streaming for remote execution + self._setup_stats_streaming(self._conversation.agent) + # Publish initial state update await self._publish_state_update() @@ -406,12 +475,9 @@ async def _publish_state_update(self): state = self._conversation._state with state: - # Create state update event with current state information state_update_event = ConversationStateUpdateEvent.from_conversation_state( state ) - - # Publish the state update event await self._pub_sub(state_update_event) async def __aenter__(self): diff --git a/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py b/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py index e95a9f0a39..4e3e50416b 100644 --- a/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py +++ b/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py @@ -1,5 +1,6 @@ import asyncio import json +import os import threading import uuid from collections.abc import Mapping @@ -26,6 +27,7 @@ FULL_STATE_KEY, ConversationStateUpdateEvent, ) +from openhands.sdk.event.llm_completion_log import LLMCompletionLogEvent from openhands.sdk.llm import LLM, Message, TextContent from openhands.sdk.logger import DEBUG, get_logger from openhands.sdk.observability.laminar import observe @@ -502,6 +504,12 @@ def __init__( state_update_callback = self._state.create_state_update_callback() self._callbacks.append(state_update_callback) + # Add callback to handle LLM completion logs + # Register callback if any LLM has log_completions enabled + if any(llm.log_completions for llm in agent.get_all_llms()): + llm_log_callback = self._create_llm_completion_log_callback() + self._callbacks.append(llm_log_callback) + # Handle visualization configuration if isinstance(visualizer, ConversationVisualizerBase): # Use custom visualizer instance @@ -541,6 +549,39 @@ def __init__( self._start_observability_span(str(self._id)) + def _create_llm_completion_log_callback(self) -> ConversationCallbackType: + """Create a callback that writes LLM completion logs to client filesystem.""" + + def callback(event: Event) -> None: + if not isinstance(event, LLMCompletionLogEvent): + return + + # Find the LLM with matching usage_id + target_llm = None + for llm in self.agent.get_all_llms(): + if llm.usage_id == event.usage_id: + target_llm = llm + break + + if not target_llm or not target_llm.log_completions: + logger.debug( + f"No LLM with log_completions enabled found " + f"for usage_id={event.usage_id}" + ) + return + + try: + log_dir = target_llm.log_completions_folder + os.makedirs(log_dir, exist_ok=True) + log_path = os.path.join(log_dir, event.filename) + with open(log_path, "w") as f: + f.write(event.log_data) + logger.debug(f"Wrote LLM completion log to {log_path}") + except Exception as e: + logger.warning(f"Failed to write LLM completion log: {e}") + + return callback + @property def id(self) -> ConversationID: return self._id diff --git a/openhands-sdk/openhands/sdk/event/__init__.py b/openhands-sdk/openhands/sdk/event/__init__.py index 9e4346e1dc..f2ae2ea5e3 100644 --- a/openhands-sdk/openhands/sdk/event/__init__.py +++ b/openhands-sdk/openhands/sdk/event/__init__.py @@ -5,6 +5,7 @@ CondensationSummaryEvent, ) from openhands.sdk.event.conversation_state import ConversationStateUpdateEvent +from openhands.sdk.event.llm_completion_log import LLMCompletionLogEvent from openhands.sdk.event.llm_convertible import ( ActionEvent, AgentErrorEvent, @@ -35,6 +36,7 @@ "CondensationRequest", "CondensationSummaryEvent", "ConversationStateUpdateEvent", + "LLMCompletionLogEvent", "EventID", "ToolCallID", ] diff --git a/openhands-sdk/openhands/sdk/event/llm_completion_log.py b/openhands-sdk/openhands/sdk/event/llm_completion_log.py new file mode 100644 index 0000000000..47207bcf0f --- /dev/null +++ b/openhands-sdk/openhands/sdk/event/llm_completion_log.py @@ -0,0 +1,39 @@ +"""Event for streaming LLM completion logs from remote agents to clients.""" + +from pydantic import Field + +from openhands.sdk.event.base import Event +from openhands.sdk.event.types import SourceType + + +class LLMCompletionLogEvent(Event): + """Event containing LLM completion log data. + + When an LLM is configured with log_completions=True in a remote conversation, + this event streams the completion log data back to the client through WebSocket + instead of writing it to a file inside the Docker container. + """ + + source: SourceType = "environment" + filename: str = Field( + ..., + description="The intended filename for this log (relative to log directory)", + ) + log_data: str = Field( + ..., + description="The JSON-encoded log data to be written to the file", + ) + model_name: str = Field( + default="unknown", + description="The model name for context", + ) + usage_id: str = Field( + default="default", + description="The LLM usage_id that produced this log", + ) + + def __str__(self) -> str: + return ( + f"LLMCompletionLog(usage_id={self.usage_id}, model={self.model_name}, " + f"file={self.filename})" + ) diff --git a/openhands-sdk/openhands/sdk/llm/llm.py b/openhands-sdk/openhands/sdk/llm/llm.py index c498821e97..7127cb841c 100644 --- a/openhands-sdk/openhands/sdk/llm/llm.py +++ b/openhands-sdk/openhands/sdk/llm/llm.py @@ -456,6 +456,21 @@ def metrics(self) -> Metrics: ) return self._metrics + @property + def telemetry(self) -> Telemetry: + """Get telemetry handler for this LLM instance. + + Returns: + Telemetry object for managing logging and metrics callbacks. + + Example: + >>> llm.telemetry.set_log_completions_callback(my_callback) + """ + assert self._telemetry is not None, ( + "Telemetry should be initialized after model validation" + ) + return self._telemetry + def restore_metrics(self, metrics: Metrics) -> None: # Only used by ConversationStats to seed metrics self._metrics = metrics diff --git a/openhands-sdk/openhands/sdk/llm/utils/telemetry.py b/openhands-sdk/openhands/sdk/llm/utils/telemetry.py index 6e2b2adabd..a931fbcbfb 100644 --- a/openhands-sdk/openhands/sdk/llm/utils/telemetry.py +++ b/openhands-sdk/openhands/sdk/llm/utils/telemetry.py @@ -3,6 +3,7 @@ import time import uuid import warnings +from collections.abc import Callable from typing import Any, ClassVar from litellm.cost_calculator import completion_cost as litellm_completion_cost @@ -42,12 +43,36 @@ class Telemetry(BaseModel): _req_start: float = PrivateAttr(default=0.0) _req_ctx: dict[str, Any] = PrivateAttr(default_factory=dict) _last_latency: float = PrivateAttr(default=0.0) + _log_completions_callback: Callable[[str, str], None] | None = PrivateAttr( + default=None + ) + _stats_update_callback: Callable[[], None] | None = PrivateAttr(default=None) model_config: ClassVar[ConfigDict] = ConfigDict( extra="forbid", arbitrary_types_allowed=True ) # ---------- Lifecycle ---------- + def set_log_completions_callback( + self, callback: Callable[[str, str], None] | None + ) -> None: + """Set a callback function for logging instead of writing to file. + + Args: + callback: A function that takes (filename, log_data) and handles the log. + Used for streaming logs in remote execution contexts. + """ + self._log_completions_callback = callback + + def set_stats_update_callback(self, callback: Callable[[], None] | None) -> None: + """Set a callback function to be notified when stats are updated. + + Args: + callback: A function called whenever metrics are updated. + Used for streaming stats updates in remote execution contexts. + """ + self._stats_update_callback = callback + def on_request(self, log_ctx: dict | None) -> None: self._req_start = time.time() self._req_ctx = log_ctx or {} @@ -86,6 +111,13 @@ def on_response( if self.log_enabled: self.log_llm_call(resp, cost, raw_resp=raw_resp) + # 5) notify about stats update + if self._stats_update_callback is not None: + try: + self._stats_update_callback() + except Exception: + logger.exception("Stats update callback failed", exc_info=True) + return self.metrics.deep_copy() def on_error(self, _err: Exception) -> None: @@ -218,22 +250,17 @@ def log_llm_call( cost: float | None, raw_resp: ModelResponse | ResponsesAPIResponse | None = None, ) -> None: - if not self.log_dir: + # Skip if neither file logging nor callback is configured + if not self.log_dir and not self._log_completions_callback: return try: - # Create log directory if it doesn't exist - os.makedirs(self.log_dir, exist_ok=True) - if not os.access(self.log_dir, os.W_OK): - raise PermissionError(f"log_dir is not writable: {self.log_dir}") - - fname = os.path.join( - self.log_dir, - ( - f"{self.model_name.replace('/', '__')}-" - f"{time.time():.3f}-" - f"{uuid.uuid4().hex[:4]}.json" - ), + # Prepare filename and log data + filename = ( + f"{self.model_name.replace('/', '__')}-" + f"{time.time():.3f}-" + f"{uuid.uuid4().hex[:4]}.json" ) + data = self._req_ctx.copy() data["response"] = ( resp # ModelResponse | ResponsesAPIResponse; @@ -297,8 +324,21 @@ def log_llm_call( and "tools" in data["kwargs"] ): data["kwargs"].pop("tools") - with open(fname, "w", encoding="utf-8") as f: - f.write(json.dumps(data, default=_safe_json, ensure_ascii=False)) + + log_data = json.dumps(data, default=_safe_json, ensure_ascii=False) + + # Use callback if set (for remote execution), otherwise write to file + if self._log_completions_callback: + self._log_completions_callback(filename, log_data) + elif self.log_dir: + # Create log directory if it doesn't exist + os.makedirs(self.log_dir, exist_ok=True) + if not os.access(self.log_dir, os.W_OK): + raise PermissionError(f"log_dir is not writable: {self.log_dir}") + + fname = os.path.join(self.log_dir, filename) + with open(fname, "w", encoding="utf-8") as f: + f.write(log_data) except Exception as e: warnings.warn(f"Telemetry logging failed: {e}") diff --git a/tests/sdk/event/test_llm_completion_log_event.py b/tests/sdk/event/test_llm_completion_log_event.py new file mode 100644 index 0000000000..35bc8cb161 --- /dev/null +++ b/tests/sdk/event/test_llm_completion_log_event.py @@ -0,0 +1,74 @@ +"""Tests for LLMCompletionLogEvent serialization and functionality.""" + +import json + +from openhands.sdk.event import Event, LLMCompletionLogEvent + + +def test_llm_completion_log_event_creation() -> None: + """Test creating an LLMCompletionLogEvent.""" + event = LLMCompletionLogEvent( + filename="test_model__1234567890.123-abcd.json", + log_data='{"test": "data"}', + model_name="test_model", + ) + + assert event.filename == "test_model__1234567890.123-abcd.json" + assert event.log_data == '{"test": "data"}' + assert event.model_name == "test_model" + assert event.source == "environment" + + +def test_llm_completion_log_event_serialization() -> None: + """Test LLMCompletionLogEvent serialization/deserialization.""" + log_data = json.dumps( + { + "response": {"id": "response_123", "model": "test_model"}, + "cost": 0.0001, + "timestamp": 1234567890.123, + } + ) + + event = LLMCompletionLogEvent( + filename="anthropic__claude-sonnet__1234567890.123-abcd.json", + log_data=log_data, + model_name="anthropic/claude-sonnet", + ) + + # Serialize + json_str = event.model_dump_json() + deserialized = LLMCompletionLogEvent.model_validate_json(json_str) + + assert deserialized == event + assert deserialized.filename == event.filename + assert deserialized.log_data == event.log_data + assert deserialized.model_name == event.model_name + + +def test_llm_completion_log_event_as_base_event() -> None: + """Test that LLMCompletionLogEvent can be deserialized as base Event.""" + event = LLMCompletionLogEvent( + filename="test_model__1234567890.123-abcd.json", + log_data='{"test": "data"}', + model_name="test_model", + ) + + # Serialize and deserialize as base Event + json_str = event.model_dump_json() + deserialized = Event.model_validate_json(json_str) + + assert isinstance(deserialized, LLMCompletionLogEvent) + assert deserialized == event + + +def test_llm_completion_log_event_str() -> None: + """Test string representation of LLMCompletionLogEvent.""" + event = LLMCompletionLogEvent( + filename="test_model__1234567890.123-abcd.json", + log_data='{"test": "data"}', + model_name="test_model", + ) + + str_repr = str(event) + assert "test_model" in str_repr + assert "test_model__1234567890.123-abcd.json" in str_repr diff --git a/tests/sdk/llm/test_llm_telemetry.py b/tests/sdk/llm/test_llm_telemetry.py index b69eda805a..dde6ec4904 100644 --- a/tests/sdk/llm/test_llm_telemetry.py +++ b/tests/sdk/llm/test_llm_telemetry.py @@ -779,3 +779,66 @@ def test_cost_calculation_with_zero_cost(self, basic_telemetry, mock_response): assert metrics.accumulated_cost == 0.0 # Should NOT add zero cost to costs list (0.0 is falsy) assert len(basic_telemetry.metrics.costs) == 0 + + +class TestTelemetryCallbacks: + """Test callback functionality for log streaming and stats updates.""" + + def test_set_log_callback(self, basic_telemetry): + """Test setting log callback.""" + callback_called = [] + + def log_callback(filename: str, log_data: str): + callback_called.append((filename, log_data)) + + basic_telemetry.set_log_completions_callback(log_callback) + assert basic_telemetry._log_completions_callback == log_callback + + # Clear callback + basic_telemetry.set_log_completions_callback(None) + assert basic_telemetry._log_completions_callback is None + + def test_set_stats_update_callback(self, basic_telemetry): + """Test setting stats update callback.""" + callback_called = [] + + def stats_callback(): + callback_called.append(True) + + basic_telemetry.set_stats_update_callback(stats_callback) + assert basic_telemetry._stats_update_callback == stats_callback + + # Clear callback + basic_telemetry.set_stats_update_callback(None) + assert basic_telemetry._stats_update_callback is None + + def test_stats_update_callback_triggered_on_response( + self, basic_telemetry, mock_response + ): + """Test that stats update callback is triggered on response.""" + callback_called = [] + + def stats_callback(): + callback_called.append(True) + + basic_telemetry.set_stats_update_callback(stats_callback) + basic_telemetry.on_request(None) + basic_telemetry.on_response(mock_response) + + # Callback should be triggered once after response + assert len(callback_called) == 1 + + def test_stats_update_callback_exception_handling( + self, basic_telemetry, mock_response + ): + """Test that exceptions in stats callback don't break on_response.""" + + def failing_callback(): + raise Exception("Callback failed") + + basic_telemetry.set_stats_update_callback(failing_callback) + basic_telemetry.on_request(None) + + # Should not raise exception even if callback fails + metrics = basic_telemetry.on_response(mock_response) + assert isinstance(metrics, Metrics)