Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 90 additions & 23 deletions apps/memos-local-plugin/core/pipeline/orchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ import type {
SkillInvokeCtx,
SubAgentCtx,
} from "../retrieval/types.js";
import type { CoreEvent } from "../../agent-contract/events.js";
import type { CoreEvent, CoreEventType } from "../../agent-contract/events.js";
import type { LogRecord } from "../../agent-contract/log-record.js";
import { memoryBuffer } from "../logger/index.js";
import { onBroadcastLog } from "../logger/transports/sse-broadcast.js";
Expand Down Expand Up @@ -105,9 +105,8 @@ export function createPipeline(deps: PipelineDeps): PipelineHandle {
// Small ring buffer of the most-recent events. Late-connecting SSE
// subscribers (e.g. the viewer's Overview panel opened after an agent
// turn already fired) replay this buffer on connect so the "实时活动"
// card isn't empty by default. 100 rows is plenty — the viewer only
// renders the last dozen.
const RECENT_EVENTS_CAP = 100;
// dashboard isn't empty by default.
const RECENT_EVENTS_CAP = 160;
const recentEvents: CoreEvent[] = [];

const emitCore = (evt: CoreEvent): void => {
Expand Down Expand Up @@ -146,38 +145,106 @@ export function createPipeline(deps: PipelineDeps): PipelineHandle {
// Hydrate the ring buffer with synthetic events derived from the
// most-recent rows on disk. Without this, every plugin restart
// produces an empty "实时活动" panel until the user happens to
// interact with the agent again — misleading, because the DB
// clearly has recent activity. We emit a small set of low-cost
// synthetic `episode.closed` + `trace.created` entries (no bus
// fan-out) just for the buffer, so SSE connects replay them to new
// clients. Seq numbers are monotone from 0 so the frontend's
// `key={evt.seq}` stays unique against live events that come later.
// interact with the agent again — misleading, because the DB clearly
// has recent activity. Include the same categories the overview
// dashboard renders (memory / experience / environment / skill /
// feedback), not just task/session lifecycle events.
try {
const hydrated: CoreEvent[] = [];
const pushSynthetic = (
type: CoreEventType,
ts: number | null | undefined,
correlationId: string | undefined,
payload: unknown,
): void => {
if (!Number.isFinite(ts)) return;
hydrated.push({
type,
ts: ts as number,
seq: 0,
correlationId,
payload,
});
};

const recentEpisodes = deps.repos.episodes.list({ limit: 20 });
let seq = 0;
for (const ep of recentEpisodes.reverse()) {
for (const ep of recentEpisodes) {
const ts = ep.endedAt ?? ep.startedAt;
if (!ts) continue;
const type = ep.status === "closed" ? "episode.closed" : "episode.opened";
pushSynthetic(type, ts, ep.id, {
episodeId: ep.id,
sessionId: ep.sessionId,
status: ep.status,
rTask: ep.rTask ?? null,
});
}

for (const tr of deps.repos.traces.list({ limit: 30 })) {
pushSynthetic("trace.created", tr.ts, tr.id, {
traceId: tr.id,
episodeId: tr.episodeId,
sessionId: tr.sessionId,
});
}

for (const policy of deps.repos.policies.list({ limit: 20 })) {
const ts = policy.updatedAt ?? policy.createdAt;
pushSynthetic("l2.revised", ts, policy.id, {
policyId: policy.id,
status: policy.status,
signature: policy.title,
});
}

for (const world of deps.repos.worldModel.list({ limit: 20 })) {
const ts = world.updatedAt ?? world.createdAt;
pushSynthetic("l3.revised", ts, world.id, {
worldModelId: world.id,
title: world.title,
status: world.status,
});
}

for (const skill of deps.repos.skills.list({ limit: 20 })) {
const ts = skill.updatedAt ?? skill.createdAt;
const type: CoreEventType =
skill.status === "archived" ? "skill.archived" : "skill.crystallized";
pushSynthetic(type, ts, skill.id, {
skillId: skill.id,
name: skill.name,
status: skill.status,
});
}

for (const fb of deps.repos.feedback.list({ limit: 20 })) {
pushSynthetic("feedback.classified", fb.ts, fb.id, {
feedbackId: fb.id,
episodeId: fb.episodeId,
traceId: fb.traceId,
tone: fb.polarity,
channel: fb.channel,
});
}

hydrated.sort((a, b) => a.ts - b.ts);
const keep = hydrated.slice(-RECENT_EVENTS_CAP);
const seqStart = -keep.length;
for (let i = 0; i < keep.length; i++) {
const evt = keep[i]!;
recentEvents.push({
type,
ts,
seq: seq++,
correlationId: ep.id,
payload: {
episodeId: ep.id,
sessionId: ep.sessionId,
status: ep.status,
rTask: ep.rTask ?? null,
},
...evt,
// Negative ids are reserved for replay-only synthetic rows, so
// live bridge events starting at seq=1 never collide in the UI.
seq: seqStart + i,
});
}
if (recentEvents.length > RECENT_EVENTS_CAP) {
recentEvents.splice(0, recentEvents.length - RECENT_EVENTS_CAP);
}
log.debug("events.ring.hydrated", {
count: recentEvents.length,
source: "episodes",
source: "storage",
});
} catch (err) {
log.debug("events.ring.hydrate_failed", {
Expand Down
159 changes: 153 additions & 6 deletions apps/memos-local-plugin/web/src/stores/i18n.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,13 @@ const en = {
"common.deselect": "Deselect",
"common.bulkDelete": "Delete selected",
"common.bulkDelete.confirm": "Delete {n} selected items? This cannot be undone.",
// Relative-time labels — used by the activity dashboard and any
// future surface that wants "5 s ago" / "3 min ago" formatting.
"common.justNow": "just now",
"common.secondsAgo": "{n} s ago",
"common.minutesAgo": "{n} min ago",
"common.hoursAgo": "{n} h ago",
"common.daysAgo": "{n} d ago",
"pager.page": "Page {n}",
"pager.pageOfAtLeast": "Page {n} / {total}+",
"pager.pageOfTotal": "Page {n} / {total}",
Expand Down Expand Up @@ -216,10 +223,84 @@ const en = {
"Primary provider unavailable, host LLM is handling the call. Original error: {msg}",
"overview.metric.policies.breakdown": "{active} active · {candidate} candidate",
"overview.metric.skills.breakdown": "{active} active · {candidate} candidate",
// Live activity dashboard — the third row of the overview page.
// Shows six per-category tiles (memory / experience / environment
// knowledge / skill / retrieval / feedback) with a five-minute
// sparkline and the most recent event in plain language.
"overview.live.title": "Live activity",
"overview.live.subtitle": "Most recent events emitted by the algorithm core",
"overview.live.empty": "No events yet",
"overview.live.hint": "Events will appear here as sessions advance.",
"overview.live.tile.count": "events in last 5 min",
"overview.live.tile.empty": "No events in last 5 min",

// Tile labels (also used by the per-event "category pill"). Keep
// these in sync with overview.metric.* / nav.* labels — same noun,
// different surface.
"overview.live.cat.session": "Conversation",
"overview.live.cat.task": "Task",
"overview.live.cat.memory": "Memory",
"overview.live.cat.experience": "Experience",
"overview.live.cat.world": "Environment knowledge",
"overview.live.cat.skill": "Skill",
"overview.live.cat.retrieval": "Retrieval",
"overview.live.cat.feedback": "Feedback",
"overview.live.cat.system": "System",
"overview.live.cat.hub": "Hub",

// Per-event titles. Telegraphic noun-verb compounds (matching the
// operational-log style the rest of the product uses), one per
// CoreEventType. Detail text — IDs, counts, milliseconds — is
// formatted in TS and concatenated to the title at render time.
"overview.live.event.session.opened": "Session opened",
"overview.live.event.session.closed": "Session ended",
"overview.live.event.episode.opened": "Task started",
"overview.live.event.episode.closed": "Task ended",
"overview.live.event.trace.created": "Memory stored",
"overview.live.event.trace.value_updated": "Memory updated",
"overview.live.event.trace.priority_decayed": "Memory decayed",
"overview.live.event.l2.candidate_added": "Experience candidate",
"overview.live.event.l2.candidate_expired": "Experience candidate expired",
"overview.live.event.l2.induced": "Experience generated",
"overview.live.event.l2.associated": "Experience associated",
"overview.live.event.l2.revised": "Experience revised",
"overview.live.event.l2.boundary_shrunk": "Experience boundary shrunk",
"overview.live.event.l3.abstracted": "Environment knowledge generated",
"overview.live.event.l3.revised": "Environment knowledge updated",
"overview.live.event.skill.crystallized": "Skill crystallised",
"overview.live.event.skill.eta_updated": "Skill ETA updated",
"overview.live.event.skill.boundary_updated": "Skill boundary updated",
"overview.live.event.skill.archived": "Skill archived",
"overview.live.event.skill.repaired": "Skill repaired",
"overview.live.event.retrieval.triggered": "Retrieval triggered",
"overview.live.event.retrieval.tier1.hit": "Tier 1 retrieval hit",
"overview.live.event.retrieval.tier2.hit": "Tier 2 retrieval hit",
"overview.live.event.retrieval.tier3.hit": "Tier 3 retrieval hit",
"overview.live.event.retrieval.empty": "Retrieval empty",
"overview.live.event.feedback.received": "Feedback received",
"overview.live.event.feedback.classified": "Feedback classified",
"overview.live.event.reward.computed": "Reward computed",
"overview.live.event.decision_repair.generated": "Decision repair generated",
"overview.live.event.decision_repair.validated": "Decision repair validated",
"overview.live.event.hub.client_connected": "Hub client connected",
"overview.live.event.hub.client_disconnected": "Hub client disconnected",
"overview.live.event.hub.share_published": "Hub share published",
"overview.live.event.hub.share_received": "Hub share received",
"overview.live.event.system.started": "System started",
"overview.live.event.system.shutdown": "System shutdown",
"overview.live.event.system.error": "System error",
"overview.live.event.system.config_changed": "Config changed",
"overview.live.event.system.update_available": "Update available",

// Detail templates. Many events share patterns (label + id, count +
// latency, …) so the same template is reused across multiple types.
"overview.live.detail.id": "{label} {id}",
"overview.live.detail.idReason": "{label} {id} · {reason}",
"overview.live.detail.candidate": "Candidate {sig}",
"overview.live.detail.induced": "{sig} · from {n} successful tasks",
"overview.live.detail.similarity": "{label} {id} · similarity {pct}%",
"overview.live.detail.retrievalHit": "{count} hits · {ms}ms",
"overview.live.detail.feedbackTone": "Tone: {tone}",
"overview.live.detail.reward": "r = {r} · from {source}",
"overview.live.detail.version": "v{version}",
"overview.live.detail.raw": "{value}",

// Host bridge status.
"bridge.connected": "Memory bridge connected",
Expand Down Expand Up @@ -848,6 +929,11 @@ const zh: Record<TranslationKey, string> = {
"common.deselect": "取消选择",
"common.bulkDelete": "批量删除",
"common.bulkDelete.confirm": "确认删除 {n} 项?此操作不可撤销。",
"common.justNow": "刚刚",
"common.secondsAgo": "{n} 秒前",
"common.minutesAgo": "{n} 分钟前",
"common.hoursAgo": "{n} 小时前",
"common.daysAgo": "{n} 天前",
"pager.page": "第 {n} 页",
"pager.pageOfAtLeast": "第 {n} 页 / 共 {total}+ 页",
"pager.pageOfTotal": "第 {n} 页 / 共 {total} 页",
Expand Down Expand Up @@ -964,9 +1050,70 @@ const zh: Record<TranslationKey, string> = {
"overview.metric.policies.breakdown": "{active} 已启用 · {candidate} 候选",
"overview.metric.skills.breakdown": "{active} 已启用 · {candidate} 候选",
"overview.live.title": "实时活动",
"overview.live.subtitle": "算法核心发出的最近事件",
"overview.live.empty": "暂无事件",
"overview.live.hint": "Agent 交互后事件会显示在这里。",
"overview.live.tile.count": "最近 5 分钟事件",
"overview.live.tile.empty": "最近 5 分钟无事件",

"overview.live.cat.session": "对话",
"overview.live.cat.task": "任务",
"overview.live.cat.memory": "记忆",
"overview.live.cat.experience": "经验",
"overview.live.cat.world": "环境认知",
"overview.live.cat.skill": "技能",
"overview.live.cat.retrieval": "检索",
"overview.live.cat.feedback": "反馈",
"overview.live.cat.system": "系统",
"overview.live.cat.hub": "Hub",

"overview.live.event.session.opened": "对话开启",
"overview.live.event.session.closed": "对话结束",
"overview.live.event.episode.opened": "任务开始",
"overview.live.event.episode.closed": "任务结束",
"overview.live.event.trace.created": "记忆存储",
"overview.live.event.trace.value_updated": "记忆更新",
"overview.live.event.trace.priority_decayed": "记忆衰减",
"overview.live.event.l2.candidate_added": "候选经验新增",
"overview.live.event.l2.candidate_expired": "候选经验过期",
"overview.live.event.l2.induced": "经验生成",
"overview.live.event.l2.associated": "经验关联",
"overview.live.event.l2.revised": "经验修订",
"overview.live.event.l2.boundary_shrunk": "经验边界收紧",
"overview.live.event.l3.abstracted": "环境认知生成",
"overview.live.event.l3.revised": "环境认知更新",
"overview.live.event.skill.crystallized": "技能晶化",
"overview.live.event.skill.eta_updated": "技能预期更新",
"overview.live.event.skill.boundary_updated": "技能边界更新",
"overview.live.event.skill.archived": "技能归档",
"overview.live.event.skill.repaired": "技能修复",
"overview.live.event.retrieval.triggered": "检索触发",
"overview.live.event.retrieval.tier1.hit": "第一层检索命中",
"overview.live.event.retrieval.tier2.hit": "第二层检索命中",
"overview.live.event.retrieval.tier3.hit": "第三层检索命中",
"overview.live.event.retrieval.empty": "检索无结果",
"overview.live.event.feedback.received": "收到反馈",
"overview.live.event.feedback.classified": "反馈分类",
"overview.live.event.reward.computed": "奖励计算",
"overview.live.event.decision_repair.generated": "决策修补",
"overview.live.event.decision_repair.validated": "决策修补已校验",
"overview.live.event.hub.client_connected": "Hub 客户端连接",
"overview.live.event.hub.client_disconnected": "Hub 客户端断开",
"overview.live.event.hub.share_published": "Hub 分享发布",
"overview.live.event.hub.share_received": "Hub 收到分享",
"overview.live.event.system.started": "系统启动",
"overview.live.event.system.shutdown": "系统关闭",
"overview.live.event.system.error": "系统异常",
"overview.live.event.system.config_changed": "配置变更",
"overview.live.event.system.update_available": "可用更新",

"overview.live.detail.id": "{label} {id}",
"overview.live.detail.idReason": "{label} {id} · {reason}",
"overview.live.detail.candidate": "候选 {sig}",
"overview.live.detail.induced": "{sig} · 来自 {n} 次成功任务",
"overview.live.detail.similarity": "{label} {id} · 相似度 {pct}%",
"overview.live.detail.retrievalHit": "命中 {count} 条 · {ms}ms",
"overview.live.detail.feedbackTone": "情绪 {tone}",
"overview.live.detail.reward": "r = {r} · 来自 {source}",
"overview.live.detail.version": "v{version}",
"overview.live.detail.raw": "{value}",

"bridge.connected": "记忆通道已开启",
"bridge.reconnecting": "记忆通道已开启",
Expand Down
Loading