6 changes: 2 additions & 4 deletions backend/agents/create_agent_info.py
@@ -17,7 +17,6 @@
from database.tool_db import search_tools_for_sub_agent
from utils.prompt_template_utils import get_agent_prompt_template
from utils.config_utils import tenant_config_manager, get_model_name_from_config
from utils.auth_utils import get_current_user_id
from consts.const import LOCAL_MCP_SERVER

logger = logging.getLogger("create_agent_info")
@@ -314,12 +313,11 @@ async def create_agent_run_info(
minio_files,
query,
history,
authorization,
tenant_id: str,
user_id: str,
language: str = "zh",
allow_memory_search: bool = True,
):
user_id, tenant_id = get_current_user_id(authorization)

final_query = await join_minio_file_description_to_query(minio_files=minio_files, query=query)
model_list = await create_model_config_list(tenant_id)
agent_config = await create_agent_config(
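The net effect of this file's diff: identity is resolved once at the API boundary, and `create_agent_run_info` now receives plain `user_id`/`tenant_id` values instead of parsing the Authorization header itself. A minimal, self-contained sketch of that pattern, with stand-in names (nothing below is the project's real helper code):

```python
import asyncio

# Stand-in resolver: the real project uses get_current_user_id /
# _resolve_user_tenant_language, whose internals this diff does not show.
def resolve_identity(authorization: str) -> tuple[str, str]:
    token = authorization.removeprefix("Bearer ").strip()
    return f"user-{token}", f"tenant-{token}"

# Stand-in for the reworked create_agent_run_info: lower layers now receive
# plain identifiers and never touch the Authorization header.
async def create_agent_run_info_sketch(query: str, *, user_id: str, tenant_id: str) -> dict:
    return {"query": query, "user_id": user_id, "tenant_id": tenant_id}

async def main() -> None:
    user_id, tenant_id = resolve_identity("Bearer abc123")
    info = await create_agent_run_info_sketch(
        "hello", user_id=user_id, tenant_id=tenant_id)
    print(info)

asyncio.run(main())
```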
173 changes: 117 additions & 56 deletions backend/services/agent_service.py
@@ -8,12 +8,12 @@
from fastapi import Header, Request
from fastapi.responses import JSONResponse, StreamingResponse
from nexent.core.agents.run_agent import agent_run
from nexent.memory.memory_service import clear_memory
from nexent.memory.memory_service import clear_memory, add_memory_in_levels

from agents.agent_run_manager import agent_run_manager
from agents.create_agent_info import create_agent_run_info, create_tool_config_list
from agents.preprocess_manager import preprocess_manager
from consts.exceptions import AgentRunException, MemoryPreparationException
from consts.exceptions import MemoryPreparationException
from consts.model import (
AgentInfoRequest,
AgentRequest,
@@ -79,7 +79,8 @@ def _resolve_user_tenant_language(

async def _stream_agent_chunks(
agent_request: "AgentRequest",
authorization: str,
user_id: str,
tenant_id: str,
agent_run_info,
memory_ctx,
):
@@ -91,25 +92,87 @@ async def _stream_agent_chunks(
"""

local_messages = []
captured_final_answer = None
try:
async for chunk in agent_run(agent_run_info, memory_ctx):
async for chunk in agent_run(agent_run_info):
local_messages.append(chunk)
# Try to capture the final answer as it streams so memory addition can run afterwards
try:
data = json.loads(chunk)
if data.get("type") == "final_answer":
captured_final_answer = data.get("content")
except Exception:
pass
yield f"data: {chunk}\n\n"
except Exception as run_exc:
logger.error(f"Agent run error: {str(run_exc)}")
raise AgentRunException(f"Agent run error: {str(run_exc)}")
# Emit an error chunk and terminate the stream immediately
try:
error_payload = json.dumps(
{"type": "error", "content": str(run_exc)}, ensure_ascii=False)
yield f"data: {error_payload}\n\n"
finally:
return
finally:
# Persist assistant messages for non-debug runs
if not agent_request.is_debug:
save_messages(
agent_request,
target="assistant",
messages=local_messages,
authorization=authorization,
tenant_id=tenant_id,
user_id=user_id,
)
# Always unregister the run to release resources
agent_run_manager.unregister_agent_run(agent_request.conversation_id)

# Schedule memory addition in background to avoid blocking SSE termination
async def _add_memory_background():
try:
# Skip if memory recording is disabled
if not getattr(memory_ctx.user_config, "memory_switch", False):
return
# Use the final answer captured during streaming; the observer queue has already been drained
final_answer_local = captured_final_answer
if not final_answer_local:
return

# Determine allowed memory levels
levels_local = {"agent", "user_agent"}
if memory_ctx.user_config.agent_share_option == "never":
levels_local.discard("agent")
if memory_ctx.agent_id in getattr(memory_ctx.user_config, "disable_agent_ids", []):
levels_local.discard("agent")
if memory_ctx.agent_id in getattr(memory_ctx.user_config, "disable_user_agent_ids", []):
levels_local.discard("user_agent")
if not levels_local:
return

mem_messages_local = [
{"role": "user", "content": agent_run_info.query},
{"role": "assistant", "content": final_answer_local},
]

add_result_local = await add_memory_in_levels(
messages=mem_messages_local,
memory_config=memory_ctx.memory_config,
tenant_id=memory_ctx.tenant_id,
user_id=memory_ctx.user_id,
agent_id=memory_ctx.agent_id,
memory_levels=list(levels_local),
)
items_local = add_result_local.get("results", [])
logger.info(f"Memory addition completed: {items_local}")
except Exception as bg_e:
logger.error(
f"Unexpected error during background memory addition: {bg_e}")

try:
asyncio.create_task(_add_memory_background())
except Exception as schedule_err:
logger.error(
f"Failed to schedule background memory addition: {schedule_err}")
Contributor: This method's flow is getting a bit long; consider splitting it into several sub-functions.
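One way to act on this suggestion would be to pull the inline steps of `_add_memory_background` into small, testable helpers. A sketch under that assumption; the helper names are hypothetical, but the logic mirrors the diff above:

```python
import json
from typing import Any, Optional

def extract_final_answer(chunk: str) -> Optional[str]:
    # Mirrors the capture logic above: each chunk is a JSON payload, and only
    # "final_answer" chunks carry the answer text.
    try:
        data = json.loads(chunk)
        if isinstance(data, dict) and data.get("type") == "final_answer":
            return data.get("content")
    except (TypeError, ValueError):
        pass
    return None

def allowed_memory_levels(user_config: Any, agent_id: Any) -> list:
    # Mirrors the level filtering in _add_memory_background.
    levels = {"agent", "user_agent"}
    if getattr(user_config, "agent_share_option", None) == "never":
        levels.discard("agent")
    if agent_id in getattr(user_config, "disable_agent_ids", []):
        levels.discard("agent")
    if agent_id in getattr(user_config, "disable_user_agent_ids", []):
        levels.discard("user_agent")
    return sorted(levels)

# Quick checks:
assert extract_final_answer('{"type": "final_answer", "content": "42"}') == "42"
assert extract_final_answer("not json") is None
```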



def get_enable_tool_id_by_agent_id(agent_id: int, tenant_id: str):
all_tool_instance = query_all_enabled_tool_instances(
@@ -592,21 +655,14 @@ def insert_related_agent_impl(parent_agent_id, child_agent_id, tenant_id):
# Helper function for run_agent_stream, used to prepare context for an agent run
async def prepare_agent_run(
agent_request: AgentRequest,
http_request: Request,
authorization: str,
user_id: str = None,
tenant_id: str = None,
user_id: str,
tenant_id: str,
language: str = "zh",
allow_memory_search: bool = True,
):
"""
Prepare for an agent run by creating context and run info, and registering the run.
"""
user_id, tenant_id, language = _resolve_user_tenant_language(
authorization=authorization,
http_request=http_request,
user_id=user_id,
tenant_id=tenant_id,
)

memory_context = build_memory_context(
user_id, tenant_id, agent_request.agent_id)
@@ -615,7 +671,8 @@
minio_files=agent_request.minio_files,
query=agent_request.query,
history=agent_request.history,
authorization=authorization,
tenant_id=tenant_id,
user_id=user_id,
language=language,
allow_memory_search=allow_memory_search,
)
@@ -625,26 +682,25 @@


# Helper function for run_agent_stream, used to save messages for either user or assistant
def save_messages(agent_request, target: str, messages=None, authorization=None):
def save_messages(agent_request, target: str, user_id: str, tenant_id: str, messages=None):
if target == "user":
if messages is not None:
raise ValueError("Messages should be None when saving for user.")
submit(save_conversation_user, agent_request, authorization)
submit(save_conversation_user, agent_request, user_id, tenant_id)
elif target == "assistant":
if messages is None:
raise ValueError(
"Messages cannot be None when saving for assistant.")
submit(save_conversation_assistant,
agent_request, messages, authorization)
agent_request, messages, user_id, tenant_id)


# Helper function for run_agent_stream, used to generate stream response with memory preprocess tokens
async def generate_stream_with_memory(
agent_request: AgentRequest,
http_request: Request,
authorization: str,
user_id: str = None,
tenant_id: str = None,
user_id: str,
tenant_id: str,
language: str = "zh",
):
# Prepare preprocess task tracking (simulate preprocess flow)
task_id = str(uuid.uuid4())
Expand Down Expand Up @@ -674,15 +730,8 @@ def _memory_token(message_text: str) -> str:

memory_enabled = False
try:
# Decide whether to emit memory tokens based on user's memory switch
user_id_preview, tenant_id_preview, _ = _resolve_user_tenant_language(
authorization=authorization,
http_request=http_request,
user_id=user_id,
tenant_id=tenant_id,
)
memory_context_preview = build_memory_context(
user_id_preview, tenant_id_preview, agent_request.agent_id
user_id, tenant_id, agent_request.agent_id
)
memory_enabled = bool(memory_context_preview.user_config.memory_switch)

@@ -694,10 +743,9 @@ def _memory_token(message_text: str) -> str:
try:
agent_run_info, memory_context = await prepare_agent_run(
agent_request=agent_request,
http_request=http_request,
authorization=authorization,
user_id=user_id,
tenant_id=tenant_id,
language=language,
allow_memory_search=True,
)
except Exception as prep_err:
@@ -709,7 +757,11 @@ def _memory_token(message_text: str) -> str:
yield f"data: {_memory_token(msg_done)}\n\n"

async for data_chunk in _stream_agent_chunks(
agent_request, authorization, agent_run_info, memory_context
agent_request=agent_request,
user_id=user_id,
tenant_id=tenant_id,
agent_run_info=agent_run_info,
memory_ctx=memory_context,
):
yield data_chunk

@@ -722,18 +774,24 @@ def _memory_token(message_text: str) -> str:
# Fall back to the no-memory streaming path, which handles error reporting and message persistence internally
async for data_chunk in generate_stream_no_memory(
agent_request,
http_request,
authorization,
user_id=user_id,
tenant_id=tenant_id,
):
yield data_chunk
except Exception as run_exc:
logger.error(f"Agent run error after memory failure: {str(run_exc)}")
raise AgentRunException(f"Agent run error: {str(run_exc)}")
# Emit an error chunk and terminate the stream immediately
error_payload = json.dumps(
{"type": "error", "content": str(run_exc)}, ensure_ascii=False)
yield f"data: {error_payload}\n\n"
return
except Exception as e:
logger.error(f"Generate stream with memory error: {str(e)}")
raise AgentRunException(f"Generate stream with memory error: {str(e)}")
# Emit an error chunk and terminate the stream immediately
error_payload = json.dumps(
{"type": "error", "content": str(e)}, ensure_ascii=False)
yield f"data: {error_payload}\n\n"
return
finally:
# Always unregister preprocess task
preprocess_manager.unregister_preprocess_task(task_id)
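Condensed, `generate_stream_with_memory` follows one shape: emit memory progress frames, attempt the memory-aware run, and degrade to the no-memory path when preparation fails, so the client stream never hard-fails. A self-contained sketch of that shape; the token payload fields and all names here are assumptions, not the project's wire format:

```python
import asyncio
import json

class MemoryPreparationError(Exception):
    """Stand-in for consts.exceptions.MemoryPreparationException."""

def memory_token(message_text: str) -> str:
    # Assumed payload shape; the diff only shows the _memory_token helper name.
    return json.dumps({"type": "memory_search", "content": message_text}, ensure_ascii=False)

async def stream_with_fallback(prepare, run_with_memory, run_no_memory):
    try:
        await prepare()
        yield f"data: {memory_token('memory ready')}\n\n"
        async for chunk in run_with_memory():
            yield chunk
    except MemoryPreparationError:
        # Same stream, no memory frames: the client never sees a hard failure.
        async for chunk in run_no_memory():
            yield chunk

async def demo() -> None:
    async def prepare():
        raise MemoryPreparationError("memory backend unavailable")

    async def with_memory():
        yield "data: (memory-aware answer)\n\n"

    async def no_memory():
        yield "data: (no-memory answer)\n\n"

    async for frame in stream_with_fallback(prepare, with_memory, no_memory):
        print(frame, end="")

asyncio.run(demo())
```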
@@ -742,25 +800,27 @@ def _memory_token(message_text: str) -> str:
# Helper function for run_agent_stream, used when user memory is disabled (no memory tokens)
async def generate_stream_no_memory(
agent_request: AgentRequest,
http_request: Request,
authorization: str,
user_id: str = None,
tenant_id: str = None,
user_id: str,
tenant_id: str,
language: str = "zh",
):
"""Stream agent responses without any memory preprocessing tokens or fallback logic."""

# Prepare run info with memory search disabled (honoring the provided user_id/tenant_id)
agent_run_info, memory_context = await prepare_agent_run(
agent_request=agent_request,
http_request=http_request,
authorization=authorization,
user_id=user_id,
tenant_id=tenant_id,
language=language,
allow_memory_search=False,
)

async for data_chunk in _stream_agent_chunks(
agent_request, authorization, agent_run_info, memory_context
agent_request=agent_request,
user_id=user_id,
tenant_id=tenant_id,
agent_run_info=agent_run_info,
memory_ctx=memory_context,
):
yield data_chunk

@@ -776,37 +836,38 @@ async def run_agent_stream(
Start an agent run and stream responses.
If user_id or tenant_id is provided, it overrides the identity derived from authorization. (Useful for northbound APIs.)
"""
# Save user message only if not in debug mode (before streaming starts)
if not agent_request.is_debug:
save_messages(agent_request, target="user",
authorization=authorization)

# Choose streaming strategy based on user's memory switch
resolved_user_id, resolved_tenant_id, _ = _resolve_user_tenant_language(
resolved_user_id, resolved_tenant_id, language = _resolve_user_tenant_language(
authorization=authorization,
http_request=http_request,
user_id=user_id,
tenant_id=tenant_id,
)

# Save user message only if not in debug mode (before streaming starts)
if not agent_request.is_debug:
save_messages(agent_request, target="user",
user_id=resolved_user_id,
tenant_id=resolved_tenant_id)

memory_ctx_preview = build_memory_context(
resolved_user_id, resolved_tenant_id, agent_request.agent_id
)

if memory_ctx_preview.user_config.memory_switch:
if memory_ctx_preview.user_config.memory_switch and not agent_request.is_debug:
stream_gen = generate_stream_with_memory(
agent_request,
http_request,
authorization,
user_id=resolved_user_id,
tenant_id=resolved_tenant_id,
language=language,
)
else:
stream_gen = generate_stream_no_memory(
agent_request,
http_request,
authorization,
user_id=resolved_user_id,
tenant_id=resolved_tenant_id,
language=language,
)

return StreamingResponse(
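The collapsed tail of `run_agent_stream` presumably completes the `StreamingResponse` construction. For orientation, a typical SSE response in FastAPI looks like the sketch below; this is an assumption shown for context, not the project's exact code:

```python
from fastapi.responses import StreamingResponse

def to_sse_response(stream_gen) -> StreamingResponse:
    # text/event-stream is the standard media type for Server-Sent Events,
    # matching the "data: ...\n\n" frames produced by the generators above.
    return StreamingResponse(stream_gen, media_type="text/event-stream")
```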