feat: squads obs sync — JSONL to Postgres backfill (#626) #634
kokevidaurre merged 1 commit into develop
Conversation
Reads all JSONL execution records and POSTs them to the API when Tier 2 is active. Dedup by execution_id (409 = already exists, skipped). Supports --dry-run to preview before sending.

Co-Authored-By: Claude <noreply@anthropic.com>
Code Review
This pull request introduces the obs sync command to backfill JSONL execution data to a Postgres database. The implementation includes a dry-run mode and basic error handling for API requests. Feedback suggests improving performance by processing records in parallel chunks rather than sequentially, enhancing observability by logging specific error details instead of swallowing them, and clarifying the output message when running in dry-run mode.
for (const record of records) {
  if (opts.dryRun) {
    writeLine(`  ${colors.dim}[dry-run] ${record.ts} ${record.squad}/${record.agent} $${record.cost_usd.toFixed(3)}${RESET}`);
    synced++;
    continue;
  }

  try {
    const res = await fetch(`${info.urls.api}/agent-executions`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        execution_id: record.id,
        squad: record.squad,
        agent: record.agent,
        model: record.model,
        status: record.status,
        input_tokens: record.input_tokens,
        output_tokens: record.output_tokens,
        cache_read_tokens: record.cache_read_tokens,
        cache_write_tokens: record.cache_write_tokens,
        cost_usd: record.cost_usd,
        duration_seconds: Math.round(record.duration_ms / 1000),
        error_message: record.error || null,
        metadata: { trigger: record.trigger, provider: record.provider },
      }),
      signal: AbortSignal.timeout(5000),
    });

    if (res.ok) {
      synced++;
    } else if (res.status === 409) {
      skipped++; // Already exists (dedup)
    } else {
      errors++;
    }
  } catch {
    errors++;
  }
}
The current implementation processes records sequentially, which will be very slow for a large number of records. Sending all requests at once with Promise.all could also overwhelm the server or client network resources.
A more robust approach is to process records in parallel but in smaller chunks. This balances performance and resource usage.
Additionally, the catch block currently swallows errors, making it hard to debug. It's better to log the error details. You can adapt the console.error in the suggestion to your project's logging standards if needed.
Here's a suggested refactoring that processes records in parallel chunks and includes error logging.
const CHUNK_SIZE = 50; // This can be tuned as needed
for (let i = 0; i < records.length; i += CHUNK_SIZE) {
  const chunk = records.slice(i, i + CHUNK_SIZE);
  const promises = chunk.map(async (record) => {
    if (opts.dryRun) {
      writeLine(`  ${colors.dim}[dry-run] ${record.ts} ${record.squad}/${record.agent} $${record.cost_usd.toFixed(3)}${RESET}`);
      return 'synced';
    }
    try {
      const res = await fetch(`${info.urls.api}/agent-executions`, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          execution_id: record.id,
          squad: record.squad,
          agent: record.agent,
          model: record.model,
          status: record.status,
          input_tokens: record.input_tokens,
          output_tokens: record.output_tokens,
          cache_read_tokens: record.cache_read_tokens,
          cache_write_tokens: record.cache_write_tokens,
          cost_usd: record.cost_usd,
          duration_seconds: Math.round(record.duration_ms / 1000),
          error_message: record.error || null,
          metadata: { trigger: record.trigger, provider: record.provider },
        }),
        signal: AbortSignal.timeout(5000),
      });
      if (res.ok) {
        return 'synced';
      } else if (res.status === 409) {
        return 'skipped';
      } else {
        console.error(`[sync error] status: ${res.status}, record id: ${record.id}, response: ${await res.text()}`);
        return 'error';
      }
    } catch (e) {
      console.error(`[sync error] record id: ${record.id}, error:`, e);
      return 'error';
    }
  });
  const results = await Promise.all(promises);
  synced += results.filter((r) => r === 'synced').length;
  skipped += results.filter((r) => r === 'skipped').length;
  errors += results.filter((r) => r === 'error').length;
}
writeLine(` ${colors.green}Synced: ${synced}${RESET} ${colors.dim}Skipped: ${skipped} Errors: ${errors}${RESET}\n`);
The final output message for a dry run can be misleading. It currently says "Synced: N", which isn't accurate since no data was actually sent. It would be clearer to have a specific message for dry runs to avoid confusion.
if (opts.dryRun) {
  writeLine(`\n ${colors.dim}Dry run: Would sync ${synced} records.${RESET}\n`);
} else {
  writeLine(` ${colors.green}Synced: ${synced}${RESET} ${colors.dim}Skipped: ${skipped} Errors: ${errors}${RESET}\n`);
}
Closes #626. squads obs sync backfills JSONL records to Postgres. Dedup on execution_id. Supports --dry-run. 13/13 Docker tests pass.