diff --git a/apps/web/src/lib/code-reviews/db/code-reviews.ts b/apps/web/src/lib/code-reviews/db/code-reviews.ts index 2ea46f5cd..2d98acbf8 100644 --- a/apps/web/src/lib/code-reviews/db/code-reviews.ts +++ b/apps/web/src/lib/code-reviews/db/code-reviews.ts @@ -27,6 +27,7 @@ import { MAX_CONCURRENT_CODE_REVIEWS_PER_ORG, staleQueuedCodeReviewCutoffSql, staleRunningCodeReviewCutoffSql, + type PendingCodeReviewCreatedAtWindow, } from '../dispatch/dispatch-constants'; type CodeReviewAttemptStatus = CodeReviewStatus; @@ -184,11 +185,13 @@ export async function getCodeReviewById(reviewId: string): Promise { const limit = Math.max(1, Math.min(params.limit ?? 100, 1_000)); const staleQueuedCutoff = staleQueuedCodeReviewCutoffSql(); const staleRunningCutoff = staleRunningCodeReviewCutoffSql(); + const { pendingCreatedAtWindow } = params; try { const result = await db.execute<{ owner_type: 'user' | 'org'; owner_id: string }>(sql` @@ -204,7 +207,7 @@ export async function listDispatchableCodeReviewOwnerCandidates( ) AS owner_id, MIN(${cloud_agent_code_reviews.created_at}) AS oldest_reconsiderable_at FROM ${cloud_agent_code_reviews} - WHERE ${reconsiderableCodeReviewWorkCondition(staleQueuedCutoff)} + WHERE ${reconsiderableCodeReviewWorkCondition(staleQueuedCutoff, pendingCreatedAtWindow)} GROUP BY owner_type, owner_id ), active_work AS ( SELECT diff --git a/apps/web/src/lib/code-reviews/dispatch/dispatch-constants.ts b/apps/web/src/lib/code-reviews/dispatch/dispatch-constants.ts index d4e1c409b..8fead1baf 100644 --- a/apps/web/src/lib/code-reviews/dispatch/dispatch-constants.ts +++ b/apps/web/src/lib/code-reviews/dispatch/dispatch-constants.ts @@ -1,5 +1,5 @@ import { cloud_agent_code_reviews } from '@kilocode/db/schema'; -import { sql } from 'drizzle-orm'; +import { sql, type SQL } from 'drizzle-orm'; export const MAX_CONCURRENT_CODE_REVIEWS_PER_ORG = 20; export const MAX_CONCURRENT_CODE_REVIEWS_PER_FUNDED_USER = 3; @@ -8,6 +8,13 @@ export const FUNDED_CODE_REVIEW_BALANCE_THRESHOLD_MICRODOLLARS = 5_000_000; export const STALE_QUEUED_CODE_REVIEW_MINUTES = 5; export const STALE_RUNNING_CODE_REVIEW_MINUTES = 90; +export const CRON_PENDING_CODE_REVIEW_MIN_AGE_MINUTES = 60; +export const CRON_PENDING_CODE_REVIEW_MAX_AGE_MINUTES = 75; + +export type PendingCodeReviewCreatedAtWindow = { + createdAtAfter: SQL; + createdAtBefore: SQL; +}; export function staleQueuedCodeReviewCutoffSql() { return sql`now() - interval '${sql.raw(String(STALE_QUEUED_CODE_REVIEW_MINUTES))} minutes'`; @@ -17,11 +24,27 @@ export function staleRunningCodeReviewCutoffSql() { return sql`now() - interval '${sql.raw(String(STALE_RUNNING_CODE_REVIEW_MINUTES))} minutes'`; } +export function cronPendingCodeReviewCreatedAtWindowSql(): PendingCodeReviewCreatedAtWindow { + return { + createdAtAfter: sql`now() - interval '${sql.raw(String(CRON_PENDING_CODE_REVIEW_MAX_AGE_MINUTES))} minutes'`, + createdAtBefore: sql`now() - interval '${sql.raw(String(CRON_PENDING_CODE_REVIEW_MIN_AGE_MINUTES))} minutes'`, + }; +} + export function reconsiderableCodeReviewWorkCondition( - staleQueuedCutoff = staleQueuedCodeReviewCutoffSql() + staleQueuedCutoff = staleQueuedCodeReviewCutoffSql(), + pendingCreatedAtWindow?: PendingCodeReviewCreatedAtWindow ) { + const pendingClause = pendingCreatedAtWindow + ? sql`( + ${cloud_agent_code_reviews.status} = 'pending' + AND ${cloud_agent_code_reviews.created_at} >= ${pendingCreatedAtWindow.createdAtAfter} + AND ${cloud_agent_code_reviews.created_at} <= ${pendingCreatedAtWindow.createdAtBefore} + )` + : sql`${cloud_agent_code_reviews.status} = 'pending'`; + return sql`( - ${cloud_agent_code_reviews.status} = 'pending' + ${pendingClause} OR ( ${cloud_agent_code_reviews.status} = 'queued' AND ${cloud_agent_code_reviews.updated_at} < ${staleQueuedCutoff} diff --git a/apps/web/src/lib/code-reviews/dispatch/dispatch-pending-code-review-owners.test.ts b/apps/web/src/lib/code-reviews/dispatch/dispatch-pending-code-review-owners.test.ts index a9d294e42..f71e6d18c 100644 --- a/apps/web/src/lib/code-reviews/dispatch/dispatch-pending-code-review-owners.test.ts +++ b/apps/web/src/lib/code-reviews/dispatch/dispatch-pending-code-review-owners.test.ts @@ -24,6 +24,7 @@ import { } from '@kilocode/db/schema'; import { eq } from 'drizzle-orm'; import { listDispatchableCodeReviewOwnerCandidates } from '../db/code-reviews'; +import { cronPendingCodeReviewCreatedAtWindowSql } from './dispatch-constants'; import { dispatchPendingCodeReviewOwners } from './dispatch-pending-code-review-owners'; const REPO = `test-org/dispatch-owner-drain-${Date.now()}`; @@ -169,8 +170,101 @@ describe('dispatch pending code review owners', () => { }); }); + it('bounds cron pending discovery by created_at while still recovering stale queued work', async () => { + const tooRecentPendingTimestamp = minutesAgo(30); + const eligiblePendingTimestamp = minutesAgo(65); + const tooOldPendingTimestamp = minutesAgo(90); + const recentlyUpdatedAt = minutesAgo(5); + const oldQueuedCreatedAt = minutesAgo(360); + const staleQueuedUpdatedAt = minutesAgo(10); + + await db.insert(cloud_agent_code_reviews).values([ + reviewValues({ + owner: { type: 'user', id: firstUser.id }, + status: 'pending', + createdAt: tooRecentPendingTimestamp, + updatedAt: tooRecentPendingTimestamp, + }), + reviewValues({ + owner: { type: 'user', id: secondUser.id }, + status: 'pending', + createdAt: eligiblePendingTimestamp, + updatedAt: recentlyUpdatedAt, + }), + reviewValues({ + owner: { type: 'org', id: firstOrganizationId }, + status: 'pending', + createdAt: tooOldPendingTimestamp, + updatedAt: recentlyUpdatedAt, + }), + reviewValues({ + owner: { type: 'org', id: secondOrganizationId }, + status: 'queued', + createdAt: oldQueuedCreatedAt, + updatedAt: staleQueuedUpdatedAt, + }), + ]); + + const result = await listDispatchableCodeReviewOwnerCandidates({ + limit: 10, + pendingCreatedAtWindow: cronPendingCodeReviewCreatedAtWindowSql(), + }); + + expect(result).toEqual({ + owners: [ + { type: 'org', id: secondOrganizationId }, + { type: 'user', id: secondUser.id }, + ], + hasMore: false, + }); + }); + + it('drains owners with pending work inside the cron window and skips outside-window pending owners', async () => { + await db.insert(cloud_agent_code_reviews).values([ + reviewValues({ + owner: { type: 'user', id: firstUser.id }, + status: 'pending', + createdAt: minutesAgo(90), + updatedAt: minutesAgo(5), + }), + reviewValues({ + owner: { type: 'user', id: secondUser.id }, + status: 'pending', + createdAt: minutesAgo(65), + updatedAt: minutesAgo(5), + }), + ]); + + mockTryDispatchPendingReviews.mockResolvedValue({ + dispatched: 1, + notDispatched: 0, + activeCount: 1, + }); + + const summary = await dispatchPendingCodeReviewOwners(); + + expect(summary).toEqual({ + ownersConsidered: 1, + ownersProcessed: 1, + ownersWithNoNewDispatch: 0, + ownersSkippedMissingBotUsers: 0, + coordinatorFailures: 0, + reviewsDispatched: 1, + hasMoreCandidateOwners: false, + }); + expect(mockTryDispatchPendingReviews).toHaveBeenCalledTimes(1); + expect(mockTryDispatchPendingReviews).toHaveBeenCalledWith( + { + type: 'user', + id: secondUser.id, + userId: secondUser.id, + }, + expect.objectContaining({ pendingCreatedAtWindow: expect.anything() }) + ); + }); + it('summarizes dispatch, recovered bot owners, no-op owners, and isolated owner failures', async () => { - const waitingTimestamp = minutesAgo(10); + const waitingTimestamp = minutesAgo(65); await db.insert(cloud_agent_code_reviews).values([ reviewValues({ owner: { type: 'user', id: firstUser.id }, @@ -180,17 +274,17 @@ describe('dispatch pending code review owners', () => { reviewValues({ owner: { type: 'user', id: secondUser.id }, status: 'pending', - createdAt: minutesAgo(9), + createdAt: minutesAgo(66), }), reviewValues({ owner: { type: 'org', id: firstOrganizationId }, status: 'pending', - createdAt: minutesAgo(8), + createdAt: minutesAgo(67), }), reviewValues({ owner: { type: 'org', id: secondOrganizationId }, status: 'pending', - createdAt: minutesAgo(7), + createdAt: minutesAgo(68), }), ]); diff --git a/apps/web/src/lib/code-reviews/dispatch/dispatch-pending-code-review-owners.ts b/apps/web/src/lib/code-reviews/dispatch/dispatch-pending-code-review-owners.ts index 3fbe91b44..033cc104e 100644 --- a/apps/web/src/lib/code-reviews/dispatch/dispatch-pending-code-review-owners.ts +++ b/apps/web/src/lib/code-reviews/dispatch/dispatch-pending-code-review-owners.ts @@ -7,6 +7,10 @@ import { } from '../db/code-reviews'; import { errorExceptInTest, logExceptInTest } from '@/lib/utils.server'; import { tryDispatchPendingReviews } from './dispatch-pending-reviews'; +import { + cronPendingCodeReviewCreatedAtWindowSql, + type PendingCodeReviewCreatedAtWindow, +} from './dispatch-constants'; import type { Owner } from '../core'; const OWNER_SCAN_LIMIT = 100; @@ -39,7 +43,8 @@ async function resolveDispatchOwner( } async function drainOwner( - candidate: DispatchableCodeReviewOwnerCandidate + candidate: DispatchableCodeReviewOwnerCandidate, + pendingCreatedAtWindow: PendingCodeReviewCreatedAtWindow ): Promise { try { const owner = await resolveDispatchOwner(candidate); @@ -47,7 +52,7 @@ async function drainOwner( return { status: 'skipped-missing-bot' }; } - const result = await tryDispatchPendingReviews(owner); + const result = await tryDispatchPendingReviews(owner, { pendingCreatedAtWindow }); return { status: 'processed', dispatched: result.dispatched }; } catch (error) { errorExceptInTest('[dispatchPendingCodeReviewOwners] Owner drain failed', { @@ -63,10 +68,14 @@ async function drainOwner( } export async function dispatchPendingCodeReviewOwners(): Promise { - const candidates = await listDispatchableCodeReviewOwnerCandidates({ limit: OWNER_SCAN_LIMIT }); + const pendingCreatedAtWindow = cronPendingCodeReviewCreatedAtWindowSql(); + const candidates = await listDispatchableCodeReviewOwnerCandidates({ + limit: OWNER_SCAN_LIMIT, + pendingCreatedAtWindow, + }); const limit = pLimit(OWNER_DISPATCH_CONCURRENCY); const outcomes = await Promise.all( - candidates.owners.map(candidate => limit(() => drainOwner(candidate))) + candidates.owners.map(candidate => limit(() => drainOwner(candidate, pendingCreatedAtWindow))) ); const summary: DispatchPendingCodeReviewOwnersSummary = { diff --git a/apps/web/src/lib/code-reviews/dispatch/dispatch-pending-reviews.test.ts b/apps/web/src/lib/code-reviews/dispatch/dispatch-pending-reviews.test.ts index 0d5aa62c3..2537e8ebb 100644 --- a/apps/web/src/lib/code-reviews/dispatch/dispatch-pending-reviews.test.ts +++ b/apps/web/src/lib/code-reviews/dispatch/dispatch-pending-reviews.test.ts @@ -33,6 +33,7 @@ import { } from '@kilocode/db/schema'; import { eq } from 'drizzle-orm'; import { tryDispatchPendingReviews } from './dispatch-pending-reviews'; +import { cronPendingCodeReviewCreatedAtWindowSql } from './dispatch-constants'; import { cancelSupersededReviewsForPR, updateRepositoryReviewInstructionsMetadata, @@ -448,6 +449,179 @@ describe('tryDispatchPendingReviews', () => { expect(storedReview?.updated_at).not.toBe(staleQueuedTimestamp); }); + it('claims the oldest pending review regardless of age', async () => { + const oldPendingTimestamp = minutesAgo(150); + const recentPendingTimestamp = minutesAgo(30); + const owner = { type: 'user', id: testUser.id } satisfies ReviewOwner; + await setTestUserBalance(DEFAULT_TIER_BALANCE_MICRODOLLARS); + + const [oldPendingReview, recentPendingReview] = await db + .insert(cloud_agent_code_reviews) + .values([ + reviewValues({ + owner, + status: 'pending', + createdAt: oldPendingTimestamp, + updatedAt: oldPendingTimestamp, + }), + reviewValues({ + owner, + status: 'pending', + createdAt: recentPendingTimestamp, + updatedAt: recentPendingTimestamp, + }), + ]) + .returning({ id: cloud_agent_code_reviews.id }); + + if (!oldPendingReview || !recentPendingReview) { + throw new Error('Expected old and recent pending reviews to be inserted'); + } + + const result = await tryDispatchPendingReviews({ + type: 'user', + id: testUser.id, + userId: testUser.id, + }); + + const storedOldPendingReview = await db.query.cloud_agent_code_reviews.findFirst({ + where: eq(cloud_agent_code_reviews.id, oldPendingReview.id), + }); + const storedRecentPendingReview = await db.query.cloud_agent_code_reviews.findFirst({ + where: eq(cloud_agent_code_reviews.id, recentPendingReview.id), + }); + + expect(result).toEqual({ dispatched: 1, notDispatched: 0, activeCount: 1 }); + expect(storedOldPendingReview?.status).toBe('queued'); + expect(storedRecentPendingReview?.status).toBe('pending'); + expect(mockPrepareReviewPayload).toHaveBeenCalledWith({ + reviewId: oldPendingReview.id, + owner: { type: 'user', id: testUser.id, userId: testUser.id }, + agentConfig: { id: 'test-agent-config', config: {} }, + platform: 'github', + }); + expect(mockPrepareReviewPayload).not.toHaveBeenCalledWith( + expect.objectContaining({ reviewId: recentPendingReview.id }) + ); + }); + + it('claims only pending rows created inside the cron window', async () => { + const tooRecentTimestamp = minutesAgo(30); + const eligibleTimestamp = minutesAgo(65); + const tooOldTimestamp = minutesAgo(90); + const recentlyUpdatedAt = minutesAgo(5); + const owner = { type: 'user', id: testUser.id } satisfies ReviewOwner; + await setTestUserBalance(DEFAULT_TIER_BALANCE_MICRODOLLARS); + + const [tooRecentReview, eligibleReview, tooOldReview] = await db + .insert(cloud_agent_code_reviews) + .values([ + reviewValues({ + owner, + status: 'pending', + createdAt: tooRecentTimestamp, + updatedAt: tooRecentTimestamp, + }), + reviewValues({ + owner, + status: 'pending', + createdAt: eligibleTimestamp, + updatedAt: recentlyUpdatedAt, + }), + reviewValues({ + owner, + status: 'pending', + createdAt: tooOldTimestamp, + updatedAt: recentlyUpdatedAt, + }), + ]) + .returning({ id: cloud_agent_code_reviews.id }); + + if (!tooRecentReview || !eligibleReview || !tooOldReview) { + throw new Error('Expected pending reviews to be inserted'); + } + + const result = await tryDispatchPendingReviews( + { + type: 'user', + id: testUser.id, + userId: testUser.id, + }, + { pendingCreatedAtWindow: cronPendingCodeReviewCreatedAtWindowSql() } + ); + + const storedTooRecentReview = await db.query.cloud_agent_code_reviews.findFirst({ + where: eq(cloud_agent_code_reviews.id, tooRecentReview.id), + }); + const storedEligibleReview = await db.query.cloud_agent_code_reviews.findFirst({ + where: eq(cloud_agent_code_reviews.id, eligibleReview.id), + }); + const storedTooOldReview = await db.query.cloud_agent_code_reviews.findFirst({ + where: eq(cloud_agent_code_reviews.id, tooOldReview.id), + }); + + expect(result).toEqual({ dispatched: 1, notDispatched: 0, activeCount: 1 }); + expect(storedTooRecentReview?.status).toBe('pending'); + expect(storedEligibleReview?.status).toBe('queued'); + expect(storedTooOldReview?.status).toBe('pending'); + expect(mockPrepareReviewPayload).toHaveBeenCalledWith({ + reviewId: eligibleReview.id, + owner: { type: 'user', id: testUser.id, userId: testUser.id }, + agentConfig: { id: 'test-agent-config', config: {} }, + platform: 'github', + }); + expect(mockPrepareReviewPayload).not.toHaveBeenCalledWith( + expect.objectContaining({ reviewId: tooRecentReview.id }) + ); + expect(mockPrepareReviewPayload).not.toHaveBeenCalledWith( + expect.objectContaining({ reviewId: tooOldReview.id }) + ); + }); + + it('recovers stale queued reviews regardless of age under the cron window', async () => { + const oldQueuedCreatedAt = minutesAgo(180); + const staleQueuedUpdatedAt = minutesAgo(10); + const owner = { type: 'user', id: testUser.id } satisfies ReviewOwner; + await setTestUserBalance(DEFAULT_TIER_BALANCE_MICRODOLLARS); + + const [review] = await db + .insert(cloud_agent_code_reviews) + .values( + reviewValues({ + owner, + status: 'queued', + createdAt: oldQueuedCreatedAt, + updatedAt: staleQueuedUpdatedAt, + }) + ) + .returning({ id: cloud_agent_code_reviews.id }); + + if (!review) { + throw new Error('Expected stale queued review to be inserted'); + } + + const result = await tryDispatchPendingReviews( + { + type: 'user', + id: testUser.id, + userId: testUser.id, + }, + { pendingCreatedAtWindow: cronPendingCodeReviewCreatedAtWindowSql() } + ); + + const storedReview = await db.query.cloud_agent_code_reviews.findFirst({ + where: eq(cloud_agent_code_reviews.id, review.id), + }); + + expect(result).toEqual({ dispatched: 1, notDispatched: 0, activeCount: 1 }); + expect(storedReview?.status).toBe('queued'); + expect(mockPrepareReviewPayload).toHaveBeenCalledWith({ + reviewId: review.id, + owner: { type: 'user', id: testUser.id, userId: testUser.id }, + agentConfig: { id: 'test-agent-config', config: {} }, + platform: 'github', + }); + }); + it('does not overwrite a review that becomes terminal after reservation', async () => { const recentTimestamp = minutesAgo(1); const owner = { type: 'user', id: testUser.id } satisfies ReviewOwner; diff --git a/apps/web/src/lib/code-reviews/dispatch/dispatch-pending-reviews.ts b/apps/web/src/lib/code-reviews/dispatch/dispatch-pending-reviews.ts index 6f8305642..ec9838624 100644 --- a/apps/web/src/lib/code-reviews/dispatch/dispatch-pending-reviews.ts +++ b/apps/web/src/lib/code-reviews/dispatch/dispatch-pending-reviews.ts @@ -43,6 +43,7 @@ import { MAX_CONCURRENT_CODE_REVIEWS_PER_ORG, staleQueuedCodeReviewCutoffSql, staleRunningCodeReviewCutoffSql, + type PendingCodeReviewCreatedAtWindow, } from './dispatch-constants'; export type DispatchResult = { @@ -51,6 +52,15 @@ export type DispatchResult = { activeCount: number; }; +export type TryDispatchPendingReviewsOptions = { + /** + * When provided, restricts pending work selection to reviews whose + * `created_at` is inside the cron recovery window. Direct dispatch paths + * leave this unset, and stale queued recovery remains unaffected. + */ + pendingCreatedAtWindow?: PendingCodeReviewCreatedAtWindow; +}; + type ReservedReview = { review: CloudAgentCodeReview; dispatchReservationId: string; @@ -93,7 +103,10 @@ function ownerReviewCondition(owner: Owner) { : eq(cloud_agent_code_reviews.owned_by_user_id, owner.id); } -async function reservePendingReviewsForDispatch(owner: Owner): Promise { +async function reservePendingReviewsForDispatch( + owner: Owner, + options: TryDispatchPendingReviewsOptions = {} +): Promise { return await db.transaction(async tx => { await tx.execute( sql`SELECT pg_advisory_xact_lock(hashtext(${`code-review-dispatch:${owner.type}:${owner.id}`}))` @@ -101,6 +114,7 @@ async function reservePendingReviewsForDispatch(owner: Owner): Promise candidate.id) ), - reconsiderableCodeReviewWorkCondition(staleQueuedCutoff) + reconsiderableCodeReviewWorkCondition(staleQueuedCutoff, pendingCreatedAtWindow) ) ) .returning(); @@ -177,12 +196,20 @@ async function reservePendingReviewsForDispatch(owner: Owner): Promise { +export async function tryDispatchPendingReviews( + owner: Owner, + options: TryDispatchPendingReviewsOptions = {} +): Promise { try { logExceptInTest('[tryDispatchPendingReviews] Starting dispatch check', { owner }); - const { activeCount, reservations } = await reservePendingReviewsForDispatch(owner); + const { activeCount, reservations } = await reservePendingReviewsForDispatch(owner, options); if (reservations.length === 0) { logExceptInTest('[tryDispatchPendingReviews] No reviews reserved', { owner, activeCount });