From f024c77c630153f005ef16842ff84028d884b1f5 Mon Sep 17 00:00:00 2001 From: LydiaCai1203 <1125862926@qq.com> Date: Thu, 9 Apr 2026 18:19:50 +0800 Subject: [PATCH] feat: long task summary --- backend/biz/task/service/tasksummary.go | 138 ++++++++++++++++-------- backend/biz/task/service/types.go | 1 + backend/config/config.go | 1 + 3 files changed, 96 insertions(+), 44 deletions(-) diff --git a/backend/biz/task/service/tasksummary.go b/backend/biz/task/service/tasksummary.go index 0d744d62..2f3eacbd 100644 --- a/backend/biz/task/service/tasksummary.go +++ b/backend/biz/task/service/tasksummary.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "log/slog" + "slices" "strings" "sync" "time" @@ -261,9 +262,14 @@ func (s *TaskSummaryService) handleJob(ctx context.Context, job *delayqueue.Job[ return nil } -// fetchConversation 从 Loki 获取历史对话,返回消息数组 +// fetchConversation 从 Loki 获取历史对话,只保留最近 N 轮对话(user-input / reply-question 及其对应的 agent 回复)。 +// 使用倒序查询,从最新的日志往前查,收集到足够轮用户消息后立即停止,避免遍历全部历史。 func (s *TaskSummaryService) fetchConversation(ctx context.Context, taskID string, createdAt time.Time) ([]llm.Message, error) { - var messages []llm.Message + maxRounds := s.cfg.TaskSummary.MaxRounds + if maxRounds <= 0 { + maxRounds = 3 + } + const pageSize = 200 taskUUID, err := uuid.Parse(taskID) if err != nil { @@ -274,64 +280,107 @@ func (s *TaskSummaryService) fetchConversation(ctx context.Context, taskID strin if err != nil { return nil, fmt.Errorf("failed to get task: %w", err) } - messages = append(messages, llm.Message{Role: "user", Content: t.Content}) - agentMsg := []string{} - _, err = s.loki.History(ctx, taskID, createdAt, func(entries []loki.LogEntry) { + // 从后往前分页查 Loki,收集到足够的用户轮次后停止 + start := createdAt + end := time.Now() + var tailEntries []loki.LogEntry + userRoundCount := 0 + + for { + entries, err := s.loki.QueryByTaskID(ctx, taskID, start, end, pageSize, "backward") + if err != nil { + return nil, fmt.Errorf("failed to fetch loki history: %w", err) + } + + done := false for _, entry := range entries { + tailEntries = append(tailEntries, entry) + if entry.Line == "" { continue } - - s.logger.DebugContext(ctx, "loki entry", "entry", entry.Line) - var lokiEnt lokiEntry if err := json.Unmarshal([]byte(entry.Line), &lokiEnt); err != nil { - s.logger.ErrorContext(ctx, "failed to unmarshal loki entry", "task_id", taskID, "error", err) continue } - - if lokiEnt.Data == "" { - continue + if lokiEnt.Event == "user-input" || lokiEnt.Event == "reply-question" { + userRoundCount++ + if userRoundCount >= maxRounds { + done = true + break + } } + } - decoded, err := base64.StdEncoding.DecodeString(lokiEnt.Data) - if err != nil { - s.logger.ErrorContext(ctx, "failed to decode base64 data", "task_id", taskID, "error", err) - continue - } + if done || len(entries) < pageSize { + break + } + // 向更早的时间翻页 + end = entries[len(entries)-1].Timestamp.Add(-time.Nanosecond) + } - switch lokiEnt.Event { - case "user-input", "reply-question": - var userInputText string - var ur userReply - if err := json.Unmarshal(decoded, &ur); err != nil { - userInputText = string(decoded) - } else { - userInputText = ur.AnswersJSON - } + // 反转为时间正序 + slices.Reverse(tailEntries) - if len(agentMsg) > 0 { - agentContent := strings.Join(agentMsg, "") - messages = append(messages, llm.Message{Role: "assistant", Content: agentContent}) - agentMsg = []string{} - } + // 按正序解析为 messages + var messages []llm.Message + // 如果 Loki 中用户轮次不足 3 轮,补上初始任务内容 + if userRoundCount < maxRounds { + messages = append(messages, llm.Message{Role: "user", Content: t.Content}) + } + + agentMsg := []string{} + for _, entry := range tailEntries { + if entry.Line == "" { + continue + } - messages = append(messages, llm.Message{Role: "user", Content: userInputText}) + s.logger.DebugContext(ctx, "loki entry", "entry", entry.Line) - case "task-running": - var taskMsg wsData - if err := json.Unmarshal(decoded, &taskMsg); err != nil { - continue - } - if taskMsg.Update.SessionUpdate == "agent_message_chunk" { - agentMsg = append(agentMsg, taskMsg.Update.Content.Text) - } + var lokiEnt lokiEntry + if err := json.Unmarshal([]byte(entry.Line), &lokiEnt); err != nil { + s.logger.ErrorContext(ctx, "failed to unmarshal loki entry", "task_id", taskID, "error", err) + continue + } + + if lokiEnt.Data == "" { + continue + } + + decoded, err := base64.StdEncoding.DecodeString(lokiEnt.Data) + if err != nil { + s.logger.ErrorContext(ctx, "failed to decode base64 data", "task_id", taskID, "error", err) + continue + } + + switch lokiEnt.Event { + case "user-input", "reply-question": + var userInputText string + var ur userReply + if err := json.Unmarshal(decoded, &ur); err != nil { + userInputText = string(decoded) + } else { + userInputText = ur.AnswersJSON + } + + if len(agentMsg) > 0 { + agentContent := strings.Join(agentMsg, "") + messages = append(messages, llm.Message{Role: "assistant", Content: agentContent}) + agentMsg = []string{} + } + + messages = append(messages, llm.Message{Role: "user", Content: userInputText}) + + case "task-running": + var taskMsg wsData + if err := json.Unmarshal(decoded, &taskMsg); err != nil { + continue + } + if taskMsg.Update.SessionUpdate == "agent_message_chunk" { + agentMsg = append(agentMsg, taskMsg.Update.Content.Text) } } - }) - if err != nil { - return nil, fmt.Errorf("failed to fetch loki history: %w", err) } if len(messages) == 0 { @@ -343,7 +392,7 @@ func (s *TaskSummaryService) fetchConversation(ctx context.Context, taskID strin messages = append(messages, llm.Message{Role: "assistant", Content: agentContent}) } - s.logger.DebugContext(ctx, "conversation", "messages_count", messages) + s.logger.DebugContext(ctx, "conversation", "messages_count", len(messages), "messages", messages) return messages, nil } @@ -367,6 +416,7 @@ func (s *TaskSummaryService) generateSummary(ctx context.Context, conversation [ - 如果是开发任务:说明做的是什么应用/功能(如"开发五子棋游戏") - 如果是问问题:说明问的是什么问题(如"React Hooks 如何管理状态") - 如果是修 bug:说明修的是什么问题(如"修复用户登录失败问题") +- 中英文之间要加空格(如"修复 React 组件的 bug"而不是"修复React组件的bug") - 如果对话无实质内容,就用最近一条用户输入作为标题`, maxChars) messages := []llm.Message{ diff --git a/backend/biz/task/service/types.go b/backend/biz/task/service/types.go index 7b2de005..9118d267 100644 --- a/backend/biz/task/service/types.go +++ b/backend/biz/task/service/types.go @@ -35,3 +35,4 @@ type wsContent struct { Content string `json:"content"` Message string `json:"message"` } + diff --git a/backend/config/config.go b/backend/config/config.go index 2cee6ef5..4c6f1fee 100644 --- a/backend/config/config.go +++ b/backend/config/config.go @@ -110,6 +110,7 @@ type TaskSummary struct { InterfaceType string `mapstructure:"interface_type"` // API 接口类型(openai_chat/openai_responses/anthropic) Delay int `mapstructure:"delay"` // 延迟时间(秒),默认 3600 MaxChars int `mapstructure:"max_chars"` // 摘要最大字符数,默认 300 + MaxRounds int `mapstructure:"max_rounds"` // 最近对话轮数,默认 3 MaxWorkers int `mapstructure:"max_workers"` // 最大消费者数量,默认 5 }