Skip to content
5 changes: 4 additions & 1 deletion apps/web/src/lib/code-reviews/db/code-reviews.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import {
MAX_CONCURRENT_CODE_REVIEWS_PER_ORG,
staleQueuedCodeReviewCutoffSql,
staleRunningCodeReviewCutoffSql,
type PendingCodeReviewCreatedAtWindow,
} from '../dispatch/dispatch-constants';

type CodeReviewAttemptStatus = CodeReviewStatus;
Expand Down Expand Up @@ -184,11 +185,13 @@ export async function getCodeReviewById(reviewId: string): Promise<CloudAgentCod
export async function listDispatchableCodeReviewOwnerCandidates(
params: {
limit?: number;
pendingCreatedAtWindow?: PendingCodeReviewCreatedAtWindow;
} = {}
): Promise<DispatchableCodeReviewOwnerCandidatesResult> {
const limit = Math.max(1, Math.min(params.limit ?? 100, 1_000));
const staleQueuedCutoff = staleQueuedCodeReviewCutoffSql();
const staleRunningCutoff = staleRunningCodeReviewCutoffSql();
const { pendingCreatedAtWindow } = params;

try {
const result = await db.execute<{ owner_type: 'user' | 'org'; owner_id: string }>(sql`
Expand All @@ -204,7 +207,7 @@ export async function listDispatchableCodeReviewOwnerCandidates(
) AS owner_id,
MIN(${cloud_agent_code_reviews.created_at}) AS oldest_reconsiderable_at
FROM ${cloud_agent_code_reviews}
WHERE ${reconsiderableCodeReviewWorkCondition(staleQueuedCutoff)}
WHERE ${reconsiderableCodeReviewWorkCondition(staleQueuedCutoff, pendingCreatedAtWindow)}
GROUP BY owner_type, owner_id
), active_work AS (
SELECT
Expand Down
29 changes: 26 additions & 3 deletions apps/web/src/lib/code-reviews/dispatch/dispatch-constants.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { cloud_agent_code_reviews } from '@kilocode/db/schema';
import { sql } from 'drizzle-orm';
import { sql, type SQL } from 'drizzle-orm';

export const MAX_CONCURRENT_CODE_REVIEWS_PER_ORG = 20;
export const MAX_CONCURRENT_CODE_REVIEWS_PER_FUNDED_USER = 3;
Expand All @@ -8,6 +8,13 @@ export const FUNDED_CODE_REVIEW_BALANCE_THRESHOLD_MICRODOLLARS = 5_000_000;

export const STALE_QUEUED_CODE_REVIEW_MINUTES = 5;
export const STALE_RUNNING_CODE_REVIEW_MINUTES = 90;
export const CRON_PENDING_CODE_REVIEW_MIN_AGE_MINUTES = 60;
export const CRON_PENDING_CODE_REVIEW_MAX_AGE_MINUTES = 75;

export type PendingCodeReviewCreatedAtWindow = {
createdAtAfter: SQL;
createdAtBefore: SQL;
};

export function staleQueuedCodeReviewCutoffSql() {
return sql`now() - interval '${sql.raw(String(STALE_QUEUED_CODE_REVIEW_MINUTES))} minutes'`;
Expand All @@ -17,11 +24,27 @@ export function staleRunningCodeReviewCutoffSql() {
return sql`now() - interval '${sql.raw(String(STALE_RUNNING_CODE_REVIEW_MINUTES))} minutes'`;
}

export function cronPendingCodeReviewCreatedAtWindowSql(): PendingCodeReviewCreatedAtWindow {
return {
createdAtAfter: sql`now() - interval '${sql.raw(String(CRON_PENDING_CODE_REVIEW_MAX_AGE_MINUTES))} minutes'`,
createdAtBefore: sql`now() - interval '${sql.raw(String(CRON_PENDING_CODE_REVIEW_MIN_AGE_MINUTES))} minutes'`,
};
}

export function reconsiderableCodeReviewWorkCondition(
staleQueuedCutoff = staleQueuedCodeReviewCutoffSql()
staleQueuedCutoff = staleQueuedCodeReviewCutoffSql(),
pendingCreatedAtWindow?: PendingCodeReviewCreatedAtWindow
) {
const pendingClause = pendingCreatedAtWindow
? sql`(
${cloud_agent_code_reviews.status} = 'pending'
AND ${cloud_agent_code_reviews.created_at} >= ${pendingCreatedAtWindow.createdAtAfter}
AND ${cloud_agent_code_reviews.created_at} <= ${pendingCreatedAtWindow.createdAtBefore}
)`
: sql`${cloud_agent_code_reviews.status} = 'pending'`;

return sql`(
${cloud_agent_code_reviews.status} = 'pending'
${pendingClause}
OR (
${cloud_agent_code_reviews.status} = 'queued'
AND ${cloud_agent_code_reviews.updated_at} < ${staleQueuedCutoff}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
} from '@kilocode/db/schema';
import { eq } from 'drizzle-orm';
import { listDispatchableCodeReviewOwnerCandidates } from '../db/code-reviews';
import { cronPendingCodeReviewCreatedAtWindowSql } from './dispatch-constants';
import { dispatchPendingCodeReviewOwners } from './dispatch-pending-code-review-owners';

const REPO = `test-org/dispatch-owner-drain-${Date.now()}`;
Expand Down Expand Up @@ -169,8 +170,101 @@ describe('dispatch pending code review owners', () => {
});
});

it('bounds cron pending discovery by created_at while still recovering stale queued work', async () => {
const tooRecentPendingTimestamp = minutesAgo(30);
const eligiblePendingTimestamp = minutesAgo(65);
const tooOldPendingTimestamp = minutesAgo(90);
const recentlyUpdatedAt = minutesAgo(5);
const oldQueuedCreatedAt = minutesAgo(360);
const staleQueuedUpdatedAt = minutesAgo(10);

await db.insert(cloud_agent_code_reviews).values([
reviewValues({
owner: { type: 'user', id: firstUser.id },
status: 'pending',
createdAt: tooRecentPendingTimestamp,
updatedAt: tooRecentPendingTimestamp,
}),
reviewValues({
owner: { type: 'user', id: secondUser.id },
status: 'pending',
createdAt: eligiblePendingTimestamp,
updatedAt: recentlyUpdatedAt,
}),
reviewValues({
owner: { type: 'org', id: firstOrganizationId },
status: 'pending',
createdAt: tooOldPendingTimestamp,
updatedAt: recentlyUpdatedAt,
}),
reviewValues({
owner: { type: 'org', id: secondOrganizationId },
status: 'queued',
createdAt: oldQueuedCreatedAt,
updatedAt: staleQueuedUpdatedAt,
}),
]);

const result = await listDispatchableCodeReviewOwnerCandidates({
limit: 10,
pendingCreatedAtWindow: cronPendingCodeReviewCreatedAtWindowSql(),
});

expect(result).toEqual({
owners: [
{ type: 'org', id: secondOrganizationId },
{ type: 'user', id: secondUser.id },
],
hasMore: false,
});
});

it('drains owners with pending work inside the cron window and skips outside-window pending owners', async () => {
await db.insert(cloud_agent_code_reviews).values([
reviewValues({
owner: { type: 'user', id: firstUser.id },
status: 'pending',
createdAt: minutesAgo(90),
updatedAt: minutesAgo(5),
}),
reviewValues({
owner: { type: 'user', id: secondUser.id },
status: 'pending',
createdAt: minutesAgo(65),
updatedAt: minutesAgo(5),
}),
]);

mockTryDispatchPendingReviews.mockResolvedValue({
dispatched: 1,
notDispatched: 0,
activeCount: 1,
});

const summary = await dispatchPendingCodeReviewOwners();

expect(summary).toEqual({
ownersConsidered: 1,
ownersProcessed: 1,
ownersWithNoNewDispatch: 0,
ownersSkippedMissingBotUsers: 0,
coordinatorFailures: 0,
reviewsDispatched: 1,
hasMoreCandidateOwners: false,
});
expect(mockTryDispatchPendingReviews).toHaveBeenCalledTimes(1);
expect(mockTryDispatchPendingReviews).toHaveBeenCalledWith(
{
type: 'user',
id: secondUser.id,
userId: secondUser.id,
},
expect.objectContaining({ pendingCreatedAtWindow: expect.anything() })
);
});

it('summarizes dispatch, recovered bot owners, no-op owners, and isolated owner failures', async () => {
const waitingTimestamp = minutesAgo(10);
const waitingTimestamp = minutesAgo(65);
await db.insert(cloud_agent_code_reviews).values([
reviewValues({
owner: { type: 'user', id: firstUser.id },
Expand All @@ -180,17 +274,17 @@ describe('dispatch pending code review owners', () => {
reviewValues({
owner: { type: 'user', id: secondUser.id },
status: 'pending',
createdAt: minutesAgo(9),
createdAt: minutesAgo(66),
}),
reviewValues({
owner: { type: 'org', id: firstOrganizationId },
status: 'pending',
createdAt: minutesAgo(8),
createdAt: minutesAgo(67),
}),
reviewValues({
owner: { type: 'org', id: secondOrganizationId },
status: 'pending',
createdAt: minutesAgo(7),
createdAt: minutesAgo(68),
}),
]);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ import {
} from '../db/code-reviews';
import { errorExceptInTest, logExceptInTest } from '@/lib/utils.server';
import { tryDispatchPendingReviews } from './dispatch-pending-reviews';
import {
cronPendingCodeReviewCreatedAtWindowSql,
type PendingCodeReviewCreatedAtWindow,
} from './dispatch-constants';
import type { Owner } from '../core';

const OWNER_SCAN_LIMIT = 100;
Expand Down Expand Up @@ -39,15 +43,16 @@ async function resolveDispatchOwner(
}

async function drainOwner(
candidate: DispatchableCodeReviewOwnerCandidate
candidate: DispatchableCodeReviewOwnerCandidate,
pendingCreatedAtWindow: PendingCodeReviewCreatedAtWindow
): Promise<OwnerDrainOutcome> {
try {
const owner = await resolveDispatchOwner(candidate);
if (!owner) {
return { status: 'skipped-missing-bot' };
}

const result = await tryDispatchPendingReviews(owner);
const result = await tryDispatchPendingReviews(owner, { pendingCreatedAtWindow });
return { status: 'processed', dispatched: result.dispatched };
} catch (error) {
errorExceptInTest('[dispatchPendingCodeReviewOwners] Owner drain failed', {
Expand All @@ -63,10 +68,14 @@ async function drainOwner(
}

export async function dispatchPendingCodeReviewOwners(): Promise<DispatchPendingCodeReviewOwnersSummary> {
const candidates = await listDispatchableCodeReviewOwnerCandidates({ limit: OWNER_SCAN_LIMIT });
const pendingCreatedAtWindow = cronPendingCodeReviewCreatedAtWindowSql();
const candidates = await listDispatchableCodeReviewOwnerCandidates({
limit: OWNER_SCAN_LIMIT,
pendingCreatedAtWindow,
});
const limit = pLimit(OWNER_DISPATCH_CONCURRENCY);
const outcomes = await Promise.all(
candidates.owners.map(candidate => limit(() => drainOwner(candidate)))
candidates.owners.map(candidate => limit(() => drainOwner(candidate, pendingCreatedAtWindow)))
);

const summary: DispatchPendingCodeReviewOwnersSummary = {
Expand Down
Loading