Skip to content

Commit a58ef2b

Browse files
fix: address critical bugs in message steering implementation
- Fix INTERRUPT detection in tool_execution.py (was using broken substring check)
- Fix rate limiter blocking urgent messages during tool execution
- Add INTERRUPT priority bypass in message queue (URGENT.value + 1)
- Add proper error handling in execution_mixin.py with graceful degradation
- Extend MessageSteeringProtocol with enabled property for contract compliance
- Move test file to proper tests/integration/ directory structure
- Add mandatory live agentic test with real LLM execution
- Improve prompt formatting with clear separators

Fixes critical bugs identified by Greptile, CodeRabbit reviewers. All changes maintain backward compatibility.

Co-authored-by: Mervin Praison <MervinPraison@users.noreply.github.com>
1 parent e37cfd8 commit a58ef2b

5 files changed

Lines changed: 206 additions & 16 deletions

File tree

src/praisonai-agents/praisonaiagents/agent/execution_mixin.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -808,13 +808,18 @@ def run_chat():
808808

809809
# Check for steering messages before starting chat
810810
if hasattr(self, '_check_steering_messages'):
811-
steering_msg = self._check_steering_messages()
812-
if steering_msg:
813-
# Inject steering message into prompt
814-
prompt_with_steering = f"{prompt}{steering_msg}"
815-
current_status[0] = "Processing steering guidance..."
816-
result_holder[0] = self.chat(prompt_with_steering, **kwargs)
817-
else:
811+
try:
812+
steering_msg = self._check_steering_messages()
813+
if steering_msg:
814+
# Inject steering message into prompt with clear separator
815+
prompt_with_steering = f"{prompt}\n\n{steering_msg}"
816+
current_status[0] = "Processing steering guidance..."
817+
result_holder[0] = self.chat(prompt_with_steering, **kwargs)
818+
else:
819+
result_holder[0] = self.chat(prompt, **kwargs)
820+
except Exception as e:
821+
logger.warning(f"Steering check failed, continuing without steering: {e}")
822+
current_status[0] = "Steering failed, proceeding normally..."
818823
result_holder[0] = self.chat(prompt, **kwargs)
819824
else:
820825
result_holder[0] = self.chat(prompt, **kwargs)

src/praisonai-agents/praisonaiagents/agent/message_steering.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,11 @@ def queue_message(self, message: str, priority: int = 5) -> str:
5656
)
5757

5858
# Use existing message queue with priority mapping
59-
queue_priority = min(priority, MessagePriority.URGENT.value)
59+
# Special handling for INTERRUPT - give it maximum priority
60+
if steering_priority == SteeringPriority.INTERRUPT:
61+
queue_priority = MessagePriority.URGENT.value + 1 # Higher than URGENT
62+
else:
63+
queue_priority = min(priority, MessagePriority.URGENT.value)
6064
success = self._message_queue.enqueue(
6165
content=steering_msg,
6266
priority=queue_priority,
@@ -90,7 +94,8 @@ def process_steering(self, context: Optional[Dict[str, Any]] = None) -> bool:
9094
Process pending steering messages.
9195
9296
This is called during agent execution to check for and process
93-
any steering messages. Uses rate limiting to avoid excessive checking.
97+
any steering messages. Uses rate limiting to avoid excessive checking,
98+
but allows INTERRUPT priority messages to bypass rate limiting.
9499
95100
Args:
96101
context: Execution context that can be updated with steering info
@@ -101,9 +106,17 @@ def process_steering(self, context: Optional[Dict[str, Any]] = None) -> bool:
101106
if not self._enabled:
102107
return False
103108

104-
# Rate limiting - only check every 100ms
109+
# Check if we have high priority messages that bypass rate limiting
110+
has_urgent_messages = False
111+
all_messages = self._message_queue.get_all()
112+
for msg in all_messages:
113+
if isinstance(msg, SteeringMessage) and msg.priority.value >= SteeringPriority.HIGH.value:
114+
has_urgent_messages = True
115+
break
116+
117+
# Rate limiting - only check every 100ms unless we have urgent messages
105118
current_time = time.time()
106-
if current_time - self._last_check < self._check_interval:
119+
if not has_urgent_messages and current_time - self._last_check < self._check_interval:
107120
return False
108121

109122
self._last_check = current_time

src/praisonai-agents/praisonaiagents/agent/message_steering_protocols.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,16 @@ def has_pending_messages(self) -> bool:
109109
True if messages are pending, False otherwise
110110
"""
111111
...
112+
113+
@property
def enabled(self) -> bool:
    """Report whether message steering is switched on.

    Returns:
        True when steering is enabled, otherwise False.
    """
    ...
112122

113123

114124
@runtime_checkable

src/praisonai-agents/praisonaiagents/agent/tool_execution.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -197,11 +197,16 @@ def _execute_tool_with_context(self, function_name, arguments, state, tool_call_
197197
try:
198198
# Check for steering messages before tool execution
199199
if hasattr(self, '_check_steering_messages'):
200-
steering_msg = self._check_steering_messages()
201-
if steering_msg and "INTERRUPT" in steering_msg:
202-
# High priority steering - interrupt tool execution
203-
logger.info(f"Tool {function_name} execution interrupted by steering message")
204-
return f"Tool execution interrupted by user guidance: {steering_msg}"
200+
try:
201+
steering_msg = self._check_steering_messages()
202+
if steering_msg:
203+
# Check if steering message indicates interruption priority
204+
# (SteeringMixin formats HIGH/URGENT/INTERRUPT with specific prefixes)
205+
if any(marker in steering_msg for marker in ["[URGENT USER GUIDANCE]", "[INTERRUPT]"]):
206+
logger.info(f"Tool {function_name} execution interrupted by high-priority steering")
207+
return f"Tool execution interrupted by user guidance: {steering_msg}"
208+
except Exception as e:
209+
logger.warning(f"Steering check failed, continuing with tool execution: {e}")
205210

206211
# Trigger BEFORE_TOOL hook
207212
from ..hooks import HookEvent, BeforeToolInput
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
#!/usr/bin/env python3
2+
"""
3+
Integration tests for message steering capability.
4+
5+
Tests the real-time message steering implementation including
6+
real agentic tests with actual LLM calls.
7+
"""
8+
import time
9+
import threading
10+
import pytest
11+
from praisonaiagents import Agent
12+
13+
14+
def test_message_steering_basic():
    """Queue one steering message on a steering-enabled agent and inspect its status."""
    agent_config = {
        "name": "test_agent",
        "instructions": "You are a helpful assistant. Acknowledge any user guidance.",
        "message_steering": True,
        "llm": "gpt-4o-mini",
    }
    steered_agent = Agent(**agent_config)

    # The flag passed at construction must be reflected on the agent.
    assert steered_agent.message_steering_enabled, "Message steering should be enabled"

    # steer() is expected to hand back a truthy identifier for the queued message.
    queued_id = steered_agent.steer("Focus on being concise")
    assert queued_id, "Should return message ID"

    # The status report must show steering on and at least one pending entry.
    report = steered_agent.get_steering_status()
    assert report["enabled"], "Steering should be enabled"
    assert report["pending_count"] > 0, "Should have pending messages"
34+
35+
36+
def test_message_steering_disabled():
    """Check that an agent built without ``message_steering`` ignores steer() calls."""
    plain_agent = Agent(name="test_agent", instructions="You are helpful")

    # No message_steering argument was given, so the feature must be off.
    assert not plain_agent.message_steering_enabled, "Message steering should be disabled by default"

    # With steering off, steer() must return an empty ID rather than queueing.
    ignored_id = plain_agent.steer("This should be ignored")
    assert ignored_id == "", "Should return empty string when disabled"

    # The status report must agree: disabled and nothing pending.
    report = plain_agent.get_steering_status()
    assert not report["enabled"], "Steering should be disabled"
    assert report["pending_count"] == 0, "Should have no pending messages"
50+
51+
52+
def test_message_steering_integration():
    """Smoke-test the steering check hook used by execution (no LLM call is made)."""
    steered_agent = Agent(
        name="integration_test",
        instructions="You are helpful",
        message_steering=True,
    )

    # Queue a single piece of guidance at priority 10.
    queued_id = steered_agent.steer("Please be very brief", priority=10)
    assert queued_id, "Should queue message"

    # The execution paths look this hook up with hasattr(), so it must exist.
    assert hasattr(steered_agent, '_check_steering_messages'), "Should have steering check method"

    # Draining the hook should yield formatted guidance that carries the original text.
    formatted = steered_agent._check_steering_messages()
    assert formatted is not None, "Should return steering message"
    assert "USER GUIDANCE" in formatted, "Should format as guidance"
    assert "brief" in formatted.lower(), "Should contain original message"
72+
73+
74+
def test_message_steering_live_execution():
    """Test steering injection during live LLM execution (MANDATORY real agentic test).

    Runs ``agent.start`` on a background thread, queues a steering message
    shortly after the thread is started, then verifies that the run finished
    within the timeout and produced a non-trivial response.
    """
    agent = Agent(
        name="steering_live_test",
        instructions="You are a helpful assistant. Always acknowledge user guidance when provided.",
        message_steering=True,
        llm="gpt-4o-mini"
    )

    # Container the worker thread uses to hand its outcome back to the test
    result_container = {}

    def run_agent():
        """Execute agent in background thread."""
        try:
            result = agent.start("Explain quantum computing in detail")
            result_container["response"] = result
            result_container["success"] = True
        except Exception as e:
            result_container["error"] = str(e)
            result_container["success"] = False

    # daemon=True: if the LLM call hangs, the worker must not keep the
    # interpreter (and the whole pytest run) alive after this test fails.
    thread = threading.Thread(target=run_agent, daemon=True)
    thread.start()

    # Allow execution to start
    time.sleep(0.5)

    # Send steering message while agent is running
    msg_id = agent.steer("Keep your explanation under 100 words and focus on practical applications", priority=10)
    assert msg_id, "Should queue steering message"

    # Wait for execution to complete (with timeout)
    thread.join(timeout=30)

    # Thread.join() returns None whether or not the timeout expired, so the
    # timeout case has to be detected explicitly; otherwise a hang would be
    # reported below as a misleading "Unknown error".
    assert not thread.is_alive(), "Agent execution did not finish within 30 seconds"

    # Verify execution completed successfully
    assert result_container.get("success", False), f"Agent execution failed: {result_container.get('error', 'Unknown error')}"

    response = result_container.get("response", "")
    assert response, "Should return non-empty response from LLM"
    assert len(response) > 10, "Response should be substantial"

    # Print response for manual verification
    print(f"\n🤖 Agent Response:\n{response}")

    # Steering status after execution is printed only, not asserted, because
    # the queued message may or may not have been consumed during the run.
    final_status = agent.get_steering_status()
    print(f"\n📊 Final steering status: {final_status}")
124+
125+
126+
def test_message_steering_priority_handling():
    """Queue guidance at five priority levels and check what is surfaced first."""
    steered_agent = Agent(
        name="priority_test",
        instructions="You are helpful",
        message_steering=True,
    )

    # (text, priority) pairs, submitted in ascending priority order.
    submissions = (
        ("Low priority guidance", 1),
        ("Normal priority guidance", 5),
        ("High priority guidance", 10),
        ("Urgent guidance", 20),
        ("Interrupt guidance", 30),
    )
    queued_ids = [
        steered_agent.steer(text, priority=level) for text, level in submissions
    ]

    # Every submission must have produced a truthy message ID.
    assert all(queued_ids)

    # All five messages should still be pending before anything is processed.
    report = steered_agent.get_steering_status()
    assert report["pending_count"] == 5

    # The steering check is expected to surface the highest-priority guidance.
    surfaced = steered_agent._check_steering_messages()
    assert surfaced is not None
    assert "Interrupt" in surfaced or "Urgent" in surfaced
153+
154+
155+
if __name__ == "__main__":
    # Direct execution: run only this file's tests through pytest, verbosely.
    pytest_args = [__file__, "-v"]
    pytest.main(pytest_args)

0 commit comments

Comments
 (0)