re2: New release concurrency system #1804
Caution: Review failed. The pull request is closed.
Walkthrough
The changes span numerous areas of the project. Debug and configuration files now use more specific commands and improved logging. API schemas and database migrations have been updated to include new status values and additional fields (such as project, organization, and concurrency-related columns). Refactoring in the RunEngine routes replaces deprecated modules and simplifies parameter passing (e.g., removing environment IDs from wait functions and making release concurrency configurable). In addition, several new systems and modules have been added to manage task execution, including token bucket-based concurrency control and batch, checkpoint, delayed run, waitpoint, and TTL systems. A comprehensive update of the tests further validates system reliability.
Sequence Diagram(s)
sequenceDiagram
participant RE as RunEngine
participant RS as ReleaseConcurrencySystem
participant TBQ as TokenBucketQueue
participant Redis as Redis DB
RE->>RS: Request to release concurrency (with snapshot details)
RS->>TBQ: Attempt token release via token bucket algorithm
TBQ->>Redis: Check token availability and consume token
Redis-->>TBQ: Token status returned
TBQ-->>RS: Confirm token released or schedule retry
RS-->>RE: Concurrency release status acknowledgment
sequenceDiagram
participant RE as RunEngine
participant WS as WaitpointSystem
participant Worker as Worker
RE->>WS: Block run with waitpoint (releaseConcurrency flag)
WS->>Worker: Schedule waitpoint monitoring/completion
Worker-->>WS: Notify waitpoint cleared
WS-->>RE: Signal run unblocked and ready to continue
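The first diagram describes releases being gated by a token bucket. As a rough illustration of that idea only, here is a minimal in-memory sketch. The PR's queue is Redis-backed; the class name, method names, and callback shape below are assumptions made for this example and are not taken from the PR.

```typescript
// Hypothetical in-memory model of a token bucket that gates concurrency releases.
// Illustrative only: the real implementation stores its state in Redis.
class InMemoryTokenBucket {
  private tokens: number;
  private readonly waiting: string[] = [];

  constructor(
    private readonly maxTokens: number,
    private readonly onRelease: (runId: string) => void
  ) {
    this.tokens = maxTokens;
  }

  // Try to release concurrency for a run; queue the run if no token is available.
  attemptToRelease(runId: string): boolean {
    if (this.tokens > 0) {
      this.tokens -= 1;
      this.onRelease(runId);
      return true;
    }
    this.waiting.push(runId);
    return false;
  }

  // Return tokens to the bucket, then drain queued runs while tokens remain.
  refill(amount: number): void {
    if (amount < 0) {
      throw new Error("Cannot refill with a negative number of tokens");
    }
    this.tokens = Math.min(this.maxTokens, this.tokens + amount);
    while (this.tokens > 0 && this.waiting.length > 0) {
      const next = this.waiting.shift()!;
      this.tokens -= 1;
      this.onRelease(next);
    }
  }

  get available(): number {
    return this.tokens;
  }
}

const released: string[] = [];
const bucket = new InMemoryTokenBucket(2, (runId) => released.push(runId));

bucket.attemptToRelease("run_1"); // consumes a token
bucket.attemptToRelease("run_2"); // consumes the last token
bucket.attemptToRelease("run_3"); // no token left, so the run is queued
bucket.refill(1); // the returned token is immediately used for run_3
```

In this sketch a refill both restores capacity and drains the backlog, so a queued run never waits longer than the next returned token.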
Actionable comments posted: 5
🔭 Outside diff range comments (1)
internal-packages/run-queue/src/run-queue/tests/nack.test.ts (1)
11-12: Missing test implementation
The file has all the necessary imports but appears to be missing the actual test implementation. The comment on line 11 indicates "existing code" but this is marked as a new file. Please add the test implementations that verify the nackMessage functionality, particularly tests for:
- Verifying concurrency is cleared when a message is nacked
- Confirming messages move to the dead letter queue after maximum attempts
According to the PR summary, this should test the new release concurrency system.
Let's verify if the file actually contains any test implementation for nackMessage.
Once the output is reviewed and it confirms that only imports (and placeholder code like "// ... existing code ...") are present without actual tests, this confirms the missing implementation as noted in the review comment.
Assuming the file output indicates the absence of test cases, here’s the rewritten review comment:
Critical: Missing Test Implementation in nack.test.ts
- The file internal-packages/run-queue/src/run-queue/tests/nack.test.ts currently includes the necessary imports but does not implement any tests.
- Please add test cases for the nackMessage functionality, specifically:
- Verifying that concurrency is cleared when a message is nacked.
- Confirming that messages are moved to the dead letter queue after reaching the maximum retry attempts.
- This test is essential for validating the new release concurrency system as described in the PR summary.
🏁 Script executed:
#!/bin/bash
# Display the contents of the nack.test.ts file to inspect its implementation
cat internal-packages/run-queue/src/run-queue/tests/nack.test.ts
Length of output: 555
Critical: Missing Test Implementation in nack.test.ts
- The file internal-packages/run-queue/src/run-queue/tests/nack.test.ts currently only contains the necessary imports and a placeholder comment ("... existing code ...") without any actual test cases.
- Please implement tests for the nackMessage functionality, specifically:
- Verifying that concurrency is cleared when a message is nacked.
- Confirming that messages are moved to the dead letter queue after exceeding the maximum retry attempts.
- This is essential to validate the new release concurrency system as outlined in the PR summary.
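The two behaviours requested above can be illustrated with a toy in-memory model. This is not the RunQueue API: ToyQueue, its methods, and the maxAttempts parameter are hypothetical names used only to show which assertions such tests would make.

```typescript
// Toy model (hypothetical, not the real RunQueue): nacking clears concurrency,
// and a message that has used all its attempts goes to the dead letter queue.
interface ToyMessage {
  id: string;
  attempt: number;
}

class ToyQueue {
  readonly pending: ToyMessage[] = [];
  readonly deadLetter: ToyMessage[] = [];
  // Stands in for the concurrency set that tracks in-flight messages.
  readonly inFlight = new Set<string>();

  constructor(private readonly maxAttempts: number) {}

  enqueue(id: string): void {
    this.pending.push({ id, attempt: 0 });
  }

  dequeue(): ToyMessage | undefined {
    const message = this.pending.shift();
    if (message) {
      message.attempt += 1;
      this.inFlight.add(message.id);
    }
    return message;
  }

  nack(message: ToyMessage): void {
    // Concurrency is always cleared on nack.
    this.inFlight.delete(message.id);
    if (message.attempt >= this.maxAttempts) {
      this.deadLetter.push(message);
    } else {
      this.pending.push(message);
    }
  }
}

const queue = new ToyQueue(2);
queue.enqueue("m1");

const first = queue.dequeue()!;
queue.nack(first); // attempt 1 of 2: requeued, concurrency cleared

const second = queue.dequeue()!;
queue.nack(second); // attempt 2 of 2: moved to the dead letter queue
```

A real test would make the same two checks against the queue's Redis state: the concurrency sets are empty after the nack, and the message appears in the dead letter queue once the attempt limit is reached.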
🧹 Nitpick comments (55)
internal-packages/database/prisma/migrations/20250319131807_add_locked_queue_id_to_task_run/migration.sql (1)
1-5: Consider adding a non-null constraint or foreign key reference
The migration adds a lockedQueueId column, which appears to be related to queue locking in the new concurrency system. Consider whether this column should have constraints such as NOT NULL or a foreign key reference to ensure data integrity.
internal-packages/database/prisma/migrations/20250319114436_add_metadata_to_task_run_execution_snapshots/migration.sql (1)
1-5: Consider adding a default empty object for JSONB column
The migration adds a metadata column of type JSONB. Consider adding a default empty object value (DEFAULT '{}'::jsonb) to ensure consistent behavior when querying records that don't have metadata set.
internal-packages/run-engine/src/run-queue/tests/releaseConcurrency.test.ts (1)
46-95: Good test case for releasing concurrency.
This test correctly verifies that after dequeueing a message and releasing concurrency, both the queue and environment concurrency counts are properly updated to zero.
One thing to note: line 84 calls releaseAllConcurrency, but the test description refers to releaseConcurrency. Consider aligning the method name in the test description with the actual method being called for better clarity.
internal-packages/database/prisma/migrations/20250318163201_add_previous_snapshot_id_to_task_run_execution_snapshot/migration.sql (1)
1-8: Investigate index recreation pattern
The migration drops and recreates the SecretStore_key_idx index with text_pattern_ops. This pattern suggests an optimization for PostgreSQL's LIKE queries using the text_pattern_ops operator class.
While the migration will work, there are a few considerations:
- This creates a brief window where the index is unavailable, which could impact performance in production
- No explanation is provided for why the index is being modified
Consider adding a comment explaining why the index is being changed to use text_pattern_ops, and consider using a more explicit migration naming if this is the primary purpose of the change.
-- DropIndex
DROP INDEX "SecretStore_key_idx";
+ -- Recreating index with text_pattern_ops for better performance with LIKE queries
-- CreateIndex
CREATE INDEX "SecretStore_key_idx" ON "SecretStore"("key" text_pattern_ops);
packages/core/src/v3/schemas/api.ts (1)
960-962: Consider adding JSDoc for the releaseConcurrency parameter
While the parameter has been added correctly, there's no associated documentation to explain its purpose and behavior in this context.
Add JSDoc documentation similar to what exists in the wait.ts file to ensure consistent documentation across the codebase:
  idempotencyKeyTTL: z.string().optional(),
+ /**
+  * If set to true, this will cause the waitpoint to release the current run from the queue's concurrency,
+  * allowing other runs to execute while this waitpoint is pending.
+  *
+  * @default false
+  */
  releaseConcurrency: z.boolean().optional(),
  date: z.coerce.date(),
internal-packages/run-engine/src/run-queue/tests/reacquireConcurrency.test.ts (1)
169-226: Duplicate test case
This test appears to be a duplicate of the previous test (lines 110-167) that checks if the run is already being counted as concurrency. The test description, setup, and assertions are identical to the previous test.
You should remove this duplicate test or modify it to test a different scenario to avoid redundancy and improve test suite maintenance.
internal-packages/run-engine/src/engine/errors.ts (1)
70-75: NotImplementedError logs but consider adding proper name
The NotImplementedError class correctly logs an error message to the console, but unlike other error classes, it doesn't set a custom name property. For consistency, consider adding this.name = "NotImplementedError".
export class NotImplementedError extends Error {
  constructor(message: string) {
    console.error("This isn't implemented", { message });
    super(message);
+   this.name = "NotImplementedError";
  }
}
internal-packages/run-engine/src/run-queue/tests/dequeueMessageFromMasterQueue.test.ts (2)
48-133: Comprehensive test for basic dequeue functionality
This test case thoroughly validates the dequeue functionality by checking queue state before and after operations, properly verifying message properties and concurrency metrics.
However, consider extracting magic numbers like 10 in the dequeue call to named constants for better readability.
+const DEFAULT_BATCH_SIZE = 10;
// Later in the code:
-const dequeued = await queue.dequeueMessageFromMasterQueue("test_12345", envMasterQueue, 10);
+const dequeued = await queue.dequeueMessageFromMasterQueue("test_12345", envMasterQueue, DEFAULT_BATCH_SIZE);
1-266: Consider adding tests for different concurrency key scenarios
The current tests validate concurrency limits at the environment and queue level, but don't test behavior with different concurrency keys within the same queue.
Consider adding a test case that verifies multiple runs with different concurrency keys can be processed simultaneously, while runs with the same concurrency key are serialized.
internal-packages/run-engine/src/engine/tests/delays.test.ts (2)
11-11: Unused import - TaskRunErrorCodes
The TaskRunErrorCodes import is not used in this file.
-import { TaskRunErrorCodes } from "@trigger.dev/core/v3";
195-297: Comprehensive test for TTL functionality
The new test case thoroughly validates the time-to-live functionality for delayed runs, ensuring that runs expire correctly after their TTL period.
However, the timing in this test is quite tight and could lead to flaky tests in CI environments. Consider making the assertions more resilient:
-//wait for 1 seconds
-await setTimeout(2_500);
+//wait for delay to elapse (with buffer)
+await setTimeout(2_500);
-//wait for 3 seconds
-await setTimeout(3_000);
+//wait for TTL to expire (with buffer)
+await setTimeout(3_000);
-//should now be expired
-const executionData3 = await engine.getRunExecutionData({ runId: run.id });
-assertNonNullable(executionData3);
-expect(executionData3.snapshot.executionStatus).toBe("FINISHED");
+//should now be expired (retry a few times to avoid timing issues)
+let executionData3;
+let attempts = 0;
+const maxAttempts = 5;
+while (attempts < maxAttempts) {
+  executionData3 = await engine.getRunExecutionData({ runId: run.id });
+  assertNonNullable(executionData3);
+  if (executionData3.snapshot.executionStatus === "FINISHED") {
+    break;
+  }
+  await setTimeout(500);
+  attempts++;
+}
+expect(executionData3.snapshot.executionStatus).toBe("FINISHED");
internal-packages/run-engine/src/engine/systems/dequeueSystem.ts (1)
117-120: Address the TODO regarding invalid dequeue state.
A comment indicates uncertainty about run re-queuing. You may need to update the run’s status or adjust concurrency if you want to retry.
I can help draft a fallback flow to mark the run as dequeuable again. Let me know if you'd like me to open an issue or propose a patch.
internal-packages/run-engine/src/engine/types.ts (1)
108-109: Add tests for releaseConcurrency property
This new boolean flag can significantly impact triggers. Consider adding coverage in integration tests to confirm it's respected during runs.
internal-packages/run-engine/src/run-queue/index.test.ts (2)
651-739: Test coverage for concurrency release is comprehensive
The new "Releasing concurrency" test thoroughly validates concurrency states (before and after release). Consider additional scenarios with multiple messages or concurrency keys to further ensure robust coverage.
799-823: Extended Dead Letter Queue testing
These lines enhance verification of items in the DLQ and their subsequent re-drive. Looks good overall. For completeness, consider testing concurrency states after redriving from the DLQ as well.
internal-packages/run-engine/README.md (2)
102-102: Refine phrasing with a comma after “By default”
A minor style improvement could make it read more naturally.
-By default it would wait again.
+By default, it would wait again.
🧰 Tools
🪛 LanguageTool
[uncategorized] ~102-~102: Did you mean: “By default,”?
Context: ...so a second attempt doesn't wait again. By default it would wait again. ```ts //Note if t...(BY_DEFAULT_COMMA)
113-113: Consider adding a comma before “so”
This sentence can be more readable with a comma inserted.
-the run will still continue but with the errors so the developer can decide what to do
+the run will still continue but with the errors, so the developer can decide what to do
🧰 Tools
🪛 LanguageTool
[uncategorized] ~113-~113: Use a comma before ‘so’ if it connects two independent clauses (unless they are closely connected and short).
Context: ... will still continue but with the errors so the developer can decide what to do. #...(COMMA_COMPOUND_SENTENCE_2)
internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts (1)
51-161: Comprehensive concurrency release logic
Good approach: verifying valid snapshots, locking the run, and releasing concurrency scope as needed. Consider additional logs if partial Redis configs cause unexpected release issues.
internal-packages/run-engine/src/engine/systems/checkpointSystem.ts (4)
35-46: Clarify the checkpoint creation process in the JSDoc.
The docstring is brief, but extra details about how the checkpoint snapshot is validated and persisted could help future maintainers. Consider elaborating on any assumptions or side effects (e.g., concurrency lock, wait states, event emissions).
94-118: Add test coverage for non-checkpointable states.
When snapshot.executionStatus isn’t checkpointable, you discard the checkpoint. Consider adding or expanding tests to ensure coverage of each ineligible status, verifying correct event emissions and error handling.
219-246: Improve error clarity for invalid snapshot resumes.
When continueRunExecution is called and the snapshot ID or status is invalid, you raise a generic ServiceValidationError. Including more context (e.g., which status was expected) could ease debugging.
271-289: Evaluate concurrency system error handling.
After creating the new snapshot, we notify the worker. If the concurrency refill or queue notification fails, do we have fallback or retry policies? Consider centralizing concurrency and notification error handling for better reliability.
internal-packages/run-engine/src/engine/tests/releaseConcurrencyTokenBucketQueue.test.ts (4)
internal-packages/run-engine/src/engine/tests/releaseConcurrencyTokenBucketQueue.test.ts (4)
9-36: Document the in-test queue-creation method.
The createReleaseConcurrencyQueue function is a key utility for test setup. Briefly document how it’s used, including the role of executedRuns. This clarifies usage patterns for new contributors.
56-61: Consider event-based or promise-based synchronization.
Using await setTimeout(1000) can create flaky tests if system load causes delays. Event-based signaling or a loop that checks the queue’s emptiness could yield more deterministic tests.
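One way to replace a fixed sleep is to poll a condition until it holds or a deadline passes. The sketch below assumes a hypothetical waitFor helper; neither the helper nor its option names come from the PR or from a test library.

```typescript
// Hypothetical helper: poll a condition until it is true or a timeout elapses.
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

async function waitFor(
  condition: () => boolean | Promise<boolean>,
  { timeoutMs = 5_000, intervalMs = 50 } = {}
): Promise<void> {
  const deadline = Date.now() + timeoutMs;
  while (true) {
    if (await condition()) {
      return;
    }
    if (Date.now() >= deadline) {
      throw new Error(`Condition not met within ${timeoutMs}ms`);
    }
    await sleep(intervalMs);
  }
}

// Usage: simulate a run that is executed after a short, unknown delay.
const executedRuns: string[] = [];
setTimeout(() => executedRuns.push("run_1"), 100);

// Resolves as soon as the run has executed instead of always sleeping a full second.
const done = waitFor(() => executedRuns.length === 1, { timeoutMs: 2_000 });
```

The test returns as soon as the condition is met, and a slow machine only costs extra polling time up to the timeout rather than a false failure.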
272-279: Enhance test coverage for negative or zero refills.
The test throws an error for negative tokens and does nothing for zero tokens. Consider explicitly confirming no partial state is left behind in Redis, ensuring data consistency even for these invalid calls.
523-581: Consider using a mocking library instead of console.log.
Exponential backoff intervals are logged with console.log. Switching to a mocking or spy library (e.g., jest.spyOn) can gather timing metrics in a structured manner, improving test clarity without console noise.
internal-packages/run-engine/src/engine/systems/batchSystem.ts (3)
16-25: Add a debounce or concurrency check for scheduling.
scheduleCompleteBatch schedules a job 2 seconds in the future, but multiple calls for the same batch within that window might enqueue duplicates. Consider deduplicating or merging jobs to reduce overhead and potential race conditions.
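A common way to deduplicate is to key each scheduled job by a deterministic id, so that rescheduling replaces the pending job rather than adding a second one. The sketch below is an in-memory illustration under that assumption; DedupingScheduler and the id format are hypothetical, and whether the worker used in the PR already deduplicates by job id is not stated in this review.

```typescript
// Hypothetical scheduler: jobs are keyed by id, so scheduling the same id twice
// inside the delay window leaves exactly one pending job.
class DedupingScheduler {
  private readonly jobs = new Map<string, { runAt: number; run: () => void }>();

  // Scheduling an existing id replaces the pending job instead of adding a duplicate.
  schedule(id: string, runAt: number, run: () => void): void {
    this.jobs.set(id, { runAt, run });
  }

  // Run and remove every job that is due at `now`; returns how many ran.
  tick(now: number): number {
    let executed = 0;
    this.jobs.forEach((job, id) => {
      if (job.runAt <= now) {
        this.jobs.delete(id);
        job.run();
        executed += 1;
      }
    });
    return executed;
  }
}

const calls: string[] = [];
const scheduler = new DedupingScheduler();

// Two calls for the same batch within the 2 second window.
scheduler.schedule("completeBatch:batch_1", 2_000, () => calls.push("first"));
scheduler.schedule("completeBatch:batch_1", 2_000, () => calls.push("second"));

const executed = scheduler.tick(2_000); // only the most recent job runs
```

Time is passed in explicitly here so the behaviour can be checked without real timers.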
27-29: Validate usage of public method for single operation.
performCompleteBatch merely calls #tryCompleteBatch. If there's no alternative logic, consider merging these steps or making it a single method, unless the separation is for a future extension.
35-82: Guard against concurrent #tryCompleteBatch calls.
If the worker or code triggers performCompleteBatch in parallel or repeatedly, the concurrency logic might lead to repeated finalization attempts. You could use a short-lived lock around the batch or an idempotent approach that re-checks the completion state once more.
internal-packages/run-engine/src/engine/systems/waitingForWorkerSystem.ts (2)
39-44: Consider returning a typed exception or structured response for missing background worker.
The current approach logs an error and returns; it might help upstream consumers (or logs) to have a more explicit failure signal (e.g. custom error or structured response) when the background worker doesn't exist.
64-83: Possible performance improvement for batch processing runs.
This loop updates and enqueues each run in its own transaction. If you expect high volume, consider grouping updates into a single transaction or optimizing the number of transactions to reduce overhead.
internal-packages/run-engine/src/engine/tests/releaseConcurrency.test.ts (2)
104-116: Validate negative or erroneous paths for waitpoint creation.
While the tests check normal success paths, consider adding negative tests (e.g., invalid environment references), ensuring robust coverage for error handling in waitpoint creation or concurrency release.
578-619: Token consumption and return logic.
These tests confirm concurrency tokens can be manually consumed and later returned to restore concurrency. To further strengthen reliability, consider partial concurrency fragments (e.g., multiple tokens) and testing that partial token returns yield the correct concurrency updates.
internal-packages/run-engine/src/engine/systems/ttlSystem.ts (2)
108-114: Clarify expected behavior when no waitpoint is found.
Throwing ServiceValidationError is valid, but consider explaining in logs or error details why the run can't be expired without a waitpoint. This clarifies the cause for system maintainers.
120-131: Validate TTL edge cases.
parseNaturalLanguageDuration(ttl) may yield invalid or zero durations if the user passes unexpected input. It might be beneficial to handle or log these edge cases explicitly so that failures are more evident.
internal-packages/run-engine/src/engine/tests/attemptFailures.test.ts (2)
13-164: Consider reducing test complexity or splitting into smaller tests.
This test covers multiple states (initial run, failing due to user error, retrying, then succeeding). While the logic is sound, the large block can become unwieldy to maintain. Consider splitting different scenarios into separate tests or helper functions for improved clarity and maintainability.
386-492: Correct the misleading comment describing OOM retry.
The code comments imply that the run should be retried on a larger machine, but the test logic ultimately expects "CRASHED" status. This discrepancy can confuse future readers.
Suggested fix:
- // The run should be retried with a larger machine
+ // The run fails immediately due to OOM and is not retried
internal-packages/run-engine/src/engine/tests/checkpoints.test.ts (1)
17-189: Possible timing flakiness with fixed delays.
Calls like setTimeout(500) might introduce nondeterministic behavior under heavy load or slow environments. Consider using logic that checks the run state in a loop or waiting for a specific signal instead of fixed sleeps.
internal-packages/database/prisma/schema.prisma (1)
2031-2031: New enum value QUEUED_EXECUTING is a good addition for concurrency tracking.
Make sure that all relevant code paths (including transitions from other statuses) are thoroughly tested, as newly introduced states can trigger unexpected transitions.
Consider adding descriptive JSDoc or comments around this enum to document when and how this status is set.
internal-packages/run-engine/src/run-queue/keyProducer.ts (1)
177-195: New method deadLetterQueueKey() neatly encapsulates the dead-letter queue segment.
- The method uses consistent org→project→env ordering, which simplifies scanning keys for debugging.
- Consider verifying that the environment actually supports or needs this dead-letter queue approach to avoid overhead in ephemeral or dev environments.
You might embed a short comment describing how you distinguish between the normal queue and dead-letter queue to help maintainers understand the bigger picture.
internal-packages/run-engine/src/engine/systems/waitpointSystem.ts (2)
39-54: clearBlockingWaitpoints method is straightforward.
- It deletes all waitpoints associated with the run, effectively clearing the blocking state.
- Consider whether partial or selective clearing might ever be needed, or if a total clear is always safe/relevant.
Document the use cases and side-effects of a full clear in the method docstring.
325-453: blockRunWithWaitpoint method's concurrency release approach is comprehensive.
- The logic to handle “EXECUTING_WITH_WAITPOINTS” vs. “SUSPENDED” states is concise; well done.
- Consider the edge cases if multiple waitpoints are added after a run is already suspended; your debounce approach with near-future scheduling handles this, but it’s worth a stress test.
Use a typed result instead of raw query for pending_count: BigInt to avoid potential type issues. Also align with the static analysis hint by switching BigInt → bigint.
- const insert = await prisma.$queryRaw<{ pending_count: BigInt }[]>`
+ const insert = await prisma.$queryRaw<{ pending_count: bigint }[]>`
🧰 Tools
🪛 Biome (1.9.4)
[error] 359-359: Don't use 'BigInt' as a type.
Use lowercase primitives for consistency.
Safe fix: Use 'bigint' instead(lint/complexity/noBannedTypes)
internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts (3)
58-345: Check lock timeout and large method length in startRunAttempt
- Lock Duration: this.$.runLock.lock([runId], 5000, ...) uses a 5000ms lock. Consider whether this timeout could be too small under heavier load, causing frequent lock collisions.
- Method Length: startRunAttempt is fairly large. Consider splitting out logic (e.g., environment retrieval, queue validation, snapshot checking) into smaller private helper methods to improve readability and maintainability.
🧰 Tools
🪛 Biome (1.9.4)
[error] 332-332: Change to an optional chain.
Unsafe fix: Change to an optional chain.
(lint/complexity/useOptionalChain)
394-524: attemptSucceeded: verify that concurrency release is handled upstream
The successful path updates the task run status, completes waitpoints, and emits events. This is great, but consider verifying whether concurrency (locks, queue concurrency) is fully released in the calling method or in a finalizing step to avoid concurrency leaks if success occurs before completion.
826-900: tryNackAndRequeue: duplication of logic?
This method is helpful for requeueing runs with a fresh snapshot. However, some logic (like acking and concurrency re-application) might be partially duplicated in other methods. Consider extracting shared concurrency or requeue logic into a private helper to streamline future maintenance.
internal-packages/run-engine/src/run-queue/types.ts (1)
79-81: Overloaded deadLetterQueueKey signatures
Declaring separate overloads for deadLetterQueueKey(env: MinimalAuthenticatedEnvironment) and deadLetterQueueKey(env: EnvDescriptor) is valid but can lead to duplication. Consider narrowing to a single function that infers the correct shape or uses a union type for clarity.
internal-packages/run-engine/src/run-queue/index.ts (6)
289-289: Minor note: return await usage
Using return await this.#callEnqueueMessage(...) is not typically harmful but could be simplified to return this.#callEnqueueMessage(...) if no additional try/catch is used. This is just a style preference.
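The try/catch caveat matters because the two forms behave differently inside a try block. The sketch below uses hypothetical function names (failing, withAwait, withoutAwait) to show the difference; it is not code from the PR.

```typescript
// Hypothetical stand-in for an async call that rejects.
async function failing(): Promise<string> {
  throw new Error("enqueue failed");
}

// Without await, the promise is returned before it settles, so the rejection
// bypasses this function's catch block and reaches the caller.
async function withoutAwait(): Promise<string> {
  try {
    return failing();
  } catch {
    return "handled";
  }
}

// With await, the rejection is raised inside the try block and is caught here.
async function withAwait(): Promise<string> {
  try {
    return await failing();
  } catch {
    return "handled";
  }
}

// Resolves to ["handled", "rejected: enqueue failed"].
const results = Promise.all([
  withAwait(),
  withoutAwait().catch((error: Error) => `rejected: ${error.message}`),
]);
```

Outside a try/catch (or try/finally) the two forms produce the same result for the caller, which is why the note above treats the simplification as a style preference.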
494-533: releaseAllConcurrency: essential final step for concurrency cleanup
Releasing both queue and environment concurrency in one call is helpful. Just verify that a run can’t remain stuck in concurrency sets if releaseAllConcurrency is somehow skipped or fails. Possibly add logs on success/failure.
622-684: handleRedriveMessage: concurrency after re-enqueue
After redriving, the code re-enqueues the message with enqueueMessage(), but concurrency keys are re-initialized. Confirm that any concurrency from the original attempt is cleared to avoid double counting. Also, logging around success/failure is very helpful.
773-850: #callDequeueMessage: check concurrency gating
The concurrency gating on queue and environment concurrency is well-structured. Just ensure your environment concurrency default of 1000000 or fallback logic is consistent with your intended default concurrency across the system.
851-882: #callAcknowledgeMessage refactor clarity
This method is well documented. If possible, unify concurrency removal with the logic in releaseAllConcurrency to reduce duplication. Currently, the concurrency sets are updated in multiple places.
962-1291: Custom Redis commands: beneficial for advanced queue logic
All custom Lua commands appear well-labeled, and your concurrency checks are comprehensive. Consider adding test coverage for edge cases (e.g., concurrency limit equals zero, concurrent calls for the same message, etc.) to validate the correctness of these commands.
internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts (1)
217-218: Simplify boolean logic.
You can use logical negation rather than a ternary to set isValid based on whether error is present.
Here's a potential refactor:
- isValid: error ? false : true,
+ isValid: !error,
🧰 Tools
🪛 Biome (1.9.4)
[error] 217-217: Unnecessary use of boolean literals in conditional expression.
Simplify your code by directly assigning the result without using a ternary operator.
If your goal is negation, you may use the logical NOT (!) or double NOT (!!) operator for clearer and concise code.
Check for more details about NOT operator.
Unsafe fix: Remove the conditional expression with (lint/complexity/noUselessTernary)
internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts (2)
112-112: Remove redundant double negation.
!!result can be replaced with a direct boolean evaluation to simplify readability.
Consider:
- if (!!result) {
+ if (result) {
🧰 Tools
🪛 Biome (1.9.4)
[error] 112-112: Avoid redundant double-negation.
It is not necessary to use double-negation when a value will already be coerced to a boolean.
Unsafe fix: Remove redundant double-negation(lint/complexity/noExtraBooleanCast)
621-621: Eliminate unnecessary continue statement.
There's no further logic after this if block, making the continue redundant.
- if (!processed) {
-   continue;
- }
🧰 Tools
🪛 Biome (1.9.4)
[error] 621-621: Unnecessary continue statement
Unsafe fix: Delete the unnecessary continue statement
(lint/correctness/noUnnecessaryContinue)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (2)
- internal-packages/run-engine/execution-states.png is excluded by !**/*.png
- pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (77)
- .cursor/mcp.json (1 hunks)
- .vscode/launch.json (1 hunks)
- apps/webapp/app/routes/admin.api.v1.environments.$environmentId.ts (3 hunks)
- apps/webapp/app/routes/engine.v1.runs.$runFriendlyId.wait.duration.ts (1 hunks)
- apps/webapp/app/routes/engine.v1.runs.$runFriendlyId.waitpoints.tokens.$waitpointFriendlyId.wait.ts (1 hunks)
- apps/webapp/app/v3/services/triggerTaskV2.server.ts (5 hunks)
- internal-packages/database/prisma/migrations/20250314133612_add_queued_executing_status_to_snapshots/migration.sql (1 hunks)
- internal-packages/database/prisma/migrations/20250318163201_add_previous_snapshot_id_to_task_run_execution_snapshot/migration.sql (1 hunks)
- internal-packages/database/prisma/migrations/20250319103257_add_release_concurrency_on_waitpoint_to_task_queue/migration.sql (1 hunks)
- internal-packages/database/prisma/migrations/20250319110754_add_org_and_project_to_execution_snapshots/migration.sql (1 hunks)
- internal-packages/database/prisma/migrations/20250319114436_add_metadata_to_task_run_execution_snapshots/migration.sql (1 hunks)
- internal-packages/database/prisma/migrations/20250319131807_add_locked_queue_id_to_task_run/migration.sql (1 hunks)
- internal-packages/database/prisma/schema.prisma (8 hunks)
- internal-packages/redis-worker/src/worker.ts (1 hunks)
- internal-packages/redis/src/index.ts (1 hunks)
- internal-packages/run-engine/README.md (8 hunks)
- internal-packages/run-engine/package.json (1 hunks)
- internal-packages/run-engine/src/engine/errors.ts (1 hunks)
- internal-packages/run-engine/src/engine/eventBus.ts (2 hunks)
- internal-packages/run-engine/src/engine/executionSnapshots.ts (0 hunks)
- internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts (1 hunks)
- internal-packages/run-engine/src/engine/retrying.ts (1 hunks)
- internal-packages/run-engine/src/engine/statuses.ts (3 hunks)
- internal-packages/run-engine/src/engine/systems/batchSystem.ts (1 hunks)
- internal-packages/run-engine/src/engine/systems/checkpointSystem.ts (1 hunks)
- internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts (1 hunks)
- internal-packages/run-engine/src/engine/systems/dequeueSystem.ts (1 hunks)
- internal-packages/run-engine/src/engine/systems/enqueueSystem.ts (1 hunks)
- internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts (1 hunks)
- internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts (1 hunks)
- internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts (1 hunks)
- internal-packages/run-engine/src/engine/systems/systems.ts (1 hunks)
- internal-packages/run-engine/src/engine/systems/ttlSystem.ts (1 hunks)
- internal-packages/run-engine/src/engine/systems/waitingForWorkerSystem.ts (1 hunks)
- internal-packages/run-engine/src/engine/systems/waitpointSystem.ts (1 hunks)
- internal-packages/run-engine/src/engine/tests/attemptFailures.test.ts (3 hunks)
- internal-packages/run-engine/src/engine/tests/checkpoints.test.ts (1 hunks)
- internal-packages/run-engine/src/engine/tests/delays.test.ts (4 hunks)
- internal-packages/run-engine/src/engine/tests/heartbeats.test.ts (1 hunks)
- internal-packages/run-engine/src/engine/tests/locking.test.ts (1 hunks)
- internal-packages/run-engine/src/engine/tests/releaseConcurrency.test.ts (1 hunks)
- internal-packages/run-engine/src/engine/tests/releaseConcurrencyTokenBucketQueue.test.ts (1 hunks)
- internal-packages/run-engine/src/engine/tests/triggerAndWait.test.ts (0 hunks)
- internal-packages/run-engine/src/engine/tests/waitpoints.test.ts (3 hunks)
- internal-packages/run-engine/src/engine/types.ts (3 hunks)
- internal-packages/run-engine/src/engine/workerCatalog.ts (1 hunks)
- internal-packages/run-engine/src/index.ts (1 hunks)
- internal-packages/run-engine/src/run-queue/errors.ts (1 hunks)
- internal-packages/run-engine/src/run-queue/fairQueueSelectionStrategy.ts (2 hunks)
- internal-packages/run-engine/src/run-queue/index.test.ts (2 hunks)
- internal-packages/run-engine/src/run-queue/index.ts (31 hunks)
- internal-packages/run-engine/src/run-queue/keyProducer.ts (4 hunks)
- internal-packages/run-engine/src/run-queue/tests/ack.test.ts (1 hunks)
- internal-packages/run-engine/src/run-queue/tests/dequeueMessageFromMasterQueue.test.ts (1 hunks)
- internal-packages/run-engine/src/run-queue/tests/enqueueMessage.test.ts (1 hunks)
- internal-packages/run-engine/src/run-queue/tests/fairQueueSelectionStrategy.test.ts (3 hunks)
- internal-packages/run-engine/src/run-queue/tests/keyProducer.test.ts (2 hunks)
- internal-packages/run-engine/src/run-queue/tests/nack.test.ts (1 hunks)
- internal-packages/run-engine/src/run-queue/tests/reacquireConcurrency.test.ts (1 hunks)
- internal-packages/run-engine/src/run-queue/tests/releaseConcurrency.test.ts (1 hunks)
- internal-packages/run-engine/src/run-queue/types.ts (1 hunks)
- internal-packages/run-engine/tsconfig.test.json (1 hunks)
- internal-packages/run-engine/vitest.config.ts (1 hunks)
- internal-packages/run-queue/src/run-queue/tests/nack.test.ts (1 hunks)
- internal-packages/testcontainers/src/setup.ts (2 hunks)
- packages/cli-v3/src/entryPoints/dev-run-controller.ts (1 hunks)
- packages/cli-v3/src/entryPoints/managed-run-controller.ts (1 hunks)
- packages/core/src/v3/schemas/api.ts (2 hunks)
- packages/core/src/v3/schemas/runEngine.ts (1 hunks)
- packages/core/src/v3/types/tasks.ts (1 hunks)
- packages/trigger-sdk/src/v3/shared.ts (1 hunks)
- packages/trigger-sdk/src/v3/wait.ts (3 hunks)
- references/hello-world/package.json (1 hunks)
- references/hello-world/src/trigger/example.ts (1 hunks)
- references/hello-world/src/trigger/waits.ts (1 hunks)
- references/test-tasks/src/trigger/test-reserve-concurrency-system.ts (3 hunks)
- references/test-tasks/src/utils.ts (0 hunks)
💤 Files with no reviewable changes (3)
- references/test-tasks/src/utils.ts
- internal-packages/run-engine/src/engine/tests/triggerAndWait.test.ts
- internal-packages/run-engine/src/engine/executionSnapshots.ts
🧰 Additional context used
🧬 Code Definitions (17)
references/hello-world/src/trigger/waits.ts (1)
- packages/trigger-sdk/src/v3/wait.ts (1): wait (138-293)
internal-packages/run-engine/src/run-queue/tests/ack.test.ts (3)
- internal-packages/run-engine/src/run-queue/keyProducer.ts (1): RunQueueFullKeyProducer (18-236)
- internal-packages/run-engine/src/run-queue/types.ts (2): InputPayload (5-16), InputPayload (17-17)
- internal-packages/run-engine/src/run-queue/index.ts (1): RunQueue (73-1292)
internal-packages/run-engine/src/engine/statuses.ts (1)
- packages/core/src/v3/schemas/runEngine.ts (2): TaskRunExecutionStatus (6-16), TaskRunExecutionStatus (18-19)
internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts (2)
- apps/webapp/app/v3/services/triggerTaskV2.server.ts (1): environment (451-470)
- internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts (1): getLatestExecutionSnapshot (28-100)
internal-packages/run-engine/src/run-queue/index.test.ts (1)
- internal-packages/run-engine/src/run-queue/index.ts (6): RunQueue (73-1292), redis (961-1291), message (730-771), message (851-882), message (884-923), message (925-946)
internal-packages/run-engine/src/engine/systems/enqueueSystem.ts (2)
- internal-packages/run-engine/src/engine/systems/systems.ts (1): SystemResources (10-23)
- internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts (1): ExecutionSnapshotSystem (144-335)
internal-packages/redis-worker/src/worker.ts (1)
- internal-packages/run-engine/src/run-queue/index.ts (1): channel (717-728)
internal-packages/run-engine/src/engine/systems/waitingForWorkerSystem.ts (1)
- internal-packages/run-engine/src/engine/index.ts (1): backgroundWorkerId (1379-1381)
internal-packages/run-engine/src/engine/systems/batchSystem.ts (1)
- internal-packages/run-engine/src/engine/systems/systems.ts (1): SystemResources (10-23)
internal-packages/run-engine/src/engine/systems/dequeueSystem.ts (6)
- internal-packages/run-engine/src/engine/systems/systems.ts (1): SystemResources (10-23)
- internal-packages/run-engine/src/engine/types.ts (1): RunEngineOptions (10-52)
- internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts (1): getLatestExecutionSnapshot (28-100)
- internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts (2): runId (1074-1183), runId (1197-1218)
- internal-packages/run-engine/src/engine/statuses.ts (1): isDequeueableExecutionStatus (3-6)
- internal-packages/run-engine/src/engine/eventBus.ts (1): sendNotificationToWorker (189-211)
internal-packages/run-engine/src/engine/tests/releaseConcurrencyTokenBucketQueue.test.ts (1)
- internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts (4): ReleaseConcurrencyTokenBucketQueue (40-545), releaseQueue (287-289), releaseQueue (291-293), releaseQueue (295-297)
internal-packages/run-engine/src/engine/tests/checkpoints.test.ts (1)
- internal-packages/run-engine/src/engine/index.ts (1): RunEngine (57-1382)
internal-packages/run-engine/src/run-queue/types.ts (1)
- internal-packages/run-engine/src/run-queue/fairQueueSelectionStrategy.ts (4): env (496-512), env (551-569), env (571-585), queue (587-593)
internal-packages/run-engine/src/engine/types.ts (1)
- internal-packages/run-engine/src/engine/workerCatalog.ts (1): workerCatalog (3-56)
internal-packages/run-engine/src/engine/systems/waitpointSystem.ts (6)
- internal-packages/run-engine/src/engine/systems/systems.ts (1): SystemResources (10-23)
- internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts (2): ExecutionSnapshotSystem (144-335), getLatestExecutionSnapshot (28-100)
- internal-packages/run-engine/src/engine/systems/enqueueSystem.ts (1): EnqueueSystem (16-104)
- internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts (1): ReleaseConcurrencySystem (17-161)
- packages/core/src/v3/schemas/api.ts (1): timeoutError (990-995)
- internal-packages/run-engine/src/engine/eventBus.ts (1): sendNotificationToWorker (189-211)
internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts (6)
- internal-packages/run-engine/src/engine/systems/systems.ts (1): SystemResources (10-23)
- internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts (2): getLatestExecutionSnapshot (28-100), status (316-334)
- internal-packages/run-engine/src/engine/types.ts (1): RunEngineOptions (10-52)
- internal-packages/run-engine/src/engine/errors.ts (1): ServiceValidationError (60-68)
- internal-packages/run-engine/src/engine/retrying.ts (1): retryOutcomeFromCompletion (41-120)
- internal-packages/run-engine/src/engine/eventBus.ts (1): sendNotificationToWorker (189-211)
internal-packages/run-engine/src/run-queue/index.ts (3)
- internal-packages/run-engine/src/run-queue/types.ts (2): OutputPayload (19-22), OutputPayload (23-23)
- internal-packages/run-engine/src/run-queue/errors.ts (1): MessageNotFoundError (1-5)
- internal-packages/run-engine/src/run-queue/keyProducer.ts (1): deadLetterQueueKey (179-195)
🪛 Biome (1.9.4)
internal-packages/run-engine/src/engine/systems/dequeueSystem.ts
[error] 538-539: Unnecessary continue statement
Unsafe fix: Delete the unnecessary continue statement
(lint/correctness/noUnnecessaryContinue)
internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts
[error] 217-217: Unnecessary use of boolean literals in conditional expression.
Simplify your code by directly assigning the result without using a ternary operator.
If your goal is negation, you may use the logical NOT (!) or double NOT (!!) operator for clearer and concise code.
Check for more details about NOT operator.
Unsafe fix: Remove the conditional expression with
(lint/complexity/noUselessTernary)
internal-packages/run-engine/src/engine/systems/waitpointSystem.ts
[error] 359-359: Don't use 'BigInt' as a type.
Use lowercase primitives for consistency.
Safe fix: Use 'bigint' instead
(lint/complexity/noBannedTypes)
internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts
[error] 332-332: Change to an optional chain.
Unsafe fix: Change to an optional chain.
(lint/complexity/useOptionalChain)
internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts
[error] 112-112: Avoid redundant double-negation.
It is not necessary to use double-negation when a value will already be coerced to a boolean.
Unsafe fix: Remove redundant double-negation
(lint/complexity/noExtraBooleanCast)
[error] 621-621: Unnecessary continue statement
Unsafe fix: Delete the unnecessary continue statement
(lint/correctness/noUnnecessaryContinue)
🪛 LanguageTool
internal-packages/run-engine/README.md
[typographical] ~94-~94: If the word ‘which’ starts a question, add a question mark (“?”) at the end of the sentence.
Context: ...pleted when the associated run completes. Every run has an associatedWaitpoint
...
(WHAT_NOUNPHRASE_QUESTION_MARK)
[typographical] ~95-~95: If the word ‘which’ starts a question, add a question mark (“?”) at the end of the sentence.
Context: ...s completed when the datetime is reached. - MANUAL
which gets completed when th...
(WHAT_NOUNPHRASE_QUESTION_MARK)
[typographical] ~96-~96: If the word ‘which’ starts a question, add a question mark (“?”) at the end of the sentence.
Context: ...ch gets completed when that event occurs. Waitpoints can have an idempotencyKey ...
(WHAT_NOUNPHRASE_QUESTION_MARK)
[uncategorized] ~102-~102: Did you mean: “By default,”?
Context: ...so a second attempt doesn't wait again. By default it would wait again. ```ts //Note if t...
(BY_DEFAULT_COMMA)
[uncategorized] ~113-~113: Use a comma before ‘so’ if it connects two independent clauses (unless they are closely connected and short).
Context: ... will still continue but with the errors so the developer can decide what to do. #...
(COMMA_COMPOUND_SENTENCE_2)
⏰ Context from checks skipped due to timeout of 90000ms (2)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: typecheck / typecheck
🔇 Additional comments (164)
internal-packages/run-engine/tsconfig.test.json (1)
1-3: Expanded Test File Inclusion in TSConfig
The updated "include" array now explicitly adds the src/run-queue/tests/dequeueMessageFromMasterQueue.ts file. This is a necessary change to ensure that the new test, related to the release concurrency system enhancements, is compiled and executed as part of the test suite. The change is correctly integrated with the rest of the configuration.
internal-packages/run-queue/src/run-queue/tests/nack.test.ts (1)
1-9: Imports look appropriate for Redis-based queue testing
The imports cover essential dependencies for testing the nack functionality with Redis, including test containers, tracing, logging, and the core RunQueue components. This setup appears well-structured for testing concurrency release during message negative acknowledgment.
.cursor/mcp.json (1)
7-7: Good addition of the EOF newline character.
Adding a newline at the end of the file follows best practices for text files, ensuring compatibility with tools like git and various text editors.
internal-packages/redis/src/index.ts (1)
11-11: Good optimization for test environments.
Setting maxRetriesPerRequest to 1 in test environments is a sensible approach, as it makes tests faster and more deterministic by reducing the number of Redis retries during test runs.
.vscode/launch.json (2)
141-142: More specific test debug configuration.
The debug configuration now targets a specific test case related to the new concurrency system, which aligns with the PR's focus on the release concurrency queue. This makes it easier to debug the specific functionality being added.
149-149: Updated debug target for RunQueue tests.
The configuration now points to the main RunQueue test file instead of the waitpoints-specific test, providing a more comprehensive debugging setup for the run queue system.
internal-packages/run-engine/package.json (3)
22-22: Dependencies reordering and addition.
The dependencies have been reordered, with @unkey/cache and zod maintained, and seedrandom added (likely needed for random number generation in the new concurrency system).
Also applies to: 26-27
32-34: Updated testing dependencies.
Upgrading Vitest to v3.0.8 and adding the V8 coverage plugin will provide improved testing capabilities and better test coverage reporting.
40-40: Added test coverage script.
The new script makes it easier to run tests with coverage reporting enabled, which is a good practice for monitoring code quality and identifying untested areas of the codebase.
internal-packages/testcontainers/src/setup.ts (3)
72-76: Well-designed optional parameter structure for queue configuration
The addition of the queueOptions parameter with optional properties for releaseConcurrencyOnWaitpoint and concurrencyLimit provides a clean way to configure concurrency behavior without breaking existing code.
122-125: Good default handling for concurrency limit
The conditional logic correctly handles undefined values by defaulting to 10 when no specific concurrency limit is provided, while still allowing explicit null values to be passed through.
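The defaulting rule described in this comment can be sketched as follows. This is a minimal illustration, not the code in setup.ts: the `QueueOptions` type, the `resolveConcurrencyLimit` helper, and the `DEFAULT_CONCURRENCY_LIMIT` constant are hypothetical names, and only the default of 10 and the null pass-through come from the review text.

```typescript
// Hypothetical option shape modelled on the review comment; not the actual setup.ts code.
type QueueOptions = {
  releaseConcurrencyOnWaitpoint?: boolean;
  concurrencyLimit?: number | null;
};

// The default of 10 is taken from the review comment above.
const DEFAULT_CONCURRENCY_LIMIT = 10;

// Only `undefined` falls back to the default; an explicit `null` is passed through.
function resolveConcurrencyLimit(options?: QueueOptions): number | null {
  const limit = options?.concurrencyLimit;
  return limit === undefined ? DEFAULT_CONCURRENCY_LIMIT : limit;
}
```

Comparing against `undefined` rather than using `??` is what keeps an explicit `null` intact, since `??` would replace both `null` and `undefined` with the default.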
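129-132: Proper type checking for boolean property
The code correctly checks that releaseConcurrencyOnWaitpoint is specifically a boolean before applying it, preventing potential type errors from undefined or non-boolean values.
references/hello-world/package.json (1)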
is specifically a boolean before applying it, preventing potential type errors from undefined or non-boolean values.references/hello-world/package.json (1)
11-12
: Good addition of development scriptAdding the
dev
script simplifies the developer experience by providing a standardized way to start the development server with the trigger dev command.internal-packages/database/prisma/migrations/20250314133612_add_queued_executing_status_to_snapshots/migration.sql (1)
1-4: LGTM: New status value supports improved concurrency handling
The addition of the 'QUEUED_EXECUTING' status to the TaskRunExecutionStatus enum will enable more granular tracking of task execution states, which is essential for the new concurrency system being implemented in this PR.
internal-packages/run-engine/src/run-queue/errors.ts (1)
1-5: LGTM: Clean error class implementation
The MessageNotFoundError class follows best practices for custom error types in TypeScript. The descriptive error message that includes the messageId will be helpful for debugging issues in the new concurrency system.
internal-packages/run-engine/src/engine/retrying.ts (1)
internal-packages/run-engine/src/engine/retrying.ts (1)
13-13
:❓ Verification inconclusive
Import path update looks good
Updating the import path for ServiceValidationError is good refactoring toward more modular code organization. This helps separate concerns by moving error definitions to a dedicated file.
🏁 Script executed:
#!/bin/bash # Verify that ServiceValidationError is properly exported from errors.js rg -l "export .*ServiceValidationError" internal-packages/run-engine/src/engine/errors.jsLength of output: 181
Verify that ServiceValidationError is Exported from a Valid Errors File
The updated import statement in
internal-packages/run-engine/src/engine/retrying.ts
appears to be a good refactoring step toward improved modularity. However, our initial check didn’t locate anerrors.js
file at the expected path. This raises one of the following possibilities:
- The error definitions might now reside in a file with a different extension (e.g.,
errors.ts
), or- The file might have been moved to a different directory, or its export syntax may differ.
Before approving the change, please verify that the file containing the
ServiceValidationError
export exists in theinternal-packages/run-engine/src/engine
directory and that it properly exports the class. For example, you can run the following command to list any files starting with “errors” in that directory:#!/bin/bash find internal-packages/run-engine/src/engine -maxdepth 1 -iname "errors.*"Once you’ve confirmed the correct file is in place and that it exports
ServiceValidationError
appropriately, the import update can be considered safe.internal-packages/database/prisma/migrations/20250319103257_add_release_concurrency_on_waitpoint_to_task_queue/migration.sql (1)
1-5: LGTM: Well-designed addition for concurrency control
The addition of the "releaseConcurrencyOnWaitpoint" column to the TaskQueue table provides the necessary database support for the new concurrency release system. The boolean type with NOT NULL constraint and DEFAULT false value is appropriate, maintaining backward compatibility with existing records.
references/hello-world/src/trigger/waits.ts (1)
76-81: Good addition of the releaseConcurrency parameter
Adding the releaseConcurrency: true parameter to wait.for() allows the system to release concurrency slots while waiting, enabling other tasks to execute during this wait period. This is an essential enhancement for the new release concurrency system.
With this feature, wait operations can now explicitly signal that they don't need to hold the concurrency token, improving resource utilization in the system.
internal-packages/run-engine/src/engine/tests/locking.test.ts (1)
4-4: Import path correction
The import path has been updated from a relative path pointing to the current directory (./locking.js) to a relative path pointing to the parent directory (../locking.js). This suggests the file has been moved or reorganized as part of the concurrency system changes.
packages/trigger-sdk/src/v3/shared.ts (1)
1348-1348: Good implementation of releaseConcurrency forwarding
The releaseConcurrency option is now being properly forwarded to the task execution options. This ensures that when a parent task triggers a child task with triggerAndWait(), the concurrency release behavior can be controlled by the caller.
This change integrates well with the overall concurrency control system and allows for more flexible concurrency management during task execution chains.
packages/cli-v3/src/entryPoints/dev-run-controller.ts (1)
452-453: Support for new QUEUED_EXECUTING status
Adding the QUEUED_EXECUTING case to the switch statement properly handles the new execution status that's part of the concurrency system. This status likely represents a state where a run is queued but is beginning execution while potentially waiting for concurrency tokens.
The implementation correctly follows the same pattern as the existing RUN_CREATED and QUEUED statuses, ensuring consistent behavior in the controller.
internal-packages/run-engine/src/engine/systems/systems.ts (1)
1-23: New system resources type looks well-structured
The new SystemResources type provides a clear structure for dependency injection across the system components, with specific support for the new release concurrency feature. The type signature properly includes all necessary dependencies, including Prisma, worker, event bus, logging, tracing, locks, queues, and the new concurrency token bucket.
Including the generic type parameter for ReleaseConcurrencyTokenBucketQueue with organization, project, and environment IDs ensures strong typing for the token bucket implementation.
references/hello-world/src/trigger/example.ts (1)
25-30: ❓ Verification inconclusive
Added support for concurrency control
The additional options parameter with releaseConcurrency: true enables the new release concurrency system when triggering child tasks. This will help optimize resource utilization by releasing concurrency tokens while waiting for the child task to complete.
🏁 Script executed:
#!/bin/bash
# Check for other places in the codebase that might need similar updates
rg -A 2 -B 2 "triggerAndWait\(" --type ts | grep -v "releaseConcurrency"
Length of output: 37069
Concurrency Control Update in Example File Verified
In references/hello-world/src/trigger/example.ts the changes properly add the new { releaseConcurrency: true } option to childTask.triggerAndWait, enabling the release concurrency system as intended to optimize resource utilization.
A couple of points to note:
- Other call sites invoking triggerAndWait across the codebase do not include the releaseConcurrency flag. Please verify that these omissions are intentional (e.g., for tests or different use cases) or if similar updates should be applied for consistency.
- Ensure that your concurrency control strategy is uniformly implemented where appropriate.
packages/cli-v3/src/entryPoints/managed-run-controller.ts (1)
630-630: Added support for new execution status
The addition of QUEUED_EXECUTING to the case statement ensures the controller correctly handles this new intermediary state between queueing and execution. This aligns with the status enum addition in the schemas file.
packages/core/src/v3/schemas/runEngine.ts (1)
9-9: ❓ Verification inconclusive
New execution status added to the enumeration
The addition of QUEUED_EXECUTING to TaskRunExecutionStatus introduces a new state that represents when a task has been dequeued but execution hasn't fully started. This provides more granular tracking of the execution lifecycle and supports the new release concurrency system.
Make sure to update your documentation to describe this new status and when it occurs in the lifecycle.
🏁 Script executed:
#!/bin/bash
# Verify that this status is properly handled across the codebase
rg "TaskRunExecutionStatus" -A 20 | grep -A 5 -B 5 "case.*QUEUED_EXECUTING"
Length of output: 75
Important: Update Documentation & Verify Handling for New Execution Status
- In packages/core/src/v3/schemas/runEngine.ts (line 9), the new status QUEUED_EXECUTING has been added to TaskRunExecutionStatus to indicate that a task has been dequeued but not yet fully executing.
- A preliminary search for switch-case statements or usage handling of QUEUED_EXECUTING returned no matches. Please manually verify that all parts of the codebase (e.g., switch blocks or conditionals that evaluate execution statuses) correctly handle this new state.
- Additionally, update the documentation to clearly describe this status and specify the circumstances under which it occurs in the task lifecycle.
internal-packages/redis-worker/src/worker.ts (1)
368-368: Changed log level from 'log' to 'debug' for subscription message.
This change reduces log verbosity by moving the subscription confirmation message to debug level. This aligns with best practices for operational logging, where routine confirmation messages should only appear in debug logs.
internal-packages/run-engine/src/engine/tests/heartbeats.test.ts (1)
98-98: ❓ Verification inconclusive
Increased timeout duration in test case.
The wait time was doubled from pendingExecutingTimeout * 2 to pendingExecutingTimeout * 4. This change suggests the new concurrency system might require more time to process state transitions. This adjustment helps prevent flaky tests.
🏁 Script executed:
#!/bin/bash
# Check if there are any other timeout multipliers that might need adjustment
rg "pendingExecutingTimeout \* \d" --type ts
Length of output: 282
Action Required: Verify Consistency of Timeout Multipliers in Heartbeats Test
The increase from pendingExecutingTimeout * 2 to pendingExecutingTimeout * 4 on line 98 is clearly intended to accommodate the new concurrency dynamics by reducing test flakiness. However, a grep search on the repository shows another instance in the same file using pendingExecutingTimeout * 3. Please confirm whether the differing multipliers (*4 vs. *3) in the heartbeat tests are intentional. If both tests are expected to have the increased timeout, consider aligning them for consistency.
- File: internal-packages/run-engine/src/engine/tests/heartbeats.test.ts
- Line with increased timeout: await setTimeout(pendingExecutingTimeout * 4);
- Parallel instance found: await setTimeout(pendingExecutingTimeout * 3);
apps/webapp/app/routes/engine.v1.runs.$runFriendlyId.wait.duration.ts (1)
53-53: ✅ Verification successful
Modified concurrency release behavior to use request body parameter.
This change makes the concurrency behavior configurable by propagating body.releaseConcurrency to the engine instead of using a hardcoded value. The environmentId parameter was also removed from the blockRunWithWaitpoint call.
🏁 Script executed:
#!/bin/bash
# Verify that blockRunWithWaitpoint no longer requires environmentId
rg -A 10 "blockRunWithWaitpoint" --type ts "internal-packages/run-engine/src/"
Length of output: 34192
Concurrency release update verified and approved
The modifications correctly extract the concurrency release configuration from the request body (body.releaseConcurrency), and all instances of the blockRunWithWaitpoint call have been updated to remove the hardcoded environmentId parameter, as confirmed by the grep output.
internal-packages/run-engine/src/run-queue/tests/enqueueMessage.test.ts (1)
1-114: LGTM: Comprehensive test for queue message handling.
This new test file thoroughly verifies the behavior of the RunQueue.enqueueMessage method, checking queue lengths, oldest message timestamps, and concurrency levels. The test is well-structured with clear setup, action, and assertions.
internal-packages/run-engine/src/index.ts (1)
1-2: Good refactoring to separate error class into its own file.
The code now exports RunEngine and RunDuplicateIdempotencyKeyError from separate files, which improves modularity and organization.
internal-packages/run-engine/src/run-queue/tests/releaseConcurrency.test.ts (3)
1-8: Good test setup using Redis test containers.
The imports and test setup look appropriate for testing the concurrency feature with Redis.
9-43: LGTM: Well-structured test configuration.
The test options and test data setup look comprehensive, with appropriate values for concurrency testing.
97-151: Good negative test case for concurrency management.
This test properly verifies that releasing concurrency has no effect when a run hasn't been dequeued, which is an important edge case to cover.
packages/core/src/v3/types/tasks.ts (1)
packages/core/src/v3/types/tasks.ts (1)
827-836
:✅ Verification successful
Good addition of releaseConcurrency option with clear documentation.
The
releaseConcurrency
option is well-documented with a clear explanation of its purpose: allowing other runs to execute while a child task is executing. The default value offalse
is explicitly specified which is good practice.This change fits well with the new release concurrency system described in the PR objectives.
🏁 Script executed:
#!/bin/bash # Check where this new option is used in the codebase rg "releaseConcurrency\?: boolean" --type=tsLength of output: 447
Verified Implementation: Release Concurrency Option Confirmed
- The
releaseConcurrency
option is consistently defined and used across several files, including:
packages/core/src/v3/types/tasks.ts
internal-packages/run-engine/src/engine/index.ts
internal-packages/run-engine/src/engine/types.ts
internal-packages/run-engine/src/engine/systems/waitpointSystem.ts
packages/trigger-sdk/src/v3/wait.ts
- The documentation in
TriggerAndWaitOptions
clearly explains that settingreleaseConcurrency
to true allows other runs to execute while a child task is executing, with the default explicitly set tofalse
.This change is well implemented and matches the PR objectives for the new release concurrency system.
internal-packages/database/prisma/migrations/20250318163201_add_previous_snapshot_id_to_task_run_execution_snapshot/migration.sql (1)
4-5
: Verify nullability of previousSnapshotIdThe migration adds
previousSnapshotId
as TEXT without specifying NOT NULL, meaning it allows NULL values by default.Confirm that this is the intended behavior for the new column. If NULL values should be valid (which seems logical for the first snapshot in a chain), this is correct. If not, the column should have a NOT NULL constraint.
packages/core/src/v3/schemas/api.ts (1)
122-122: Good addition of releaseConcurrency feature
The addition of the releaseConcurrency option to the TriggerTaskRequestBody schema is well-structured and maintains backward compatibility by making it optional.
This enhancement will provide better control over concurrency management during task execution.
internal-packages/run-engine/src/run-queue/tests/nack.test.ts (3)
1-48: Well-structured test setup and configuration
The test setup includes properly configured Redis containers, appropriate timeout settings, and clear test data fixtures. This provides a solid foundation for the test cases.
The configuration and imports are well-organized, making the tests easy to understand and maintain.
49-127: Comprehensive test for concurrency clearing on nack
This test case thoroughly verifies that nacking a message properly clears concurrency resources. It checks both the queue and environment concurrency before and after the nack operation.
The test follows a clear pattern of:
- Setting up the test environment
- Enqueueing and dequeueing a message
- Verifying initial concurrency state
- Performing the nack operation
- Verifying concurrency has been released
- Checking that the message is properly requeued
This covers the main functionality of the nack operation effectively.
129-216: Thorough test for dead letter queue functionality
This test verifies the critical path for handling message retries and dead letter queue management, ensuring that messages that exceed max attempts are properly moved to the dead letter queue.
The test effectively:
- Configures a low max attempt threshold (2) for easier testing
- Verifies the message is in the normal queue initially
- Nacks the message to increment the attempt counter
- Verifies it's still in the main queue after first nack
- Dequeues and nacks again to trigger the max attempts logic
- Verifies the message moves to the dead letter queue
This provides good confidence in the retry and dead letter queue functionality.
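The retry and dead letter flow that this test walks through can be sketched with a small in-memory queue. This is an illustration only: `ToyQueue` and its fields are hypothetical, the real RunQueue is Redis-backed, and the exact comparison used for the max-attempts threshold is an assumption chosen so that a threshold of 2 behaves as the steps above describe.

```typescript
// Hypothetical in-memory stand-in for a queue with a dead letter queue.
type Message = { id: string; attempt: number };

class ToyQueue {
  readonly main: Message[] = [];
  readonly deadLetter: Message[] = [];

  constructor(private readonly maxAttempts: number) {}

  enqueue(id: string): void {
    this.main.push({ id, attempt: 0 });
  }

  dequeue(): Message | undefined {
    return this.main.shift();
  }

  // A nack increments the attempt counter, then either requeues the message
  // or moves it to the dead letter queue once the threshold is reached.
  nack(message: Message): void {
    const attempt = message.attempt + 1;
    const next = { ...message, attempt };
    if (attempt >= this.maxAttempts) {
      this.deadLetter.push(next);
    } else {
      this.main.push(next);
    }
  }
}

const queue = new ToyQueue(2);
queue.enqueue("m1");
queue.nack(queue.dequeue()!); // first nack: message returns to the main queue
const afterFirstNack = { main: queue.main.length, dead: queue.deadLetter.length };
queue.nack(queue.dequeue()!); // second nack: message moves to the dead letter queue
const afterSecondNack = { main: queue.main.length, dead: queue.deadLetter.length };
```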
packages/trigger-sdk/src/v3/wait.ts (3)
92-103: Well-documented releaseConcurrency parameter
The added releaseConcurrency parameter includes comprehensive documentation that clearly explains:
- What the parameter does (releases run from queue's concurrency)
- When it's useful (allowing other runs to execute during a waitpoint)
- The potential consequences (run may not resume immediately)
- The default value (false)
This level of documentation is excellent as it helps developers understand both the functionality and the implications of using this feature.
154-154: Consistent integration of releaseConcurrency in wait.for method
The releaseConcurrency option is properly passed through from the options to the API call.
The implementation maintains consistency with how other options are handled.
192-192: Consistent integration of releaseConcurrency in wait.until method
The releaseConcurrency option is properly passed through from the options to the API call.
The implementation maintains consistency with how other options are handled.
internal-packages/run-engine/src/engine/eventBus.ts (3)
internal-packages/run-engine/src/engine/eventBus.ts (3)
4-4
: Added import for EventEmitterThe imported EventEmitter will be used to define the EventBus type, improving the overall typing of the event system.
183-183
: Well-defined EventBus typeGood addition of a type alias that combines EventEmitter with your custom EventBusEvents, creating a strongly-typed event bus. This ensures that all emitted events conform to your predefined event signatures.
185-211
: New worker notification function with clear documentationThe
sendNotificationToWorker
function is well-documented with a clear purpose: to notify workers about run state changes. The implementation correctly emits the "workerNotification" event with the proper structure as defined in the EventBusEvents type.This function will be useful for the new concurrency release system, allowing workers to react to changes in run state.
references/test-tasks/src/trigger/test-reserve-concurrency-system.ts (4)
1-1: Added batch import for enhanced testing capability
The addition of the batch import enables the retrieval of batch state at runtime, which is essential for these concurrency tests.
296-297: Improved test reliability with batch retrieval
Good improvement to retrieve the latest batch state with batch.retrieve() instead of relying on the initial state. This ensures the test is working with the current state of the batch tasks.
299-299: Updated to use retrieved batch for waitForRunStatus
Using retrievedHoldBatch.runs ensures that the test is waiting on the most up-to-date run objects with current status information.
346-346: Consistent use of retrieved batch runs
The updated code consistently uses the retrieved batch runs for waiting on completions, maintaining a reliable test pattern throughout the file.
internal-packages/run-engine/vitest.config.ts (1)
internal-packages/run-engine/vitest.config.ts (1)
14-17: Enhanced test configuration for concurrency tests
The increased test timeout (60 seconds) is appropriate for concurrency-related tests that involve asynchronous operations and potential waiting periods. The V8 coverage provider is also a good choice for TypeScript projects.
These changes will support the new concurrency-related tests being added to the codebase.
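As a rough illustration of the configuration described above, a Vitest config with these two settings could look like the following. Only the 60-second timeout and the V8 coverage provider come from the review; any other options in the real file are not shown here.

```typescript
import { defineConfig } from "vitest/config";

export default defineConfig({
  test: {
    // 60 seconds, to leave room for Redis-backed concurrency tests that wait on timers.
    testTimeout: 60_000,
    coverage: {
      // Use V8's built-in coverage instead of instrumenting the source.
      provider: "v8",
    },
  },
});
```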
internal-packages/run-engine/src/run-queue/tests/ack.test.ts (4)
1-9: Well-structured test imports
Good selection of imports for testing the RunQueue's acknowledgment functionality, especially using redisTest to work with a real Redis container.
10-46: Comprehensive test setup with realistic data
The test options and mock data are well defined, providing a realistic environment for testing the acknowledgment functionality. The test timeout configuration aligns with the updated vitest.config.ts timeout.
48-105: Thorough test for concurrency clearing
This test verifies that acknowledging a message clears concurrency counts at both the queue and environment levels. The test follows good practices:
- Setting up the environment with realistic data
- Verifying concurrency is correctly set after dequeuing
- Calling acknowledgeMessage
- Verifying concurrency counts are reset to zero
- Properly cleaning up resources with queue.quit()
This ensures the concurrency management aspect of message acknowledgment works correctly.
107-169: Complete coverage of message removal functionality
This test verifies that acknowledging a message removes it from the queue by:
- Checking queue lengths before and after dequeuing
- Verifying queue remains empty after acknowledgment
- Testing both task-specific queue and environment queue
The test is well-structured with proper setup, verification, and cleanup phases.
apps/webapp/app/routes/admin.api.v1.environments.$environmentId.ts (4)
5-5: Module replacement: marqs → engine
The import statement has been updated to use `engine` from `~/v3/runEngine.server` instead of the previous `marqs` module. This aligns with the new release concurrency system mentioned in the PR objectives.
116-117: Updated method references to use engine.runQueue
The calls that get the environment concurrency limit and current concurrency now use the engine.runQueue methods instead of direct marqs calls. This is consistent with the import change and maintains the same functionality while consolidating concurrency management under the engine module.
120-127: Updated queue concurrency methods to use engine.runQueue
Queue-specific concurrency methods have been migrated to the engine.runQueue methods, ensuring consistency with the environment concurrency methods. This completes the transition from marqs to engine.runQueue for concurrency management.
138-138: Simplified JSON response structure
The return statement no longer includes the reserveConcurrency and queueReserveConcurrency values. This streamlines the API response and avoids exposing unnecessary implementation details.
internal-packages/run-engine/src/engine/statuses.ts (3)
4-4: Added QUEUED_EXECUTING to dequeueable statuses
The QUEUED_EXECUTING status has been added to the list of dequeueable execution statuses, which extends the criteria for what can be dequeued. This supports the new concurrency system by introducing a transitional state for tasks that are queued but in the process of execution.
29-29: Added QUEUED_EXECUTING to checkpointable statuses
The QUEUED_EXECUTING status has been added to the list of checkpointable statuses, ensuring that tasks in this state can be checkpointed. This maintains consistency with the changes to dequeueable statuses.
49-52: Added new canReleaseConcurrency function
A new function determines whether concurrency can be released based on the task status. This is a critical component of the new release concurrency system: tasks in the SUSPENDED or EXECUTING_WITH_WAITPOINTS states can release their concurrency, potentially improving resource utilization.
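The status check described here can be sketched in a few lines. The function name and the two releasable statuses come from the review; the `ExecutionStatus` union below is a hypothetical subset built from status names mentioned elsewhere in this review, not the project's full enum.

```typescript
// Hypothetical subset of execution statuses, for illustration only.
type ExecutionStatus =
  | "QUEUED"
  | "QUEUED_EXECUTING"
  | "EXECUTING"
  | "EXECUTING_WITH_WAITPOINTS"
  | "SUSPENDED";

// Statuses in which a run is waiting rather than doing work, so its slot can be lent out.
const RELEASABLE_STATUSES: ReadonlySet<ExecutionStatus> = new Set<ExecutionStatus>([
  "SUSPENDED",
  "EXECUTING_WITH_WAITPOINTS",
]);

function canReleaseConcurrency(status: ExecutionStatus): boolean {
  return RELEASABLE_STATUSES.has(status);
}
```

Keeping the allowed statuses in one set makes it easy to audit which states may give up their slot.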
internal-packages/run-engine/src/engine/workerCatalog.ts (1)
1-56: New workerCatalog with well-defined operations
This new file defines a catalog of worker operations with clear schemas and timeout values. Each operation (finishWaitpoint, heartbeatSnapshot, expireRun, etc.) has a Zod validation schema and an appropriate visibility timeout.
The catalog provides a centralized definition of operations that workers can perform, which will help maintain consistency and type safety throughout the system.
The operations defined cover important aspects of the system including waitpoints, snapshots, run lifecycle management, batch processing, and delayed runs - all critical components for a robust concurrency management system.
internal-packages/run-engine/src/run-queue/tests/reacquireConcurrency.test.ts (4)
47-108: Comprehensive test for reacquiring released concurrency
This test case validates that concurrency can be reacquired after being explicitly released. The test sets up the environment, releases concurrency, attempts to reacquire it, and verifies the expected state changes.
110-167: Test for already counted concurrency
This test confirms that the reacquireConcurrency method returns true when a run is already counted in the concurrency system, preventing duplicate concurrency allocations. This is important for maintaining accurate concurrency counts.
228-300: Test for handling capacity limits
This test verifies that reacquireConcurrency returns false when the run is not already counted and the environment has no remaining capacity. This is important for preventing concurrency limits from being exceeded.
302-327: Error handling test for non-existent runs
This test ensures that attempting to reacquire concurrency for a non-existent run throws a MessageNotFoundError, which is appropriate error handling behavior. The test uses the expect().rejects.toThrow() pattern for rejected promises, which Vitest (the runner configured in vitest.config.ts) supports through its Jest-compatible API.
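The three outcomes these tests cover (already counted, no capacity, unknown run) can be summarized with a small in-memory model. This is a sketch under stated assumptions: the real implementation runs against Redis, and `InMemoryEnvConcurrency`, `track`, `release`, and `reacquire` are hypothetical names. Only `MessageNotFoundError` and the true/false/throw behavior come from the review.

```typescript
class MessageNotFoundError extends Error {
  constructor(messageId: string) {
    super(`Message not found: ${messageId}`);
    this.name = "MessageNotFoundError";
  }
}

// Hypothetical in-memory stand-in for environment-level concurrency tracking.
class InMemoryEnvConcurrency {
  private readonly limit: number;
  private readonly knownMessages = new Set<string>();
  private readonly counted = new Set<string>();

  constructor(limit: number) {
    this.limit = limit;
  }

  // Registers a message so later calls can find it.
  track(messageId: string): void {
    this.knownMessages.add(messageId);
  }

  // Gives the slot back, e.g. while the run waits on a waitpoint.
  release(messageId: string): void {
    this.counted.delete(messageId);
  }

  reacquire(messageId: string): boolean {
    if (!this.knownMessages.has(messageId)) {
      throw new MessageNotFoundError(messageId);
    }
    // Already counted: succeed without counting the run twice.
    if (this.counted.has(messageId)) return true;
    // Not counted and no capacity left: refuse rather than exceed the limit.
    if (this.counted.size >= this.limit) return false;
    this.counted.add(messageId);
    return true;
  }
}
```

In the Redis-backed version the check and the increment would need to happen atomically; this sketch sidesteps that because it is single-threaded.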
internal-packages/database/prisma/migrations/20250319110754_add_org_and_project_to_execution_snapshots/migration.sql (2)
8-14: Appropriate column additions
Adding organization and project IDs to execution snapshots is a good enhancement that provides better context for task runs. This aligns well with the PR objectives around the new release concurrency system.
16-26: Foreign key constraints set up correctly
The foreign key constraints use ON DELETE RESTRICT and ON UPDATE CASCADE policies, ensuring referential integrity while preventing accidental deletion of referenced organizations or projects.
internal-packages/run-engine/src/engine/errors.ts (2)
60-68: Well-structured validation error class
The `ServiceValidationError` class provides a clean way to handle validation errors with optional status codes, which is useful for HTTP responses. This implementation follows best practices for extending the Error class.
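A validation error with an optional status code typically looks something like the sketch below. The class name comes from the review; the constructor signature, the `toHttpStatus` helper, and the 422 and 500 fallback codes are assumptions chosen for illustration.

```typescript
class ServiceValidationError extends Error {
  readonly status?: number;

  constructor(message: string, status?: number) {
    super(message);
    this.name = "ServiceValidationError";
    this.status = status;
    // Keeps instanceof working even when compiling to older JavaScript targets.
    Object.setPrototypeOf(this, new.target.prototype);
  }
}

// Hypothetical helper: maps an error to an HTTP status for an API response.
function toHttpStatus(error: unknown): number {
  if (error instanceof ServiceValidationError) {
    return error.status ?? 422; // assumed default for validation failures
  }
  return 500;
}
```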
77-82: Properly implemented idempotency key error
The `RunDuplicateIdempotencyKeyError` class is well implemented with a custom name property, making it easy to identify in error handling scenarios.
internal-packages/run-engine/src/run-queue/tests/keyProducer.test.ts (1)
195-195: Updated key format to include project information
The expected key format now includes project ID information (`proj:p1234`), which aligns with the database migration that adds project IDs to execution snapshots. This change ensures proper namespacing of concurrency keys.
internal-packages/run-engine/src/run-queue/fairQueueSelectionStrategy.ts (3)
52-55: Simplified FairQueueConcurrency type
The `FairQueueConcurrency` type has been simplified by removing the `reserve` property. This aligns with the PR's objective to streamline the concurrency management system.
502-510: Updated getEnvConcurrency method to remove reserve retrieval
The `#getEnvConcurrency` method now retrieves and returns only the current and limit values, omitting the reserve value. This is consistent with the removal of the `reserve` property from the `FairQueueConcurrency` type.
405-406: ✅ Verification successful
Simplified concurrency check logic
The condition that checks whether an environment is at full concurrency has been simplified to only check if current >= limit, removing the reserve buffer. This simplification makes the code more straightforward but could potentially reduce flexibility in managing concurrency.
Verify that removing the reserve buffer was an intentional design decision:
🏁 Script executed:
#!/bin/bash
# Check for any references to reserve concurrency still remaining in the codebase
echo "Checking for remaining references to reserve concurrency..."
rg -A 2 -B 2 "reserve.*concurrency" --type ts
echo "------"
echo "This helps verify if the removal of reserve concurrency was comprehensive"
Length of output: 18519
Concurrency Check Simplification Verified
The removal of the reserve buffer in the concurrency check within `internal-packages/run-engine/src/run-queue/fairQueueSelectionStrategy.ts` appears to be an intentional design decision. The grep results show that reserve concurrency logic is still present and actively tested in other areas of the codebase (e.g., in various test files and parts of the queuing system). This indicates that while the selection strategy was simplified to only check `env.concurrency.current >= env.concurrency.limit`, the reserve concurrency mechanism continues to be managed elsewhere as needed.
- The simplified check in `fairQueueSelectionStrategy.ts` focuses solely on whether the current concurrency reaches its limit.
- Reserve concurrency logic remains in the system (as seen in test files and the queue processing logic), suggesting that its management has been decoupled from this particular selection strategy.
No further changes are required regarding this design decision.
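For reference, the simplified check amounts to the comparison below. The `FairQueueConcurrency` name and the `current >= limit` condition come from the review; `isEnvAtFullConcurrency`, `EnvSnapshot`, and `selectableEnvs` are hypothetical helpers added to show how the check might be applied.

```typescript
// After the change, the type carries only the two values the check needs.
type FairQueueConcurrency = { current: number; limit: number };

// Hypothetical environment record used only in this sketch.
type EnvSnapshot = { id: string; concurrency: FairQueueConcurrency };

function isEnvAtFullConcurrency(concurrency: FairQueueConcurrency): boolean {
  return concurrency.current >= concurrency.limit;
}

// Environments that are at their limit are skipped during queue selection.
function selectableEnvs(envs: EnvSnapshot[]): EnvSnapshot[] {
  return envs.filter((env) => !isEnvAtFullConcurrency(env.concurrency));
}
```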
internal-packages/run-engine/src/engine/systems/enqueueSystem.ts (7)
1-14: Well-structured imports with clear dependencies
The imports are organized logically, with named imports and appropriate relative paths, showing good module structure.
11-14: Clean type definition with clear requirements
The `EnqueueSystemOptions` type is well defined with its dependencies clearly specified, making the component's requirements explicit.
16-23: Well-structured class initialization
The `EnqueueSystem` class follows a clean dependency injection pattern with private readonly properties, making the code more maintainable and testable.
25-56: Comprehensive parameter interface with appropriate optionals
The method signature for `enqueueRun` is well designed with a clear parameter interface. It handles optional parameters and uses TypeScript's `Extract` utility type to constrain the snapshot status values.
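To illustrate the `Extract` pattern mentioned above, the sketch below narrows a status union to the values an enqueue call would accept. The status names appear elsewhere in this review, but which ones `enqueueRun` actually allows is an assumption, as are `EnqueueableStatus`, `EnqueueRunParams`, and `resolveSnapshotStatus`.

```typescript
// Hypothetical subset of the execution status union.
type TaskRunExecutionStatus =
  | "QUEUED"
  | "QUEUED_EXECUTING"
  | "EXECUTING"
  | "EXECUTING_WITH_WAITPOINTS"
  | "SUSPENDED";

// Extract keeps only the union members assignable to the second argument,
// so a typo or an unsupported status is rejected at compile time.
type EnqueueableStatus = Extract<TaskRunExecutionStatus, "QUEUED" | "QUEUED_EXECUTING">;

type EnqueueRunParams = {
  runId: string;
  snapshot?: { status?: EnqueueableStatus; description?: string };
};

// Falls back to "QUEUED" when the caller does not specify a status (assumed default).
function resolveSnapshotStatus(params: EnqueueRunParams): EnqueueableStatus {
  return params.snapshot?.status ?? "QUEUED";
}
```

Passing `{ status: "SUSPENDED" }` here would be a type error, which is the constraint the review comment refers to.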
59-78: Effective use of locking mechanism and execution snapshot creation
The code uses the run lock to prevent race conditions, with a reasonable timeout of 5000ms. The execution snapshot creation is configured with all necessary parameters.
79-82: Robust handling of master queues
The code handles both primary and secondary master queues, improving the reliability of the queue management system.
84-99: Well-structured message enqueuing with comprehensive metadata
The message enqueuing logic includes all necessary context data (environment, project, organization IDs) and run-specific information. The concurrencyKey handling accommodates nullable values.
internal-packages/run-engine/src/run-queue/tests/dequeueMessageFromMasterQueue.test.ts (3)
1-47: Good test setup with appropriate configuration
The setup code is well structured with comprehensive test options and environment configuration. Using real Redis instances through testcontainers provides more realistic testing conditions.
135-200: Effective concurrency limit testing
This test case validates that the queue respects environment concurrency limits, which is crucial for the concurrency management system being implemented.
202-266: Queue-specific concurrency limits are properly tested
The test validates that queue-specific concurrency limits are respected, which is an important aspect of the overall concurrency management system.
apps/webapp/app/v3/services/triggerTaskV2.server.ts (5)
1-10: Enhanced error handling with structured error types
The new imports for `TaskRunError`, `taskRunErrorEnhancer`, and `taskRunErrorToString` enable more robust and consistent error handling throughout the service.
173-174: Added releaseConcurrency parameter to blockRunWithWaitpoint
The `releaseConcurrency` parameter is propagated from the request body to the engine call, aligning with the PR's focus on improving concurrency management.
376-377: Added releaseConcurrency parameter to trigger
The `releaseConcurrency` parameter is passed to the engine's trigger method, maintaining consistency with the concurrency control pattern.
381-388: Improved error handling with structured error parsing
The task run error is now parsed using `TaskRunError.parse` and propagated to the event system, enhancing error reporting and debugging capabilities.
403-410: Enhanced error validation and reporting
The service now checks for errors in the result and throws a formatted `ServiceValidationError` with detailed information, improving error handling for API consumers.
internal-packages/run-engine/src/engine/tests/delays.test.ts (2)
158-159: Increased delay duration for test stability
Increasing the delay from 200ms to 400ms likely improves test reliability by allowing more time for operations to complete.
169-172: Improved method naming for clarity
Renaming `rescheduleRun` to `rescheduleDelayedRun` makes the API more explicit about its purpose, improving code readability and documentation.
internal-packages/run-engine/src/run-queue/tests/fairQueueSelectionStrategy.test.ts (3)
3-6: Imports look good.
These updated imports cleanly reference your constants, strategy, and producer. No concerns here.
214-215: Check potential flakiness in performance test threshold.
Relaxing the threshold to a 6x difference (from 9x) might reduce test flakiness. Ensure this does not inadvertently let slower second calls pass. Overall, your approach seems correct for typical test environments.
Would you like to run these tests under stress conditions or in slower environments to ensure stability?
229-229: Confirm updated factor for third distribution test.
The factor has been changed to 2 (instead of the prior 4). Ensure that the lowered multiple provides the intended performance check while avoiding false positives under varying conditions.
internal-packages/run-engine/src/engine/systems/dequeueSystem.ts (1)
1-116: Overall implementation looks robust.
The concurrency checks, snapshot creation, and error handling are clear. Good use of locks and structured error flows. No other pressing issues identified in this new `DequeueSystem` class.
Also applies to: 121-537, 540-619
internal-packages/run-engine/src/engine/tests/waitpoints.test.ts (3)
10-10: Use of Node timers is fine.
Importing `setTimeout` from `"node:timers/promises"` is a good fit on Node 18+: it exposes the promise-based timer API, so a delay can be awaited directly.
107-107: Releasing concurrency while blocking the run.
Adding `releaseConcurrency: true` clarifies that concurrency tokens will be freed during the wait, which is consistent with your concurrency design.
1137-1344: New test for reacquiring concurrency after unblocking is thorough.
This test checks the scenario of concurrency-limited queues. It appears solid and aligns with the updated concurrency code flow.
internal-packages/run-engine/src/engine/types.ts (4)
1-2: Imports from internal redis packages appear well-structured
No specific concerns. This import is consistent with your project's approach to typed Redis functionality.
7-8: MinimalAuthenticatedEnvironment import is consistent
No issues found with referencing `MinimalAuthenticatedEnvironment` or `workerCatalog`.
39-51: Check partial Redis config fallback
The optional `releaseConcurrency` property is well defined, but ensure that any partial Redis config merges gracefully with default Redis settings to avoid misconfiguration.
111-111: EngineWorker type introduction
Defining `EngineWorker` as `Worker<typeof workerCatalog>` couples the worker and catalog. This looks correct and straightforward.
internal-packages/run-engine/README.md (3)
27-27: Expanded explanation of race conditions
The added note around checkpoint concurrency clarifies potential conflicts. No further issues noted.
94-96: Grammar check for ‘which’ usage
The static analysis suggestion to add question marks seems like a false positive here, as these lines are statements, not questions.
🧰 Tools
🪛 LanguageTool
[typographical] ~94-~94: If the word ‘which’ starts a question, add a question mark (“?”) at the end of the sentence.
Context: ...pleted when the associated run completes. Every run has anassociatedWaitpoint
...(WHAT_NOUNPHRASE_QUESTION_MARK)
[typographical] ~95-~95: If the word ‘which’ starts a question, add a question mark (“?”) at the end of the sentence.
Context: ...s completed when the datetime is reached. -MANUAL
which gets completed when th...(WHAT_NOUNPHRASE_QUESTION_MARK)
[typographical] ~96-~96: If the word ‘which’ starts a question, add a question mark (“?”) at the end of the sentence.
Context: ...ch gets completed when that event occurs. Waitpoints can have an idempotencyKey ...(WHAT_NOUNPHRASE_QUESTION_MARK)
202-331: System architecture documentation
This new section and its diagram provide clarity on subsystem responsibilities. No changes needed.
internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts (7)
1-2: Imports for concurrency system
These imports align with the new concurrency logic. No immediate issues found.
7-9: Zod schema for `releaseConcurrency`
Defining a Zod schema for metadata parsing ensures a safer and more robust approach to typed concurrency data.
11-12: Leverage typed metadata model
The `ReleaseConcurrencyMetadata` type keeps concurrency-related fields consistent. Implementation looks good.
13-15: `ReleaseConcurrencySystemOptions` design
Bundling `SystemResources` this way promotes modular architecture and straightforward testing.
17-22: Constructor injects system resources
This allows for easier testability and clear separation of concerns across different concurrency operations.
24-33: Checkpoint environment concurrency tokens
Having the `checkpointCreatedOnEnvironment()` method refill tokens is logical. Verify the incremental token count to avoid possible oversubscription.
35-49: Immediate concurrency release for development
Allowing dev environments to bypass the queued release is convenient for local testing. Confirm queue logic remains consistent with other environment types.
internal-packages/run-engine/src/engine/systems/checkpointSystem.ts (1)
56-56: Validate lock acquisition timeouts and retry logic.
You lock the run with a 5,000ms timeout. If it times out, it might leave the checkpoint process partially completed. Confirm whether higher-level error handling is needed or if it’s acceptable to fail the lock attempt without further retries.
internal-packages/run-engine/src/engine/tests/releaseConcurrencyTokenBucketQueue.test.ts (1)
206-246: Verify correct token return when executor fails.
The test ensures tokens are returned upon failure, allowing subsequent runs to execute. Confirm that no race conditions occur if multiple failures happen quickly. Edge cases might require specialized tests or defensive logic in the executor.
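The behavior under test (a token goes back to the bucket when the executor throws) can be modeled in memory as follows. This is a simplified sketch, not the Redis-backed queue from the PR: `InMemoryTokenBucketQueue`, `attemptToRelease`, `refillToken`, and the returned outcome strings are hypothetical, and the sketch does not retry or requeue a failed run.

```typescript
type Executor = (runId: string) => Promise<void>;

class InMemoryTokenBucketQueue {
  private tokens: number;
  private readonly maxTokens: number;
  private readonly waiting: string[] = [];
  private readonly executor: Executor;

  constructor(maxTokens: number, executor: Executor) {
    this.maxTokens = maxTokens;
    this.tokens = maxTokens;
    this.executor = executor;
  }

  get availableTokens(): number {
    return this.tokens;
  }

  // Consumes a token and runs the executor, or queues the run if no token is free.
  async attemptToRelease(runId: string): Promise<"executed" | "queued" | "failed"> {
    if (this.tokens === 0) {
      this.waiting.push(runId);
      return "queued";
    }
    this.tokens -= 1;
    try {
      await this.executor(runId);
      return "executed";
    } catch {
      // Executor failed: hand the token back so later runs are not starved.
      this.tokens += 1;
      return "failed";
    }
  }

  // Returns one token to the bucket and lets the oldest waiting run try again.
  async refillToken(): Promise<void> {
    this.tokens = Math.min(this.maxTokens, this.tokens + 1);
    const next = this.waiting.shift();
    if (next !== undefined) {
      await this.attemptToRelease(next);
    }
  }
}
```

The race the review asks about does not arise here because JavaScript runs the decrement and increment on a single thread; a Redis-backed version would need the consume and return steps to be atomic.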
internal-packages/run-engine/src/engine/systems/waitingForWorkerSystem.ts (2)
78-79: Confirm timestamp logic.
Subtracting `updatedRun.priorityMs` from the run’s creation timestamp is correct if you want to preserve priority-based ordering. However, ensure that negative or unexpected priority values won't invert the enqueue order.
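One defensive way to address the concern above is to clamp the priority before subtracting it. The sketch below is a suggestion, not the PR's code: `enqueueTimestamp` is a hypothetical helper, and treating negative or non-finite priorities as zero is an assumed policy.

```typescript
// Computes the score used to order a run in the queue: earlier scores dequeue first.
function enqueueTimestamp(createdAt: Date, priorityMs: number): number {
  // Assumed policy: ignore negative or non-finite priorities instead of pushing the run later.
  const safePriorityMs = Number.isFinite(priorityMs) ? Math.max(0, priorityMs) : 0;
  return createdAt.getTime() - safePriorityMs;
}
```

With this guard, a run with a positive priority sorts ahead of runs created at the same time, while a malformed priority leaves the creation-time ordering untouched.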
85-88: Ensure repeated scheduling won't trigger too many concurrent jobs.
When `runsWaitingForDeploy.length > maxCount`, the method reschedules itself. Confirm that repeated calls to `scheduleEnqueueRunsWaitingForWorker` won't cause excessive recursion or queue buildup for large volumes.
internal-packages/run-engine/src/engine/tests/releaseConcurrency.test.ts (1)
14-73: Thorough concurrency release testing.
The first test scenario verifies default concurrency release behaviors and waitpoint blocking. This is a good baseline showcasing environment concurrency and queue concurrency updates.
internal-packages/run-engine/src/engine/systems/ttlSystem.ts (1)
25-34: Check concurrency with actively executing runs.
`expireRun` skips expiration for runs whose snapshot shows "executing." Confirm that logic aligns with your desired behavior for WAITING, SUSPENDED, or other transitional states that may also need to be guarded.
internal-packages/run-engine/src/engine/tests/attemptFailures.test.ts (3)
277-384: Logic for handling non-retriable errors looks correct.
The test verifies that internal, non-retriable errors lead to a final "CRASHED" status.
493-648: Proper verification of OOM retry logic.
This test retries the run on a larger machine following an OOM error and validates the final success upon reattempt.
650-812: Correct handling of final OOM failure after retry.
The test confirms that the second OOM attempt leads to a "CRASHED" final status.
internal-packages/run-engine/src/engine/tests/checkpoints.test.ts (5)
191-277: Validation of invalid snapshot ID scenario looks good.
The test rejects checkpoint creation when referencing an outdated or invalid snapshot.
279-474: Multiple checkpoints test covers essential checkpoint transitions.
The test ensures that multiple checkpoints can be chained without interfering with each other.
476-619: Checkpoint attempt after concurrency release is properly blocked.
The test demonstrates correct handling when the snapshot has progressed beyond the point of checkpoint creation, returning "Not the latest snapshot."
622-761: Prevention of checkpoint creation in a non-checkpointable state is properly verified.
The test ensures that once suspended, further checkpoint attempts are disallowed and fired as "discarded."
764-983: Accurate state transition for checkpoint creation in QUEUED_EXECUTING.
This test confirms that the run moves to "QUEUED" after forced concurrency release, and that a checkpoint can be created in that state without issues.
internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts (4)
14-22: Initialization logic is straightforward.
The constructor stores references to the provided resources cleanly, preparing the system for use.
26-78: Reschedule logic appears robust.
Using a locked block on `runId` prevents concurrency issues when rescheduling a delayed run. The additional snapshot creation ensures traceability of the reschedule event.
80-125: Enqueue logic cleanly updates run status and scheduling.
The function updates run status to "PENDING" and sets the `queuedAt` timestamp, followed by scheduling optional TTL expirations. This approach maintains consistent lifecycle management.
127-135: Deferred scheduling of delayed runs is well-designed.
Scheduling enqueuing jobs via `worker.enqueue` keeps the code modular and records events for future tracing.
internal-packages/database/prisma/schema.prisma (3)
167-167: Nice addition of executionSnapshots to the Organization model.
Having a direct relationship to TaskRunExecutionSnapshot can be very useful for traceability. Ensure that any required cascading deletes or updates are configured to avoid dangling references.
507-507: Great addition of executionSnapshots to the Project model.
This mirrors the relationship in Organization, providing consistent ways to track snapshots at a project level. Ensure that you have the appropriate test coverage for retrieving snapshots through the Project relationship.
2545-2545: ✅ Verification successful
releaseConcurrencyOnWaitpoint field adds flexibility for concurrency release.
Be mindful of the potential for concurrency tokens to be released prematurely if many waiting tasks exist; double-check the logic in your RunQueue or concurrency system to ensure stable queue processing.
Would you like to verify all usage sites of this field for proper handling? If so, run this script to search for references:
🏁 Script executed:
#!/bin/bash
rg --context 5 'releaseConcurrencyOnWaitpoint'
Length of output: 6790
Verification Passed: `releaseConcurrencyOnWaitpoint` Usage Confirmed
After reviewing the references to `releaseConcurrencyOnWaitpoint` across the codebase, we can confirm that:
- The field is defined in the Prisma schema (`internal-packages/database/prisma/schema.prisma`) with a default value of false.
- It is referenced and conditionally passed in setup logic (e.g., in `internal-packages/testcontainers/src/setup.ts`).
- Its usage in the concurrency system (specifically within `internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts`) and relevant tests (in `internal-packages/run-engine/src/engine/tests/releaseConcurrency.test.ts`) confirms that the concurrency release logic behaves as intended, including scenarios where the flag is set.
That said, while the code correctly applies this flag, please continue to monitor the behavior under load to ensure that concurrency tokens are not released prematurely when many tasks wait simultaneously.
internal-packages/run-engine/src/run-queue/keyProducer.ts (3)
2-2: Good addition of EnvDescriptor, QueueDescriptor, and RunQueueKeyProducer imports.
This clarifies the type boundaries and ensures the code is more self-documenting.
15-15: Introduction of DEAD_LETTER_QUEUE_PART constant for queue keys.
This helps keep key segments descriptive.
196-200: ✅ Verification successful
deadLetterQueueKeyFromQueue() builds on top of descriptorFromQueue to produce robust queue definitions.
This consistent reusability is excellent. Consider logging or handling potential mismatches in queue naming for debugging in large distributed systems.
If you want to confirm correct usage, you can search for all calls to deadLetterQueueKeyFromQueue and confirm the post-conditions:
🏁 Script executed:
#!/bin/bash
rg -A 3 'deadLetterQueueKeyFromQueue'
Length of output: 1082
Verified Usage of `deadLetterQueueKeyFromQueue` Function
The verification confirms that the function is being consistently used across the codebase (notably in `internal-packages/run-engine/src/run-queue/types.ts` and `internal-packages/run-engine/src/run-queue/index.ts`). The approach of building the key on top of `descriptorFromQueue` is solid. As an enhancement, consider adding logging or explicit error handling to capture any potential mismatches in queue naming for easier debugging in distributed environments.
internal-packages/run-engine/src/engine/systems/waitpointSystem.ts (7)
1-25: Imports and WaitpointSystemOptions definition look good.
You've pulled in everything needed for concurrency, event emission, and database access. Nicely structured.
26-37: Constructor properly injects required subsystem references.
No issues identified here. Keep the constructor minimal and rely on specialized methods inside the class to handle logic.
56-116: completeWaitpoint method handles concurrency gracefully.
- The transaction approach with `$transaction` is good: it ensures atomic waitpoint completion.
- Well-handled fallback for P2025 error prevents the system from crashing when the waitpoint has already been updated.
118-152: Event-based scheduling of run continuation is well done.
Using `worker.enqueue` with minimal delays ensures that the code doesn't block normal flow. Also emitting `cachedRunCompleted` is a nice approach for hooking into specialized logic.
154-237: createDateTimeWaitpoint method is well organized.
- Good use of `upsert` for idempotency, though you also handle stale keys by rotating them into inactive.
- Scheduling a job with `finishWaitpoint` extends the system elegantly.
239-323: createManualWaitpoint method parallels createDateTimeWaitpoint effectively.
- The code for rotating expired idempotency keys is nearly identical, which is good for consistency.
- The scheduled finalization with a possible error payload is a neat approach.
589-609: createRunAssociatedWaitpoint provides a direct way to link runs to new waitpoints of type RUN.
The approach is consistent with the other creation methods. The usage of generated ID plus a random idempotencyKey is well-handled.
internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts (7)
1-56: Well-defined system entry point
This file introduces a robust structure for the `RunAttemptSystem` class. The imports and class setup appear consistent, and the typed resources (`SystemResources`) integrate nicely. Logging, tracing, and typed constructor arguments stand out as well-organized. No immediate concerns here.
347-392: `completeRunAttempt` error handling confirmation
`completeRunAttempt` delegates to either `attemptSucceeded` or `attemptFailed`. The switch-case for `completion.ok` is clear. Just confirm that any external calls (like event bus emits) have appropriate try/catch or fallback to avoid swallowing critical errors if the bus is unavailable.
526-766: `attemptFailed`: robust error handling; consider double-checking concurrency cleanup
The method covers multiple fail paths with thorough logic (cancellation, permanent fail, retry, etc.). Since multiple concurrency paths may be left open upon failure, ensure that every path eventually calls your concurrency release methods (particularly for partial or catastrophic failures).
768-824: `systemFailure` gracefully handles leftover errors
This functionality is a safety net for unhandled system-level issues. The approach of reusing `attemptFailed` is consistent. No major concerns, though adding specific logs or metrics for repeated system failures can be helpful in diagnosing issues at scale.
901-1072: `cancelRun`: ensure partial cancellation scenarios are tested
The cancellation logic is robust, with multiple exit points depending on the run status. Ensure that each partial or edge scenario (e.g. run is already finished, `RUN_PENDING_CANCEL` states) is covered by tests or QA scenarios to avoid orphaned runs.
1074-1183: `#permanentlyFailRun`: thorough final error handling
Gracefully sets final status, removes concurrency, completes waitpoints, and emits events. This strongly finalizes a run after all attempts are exhausted. Looks correct; no additional concerns.
1185-1220: `#finalizeRun` and `#getAuthenticatedEnvironmentFromRun`: concise finishers
The final run cleanup and environment retrieval appear both concise and well-typed, particularly for the environment lookups. No issues spotted.
internal-packages/run-engine/src/run-queue/index.ts (12)
32-32: Use of custom `MessageNotFoundError`
Importing a dedicated error class for missing messages is a good practice for clearer debugging and domain-specific error handling. This is a strong addition that keeps error management explicit.
160-167: `lengthOfDeadLetterQueue` & `messageInDeadLetterQueue`: extends queue diagnostics
These new methods improve observability into the dead letter queue. Ensure that usage of `messageInDeadLetterQueue` includes subsequent handling if a message is found stuck in the DLQ.
169-180: `redriveMessage` logic adherence
Publishing to `"rq:redrive"` is clear, but ensure that the subscribing side properly handles any malformed or outdated messages. Also confirm that the redrive fully handles concurrency re-limits if needed.
214-215: `messageExists` vs. old concurrency methods
Replacing concurrency-based checks with a simpler existence check is a good simplification. Verify that older concurrency checks removed in other files are no longer needed.
218-241: `readMessage`: good parse & error logging
The schema-based parsing with `OutputPayload.safeParse(...)` is robust. Logging parse errors is essential. Confirm that subsequent usage of partial or corrupted data does not proceed inadvertently.
403-420: `acknowledgeMessage`: concurrency release check
Acknowledging a message calls `#callAcknowledgeMessage`, which includes concurrency updates. Confirm that final concurrency keys are always cleared, especially if acknowledgment is called after a partial run.
447-476: `nackMessage`: robust fallback to DLQ
Solid approach: increment attempts, compare with max, then transfer to the DLQ if the limit is exceeded. Ensure tests simulate multiple consecutive nacks to confirm correct behavior with repeated attempt increments.
535-570
:releaseEnvConcurrency
: partial concurrency release usageReleasing only the environment concurrency is useful for certain partial release scenarios. Confirm that partial queue concurrency might remain associated if the queue concurrency isn't released at the same time. This can lead to confusion if used incorrectly.
572-614
:reacquireConcurrency
: re-check concurrency limit correctnessReacquiring concurrency for a message is carefully guarded with limit checks. Good approach. For reliability, confirm that if either queue or environment concurrency has changed in the interim, reacquiring doesn’t cause miscounts.
730-771
:#callEnqueueMessage
: thorough parent queue rebalancingEnqueue logic extends concurrency checks and rebalances parent queues. The debug logs are thorough and helpful. No immediate issues.
884-923
:#callNackMessage
: exponential backoff logic incalculateNextRetryDelay
Reinserting the message into the queue with updated score is straightforward. The logs are well-detailed. Ensure that extremely high attempt counts and large time values are tested so that the system won’t overflow or exceed Redis data type limits.
925-946
:#callMoveToDeadLetterQueue
: final concurrency cleanupThis is a critical function for permanent message removal from active sets. The concurrency keys are removed, which is essential. The approach looks correct.
internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts (2)
27-43: Consider verifying upstream error handling when a snapshot isn't found.
The function throws an error if no valid snapshot is found. Ensure that the calling code is prepared to handle this exception to prevent unhandled rejections.
271-284: Heartbeat approach for non-latest snapshot.
If the snapshot isn't the latest by the time the heartbeat runs, your approach to acknowledge and return is sensible. Ensure the calling code gracefully handles unexpected states when dealing with older snapshots.
...bapp/app/routes/engine.v1.runs.$runFriendlyId.waitpoints.tokens.$waitpointFriendlyId.wait.ts
...se/prisma/migrations/20250319110754_add_org_and_project_to_execution_snapshots/migration.sql
Actionable comments posted: 0
🧹 Nitpick comments (4)
internal-packages/run-engine/README.md (4)
27-29: Race Condition Explanation Enhancements
The updated description clearly highlights the potential race condition during checkpointing. This effectively communicates the need to lock the run to prevent outdated checkpoints. Consider optionally providing additional context or linking to further documentation if more details are available.
🧰 Tools
🪛 LanguageTool
[uncategorized] ~28-~28: Possible missing comma found.
Context: ...onditions we need to deal with: - When checkpointing the run continues to execute until the ...(AI_HYDRA_LEO_MISSING_COMMA)
[uncategorized] ~28-~28: Use a comma before “and” if it connects two independent clauses (unless they are closely connected and short).
Context: ...ored. At the same time the run continues and the checkpoint can become irrelevant if...(COMMA_COMPOUND_SENTENCE_2)
93-97: Waitpoint Type Descriptions
The bullet points for the waitpoint types are informative. However, consider rephrasing the relative clauses for improved clarity and consistency. For example, you might change:
- "`RUN` which gets completed when the associated run completes. Every run has an `associatedWaitpoint` that matches the lifetime of the run."
to
- "`RUN`: Completed when the associated run completes. Every run has an `associatedWaitpoint` that matches the lifetime of the run."
Also, review the punctuation as noted by static analysis; while the context isn’t a question, ensuring consistent grammatical styling helps maintain clarity.
🧰 Tools
🪛 LanguageTool
[typographical] ~94-~94: If the word ‘which’ starts a question, add a question mark (“?”) at the end of the sentence.
Context: ...pleted when the associated run completes. Every run has anassociatedWaitpoint
...(WHAT_NOUNPHRASE_QUESTION_MARK)
[typographical] ~95-~95: If the word ‘which’ starts a question, add a question mark (“?”) at the end of the sentence.
Context: ...s completed when the datetime is reached. -MANUAL
which gets completed when th...(WHAT_NOUNPHRASE_QUESTION_MARK)
[typographical] ~96-~96: If the word ‘which’ starts a question, add a question mark (“?”) at the end of the sentence.
Context: ...ch gets completed when that event occurs. Waitpoints can have an idempotencyKey ...(WHAT_NOUNPHRASE_QUESTION_MARK)
101-103: Enhance the `wait.for()` / `wait.until()` Description
The explanation is largely clear; however, refining the phrasing can improve readability. For instance, consider:

    -Wait for a future time, then continue. We should add the option to pass an `idempotencyKey` so a second attempt doesn't wait again. By default it would wait again.
    +Wait for a future time, then continue. We should add an option to pass an `idempotencyKey` so that a second attempt does not wait again. By default, it will wait again.

This change adds the needed comma (as suggested in the static analysis) and clarifies the intent.
🧰 Tools
🪛 LanguageTool
[uncategorized] ~102-~102: Did you mean: “By default,”?
Context: ...so a second attempt doesn't wait again. By default it would wait again. ```ts //Note if t...(BY_DEFAULT_COMMA)
112-113: Comma for Clarity in `triggerAndWait()` Description
To improve sentence clarity, consider inserting commas for a natural pause:

    -Trigger and then wait for run(s) to finish. If the run fails it will still continue but with the errors so the developer can decide what to do.
    +Trigger and then wait for run(s) to finish. If the run fails, it will still continue but with the errors, so the developer can decide what to do.

This minor change enhances readability by clearly separating the clauses.
🧰 Tools
🪛 LanguageTool
[uncategorized] ~113-~113: Use a comma before ‘so’ if it connects two independent clauses (unless they are closely connected and short).
Context: ... will still continue but with the errors so the developer can decide what to do. #...(COMMA_COMPOUND_SENTENCE_2)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
internal-packages/run-engine/README.md
(8 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (5)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: units / 🧪 Unit Tests
- GitHub Check: typecheck / typecheck
- GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (13)
internal-packages/run-engine/README.md (13)
45-48: Improved Worker Types Formatting
The section now distinctly lists the two types of workers (hosted and self-hosted) and improves readability. The formatting addition (marked by the "~" at line 45) supports the explanation well.
72-73: Clarification in Run Queue Reliability Features
The description of built-in reliability features (using nacking to increment the attempt count and moving runs to the DLQ if necessary) is clear and concise. No further changes are needed here.
107-108: Code Examples for IdempotencyKey Usage
The added TypeScript examples using `wait.until` with an `idempotencyKey` are clear and demonstrate the intended usage effectively.
120-121: `wait.forRequest()` Explanation
The updated explanation for `wait.forRequest()` is clear and provides a practical example of its use case. The improvements make the intent immediately understandable.
🧰 Tools
🪛 LanguageTool
[grammar] ~121-~121: The word “callback” is a noun. The verb is spelled with a space.
Context: ..., Replicate have an API where they will callback when their work is complete. ### `wait...(NOUN_VERB_CONFUSION)
165-168: Rate Limiting Section Clarity
The additions in the rate limiting section clearly articulate its purposes: preventing abuse and limiting execution frequency using a key. The changes enhance the overall clarity and are well integrated.
174-176: Debouncing Section Enhancement
The clarifying note about debouncing (preventing too many runs from occurring in a short period) is succinct and effectively communicates its utility. No changes needed.
200-202: New RunEngine System Architecture Section
The introduction of the "RunEngine System Architecture" header provides a good structural break, marking the start of high-level architectural documentation. This addition enhances the overall documentation clarity.
203-271: Mermaid Diagram for System Architecture
The new Mermaid diagram offers a comprehensive view of the system’s components and their interactions. This visual representation improves understanding of the system architecture. Ensure that the diagram renders correctly in all documentation viewers.
272-280: System Responsibilities Documentation
The “System Responsibilities” section is well-organized, clearly outlining the roles of the core systems such as ESS, ES, RAS, and RCS. This structured breakdown makes it easier for developers to grasp system functions.
281-286: Queue Management Overview
The Queue Management section clearly describes the functions of DS, DRS, and WFS. The concise documentation here matches the overall clarity of the README.
287-293: State Management Clarity
The State Management section neatly documents the roles of CS, WS, BS, and TS. This clear listing improves the reader's understanding of how state is managed in the system.
295-303: Shared Resources Section
The updated section on Shared Resources provides thorough context regarding critical components like Prisma, Logger, and RunLocker. This is valuable for maintaining consistency and understanding across the system.
304-327: Key Interactions Summary
The “Key Interactions” section clearly summarizes core workflows and dependencies among system components. It’s well structured; just ensure that all system names correctly reflect the latest nomenclature and implementations in the codebase.
Actionable comments posted: 1
🔭 Outside diff range comments (1)
internal-packages/redis-worker/src/worker.test.ts (1)
1-253: 🛠️ Refactor suggestion
Consider refactoring tests to use beforeEach/afterEach for consistent cleanup
All tests create a worker instance but none consistently clean it up. This pattern makes the tests more prone to resource leaks and inconsistent behavior.
Consider refactoring the tests to use a more structured setup/teardown approach:

    describe("Worker", () => {
    +  let worker;
    +
    +  afterEach(async () => {
    +    if (worker) {
    +      await worker.stop();
    +      worker = null;
    +    }
    +  });

      redisTest("Process items that don't throw", { timeout: 30_000 }, async ({ redisContainer }) => {
        const processedItems: number[] = [];
    -    const worker = new Worker({
    +    worker = new Worker({
          // ...configuration...
        }).start();

        // Test code...
      });

      // Apply the same pattern to other tests

This would ensure consistent cleanup regardless of test success or failure.
🧹 Nitpick comments (3)
internal-packages/redis-worker/src/worker.test.ts (3)
96-110: Missing worker cleanup in error case test
Similar to the previous test, this test for error handling no longer calls `worker.stop()`. This is particularly concerning in error scenarios where proper cleanup becomes even more important.
Consider adding cleanup logic, especially since this test involves error handling:

      // Wait for items to be processed
      await new Promise((resolve) => setTimeout(resolve, 500));

      expect(processedItems.length).toBe(10);
      expect(new Set(processedItems).size).toBe(10); // Ensure all items were processed uniquely
    +
    + // Cleanup
    + await worker.stop();

151-173: Missing explicit cleanup for DLQ test
This test creates a worker that processes items and checks Dead Letter Queue functionality, but doesn't explicitly stop the worker afterward. Resources allocated by the worker may remain active after the test.
Consider adding cleanup at the end of the test:

      // Check that the failed item is in the DLQ
      const dlqSize = await worker.queue.sizeOfDeadLetterQueue();
      expect(dlqSize).toBe(1);
    +
    + // Cleanup
    + await worker.stop();

216-250: Redis client is closed but worker isn't
The test properly closes the Redis client with `await redisClient.quit()` but doesn't close the worker. This inconsistency in resource management might cause subtle issues, especially in extended test runs.
Consider adding the worker cleanup after the Redis client is closed:

      dlqSize = await worker.queue.sizeOfDeadLetterQueue();
      expect(dlqSize).toBe(0);

      await redisClient.quit();
    + await worker.stop();
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
internal-packages/redis-worker/src/worker.test.ts
(4 hunks)
🧰 Additional context used
🧬 Code Definitions (1)
internal-packages/redis-worker/src/worker.test.ts (1)
internal-packages/redis/src/index.ts (1) (1)
createRedisClient
(16-42)
⏰ Context from checks skipped due to timeout of 90000ms (4)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: units / 🧪 Unit Tests
- GitHub Check: typecheck / typecheck
      // Enqueue 10 items
      for (let i = 0; i < 10; i++) {
        await worker.enqueue({
          id: `item-${i}`,
          job: "testJob",
          payload: { value: i },
          visibilityTimeoutMs: 5000,
        });
      }

    -   // Wait for items to be processed
    -   await new Promise((resolve) => setTimeout(resolve, 2000));
    + // Wait for items to be processed
    + await new Promise((resolve) => setTimeout(resolve, 2000));

    -   expect(processedItems.length).toBe(10);
    -   expect(new Set(processedItems).size).toBe(10); // Ensure all items were processed uniquely
    - } finally {
    -   worker.stop();
    - }
    + expect(processedItems.length).toBe(10);
    + expect(new Set(processedItems).size).toBe(10); // Ensure all items were processed uniquely
💡 Verification agent
🧩 Analysis chain
Potential resource leak after removal of worker.stop()
The test no longer explicitly stops the worker after completion. This could potentially lead to resource leaks as Redis connections might remain open, particularly if these tests run frequently or in large numbers.
🏁 Script executed:
#!/bin/bash
# Check if the Worker class has a cleanup mechanism or if the test environment handles cleanup
# Look for any automatic cleanup in the Worker implementation
echo "Checking Worker implementation for automatic cleanup..."
cat internal-packages/redis-worker/src/worker.ts | grep -A 20 "start()" | grep -B 20 "stop()"
# Look for cleanup in the redisTest helper
echo "Checking redisTest helper for cleanup..."
grep -r "redisTest" --include="*.ts" internal-packages/ | head -n 20
Length of output: 3037
Action Required: Address Resource Cleanup in Worker Tests
Our verification indicates that the current implementation of the Worker class does not include an automatic cleanup mechanism, and the redisTest helper does not appear to handle stopping the worker. Therefore, the removal of the explicit `worker.stop()` call leaves the connection open, potentially leading to resource leaks when tests are executed frequently.
- File: internal-packages/redis-worker/src/worker.test.ts
- Concern: The test enqueues and processes items without stopping the worker.
- Suggestion: Reintroduce an explicit `worker.stop()` call after processing is complete, or ensure that an automatic cleanup mechanism is implemented in the Worker class or via the redisTest helper to safely release resources.
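One way to guarantee the cleanup suggested above, regardless of whether assertions pass, is a small wrapper that always stops the worker in a `finally` block. The sketch below is hypothetical: `Stoppable` and `withCleanup` are illustrative names, not part of the redis-worker package, and it assumes only that the worker exposes a `stop()` method as the review comments imply.

```typescript
// Hypothetical minimal interface; the real Worker class has a richer API.
interface Stoppable {
  stop(): Promise<void> | void;
}

// Run `body` against a resource and always stop the resource afterwards,
// even when `body` throws (for example, because an expectation failed).
async function withCleanup<T extends Stoppable, R>(
  resource: T,
  body: (resource: T) => Promise<R>
): Promise<R> {
  try {
    return await body(resource);
  } finally {
    await resource.stop();
  }
}
```

In a test, the worker construction would be passed as `resource` and the enqueue/expect steps would move into `body`, which keeps the original try/finally behaviour without repeating it in every test.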
…hile the run is in QUEUED_EXECUTING state by saving the EXECUTING_WITH_WAITPOINTS snapshotId as the previousSnapshotId on the QUEUED_EXECUTING snapshot
…m, make it disabled by default, configure the run engine in the webapp with env vars
e1a362e
to
38e1887
Compare
This PR introduces a new release concurrency system into the run engine that unifies how runs blocked by waitpoints optionally release concurrency:
- `releaseConcurrency: true` option to allow users control over how queue concurrency is released. Passing `releaseConcurrency: true` to a waitpoint for a run on a queue that has no concurrency limit basically has no effect (see above)

The release concurrency system implements a "release queue" to prevent unbounded releasings, which can cause an unbounded amount of executions. It does this by implementing an event-based token bucket rate limiter backed by a queue. The token is refilled when checkpoints are created. There are new env vars to control this behavior:
- `RUN_ENGINE_RELEASE_CONCURRENCY_ENABLED`: setting this to 1 enables releasing concurrency. This is 0 by default.
- `RUN_ENGINE_RELEASE_CONCURRENCY_MAX_TOKENS_RATIO`: the max tokens in a release concurrency queue bucket is determined by a formula: `environment.maximumConcurrencyLimit * env.RUN_ENGINE_RELEASE_CONCURRENCY_MAX_TOKENS_RATIO`.
- `RUN_ENGINE_RELEASE_CONCURRENCY_MAX_RETRIES`: if a releasing fails, this controls how many times it retries with an exponential backoff.
- `RUN_ENGINE_RELEASE_CONCURRENCY_CONSUMERS_COUNT`: how many consumers to run. They will attempt to dequeue releasings at an interval of `RUN_ENGINE_RELEASE_CONCURRENCY_POLL_INTERVAL` in ms. Defaults to 1 and 500ms.
- `RUN_ENGINE_RELEASE_CONCURRENCY_BATCH_SIZE`: how many releasings to dequeue at once. Defaults to 10.

QUEUED_EXECUTING execution status
This PR also introduces a new snapshot execution status `QUEUED_EXECUTING` because it's possible that a run that is `EXECUTING_WITH_WAITPOINTS` is unable to reacquire the concurrency needed after it attempts to continue when the blocking waitpoints are complete.
The run is enqueued with this new snapshot execution status, and then can be dequeued whilst in this state. If dequeued in this state, instead of returning the run to QueueRaider, we notify the worker that the run can be resumed.
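To make the release queue from earlier in this description more concrete, here is a minimal in-memory sketch of an event-based token bucket backed by a queue. It is an assumption-laden illustration, not the PR's implementation: the real system is Redis-backed, and `ReleaseQueueSketch`, `attemptRelease`, `refillToken`, and `maxTokensFor` are hypothetical names. Rounding the max-token formula down with `Math.floor` is also an assumption.

```typescript
// In-memory sketch only; all names here are hypothetical.
class ReleaseQueueSketch<T> {
  private tokens: number;
  private readonly maxTokens: number;
  private readonly onRelease: (item: T) => void;
  private readonly pending: T[] = [];

  constructor(maxTokens: number, onRelease: (item: T) => void) {
    this.maxTokens = maxTokens;
    this.onRelease = onRelease;
    this.tokens = maxTokens; // the bucket starts full
  }

  // Release immediately if a token is available; otherwise queue the releasing.
  attemptRelease(item: T): boolean {
    if (this.tokens > 0) {
      this.tokens -= 1;
      this.onRelease(item);
      return true;
    }
    this.pending.push(item);
    return false;
  }

  // Event-based refill (for example, when a checkpoint is created): hand the
  // token to the oldest queued releasing, or return it to the bucket.
  refillToken(): void {
    if (this.pending.length > 0) {
      const next = this.pending.shift() as T;
      this.onRelease(next);
      return;
    }
    this.tokens = Math.min(this.tokens + 1, this.maxTokens);
  }

  get availableTokens(): number {
    return this.tokens;
  }

  get queuedCount(): number {
    return this.pending.length;
  }
}

// Max tokens per the formula above: maximumConcurrencyLimit * ratio.
// Flooring the result is an assumption made for this sketch.
function maxTokensFor(maximumConcurrencyLimit: number, ratio: number): number {
  return Math.floor(maximumConcurrencyLimit * ratio);
}
```

Because tokens only come back on an explicit event rather than on a timer, the number of outstanding releasings stays bounded by the bucket size, which is the property the release queue is meant to provide.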
Reorganizing of the RunEngine internals
This PR also reorganizes the RunEngine internals into separate systems.
More details can be found in the run engine README.md
Work to follow from this
- `releaseConcurrency` option to `wait.forToken`
Summary by CodeRabbit
New Features
Improvements
Documentation