Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions db/migrations/002_resumable_queue_jobs.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
ALTER TABLE jobs ADD COLUMN IF NOT EXISTS check_run_completed_at TIMESTAMPTZ;
ALTER TABLE jobs ADD COLUMN IF NOT EXISTS lease_owner TEXT;
ALTER TABLE jobs ADD COLUMN IF NOT EXISTS lease_expires_at TIMESTAMPTZ;
ALTER TABLE jobs ADD COLUMN IF NOT EXISTS heartbeat_at TIMESTAMPTZ;
ALTER TABLE jobs ADD COLUMN IF NOT EXISTS recovery_count INTEGER NOT NULL DEFAULT 0;
ALTER TABLE jobs ADD COLUMN IF NOT EXISTS last_queue_message_at TIMESTAMPTZ;
ALTER TABLE file_reviews ADD COLUMN IF NOT EXISTS transient_error_count INTEGER NOT NULL DEFAULT 0;

CREATE INDEX IF NOT EXISTS jobs_lease_expiry_idx
ON jobs (lease_expires_at)
WHERE status = 'running' AND lease_expires_at IS NOT NULL;

CREATE INDEX IF NOT EXISTS jobs_terminal_check_idx
ON jobs (status, check_run_completed_at)
WHERE check_run_id IS NOT NULL AND check_run_completed_at IS NULL;

CREATE INDEX IF NOT EXISTS jobs_unleased_running_idx
ON jobs (last_queue_message_at, heartbeat_at)
WHERE status = 'running' AND lease_expires_at IS NULL;

DELETE FROM file_reviews fr
USING (
SELECT id, ROW_NUMBER() OVER (PARTITION BY job_id, file_path ORDER BY created_at ASC, id ASC) AS row_number
FROM file_reviews
) ranked
WHERE fr.id = ranked.id
AND ranked.row_number > 1;

CREATE UNIQUE INDEX IF NOT EXISTS file_reviews_job_file_path_key
ON file_reviews (job_id, file_path);
2 changes: 1 addition & 1 deletion scripts/test.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ if (!usableEnvValue(process.env.TEST_DATABASE_URL)) {
process.exit(1);
}

process.env.DATABASE_URL = usableEnvValue(process.env.DATABASE_URL) ?? process.env.TEST_DATABASE_URL;
process.env.DATABASE_URL = process.env.TEST_DATABASE_URL;

run(process.execPath, ['scripts/migrate.mjs']);
run(process.execPath, ['node_modules/vitest/vitest.mjs', 'run']);
6 changes: 3 additions & 3 deletions src/client/pages/settings.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ const DEFAULT_GLOBAL_CONFIG: ModelRouteConfig = {
],
};

function normalizeGlobalConfig(config: any): ModelRouteConfig {
export function normalizeGlobalConfig(config: any): ModelRouteConfig {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Use of any type for config parameter

The function 'normalizeGlobalConfig' uses 'any' for the 'config' parameter. This bypasses TypeScript's type checking and can lead to runtime errors if the input structure changes. It is better to use 'Partial' or 'unknown' with a type guard to ensure type safety.

Suggested change
export function normalizeGlobalConfig(config: any): ModelRouteConfig {
export function normalizeGlobalConfig(config: Partial<ModelRouteConfig>): ModelRouteConfig {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Use of any type for config parameter

The function 'normalizeGlobalConfig' uses 'any' for the 'config' parameter. This bypasses TypeScript's type checking and can lead to runtime errors if the input structure changes. It is better to use 'Partial' or 'unknown' with a type guard to ensure type safety.

Suggested change
export function normalizeGlobalConfig(config: any): ModelRouteConfig {
export function normalizeGlobalConfig(config: Partial<ModelRouteConfig>): ModelRouteConfig {

if (!config || !config.main) return DEFAULT_GLOBAL_CONFIG;
return {
main: config.main,
fallbacks: config.fallbacks?.length ? config.fallbacks : DEFAULT_GLOBAL_CONFIG.fallbacks,
size_overrides: config.size_overrides ?? DEFAULT_GLOBAL_CONFIG.size_overrides,
fallbacks: Array.isArray(config.fallbacks) ? config.fallbacks : DEFAULT_GLOBAL_CONFIG.fallbacks,
size_overrides: Array.isArray(config.size_overrides) ? config.size_overrides : DEFAULT_GLOBAL_CONFIG.size_overrides,
};
}

Expand Down
53 changes: 53 additions & 0 deletions src/server/core/job-recovery.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import type { AppBindings } from '@server/env';
import { getTerminalJobsNeedingCheckRunCompletion, markJobCheckRunCompleted, recoverExpiredJobLeases } from '@server/db/jobs';
import { logger } from '@server/core/logger';
import { GitHubService } from '@server/services/github';

const MAX_RECOVERY_COUNT = 3;

export async function recoverJobs(env: AppBindings) {
try {
const recovered = await recoverExpiredJobLeases(env, MAX_RECOVERY_COUNT);
for (const jobId of recovered.requeuedJobIds) {
await env.REVIEW_QUEUE.send({
jobId,
deliveryId: crypto.randomUUID(),
phase: 'review',
});
}

if (recovered.requeuedJobIds.length > 0 || recovered.failedJobs.length > 0) {
logger.warn('Expired job leases recovered', {
requeued: recovered.requeuedJobIds.length,
failed: recovered.failedJobs.length,
});
}
} catch (err) {
logger.error('Failed to recover expired job leases', err instanceof Error ? err : new Error(String(err)));
}
}

export async function completeTerminalCheckRuns(env: AppBindings) {
const jobs = await getTerminalJobsNeedingCheckRunCompletion(env);
for (const job of jobs) {
if (!job.check_run_id) continue;

try {
const github = new GitHubService(env, job.installation_id);
await github.updateCheckRun(job.owner, job.repo, job.check_run_id, {
status: 'completed',
conclusion: job.status === 'superseded' ? 'neutral' : 'failure',
title: job.status === 'superseded' ? 'Review superseded' : 'Review failed',
summary: job.error_msg ?? (job.status === 'superseded' ? 'Superseded by a newer commit or job.' : 'Review failed.'),
});
await markJobCheckRunCompleted(env, job.id);
} catch (error) {
logger.error(`Failed to complete terminal check run for job ${job.id}`, error instanceof Error ? error : new Error(String(error)));
}
}
}

export async function runOpportunisticJobMaintenance(env: AppBindings) {
await recoverJobs(env);
await completeTerminalCheckRuns(env);
}
13 changes: 10 additions & 3 deletions src/server/core/model-output.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ import { findClosestValidLine, findPositionForLine, getValidNewLines, getValidPo
import type { FileDiff } from './diff';
import { jsonrepair } from 'jsonrepair';

const MAX_LOGGED_MODEL_OUTPUT_CHARS = 2_000;

function truncateForLog(value: string) {
if (value.length <= MAX_LOGGED_MODEL_OUTPUT_CHARS) return value;
return `${value.slice(0, MAX_LOGGED_MODEL_OUTPUT_CHARS)}... [truncated ${value.length - MAX_LOGGED_MODEL_OUTPUT_CHARS} chars]`;
}

function hasReviewKeys(input: string) {
return /"(findings|overall_explanation|overall_correctness|overall_confidence_score|summary)"\s*:/.test(input);
}
Expand Down Expand Up @@ -253,7 +260,7 @@ export function parseFileReviewResponse(raw: string, file: FileDiff): {
throw new Error('Model response did not contain review JSON keys.');
}
} catch (e) {
logger.error('Failed to extract JSON from model response', { raw, error: e });
logger.error('Failed to extract JSON from model response', { raw: truncateForLog(raw), error: e });
throw new Error('Could not find JSON root in model response.');
}

Expand All @@ -269,14 +276,14 @@ export function parseFileReviewResponse(raw: string, file: FileDiff): {
try {
repaired = jsonrepair(preprocessed);
} catch (e) {
logger.warn('jsonrepair failed to fix model output, using preprocessed text', { preprocessed, error: e });
logger.warn('jsonrepair failed to fix model output, using preprocessed text', { preprocessed: truncateForLog(preprocessed), error: e });
}

let parsedJson: any;
try {
parsedJson = JSON.parse(repaired);
} catch (e) {
logger.error('Critical JSON parse error after extraction and repair', { repaired, error: e });
logger.error('Critical JSON parse error after extraction and repair', { repaired: truncateForLog(repaired), error: e });
throw new Error(`Invalid JSON format: ${e instanceof Error ? e.message : 'Unknown error'}`);
}

Expand Down
Loading