Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,8 @@ async def start_conversation(
raise ValueError("inactive_service")
conversation_id = request.conversation_id or uuid4()

if conversation_id in self._event_services:
existing_event_service = self._event_services[conversation_id]
existing_event_service = self._event_services.get(conversation_id)
if existing_event_service and existing_event_service.is_open():
state = await existing_event_service.get_state()
conversation_info = _compose_conversation_info(
existing_event_service.stored, state
Expand Down Expand Up @@ -402,8 +402,14 @@ async def _start_event_service(self, stored: StoredConversation) -> EventService
]
)

try:
await event_service.start()
except Exception:
# Clean up the event service if startup fails
await event_service.close()
raise

event_services[stored.id] = event_service
await event_service.start()
return event_service


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,3 +373,6 @@ async def __aenter__(self):
async def __aexit__(self, exc_type, exc_value, traceback):
await self.save_meta()
await self.close()

def is_open(self) -> bool:
return bool(self._conversation)
170 changes: 170 additions & 0 deletions tests/agent_server/test_conversation_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,176 @@ async def test_start_conversation_with_duplicate_id(self, conversation_service):
assert result.id == custom_id
assert not is_new

@pytest.mark.asyncio
async def test_start_conversation_reuse_checks_is_open(self, conversation_service):
"""Test that conversation reuse checks if event service is open."""
custom_id = uuid4()

# Create a mock event service that exists but is not open
mock_event_service = AsyncMock(spec=EventService)
mock_event_service.is_open.return_value = False
conversation_service._event_services[custom_id] = mock_event_service

with tempfile.TemporaryDirectory() as temp_dir:
request = StartConversationRequest(
agent=Agent(llm=LLM(model="gpt-4", usage_id="test-llm"), tools=[]),
workspace=LocalWorkspace(working_dir=temp_dir),
confirmation_policy=NeverConfirm(),
conversation_id=custom_id,
)

# Mock the _start_event_service method to avoid actual startup
with patch.object(
conversation_service, "_start_event_service"
) as mock_start:
mock_new_service = AsyncMock(spec=EventService)
mock_new_service.stored = StoredConversation(
id=custom_id,
agent=request.agent,
workspace=request.workspace,
confirmation_policy=request.confirmation_policy,
initial_message=request.initial_message,
metrics=None,
created_at=datetime(2025, 1, 1, 12, 0, 0, tzinfo=UTC),
updated_at=datetime(2025, 1, 1, 12, 30, 0, tzinfo=UTC),
)
mock_state = ConversationState(
id=custom_id,
agent=request.agent,
workspace=request.workspace,
execution_status=ConversationExecutionStatus.IDLE,
confirmation_policy=request.confirmation_policy,
)
mock_new_service.get_state.return_value = mock_state
mock_start.return_value = mock_new_service

result, is_new = await conversation_service.start_conversation(request)

# Should create a new conversation since existing one is not open
assert result.id == custom_id
assert is_new
mock_start.assert_called_once()

@pytest.mark.asyncio
async def test_start_conversation_reuse_when_open(self, conversation_service):
"""Test that conversation is reused when event service is open."""
custom_id = uuid4()

# Create a mock event service that exists and is open
mock_event_service = AsyncMock(spec=EventService)
mock_event_service.is_open.return_value = True
mock_event_service.stored = StoredConversation(
id=custom_id,
agent=Agent(llm=LLM(model="gpt-4", usage_id="test-llm"), tools=[]),
workspace=LocalWorkspace(working_dir="workspace/project"),
confirmation_policy=NeverConfirm(),
initial_message=None,
metrics=None,
created_at=datetime(2025, 1, 1, 12, 0, 0, tzinfo=UTC),
updated_at=datetime(2025, 1, 1, 12, 30, 0, tzinfo=UTC),
)
mock_state = ConversationState(
id=custom_id,
agent=mock_event_service.stored.agent,
workspace=mock_event_service.stored.workspace,
execution_status=ConversationExecutionStatus.IDLE,
confirmation_policy=mock_event_service.stored.confirmation_policy,
)
mock_event_service.get_state.return_value = mock_state
conversation_service._event_services[custom_id] = mock_event_service

with tempfile.TemporaryDirectory() as temp_dir:
request = StartConversationRequest(
agent=Agent(llm=LLM(model="gpt-4", usage_id="test-llm"), tools=[]),
workspace=LocalWorkspace(working_dir=temp_dir),
confirmation_policy=NeverConfirm(),
conversation_id=custom_id,
)

# Mock the _start_event_service method to ensure it's not called
with patch.object(
conversation_service, "_start_event_service"
) as mock_start:
result, is_new = await conversation_service.start_conversation(request)

# Should reuse existing conversation since it's open
assert result.id == custom_id
assert not is_new
mock_start.assert_not_called()

@pytest.mark.asyncio
async def test_start_event_service_failure_cleanup(self, conversation_service):
"""Test that event service is cleaned up when startup fails."""
with tempfile.TemporaryDirectory() as temp_dir:
stored = StoredConversation(
id=uuid4(),
agent=Agent(llm=LLM(model="gpt-4", usage_id="test-llm"), tools=[]),
workspace=LocalWorkspace(working_dir=temp_dir),
confirmation_policy=NeverConfirm(),
initial_message=None,
metrics=None,
created_at=datetime(2025, 1, 1, 12, 0, 0, tzinfo=UTC),
updated_at=datetime(2025, 1, 1, 12, 30, 0, tzinfo=UTC),
)

# Mock EventService to simulate startup failure
with patch(
"openhands.agent_server.conversation_service.EventService"
) as mock_event_service_class:
mock_event_service = AsyncMock()
mock_event_service.start.side_effect = Exception("Startup failed")
mock_event_service.close = AsyncMock()
mock_event_service_class.return_value = mock_event_service

# Attempt to start event service should fail and clean up
with pytest.raises(Exception, match="Startup failed"):
await conversation_service._start_event_service(stored)

# Verify cleanup was called
mock_event_service.close.assert_called_once()

# Verify event service was not stored
assert stored.id not in conversation_service._event_services

@pytest.mark.asyncio
async def test_start_event_service_success_stores_service(
self, conversation_service
):
"""Test that event service is stored only after successful startup."""
with tempfile.TemporaryDirectory() as temp_dir:
stored = StoredConversation(
id=uuid4(),
agent=Agent(llm=LLM(model="gpt-4", usage_id="test-llm"), tools=[]),
workspace=LocalWorkspace(working_dir=temp_dir),
confirmation_policy=NeverConfirm(),
initial_message=None,
metrics=None,
created_at=datetime(2025, 1, 1, 12, 0, 0, tzinfo=UTC),
updated_at=datetime(2025, 1, 1, 12, 30, 0, tzinfo=UTC),
)

# Mock EventService to simulate successful startup
with patch(
"openhands.agent_server.conversation_service.EventService"
) as mock_event_service_class:
mock_event_service = AsyncMock()
mock_event_service.start = AsyncMock() # Successful startup
mock_event_service_class.return_value = mock_event_service

# Start event service should succeed
result = await conversation_service._start_event_service(stored)

# Verify startup was called
mock_event_service.start.assert_called_once()

# Verify event service was stored after successful startup
assert stored.id in conversation_service._event_services
assert (
conversation_service._event_services[stored.id]
== mock_event_service
)
assert result == mock_event_service


class TestConversationServiceUpdateConversation:
"""Test cases for ConversationService.update_conversation method."""
Expand Down
40 changes: 40 additions & 0 deletions tests/agent_server/test_event_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -747,3 +747,43 @@ async def test_send_message_with_different_message_types(self, event_service):
mock_loop.run_in_executor.assert_any_call(
None, conversation.send_message, system_message
)


class TestEventServiceIsOpen:
"""Test cases for EventService.is_open method."""

def test_is_open_when_conversation_is_none(self, event_service):
"""Test is_open returns False when _conversation is None."""
event_service._conversation = None
assert not event_service.is_open()

def test_is_open_when_conversation_exists(self, event_service):
"""Test is_open returns True when _conversation exists."""
conversation = MagicMock(spec=Conversation)
event_service._conversation = conversation
assert event_service.is_open()

def test_is_open_when_conversation_is_falsy(self, event_service):
"""Test is_open returns False when _conversation is falsy."""
# Test with various falsy values
falsy_values = [None, False, 0, "", [], {}]

for falsy_value in falsy_values:
event_service._conversation = falsy_value
assert not event_service.is_open(), f"Expected False for {falsy_value}"

def test_is_open_when_conversation_is_truthy(self, event_service):
"""Test is_open returns True when _conversation is truthy."""
# Test with various truthy values
truthy_values = [
MagicMock(spec=Conversation),
"some_string",
1,
[1, 2, 3],
{"key": "value"},
True,
]

for truthy_value in truthy_values:
event_service._conversation = truthy_value
assert event_service.is_open(), f"Expected True for {truthy_value}"
Loading