Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

re2: New release concurrency system #1804

Merged
merged 38 commits into from
Mar 19, 2025
Merged

re2: New release concurrency system #1804

merged 38 commits into from
Mar 19, 2025

Conversation

ericallam
Copy link
Member

@ericallam ericallam commented Mar 19, 2025

This PR introduces a new release concurrency system into the run engine that unifies how runs blocked by waitpoints optionally release concurrency:

  • The new default depends on how the TaskQueue is defined. If there is a concurrency limit on the task queue, the default is to hold the concurrency for the queue and release for the environment. If there is no concurrency limit on the queue, then we release both (which effectively has no effect because the limit on the queue = the limit on the env)
  • There is no way to hold the concurrency for the env, just the queue
  • In places where waitpoints are created and block runs, we allow passing the releaseConcurrency: true option to allow users control over how queue concurrency is released. Passing releaseConcurrency: true to a waitpoint for a run on a queue that has no concurrency limit basically has no effect (see above)

The release concurrency system implements a "release queue" to prevent unbounded releasings, which can cause an unbounded amount of executions. It does this by implementing an event-based token bucket rate limiter backed by a queue. The token is refilled when checkpoints are created. There are new env vars to control this behavior:

  • RUN_ENGINE_RELEASE_CONCURRENCY_ENABLED setting this to 1 enables releasing concurrency. This is 0 by default
  • RUN_ENGINE_RELEASE_CONCURRENCY_MAX_TOKENS_RATIO the max tokens in a release concurrency queue bucket is determined by a formula: environment.maximumConcurrencyLimit * env.RUN_ENGINE_RELEASE_CONCURRENCY_MAX_TOKENS_RATIO.
  • RUN_ENGINE_RELEASE_CONCURRENCY_MAX_RETRIES if a releasing fails, this controls how many times it retries with an exponential backoff
  • RUN_ENGINE_RELEASE_CONCURRENCY_CONSUMERS_COUNT how many consumers to run. They will attempt to dequeue releasings at an interval of RUN_ENGINE_RELEASE_CONCURRENCY_POLL_INTERVAL in ms. Defaults to 1 and 500ms.
  • RUN_ENGINE_RELEASE_CONCURRENCY_BATCH_SIZE how many releasings to dequeue at once. Defaults to 10

QUEUED_EXECUTING execution status

CleanShot 2025-03-14 at 12 39 55

This PR also introduces a new snapshot execution status QUEUED_EXECUTING because it's possible that a run that is EXECUTING_WITH_WAITPOINTS is unable to reacquire the concurrency needed after it attempts to continue when the blocking waitpoints are complete.

The run is enqueued with this new snapshot execution status, and then can be dequeued whilst in this state. If dequeued in this state, instead of returning the run to QueueRaider, we instead notify the worker that the run can be resumed.

Reorganizing of the RunEngine internals

This PR also has created a new organization for the RunEngine internals into separate systems.

Loading
graph TD
    RE[RunEngine]
    DS[DequeueSystem]
    RAS[RunAttemptSystem]
    ESS[ExecutionSnapshotSystem]
    WS[WaitpointSystem]
    BS[BatchSystem]
    ES[EnqueueSystem]
    CS[CheckpointSystem]
    DRS[DelayedRunSystem]
    TS[TtlSystem]
    WFS[WaitingForWorkerSystem]
    RCS[ReleaseConcurrencySystem]

    %% Core Dependencies
    RE --> DS
    RE --> RAS
    RE --> ESS
    RE --> WS
    RE --> BS
    RE --> ES
    RE --> CS
    RE --> DRS
    RE --> TS
    RE --> WFS
    RE --> RCS

    %% System Dependencies
    DS --> ESS
    DS --> RAS

    RAS --> ESS
    RAS --> WS
    RAS --> BS

    WS --> ESS
    WS --> ES
    WS --> RCS

    ES --> ESS

    CS --> ESS
    CS --> ES
    CS --> RCS

    DRS --> ES

    WFS --> ES

    TS --> WS

    %% Shared Resources
    subgraph Resources
        PRI[(Prisma)]
        LOG[Logger]
        TRC[Tracer]
        RQ[RunQueue]
        RL[RunLocker]
        EB[EventBus]
        WRK[Worker]
    end

    %% Resource Dependencies
    RE -.-> Resources
    DS & RAS & ESS & WS & BS & ES & CS & DRS & TS & WFS & RCS -.-> Resources

More details can be found in the run engine README.md

Work to follow from this

  • The managed run controller needs to handle QUEUED_EXECUTING status updates and possibly initiate a checkpoint if the checkpoint hasn't already started
  • add the releaseConcurrency option to wait.forToken
  • Allow configuring queues at index time to specify the release concurrency behavior (db ready for this)
  • Re-implement deadlock detection
  • Implement the priority completion functionality

Summary by CodeRabbit

  • New Features

    • Introduced advanced concurrency management with new options (including a “releaseConcurrency” flag) and a new task status “QUEUED_EXECUTING” for clearer progress tracking.
  • Improvements

    • Optimized task scheduling, batch processing, and error handling for smoother, more reliable run execution.
    • Enhanced performance and scalability through refined delay, TTL management, and dynamic configuration.
  • Documentation

    • Updated system guides and architecture diagrams to better explain rate limiting, debouncing, and worker operations.

Copy link

changeset-bot bot commented Mar 19, 2025

⚠️ No Changeset found

Latest commit: 38e1887

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

Click here to learn what changesets are, and how to add one.

Click here if you're a maintainer who wants to add a changeset to this PR

💥 An error occurred when fetching the changed packages and changesets in this PR
Some errors occurred when validating the changesets config:
The package or glob expression "proxy" is specified in the `ignore` option but it is not found in the project. You may have misspelled the package name or provided an invalid glob expression. Note that glob expressions must be defined according to https://www.npmjs.com/package/micromatch.

Copy link
Contributor

coderabbitai bot commented Mar 19, 2025

Caution

Review failed

The pull request is closed.

Walkthrough

The changes span numerous areas of the project. Debug and configuration files now use more specific commands and improved logging. API schemas and database migrations have been updated to include new status values and additional fields (such as project, organization, and concurrency-related columns). Refactoring in the RunEngine routes replaces deprecated modules and simplifies parameter passing (e.g., removing environment IDs from wait functions and making release concurrency configurable). In addition, several new systems and modules have been added—including token bucket–based concurrency control, batch, checkpoint, delayed run, waitpoint, and TTL systems—to manage task execution, while a comprehensive update of tests further validates system reliability.

Changes

File(s) / Module Group Change Summary
*.cursor/mcp.json, .vscode/launch.json, internal-packages/redis/src/index.ts, Vitest config Minor configuration updates: newline addition; refined test commands, dynamic retry settings, and updated reporter/testTimeout options.
Prisma migration files, schema.prisma, packages/core schemas (api.ts, runEngine.ts, types) Database schema and API updates: added new enum status (QUEUED_EXECUTING), new columns (previousSnapshotId, releaseConcurrencyOnWaitpoint, lockedQueueId, metadata, organizationId, projectId), and new optional releaseConcurrency fields in request bodies and task options.
apps/webapp routes & services (admin.api.v1.environments., engine.v1.runs., triggerTaskV2.server.ts) Refactoring of task execution: replaced “marqs” with “engine” module, removed extraneous parameters, and enabled dynamic concurrency release.
internal-packages/run-engine/src/engine/* (errors.ts, eventBus.ts, statuses.ts) New error classes and event bus functionality; updates to status-check functions now include the new QUEUED_EXECUTING status and concurrency release logic.
internal-packages/run-engine/src/engine/systems/* New systems added including BatchSystem, CheckpointSystem, DelayedRunSystem, DequeueSystem, EnqueueSystem, ExecutionSnapshotSystem, ReleaseConcurrencySystem, RunAttemptSystem, TtlSystem, WaitingForWorkerSystem, and WaitpointSystem for enhanced task/run management.
internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts New token bucket–based concurrency management system with methods for token consumption, release, and queue processing with retry logic.
internal-packages/run-queue/* (index.ts, keyProducer.ts, types.ts, fairQueueSelectionStrategy.ts, errors.ts) RunQueue refactoring: removed reserve concurrency logic; added methods to support dead letter queue handling and updated key-generation logic to include project IDs.
internal-packages/run-engine/src/engine/tests/* and internal-packages/run-queue/tests/* Comprehensive test suite updates: adjustments in timeouts, removal of obsolete parameters, and added tests for new queue and concurrency functionalities.
packages/cli-v3/, packages/trigger-sdk/, references/hello-world/, references/test-tasks/ CLI and SDK enhancements: added support for the new releaseConcurrency option in task triggering and wait functions, along with updated examples and batch retrieval logic.

Sequence Diagram(s)

Loading
sequenceDiagram
  participant RE as RunEngine
  participant RS as ReleaseConcurrencySystem
  participant TBQ as TokenBucketQueue
  participant Redis as Redis DB

  RE->>RS: Request to release concurrency (with snapshot details)
  RS->>TBQ: Attempt token release via token bucket algorithm
  TBQ->>Redis: Check token availability and consume token
  Redis-->>TBQ: Token status returned
  TBQ-->>RS: Confirm token released or schedule retry
  RS-->>RE: Concurrency release status acknowledgment
Loading
sequenceDiagram
  participant RE as RunEngine
  participant WS as WaitpointSystem
  participant Worker as Worker

  RE->>WS: Block run with waitpoint (releaseConcurrency flag)
  WS->>Worker: Schedule waitpoint monitoring/completion
  Worker-->>WS: Notify waitpoint cleared
  WS-->>RE: Signal run unblocked and ready to continue

Suggested reviewers

  • matt-aitken

🐇 In the garden, I hop with delight,
New features and fixes, all shining bright.
With concurrency managed and tests that are grand,
Our project is thriving, just as we planned!
So let’s celebrate with a joyful cheer,
For the changes we've made, let’s give a big cheer! 🎉

Tip

⚡🧪 Multi-step agentic review comment chat (experimental)
  • We're introducing multi-step agentic chat in review comments. This experimental feature enhances review discussions with the CodeRabbit agentic chat by enabling advanced interactions, including the ability to create pull requests directly from comments.
    - To enable this feature, set early_access to true under in the settings.
✨ Finishing Touches
  • 📝 Generate Docstrings

🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

🔭 Outside diff range comments (1)
internal-packages/run-queue/src/run-queue/tests/nack.test.ts (1)

11-12: 💡 Verification agent

🧩 Analysis chain

Missing test implementation

The file has all the necessary imports but appears to be missing the actual test implementation. The comment on line 11 indicates "existing code" but this is marked as a new file. Please add the test implementations that verify the nackMessage functionality, particularly tests for:

  1. Verifying concurrency is cleared when a message is nacked
  2. Confirming messages move to the dead letter queue after maximum attempts

According to the PR summary, this should test the new release concurrency system.


Let's verify if the file actually contains any test implementation for nackMessage.

Once the output is reviewed and it confirms that only imports (and placeholder code like "// ... existing code ...") are present without actual tests, this confirms the missing implementation as noted in the review comment.

Assuming the file output indicates the absence of test cases, here’s the rewritten review comment:


Critical: Missing Test Implementation in nack.test.ts

  • The file internal-packages/run-queue/src/run-queue/tests/nack.test.ts currently includes the necessary imports but does not implement any tests.
  • Please add test cases for the nackMessage functionality, specifically:
    • Verifying that concurrency is cleared when a message is nacked.
    • Confirming that messages are moved to the dead letter queue after reaching the maximum retry attempts.
  • This test is essential for validating the new release concurrency system as described in the PR summary.

🏁 Script executed:

#!/bin/bash
# Display the contents of the nack.test.ts file to inspect its implementation
cat internal-packages/run-queue/src/run-queue/tests/nack.test.ts

Length of output: 555


Critical: Missing Test Implementation in nack.test.ts

  • The file internal-packages/run-queue/src/run-queue/tests/nack.test.ts currently only contains the necessary imports and a placeholder comment ("... existing code ...") without any actual test cases.
  • Please implement tests for the nackMessage functionality, specifically:
    • Verifying that concurrency is cleared when a message is nacked.
    • Confirming that messages are moved to the dead letter queue after exceeding the maximum retry attempts.
  • This is essential to validate the new release concurrency system as outlined in the PR summary.
🧹 Nitpick comments (55)
internal-packages/database/prisma/migrations/20250319131807_add_locked_queue_id_to_task_run/migration.sql (1)

1-5: Consider adding a non-null constraint or foreign key reference

The migration adds a lockedQueueId column which appears to be related to queue locking in the new concurrency system. Consider whether this column should have constraints like NOT NULL or a foreign key reference to ensure data integrity.

internal-packages/database/prisma/migrations/20250319114436_add_metadata_to_task_run_execution_snapshots/migration.sql (1)

1-5: Consider adding a default empty object for JSONB column

The migration adds a metadata column of type JSONB. Consider adding a default empty object value (DEFAULT '{}'::jsonb) to ensure consistent behavior when querying records that don't have metadata set.

internal-packages/run-engine/src/run-queue/tests/releaseConcurrency.test.ts (1)

46-95: Good test case for releasing concurrency.

This test correctly verifies that after dequeueing a message and releasing concurrency, both the queue and environment concurrency counts are properly updated to zero.

One thing to note - line 84 calls releaseAllConcurrency but the test description refers to releaseConcurrency. Consider aligning the method name in the test description with the actual method being called for better clarity.

internal-packages/database/prisma/migrations/20250318163201_add_previous_snapshot_id_to_task_run_execution_snapshot/migration.sql (1)

1-8: Investigate index recreation pattern

The migration drops and recreates the SecretStore_key_idx index with text_pattern_ops. This pattern suggests an optimization for PostgreSQL's LIKE queries using the text_pattern_ops operator class.

While the migration will work, there are a few considerations:

  1. This creates a brief window where the index is unavailable, which could impact performance in production
  2. No explanation is provided for why the index is being modified

Consider adding a comment explaining why the index is being changed to use text_pattern_ops, and consider using a more explicit migration naming if this is the primary purpose of the change.

-- DropIndex
DROP INDEX "SecretStore_key_idx";

+ -- Recreating index with text_pattern_ops for better performance with LIKE queries
-- CreateIndex
CREATE INDEX "SecretStore_key_idx" ON "SecretStore"("key" text_pattern_ops);
packages/core/src/v3/schemas/api.ts (1)

960-962: Consider adding JSDoc for the releaseConcurrency parameter

While the parameter has been added correctly, there's no associated documentation to explain its purpose and behavior in this context.

Add JSDoc documentation similar to what exists in the wait.ts file to ensure consistent documentation across the codebase:

  idempotencyKeyTTL: z.string().optional(),

+  /**
+   * If set to true, this will cause the waitpoint to release the current run from the queue's concurrency,
+   * allowing other runs to execute while this waitpoint is pending.
+   *
+   * @default false
+   */
  releaseConcurrency: z.boolean().optional(),

  date: z.coerce.date(),
internal-packages/run-engine/src/run-queue/tests/reacquireConcurrency.test.ts (1)

169-226: Duplicate test case

This test appears to be a duplicate of the previous test (lines 110-167) that checks if the run is already being counted as concurrency. The test description, setup, and assertions are identical to the previous test.

You should remove this duplicate test or modify it to test a different scenario to avoid redundancy and improve test suite maintenance.

internal-packages/run-engine/src/engine/errors.ts (1)

70-75: NotImplementedError logs but consider adding proper name

The NotImplementedError class correctly logs an error message to the console, but unlike other error classes, it doesn't set a custom name property. For consistency, consider adding this.name = "NotImplementedError".

export class NotImplementedError extends Error {
  constructor(message: string) {
    console.error("This isn't implemented", { message });
    super(message);
+   this.name = "NotImplementedError";
  }
}
internal-packages/run-engine/src/run-queue/tests/dequeueMessageFromMasterQueue.test.ts (2)

48-133: Comprehensive test for basic dequeue functionality

This test case thoroughly validates the dequeue functionality by checking queue state before and after operations, properly verifying message properties and concurrency metrics.

However, consider extracting magic numbers like 10 in the dequeue call to named constants for better readability.

+const DEFAULT_BATCH_SIZE = 10;
 // Later in the code:
-const dequeued = await queue.dequeueMessageFromMasterQueue("test_12345", envMasterQueue, 10);
+const dequeued = await queue.dequeueMessageFromMasterQueue("test_12345", envMasterQueue, DEFAULT_BATCH_SIZE);

1-266: Consider adding tests for different concurrency key scenarios

The current tests validate concurrency limits at the environment and queue level, but don't test behavior with different concurrency keys within the same queue.

Consider adding a test case that verifies multiple runs with different concurrency keys can be processed simultaneously, while runs with the same concurrency key are serialized.

internal-packages/run-engine/src/engine/tests/delays.test.ts (2)

11-11: Unused import - TaskRunErrorCodes

The TaskRunErrorCodes import is not used in this file.

-import { TaskRunErrorCodes } from "@trigger.dev/core/v3";

195-297: Comprehensive test for TTL functionality

The new test case thoroughly validates the time-to-live functionality for delayed runs, ensuring that runs expire correctly after their TTL period.

However, the timing in this test is quite tight and could lead to flaky tests in CI environments. Consider making the assertions more resilient:

-//wait for 1 seconds
-await setTimeout(2_500);
+//wait for delay to elapse (with buffer)
+await setTimeout(2_500);

-//wait for 3 seconds
-await setTimeout(3_000);
+//wait for TTL to expire (with buffer)
+await setTimeout(3_000);

-//should now be expired
-const executionData3 = await engine.getRunExecutionData({ runId: run.id });
-assertNonNullable(executionData3);
-expect(executionData3.snapshot.executionStatus).toBe("FINISHED");
+//should now be expired (retry a few times to avoid timing issues)
+let executionData3;
+let attempts = 0;
+const maxAttempts = 5;
+while (attempts < maxAttempts) {
+  executionData3 = await engine.getRunExecutionData({ runId: run.id });
+  assertNonNullable(executionData3);
+  if (executionData3.snapshot.executionStatus === "FINISHED") {
+    break;
+  }
+  await setTimeout(500);
+  attempts++;
+}
+expect(executionData3.snapshot.executionStatus).toBe("FINISHED");
internal-packages/run-engine/src/engine/systems/dequeueSystem.ts (1)

117-120: Address the TODO regarding invalid dequeue state.

A comment indicates uncertainty about run re-queuing. You may need to update the run’s status or adjust concurrency if you want to retry.

I can help draft a fallback flow to mark the run as dequeuable again. Let me know if you'd like me to open an issue or propose a patch.

internal-packages/run-engine/src/engine/types.ts (1)

108-109: Add tests for releaseConcurrency property
This new boolean flag can significantly impact triggers. Consider adding coverage in integration tests to confirm it's respected during runs.

internal-packages/run-engine/src/run-queue/index.test.ts (2)

651-739: Test coverage for concurrency release is comprehensive
The new "Releasing concurrency" test thoroughly validates concurrency states (before and after release). Consider additional scenarios with multiple messages or concurrency keys to further ensure robust coverage.


799-823: Extended Dead Letter Queue testing
These lines enhance verification of items in the DLQ and their subsequent re-drive. Looks good overall. For completeness, consider testing concurrency states after redriving from the DLQ as well.

internal-packages/run-engine/README.md (2)

102-102: Refine phrasing with a comma after “By default”
A minor style improvement could make it read more naturally.

-By default it would wait again.
+By default, it would wait again.
🧰 Tools
🪛 LanguageTool

[uncategorized] ~102-~102: Did you mean: “By default,”?
Context: ...so a second attempt doesn't wait again. By default it would wait again. ```ts //Note if t...

(BY_DEFAULT_COMMA)


113-113: Consider adding a comma before “so”
This sentence can be more readable with a comma inserted.

-the run will still continue but with the errors so the developer can decide what to do
+the run will still continue but with the errors, so the developer can decide what to do
🧰 Tools
🪛 LanguageTool

[uncategorized] ~113-~113: Use a comma before ‘so’ if it connects two independent clauses (unless they are closely connected and short).
Context: ... will still continue but with the errors so the developer can decide what to do. #...

(COMMA_COMPOUND_SENTENCE_2)

internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts (1)

51-161: Comprehensive concurrency release logic
Good approach: verifying valid snapshots, locking the run, and releasing concurrency scope as needed. Consider additional logs if partial Redis configs cause unexpected release issues.

internal-packages/run-engine/src/engine/systems/checkpointSystem.ts (4)

35-46: Clarify the checkpoint creation process in the JSDoc.

The docstring is brief, but extra details about how the checkpoint snapshot is validated and persisted could help future maintainers. Consider elaborating on any assumptions or side effects (e.g., concurrency lock, wait states, event emissions).


94-118: Add test coverage for non-checkpointable states.

When snapshot.executionStatus isn’t checkpointable, you discard the checkpoint. Consider adding or expanding tests to ensure coverage of each ineligible status, verifying correct event emissions and error handling.


219-246: Improve error clarity for invalid snapshot resumes.

When continueRunExecution is called and the snapshot ID or status is invalid, you raise a generic ServiceValidationError. Including more context (e.g., which status was expected) could ease debugging.


271-289: Evaluate concurrency system error handling.

After creating the new snapshot, we notify the worker. If the concurrency refill or queue notification fails, do we have fallback or retry policies? Consider centralizing concurrency and notification error handling for better reliability.

internal-packages/run-engine/src/engine/tests/releaseConcurrencyTokenBucketQueue.test.ts (4)

9-36: Document the in-test queue-creation method.

The createReleaseConcurrencyQueue function is a key utility for test setup. Briefly document how it’s used, including the role of executedRuns. This clarifies usage patterns for new contributors.


56-61: Consider event-based or promise-based synchronization.

Using await setTimeout(1000) can create flaky tests if system load causes delays. Event-based signaling or a loop that checks the queue’s emptiness could yield more deterministic tests.


272-279: Enhance test coverage for negative or zero refills.

The test throws an error for negative tokens and does nothing for zero tokens. Consider explicitly confirming no partial state is left behind in Redis, ensuring data consistency even for these invalid calls.


523-581: Consider using a mocking library instead of console.log.

Exponential backoff intervals are logged with console.log. Switching to a mocking or spy library (e.g., jest.spyOn) can gather timing metrics in a structured manner, improving test clarity without console noise.

internal-packages/run-engine/src/engine/systems/batchSystem.ts (3)

16-25: Add a debounce or concurrency check for scheduling.

scheduleCompleteBatch schedules a job 2 seconds in the future, but multiple calls for the same batch within that window might enqueue duplicates. Consider deduplicating or merging jobs to reduce overhead and potential race conditions.


27-29: Validate usage of public method for single operation.

performCompleteBatch merely calls #tryCompleteBatch. If there's no alternative logic, consider merging these steps or making it a single method, unless the separation is for a future extension.


35-82: Guard against concurrent #tryCompleteBatch calls.

If the worker or code triggers performCompleteBatch in parallel or repeatedly, the concurrency logic might lead to repeated finalization attempts. You could use a short-lived lock around the batch or an idempotent approach that re-checks the completion state once more.

internal-packages/run-engine/src/engine/systems/waitingForWorkerSystem.ts (2)

39-44: Consider returning a typed exception or structured response for missing background worker.
The current approach logs an error and returns; it might help upstream consumers (or logs) to have a more explicit failure signal (e.g. custom error or structured response) when the background worker doesn't exist.


64-83: Possible performance improvement for batch processing runs.
This loop updates and enqueues each run in its own transaction. If you expect high volume, consider grouping updates into a single transaction or optimizing the number of transactions to reduce overhead.

internal-packages/run-engine/src/engine/tests/releaseConcurrency.test.ts (2)

104-116: Validate negative or erroneous paths for waitpoint creation.
While the tests check normal success paths, consider adding negative tests (e.g., invalid environment references), ensuring robust coverage for error handling in waitpoint creation or concurrency release.


578-619: Token consumption and return logic.
These tests confirm concurrency tokens can be manually consumed and later returned to restore concurrency. To further strengthen reliability, consider partial concurrency fragments (e.g., multiple tokens) and testing that partial token returns yield the correct concurrency updates.

internal-packages/run-engine/src/engine/systems/ttlSystem.ts (2)

108-114: Clarify expected behavior when no waitpoint is found.
Throwing ServiceValidationError is valid, but consider explaining in logs or error details why the run can't be expired without a waitpoint. This clarifies the cause for system maintainers.


120-131: Validate TTL edge cases.
parseNaturalLanguageDuration(ttl) may yield invalid or zero durations if the user passes unexpected input. It might be beneficial to handle or log these edge cases explicitly so that failures are more evident.

internal-packages/run-engine/src/engine/tests/attemptFailures.test.ts (2)

13-164: Consider reducing test complexity or splitting into smaller tests.
This test covers multiple states (initial run, failing due to user error, retrying, then succeeding). While the logic is sound, the large block can become unwieldy to maintain. Consider splitting different scenarios into separate tests or helper functions for improved clarity and maintainability.


386-492: Correct the misleading comment describing OOM retry.
The code comments imply that the run should be retried on a larger machine, but the test logic ultimately expects "CRASHED" status. This discrepancy can confuse future readers.

Suggested fix:

-  // The run should be retried with a larger machine
+  // The run fails immediately due to OOM and is not retried
internal-packages/run-engine/src/engine/tests/checkpoints.test.ts (1)

17-189: Possible timing flakiness with fixed delays.
Calls like setTimeout(500) might introduce nondeterministic behavior under heavy load or slow environments. Consider using logic that checks the run state in a loop or waiting for a specific signal instead of fixed sleeps.

internal-packages/database/prisma/schema.prisma (1)

2031-2031: New enum value QUEUED_EXECUTING is a good addition for concurrency tracking.

Make sure that all relevant code paths (including transitions from other statuses) are thoroughly tested, as newly introduced states can trigger unexpected transitions.

Consider adding descriptive JSDoc or comments around this enum to document when and how this status is set.

internal-packages/run-engine/src/run-queue/keyProducer.ts (1)

177-195: New method deadLetterQueueKey() neatly encapsulates the dead-letter queue segment.

  1. The method uses consistent org→project→env ordering, which simplifies scanning keys for debugging.
  2. Consider verifying that the environment actually supports or needs this dead-letter queue approach to avoid overhead in ephemeral or dev environments.

You might embed a short comment describing how you distinguish between the normal queue and dead-letter queue to help maintainers understand the bigger picture.

internal-packages/run-engine/src/engine/systems/waitpointSystem.ts (2)

39-54: clearBlockingWaitpoints method is straightforward.

  1. It deletes all waitpoints associated with the run, effectively clearing the blocking state.
  2. Consider whether partial or selective clearing might ever be needed, or if a total clear is always safe/relevant.

Document the use cases and side-effects of a full clear in the method docstring.


325-453: blockRunWithWaitpoint method's concurrency release approach is comprehensive.

  1. The logic to handle “EXECUTING_WITH_WAITPOINTS” vs. “SUSPENDED” states is concise; well done.
  2. Consider the edge cases if multiple waitpoints are added after a run is already suspended; your debounce approach with near-future scheduling handles this but it’s worth a stress test.

Use a typed result instead of raw query for pending_count: BigInt to avoid potential type issues. Also align with the static analysis hint by switching BigIntbigint.

- const insert = await prisma.$queryRaw<{ pending_count: BigInt }[]>`
+ const insert = await prisma.$queryRaw<{ pending_count: bigint }[]>`
🧰 Tools
🪛 Biome (1.9.4)

[error] 359-359: Don't use 'BigInt' as a type.

Use lowercase primitives for consistency.
Safe fix: Use 'bigint' instead

(lint/complexity/noBannedTypes)

internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts (3)

58-345: Check lock timeout and large method length in startRunAttempt

  1. Lock Duration: this.$.runLock.lock([runId], 5000, ...) uses a 5000ms lock. Consider whether this timeout could be too small under heavier load, causing frequent lock collisions.
  2. Method Length: startRunAttempt is fairly large. Consider splitting out logic (e.g., environment retrieval, queue validation, snapshot checking) into smaller private helper methods to improve readability and maintainability.
🧰 Tools
🪛 Biome (1.9.4)

[error] 332-332: Change to an optional chain.

Unsafe fix: Change to an optional chain.

(lint/complexity/useOptionalChain)


394-524: attemptSucceeded: verify that concurrency release is handled upstream

The successful path updates the task run status, completes waitpoints, and emits events. This is great, but consider verifying whether concurrency (locks, queue concurrency) is fully released in the calling method or in a finalizing step to avoid concurrency leaks if success occurs before completion.


826-900: tryNackAndRequeue: duplication of logic?

This method is helpful for requeueing runs with a fresh snapshot. However, some logic (like acking and concurrency re-application) might be partially duplicated in other methods. Consider extracting shared concurrency or requeue logic into a private helper to streamline future maintenance.

internal-packages/run-engine/src/run-queue/types.ts (1)

79-81: Overloaded deadLetterQueueKey signatures

Declaring separate overloads for deadLetterQueueKey(env: MinimalAuthenticatedEnvironment) and deadLetterQueueKey(env: EnvDescriptor) is valid but can lead to duplication. Consider narrowing to a single function that infers the correct shape or uses a union type for clarity.

internal-packages/run-engine/src/run-queue/index.ts (6)

289-289: Minor note: return await usage

Using return await this.#callEnqueueMessage(...) is not typically harmful but could be simplified to return this.#callEnqueueMessage(...) if no additional try/catch is used. This is just a style preference.


494-533: releaseAllConcurrency: essential final step for concurrency cleanup

Releasing both queue and environment concurrency in one call is helpful. Just verify that a run can’t remain stuck in concurrency sets if releaseAllConcurrency is somehow skipped or fails. Possibly add logs on success/failure.


622-684: handleRedriveMessage: concurrency after re-enqueue

After redriving, the code re-enqueues the message with enqueueMessage(), but concurrency keys are re-initialized. Confirm that any concurrency from the original attempt is cleared to avoid double counting. Also, logging around success/failure is very helpful.


773-850: #callDequeueMessage: check concurrency gating

The concurrency gating on queue and environment concurrency is well-structured. Just ensure your environment concurrency default of 1000000 or fallback logic is consistent with your intended default concurrency across the system.


851-882: #callAcknowledgeMessage refactor clarity

This method is well documented. If possible, unify concurrency removal with the logic in releaseAllConcurrency to reduce duplication. Currently, the concurrency sets are updated in multiple places.


962-1291: Custom Redis commands: beneficial for advanced queue logic

All custom Lua commands appear well-labeled, and your concurrency checks are comprehensive. Consider adding test coverage for edge cases (e.g., concurrency limit equals zero, concurrent calls for the same message, etc.) to validate the correctness of these commands.

internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts (1)

217-218: Simplify boolean logic.
You can use logical negation rather than a ternary to set isValid based on whether error is present.

Here's a potential refactor:

- isValid: error ? false : true,
+ isValid: !error,
🧰 Tools
🪛 Biome (1.9.4)

[error] 217-217: Unnecessary use of boolean literals in conditional expression.

Simplify your code by directly assigning the result without using a ternary operator.
If your goal is negation, you may use the logical NOT (!) or double NOT (!!) operator for clearer and concise code.
Check for more details about NOT operator.
Unsafe fix: Remove the conditional expression with

(lint/complexity/noUselessTernary)

internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts (2)

112-112: Remove redundant double negation.
!!result can be replaced with a direct boolean evaluation to simplify readability.

Consider:

- if (!!result) {
+ if (result) {
🧰 Tools
🪛 Biome (1.9.4)

[error] 112-112: Avoid redundant double-negation.

It is not necessary to use double-negation when a value will already be coerced to a boolean.
Unsafe fix: Remove redundant double-negation

(lint/complexity/noExtraBooleanCast)


621-621: Eliminate unnecessary continue statement.
There's no further logic after this if block, making the continue redundant.

- if (!processed) {
-   continue;
- }
🧰 Tools
🪛 Biome (1.9.4)

[error] 621-621: Unnecessary continue statement

Unsafe fix: Delete the unnecessary continue statement

(lint/correctness/noUnnecessaryContinue)

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a6e85f0 and ace9f98.

⛔ Files ignored due to path filters (2)
  • internal-packages/run-engine/execution-states.png is excluded by !**/*.png
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (77)
  • .cursor/mcp.json (1 hunks)
  • .vscode/launch.json (1 hunks)
  • apps/webapp/app/routes/admin.api.v1.environments.$environmentId.ts (3 hunks)
  • apps/webapp/app/routes/engine.v1.runs.$runFriendlyId.wait.duration.ts (1 hunks)
  • apps/webapp/app/routes/engine.v1.runs.$runFriendlyId.waitpoints.tokens.$waitpointFriendlyId.wait.ts (1 hunks)
  • apps/webapp/app/v3/services/triggerTaskV2.server.ts (5 hunks)
  • internal-packages/database/prisma/migrations/20250314133612_add_queued_executing_status_to_snapshots/migration.sql (1 hunks)
  • internal-packages/database/prisma/migrations/20250318163201_add_previous_snapshot_id_to_task_run_execution_snapshot/migration.sql (1 hunks)
  • internal-packages/database/prisma/migrations/20250319103257_add_release_concurrency_on_waitpoint_to_task_queue/migration.sql (1 hunks)
  • internal-packages/database/prisma/migrations/20250319110754_add_org_and_project_to_execution_snapshots/migration.sql (1 hunks)
  • internal-packages/database/prisma/migrations/20250319114436_add_metadata_to_task_run_execution_snapshots/migration.sql (1 hunks)
  • internal-packages/database/prisma/migrations/20250319131807_add_locked_queue_id_to_task_run/migration.sql (1 hunks)
  • internal-packages/database/prisma/schema.prisma (8 hunks)
  • internal-packages/redis-worker/src/worker.ts (1 hunks)
  • internal-packages/redis/src/index.ts (1 hunks)
  • internal-packages/run-engine/README.md (8 hunks)
  • internal-packages/run-engine/package.json (1 hunks)
  • internal-packages/run-engine/src/engine/errors.ts (1 hunks)
  • internal-packages/run-engine/src/engine/eventBus.ts (2 hunks)
  • internal-packages/run-engine/src/engine/executionSnapshots.ts (0 hunks)
  • internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts (1 hunks)
  • internal-packages/run-engine/src/engine/retrying.ts (1 hunks)
  • internal-packages/run-engine/src/engine/statuses.ts (3 hunks)
  • internal-packages/run-engine/src/engine/systems/batchSystem.ts (1 hunks)
  • internal-packages/run-engine/src/engine/systems/checkpointSystem.ts (1 hunks)
  • internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts (1 hunks)
  • internal-packages/run-engine/src/engine/systems/dequeueSystem.ts (1 hunks)
  • internal-packages/run-engine/src/engine/systems/enqueueSystem.ts (1 hunks)
  • internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts (1 hunks)
  • internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts (1 hunks)
  • internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts (1 hunks)
  • internal-packages/run-engine/src/engine/systems/systems.ts (1 hunks)
  • internal-packages/run-engine/src/engine/systems/ttlSystem.ts (1 hunks)
  • internal-packages/run-engine/src/engine/systems/waitingForWorkerSystem.ts (1 hunks)
  • internal-packages/run-engine/src/engine/systems/waitpointSystem.ts (1 hunks)
  • internal-packages/run-engine/src/engine/tests/attemptFailures.test.ts (3 hunks)
  • internal-packages/run-engine/src/engine/tests/checkpoints.test.ts (1 hunks)
  • internal-packages/run-engine/src/engine/tests/delays.test.ts (4 hunks)
  • internal-packages/run-engine/src/engine/tests/heartbeats.test.ts (1 hunks)
  • internal-packages/run-engine/src/engine/tests/locking.test.ts (1 hunks)
  • internal-packages/run-engine/src/engine/tests/releaseConcurrency.test.ts (1 hunks)
  • internal-packages/run-engine/src/engine/tests/releaseConcurrencyTokenBucketQueue.test.ts (1 hunks)
  • internal-packages/run-engine/src/engine/tests/triggerAndWait.test.ts (0 hunks)
  • internal-packages/run-engine/src/engine/tests/waitpoints.test.ts (3 hunks)
  • internal-packages/run-engine/src/engine/types.ts (3 hunks)
  • internal-packages/run-engine/src/engine/workerCatalog.ts (1 hunks)
  • internal-packages/run-engine/src/index.ts (1 hunks)
  • internal-packages/run-engine/src/run-queue/errors.ts (1 hunks)
  • internal-packages/run-engine/src/run-queue/fairQueueSelectionStrategy.ts (2 hunks)
  • internal-packages/run-engine/src/run-queue/index.test.ts (2 hunks)
  • internal-packages/run-engine/src/run-queue/index.ts (31 hunks)
  • internal-packages/run-engine/src/run-queue/keyProducer.ts (4 hunks)
  • internal-packages/run-engine/src/run-queue/tests/ack.test.ts (1 hunks)
  • internal-packages/run-engine/src/run-queue/tests/dequeueMessageFromMasterQueue.test.ts (1 hunks)
  • internal-packages/run-engine/src/run-queue/tests/enqueueMessage.test.ts (1 hunks)
  • internal-packages/run-engine/src/run-queue/tests/fairQueueSelectionStrategy.test.ts (3 hunks)
  • internal-packages/run-engine/src/run-queue/tests/keyProducer.test.ts (2 hunks)
  • internal-packages/run-engine/src/run-queue/tests/nack.test.ts (1 hunks)
  • internal-packages/run-engine/src/run-queue/tests/reacquireConcurrency.test.ts (1 hunks)
  • internal-packages/run-engine/src/run-queue/tests/releaseConcurrency.test.ts (1 hunks)
  • internal-packages/run-engine/src/run-queue/types.ts (1 hunks)
  • internal-packages/run-engine/tsconfig.test.json (1 hunks)
  • internal-packages/run-engine/vitest.config.ts (1 hunks)
  • internal-packages/run-queue/src/run-queue/tests/nack.test.ts (1 hunks)
  • internal-packages/testcontainers/src/setup.ts (2 hunks)
  • packages/cli-v3/src/entryPoints/dev-run-controller.ts (1 hunks)
  • packages/cli-v3/src/entryPoints/managed-run-controller.ts (1 hunks)
  • packages/core/src/v3/schemas/api.ts (2 hunks)
  • packages/core/src/v3/schemas/runEngine.ts (1 hunks)
  • packages/core/src/v3/types/tasks.ts (1 hunks)
  • packages/trigger-sdk/src/v3/shared.ts (1 hunks)
  • packages/trigger-sdk/src/v3/wait.ts (3 hunks)
  • references/hello-world/package.json (1 hunks)
  • references/hello-world/src/trigger/example.ts (1 hunks)
  • references/hello-world/src/trigger/waits.ts (1 hunks)
  • references/test-tasks/src/trigger/test-reserve-concurrency-system.ts (3 hunks)
  • references/test-tasks/src/utils.ts (0 hunks)
💤 Files with no reviewable changes (3)
  • references/test-tasks/src/utils.ts
  • internal-packages/run-engine/src/engine/tests/triggerAndWait.test.ts
  • internal-packages/run-engine/src/engine/executionSnapshots.ts
🧰 Additional context used
🧬 Code Definitions (17)
references/hello-world/src/trigger/waits.ts (1)
packages/trigger-sdk/src/v3/wait.ts (1) (1)
  • wait (138-293)
internal-packages/run-engine/src/run-queue/tests/ack.test.ts (3)
internal-packages/run-engine/src/run-queue/keyProducer.ts (1) (1)
  • RunQueueFullKeyProducer (18-236)
internal-packages/run-engine/src/run-queue/types.ts (2) (2)
  • InputPayload (5-16)
  • InputPayload (17-17)
internal-packages/run-engine/src/run-queue/index.ts (1) (1)
  • RunQueue (73-1292)
internal-packages/run-engine/src/engine/statuses.ts (1)
packages/core/src/v3/schemas/runEngine.ts (2) (2)
  • TaskRunExecutionStatus (6-16)
  • TaskRunExecutionStatus (18-19)
internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts (2)
apps/webapp/app/v3/services/triggerTaskV2.server.ts (1) (1)
  • environment (451-470)
internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts (1) (1)
  • getLatestExecutionSnapshot (28-100)
internal-packages/run-engine/src/run-queue/index.test.ts (1)
internal-packages/run-engine/src/run-queue/index.ts (6) (6)
  • RunQueue (73-1292)
  • redis (961-1291)
  • message (730-771)
  • message (851-882)
  • message (884-923)
  • message (925-946)
internal-packages/run-engine/src/engine/systems/enqueueSystem.ts (2)
internal-packages/run-engine/src/engine/systems/systems.ts (1) (1)
  • SystemResources (10-23)
internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts (1) (1)
  • ExecutionSnapshotSystem (144-335)
internal-packages/redis-worker/src/worker.ts (1)
internal-packages/run-engine/src/run-queue/index.ts (1) (1)
  • channel (717-728)
internal-packages/run-engine/src/engine/systems/waitingForWorkerSystem.ts (1)
internal-packages/run-engine/src/engine/index.ts (1) (1)
  • backgroundWorkerId (1379-1381)
internal-packages/run-engine/src/engine/systems/batchSystem.ts (1)
internal-packages/run-engine/src/engine/systems/systems.ts (1) (1)
  • SystemResources (10-23)
internal-packages/run-engine/src/engine/systems/dequeueSystem.ts (6)
internal-packages/run-engine/src/engine/systems/systems.ts (1) (1)
  • SystemResources (10-23)
internal-packages/run-engine/src/engine/types.ts (1) (1)
  • RunEngineOptions (10-52)
internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts (1) (1)
  • getLatestExecutionSnapshot (28-100)
internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts (2) (2)
  • runId (1074-1183)
  • runId (1197-1218)
internal-packages/run-engine/src/engine/statuses.ts (1) (1)
  • isDequeueableExecutionStatus (3-6)
internal-packages/run-engine/src/engine/eventBus.ts (1) (1)
  • sendNotificationToWorker (189-211)
internal-packages/run-engine/src/engine/tests/releaseConcurrencyTokenBucketQueue.test.ts (1)
internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts (4) (4)
  • ReleaseConcurrencyTokenBucketQueue (40-545)
  • releaseQueue (287-289)
  • releaseQueue (291-293)
  • releaseQueue (295-297)
internal-packages/run-engine/src/engine/tests/checkpoints.test.ts (1)
internal-packages/run-engine/src/engine/index.ts (1) (1)
  • RunEngine (57-1382)
internal-packages/run-engine/src/run-queue/types.ts (1)
internal-packages/run-engine/src/run-queue/fairQueueSelectionStrategy.ts (4) (4)
  • env (496-512)
  • env (551-569)
  • env (571-585)
  • queue (587-593)
internal-packages/run-engine/src/engine/types.ts (1)
internal-packages/run-engine/src/engine/workerCatalog.ts (1) (1)
  • workerCatalog (3-56)
internal-packages/run-engine/src/engine/systems/waitpointSystem.ts (6)
internal-packages/run-engine/src/engine/systems/systems.ts (1) (1)
  • SystemResources (10-23)
internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts (2) (2)
  • ExecutionSnapshotSystem (144-335)
  • getLatestExecutionSnapshot (28-100)
internal-packages/run-engine/src/engine/systems/enqueueSystem.ts (1) (1)
  • EnqueueSystem (16-104)
internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts (1) (1)
  • ReleaseConcurrencySystem (17-161)
packages/core/src/v3/schemas/api.ts (1) (1)
  • timeoutError (990-995)
internal-packages/run-engine/src/engine/eventBus.ts (1) (1)
  • sendNotificationToWorker (189-211)
internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts (6)
internal-packages/run-engine/src/engine/systems/systems.ts (1) (1)
  • SystemResources (10-23)
internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts (2) (2)
  • getLatestExecutionSnapshot (28-100)
  • status (316-334)
internal-packages/run-engine/src/engine/types.ts (1) (1)
  • RunEngineOptions (10-52)
internal-packages/run-engine/src/engine/errors.ts (1) (1)
  • ServiceValidationError (60-68)
internal-packages/run-engine/src/engine/retrying.ts (1) (1)
  • retryOutcomeFromCompletion (41-120)
internal-packages/run-engine/src/engine/eventBus.ts (1) (1)
  • sendNotificationToWorker (189-211)
internal-packages/run-engine/src/run-queue/index.ts (3)
internal-packages/run-engine/src/run-queue/types.ts (2) (2)
  • OutputPayload (19-22)
  • OutputPayload (23-23)
internal-packages/run-engine/src/run-queue/errors.ts (1) (1)
  • MessageNotFoundError (1-5)
internal-packages/run-engine/src/run-queue/keyProducer.ts (1) (1)
  • deadLetterQueueKey (179-195)
🪛 Biome (1.9.4)
internal-packages/run-engine/src/engine/systems/dequeueSystem.ts

[error] 538-539: Unnecessary continue statement

Unsafe fix: Delete the unnecessary continue statement

(lint/correctness/noUnnecessaryContinue)

internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts

[error] 217-217: Unnecessary use of boolean literals in conditional expression.

Simplify your code by directly assigning the result without using a ternary operator.
If your goal is negation, you may use the logical NOT (!) or double NOT (!!) operator for clearer and concise code.
Check for more details about NOT operator.
Unsafe fix: Remove the conditional expression with

(lint/complexity/noUselessTernary)

internal-packages/run-engine/src/engine/systems/waitpointSystem.ts

[error] 359-359: Don't use 'BigInt' as a type.

Use lowercase primitives for consistency.
Safe fix: Use 'bigint' instead

(lint/complexity/noBannedTypes)

internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts

[error] 332-332: Change to an optional chain.

Unsafe fix: Change to an optional chain.

(lint/complexity/useOptionalChain)

internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts

[error] 112-112: Avoid redundant double-negation.

It is not necessary to use double-negation when a value will already be coerced to a boolean.
Unsafe fix: Remove redundant double-negation

(lint/complexity/noExtraBooleanCast)


[error] 621-621: Unnecessary continue statement

Unsafe fix: Delete the unnecessary continue statement

(lint/correctness/noUnnecessaryContinue)

🪛 LanguageTool
internal-packages/run-engine/README.md

[typographical] ~94-~94: If the word ‘which’ starts a question, add a question mark (“?”) at the end of the sentence.
Context: ...pleted when the associated run completes. Every run has an associatedWaitpoint ...

(WHAT_NOUNPHRASE_QUESTION_MARK)


[typographical] ~95-~95: If the word ‘which’ starts a question, add a question mark (“?”) at the end of the sentence.
Context: ...s completed when the datetime is reached. - MANUAL which gets completed when th...

(WHAT_NOUNPHRASE_QUESTION_MARK)


[typographical] ~96-~96: If the word ‘which’ starts a question, add a question mark (“?”) at the end of the sentence.
Context: ...ch gets completed when that event occurs. Waitpoints can have an idempotencyKey ...

(WHAT_NOUNPHRASE_QUESTION_MARK)


[uncategorized] ~102-~102: Did you mean: “By default,”?
Context: ...so a second attempt doesn't wait again. By default it would wait again. ```ts //Note if t...

(BY_DEFAULT_COMMA)


[uncategorized] ~113-~113: Use a comma before ‘so’ if it connects two independent clauses (unless they are closely connected and short).
Context: ... will still continue but with the errors so the developer can decide what to do. #...

(COMMA_COMPOUND_SENTENCE_2)

⏰ Context from checks skipped due to timeout of 90000ms (2)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: typecheck / typecheck
🔇 Additional comments (164)
internal-packages/run-engine/tsconfig.test.json (1)

1-3: Expanded Test File Inclusion in TSConfig

The updated "include" array now explicitly adds the src/run-queue/tests/dequeueMessageFromMasterQueue.ts file. This is a necessary change to ensure that the new test, related to the release concurrency system enhancements, is compiled and executed as part of the test suite. The change is correctly integrated with the rest of the configuration.

internal-packages/run-queue/src/run-queue/tests/nack.test.ts (1)

1-9: Imports look appropriate for Redis-based queue testing

The imports cover essential dependencies for testing the nack functionality with Redis, including test containers, tracing, logging, and the core RunQueue components. This setup appears well-structured for testing concurrency release during message negative acknowledgment.

.cursor/mcp.json (1)

7-7: Good addition of the EOF newline character.

Adding a newline at the end of the file follows best practices for text files, ensuring compatibility with tools like git and various text editors.

internal-packages/redis/src/index.ts (1)

11-11: Good optimization for test environments.

Setting maxRetriesPerRequest to 1 in test environments is a sensible approach as it makes tests faster and more deterministic by reducing the amount of Redis retries during test runs.

.vscode/launch.json (2)

141-142: More specific test debug configuration.

The debug configuration now targets a specific test case related to the new concurrency system, which aligns with the PR's focus on the release concurrency queue. This makes it easier to debug the specific functionality being added.


149-149: Updated debug target for RunQueue tests.

The configuration now points to the main RunQueue test file instead of the waitpoints-specific test, providing a more comprehensive debugging setup for the run queue system.

internal-packages/run-engine/package.json (3)

22-22: Dependencies reordering and addition.

The dependencies have been reordered, with @unkey/cache and zod maintained, and seedrandom added (likely needed for the new concurrency system for random number generation).

Also applies to: 26-27


32-34: Updated testing dependencies.

Upgrading Vitest to v3.0.8 and adding the V8 coverage plugin will provide improved testing capabilities and better test coverage reporting.


40-40: Added test coverage script.

The new script makes it easier to run tests with coverage reporting enabled, which is a good practice for monitoring code quality and identifying untested areas of the codebase.

internal-packages/testcontainers/src/setup.ts (3)

72-76: Well-designed optional parameter structure for queue configuration

The addition of the queueOptions parameter with optional properties for releaseConcurrencyOnWaitpoint and concurrencyLimit provides a clean way to configure concurrency behavior without breaking existing code.


122-125: Good default handling for concurrency limit

The conditional logic correctly handles undefined values by defaulting to 10 when no specific concurrency limit is provided, while still allowing explicit null values to be passed through.


129-132: Proper type checking for boolean property

The code correctly checks if the releaseConcurrencyOnWaitpoint is specifically a boolean before applying it, preventing potential type errors from undefined or non-boolean values.

references/hello-world/package.json (1)

11-12: Good addition of development script

Adding the dev script simplifies the developer experience by providing a standardized way to start the development server with the trigger dev command.

internal-packages/database/prisma/migrations/20250314133612_add_queued_executing_status_to_snapshots/migration.sql (1)

1-4: LGTM: New status value supports improved concurrency handling

The addition of the 'QUEUED_EXECUTING' status to the TaskRunExecutionStatus enum will enable more granular tracking of task execution states, which is essential for the new concurrency system being implemented in this PR.

internal-packages/run-engine/src/run-queue/errors.ts (1)

1-5: LGTM: Clean error class implementation

The MessageNotFoundError class follows best practices for custom error types in TypeScript. The descriptive error message that includes the messageId will be helpful for debugging issues in the new concurrency system.

internal-packages/run-engine/src/engine/retrying.ts (1)

13-13:

❓ Verification inconclusive

Import path update looks good

Updating the import path for ServiceValidationError is good refactoring toward more modular code organization. This helps separate concerns by moving error definitions to a dedicated file.


🏁 Script executed:

#!/bin/bash
# Verify that ServiceValidationError is properly exported from errors.js
rg -l "export .*ServiceValidationError" internal-packages/run-engine/src/engine/errors.js

Length of output: 181


Verify that ServiceValidationError is Exported from a Valid Errors File

The updated import statement in internal-packages/run-engine/src/engine/retrying.ts appears to be a good refactoring step toward improved modularity. However, our initial check didn’t locate an errors.js file at the expected path. This raises one of the following possibilities:

  • The error definitions might now reside in a file with a different extension (e.g., errors.ts), or
  • The file might have been moved to a different directory, or its export syntax may differ.

Before approving the change, please verify that the file containing the ServiceValidationError export exists in the internal-packages/run-engine/src/engine directory and that it properly exports the class. For example, you can run the following command to list any files starting with “errors” in that directory:

#!/bin/bash
find internal-packages/run-engine/src/engine -maxdepth 1 -iname "errors.*"

Once you’ve confirmed the correct file is in place and that it exports ServiceValidationError appropriately, the import update can be considered safe.

internal-packages/database/prisma/migrations/20250319103257_add_release_concurrency_on_waitpoint_to_task_queue/migration.sql (1)

1-5: LGTM: Well-designed addition for concurrency control

The addition of the "releaseConcurrencyOnWaitpoint" column to the TaskQueue table provides the necessary database support for the new concurrency release system. The boolean type with NOT NULL constraint and DEFAULT false value is appropriate, maintaining backward compatibility with existing records.

references/hello-world/src/trigger/waits.ts (1)

76-81: Good addition of the releaseConcurrency parameter

Adding the releaseConcurrency: true parameter to wait.for() allows the system to release concurrency slots while waiting, enabling other tasks to execute during this wait period. This is an essential enhancement for the new release concurrency system.

With this feature, wait operations can now explicitly signal that they don't need to hold the concurrency token, improving resource utilization in the system.

internal-packages/run-engine/src/engine/tests/locking.test.ts (1)

4-4: Import path correction

The import path has been updated from a relative path pointing to the current directory (./locking.js) to a relative path pointing to the parent directory (../locking.js). This suggests the file has been moved or reorganized as part of the concurrency system changes.

packages/trigger-sdk/src/v3/shared.ts (1)

1348-1348: Good implementation of releaseConcurrency forwarding

The releaseConcurrency option is now being properly forwarded to the task execution options. This ensures that when a parent task triggers a child task with triggerAndWait(), the concurrency release behavior can be controlled by the caller.

This change integrates well with the overall concurrency control system and allows for more flexible concurrency management during task execution chains.

packages/cli-v3/src/entryPoints/dev-run-controller.ts (1)

452-453: Support for new QUEUED_EXECUTING status

Adding the QUEUED_EXECUTING case to the switch statement properly handles the new execution status that's part of the concurrency system. This status likely represents a state where a run is queued but is beginning execution while potentially waiting for concurrency tokens.

The implementation correctly follows the same pattern as the existing RUN_CREATED and QUEUED statuses, ensuring consistent behavior in the controller.

internal-packages/run-engine/src/engine/systems/systems.ts (1)

1-23: New system resources type looks well-structured

The new SystemResources type provides a clear structure for dependency injection across the system components, with specific support for the new release concurrency feature. The type signature properly includes all necessary dependencies, including Prisma, worker, event bus, logging, tracing, locks, queues, and the new concurrency token bucket.

Including the generic type parameter for ReleaseConcurrencyTokenBucketQueue with organization, project, and environment IDs ensures strong typing for the token bucket implementation.

references/hello-world/src/trigger/example.ts (1)

25-30:

❓ Verification inconclusive

Added support for concurrency control

The additional options parameter with releaseConcurrency: true enables the new release concurrency system when triggering child tasks. This will help optimize resource utilization by releasing concurrency tokens while waiting for the child task to complete.


🏁 Script executed:

#!/bin/bash
# Check for other places in the codebase that might need similar updates
rg -A 2 -B 2 "triggerAndWait\(" --type ts | grep -v "releaseConcurrency"

Length of output: 37069


Concurrency Control Update in Example File Verified

In references/hello-world/src/trigger/example.ts the changes properly add the new { releaseConcurrency: true } option to childTask.triggerAndWait, enabling the release concurrency system as intended to optimize resource utilization.

A couple of points to note:

  • Other call sites invoking triggerAndWait across the codebase do not include the releaseConcurrency flag. Please verify that these omissions are intentional (e.g., for tests or different use cases) or if similar updates should be applied for consistency.
  • Ensure that your concurrency control strategy is uniformly implemented where appropriate.
packages/cli-v3/src/entryPoints/managed-run-controller.ts (1)

630-630: Added support for new execution status

The addition of QUEUED_EXECUTING to the case statement ensures the controller correctly handles this new intermediary state between queueing and execution. This aligns with the status enum addition in the schemas file.

packages/core/src/v3/schemas/runEngine.ts (1)

9-9:

❓ Verification inconclusive

New execution status added to the enumeration

The addition of QUEUED_EXECUTING to TaskRunExecutionStatus introduces a new state that represents when a task has been dequeued but execution hasn't fully started. This provides more granular tracking of the execution lifecycle and supports the new release concurrency system.

Make sure to update your documentation to describe this new status and when it occurs in the lifecycle.


🏁 Script executed:

#!/bin/bash
# Verify that this status is properly handled across the codebase
rg "TaskRunExecutionStatus" -A 20 | grep -A 5 -B 5 "case.*QUEUED_EXECUTING"

Length of output: 75


Important: Update Documentation & Verify Handling for New Execution Status

  • In packages/core/src/v3/schemas/runEngine.ts (line 9), the new status QUEUED_EXECUTING has been added to TaskRunExecutionStatus to indicate that a task has been dequeued but not yet fully executing.
  • A preliminary search for switch-case statements or usage handling of QUEUED_EXECUTING returned no matches. Please manually verify that all parts of the codebase (e.g., switch blocks or conditionals that evaluate execution statuses) correctly handle this new state.
  • Additionally, update the documentation to clearly describe this status and specify the circumstances under which it occurs in the task lifecycle.
internal-packages/redis-worker/src/worker.ts (1)

368-368: Changed log level from 'log' to 'debug' for subscription message.

This change reduces log verbosity by moving the subscription confirmation message to debug level. This aligns with best practices for operational logging, where routine confirmation messages should only appear in debug logs.

internal-packages/run-engine/src/engine/tests/heartbeats.test.ts (1)

98-98:

❓ Verification inconclusive

Increased timeout duration in test case.

The wait time was doubled from pendingExecutingTimeout * 2 to pendingExecutingTimeout * 4. This change suggests the new concurrency system might require more time to process state transitions. This adjustment helps prevent flaky tests.


🏁 Script executed:

#!/bin/bash
# Check if there are any other timeout multipliers that might need adjustment
rg "pendingExecutingTimeout \* \d" --type ts

Length of output: 282


Action Required: Verify Consistency of Timeout Multipliers in Heartbeats Test

The increase from pendingExecutingTimeout * 2 to pendingExecutingTimeout * 4 on line 98 is clearly intended to accommodate the new concurrency dynamics by reducing test flakiness. However, a grep search on the repository shows another instance in the same file using pendingExecutingTimeout * 3. Please confirm whether the differing multipliers (*4 vs. *3) in the heartbeat tests are intentional. If both tests are expected to have the increased timeout, consider aligning them for consistency.

  • File: internal-packages/run-engine/src/engine/tests/heartbeats.test.ts
    • Line with increased timeout: await setTimeout(pendingExecutingTimeout * 4);
    • Parallel instance found: await setTimeout(pendingExecutingTimeout * 3);
apps/webapp/app/routes/engine.v1.runs.$runFriendlyId.wait.duration.ts (1)

53-53:

✅ Verification successful

Modified concurrency release behavior to use request body parameter.

This change makes the concurrency behavior configurable by propagating body.releaseConcurrency to the engine instead of using a hardcoded value. The environmentId parameter was also removed from the blockRunWithWaitpoint call.


🏁 Script executed:

#!/bin/bash
# Verify that blockRunWithWaitpoint no longer requires environmentId
rg -A 10 "blockRunWithWaitpoint" --type ts "internal-packages/run-engine/src/"

Length of output: 34192


Concurrency release update verified and approved

The modifications correctly extract the concurrency release configuration from the request body (body.releaseConcurrency), and all instances of the blockRunWithWaitpoint call have been updated to remove the hardcoded environmentId parameter, as confirmed by the grep output.

internal-packages/run-engine/src/run-queue/tests/enqueueMessage.test.ts (1)

1-114: LGTM: Comprehensive test for queue message handling.

This new test file thoroughly verifies the behavior of the RunQueue.enqueueMessage method, checking queue lengths, oldest message timestamps, and concurrency levels. The test is well-structured with clear setup, action, and assertions.

internal-packages/run-engine/src/index.ts (1)

1-2: Good refactoring to separate error class into its own file.

The code now exports RunEngine and RunDuplicateIdempotencyKeyError from separate files, which improves modularity and organization.

internal-packages/run-engine/src/run-queue/tests/releaseConcurrency.test.ts (3)

1-8: Good test setup using Redis test containers.

The imports and test setup look appropriate for testing the concurrency feature with Redis.


9-43: LGTM: Well-structured test configuration.

The test options and test data setup look comprehensive, with appropriate values for concurrency testing.


97-151: Good negative test case for concurrency management.

This test properly verifies that releasing concurrency has no effect when a run hasn't been dequeued, which is an important edge case to cover.

packages/core/src/v3/types/tasks.ts (1)

827-836:

✅ Verification successful

Good addition of releaseConcurrency option with clear documentation.

The releaseConcurrency option is well-documented with a clear explanation of its purpose: allowing other runs to execute while a child task is executing. The default value of false is explicitly specified which is good practice.

This change fits well with the new release concurrency system described in the PR objectives.


🏁 Script executed:

#!/bin/bash
# Check where this new option is used in the codebase
rg "releaseConcurrency\?: boolean" --type=ts

Length of output: 447


Verified Implementation: Release Concurrency Option Confirmed

  • The releaseConcurrency option is consistently defined and used across several files, including:
    • packages/core/src/v3/types/tasks.ts
    • internal-packages/run-engine/src/engine/index.ts
    • internal-packages/run-engine/src/engine/types.ts
    • internal-packages/run-engine/src/engine/systems/waitpointSystem.ts
    • packages/trigger-sdk/src/v3/wait.ts
  • The documentation in TriggerAndWaitOptions clearly explains that setting releaseConcurrency to true allows other runs to execute while a child task is executing, with the default explicitly set to false.

This change is well implemented and matches the PR objectives for the new release concurrency system.

internal-packages/database/prisma/migrations/20250318163201_add_previous_snapshot_id_to_task_run_execution_snapshot/migration.sql (1)

4-5: Verify nullability of previousSnapshotId

The migration adds previousSnapshotId as TEXT without specifying NOT NULL, meaning it allows NULL values by default.

Confirm that this is the intended behavior for the new column. If NULL values should be valid (which seems logical for the first snapshot in a chain), this is correct. If not, the column should have a NOT NULL constraint.

packages/core/src/v3/schemas/api.ts (1)

122-122: Good addition of releaseConcurrency feature

The addition of the releaseConcurrency option to the TriggerTaskRequestBody schema is well-structured and maintains backward compatibility by making it optional.

This enhancement will provide better control over concurrency management during task execution.

internal-packages/run-engine/src/run-queue/tests/nack.test.ts (3)

1-48: Well-structured test setup and configuration

The test setup includes properly configured Redis containers, appropriate timeout settings, and clear test data fixtures. This provides a solid foundation for the test cases.

The configuration and imports are well-organized, making the tests easy to understand and maintain.


49-127: Comprehensive test for concurrency clearing on nack

This test case thoroughly verifies that nacking a message properly clears concurrency resources. It checks both the queue and environment concurrency before and after the nack operation.

The test follows a clear pattern of:

  1. Setting up the test environment
  2. Enqueueing and dequeueing a message
  3. Verifying initial concurrency state
  4. Performing the nack operation
  5. Verifying concurrency has been released
  6. Checking that the message is properly requeued

This covers the main functionality of the nack operation effectively.


129-216: Thorough test for dead letter queue functionality

This test verifies the critical path for handling message retries and dead letter queue management, ensuring that messages that exceed max attempts are properly moved to the dead letter queue.

The test effectively:

  1. Configures a low max attempt threshold (2) for easier testing
  2. Verifies the message is in the normal queue initially
  3. Nacks the message to increment the attempt counter
  4. Verifies it's still in the main queue after first nack
  5. Dequeues and nacks again to trigger the max attempts logic
  6. Verifies the message moves to the dead letter queue

This provides good confidence in the retry and dead letter queue functionality.

packages/trigger-sdk/src/v3/wait.ts (3)

92-103: Well-documented releaseConcurrency parameter

The added releaseConcurrency parameter includes comprehensive documentation that clearly explains:

  • What the parameter does (releases run from queue's concurrency)
  • When it's useful (allowing other runs to execute during a waitpoint)
  • The potential consequences (run may not resume immediately)
  • The default value (false)

This level of documentation is excellent as it helps developers understand both the functionality and the implications of using this feature.


154-154: Consistent integration of releaseConcurrency in wait.for method

The releaseConcurrency option is properly passed through from the options to the API call.

The implementation maintains consistency with how other options are handled.


192-192: Consistent integration of releaseConcurrency in wait.until method

The releaseConcurrency option is properly passed through from the options to the API call.

The implementation maintains consistency with how other options are handled.

internal-packages/run-engine/src/engine/eventBus.ts (3)

4-4: Added import for EventEmitter

The imported EventEmitter will be used to define the EventBus type, improving the overall typing of the event system.


183-183: Well-defined EventBus type

Good addition of a type alias that combines EventEmitter with your custom EventBusEvents, creating a strongly-typed event bus. This ensures that all emitted events conform to your predefined event signatures.


185-211: New worker notification function with clear documentation

The sendNotificationToWorker function is well-documented with a clear purpose: to notify workers about run state changes. The implementation correctly emits the "workerNotification" event with the proper structure as defined in the EventBusEvents type.

This function will be useful for the new concurrency release system, allowing workers to react to changes in run state.

references/test-tasks/src/trigger/test-reserve-concurrency-system.ts (4)

1-1: Added batch import for enhanced testing capability

The addition of the batch import enables the retrieval of batch state at runtime, which is essential for these concurrency tests.


296-297: Improved test reliability with batch retrieval

Good improvement to retrieve the latest batch state with batch.retrieve() instead of relying on the initial state. This ensures the test is working with the current state of the batch tasks.


299-299: Updated to use retrieved batch for waitForRunStatus

Using retrievedHoldBatch.runs ensures that the test is waiting on the most up-to-date run objects with current status information.


346-346: Consistent use of retrieved batch runs

The updated code consistently uses the retrieved batch runs for waiting on completions, maintaining a reliable test pattern throughout the file.

internal-packages/run-engine/vitest.config.ts (1)

14-17: Enhanced test configuration for concurrency tests

The increased test timeout (60 seconds) is appropriate for concurrency-related tests that involve asynchronous operations and potential waiting periods. The V8 coverage provider is also a good choice for TypeScript projects.

These changes will support the new concurrency-related tests being added to the codebase.

internal-packages/run-engine/src/run-queue/tests/ack.test.ts (4)

1-9: Well-structured test imports

Good selection of imports for testing the RunQueue's acknowledgment functionality, especially using redisTest to work with a real Redis container.


10-46: Comprehensive test setup with realistic data

The test options and mock data are well-defined, providing a realistic environment for testing the acknowledgment functionality. The test timeout configuration aligns with the updated vitest.config.ts timeout.


48-105: Thorough test for concurrency clearing

This test effectively verifies that acknowledging a message properly clears concurrency counts from both the queue and environment levels. The test follows good practices:

  1. Setting up the environment with realistic data
  2. Verifying concurrency is correctly set after dequeuing
  3. Calling acknowledgeMessage
  4. Verifying concurrency counts are reset to zero
  5. Properly cleaning up resources with queue.quit()

This ensures the concurrency management aspect of message acknowledgment works correctly.


107-169: Complete coverage of message removal functionality

This test thoroughly verifies that acknowledging a message correctly removes it from the queue by:

  1. Checking queue lengths before and after dequeuing
  2. Verifying queue remains empty after acknowledgment
  3. Testing both task-specific queue and environment queue

The test is well-structured with proper setup, verification, and cleanup phases.

apps/webapp/app/routes/admin.api.v1.environments.$environmentId.ts (4)

5-5: Module replacement: marqs → engine

The import statement has been updated to use the engine from ~/v3/runEngine.server instead of the previous marqs module. This aligns with the new release concurrency system mentioned in the PR objectives.


116-117: Updated method references to use engine.runQueue

The calls to get environment concurrency limit and current concurrency have been updated to use the engine.runQueue methods instead of direct marqs calls. This is consistent with the import change and maintains the same functionality while consolidating concurrency management under the engine module.


120-127: Updated queue concurrency methods to use engine.runQueue

Queue-specific concurrency methods have been migrated to use the engine.runQueue methods, ensuring consistency with the environment concurrency methods. This completes the transition from marqs to engine.runQueue for concurrency management.


138-138: Simplified JSON response structure

The return statement has been simplified to exclude the reserveConcurrency and queueReserveConcurrency values that were previously included. This streamlines the API response and avoids exposing unnecessary implementation details.

internal-packages/run-engine/src/engine/statuses.ts (3)

4-4: Added QUEUED_EXECUTING to dequeueable statuses

The QUEUED_EXECUTING status has been added to the list of dequeueable execution statuses, which extends the criteria for what statuses can be dequeued. This supports the new concurrency system by introducing a transitional state for tasks that are queued but in the process of execution.


29-29: Added QUEUED_EXECUTING to checkpointable statuses

The QUEUED_EXECUTING status has been added to the list of checkpointable statuses, ensuring that tasks in this state can be appropriately checkpointed. This maintains consistency with the changes to dequeueable statuses.


49-52: Added new canReleaseConcurrency function

A new function has been implemented to determine if concurrency can be released based on the task status. This is a critical component of the new release concurrency system, allowing tasks in SUSPENDED or EXECUTING_WITH_WAITPOINTS states to release their concurrency, potentially improving resource utilization.

internal-packages/run-engine/src/engine/workerCatalog.ts (1)

1-56: New workerCatalog with well-defined operations

This new file defines a catalog of worker operations with clear schemas and timeout values. Each operation (finishWaitpoint, heartbeatSnapshot, expireRun, etc.) has a well-structured schema validation using Zod and appropriate visibility timeouts.

The catalog provides a centralized definition of operations that workers can perform, which will help maintain consistency and type safety throughout the system.

The operations defined cover important aspects of the system including waitpoints, snapshots, run lifecycle management, batch processing, and delayed runs - all critical components for a robust concurrency management system.

internal-packages/run-engine/src/run-queue/tests/reacquireConcurrency.test.ts (4)

47-108: Comprehensive test for reacquiring released concurrency

This test case thoroughly validates that concurrency can be reacquired after being explicitly released. The test properly sets up the environment, releases concurrency, attempts to reacquire it, and verifies the expected state changes.


110-167: Test for already counted concurrency

This test confirms that the reacquireConcurrency method returns true when a run is already counted in the concurrency system, preventing duplicate concurrency allocations. This is important for maintaining accurate concurrency counts.


228-300: Test for handling capacity limits

This test verifies that reacquireConcurrency returns false when trying to reacquire concurrency for a run that's not already counted and there's no remaining capacity in the environment. This is important for preventing concurrency limits from being exceeded.


302-327: Error handling test for non-existent runs

This test ensures that attempting to reacquire concurrency for a non-existent run throws a MessageNotFoundError, which is appropriate error handling behavior. The test properly uses Jest's expect().rejects.toThrow() pattern for testing rejected promises.

internal-packages/database/prisma/migrations/20250319110754_add_org_and_project_to_execution_snapshots/migration.sql (2)

8-14: Appropriate column additions

Adding organization and project IDs to execution snapshots is a good enhancement that provides better context for task runs. This aligns well with the PR objectives around the new release concurrency system.


16-26: Foreign key constraints set up correctly

The foreign key constraints are properly established with appropriate ON DELETE RESTRICT and ON UPDATE CASCADE policies, ensuring referential integrity while preventing accidental deletion of referenced organizations or projects.

internal-packages/run-engine/src/engine/errors.ts (2)

60-68: Well-structured validation error class

The ServiceValidationError class provides a clean way to handle validation errors with optional status codes, which is useful for HTTP responses. This implementation follows best practices for extending the Error class.


77-82: Properly implemented idempotency key error

The RunDuplicateIdempotencyKeyError class is well-implemented with a custom name property, making it easy to identify in error handling scenarios.

internal-packages/run-engine/src/run-queue/tests/keyProducer.test.ts (1)

195-195: Updated key format to include project information

The expected key format now includes project ID information (proj:p1234), which aligns with the database migration that adds project IDs to execution snapshots. This change ensures proper namespacing of concurrency keys.

internal-packages/run-engine/src/run-queue/fairQueueSelectionStrategy.ts (3)

52-55: Simplified FairQueueConcurrency type

The FairQueueConcurrency type has been simplified by removing the reserve property. This aligns with the PR's objective to streamline the concurrency management system.


502-510: Updated getEnvConcurrency method to remove reserve retrieval

The #getEnvConcurrency method has been updated to only retrieve and return current and limit values, omitting the reserve value retrieval. This is consistent with the removal of the reserve property from the FairQueueConcurrency type.


405-406:

✅ Verification successful

Simplified concurrency check logic

The condition to check if an environment is at full concurrency has been simplified to only check if current >= limit, removing the reserve buffer. This simplification makes the code more straightforward but could potentially reduce flexibility in managing concurrency.

Verify that removing the reserve buffer was an intentional design decision:


🏁 Script executed:

#!/bin/bash
# Check for any references to reserve concurrency still remaining in the codebase
echo "Checking for remaining references to reserve concurrency..."
rg -A 2 -B 2 "reserve.*concurrency" --type ts
echo "------"
echo "This helps verify if the removal of reserve concurrency was comprehensive"

Length of output: 18519


Concurrency Check Simplification Verified

The removal of the reserve buffer in the concurrency check within internal-packages/run-engine/src/run-queue/fairQueueSelectionStrategy.ts appears to be an intentional design decision. The grep results show that reserve concurrency logic is still present and actively tested in other areas of the codebase (e.g., in various test files and parts of the queuing system). This indicates that while the selection strategy was simplified to only check env.concurrency.current >= env.concurrency.limit, the reserve concurrency mechanism continues to be managed elsewhere as needed.

  • The simplified check in fairQueueSelectionStrategy.ts focuses solely on whether the current concurrency reaches its limit.
  • Reserve concurrency logic remains in the system (as seen in test files and the queue processing logic), suggesting that its management has been decoupled from this particular selection strategy.

No further changes are required regarding this design decision.

internal-packages/run-engine/src/engine/systems/enqueueSystem.ts (7)

1-14: Well-structured imports with clear dependencies

The imports are organized logically, with proper use of named imports and appropriate relative paths, showing good module structure.


11-14: Clean type definition with clear requirements

The EnqueueSystemOptions type is well-defined with its dependencies clearly specified, making the component's requirements explicit.


16-23: Well-structured class initialization

The EnqueueSystem class follows a clean dependency injection pattern with appropriate private readonly properties, making the code more maintainable and testable.


25-56: Comprehensive parameter interface with appropriate optionals

The method signature for enqueueRun is well-designed with a clear parameter interface. It properly handles optional parameters and uses TypeScript's extract utility type for constraining the snapshot status values.


59-78: Effective use of locking mechanism and execution snapshot creation

The code correctly uses the run lock to prevent race conditions, with a reasonable timeout of 5000ms. The execution snapshot creation is properly configured with all necessary parameters.


79-82: Robust handling of master queues

The code properly handles both primary and secondary master queues, improving the reliability of the queue management system.


84-99: Well-structured message enqueuing with comprehensive metadata

The message enqueuing logic properly includes all necessary context data (environment, project, organization IDs) and run-specific information. The concurrencyKey handling correctly accommodates nullable values.

internal-packages/run-engine/src/run-queue/tests/dequeueMessageFromMasterQueue.test.ts (3)

1-47: Good test setup with appropriate configuration

The setup code is well-structured with comprehensive test options and environment configuration. Using real Redis instances through testcontainers provides more realistic testing conditions.


135-200: Effective concurrency limit testing

This test case correctly validates that the queue respects environment concurrency limits, which is crucial for the concurrency management system being implemented.


202-266: Queue-specific concurrency limits are properly tested

The test validates that queue-specific concurrency limits are respected, which is an important aspect of the overall concurrency management system.

apps/webapp/app/v3/services/triggerTaskV2.server.ts (5)

1-10: Enhanced error handling with structured error types

The new imports for TaskRunError, taskRunErrorEnhancer, and taskRunErrorToString enable more robust and consistent error handling throughout the service.


173-174: Added releaseConcurrency parameter to blockRunWithWaitpoint

The releaseConcurrency parameter is properly propagated from the request body to the engine call, aligning with the PR's focus on improving concurrency management.


376-377: Added releaseConcurrency parameter to trigger

The releaseConcurrency parameter is correctly passed to the engine's trigger method, maintaining consistency with the concurrency control pattern.


381-388: Improved error handling with structured error parsing

The task run error is now properly parsed using TaskRunError.parse and propagated to the event system, enhancing error reporting and debugging capabilities.


403-410: Enhanced error validation and reporting

The service now properly checks for errors in the result and throws a formatted ServiceValidationError with detailed information, improving error handling for API consumers.

internal-packages/run-engine/src/engine/tests/delays.test.ts (2)

158-159: Increased delay duration for test stability

Increasing the delay from 200ms to 400ms likely improves test reliability by allowing more time for operations to complete.


169-172: Improved method naming for clarity

Changing from rescheduleRun to rescheduleDelayedRun makes the API more explicit about its purpose, improving code readability and documentation.

internal-packages/run-engine/src/run-queue/tests/fairQueueSelectionStrategy.test.ts (3)

3-6: Imports look good.

These updated imports cleanly reference your constants, strategy, and producer. No concerns here.


214-215: Check potential flakiness in performance test threshold.

Increasing the threshold to a 6x difference (from 9x) might reduce test flakiness. Ensure this does not inadvertently pass slower second calls. Overall, your approach seems correct for typical test environments.

Would you like to run these tests under stress conditions or in slower environments to ensure stability?


229-229: Confirm updated factor for third distribution test.

The factor has been changed to 2 (instead of the prior 4). Ensure that the lowered multiple provides the intended performance check while avoiding false positives under varying conditions.

internal-packages/run-engine/src/engine/systems/dequeueSystem.ts (1)

1-116: Overall implementation looks robust.

The concurrency checks, snapshot creation, and error handling are clear. Good use of locks and structured error flows. No other pressing issues identified in this new DequeueSystem class.

Also applies to: 121-537, 540-619

internal-packages/run-engine/src/engine/tests/waitpoints.test.ts (3)

10-10: Use of Node timers is fine.

Importing setTimeout from "node:timers/promises" is recommended for Node 18+ to access the ESM-friendly API.


107-107: Releasing concurrency while blocking the run.

Adding releaseConcurrency: true clarifies that concurrency tokens will be freed during the wait, which is consistent with your concurrency design.


1137-1344: New test for reacquiring concurrency after unblocking is thorough.

This test accurately checks the scenario of concurrency-limited queues. It appears solid and aligns with the updated concurrency code flow.

internal-packages/run-engine/src/engine/types.ts (4)

1-2: Imports from internal redis packages appear well-structured
No specific concerns. This import is consistent with your project's approach to typed Redis functionality.


7-8: MinimalAuthenticatedEnvironment import is consistent
No issues found with referencing MinimalAuthenticatedEnvironment or workerCatalog.


39-51: Check partial Redis config fallback
The optional releaseConcurrency property is well-defined, but ensure that any partial Redis config merges gracefully with default Redis settings to avoid misconfiguration.


111-111: EngineWorker type introduction
Defining EngineWorker as Worker<typeof workerCatalog> effectively couples the worker and catalog. This looks correct and straightforward.

internal-packages/run-engine/README.md (3)

27-27: Expanded explanation of race conditions
The added note around checkpoint concurrency clarifies potential conflicts. No further issues noted.


94-96: Grammar check for ‘which’ usage
The static analysis suggestion to add question marks seems like a false positive here, as these lines are statements, not questions.

🧰 Tools
🪛 LanguageTool

[typographical] ~94-~94: If the word ‘which’ starts a question, add a question mark (“?”) at the end of the sentence.
Context: ...pleted when the associated run completes. Every run has an associatedWaitpoint ...

(WHAT_NOUNPHRASE_QUESTION_MARK)


[typographical] ~95-~95: If the word ‘which’ starts a question, add a question mark (“?”) at the end of the sentence.
Context: ...s completed when the datetime is reached. - MANUAL which gets completed when th...

(WHAT_NOUNPHRASE_QUESTION_MARK)


[typographical] ~96-~96: If the word ‘which’ starts a question, add a question mark (“?”) at the end of the sentence.
Context: ...ch gets completed when that event occurs. Waitpoints can have an idempotencyKey ...

(WHAT_NOUNPHRASE_QUESTION_MARK)


202-331: System architecture documentation
This new section and its diagram provide clarity on subsystem responsibilities. No changes needed.

internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts (7)

1-2: Imports for concurrency system
These imports align with the new concurrency logic. No immediate issues found.


7-9: Zod schema for releaseConcurrency
Defining a Zod schema for metadata parsing ensures a safer and more robust approach to typed concurrency data.


11-12: Leverage typed metadata model
The ReleaseConcurrencyMetadata type keeps concurrency-related fields consistent. Implementation looks good.


13-15: ReleaseConcurrencySystemOptions design
Bundling SystemResources this way promotes modular architecture and straightforward testing.


17-22: Constructor injects system resources
This allows for easier testability and clear separation of concerns across different concurrency operations.


24-33: Checkpoint environment concurrency tokens
The checkpointCreatedOnEnvironment() method refilling tokens is logical. Verify the incremental token count to avoid possible oversubscription.


35-49: Immediate concurrency release for development
Allowing dev environments to bypass the queued release is convenient for local testing. Confirm queue logic remains consistent with other environment types.

internal-packages/run-engine/src/engine/systems/checkpointSystem.ts (1)

56-56: Validate lock acquisition timeouts and retry logic.

You lock the run with a 5,000ms timeout. If it times out, it might leave the checkpoint process partially completed. Confirm whether higher-level error handling is needed or if it’s acceptable to fail the lock attempt without further retries.

internal-packages/run-engine/src/engine/tests/releaseConcurrencyTokenBucketQueue.test.ts (1)

206-246: Verify correct token return when executor fails.

The test ensures tokens are returned upon failure, allowing subsequent runs to execute. Confirm that no race conditions occur if multiple failures happen quickly. Edge cases might require specialized tests or defensive logic in the executor.

internal-packages/run-engine/src/engine/systems/waitingForWorkerSystem.ts (2)

78-79: Confirm timestamp logic.
Subtracting updatedRun.priorityMs from the run’s creation timestamp is correct if you want to preserve priority-based ordering. However, ensure that negative or unexpected priority values won't invert the enqueue order.


85-88: Ensure repeated scheduling won't trigger too many concurrent jobs.
When runsWaitingForDeploy.length > maxCount, the method reschedules itself. Confirm that repeated calls to scheduleEnqueueRunsWaitingForWorker won't cause excessive recursion or queue buildup for large volumes.

internal-packages/run-engine/src/engine/tests/releaseConcurrency.test.ts (1)

14-73: Thorough concurrency release testing.
The first test scenario comprehensively verifies default concurrency release behaviors and waitpoint blocking. This is a good baseline showcasing environment concurrency and queue concurrency updates.

internal-packages/run-engine/src/engine/systems/ttlSystem.ts (1)

25-34: Check concurrency with actively executing runs.
expireRun skips expiration for runs whose snapshot shows "executing." Confirm that logic aligns with your desired behavior for WAITING, SUSPENDED, or other transitional states that may also need to be guarded.

internal-packages/run-engine/src/engine/tests/attemptFailures.test.ts (3)

277-384: Logic for handling non-retriable errors looks correct.
The test accurately verifies that internal, non-retriable errors lead to a final "CRASHED" status.


493-648: Proper verification of OOM retry logic.
This test appropriately retries the run on a larger machine following an OOM error and validates the final success upon reattempt.


650-812: Correct handling of final OOM failure after retry.
The test properly confirms that the second OOM attempt leads to a "CRASHED" final status.

internal-packages/run-engine/src/engine/tests/checkpoints.test.ts (5)

191-277: Validation of invalid snapshot ID scenario looks good.
The test properly rejects checkpoint creation when referencing an outdated or invalid snapshot.


279-474: Multiple checkpoints test covers essential checkpoint transitions.
The test ensures that multiple checkpoints can be chained correctly without interfering with each other.


476-619: Checkpoint attempt after concurrency release is properly blocked.
The test demonstrates correct handling when the snapshot has progressed beyond the point of checkpoint creation, returning "Not the latest snapshot."


622-761: Prevention of checkpoint creation in a non-checkpointable state is properly verified.
The test ensures that once suspended, further checkpoint attempts are disallowed and fired as "discarded."


764-983: Accurate state transition for checkpoint creation in QUEUED_EXECUTING.
This test confirms that the run moves to "QUEUED" after forced concurrency release, and that a checkpoint can be created in that state without issues.

internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts (4)

14-22: Initialization logic is straightforward.
The constructor stores references to the provided resources cleanly, preparing the system for use.


26-78: Reschedule logic appears robust.
Using a locked block on runId prevents concurrency issues when rescheduling a delayed run. The additional snapshot creation ensures traceability of the reschedule event.


80-125: Enqueue logic cleanly updates run status and scheduling.
The function updates run status to "PENDING" and sets the queuedAt timestamp, followed by scheduling optional TTL expirations. This approach maintains consistent lifecycle management.


127-135: Deferred scheduling of delayed runs is well-designed.
Scheduling enqueuing jobs via worker.enqueue keeps the code modular and records events for future tracing.

internal-packages/database/prisma/schema.prisma (3)

167-167: Nice addition of executionSnapshots to the Organization model.

Having a direct relationship to TaskRunExecutionSnapshot can be very useful for traceability. Ensure that any required cascading deletes or updates are configured to avoid dangling references.


507-507: Great addition of executionSnapshots to the Project model.

This mirrors the relationship in Organization, providing consistent ways to track snapshots at a project level. Ensure that you have the appropriate test coverage for retrieving snapshots through the Project relationship.


2545-2545:

✅ Verification successful

releaseConcurrencyOnWaitpoint field adds flexibility for concurrency release.

Be mindful of the potential for concurrency tokens to be released prematurely if many waiting tasks exist; double-check the logic in your RunQueue or concurrency system to ensure stable queue processing.

Would you like to verify all usage sites of this field for proper handling? If so, run this script to search for references:


🏁 Script executed:

#!/bin/bash
rg --context 5 'releaseConcurrencyOnWaitpoint'

Length of output: 6790


Verification Passed – releaseConcurrencyOnWaitpoint Usage Confirmed

After reviewing the references to releaseConcurrencyOnWaitpoint across the codebase, we can confirm that:

  • The field is properly defined in the Prisma schema (internal-packages/database/prisma/schema.prisma) with a default value of false.
  • It is correctly referenced and conditionally passed in setup logic (e.g., in internal-packages/testcontainers/src/setup.ts).
  • Its usage in the concurrency system (specifically within internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts) and relevant tests (in internal-packages/run-engine/src/engine/tests/releaseConcurrency.test.ts) confirms that the concurrency release logic behaves as intended, including scenarios where the flag is set.

That said, while the code correctly applies this flag, please continue to monitor the behavior under load to ensure that concurrency tokens are not released prematurely when many tasks wait simultaneously.

internal-packages/run-engine/src/run-queue/keyProducer.ts (3)

2-2: Good addition of EnvDescriptor, QueueDescriptor, and RunQueueKeyProducer imports.

This clarifies the type boundaries and ensures the code is more self-documenting.


15-15: Introduction of DEAD_LETTER_QUEUE_PART constant for queue keys.

This helps keep key segments descriptive.


196-200:

✅ Verification successful

deadLetterQueueKeyFromQueue() builds on top of descriptorFromQueue to produce robust queue definitions.

This consistent reusability is excellent. Consider logging or handling potential mismatches in queue naming for debugging in large distributed systems.

If you want to confirm correct usage, you can search for all calls to deadLetterQueueKeyFromQueue and confirm the post-conditions:


🏁 Script executed:

#!/bin/bash
rg -A 3 'deadLetterQueueKeyFromQueue'

Length of output: 1082


Verified Usage of deadLetterQueueKeyFromQueue Function

The verification confirms that the function is being consistently used across the codebase (notably in internal-packages/run-engine/src/run-queue/types.ts and internal-packages/run-engine/src/run-queue/index.ts). The approach of building the key on top of descriptorFromQueue is solid. As an enhancement, consider adding logging or explicit error handling to capture any potential mismatches in queue naming for easier debugging in distributed environments.

internal-packages/run-engine/src/engine/systems/waitpointSystem.ts (7)

1-25: Imports and WaitpointSystemOptions definition look good.

You've pulled in everything needed for concurrency, event emission, and database access. Nicely structured.


26-37: Constructor properly injects required subsystem references.

No issues identified here. Keep the constructor minimal and rely on specialized methods inside the class to handle logic.


56-116: completeWaitpoint method handles concurrency gracefully.

  1. The transaction approach with $transaction is good—ensures atomic waitpoint completion.
  2. Well-handled fallback for P2025 error prevents the system from crashing when the waitpoint has already been updated.

118-152: Event-based scheduling of run continuation is well done.

Using worker.enqueue with minimal delays ensures that the code doesn't block normal flow. Also emitting cachedRunCompleted is a nice approach for hooking into specialized logic.


154-237: createDateTimeWaitpoint method is well organized.

  1. Good use of upsert for idempotency, though you also handle stale keys by rotating them into inactive.
  2. Scheduling a job with finishWaitpoint extends the system elegantly.

239-323: createManualWaitpoint method parallels createDateTimeWaitpoint effectively.

  1. The code for rotating expired idempotency keys is nearly identical—good consistency.
  2. The scheduled finalization with a possible error payload is a neat approach.

589-609: createRunAssociatedWaitpoint provides a direct way to link runs to new waitpoints of type RUN.

The approach is consistent with the other creation methods. The usage of generated ID plus a random idempotencyKey is well-handled.

internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts (7)

1-56: Congratulate on a well-defined system entry point

This file introduces a robust structure for the RunAttemptSystem class. The imports and class setup appear consistent, and the typed resources (SystemResources) integrate nicely. Logging, tracing, and typed constructor arguments stand out as well-organized. No immediate concerns here.


347-392: completeRunAttempt error handling confirmation

completeRunAttempt correctly delegates to either attemptSucceeded or attemptFailed. The switch-case for completion.ok is clear. Just confirm that any external calls (like event bus emits) have appropriate try/catch or fallback to avoid swallowing critical errors if the bus is unavailable.


526-766: attemptFailed: robust error handling; consider double-checking concurrency cleanup

The method covers multiple fail paths with thorough logic (cancellation, permanent fail, retry, etc.). Since multiple concurrency paths may be left open upon failure, ensure that every path eventually calls your concurrency release methods (particularly for partial or catastrophic failures).


768-824: systemFailure gracefully handles leftover errors

This functionality is a safety net for unhandled system-level issues. The approach of reusing attemptFailed is consistent. No major concerns, though adding specific logs or metrics for repeated system failures can be helpful in diagnosing repeated issues at scale.


901-1072: cancelRun: ensure partial cancellation scenarios are tested

The cancellation logic is robust, with multiple exit points depending on the run status. Ensure that each partial or edge scenario (e.g. run is already finished, RUN_PENDING_CANCEL states) is covered by tests or QA scenarios to avoid orphaned runs.


1074-1183: #permanentlyFailRun: thorough final error handling

Gracefully sets final status, removes concurrency, completes waitpoints, and emits events. This strongly finalizes a run after all attempts are exhausted. Looks correct; no additional concerns.


1185-1220: #finalizeRun and #getAuthenticatedEnvironmentFromRun: concise finishers

The final run cleanup and environment retrieval appear both concise and well-typed, particularly for the environment lookups. No issues spotted.

internal-packages/run-engine/src/run-queue/index.ts (12)

32-32: Use of custom MessageNotFoundError

Importing a dedicated error class for missing messages is a good practice for clearer debugging and domain-specific error handling. This is a strong addition that keeps error management explicit.


160-167: lengthOfDeadLetterQueue & messageInDeadLetterQueue: extends queue diagnostics

These new methods improve observability into the dead letter queue. Ensure that usage of messageInDeadLetterQueue includes subsequent handling if a message is found stuck in the DLQ.


169-180: redriveMessage logic adherence

Publishing to "rq:redrive" is clear, but ensure that the subscribing side properly handles any malformed or outdated messages. Also confirm that the redrive fully handles concurrency re-limits if needed.


214-215: messageExists vs. old concurrency methods

Replacing concurrency-based checks with a simpler existence check is a good simplification. Verify that older concurrency checks removed in other files are no longer needed.


218-241: readMessage: good parse & error logging

The schema-based parsing with OutputPayload.safeParse(...) is robust. Logging parse errors is essential. Confirm that subsequent usage of partial or corrupted data does not proceed inadvertently.


403-420: acknowledgeMessage: concurrency release check

Acknowledging a message calls #callAcknowledgeMessage, which includes concurrency updates. Confirm that final concurrency keys are always cleared, especially if acknowledgment is called after a partial run.


447-476: nackMessage: robust fallback to DLQ

Solid approach: increment attempts, compare with max, then transfer to the DLQ if the limit is exceeded. Ensure tests simulate multiple consecutive nacks to confirm correct behavior with repeated attempt increments.


535-570: releaseEnvConcurrency: partial concurrency release usage

Releasing only the environment concurrency is useful for certain partial release scenarios. Confirm that partial queue concurrency might remain associated if the queue concurrency isn't released at the same time. This can lead to confusion if used incorrectly.


572-614: reacquireConcurrency: re-check concurrency limit correctness

Reacquiring concurrency for a message is carefully guarded with limit checks. Good approach. For reliability, confirm that if either queue or environment concurrency has changed in the interim, reacquiring doesn’t cause miscounts.


730-771: #callEnqueueMessage: thorough parent queue rebalancing

Enqueue logic extends concurrency checks and rebalances parent queues. The debug logs are thorough and helpful. No immediate issues.


884-923: #callNackMessage: exponential backoff logic in calculateNextRetryDelay

Reinserting the message into the queue with updated score is straightforward. The logs are well-detailed. Ensure that extremely high attempt counts and large time values are tested so that the system won’t overflow or exceed Redis data type limits.


925-946: #callMoveToDeadLetterQueue: final concurrency cleanup

This is a critical function for permanent message removal from active sets. The concurrency keys are removed, which is essential. The approach looks correct.

internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts (2)

27-43: Consider verifying upstream error handling when a snapshot isn't found.
The function throws an error if no valid snapshot is found. Ensure that the calling code is prepared to handle this exception to prevent unhandled rejections.


271-284: Heartbeat approach for non-latest snapshot.
If the snapshot isn't the latest by the time the heartbeat runs, your approach to acknowledge and return is sensible. Ensure the calling code gracefully handles unexpected states when dealing with older snapshots.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (4)
internal-packages/run-engine/README.md (4)

27-29: Race Condition Explanation Enhancements
The updated description clearly highlights the potential race condition during checkpointing. This effectively communicates the need to lock the run to prevent outdated checkpoints. Consider optionally providing additional context or linking to further documentation if more details are available.

🧰 Tools
🪛 LanguageTool

[uncategorized] ~28-~28: Possible missing comma found.
Context: ...onditions we need to deal with: - When checkpointing the run continues to execute until the ...

(AI_HYDRA_LEO_MISSING_COMMA)


[uncategorized] ~28-~28: Use a comma before “and” if it connects two independent clauses (unless they are closely connected and short).
Context: ...ored. At the same time the run continues and the checkpoint can become irrelevant if...

(COMMA_COMPOUND_SENTENCE_2)


93-97: Waitpoint Type Descriptions
The bullet points for the waitpoint types are informative. However, consider rephrasing the relative clauses for improved clarity and consistency. For example, you might change:

  • "RUN which gets completed when the associated run completes. Every run has an associatedWaitpoint that matches the lifetime of the run."
    to
  • "RUN: Completed when the associated run completes. Every run has an associatedWaitpoint that matches the lifetime of the run."

Also, review the punctuation as noted by static analysis; while the context isn’t a question, ensuring consistent grammatical styling helps maintain clarity.

🧰 Tools
🪛 LanguageTool

[typographical] ~94-~94: If the word ‘which’ starts a question, add a question mark (“?”) at the end of the sentence.
Context: ...pleted when the associated run completes. Every run has an associatedWaitpoint ...

(WHAT_NOUNPHRASE_QUESTION_MARK)


[typographical] ~95-~95: If the word ‘which’ starts a question, add a question mark (“?”) at the end of the sentence.
Context: ...s completed when the datetime is reached. - MANUAL which gets completed when th...

(WHAT_NOUNPHRASE_QUESTION_MARK)


[typographical] ~96-~96: If the word ‘which’ starts a question, add a question mark (“?”) at the end of the sentence.
Context: ...ch gets completed when that event occurs. Waitpoints can have an idempotencyKey ...

(WHAT_NOUNPHRASE_QUESTION_MARK)


101-103: Enhance the wait.for()/wait.until() Description
The explanation is largely clear; however, refining the phrasing can improve readability. For instance, consider:

-Wait for a future time, then continue. We should add the option to pass an `idempotencyKey` so a second attempt doesn't wait again. By default it would wait again.
+Wait for a future time, then continue. We should add an option to pass an `idempotencyKey` so that a second attempt does not wait again. By default, it will wait again.

This change adds the needed comma (as suggested in the static analysis) and clarifies the intent.

🧰 Tools
🪛 LanguageTool

[uncategorized] ~102-~102: Did you mean: “By default,”?
Context: ...so a second attempt doesn't wait again. By default it would wait again. ```ts //Note if t...

(BY_DEFAULT_COMMA)


112-113: Comma for Clarity in triggerAndWait() Description
To improve sentence clarity, consider inserting commas for a natural pause:

-Trigger and then wait for run(s) to finish. If the run fails it will still continue but with the errors so the developer can decide what to do.
+Trigger and then wait for run(s) to finish. If the run fails, it will still continue but with the errors, so the developer can decide what to do.

This minor change enhances readability by clearly separating the clauses.

🧰 Tools
🪛 LanguageTool

[uncategorized] ~113-~113: Use a comma before ‘so’ if it connects two independent clauses (unless they are closely connected and short).
Context: ... will still continue but with the errors so the developer can decide what to do. #...

(COMMA_COMPOUND_SENTENCE_2)

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6eaba5f and 3f14940.

📒 Files selected for processing (1)
  • internal-packages/run-engine/README.md (8 hunks)
🧰 Additional context used
🪛 LanguageTool
internal-packages/run-engine/README.md

[typographical] ~94-~94: If the word ‘which’ starts a question, add a question mark (“?”) at the end of the sentence.
Context: ...pleted when the associated run completes. Every run has an associatedWaitpoint ...

(WHAT_NOUNPHRASE_QUESTION_MARK)


[typographical] ~95-~95: If the word ‘which’ starts a question, add a question mark (“?”) at the end of the sentence.
Context: ...s completed when the datetime is reached. - MANUAL which gets completed when th...

(WHAT_NOUNPHRASE_QUESTION_MARK)


[typographical] ~96-~96: If the word ‘which’ starts a question, add a question mark (“?”) at the end of the sentence.
Context: ...ch gets completed when that event occurs. Waitpoints can have an idempotencyKey ...

(WHAT_NOUNPHRASE_QUESTION_MARK)


[uncategorized] ~102-~102: Did you mean: “By default,”?
Context: ...so a second attempt doesn't wait again. By default it would wait again. ```ts //Note if t...

(BY_DEFAULT_COMMA)


[uncategorized] ~113-~113: Use a comma before ‘so’ if it connects two independent clauses (unless they are closely connected and short).
Context: ... will still continue but with the errors so the developer can decide what to do. #...

(COMMA_COMPOUND_SENTENCE_2)

⏰ Context from checks skipped due to timeout of 90000ms (5)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: units / 🧪 Unit Tests
  • GitHub Check: typecheck / typecheck
  • GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (13)
internal-packages/run-engine/README.md (13)

45-48: Improved Worker Types Formatting
The section now distinctly lists the two types of workers (hosted and self-hosted) and improves readability. The formatting addition (marked by the "~" at line 45) supports the explanation well.


72-73: Clarification in Run Queue Reliability Features
The description of built-in reliability features (using nacking to increment the attempt count and moving runs to the DLQ if necessary) is clear and concise. No further changes are needed here.


107-108: Code Examples for IdempotencyKey Usage
The added TypeScript examples using wait.until with an idempotencyKey are clear and demonstrate the intended usage effectively.


120-121: wait.forRequest() Explanation
The updated explanation for wait.forRequest() is clear and provides a practical example of its use case. The improvements make the intent immediately understandable.

🧰 Tools
🪛 LanguageTool

[grammar] ~121-~121: The word “callback” is a noun. The verb is spelled with a space.
Context: ..., Replicate have an API where they will callback when their work is complete. ### `wait...

(NOUN_VERB_CONFUSION)


165-168: Rate Limiting Section Clarity
The additions in the rate limiting section clearly articulate its purposes—preventing abuse and limiting execution frequency using a key. The changes enhance the overall clarity and are well integrated.


174-176: Debouncing Section Enhancement
The clarifying note about debouncing—preventing too many runs from occurring in a short period—is succinct and effectively communicates its utility. No changes needed.


200-202: New RunEngine System Architecture Section
The introduction of the "RunEngine System Architecture" header provides a good structural break, marking the start of high-level architectural documentation. This addition enhances the overall documentation clarity.


203-271: Mermaid Diagram for System Architecture
The new Mermaid diagram offers a comprehensive view of the system’s components and their interactions. This visual representation improves understanding of the system architecture. Ensure that the diagram renders correctly in all documentation viewers.


272-280: System Responsibilities Documentation
The “System Responsibilities” section is well-organized, clearly outlining the roles of the core systems such as ESS, ES, RAS, and RCS. This structured breakdown makes it easier for developers to grasp system functions.


281-286: Queue Management Overview
The Queue Management section clearly describes the functions of DS, DRS, and WFS. The concise documentation here matches the overall clarity of the README.


287-293: State Management Clarity
The State Management section neatly documents the roles of CS, WS, BS, and TS. This clear listing improves the reader's understanding of how state is managed in the system.


295-303: Shared Resources Section
The updated section on Shared Resources provides thorough context regarding critical components like Prisma, Logger, and RunLocker. This is valuable for maintaining consistency and understanding across the system.


304-327: Key Interactions Summary
The “Key Interactions” section clearly summarizes core workflows and dependencies among system components. It’s well structured; just ensure that all system names correctly reflect the latest nomenclature and implementations in the codebase.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🔭 Outside diff range comments (1)
internal-packages/redis-worker/src/worker.test.ts (1)

1-253: 🛠️ Refactor suggestion

Consider refactoring tests to use beforeEach/afterEach for consistent cleanup

All tests create a worker instance but none consistently clean it up. This pattern makes the tests more prone to resource leaks and inconsistent behavior.

Consider refactoring the tests to use a more structured setup/teardown approach:

describe("Worker", () => {
+  let worker;
+  
+  afterEach(async () => {
+    if (worker) {
+      await worker.stop();
+      worker = null;
+    }
+  });

  redisTest("Process items that don't throw", { timeout: 30_000 }, async ({ redisContainer }) => {
    const processedItems: number[] = [];
-    const worker = new Worker({
+    worker = new Worker({
      // ...configuration...
    }).start();
    
    // Test code...
  });
  
  // Apply the same pattern to other tests

This would ensure consistent cleanup regardless of test success or failure.

🧹 Nitpick comments (3)
internal-packages/redis-worker/src/worker.test.ts (3)

96-110: Missing worker cleanup in error case test

Similar to the previous test, this test for error handling no longer calls worker.stop(). This is particularly concerning in error scenarios where proper cleanup becomes even more important.

Consider adding cleanup logic, especially since this test involves error handling:

  // Wait for items to be processed
  await new Promise((resolve) => setTimeout(resolve, 500));

  expect(processedItems.length).toBe(10);
  expect(new Set(processedItems).size).toBe(10); // Ensure all items were processed uniquely
+ 
+ // Cleanup
+ await worker.stop();

151-173: Missing explicit cleanup for DLQ test

This test creates a worker that processes items and checks Dead Letter Queue functionality, but doesn't explicitly stop the worker afterward. Resources allocated by the worker may remain active after the test.

Consider adding cleanup at the end of the test:

  // Check that the failed item is in the DLQ
  const dlqSize = await worker.queue.sizeOfDeadLetterQueue();
  expect(dlqSize).toBe(1);
+
+ // Cleanup
+ await worker.stop();

216-250: Redis client is closed but worker isn't

The test properly closes the Redis client with await redisClient.quit() but doesn't close the worker. This inconsistency in resource management might cause subtle issues, especially in extended test runs.

Consider adding the worker cleanup after the Redis client is closed:

  dlqSize = await worker.queue.sizeOfDeadLetterQueue();
  expect(dlqSize).toBe(0);

  await redisClient.quit();
+ await worker.stop();
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3f14940 and e1a362e.

📒 Files selected for processing (1)
  • internal-packages/redis-worker/src/worker.test.ts (4 hunks)
🧰 Additional context used
🧬 Code Definitions (1)
internal-packages/redis-worker/src/worker.test.ts (1)
internal-packages/redis/src/index.ts (1) (1)
  • createRedisClient (16-42)
⏰ Context from checks skipped due to timeout of 90000ms (4)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: units / 🧪 Unit Tests
  • GitHub Check: typecheck / typecheck

Comment on lines +39 to +53
// Enqueue 10 items
for (let i = 0; i < 10; i++) {
await worker.enqueue({
id: `item-${i}`,
job: "testJob",
payload: { value: i },
visibilityTimeoutMs: 5000,
});
}

// Wait for items to be processed
await new Promise((resolve) => setTimeout(resolve, 2000));
// Wait for items to be processed
await new Promise((resolve) => setTimeout(resolve, 2000));

expect(processedItems.length).toBe(10);
expect(new Set(processedItems).size).toBe(10); // Ensure all items were processed uniquely
} finally {
worker.stop();
}
expect(processedItems.length).toBe(10);
expect(new Set(processedItems).size).toBe(10); // Ensure all items were processed uniquely
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Potential resource leak after removal of worker.stop()

The test no longer explicitly stops the worker after completion. This could potentially lead to resource leaks as Redis connections might remain open, particularly if these tests run frequently or in large numbers.


🏁 Script executed:

#!/bin/bash
# Check if the Worker class has a cleanup mechanism or if the test environment handles cleanup

# Look for any automatic cleanup in the Worker implementation
echo "Checking Worker implementation for automatic cleanup..."
cat internal-packages/redis-worker/src/worker.ts | grep -A 20 "start()" | grep -B 20 "stop()"

# Look for cleanup in the redisTest helper
echo "Checking redisTest helper for cleanup..."
grep -r "redisTest" --include="*.ts" internal-packages/ | head -n 20

Length of output: 3037


Action Required: Address Resource Cleanup in Worker Tests

Our verification indicates that the current implementation of the Worker class does not include an automatic cleanup mechanism, and the redisTest helper does not appear to handle stopping the worker. Therefore, the removal of the explicit worker.stop() call leaves the connection open, potentially leading to resource leaks when tests are executed frequently.

  • File: internal-packages/redis-worker/src/worker.test.ts
  • Concern: The test enqueues and processes items without stopping the worker.
  • Suggestion: Reintroduce an explicit worker.stop() call after processing is complete, or ensure that an automatic cleanup mechanism is implemented in the Worker class or via the redisTest helper to safely release resources.

@ericallam ericallam force-pushed the re2-reserve-concurrency branch from e1a362e to 38e1887 Compare March 19, 2025 17:59
@ericallam ericallam merged commit a8b3c70 into main Mar 19, 2025
10 of 12 checks passed
@ericallam ericallam deleted the re2-reserve-concurrency branch March 19, 2025 18:01
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants