Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 182 additions & 12 deletions app/agent_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1268,18 +1268,17 @@ async def _check_agent_limits(self) -> bool:

# Check action limits
if (action_count / max_actions) >= 1.0:
# Log warning BEFORE cancelling task (stream is removed during cancel)
if self.event_stream_manager:
self.event_stream_manager.log(
"warning",
f"Action limit reached: 100% of the maximum actions ({max_actions} actions) has been used. Aborting task.",
display_message=f"Action limit reached: 100% of the maximum ({max_actions} actions) has been used. Aborting task.",
f"Action limit reached: 100% of the maximum actions ({max_actions} actions) has been used. Waiting for user decision.",
display_message=None,
task_id=current_task_id,
)
self.state_manager.bump_event_stream()
response = await self.task_manager.mark_task_cancel(reason=f"Task reached the maximum actions allowed limit: {max_actions}")
task_cancelled: bool = response
return not task_cancelled
await self._send_limit_choice_message("action", current_task_id)
await self._pause_task_for_limit_choice(current_task_id)
return False
elif (action_count / max_actions) >= 0.8:
if self.event_stream_manager:
self.event_stream_manager.log(
Expand All @@ -1295,18 +1294,17 @@ async def _check_agent_limits(self) -> bool:

# Check token limits
if (token_count / max_tokens) >= 1.0:
# Log warning BEFORE cancelling task (stream is removed during cancel)
if self.event_stream_manager:
self.event_stream_manager.log(
"warning",
f"Token limit reached: 100% of the maximum tokens ({max_tokens} tokens) has been used. Aborting task.",
display_message=f"Token limit reached: 100% of the maximum ({max_tokens} tokens) has been used. Aborting task.",
f"Token limit reached: 100% of the maximum tokens ({max_tokens} tokens) has been used. Waiting for user decision.",
display_message=None,
task_id=current_task_id,
)
self.state_manager.bump_event_stream()
response = await self.task_manager.mark_task_cancel(reason=f"Task reached the maximum tokens allowed limit: {max_tokens}")
task_cancelled: bool = response
return not task_cancelled
await self._send_limit_choice_message("token", current_task_id)
await self._pause_task_for_limit_choice(current_task_id)
return False
elif (token_count / max_tokens) >= 0.8:
if self.event_stream_manager:
self.event_stream_manager.log(
Expand All @@ -1323,6 +1321,178 @@ async def _check_agent_limits(self) -> bool:
# No limits close or reached
return True

async def _send_limit_choice_message(
self, limit_type: str, session_id: str
) -> None:
"""Send a chat message with Continue/Abort options when a limit is reached."""
label = "Action" if limit_type == "action" else "Token"

# Include task name so user knows which task hit the limit
task_name_suffix = ""
if self.task_manager:
task = self.task_manager.tasks.get(session_id)
if task and task.name:
task_name_suffix = f' for task "{task.name}"'

message = (
f"{label} limit reached{task_name_suffix}. "
f"Would you like to continue (reset limits) or abort the task?"
)
logger.info(f"[LIMIT] Sending limit choice message for session {session_id}: {message}")

# Log to event stream for task context persistence only (display_message=None
# to avoid a duplicate chat message from the event watcher).
if self.event_stream_manager:
try:
self.event_stream_manager.log(
"internal",
message,
display_message=None,
task_id=session_id,
)
except Exception as e:
logger.error(f"[LIMIT] Failed to log to event stream: {e}", exc_info=True)

# Display message with options directly in the chat UI (awaited).
# We bypass the event bus (which uses fire-and-forget create_task)
# to ensure the message is broadcast before the method returns.
if self.ui_controller and self.ui_controller.active_adapter:
try:
from app.ui_layer.components.types import ChatMessage, ChatMessageOption
from app.onboarding import onboarding_manager
import time as _time
agent_name = onboarding_manager.state.agent_name or "Agent"
options = [
ChatMessageOption(label="Continue", value="continue_limit", style="primary"),
ChatMessageOption(label="Abort", value="abort_limit", style="danger"),
]
await self.ui_controller.active_adapter.chat_component.append_message(
ChatMessage(
sender=agent_name,
content=message,
style="agent",
timestamp=_time.time(),
task_session_id=session_id,
options=options,
)
)
logger.info(f"[LIMIT] Options message displayed in chat for session {session_id}")
except Exception as e:
logger.error(f"[LIMIT] Failed to display options in chat: {e}", exc_info=True)
else:
logger.warning(f"[LIMIT] No active UI adapter - options message not displayed")

async def _pause_task_for_limit_choice(self, session_id: str) -> None:
"""Pause the task and create a long-delay trigger to keep it alive."""
logger.info(f"[LIMIT] Pausing task {session_id} for limit choice")
task = self.task_manager.tasks.get(session_id) if self.task_manager else None
if task:
task.waiting_for_user_reply = True

# Update UI task status to "paused" - directly await to ensure
# the WebSocket broadcast completes before the react loop cleans up.
if self.ui_controller and self.ui_controller.active_adapter:
try:
action_panel = self.ui_controller.active_adapter.action_panel
if action_panel:
await action_panel.update_item(session_id, "paused")
except Exception as e:
logger.error(f"[LIMIT] Failed to update task status to paused: {e}", exc_info=True)

from app.ui_layer.events import UIEvent, UIEventType
self.ui_controller.event_bus.emit(
UIEvent(
type=UIEventType.AGENT_STATE_CHANGED,
data={"state": "waiting", "status_message": "Paused - waiting for user decision..."},
)
)

# Create a long-delay trigger so the task stays alive
try:
await self.triggers.put(
Trigger(
fire_at=time.time() + 10800,
priority=5,
next_action_description="Waiting for user decision on limit reached",
session_id=session_id,
payload={"gui_mode": STATE.gui_mode},
waiting_for_reply=True,
),
skip_merge=True,
)
except Exception as e:
logger.error(f"[LIMIT] Failed to create pause trigger for {session_id}: {e}", exc_info=True)

async def handle_limit_continue(self, session_id: str) -> None:
"""User chose to continue past the limit. Reset counters and resume."""
task = self.task_manager.tasks.get(session_id) if self.task_manager else None
if not task:
logger.warning(f"[LIMIT] Task {session_id} not found for limit continue")
return

# Reset counters
STATE.set_agent_property("action_count", 0)
STATE.set_agent_property("token_count", 0)

# Also reset on the StateSession for this session
from agent_core.core.state.session import StateSession
session = StateSession.get(session_id)
if session:
session.agent_properties.set_property("action_count", 0)
session.agent_properties.set_property("token_count", 0)

# Clear waiting flag
task.waiting_for_user_reply = False

# Log to event stream as system message
task_label = f' for task "{task.name}"' if task.name else ""
if self.event_stream_manager:
msg = f"User chose to continue{task_label}. Action and token counters have been reset."
self.event_stream_manager.log(
"system", msg, display_message=msg, task_id=session_id,
)
self.state_manager.bump_event_stream()

# Update UI state back to working
if self.ui_controller:
from app.ui_layer.events import UIEvent, UIEventType
self.ui_controller.event_bus.emit(
UIEvent(
type=UIEventType.TASK_UPDATE,
data={"task_id": session_id, "status": "running"},
)
)
self.ui_controller.event_bus.emit(
UIEvent(
type=UIEventType.AGENT_STATE_CHANGED,
data={"state": "working", "status_message": "Agent is working..."},
)
)

# Fire the trigger to resume execution
await self.triggers.fire(session_id)

async def handle_limit_abort(self, session_id: str) -> None:
"""User chose to abort after reaching limit."""
task = self.task_manager.tasks.get(session_id) if self.task_manager else None
task_label = f' for task "{task.name}"' if task and task.name else ""
if task:
task.waiting_for_user_reply = False

# Log system message before cancelling (stream is removed during cancel)
if self.event_stream_manager:
msg = f"User chose to abort{task_label}. Task has been cancelled."
self.event_stream_manager.log(
"system", msg, display_message=msg, task_id=session_id,
)
self.state_manager.bump_event_stream()

if self.task_manager:
await self.task_manager.mark_task_cancel(
reason="User chose to abort after reaching limit.",
task_id=session_id,
)

# ----- Trigger Management -----

async def _cleanup_session_triggers(self, session_id: str) -> None:
Expand Down
6 changes: 4 additions & 2 deletions app/gui/gui_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -786,13 +786,15 @@ async def _check_agent_limits(self) -> bool:
token_count: int = agent_properties.get("token_count", 0)
max_tokens: int = agent_properties.get("max_tokens_per_task", 0)

# Check action limits
# Check action limits - returns False to switch to CLI mode,
# where the agent_base's _check_agent_limits will handle the
# pause-and-ask flow with user options.
if (action_count / max_actions) >= 1.0:
return False

# Check token limits
if (token_count / max_tokens) >= 1.0:
return False

# No limits close or reached
return True
18 changes: 17 additions & 1 deletion app/ui_layer/adapters/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
InputComponentProtocol,
FootageComponentProtocol,
)
from app.ui_layer.components.types import ChatMessage, ActionItem
from app.ui_layer.components.types import ChatMessage, ChatMessageOption, ActionItem

if TYPE_CHECKING:
from app.ui_layer.controller.ui_controller import UIController
Expand Down Expand Up @@ -271,12 +271,25 @@ def _handle_agent_message(self, event: UIEvent) -> None:
from app.onboarding import onboarding_manager

agent_name = onboarding_manager.state.agent_name or "Agent"
# Extract options from event data if present
raw_options = event.data.get("options")
options = None
if raw_options and isinstance(raw_options, list):
options = [
ChatMessageOption(
label=o.get("label", ""),
value=o.get("value", ""),
style=o.get("style", "default"),
)
for o in raw_options
]
asyncio.create_task(
self._display_chat_message(
agent_name,
event.data.get("message", ""),
"agent",
task_session_id=event.task_id,
options=options,
)
)

Expand Down Expand Up @@ -442,6 +455,7 @@ async def _display_chat_message(
message: str,
style: str,
task_session_id: Optional[str] = None,
options: Optional[List[ChatMessageOption]] = None,
) -> None:
"""
Display a chat message.
Expand All @@ -451,6 +465,7 @@ async def _display_chat_message(
message: Message content
style: Style identifier
task_session_id: Optional task session ID for reply feature
options: Optional list of interactive options/buttons
"""
import time

Expand All @@ -461,6 +476,7 @@ async def _display_chat_message(
style=style,
timestamp=time.time(),
task_session_id=task_session_id,
options=options,
)
)

Expand Down
Loading