feat: port job orchestration to graphile#1059
Conversation
WalkthroughThis PR migrates the job queue system from pg-boss to Graphile Worker, enabling asynchronous task execution for run processing, scheduling, and browser management. The refactor updates all queueing calls in routes, implements a new task-runner with Graphile integration, replaces the schedule-worker with a DB-backed advisory-lock polled scheduler, and updates server lifecycle management. ChangesJob Orchestration Migration: pg-boss → Graphile Worker
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes This is a substantial migration affecting job orchestration across multiple subsystems (scheduling, recording routes, run execution, storage queueing, and server lifecycle). The changes introduce new Graphile Worker infrastructure with connection pooling and lifecycle management, rewrite the schedule worker from PgBoss registry-based to DB-backed advisory-lock polling, implement a large task-runner with multi-branch execution logic and comprehensive error handling, and refactor queueing calls across three route/storage files. While the patterns are consistent (addJob replacement, direct calls in record routes, DB-backed scheduling), the variety of execution branches, error paths, and integration points (webhooks, sockets, binary uploads, integration updates) requires careful review of each layer's interaction with the broader system. Possibly related issues
Possibly related PRs
Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 10
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
server/src/routes/storage.ts (1)
1125-1136:⚠️ Potential issue | 🔴 Critical | ⚡ Quick winCritical: success path never responds, and a queue failure leaves the run silently stuck.
In this handler:
- On the happy path, the inner
trycompletes and execution falls out of the outertrywith nores.send(...). The HTTP request will hang until the client/proxy timeout fires — there is no success response anywhere in the route.- In the inner
catch, the error is logged without the message (${queueError.message}missing), theRunrow is not updated tofailed, the orphaned browser is not destroyed, and no error response is sent. Compare this to the analogous block inPUT /runs/:id(L1021–1037) which updates the run to failed, callsdestroyRemoteBrowser, and returns 503.The outer
catch(L1137) only fires when something before/around the innertrythrows, so neither the success nor the queue-error path can ever respond to the client today.🛠️ Proposed fix
try { const jobId = await addJob(QUEUE_NAMES.EXECUTE_RUN, { userId: req.user.id, runId: req.params.id, browserId: plainRun.browserId, }, { maxAttempts: 1 }); logger.log('info', `Queued run execution job with ID: ${jobId} for run: ${req.params.id}`); + return res.send(true); } catch (queueError: any) { - logger.log('error', `Failed to queue run execution`); - + logger.log('error', `Failed to queue run execution: ${queueError.message}`); + await run.update({ + status: 'failed', + finishedAt: new Date().toLocaleString(), + log: 'Failed to queue execution job', + }); + return res.status(503).send({ error: 'Unable to queue run, please try again later' }); }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@server/src/routes/storage.ts` around lines 1125 - 1136, The handler currently never responds on success and mishandles queue errors; after a successful addJob(QUEUE_NAMES.EXECUTE_RUN, ...) call return an HTTP success (e.g., 202/200) to the client (res.status(...).json(...)) so the request doesn't hang; in the inner catch log the actual error (include queueError.message), mark the Run as failed (update the Run row/status for req.params.id and set finishedAt), call destroyRemoteBrowser(plainRun.browserId) to clean up the orphaned browser, and then send an error response (e.g., res.status(503).json(...)); mirror the behavior used in the PUT /runs/:id handler (the update-to-failed + destroyRemoteBrowser + 503 response) and keep addJob, QUEUE_NAMES.EXECUTE_RUN, destroyRemoteBrowser and req.params.id as the referenced symbols.
🧹 Nitpick comments (4)
server/src/storage/schedule.ts (1)
12-20: 💤 Low value
as anycast bypasses schedule type safety.The repeated
as anyon the schedule object loses the type contract defined on theRobot.schedulefield, including the newly addedschedulerClaimedAt. Consider tightening the schedule type so this cast can be removed; otherwise future field renames will silently break this code.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@server/src/storage/schedule.ts` around lines 12 - 20, The code is using an unsafe "as any" cast when calling robot.update with a mutated schedule, which bypasses the Robot.schedule type (and hides renames like schedulerClaimedAt); fix by constructing a properly typed schedule object instead of casting: import or reference the Robot schedule type (e.g., Schedule or Robot['schedule']), build a Partial<Schedule> or a newSchedule object containing cronExpression, timezone, nextRunAt and explicitly typed schedulerClaimedAt (or omit it if optional), then pass that typed object to robot.update (update the robot.update call/site to accept the typed shape) so the compiler enforces the real schedule contract and the cast can be removed.server/src/schedule-worker.ts (3)
14-21: 💤 Low valueConsider externalizing scheduler tunables.
DB_SCHEDULER_BATCH_SIZE,DB_SCHEDULER_POLL_MS,DB_SCHEDULER_CLAIM_TIMEOUT_MS, andDB_SCHEDULER_ADVISORY_LOCK_KEYare hardcoded. With a 30s poll and batch of 10, the system can dispatch at most ~20 robots/minute, which may surprise operators running many schedules. Also the advisory lock key is a magic constant — colliding with another module's advisory lock would silently break the scheduler.Consider loading these from env (with current values as defaults) and documenting the lock key allocation.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@server/src/schedule-worker.ts` around lines 14 - 21, The hardcoded scheduler tunables (DB_SCHEDULER_BATCH_SIZE, DB_SCHEDULER_POLL_MS, DB_SCHEDULER_CLAIM_TIMEOUT_MS, DB_SCHEDULER_ADVISORY_LOCK_KEY) should be loaded from environment variables with the current constants as defaults; update the module to read process.env.DB_SCHEDULER_BATCH_SIZE, DB_SCHEDULER_POLL_MS, DB_SCHEDULER_CLAIM_TIMEOUT_MS and DB_SCHEDULER_ADVISORY_LOCK_KEY (using parseInt for numeric values and safe fallback to the existing constants) instead of literal constants, keep the existing constant names as fallback defaults, and add a short comment/docstring near DB_SCHEDULER_ADVISORY_LOCK_KEY advising teams to allocate unique advisory lock keys to avoid collisions.
29-85: 💤 Low valueRedundant locking strategy — confirm it's intentional.
You acquire
pg_try_advisory_xact_lock(which serializes scheduler instances cluster-wide) and also issueRobot.findAll({ lock: LOCK.UPDATE, skipLocked: true })inside the same transaction. Once the advisory lock is held, no other scheduler can runclaimDueDbSchedules, so the row-level lock is only protecting against concurrent updates from non-scheduler paths (e.g.,scheduleWorkflow/cancelScheduledWorkflowwriters).That's defensible as defense-in-depth, but worth a code comment so future maintainers don't try to "simplify" by removing one.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@server/src/schedule-worker.ts` around lines 29 - 85, The dual-locking in claimDueDbSchedules is intentional: keep the pg_try_advisory_xact_lock call (DB_SCHEDULER_ADVISORY_LOCK_KEY) for cluster-wide scheduler serialization and retain the Robot.findAll row-level lock + skipLocked (lock: transaction.LOCK.UPDATE, skipLocked: true) to protect against concurrent writers from other code paths like scheduleWorkflow/cancelScheduledWorkflow; add a concise inline comment above the advisory lock (or immediately before the Robot.findAll) explaining this defense-in-depth and warning maintainers not to remove either lock without considering both cluster-wide and row-level concurrency.
41-68: ⚡ Quick winPrefer parameterized Date objects with
Sequelize.where()and::timestamptzcasts for literal predicates.Lines 48 and 52 interpolate
now.toISOString()andclaimExpiry.toISOString()directly intoSequelize.literaltemplate strings, creating a SQL injection risk and relying on lexicographic comparison oftextvalues. Postgres performs strictly-typedtimestamptzcomparisons if you cast the extracted JSON values and useSequelize.where()with Date objects, which Sequelize automatically parameterizes securely.Use
Sequelize.where()with(schedule->>'nextRunAt')::timestamptzcasts and pass Date objects directly toOp.lteandOp.lt:♻️ Suggested refactor
- [Op.and]: [ - Sequelize.literal(`schedule->>'cronExpression' IS NOT NULL`), - Sequelize.literal(`schedule->>'timezone' IS NOT NULL`), - Sequelize.literal(`schedule->>'nextRunAt' IS NOT NULL`), - Sequelize.literal(`schedule->>'nextRunAt' <= '${now.toISOString()}'`), - { - [Op.or]: [ - Sequelize.literal(`schedule->>'schedulerClaimedAt' IS NULL`), - Sequelize.literal(`schedule->>'schedulerClaimedAt' < '${claimExpiry.toISOString()}'`), - ], - }, - ], + [Op.and]: [ + Sequelize.literal(`schedule->>'cronExpression' IS NOT NULL`), + Sequelize.literal(`schedule->>'timezone' IS NOT NULL`), + Sequelize.literal(`schedule->>'nextRunAt' IS NOT NULL`), + Sequelize.where( + Sequelize.literal(`(schedule->>'nextRunAt')::timestamptz`), + { [Op.lte]: now } + ), + { + [Op.or]: [ + Sequelize.literal(`schedule->>'schedulerClaimedAt' IS NULL`), + Sequelize.where( + Sequelize.literal(`(schedule->>'schedulerClaimedAt')::timestamptz`), + { [Op.lt]: claimExpiry } + ), + ], + }, + ],🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@server/src/schedule-worker.ts` around lines 41 - 68, The Sequelize.literal clauses in the Robot.findAll query interpolate now.toISOString() and claimExpiry.toISOString() directly, creating SQL injection risk and relying on text comparison; update the where predicates to use Sequelize.where with explicit casts like "(schedule->>'nextRunAt')::timestamptz" and "(schedule->>'schedulerClaimedAt')::timestamptz" and pass Date objects (now and claimExpiry) to Sequelize/Op operators (e.g., Op.lte for nextRunAt and Op.lt for schedulerClaimedAt) so Sequelize parameterizes the values securely; locate the Robot.findAll call and replace the Sequelize.literal comparisons that reference schedule->>'nextRunAt' and schedule->>'schedulerClaimedAt' with Sequelize.where(...) expressions using Date objects and the appropriate Op operators.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@server/src/schedule-worker.ts`:
- Around line 122-154: processDueSchedules currently sets dispatched=true after
addJob but relies on finalizeSchedule to clear schedulerClaimedAt/advance
nextRunAt; since finalizeSchedule can swallow errors this yields at-least-once
dispatches after DB_SCHEDULER_CLAIM_TIMEOUT_MS. Fix by ensuring finalizeSchedule
succeeds (or you document the at-least-once contract): wrap the call to
finalizeSchedule(robot.robotMetaId, executedAt) in a retry-with-backoff loop (or
retry a few times with delays) and only call releaseScheduleClaim if retries
ultimately fail, referencing processDueSchedules, addJob, finalizeSchedule,
releaseScheduleClaim and DB_SCHEDULER_CLAIM_TIMEOUT_MS so the claim is reliably
cleared/advanced or the at-least-once behavior is explicitly documented.
- Around line 179-189: stopScheduleWorker currently only clears workerIntervals
and flips dbScheduleTickRunning, but it does not wait for an in-flight
processDueSchedules tick which can race with shutdown (causing
addJob/finalizeSchedule/releaseScheduleClaim to run after workers are stopped);
fix by tracking the current in-flight Promise (e.g., store the Promise returned
by processDueSchedules in a module-scoped variable like currentScheduleTick) and
in stopScheduleWorker after clearing intervals await that Promise (or set a
stopping flag checked by processDueSchedules at key points to exit early) so
that any running tick completes or aborts before stopScheduleWorker returns.
In `@server/src/server.ts`:
- Around line 295-301: The current shutdown sequence short-circuits if one call
rejects; change the block so each of stopScheduleWorker, stopWorkers, and
stopGraphileWorkerUtils is awaited inside its own try/catch so failures in one
do not skip the others; for each catch, log a clear message including the full
error (not just error.message) and continue to the next shutdown call to ensure
all resources (Graphile pool/task runners) are always attempted to be stopped.
In `@server/src/storage/graphileWorker.ts`:
- Around line 25-39: When starting Graphile Worker utils, ensure the postgres
pool created as utilsPool is cleaned up on partial failures: if
makeWorkerUtils(...) or workerUtils.migrate() throws after utilsPool is
instantiated, call utilsPool.end() (and null/undefined out utilsPool) in the
catch before rethrowing so orphaned connections are closed; apply the same
cleanup pattern where utilsPool is created (also the similar block around
workerUtils usage at the later occurrence) and reference utilsPool,
makeWorkerUtils, workerUtils, isStarted and stopGraphileWorkerUtils to locate
the spots to add the pool shutdown.
- Around line 73-79: Remove the inline process.on('SIGTERM'/'SIGINT') listeners
from graphileWorker.ts so shutdown isn’t duplicated; instead export or keep
stopGraphileWorkerUtils() available and wire a single shutdown path from
server.ts that calls stopGraphileWorkerUtils() as part of the same ordered
shutdown sequence used for schedulers/migration draining (so scheduler and task
workers finish draining before releasing Graphile utils and causing late
addJob() failures).
In `@server/src/storage/schedule.ts`:
- Around line 10-22: computeNextRun returning null results in nextRunAt being
persisted as undefined so the DB worker (which filters on schedule->>'nextRunAt'
IS NOT NULL) will never pick up the robot; change the update flow in the block
that calls computeNextRun/crons (the variables nextRunAt, cronExpression,
timezone and the robot.update call) to validate that computeNextRun(...)
returned a non-null value and fail fast (throw or return an explicit error) if
it did not, do not call robot.update or persist the cron when nextRunAt is null,
and include the cronExpression/timezone in the error message so callers can
surface the problem.
- Around line 5-20: The scheduleWorkflow function currently ignores the userId
parameter creating an auth gap; update Robot.findOne in scheduleWorkflow to
include userId in the where clause and after fetch explicitly verify ownership
(e.g., assert robot.userId === userId and throw an authorization error if not).
Also add the same defensive ownership check in the two callers (the route
handlers in storage.ts and sdk.ts) before calling scheduleWorkflow so both the
DB lookup and the endpoints verify robot.userId matches the requester’s id.
In `@server/src/task-runner.ts`:
- Around line 624-648: The runnerPool created by new Pool(...) can be left open
if run(...) throws before workersStarted flips; modify the startup try/catch so
that when run(...) or any subsequent setup throws and runnerPool is defined, you
call runnerPool.end()/close/terminate (whichever the Pool instance API uses)
before rethrowing the error; ensure this cleanup occurs even when workersStarted
remains false so stopWorkers() doesn't need to rely on that flag to close the
pool. Target symbols: runnerPool, run(...), workersStarted, stopWorkers().
- Around line 453-458: The hasData check in task-runner.ts uses .length on
run.serializableOutput.scrapeSchema and scrapeList (within the hasData
assignment) which are keyed objects elsewhere, so change those checks to
Object.keys(run.serializableOutput.scrapeSchema).length > 0 and
Object.keys(run.serializableOutput.scrapeList).length > 0; preserve the existing
checks for crawl/search and binaryOutput. Make the identical fix in the
corresponding failure/abort partial-output paths where the same .length checks
are used (the other occurrence around the failure/abort handling) so partial
scrape outputs are detected correctly.
- Around line 307-317: This catch block currently performs run.update, emits
'run-completed' via serverIo.of(...).emit, calls
capture('maxun-oss-run-created', ...), and destroyRemoteBrowser(...) then
rethrows, causing duplicate teardown in the outer handler; remove the duplicated
cleanup/notification lines (the run.update call, the serverIo emits, the capture
call and destroyRemoteBrowser invocation) from this inner catch and leave only
"throw error" so the outer catch exclusively handles marking the run failed,
emitting socket/webhook events, and destroying the browser (refer to run.update,
serverIo.of(...).emit, capture('maxun-oss-run-created', ...),
destroyRemoteBrowser, and the throw error to locate the code).
---
Outside diff comments:
In `@server/src/routes/storage.ts`:
- Around line 1125-1136: The handler currently never responds on success and
mishandles queue errors; after a successful addJob(QUEUE_NAMES.EXECUTE_RUN, ...)
call return an HTTP success (e.g., 202/200) to the client
(res.status(...).json(...)) so the request doesn't hang; in the inner catch log
the actual error (include queueError.message), mark the Run as failed (update
the Run row/status for req.params.id and set finishedAt), call
destroyRemoteBrowser(plainRun.browserId) to clean up the orphaned browser, and
then send an error response (e.g., res.status(503).json(...)); mirror the
behavior used in the PUT /runs/:id handler (the update-to-failed +
destroyRemoteBrowser + 503 response) and keep addJob, QUEUE_NAMES.EXECUTE_RUN,
destroyRemoteBrowser and req.params.id as the referenced symbols.
---
Nitpick comments:
In `@server/src/schedule-worker.ts`:
- Around line 14-21: The hardcoded scheduler tunables (DB_SCHEDULER_BATCH_SIZE,
DB_SCHEDULER_POLL_MS, DB_SCHEDULER_CLAIM_TIMEOUT_MS,
DB_SCHEDULER_ADVISORY_LOCK_KEY) should be loaded from environment variables with
the current constants as defaults; update the module to read
process.env.DB_SCHEDULER_BATCH_SIZE, DB_SCHEDULER_POLL_MS,
DB_SCHEDULER_CLAIM_TIMEOUT_MS and DB_SCHEDULER_ADVISORY_LOCK_KEY (using parseInt
for numeric values and safe fallback to the existing constants) instead of
literal constants, keep the existing constant names as fallback defaults, and
add a short comment/docstring near DB_SCHEDULER_ADVISORY_LOCK_KEY advising teams
to allocate unique advisory lock keys to avoid collisions.
- Around line 29-85: The dual-locking in claimDueDbSchedules is intentional:
keep the pg_try_advisory_xact_lock call (DB_SCHEDULER_ADVISORY_LOCK_KEY) for
cluster-wide scheduler serialization and retain the Robot.findAll row-level lock
+ skipLocked (lock: transaction.LOCK.UPDATE, skipLocked: true) to protect
against concurrent writers from other code paths like
scheduleWorkflow/cancelScheduledWorkflow; add a concise inline comment above the
advisory lock (or immediately before the Robot.findAll) explaining this
defense-in-depth and warning maintainers not to remove either lock without
considering both cluster-wide and row-level concurrency.
- Around line 41-68: The Sequelize.literal clauses in the Robot.findAll query
interpolate now.toISOString() and claimExpiry.toISOString() directly, creating
SQL injection risk and relying on text comparison; update the where predicates
to use Sequelize.where with explicit casts like
"(schedule->>'nextRunAt')::timestamptz" and
"(schedule->>'schedulerClaimedAt')::timestamptz" and pass Date objects (now and
claimExpiry) to Sequelize/Op operators (e.g., Op.lte for nextRunAt and Op.lt for
schedulerClaimedAt) so Sequelize parameterizes the values securely; locate the
Robot.findAll call and replace the Sequelize.literal comparisons that reference
schedule->>'nextRunAt' and schedule->>'schedulerClaimedAt' with
Sequelize.where(...) expressions using Date objects and the appropriate Op
operators.
In `@server/src/storage/schedule.ts`:
- Around line 12-20: The code is using an unsafe "as any" cast when calling
robot.update with a mutated schedule, which bypasses the Robot.schedule type
(and hides renames like schedulerClaimedAt); fix by constructing a properly
typed schedule object instead of casting: import or reference the Robot schedule
type (e.g., Schedule or Robot['schedule']), build a Partial<Schedule> or a
newSchedule object containing cronExpression, timezone, nextRunAt and explicitly
typed schedulerClaimedAt (or omit it if optional), then pass that typed object
to robot.update (update the robot.update call/site to accept the typed shape) so
the compiler enforces the real schedule contract and the cast can be removed.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 534c6d99-8bbb-4a5a-a0f8-fe36a0b3e644
📒 Files selected for processing (11)
legacy/server/pgboss-worker.tslegacy/server/pgboss.tspackage.jsonserver/src/models/Robot.tsserver/src/routes/record.tsserver/src/routes/storage.tsserver/src/schedule-worker.tsserver/src/server.tsserver/src/storage/graphileWorker.tsserver/src/storage/schedule.tsserver/src/task-runner.ts
| async function processDueSchedules(): Promise<void> { | ||
| if (dbScheduleTickRunning) return; | ||
| dbScheduleTickRunning = true; | ||
|
|
||
| try { | ||
| if (registeredQueues.has(queueName)) { | ||
| return; | ||
| } | ||
|
|
||
| await pgBoss.work(queueName, async (job: Job<ScheduledWorkflowData> | Job<ScheduledWorkflowData>[]) => { | ||
| const claimedRobots = await claimDueDbSchedules(); | ||
|
|
||
| for (const robot of claimedRobots) { | ||
| const executedAt = new Date(); | ||
| let dispatched = false; | ||
|
|
||
| try { | ||
| const singleJob = Array.isArray(job) ? job[0] : job; | ||
| return await processScheduledWorkflow(singleJob); | ||
| logger.log('info', `Dispatching scheduled workflow for robot ${robot.robotMetaId}`); | ||
| await addJob(QUEUE_NAMES.SCHEDULED_WORKFLOW, { robotMetaId: robot.robotMetaId, userId: robot.userId }, { maxAttempts: 6 }); | ||
| dispatched = true; | ||
| } catch (error: unknown) { | ||
| const errorMessage = error instanceof Error ? error.message : String(error); | ||
| logger.log('error', `Scheduled workflow job failed in queue ${queueName}: ${errorMessage}`); | ||
| throw error; | ||
| logger.log('error', `Scheduled workflow dispatch failed for robot ${robot.robotMetaId}: ${errorMessage}`); | ||
| } finally { | ||
| if (dispatched) { | ||
| await finalizeSchedule(robot.robotMetaId, executedAt); | ||
| } else { | ||
| await releaseScheduleClaim(robot.robotMetaId); | ||
| } | ||
| } | ||
| }); | ||
|
|
||
| registeredQueues.add(queueName); | ||
| logger.log('info', `Registered worker for queue: ${queueName}`); | ||
| } | ||
| } catch (error: unknown) { | ||
| const errorMessage = error instanceof Error ? error.message : String(error); | ||
| logger.log('error', `Failed to register worker for queue ${queueName}: ${errorMessage}`); | ||
| logger.log('error', `Failed to process DB-backed schedules: ${errorMessage}`); | ||
| } finally { | ||
| dbScheduleTickRunning = false; | ||
| } | ||
| } |
There was a problem hiding this comment.
At-least-once dispatch semantics if finalizeSchedule fails.
In processDueSchedules, when addJob succeeds, dispatched=true is set and finalizeSchedule is invoked. However, finalizeSchedule swallows all errors internally. If it fails (DB blip, transient connection issue), schedulerClaimedAt is never cleared and nextRunAt is never advanced — after DB_SCHEDULER_CLAIM_TIMEOUT_MS (10 minutes) the same row will be claimed again and addJob will be called a second time for the same executedAt window.
This is acceptable if downstream SCHEDULED_WORKFLOW handlers are idempotent, but worth confirming explicitly. At minimum, document the at-least-once contract, or have finalizeSchedule retry with backoff before giving up.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@server/src/schedule-worker.ts` around lines 122 - 154, processDueSchedules
currently sets dispatched=true after addJob but relies on finalizeSchedule to
clear schedulerClaimedAt/advance nextRunAt; since finalizeSchedule can swallow
errors this yields at-least-once dispatches after DB_SCHEDULER_CLAIM_TIMEOUT_MS.
Fix by ensuring finalizeSchedule succeeds (or you document the at-least-once
contract): wrap the call to finalizeSchedule(robot.robotMetaId, executedAt) in a
retry-with-backoff loop (or retry a few times with delays) and only call
releaseScheduleClaim if retries ultimately fail, referencing
processDueSchedules, addJob, finalizeSchedule, releaseScheduleClaim and
DB_SCHEDULER_CLAIM_TIMEOUT_MS so the claim is reliably cleared/advanced or the
at-least-once behavior is explicitly documented.
| export async function stopScheduleWorker(): Promise<void> { | ||
| if (!scheduleWorkerStarted) return; | ||
|
|
||
| process.on('SIGTERM', async () => { | ||
| logger.log('info', 'SIGTERM received, shutting down PgBoss scheduler...'); | ||
| await pgBoss.stop(); | ||
| logger.log('info', 'PgBoss scheduler stopped, ready for termination'); | ||
| }); | ||
| for (const interval of workerIntervals.splice(0)) { | ||
| clearInterval(interval); | ||
| } | ||
|
|
||
| process.on('SIGINT', async () => { | ||
| logger.log('info', 'SIGINT received, shutting down PgBoss scheduler...'); | ||
| await pgBoss.stop(); | ||
| logger.log('info', 'PgBoss scheduler stopped, waiting for main process cleanup...'); | ||
| }); | ||
| scheduleWorkerStarted = false; | ||
| dbScheduleTickRunning = false; | ||
| logger.log('info', 'Schedule worker stopped'); | ||
| } |
There was a problem hiding this comment.
stopScheduleWorker does not await an in-flight tick.
stopScheduleWorker only clears the interval and resets dbScheduleTickRunning, but if a tick is currently mid-execution it will continue: it may call addJob and finalizeSchedule/releaseScheduleClaim after stopWorkers/stopGraphileWorkerUtils have already started. In server.ts the shutdown sequence is stopScheduleWorker → stopWorkers → stopGraphileWorkerUtils, so an in-flight tick can race with the Graphile pool shutdown and lose dispatches or generate noisy errors.
Consider awaiting the in-flight tick (e.g., track the current processDueSchedules promise and await it after clearing the interval), or set a "stopping" flag that processDueSchedules checks before continuing the per-robot loop.
♻️ Suggested approach
let scheduleWorkerStarted = false;
let dbScheduleTickRunning = false;
+let inflightTick: Promise<void> | null = null;
const workerIntervals: NodeJS.Timeout[] = [];
@@
- setImmediate(() => {
- processDueSchedules().catch((error) => {
- logger.log('error', `Initial DB schedule poll failed: ${error instanceof Error ? error.message : String(error)}`);
- });
- });
+ setImmediate(() => {
+ inflightTick = processDueSchedules()
+ .catch((error) => logger.log('error', `Initial DB schedule poll failed: ${error instanceof Error ? error.message : String(error)}`))
+ .finally(() => { inflightTick = null; });
+ });
- const interval = setInterval(() => {
- processDueSchedules().catch((error) => {
- logger.log('error', `DB schedule poll failed: ${error instanceof Error ? error.message : String(error)}`);
- });
- }, DB_SCHEDULER_POLL_MS);
+ const interval = setInterval(() => {
+ inflightTick = processDueSchedules()
+ .catch((error) => logger.log('error', `DB schedule poll failed: ${error instanceof Error ? error.message : String(error)}`))
+ .finally(() => { inflightTick = null; });
+ }, DB_SCHEDULER_POLL_MS);
@@
export async function stopScheduleWorker(): Promise<void> {
if (!scheduleWorkerStarted) return;
for (const interval of workerIntervals.splice(0)) {
clearInterval(interval);
}
+ if (inflightTick) {
+ try { await inflightTick; } catch { /* already logged */ }
+ }
+
scheduleWorkerStarted = false;
dbScheduleTickRunning = false;🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@server/src/schedule-worker.ts` around lines 179 - 189, stopScheduleWorker
currently only clears workerIntervals and flips dbScheduleTickRunning, but it
does not wait for an in-flight processDueSchedules tick which can race with
shutdown (causing addJob/finalizeSchedule/releaseScheduleClaim to run after
workers are stopped); fix by tracking the current in-flight Promise (e.g., store
the Promise returned by processDueSchedules in a module-scoped variable like
currentScheduleTick) and in stopScheduleWorker after clearing intervals await
that Promise (or set a stopping flag checked by processDueSchedules at key
points to exit early) so that any running tick completes or aborts before
stopScheduleWorker returns.
| try { | ||
| await stopScheduleWorker(); | ||
| await stopWorkers(); | ||
| await stopGraphileWorkerUtils(); | ||
| } catch (workerError: any) { | ||
| console.error('Error stopping workers:', workerError.message); | ||
| } |
There was a problem hiding this comment.
One stop failure short-circuits the rest of the worker shutdown.
If stopScheduleWorker() rejects, stopWorkers() and stopGraphileWorkerUtils() are skipped, leaving the Graphile pool and task runners alive — the process will likely hang on shutdown (or process.exit will run with sockets still open). Wrap each call in its own try/catch so a failure in one stage does not skip the others.
🛡️ Proposed fix
- try {
- await stopScheduleWorker();
- await stopWorkers();
- await stopGraphileWorkerUtils();
- } catch (workerError: any) {
- console.error('Error stopping workers:', workerError.message);
- }
+ for (const [name, stopFn] of [
+ ['schedule worker', stopScheduleWorker],
+ ['task workers', stopWorkers],
+ ['graphile worker utils', stopGraphileWorkerUtils],
+ ] as const) {
+ try {
+ await stopFn();
+ } catch (workerError: any) {
+ console.error(`Error stopping ${name}:`, workerError?.message ?? workerError);
+ shutdownSuccessful = false;
+ }
+ }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@server/src/server.ts` around lines 295 - 301, The current shutdown sequence
short-circuits if one call rejects; change the block so each of
stopScheduleWorker, stopWorkers, and stopGraphileWorkerUtils is awaited inside
its own try/catch so failures in one do not skip the others; for each catch, log
a clear message including the full error (not just error.message) and continue
to the next shutdown call to ensure all resources (Graphile pool/task runners)
are always attempted to be stopped.
| try { | ||
| utilsPool = new Pool({ | ||
| connectionString, | ||
| max: 3, | ||
| ssl: useSSL ? true : undefined, | ||
| }); | ||
|
|
||
| workerUtils = await makeWorkerUtils({ pgPool: utilsPool }); | ||
| await workerUtils.migrate(); | ||
| isStarted = true; | ||
| logger.log('info', 'Graphile Worker utils started'); | ||
| } catch (error: any) { | ||
| logger.log('error', `Failed to start Graphile Worker utils: ${error.message}`); | ||
| throw error; | ||
| } |
There was a problem hiding this comment.
Clean up the utils pool on partial startup failures.
If makeWorkerUtils() or migrate() throws after utilsPool is created, the pool stays open and stopGraphileWorkerUtils() will never touch it because isStarted is still false. That leaves orphaned DB connections after a failed boot.
Suggested fix
export async function startGraphileWorkerUtils(): Promise<void> {
@@
} catch (error: any) {
+ if (workerUtils) {
+ try {
+ await workerUtils.release();
+ } catch (_) {
+ // ignore secondary cleanup failure
+ } finally {
+ workerUtils = null;
+ }
+ }
+ if (utilsPool) {
+ try {
+ await utilsPool.end();
+ } catch (_) {
+ // ignore secondary cleanup failure
+ } finally {
+ utilsPool = null;
+ }
+ }
logger.log('error', `Failed to start Graphile Worker utils: ${error.message}`);
throw error;
}
}
export async function stopGraphileWorkerUtils(): Promise<void> {
- if (!isStarted) return;
+ if (!isStarted && !workerUtils && !utilsPool) return;Also applies to: 42-43
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@server/src/storage/graphileWorker.ts` around lines 25 - 39, When starting
Graphile Worker utils, ensure the postgres pool created as utilsPool is cleaned
up on partial failures: if makeWorkerUtils(...) or workerUtils.migrate() throws
after utilsPool is instantiated, call utilsPool.end() (and null/undefined out
utilsPool) in the catch before rethrowing so orphaned connections are closed;
apply the same cleanup pattern where utilsPool is created (also the similar
block around workerUtils usage at the later occurrence) and reference utilsPool,
makeWorkerUtils, workerUtils, isStarted and stopGraphileWorkerUtils to locate
the spots to add the pool shutdown.
| process.on('SIGTERM', async () => { | ||
| await stopGraphileWorkerUtils(); | ||
| }); | ||
|
|
||
| process.on('SIGINT', async () => { | ||
| await stopGraphileWorkerUtils(); | ||
| }); |
There was a problem hiding this comment.
Let server.ts own process signal handling.
These listeners create a second shutdown path that can release Graphile utils before the scheduler and task workers finish draining. During a SIGTERM/SIGINT, that makes late addJob() calls fail out of order.
Suggested fix
-process.on('SIGTERM', async () => {
- await stopGraphileWorkerUtils();
-});
-
-process.on('SIGINT', async () => {
- await stopGraphileWorkerUtils();
-});Wire shutdown once, from server.ts, in the same ordered sequence used for the rest of the migration.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@server/src/storage/graphileWorker.ts` around lines 73 - 79, Remove the inline
process.on('SIGTERM'/'SIGINT') listeners from graphileWorker.ts so shutdown
isn’t duplicated; instead export or keep stopGraphileWorkerUtils() available and
wire a single shutdown path from server.ts that calls stopGraphileWorkerUtils()
as part of the same ordered shutdown sequence used for schedulers/migration
draining (so scheduler and task workers finish draining before releasing
Graphile utils and causing late addJob() failures).
| export async function scheduleWorkflow(id: string, userId: string, cronExpression: string, timezone: string): Promise<void> { | ||
| try { | ||
| const runId = uuid(); | ||
|
|
||
| const queueName = `scheduled-workflow-${id}`; | ||
|
|
||
| logger.log('info', `Scheduling workflow ${id} with cron expression ${cronExpression} in timezone ${timezone}`); | ||
|
|
||
| await pgBossClient.createQueue(queueName); | ||
|
|
||
| await pgBossClient.schedule(queueName, cronExpression, | ||
| { id, runId, userId }, | ||
| { tz: timezone } | ||
| ); | ||
|
|
||
| await registerWorkerForQueue(queueName); | ||
| const robot = await Robot.findOne({ where: { 'recording_meta.id': id } }); | ||
| if (!robot) throw new Error(`Robot ${id} not found`); | ||
|
|
||
| const nextRunAt = computeNextRun(cronExpression, timezone) || undefined; | ||
|
|
||
| await robot.update({ | ||
| schedule: { | ||
| ...robot.schedule, | ||
| cronExpression, | ||
| timezone, | ||
| nextRunAt, | ||
| schedulerClaimedAt: undefined, | ||
| } as any, | ||
| }); |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
rg -nP --type=ts -C5 '\bscheduleWorkflow\s*\('Repository: getmaxun/maxun
Length of output: 2497
🏁 Script executed:
fd -t f 'Robot\.(ts|js)$' | head -5Repository: getmaxun/maxun
Length of output: 85
🏁 Script executed:
cat -n server/src/models/Robot.ts | head -100Repository: getmaxun/maxun
Length of output: 3645
🏁 Script executed:
sed -n '1230,1250p' server/src/routes/storage.tsRepository: getmaxun/maxun
Length of output: 590
🏁 Script executed:
sed -n '545,560p' server/src/api/sdk.tsRepository: getmaxun/maxun
Length of output: 802
🏁 Script executed:
sed -n '1210,1242p' server/src/routes/storage.tsRepository: getmaxun/maxun
Length of output: 1129
🏁 Script executed:
sed -n '520,552p' server/src/api/sdk.tsRepository: getmaxun/maxun
Length of output: 1609
🏁 Script executed:
sed -n '1170,1242p' server/src/routes/storage.tsRepository: getmaxun/maxun
Length of output: 2652
🏁 Script executed:
sed -n '450,552p' server/src/api/sdk.tsRepository: getmaxun/maxun
Length of output: 4635
🏁 Script executed:
sed -n '1140,1175p' server/src/routes/storage.tsRepository: getmaxun/maxun
Length of output: 1260
🏁 Script executed:
sed -n '400,450p' server/src/api/sdk.tsRepository: getmaxun/maxun
Length of output: 2090
🏁 Script executed:
sed -n '360,415p' server/src/api/sdk.tsRepository: getmaxun/maxun
Length of output: 1472
🏁 Script executed:
sed -n '1155,1172p' server/src/routes/storage.tsRepository: getmaxun/maxun
Length of output: 570
🏁 Script executed:
sed -n '380,400p' server/src/api/sdk.tsRepository: getmaxun/maxun
Length of output: 563
Critical authorization gap: userId parameter ignored, allowing any authenticated user to schedule another user's robot.
scheduleWorkflow accepts userId but never uses it to verify ownership. Both calling routes (storage.ts and sdk.ts) fetch the robot without checking robot.userId === req.user.id. This allows an authenticated attacker to schedule any robot by ID, regardless of ownership.
The Robot model confirms robots are user-owned entities. Add userId to both the Robot.findOne where clause in scheduleWorkflow AND verify ownership in the calling routes as defense-in-depth.
🛡️ Proposed fix
export async function scheduleWorkflow(id: string, userId: string, cronExpression: string, timezone: string): Promise<void> {
try {
- const robot = await Robot.findOne({ where: { 'recording_meta.id': id } });
- if (!robot) throw new Error(`Robot ${id} not found`);
+ const robot = await Robot.findOne({ where: { 'recording_meta.id': id, userId } });
+ if (!robot) throw new Error(`Robot ${id} not found for user ${userId}`);Additionally, verify ownership in calling routes:
const robot = await Robot.findOne({ where: { 'recording_meta.id': id } });
if (!robot) {
return res.status(404).json({ error: 'Robot not found' });
}
+ if (robot.userId !== req.user.id) {
+ return res.status(403).json({ error: 'Forbidden' });
+ }📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| export async function scheduleWorkflow(id: string, userId: string, cronExpression: string, timezone: string): Promise<void> { | |
| try { | |
| const runId = uuid(); | |
| const queueName = `scheduled-workflow-${id}`; | |
| logger.log('info', `Scheduling workflow ${id} with cron expression ${cronExpression} in timezone ${timezone}`); | |
| await pgBossClient.createQueue(queueName); | |
| await pgBossClient.schedule(queueName, cronExpression, | |
| { id, runId, userId }, | |
| { tz: timezone } | |
| ); | |
| await registerWorkerForQueue(queueName); | |
| const robot = await Robot.findOne({ where: { 'recording_meta.id': id } }); | |
| if (!robot) throw new Error(`Robot ${id} not found`); | |
| const nextRunAt = computeNextRun(cronExpression, timezone) || undefined; | |
| await robot.update({ | |
| schedule: { | |
| ...robot.schedule, | |
| cronExpression, | |
| timezone, | |
| nextRunAt, | |
| schedulerClaimedAt: undefined, | |
| } as any, | |
| }); | |
| export async function scheduleWorkflow(id: string, userId: string, cronExpression: string, timezone: string): Promise<void> { | |
| try { | |
| const robot = await Robot.findOne({ where: { 'recording_meta.id': id, userId } }); | |
| if (!robot) throw new Error(`Robot ${id} not found for user ${userId}`); | |
| const nextRunAt = computeNextRun(cronExpression, timezone) || undefined; | |
| await robot.update({ | |
| schedule: { | |
| ...robot.schedule, | |
| cronExpression, | |
| timezone, | |
| nextRunAt, | |
| schedulerClaimedAt: undefined, | |
| } as any, | |
| }); |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@server/src/storage/schedule.ts` around lines 5 - 20, The scheduleWorkflow
function currently ignores the userId parameter creating an auth gap; update
Robot.findOne in scheduleWorkflow to include userId in the where clause and
after fetch explicitly verify ownership (e.g., assert robot.userId === userId
and throw an authorization error if not). Also add the same defensive ownership
check in the two callers (the route handlers in storage.ts and sdk.ts) before
calling scheduleWorkflow so both the DB lookup and the endpoints verify
robot.userId matches the requester’s id.
| const nextRunAt = computeNextRun(cronExpression, timezone) || undefined; | ||
|
|
||
| await robot.update({ | ||
| schedule: { | ||
| ...robot.schedule, | ||
| cronExpression, | ||
| timezone, | ||
| nextRunAt, | ||
| schedulerClaimedAt: undefined, | ||
| } as any, | ||
| }); | ||
|
|
||
| logger.log('info', `Scheduled workflow job for robot ${id}`); | ||
| logger.log('info', `Scheduled robot ${id} (cron: ${cronExpression}, tz: ${timezone}, next: ${nextRunAt})`); |
There was a problem hiding this comment.
Silent no-op if computeNextRun returns null.
If cronExpression is invalid (or computeNextRun returns null for any reason), nextRunAt becomes undefined and gets persisted. The DB-backed schedule worker filters on schedule->>'nextRunAt' IS NOT NULL, so the robot is silently never picked up while the user still sees the cron persisted on the robot. Consider failing fast with a clear error so the caller can surface it.
🛡️ Suggested guard
const nextRunAt = computeNextRun(cronExpression, timezone) || undefined;
+ if (!nextRunAt) {
+ throw new Error(`Invalid cron expression or timezone for robot ${id}: cron="${cronExpression}", tz="${timezone}"`);
+ }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@server/src/storage/schedule.ts` around lines 10 - 22, computeNextRun
returning null results in nextRunAt being persisted as undefined so the DB
worker (which filters on schedule->>'nextRunAt' IS NOT NULL) will never pick up
the robot; change the update flow in the block that calls computeNextRun/crons
(the variables nextRunAt, cronExpression, timezone and the robot.update call) to
validate that computeNextRun(...) returned a non-null value and fail fast (throw
or return an explicit error) if it did not, do not call robot.update or persist
the cron when nextRunAt is null, and include the cronExpression/timezone in the
error message so callers can surface the problem.
| } catch (error: any) { | ||
| logger.log('error', `Scrape conversion failed for run ${data.runId}: ${error.message}`); | ||
| await run.update({ status: 'failed', finishedAt: new Date().toLocaleString(), log: `Conversion failed: ${error.message}` }); | ||
| try { | ||
| const failureData = { runId: data.runId, robotMetaId: plainRun.robotMetaId, robotName: recording.recording_meta.name, status: 'failed', finishedAt: new Date().toLocaleString() }; | ||
| serverIo.of(browserId).emit('run-completed', failureData); | ||
| serverIo.of('/queued-run').to(`user-${data.userId}`).emit('run-completed', failureData); | ||
| } catch (_) {} | ||
| capture('maxun-oss-run-created', { runId: data.runId, user_id: data.userId, status: 'failed', robot_type: 'scrape', source: 'manual' }); | ||
| await destroyRemoteBrowser(browserId, data.userId); | ||
| throw error; |
There was a problem hiding this comment.
Avoid tearing the scrape run down twice.
This catch already marks the run failed, emits failure events, and destroys the browser, then rethrows into the outer failure handler which does the same work again. That can produce duplicate notifications and makes destroyRemoteBrowser() idempotence a release dependency.
Suggested direction
} catch (error: any) {
logger.log('error', `Scrape conversion failed for run ${data.runId}: ${error.message}`);
- await run.update({ status: 'failed', finishedAt: new Date().toLocaleString(), log: `Conversion failed: ${error.message}` });
- try {
- const failureData = { runId: data.runId, robotMetaId: plainRun.robotMetaId, robotName: recording.recording_meta.name, status: 'failed', finishedAt: new Date().toLocaleString() };
- serverIo.of(browserId).emit('run-completed', failureData);
- serverIo.of('/queued-run').to(`user-${data.userId}`).emit('run-completed', failureData);
- } catch (_) {}
- capture('maxun-oss-run-created', { runId: data.runId, user_id: data.userId, status: 'failed', robot_type: 'scrape', source: 'manual' });
- await destroyRemoteBrowser(browserId, data.userId);
throw error;
}Let the outer catch own the failed update / webhook / socket / browser cleanup path, or return here without rethrowing.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@server/src/task-runner.ts` around lines 307 - 317, This catch block currently
performs run.update, emits 'run-completed' via serverIo.of(...).emit, calls
capture('maxun-oss-run-created', ...), and destroyRemoteBrowser(...) then
rethrows, causing duplicate teardown in the outer handler; remove the duplicated
cleanup/notification lines (the run.update call, the serverIo emits, the capture
call and destroyRemoteBrowser invocation) from this inner catch and leave only
"throw error" so the outer catch exclusively handles marking the run failed,
emitting socket/webhook events, and destroying the browser (refer to run.update,
serverIo.of(...).emit, capture('maxun-oss-run-created', ...),
destroyRemoteBrowser, and the throw error to locate the code).
| const hasData = (run.serializableOutput && ( | ||
| (run.serializableOutput.scrapeSchema && run.serializableOutput.scrapeSchema.length > 0) || | ||
| (run.serializableOutput.scrapeList && run.serializableOutput.scrapeList.length > 0) || | ||
| (run.serializableOutput.crawl && Object.keys(run.serializableOutput.crawl).length > 0) || | ||
| (run.serializableOutput.search && Object.keys(run.serializableOutput.search).length > 0))) || | ||
| (run.binaryOutput && Object.keys(run.binaryOutput).length > 0); |
There was a problem hiding this comment.
Use object emptiness checks for partial scrape output.
serializableOutput.scrapeSchema and scrapeList are handled as keyed objects elsewhere in this file, so .length is always falsey here. Failed or aborted runs with extracted data will skip integration updates and report no partial data.
Suggested fix
- (run.serializableOutput.scrapeSchema && run.serializableOutput.scrapeSchema.length > 0) ||
- (run.serializableOutput.scrapeList && run.serializableOutput.scrapeList.length > 0) ||
+ (run.serializableOutput.scrapeSchema && Object.keys(run.serializableOutput.scrapeSchema).length > 0) ||
+ (run.serializableOutput.scrapeList && Object.keys(run.serializableOutput.scrapeList).length > 0) ||Apply the same correction in both failure/abort paths.
Also applies to: 543-546
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@server/src/task-runner.ts` around lines 453 - 458, The hasData check in
task-runner.ts uses .length on run.serializableOutput.scrapeSchema and
scrapeList (within the hasData assignment) which are keyed objects elsewhere, so
change those checks to Object.keys(run.serializableOutput.scrapeSchema).length >
0 and Object.keys(run.serializableOutput.scrapeList).length > 0; preserve the
existing checks for crawl/search and binaryOutput. Make the identical fix in the
corresponding failure/abort partial-output paths where the same .length checks
are used (the other occurrence around the failure/abort handling) so partial
scrape outputs are detected correctly.
| runnerPool = new Pool({ | ||
| connectionString, | ||
| max: TOTAL_CONCURRENCY + 2, | ||
| ssl: useSSL ? true : undefined, | ||
| }); | ||
|
|
||
| runner = await run({ | ||
| pgPool: runnerPool, | ||
| concurrency: TOTAL_CONCURRENCY, | ||
| noHandleSignals: true, | ||
| taskList, | ||
| }); | ||
|
|
||
| runner.events.on('job:error', ({ job, error }: { job: any; error: unknown }) => { | ||
| const msg = error instanceof Error ? error.message : String(error); | ||
| logger.log('error', `Job error for job ${job?.id} (${job?.task_identifier}): ${msg}`); | ||
| }); | ||
|
|
||
| workersStarted = true; | ||
| logger.log('info', `Graphile Worker runner started (concurrency: ${TOTAL_CONCURRENCY})`); | ||
| } catch (error: unknown) { | ||
| const errorMessage = error instanceof Error ? error.message : String(error); | ||
| logger.log('error', `Failed to start Graphile Worker runner: ${errorMessage}`); | ||
| throw error; | ||
| } |
There was a problem hiding this comment.
Close runnerPool when worker startup fails.
If run(...) throws after new Pool(...), this path rethrows with an open pool. stopWorkers() then skips cleanup because workersStarted never flipped, so a failed boot can leak PostgreSQL connections.
Suggested fix
export async function startWorkers(): Promise<void> {
@@
} catch (error: unknown) {
+ if (runnerPool) {
+ try {
+ await runnerPool.end();
+ } catch (poolError: any) {
+ logger.log('warn', `Error closing runner pool after failed startup: ${poolError.message}`);
+ } finally {
+ runnerPool = null;
+ }
+ }
const errorMessage = error instanceof Error ? error.message : String(error);
logger.log('error', `Failed to start Graphile Worker runner: ${errorMessage}`);
throw error;
}
}
export async function stopWorkers(): Promise<void> {
- if (!workersStarted || !runner) return;
+ if (!workersStarted && !runner && !runnerPool) return;Also applies to: 651-652
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@server/src/task-runner.ts` around lines 624 - 648, The runnerPool created by
new Pool(...) can be left open if run(...) throws before workersStarted flips;
modify the startup try/catch so that when run(...) or any subsequent setup
throws and runnerPool is defined, you call runnerPool.end()/close/terminate
(whichever the Pool instance API uses) before rethrowing the error; ensure this
cleanup occurs even when workersStarted remains false so stopWorkers() doesn't
need to rely on that flag to close the pool. Target symbols: runnerPool,
run(...), workersStarted, stopWorkers().
What this PR does?
Ports the job orchestration functionality from pgboss to graphile worker.
Resolves #1058
Summary by CodeRabbit
Refactor