diff --git a/src/tri/tri_cloud.zig b/src/tri/tri_cloud.zig index 1a268ad060..b23f336131 100644 --- a/src/tri/tri_cloud.zig +++ b/src/tri/tri_cloud.zig @@ -564,7 +564,7 @@ fn cloudCleanup(allocator: Allocator) !void { print("{s}✓ Cleaned {d} inactive agent(s){s}\n", .{ GREEN, cleaned, RESET }); } -/// tri cloud history [issue] — Show event history from JSONL +/// tri cloud history [issue] [--format=json] — Show event history from JSONL fn cloudHistory(_: Allocator, args: []const []const u8) !void { const events_path = ".trinity/cloud_events.jsonl"; const file = std.fs.cwd().openFile(events_path, .{}) catch { @@ -573,22 +573,70 @@ fn cloudHistory(_: Allocator, args: []const []const u8) !void { }; defer file.close(); - // Optional issue filter + // Parse args: [issue] [--format=json] var filter_issue: ?u32 = null; - if (args.len >= 1) { - filter_issue = std.fmt.parseInt(u32, args[0], 10) catch null; + var json_output: bool = false; + for (args) |arg| { + if (eql(u8, arg, "--format=json")) { + json_output = true; + } else if (eql(u8, arg, "--format=human")) { + json_output = false; + } else if (filter_issue == null) { + // Try to parse as issue number + filter_issue = std.fmt.parseInt(u32, arg, 10) catch null; + } } - print("\n{s}{s}", .{ GOLDEN, BOLD }); - print("═══════════════════════════════════════════════════\n", .{}); - print(" CLOUD EVENTS — History\n", .{}); - print("═══════════════════════════════════════════════════{s}\n", .{RESET}); - // Read entire file (cloud events are small) var buf: [32768]u8 = undefined; const len = file.readAll(&buf) catch 0; const content = buf[0..len]; + if (json_output) { + // JSON output for machine consumption + var json_buf: [65536]u8 = undefined; + var fbs = std.io.fixedBufferStream(&json_buf); + const w = fbs.writer(); + + w.writeAll("{\"events\":[") catch return; + + var first = true; + var count: u32 = 0; + var offset: usize = 0; + + while (offset < content.len) { + const line_end = std.mem.indexOfPos(u8, content, offset, "\n") orelse content.len; + const line = content[offset..line_end]; + offset = line_end + 1; + + if (line.len == 0) continue; + + // Filter by issue if specified + if (filter_issue) |fi| { + var needle_buf: [32]u8 = undefined; + const needle = std.fmt.bufPrint(&needle_buf, "\"issue\":{d}", .{fi}) catch continue; + if (std.mem.indexOf(u8, line, needle) == null) continue; + } + + if (!first) w.writeAll(",") catch {}; + first = false; + w.writeAll(line) catch break; + count += 1; + } + + w.writeAll("],\"count\":") catch {}; + std.fmt.format(w, "{d}}}", .{count}) catch {}; + + print("{s}\n", .{fbs.getWritten()}); + return; + } + + // Human-readable output + print("\n{s}{s}", .{ GOLDEN, BOLD }); + print("═══════════════════════════════════════════════════\n", .{}); + print(" CLOUD EVENTS — History\n", .{}); + print("═══════════════════════════════════════════════════{s}\n", .{RESET}); + var count: u32 = 0; var offset: usize = 0; diff --git a/tools/mcp/trinity_mcp/cloud_monitor.zig b/tools/mcp/trinity_mcp/cloud_monitor.zig index 7188ea60de..30e50062bc 100644 --- a/tools/mcp/trinity_mcp/cloud_monitor.zig +++ b/tools/mcp/trinity_mcp/cloud_monitor.zig @@ -58,6 +58,10 @@ pub const AgentStatus = struct { var agent_statuses: [MAX_AGENTS]AgentStatus = undefined; var status_count: usize = 0; +// Track last event time per issue for deduplication (5s window) +var last_event_times: [MAX_AGENTS][2]u32 = [_][2]u32{.{ 0, 0 }} ** MAX_AGENTS; // [issue_index][0]=issue, [1]=timestamp +var last_event_count: usize = 0; + // ═══════════════════════════════════════════════════════════════════════════════ // PUBLIC API // ═══════════════════════════════════════════════════════════════════════════════ @@ -69,6 +73,11 @@ pub fn runMonitor(port: u16) !void { std.log.info("Cloud Monitor starting on port {d}", .{actual_port}); + // Restore state from JSONL on startup + restoreStateFromEvents() catch |err| { + std.log.warn("Failed to restore state from events: {}", .{err}); + }; + const address = net.Address.parseIp4("0.0.0.0", actual_port) catch return; var server = address.listen(.{ .reuse_address = true }) catch |err| { std.log.err("Cloud Monitor failed to bind: {}", .{err}); @@ -90,7 +99,13 @@ pub fn runMonitor(port: u16) !void { /// Update agent status (called from HTTP POST handler). /// Persists to JSONL, alerts on error states via tri notify. +/// Skips duplicate events within 5s window. pub fn updateStatus(issue: u32, status_str: []const u8, detail: []const u8) void { + // Deduplication: skip if same status within 5s + if (shouldSkipEvent(issue, status_str)) { + return; + } + // 1. Append to JSONL event log appendEvent(issue, status_str, detail); @@ -197,6 +212,130 @@ pub fn getStatusJson(buf: []u8) []const u8 { return fbs.getWritten(); } +/// Restore agent states from JSONL event log on startup. +/// Reads the last event for each issue and populates agent_statuses. +fn restoreStateFromEvents() !void { + const file = std.fs.cwd().openFile(EVENTS_FILE, .{}) catch return; + defer file.close(); + + // Read entire file + var file_buf: [65536]u8 = undefined; + const file_len = file.readAll(&file_buf) catch 0; + if (file_len == 0) return; + const content = file_buf[0..file_len]; + + // Track latest status per issue + var latest_issue: [MAX_AGENTS]u32 = undefined; + var latest_ts: [MAX_AGENTS]i64 = undefined; + var latest_status: [MAX_AGENTS][32]u8 = undefined; + var latest_status_len: [MAX_AGENTS]usize = undefined; + var latest_detail: [MAX_AGENTS][256]u8 = undefined; + var latest_detail_len: [MAX_AGENTS]usize = undefined; + var latest_count: usize = 0; + + var offset: usize = 0; + while (offset < content.len) { + const line_end = std.mem.indexOfPos(u8, content, offset, "\n") orelse content.len; + const line = content[offset..line_end]; + offset = line_end + 1; + if (line.len == 0) continue; + + // Parse issue + const issue_idx = std.mem.indexOf(u8, line, "\"issue\":") orelse continue; + const istart = issue_idx + 8; + var iend = istart; + while (iend < line.len and line[iend] >= '0' and line[iend] <= '9') : (iend += 1) {} + const issue = std.fmt.parseInt(u32, line[istart..iend], 10) catch continue; + + // Parse timestamp + const ts_idx = std.mem.indexOf(u8, line, "\"ts\":") orelse continue; + const tstart = ts_idx + 5; + var tend = tstart; + while (tend < line.len and line[tend] >= '0' and line[tend] <= '9') : (tend += 1) {} + const ts = std.fmt.parseInt(i64, line[tstart..tend], 10) catch continue; + + // Parse status and detail + const status_str = extractJsonString(line, "status") orelse continue; + const detail_str = extractJsonString(line, "detail") orelse ""; + + // Find or create entry for this issue (keep latest timestamp) + var entry_idx: ?usize = null; + for (0..latest_count) |i| { + if (latest_issue[i] == issue) { + if (ts > latest_ts[i]) { + entry_idx = i; + } + break; + } + } + + if (entry_idx == null and latest_count < MAX_AGENTS) { + entry_idx = latest_count; + latest_issue[latest_count] = issue; + latest_count += 1; + } + + if (entry_idx) |idx| { + latest_ts[idx] = ts; + latest_status_len[idx] = @min(status_str.len, 32); + @memcpy(latest_status[idx][0..latest_status_len[idx]], status_str[0..latest_status_len[idx]]); + latest_detail_len[idx] = @min(detail_str.len, 256); + @memcpy(latest_detail[idx][0..latest_detail_len[idx]], detail_str[0..latest_detail_len[idx]]); + } + } + + // Populate agent_statuses from latest events + status_count = 0; + for (0..latest_count) |i| { + if (status_count >= MAX_AGENTS) break; + const entry = &agent_statuses[status_count]; + entry.issue = latest_issue[i]; + entry.status_len = latest_status_len[i]; + @memcpy(entry.status[0..entry.status_len], latest_status[i][0..entry.status_len]); + entry.detail_len = latest_detail_len[i]; + @memcpy(entry.detail[0..entry.detail_len], latest_detail[i][0..entry.detail_len]); + entry.last_heartbeat = latest_ts[i]; + status_count += 1; + } + + std.log.info("Restored {d} agent states from events log", .{status_count}); +} + +/// Check if we should skip this event (deduplication: same status within 5 seconds) +fn shouldSkipEvent(issue: u32, status_str: []const u8) bool { + const now = std.time.timestamp(); + const dedup_window: i64 = 5; // 5 seconds + + for (0..last_event_count) |i| { + if (last_event_times[i][0] == issue) { + const last_ts = @as(i64, @intCast(last_event_times[i][1])); + if (now - last_ts < dedup_window) { + // Check if status is the same + for (0..status_count) |j| { + if (agent_statuses[j].issue == issue) { + const last_status = agent_statuses[j].getStatus(); + if (std.mem.eql(u8, last_status, status_str)) { + return true; // Skip: same status within 5s window + } + break; + } + } + } + // Update timestamp + last_event_times[i][1] = @as(u32, @intCast(now)); + return false; + } + } + + // New issue, add to tracking + if (last_event_count < MAX_AGENTS) { + last_event_times[last_event_count][0] = issue; + last_event_times[last_event_count][1] = @as(u32, @intCast(now)); + last_event_count += 1; + } + return false; +} + // ═══════════════════════════════════════════════════════════════════════════════ // INTERNAL // ═══════════════════════════════════════════════════════════════════════════════ @@ -209,15 +348,17 @@ fn appendEvent(issue: u32, status_str: []const u8, detail: []const u8) void { // Seek to end for append file.seekFromEnd(0) catch return; - const w = file.writer(); + // Format JSON line to buffer, then write + var buf: [512]u8 = undefined; const ts = std.time.timestamp(); - std.fmt.format(w, "{{\"ts\":{d},\"issue\":{d},\"status\":\"{s}\",\"detail\":\"{s}\"}}\n", .{ + const line = std.fmt.bufPrint(&buf, "{{\"ts\":{d},\"issue\":{d},\"status\":\"{s}\",\"detail\":\"{s}\"}}\n", .{ ts, issue, status_str, detail, }) catch return; + _ = file.writeAll(line) catch return; } fn sendTriNotify(issue: u32, status_str: []const u8, detail: []const u8) void {