Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 94 additions & 44 deletions backend/biz/task/service/tasksummary.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"log/slog"
"slices"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -261,9 +262,14 @@ func (s *TaskSummaryService) handleJob(ctx context.Context, job *delayqueue.Job[
return nil
}

// fetchConversation 从 Loki 获取历史对话,返回消息数组
// fetchConversation 从 Loki 获取历史对话,只保留最近 N 轮对话(user-input / reply-question 及其对应的 agent 回复)。
// 使用倒序查询,从最新的日志往前查,收集到足够轮用户消息后立即停止,避免遍历全部历史。
func (s *TaskSummaryService) fetchConversation(ctx context.Context, taskID string, createdAt time.Time) ([]llm.Message, error) {
var messages []llm.Message
maxRounds := s.cfg.TaskSummary.MaxRounds
if maxRounds <= 0 {
maxRounds = 3
}
const pageSize = 200

taskUUID, err := uuid.Parse(taskID)
if err != nil {
Expand All @@ -274,64 +280,107 @@ func (s *TaskSummaryService) fetchConversation(ctx context.Context, taskID strin
if err != nil {
return nil, fmt.Errorf("failed to get task: %w", err)
}
messages = append(messages, llm.Message{Role: "user", Content: t.Content})

agentMsg := []string{}
_, err = s.loki.History(ctx, taskID, createdAt, func(entries []loki.LogEntry) {
// 从后往前分页查 Loki,收集到足够的用户轮次后停止
start := createdAt
end := time.Now()
var tailEntries []loki.LogEntry
userRoundCount := 0

for {
entries, err := s.loki.QueryByTaskID(ctx, taskID, start, end, pageSize, "backward")
if err != nil {
return nil, fmt.Errorf("failed to fetch loki history: %w", err)
}

done := false
for _, entry := range entries {
tailEntries = append(tailEntries, entry)

if entry.Line == "" {
continue
}

s.logger.DebugContext(ctx, "loki entry", "entry", entry.Line)

var lokiEnt lokiEntry
if err := json.Unmarshal([]byte(entry.Line), &lokiEnt); err != nil {
s.logger.ErrorContext(ctx, "failed to unmarshal loki entry", "task_id", taskID, "error", err)
continue
}

if lokiEnt.Data == "" {
continue
if lokiEnt.Event == "user-input" || lokiEnt.Event == "reply-question" {
userRoundCount++
if userRoundCount >= maxRounds {
done = true
break
}
}
}

decoded, err := base64.StdEncoding.DecodeString(lokiEnt.Data)
if err != nil {
s.logger.ErrorContext(ctx, "failed to decode base64 data", "task_id", taskID, "error", err)
continue
}
if done || len(entries) < pageSize {
break
}
// 向更早的时间翻页
end = entries[len(entries)-1].Timestamp.Add(-time.Nanosecond)
}

switch lokiEnt.Event {
case "user-input", "reply-question":
var userInputText string
var ur userReply
if err := json.Unmarshal(decoded, &ur); err != nil {
userInputText = string(decoded)
} else {
userInputText = ur.AnswersJSON
}
// 反转为时间正序
slices.Reverse(tailEntries)

if len(agentMsg) > 0 {
agentContent := strings.Join(agentMsg, "")
messages = append(messages, llm.Message{Role: "assistant", Content: agentContent})
agentMsg = []string{}
}
// 按正序解析为 messages
var messages []llm.Message
// 如果 Loki 中用户轮次不足 3 轮,补上初始任务内容
if userRoundCount < maxRounds {
messages = append(messages, llm.Message{Role: "user", Content: t.Content})
}

agentMsg := []string{}
for _, entry := range tailEntries {
if entry.Line == "" {
continue
}

messages = append(messages, llm.Message{Role: "user", Content: userInputText})
s.logger.DebugContext(ctx, "loki entry", "entry", entry.Line)

case "task-running":
var taskMsg wsData
if err := json.Unmarshal(decoded, &taskMsg); err != nil {
continue
}
if taskMsg.Update.SessionUpdate == "agent_message_chunk" {
agentMsg = append(agentMsg, taskMsg.Update.Content.Text)
}
var lokiEnt lokiEntry
if err := json.Unmarshal([]byte(entry.Line), &lokiEnt); err != nil {
s.logger.ErrorContext(ctx, "failed to unmarshal loki entry", "task_id", taskID, "error", err)
continue
}

if lokiEnt.Data == "" {
continue
}

decoded, err := base64.StdEncoding.DecodeString(lokiEnt.Data)
if err != nil {
s.logger.ErrorContext(ctx, "failed to decode base64 data", "task_id", taskID, "error", err)
continue
}

switch lokiEnt.Event {
case "user-input", "reply-question":
var userInputText string
var ur userReply
if err := json.Unmarshal(decoded, &ur); err != nil {
userInputText = string(decoded)
} else {
userInputText = ur.AnswersJSON
}

if len(agentMsg) > 0 {
agentContent := strings.Join(agentMsg, "")
messages = append(messages, llm.Message{Role: "assistant", Content: agentContent})
agentMsg = []string{}
}

messages = append(messages, llm.Message{Role: "user", Content: userInputText})

case "task-running":
var taskMsg wsData
if err := json.Unmarshal(decoded, &taskMsg); err != nil {
continue
}
if taskMsg.Update.SessionUpdate == "agent_message_chunk" {
agentMsg = append(agentMsg, taskMsg.Update.Content.Text)
}
}
})
if err != nil {
return nil, fmt.Errorf("failed to fetch loki history: %w", err)
}

if len(messages) == 0 {
Expand All @@ -343,7 +392,7 @@ func (s *TaskSummaryService) fetchConversation(ctx context.Context, taskID strin
messages = append(messages, llm.Message{Role: "assistant", Content: agentContent})
}

s.logger.DebugContext(ctx, "conversation", "messages_count", messages)
s.logger.DebugContext(ctx, "conversation", "messages_count", len(messages), "messages", messages)
return messages, nil
}

Expand All @@ -367,6 +416,7 @@ func (s *TaskSummaryService) generateSummary(ctx context.Context, conversation [
- 如果是开发任务:说明做的是什么应用/功能(如"开发五子棋游戏")
- 如果是问问题:说明问的是什么问题(如"React Hooks 如何管理状态")
- 如果是修 bug:说明修的是什么问题(如"修复用户登录失败问题")
- 中英文之间要加空格(如"修复 React 组件的 bug"而不是"修复React组件的bug")
- 如果对话无实质内容,就用最近一条用户输入作为标题`, maxChars)

messages := []llm.Message{
Expand Down
1 change: 1 addition & 0 deletions backend/biz/task/service/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,4 @@ type wsContent struct {
Content string `json:"content"`
Message string `json:"message"`
}

1 change: 1 addition & 0 deletions backend/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ type TaskSummary struct {
InterfaceType string `mapstructure:"interface_type"` // API 接口类型(openai_chat/openai_responses/anthropic)
Delay int `mapstructure:"delay"` // 延迟时间(秒),默认 3600
MaxChars int `mapstructure:"max_chars"` // 摘要最大字符数,默认 300
MaxRounds int `mapstructure:"max_rounds"` // 最近对话轮数,默认 3
MaxWorkers int `mapstructure:"max_workers"` // 最大消费者数量,默认 5
}

Expand Down