Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 57 additions & 9 deletions src/tri/tri_cloud.zig
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ fn cloudCleanup(allocator: Allocator) !void {
print("{s}✓ Cleaned {d} inactive agent(s){s}\n", .{ GREEN, cleaned, RESET });
}

/// tri cloud history [issue] — Show event history from JSONL
/// tri cloud history [issue] [--format=json] — Show event history from JSONL
fn cloudHistory(_: Allocator, args: []const []const u8) !void {
const events_path = ".trinity/cloud_events.jsonl";
const file = std.fs.cwd().openFile(events_path, .{}) catch {
Expand All @@ -573,22 +573,70 @@ fn cloudHistory(_: Allocator, args: []const []const u8) !void {
};
defer file.close();

// Optional issue filter
// Parse args: [issue] [--format=json]
var filter_issue: ?u32 = null;
if (args.len >= 1) {
filter_issue = std.fmt.parseInt(u32, args[0], 10) catch null;
var json_output: bool = false;
for (args) |arg| {
if (eql(u8, arg, "--format=json")) {
json_output = true;
} else if (eql(u8, arg, "--format=human")) {
json_output = false;
} else if (filter_issue == null) {
// Try to parse as issue number
filter_issue = std.fmt.parseInt(u32, arg, 10) catch null;
}
}

print("\n{s}{s}", .{ GOLDEN, BOLD });
print("═══════════════════════════════════════════════════\n", .{});
print(" CLOUD EVENTS — History\n", .{});
print("═══════════════════════════════════════════════════{s}\n", .{RESET});

// Read entire file (cloud events are small)
var buf: [32768]u8 = undefined;
const len = file.readAll(&buf) catch 0;
const content = buf[0..len];

if (json_output) {
// JSON output for machine consumption
var json_buf: [65536]u8 = undefined;
var fbs = std.io.fixedBufferStream(&json_buf);
const w = fbs.writer();

w.writeAll("{\"events\":[") catch return;

var first = true;
var count: u32 = 0;
var offset: usize = 0;

while (offset < content.len) {
const line_end = std.mem.indexOfPos(u8, content, offset, "\n") orelse content.len;
const line = content[offset..line_end];
offset = line_end + 1;

if (line.len == 0) continue;

// Filter by issue if specified
if (filter_issue) |fi| {
var needle_buf: [32]u8 = undefined;
const needle = std.fmt.bufPrint(&needle_buf, "\"issue\":{d}", .{fi}) catch continue;
if (std.mem.indexOf(u8, line, needle) == null) continue;
}

if (!first) w.writeAll(",") catch {};
first = false;
w.writeAll(line) catch break;
count += 1;
}

w.writeAll("],\"count\":") catch {};
std.fmt.format(w, "{d}}}", .{count}) catch {};

print("{s}\n", .{fbs.getWritten()});
return;
}

// Human-readable output
print("\n{s}{s}", .{ GOLDEN, BOLD });
print("═══════════════════════════════════════════════════\n", .{});
print(" CLOUD EVENTS — History\n", .{});
print("═══════════════════════════════════════════════════{s}\n", .{RESET});

var count: u32 = 0;
var offset: usize = 0;

Expand Down
145 changes: 143 additions & 2 deletions tools/mcp/trinity_mcp/cloud_monitor.zig
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ pub const AgentStatus = struct {
var agent_statuses: [MAX_AGENTS]AgentStatus = undefined;
var status_count: usize = 0;

// Track last event time per issue for deduplication (5s window)
var last_event_times: [MAX_AGENTS][2]u32 = [_][2]u32{.{ 0, 0 }} ** MAX_AGENTS; // [issue_index][0]=issue, [1]=timestamp
var last_event_count: usize = 0;

// ═══════════════════════════════════════════════════════════════════════════════
// PUBLIC API
// ═══════════════════════════════════════════════════════════════════════════════
Expand All @@ -69,6 +73,11 @@ pub fn runMonitor(port: u16) !void {

std.log.info("Cloud Monitor starting on port {d}", .{actual_port});

// Restore state from JSONL on startup
restoreStateFromEvents() catch |err| {
std.log.warn("Failed to restore state from events: {}", .{err});
};

const address = net.Address.parseIp4("0.0.0.0", actual_port) catch return;
var server = address.listen(.{ .reuse_address = true }) catch |err| {
std.log.err("Cloud Monitor failed to bind: {}", .{err});
Expand All @@ -90,7 +99,13 @@ pub fn runMonitor(port: u16) !void {

/// Update agent status (called from HTTP POST handler).
/// Persists to JSONL, alerts on error states via tri notify.
/// Skips duplicate events within 5s window.
pub fn updateStatus(issue: u32, status_str: []const u8, detail: []const u8) void {
// Deduplication: skip if same status within 5s
if (shouldSkipEvent(issue, status_str)) {
return;
}

// 1. Append to JSONL event log
appendEvent(issue, status_str, detail);

Expand Down Expand Up @@ -197,6 +212,130 @@ pub fn getStatusJson(buf: []u8) []const u8 {
return fbs.getWritten();
}

/// Restore agent states from JSONL event log on startup.
/// Reads the last event for each issue and populates agent_statuses.
fn restoreStateFromEvents() !void {
const file = std.fs.cwd().openFile(EVENTS_FILE, .{}) catch return;
defer file.close();

// Read entire file
var file_buf: [65536]u8 = undefined;
const file_len = file.readAll(&file_buf) catch 0;
if (file_len == 0) return;
const content = file_buf[0..file_len];

// Track latest status per issue
var latest_issue: [MAX_AGENTS]u32 = undefined;
var latest_ts: [MAX_AGENTS]i64 = undefined;
var latest_status: [MAX_AGENTS][32]u8 = undefined;
var latest_status_len: [MAX_AGENTS]usize = undefined;
var latest_detail: [MAX_AGENTS][256]u8 = undefined;
var latest_detail_len: [MAX_AGENTS]usize = undefined;
var latest_count: usize = 0;

var offset: usize = 0;
while (offset < content.len) {
const line_end = std.mem.indexOfPos(u8, content, offset, "\n") orelse content.len;
const line = content[offset..line_end];
offset = line_end + 1;
if (line.len == 0) continue;

// Parse issue
const issue_idx = std.mem.indexOf(u8, line, "\"issue\":") orelse continue;
const istart = issue_idx + 8;
var iend = istart;
while (iend < line.len and line[iend] >= '0' and line[iend] <= '9') : (iend += 1) {}
const issue = std.fmt.parseInt(u32, line[istart..iend], 10) catch continue;

// Parse timestamp
const ts_idx = std.mem.indexOf(u8, line, "\"ts\":") orelse continue;
const tstart = ts_idx + 5;
var tend = tstart;
while (tend < line.len and line[tend] >= '0' and line[tend] <= '9') : (tend += 1) {}
const ts = std.fmt.parseInt(i64, line[tstart..tend], 10) catch continue;

// Parse status and detail
const status_str = extractJsonString(line, "status") orelse continue;
const detail_str = extractJsonString(line, "detail") orelse "";

// Find or create entry for this issue (keep latest timestamp)
var entry_idx: ?usize = null;
for (0..latest_count) |i| {
if (latest_issue[i] == issue) {
if (ts > latest_ts[i]) {
entry_idx = i;
}
break;
}
}

if (entry_idx == null and latest_count < MAX_AGENTS) {
entry_idx = latest_count;
latest_issue[latest_count] = issue;
latest_count += 1;
}

if (entry_idx) |idx| {
latest_ts[idx] = ts;
latest_status_len[idx] = @min(status_str.len, 32);
@memcpy(latest_status[idx][0..latest_status_len[idx]], status_str[0..latest_status_len[idx]]);
latest_detail_len[idx] = @min(detail_str.len, 256);
@memcpy(latest_detail[idx][0..latest_detail_len[idx]], detail_str[0..latest_detail_len[idx]]);
}
}

// Populate agent_statuses from latest events
status_count = 0;
for (0..latest_count) |i| {
if (status_count >= MAX_AGENTS) break;
const entry = &agent_statuses[status_count];
entry.issue = latest_issue[i];
entry.status_len = latest_status_len[i];
@memcpy(entry.status[0..entry.status_len], latest_status[i][0..entry.status_len]);
entry.detail_len = latest_detail_len[i];
@memcpy(entry.detail[0..entry.detail_len], latest_detail[i][0..entry.detail_len]);
entry.last_heartbeat = latest_ts[i];
status_count += 1;
}

std.log.info("Restored {d} agent states from events log", .{status_count});
}

/// Check if we should skip this event (deduplication: same status within 5 seconds)
fn shouldSkipEvent(issue: u32, status_str: []const u8) bool {
const now = std.time.timestamp();
const dedup_window: i64 = 5; // 5 seconds

for (0..last_event_count) |i| {
if (last_event_times[i][0] == issue) {
const last_ts = @as(i64, @intCast(last_event_times[i][1]));
if (now - last_ts < dedup_window) {
// Check if status is the same
for (0..status_count) |j| {
if (agent_statuses[j].issue == issue) {
const last_status = agent_statuses[j].getStatus();
if (std.mem.eql(u8, last_status, status_str)) {
return true; // Skip: same status within 5s window
}
break;
}
}
}
// Update timestamp
last_event_times[i][1] = @as(u32, @intCast(now));
return false;
}
}

// New issue, add to tracking
if (last_event_count < MAX_AGENTS) {
last_event_times[last_event_count][0] = issue;
last_event_times[last_event_count][1] = @as(u32, @intCast(now));
last_event_count += 1;
}
return false;
}

// ═══════════════════════════════════════════════════════════════════════════════
// INTERNAL
// ═══════════════════════════════════════════════════════════════════════════════
Expand All @@ -209,15 +348,17 @@ fn appendEvent(issue: u32, status_str: []const u8, detail: []const u8) void {

// Seek to end for append
file.seekFromEnd(0) catch return;
const w = file.writer();

// Format JSON line to buffer, then write
var buf: [512]u8 = undefined;
const ts = std.time.timestamp();
std.fmt.format(w, "{{\"ts\":{d},\"issue\":{d},\"status\":\"{s}\",\"detail\":\"{s}\"}}\n", .{
const line = std.fmt.bufPrint(&buf, "{{\"ts\":{d},\"issue\":{d},\"status\":\"{s}\",\"detail\":\"{s}\"}}\n", .{
ts,
issue,
status_str,
detail,
}) catch return;
_ = file.writeAll(line) catch return;
}

fn sendTriNotify(issue: u32, status_str: []const u8, detail: []const u8) void {
Expand Down
Loading