From 75e411477b7a3ac52bdf60a9018b8b813af9fa71 Mon Sep 17 00:00:00 2001 From: Trinity Agent Date: Wed, 11 Mar 2026 16:17:00 +0000 Subject: [PATCH] feat(cloud): Stream all container logs to Telegram in realtime (#131) - Add Telegram streaming functions with batch buffering (5s interval) - Replace Claude Code launch with streaming pipe for all output - Stream self-review steps (format check, generated files, diff size) - Stream push and PR creation steps - Add TELEGRAM_STREAM=true env var to cloud_orchestrator Co-Authored-By: Claude Sonnet 4.6 --- deploy/agent-entrypoint.sh | 90 +++++++++++++++++++++++++++++++++- src/tri/cloud_orchestrator.zig | 3 ++ 2 files changed, 92 insertions(+), 1 deletion(-) diff --git a/deploy/agent-entrypoint.sh b/deploy/agent-entrypoint.sh index 225d9b96e5..9431ed8f64 100644 --- a/deploy/agent-entrypoint.sh +++ b/deploy/agent-entrypoint.sh @@ -166,6 +166,52 @@ update_telegram_dashboard() { fi } +# ═══════════════════════════════════════════════════════════════════════════════ +# TELEGRAM LOG STREAMING — Batch streaming every 5 seconds to avoid rate limits +# ═══════════════════════════════════════════════════════════════════════════════ + +TELEGRAM_BUFFER="" +TELEGRAM_LAST_SEND=0 +TELEGRAM_STREAM="${TELEGRAM_STREAM:-true}" +TELEGRAM_BATCH_INTERVAL="${TELEGRAM_BATCH_INTERVAL:-5}" + +stream_to_telegram() { + [ "$TELEGRAM_STREAM" != "true" ] && return + local line="$1" + TELEGRAM_BUFFER="${TELEGRAM_BUFFER}${line} +" + + local now=$(date +%s) + local diff=$((now - TELEGRAM_LAST_SEND)) + + if [ $diff -ge $TELEGRAM_BATCH_INTERVAL ] || [ ${#TELEGRAM_BUFFER} -gt 3000 ]; then + if [ -n "$TELEGRAM_BUFFER" ] && [ -n "$TELEGRAM_BOT_TOKEN" ]; then + local msg=$(echo -e "$TELEGRAM_BUFFER" | head -c 3900) + curl -s -X POST "https://api.telegram.org/bot${TELEGRAM_BOT_TOKEN}/sendMessage" \ + -d "chat_id=${TELEGRAM_CHAT_ID}" \ + -d "parse_mode=HTML" \ + -d "text=
🤖 #${ISSUE} LOG
+${msg}
" \ + --max-time 5 || true + TELEGRAM_BUFFER="" + TELEGRAM_LAST_SEND=$now + fi + fi +} + +flush_telegram() { + if [ -n "$TELEGRAM_BUFFER" ] && [ -n "$TELEGRAM_BOT_TOKEN" ]; then + local msg=$(echo -e "$TELEGRAM_BUFFER" | head -c 3900) + curl -s -X POST "https://api.telegram.org/bot${TELEGRAM_BOT_TOKEN}/sendMessage" \ + -d "chat_id=${TELEGRAM_CHAT_ID}" \ + -d "parse_mode=HTML" \ + -d "text=
🤖 #${ISSUE} LOG
+${msg}
" \ + --max-time 5 || true + TELEGRAM_BUFFER="" + fi +} + # ═══════════════════════════════════════════════════════════════════════════════ # EVENT STREAM (OpenHands-style structured events) # ═══════════════════════════════════════════════════════════════════════════════ @@ -405,7 +451,19 @@ Comment on the issue at each major step." emit_event "status" '{"status":"CODING","detail":"Claude Code starting"}' CLAUDE_EXIT=0 -timeout "${AGENT_TIMEOUT}" claude -p "${PROMPT}" --allowedTools "Bash,Read,Write,Edit,Glob,Grep" 2>&1 || CLAUDE_EXIT=$? +timeout "${AGENT_TIMEOUT}" claude -p "${PROMPT}" --allowedTools "Bash,Read,Write,Edit,Glob,Grep" 2>&1 | \ + while IFS= read -r line; do + echo "$line" + stream_to_telegram "$line" + echo "{\"type\":\"log\",\"issue\":${ISSUE},\"line\":\"$(echo "$line" | sed 's/"/\\"/g' | head -c 500)\",\"ts\":\"$(date -u +%Y-%m-%dT%H:%M:%SZ)\"}" >> /tmp/agent_events.jsonl + case "$line" in + *"Read("*|*"cat "*) report_status "READING" "$line" ;; + *"Write("*|*"Edit("*) report_status "CODING" "$(echo $line | head -c 100)" ;; + *"Bash("*|*"zig build"*) report_status "TESTING" "$(echo $line | head -c 100)" ;; + *"error"*|*"Error"*) report_status "ERROR" "$(echo $line | head -c 200)" ;; + esac + done || CLAUDE_EXIT=$? +flush_telegram emit_event "command" "{\"cmd\":\"claude\",\"exit_code\":${CLAUDE_EXIT},\"timeout\":${AGENT_TIMEOUT}}" if [ "${CLAUDE_EXIT}" -eq 124 ]; then @@ -417,46 +475,66 @@ fi # === 6b. Self-review (advisory only — never blocks push) === report_status "REVIEWING" "Self-review (advisory)" +stream_to_telegram "Running self-review..." REVIEW_WARNINGS=0 # 7a. Format check — auto-fix silently +stream_to_telegram "Checking zig fmt format..." if ! zig fmt --check src/ 2>/dev/null; then + stream_to_telegram "Running zig fmt to fix formatting..." zig fmt src/ 2>/dev/null || true git add -A git commit -m "style: zig fmt (#${ISSUE})" 2>/dev/null || true + stream_to_telegram "Formatting fixed and committed." +else + stream_to_telegram "Format check passed." fi # 7b. Generated files check (only real blocker) +stream_to_telegram "Checking for generated files..." if git diff --name-only main..HEAD 2>/dev/null | grep -qE 'trinity/output/|generated/'; then emit_event "error" '{"msg":"Modified generated files"}' REVIEW_WARNINGS=$((REVIEW_WARNINGS + 1)) + stream_to_telegram "Warning: Generated files modified." fi # 7c. Diff size warning (advisory) +stream_to_telegram "Checking diff size..." DIFF_LINES=$(git diff --stat main..HEAD 2>/dev/null | tail -1 | grep -oE '[0-9]+ insertion' | grep -oE '[0-9]+' || echo "0") if [ "${DIFF_LINES:-0}" -gt 500 ]; then emit_event "error" "{\"msg\":\"Diff large: ${DIFF_LINES} lines\"}" REVIEW_WARNINGS=$((REVIEW_WARNINGS + 1)) + stream_to_telegram "Warning: Large diff (${DIFF_LINES} lines)." +else + stream_to_telegram "Diff size OK: ${DIFF_LINES} lines." fi # NOTE: zig build skipped — too heavy for Railway containers, always fails # Tests run by CI after PR is created if [ $REVIEW_WARNINGS -gt 0 ]; then + stream_to_telegram "Self-review: ${REVIEW_WARNINGS} warning(s) (advisory, not blocking)." log "Self-review: ${REVIEW_WARNINGS} warning(s) (advisory, not blocking)" +else + stream_to_telegram "Self-review: All checks passed." fi # === 8. Push and create PR if not already done === report_status "TESTING" "Checking/creating PR" +stream_to_telegram "Checking for existing PR..." EXISTING_PR=$(gh pr list --head "feat/issue-${ISSUE}" --json number --jq '.[0].number' 2>/dev/null || echo "") if [ -z "${EXISTING_PR}" ]; then # Check if there are actually commits to push COMMIT_COUNT=$(git log --oneline main..HEAD 2>/dev/null | wc -l | tr -d ' ') + stream_to_telegram "Commit count: ${COMMIT_COUNT}" if [ "${COMMIT_COUNT}" -gt 0 ]; then log "Pushing ${COMMIT_COUNT} commit(s)..." + stream_to_telegram "Pushing ${COMMIT_COUNT} commit(s) to origin..." retry "git push -u origin 'feat/issue-${ISSUE}' 2>/dev/null" || true + stream_to_telegram "Push completed." log "Creating PR..." + stream_to_telegram "Creating pull request..." PR_URL=$(gh pr create \ --title "feat: solve issue #${ISSUE}" \ --body "Closes #${ISSUE} @@ -466,6 +544,7 @@ Commits: ${COMMIT_COUNT}" \ --head "feat/issue-${ISSUE}" 2>/dev/null || true) if [ -n "${PR_URL}" ]; then + stream_to_telegram "PR created: ${PR_URL}" emit_event "pr" "{\"url\":\"${PR_URL}\",\"commits\":${COMMIT_COUNT}}" report_status "PR_CREATED" "PR: ${PR_URL}" # Send metrics to monitor @@ -473,6 +552,7 @@ Commits: ${COMMIT_COUNT}" \ # Post final summary comment DIFF_STAT=$(git diff --stat main..HEAD 2>/dev/null || echo "N/A") FINAL_ELAPSED=$(( $(date +%s) - START_TIME )) + stream_to_telegram "Posting final summary..." gh issue comment "${ISSUE}" --body "🚀 **Trinity Agent — Summary** | Field | Value | @@ -485,17 +565,25 @@ Commits: ${COMMIT_COUNT}" \ \`\`\` ${DIFF_STAT} \`\`\`" 2>/dev/null || true + stream_to_telegram "Summary posted." # Cleanup worktree after PR creation (keeps shared bare repo intact) log "Cleaning up worktree..." + stream_to_telegram "Cleaning up worktree..." cd /bare-repo.git git worktree remove "${WORKTREE_PATH}" --force 2>/dev/null || true log "Worktree removed: ${WORKTREE_PATH}" + stream_to_telegram "Worktree removed." + else + stream_to_telegram "Failed to create PR." fi else + stream_to_telegram "No commits produced — agent could not solve issue." report_status "FAILED" "No commits produced — agent could not solve issue" gh issue comment "${ISSUE}" --body "❌ **Trinity Agent**: No solution produced. Issue may need manual attention." 2>/dev/null || true fi +else + stream_to_telegram "PR already exists: #${EXISTING_PR}" fi # === 8. Report final status === diff --git a/src/tri/cloud_orchestrator.zig b/src/tri/cloud_orchestrator.zig index a327635bcd..60f4ae55b0 100644 --- a/src/tri/cloud_orchestrator.zig +++ b/src/tri/cloud_orchestrator.zig @@ -132,6 +132,9 @@ pub fn spawnAgent(allocator: Allocator, issue_number: u32) !SpawnResult { allocator.free(tg_chat); } + // Enable Telegram log streaming by default + _ = api.upsertVariable(service_id, env_id, "TELEGRAM_STREAM", "true") catch {}; + const mon_token = std.process.getEnvVarOwned(allocator, "MONITOR_TOKEN") catch ""; if (mon_token.len > 0) { _ = api.upsertVariable(service_id, env_id, "MONITOR_TOKEN", mon_token) catch {};