Skip to content

Conversation

@adityachoudhari26
Copy link
Contributor

@adityachoudhari26 adityachoudhari26 commented Jun 13, 2025

Summary by CodeRabbit

  • Refactor
    • Unified multiple job dispatch mechanisms into a single, consistent interface for queueing resource, evaluation, and compute jobs.
    • Replaced various direct dispatch function calls with a fluent, hierarchical job dispatch API, improving maintainability and clarity.
  • Chores
    • Removed redundant utility files and legacy job dispatch functions no longer needed after the refactor.
  • New Features
    • Introduced a new job dispatch interface that prevents duplicate jobs and streamlines background processing tasks.
  • Enhancements
    • Improved job dispatching consistency across workers and event handlers by adopting a standardized fluent interface.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Jun 13, 2025

Walkthrough

This change removes several specialized job dispatch utility functions and replaces their usage throughout the codebase with a new, unified dispatch interface, dispatchQueueJob. The new interface provides a fluent API for dispatching jobs to various queues, consolidating and standardizing job enqueuing logic for resource updates, evaluations, and computations.

Changes

Files/Paths Summary
apps/event-worker/src/utils/dispatch-compute-*.ts, dispatch-evaluate-jobs.ts Deleted specialized job dispatch utility functions for compute/evaluate jobs.
apps/event-worker/src/workers/compute-.ts, delete-resource.ts, new-.ts, update-*.ts, updated-resources/index.ts Replaced old dispatch utility calls with the new dispatchQueueJob fluent interface; updated imports accordingly.
apps/jobs/src/policy-checker/index.ts Added helper function for fetching affected release targets; updated main loop to use this helper.
apps/pty-proxy/src/controller/agent-socket.ts, apps/webservice/src/app/api/github/webhook/handlers/pull_request.ts, apps/webservice/src/app/api/v1/resources/[resourceId]/route.ts Switched from dispatchUpdatedResourceJob to dispatchQueueJob().toUpdatedResource() for event dispatching.
packages/events/src/dispatch-jobs.ts New module implementing all job dispatch logic with deduplication and a hierarchical API.
packages/events/src/index.ts Re-exported from dispatch-jobs.js instead of old updated-resource module.
packages/events/src/resource-provider-scan/handle-provider-scan.ts Replaced dispatchUpdatedResourceJob with dispatchQueueJob().toUpdatedResource().
packages/events/src/updated-resource/dispatch-job.ts, updated-resource/index.ts Deleted old updated resource job dispatch function and its re-export.

Sequence Diagram(s)

sequenceDiagram
    participant Worker
    participant DispatchQueueJob
    participant Queue

    Worker->>DispatchQueueJob: dispatchQueueJob().toCompute().deployment(deployment).resourceSelector()
    DispatchQueueJob->>Queue: Check for duplicate job
    alt Job not in queue
        DispatchQueueJob->>Queue: Add job with deployment data
    else Job already queued
        DispatchQueueJob-->>Worker: Skip enqueue
    end
Loading
sequenceDiagram
    participant API
    participant DispatchQueueJob
    participant Queue

    API->>DispatchQueueJob: dispatchQueueJob().toUpdatedResource([resource])
    DispatchQueueJob->>Queue: Check for duplicate job
    alt Not duplicate
        DispatchQueueJob->>Queue: Add job for resource
    else Duplicate
        DispatchQueueJob-->>API: Skip
    end
Loading

Possibly related PRs

  • ctrlplanedev/ctrlplane#502: Introduces the new environment worker, which is now refactored to use the unified dispatch interface in this PR.
  • ctrlplanedev/ctrlplane#585: Introduces and refactors the updated resource job dispatch function, which is now replaced by the new unified interface.
  • ctrlplanedev/ctrlplane#506: Adds workers and channels for compute deployment resource selector jobs, which are now dispatched using the new interface.

Suggested reviewers

  • jsbroks

Poem

In the warren where queues once sprawled,
Dispatchers hopped, their logic called.
Now with one fluent leap they go,
Through dispatchQueueJob—in a single flow!
No more confusion, no more fuss,
Just streamlined jobs for all of us.
🐇✨


📜 Recent review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a552f78 and 56567f2.

📒 Files selected for processing (2)
  • apps/jobs/src/policy-checker/index.ts (2 hunks)
  • packages/events/src/resource-provider-scan/handle-provider-scan.ts (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • packages/events/src/resource-provider-scan/handle-provider-scan.ts
  • apps/jobs/src/policy-checker/index.ts
⏰ Context from checks skipped due to timeout of 90000ms (7)
  • GitHub Check: Format
  • GitHub Check: Typecheck
  • GitHub Check: build (linux/amd64)
  • GitHub Check: Lint
  • GitHub Check: build (linux/amd64)
  • GitHub Check: build (linux/amd64)
  • GitHub Check: build (linux/amd64)
✨ Finishing Touches
  • 📝 Generate Docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Explain this complex logic.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 14

🧹 Nitpick comments (5)
apps/event-worker/src/workers/new-deployment-version.ts (1)

4-5: Fluent evaluate dispatch: minor API-surface check

dispatchQueueJob().toEvaluate().releaseTargets(releaseTargets) assumes:

  1. toEvaluate() is a function returning a builder object.
  2. That builder exposes releaseTargets(...).

If either of those change, TypeScript will catch it, but at runtime mis-configuration could silently noop.
Consider an integration test that asserts at least one job is enqueued for a mock target set.

Also applies to: 21-22

apps/event-worker/src/workers/update-deployment-variable.ts (1)

4-4: Nit: misleading typographical symbol in JSDoc above.

Unrelated to the logic change, but worth cleaning up while in the file: the JSDoc line 10 contains the character in “release targe†s”. Remove it to avoid searchability / lint noise.

apps/event-worker/src/workers/updated-resources/index.ts (1)

50-60: Dispatches run sequentially – consider parallelising

for-of + await sends jobs one-by-one; a workspace with hundreds of deployments/environments will incur needless latency.
If dispatchQueueJob() is idempotent (it already de-dupes), you can safely fire them in parallel:

-for (const deployment of deployments)
-  await dispatchQueueJob().toCompute().deployment(deployment).resourceSelector();
+await Promise.all(
+  deployments.map((d) =>
+    dispatchQueueJob().toCompute().deployment(d).resourceSelector(),
+  ),
+);

Same for environments.

apps/event-worker/src/workers/new-resource.ts (1)

29-34: Consider batching queue writes for many environments

await-ing inside a for loop serialises Redis calls and can be slow when hundreds of environments exist.
A small optimisation:

await Promise.all(
  environments.map((env) =>
    dispatchQueueJob().toCompute().environment(env).resourceSelector(),
  ),
);

Same idea applies to the deployment loop below (lines 38-42).

packages/events/src/dispatch-jobs.ts (1)

21-36: Use addBulk for evaluate jobs to reduce RTT

Inside dispatchEvaluateJobs you enqueue each release target individually.
Queue.addBulk is already in use elsewhere—doing the same here cuts Redis round-trips:

-  for (const rt of rtsToEvaluate)
-    await q.add(`${rt.resourceId}-${rt.environmentId}-${rt.deploymentId}`, rt);
+  await q.addBulk(
+    rtsToEvaluate.map((rt) => ({
+      name: `${rt.resourceId}-${rt.environmentId}-${rt.deploymentId}`,
+      data: rt,
+    })),
+  );
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0b1e064 and a552f78.

📒 Files selected for processing (31)
  • apps/event-worker/src/utils/dispatch-compute-deployment-jobs.ts (0 hunks)
  • apps/event-worker/src/utils/dispatch-compute-env-jobs.ts (0 hunks)
  • apps/event-worker/src/utils/dispatch-compute-policy-target-selector-jobs.ts (0 hunks)
  • apps/event-worker/src/utils/dispatch-compute-system-jobs.ts (0 hunks)
  • apps/event-worker/src/utils/dispatch-evaluate-jobs.ts (0 hunks)
  • apps/event-worker/src/workers/compute-deployment-resource-selector.ts (2 hunks)
  • apps/event-worker/src/workers/compute-environment-resource-selector.ts (2 hunks)
  • apps/event-worker/src/workers/compute-policy-target-release-target-selector.ts (2 hunks)
  • apps/event-worker/src/workers/compute-systems-release-targets.ts (3 hunks)
  • apps/event-worker/src/workers/delete-resource.ts (2 hunks)
  • apps/event-worker/src/workers/evaluate-release-target.ts (2 hunks)
  • apps/event-worker/src/workers/new-deployment-version.ts (2 hunks)
  • apps/event-worker/src/workers/new-deployment.ts (1 hunks)
  • apps/event-worker/src/workers/new-environment.ts (1 hunks)
  • apps/event-worker/src/workers/new-policy.ts (1 hunks)
  • apps/event-worker/src/workers/new-resource.ts (2 hunks)
  • apps/event-worker/src/workers/update-deployment-variable.ts (2 hunks)
  • apps/event-worker/src/workers/update-deployment.ts (2 hunks)
  • apps/event-worker/src/workers/update-environment.ts (2 hunks)
  • apps/event-worker/src/workers/update-policy.ts (2 hunks)
  • apps/event-worker/src/workers/update-resource-variable.ts (2 hunks)
  • apps/event-worker/src/workers/updated-resources/index.ts (2 hunks)
  • apps/jobs/src/policy-checker/index.ts (2 hunks)
  • apps/pty-proxy/src/controller/agent-socket.ts (2 hunks)
  • apps/webservice/src/app/api/github/webhook/handlers/pull_request.ts (2 hunks)
  • apps/webservice/src/app/api/v1/resources/[resourceId]/route.ts (2 hunks)
  • packages/events/src/dispatch-jobs.ts (1 hunks)
  • packages/events/src/index.ts (1 hunks)
  • packages/events/src/resource-provider-scan/handle-provider-scan.ts (2 hunks)
  • packages/events/src/updated-resource/dispatch-job.ts (0 hunks)
  • packages/events/src/updated-resource/index.ts (0 hunks)
💤 Files with no reviewable changes (7)
  • packages/events/src/updated-resource/index.ts
  • apps/event-worker/src/utils/dispatch-evaluate-jobs.ts
  • apps/event-worker/src/utils/dispatch-compute-policy-target-selector-jobs.ts
  • packages/events/src/updated-resource/dispatch-job.ts
  • apps/event-worker/src/utils/dispatch-compute-deployment-jobs.ts
  • apps/event-worker/src/utils/dispatch-compute-env-jobs.ts
  • apps/event-worker/src/utils/dispatch-compute-system-jobs.ts
🧰 Additional context used
📓 Path-based instructions (1)
`**/*.{ts,tsx}`: **Note on Error Handling:** Avoid strict enforcement of try/catch blocks. Code may use early returns, Promise chains (.then().catch()), or other patterns for error...

**/*.{ts,tsx}: Note on Error Handling:
Avoid strict enforcement of try/catch blocks. Code may use early returns, Promise chains (.then().catch()), or other patterns for error handling. These are acceptable as long as they maintain clarity and predictability.

  • packages/events/src/index.ts
  • apps/event-worker/src/workers/update-deployment-variable.ts
  • apps/event-worker/src/workers/update-environment.ts
  • apps/webservice/src/app/api/github/webhook/handlers/pull_request.ts
  • apps/event-worker/src/workers/update-resource-variable.ts
  • apps/event-worker/src/workers/updated-resources/index.ts
  • apps/event-worker/src/workers/new-policy.ts
  • apps/event-worker/src/workers/new-environment.ts
  • apps/event-worker/src/workers/compute-deployment-resource-selector.ts
  • apps/event-worker/src/workers/new-deployment.ts
  • apps/event-worker/src/workers/new-deployment-version.ts
  • packages/events/src/resource-provider-scan/handle-provider-scan.ts
  • apps/event-worker/src/workers/delete-resource.ts
  • apps/event-worker/src/workers/compute-policy-target-release-target-selector.ts
  • apps/event-worker/src/workers/compute-systems-release-targets.ts
  • apps/pty-proxy/src/controller/agent-socket.ts
  • apps/webservice/src/app/api/v1/resources/[resourceId]/route.ts
  • apps/event-worker/src/workers/new-resource.ts
  • apps/jobs/src/policy-checker/index.ts
  • apps/event-worker/src/workers/compute-environment-resource-selector.ts
  • apps/event-worker/src/workers/evaluate-release-target.ts
  • apps/event-worker/src/workers/update-deployment.ts
  • apps/event-worker/src/workers/update-policy.ts
  • packages/events/src/dispatch-jobs.ts
🧬 Code Graph Analysis (20)
apps/event-worker/src/workers/update-deployment-variable.ts (1)
packages/events/src/dispatch-jobs.ts (1)
  • dispatchQueueJob (117-121)
apps/event-worker/src/workers/update-environment.ts (2)
packages/events/src/dispatch-jobs.ts (1)
  • dispatchQueueJob (117-121)
packages/db/src/schema/job.ts (1)
  • job (75-107)
apps/webservice/src/app/api/github/webhook/handlers/pull_request.ts (1)
packages/events/src/dispatch-jobs.ts (1)
  • dispatchQueueJob (117-121)
apps/event-worker/src/workers/update-resource-variable.ts (1)
packages/events/src/dispatch-jobs.ts (1)
  • dispatchQueueJob (117-121)
apps/event-worker/src/workers/new-policy.ts (4)
packages/events/src/index.ts (1)
  • createWorker (10-25)
packages/db/src/client.ts (1)
  • db (15-15)
packages/db/src/schema/policy.ts (1)
  • policyTarget (65-79)
packages/events/src/dispatch-jobs.ts (1)
  • dispatchQueueJob (117-121)
apps/event-worker/src/workers/new-environment.ts (2)
packages/events/src/index.ts (1)
  • createWorker (10-25)
packages/events/src/dispatch-jobs.ts (1)
  • dispatchQueueJob (117-121)
apps/event-worker/src/workers/compute-deployment-resource-selector.ts (1)
packages/events/src/dispatch-jobs.ts (1)
  • dispatchQueueJob (117-121)
apps/event-worker/src/workers/new-deployment.ts (2)
packages/events/src/index.ts (1)
  • createWorker (10-25)
packages/events/src/dispatch-jobs.ts (1)
  • dispatchQueueJob (117-121)
apps/event-worker/src/workers/new-deployment-version.ts (1)
packages/events/src/dispatch-jobs.ts (1)
  • dispatchQueueJob (117-121)
packages/events/src/resource-provider-scan/handle-provider-scan.ts (1)
packages/events/src/dispatch-jobs.ts (1)
  • dispatchQueueJob (117-121)
apps/event-worker/src/workers/delete-resource.ts (1)
packages/events/src/dispatch-jobs.ts (1)
  • dispatchQueueJob (117-121)
apps/event-worker/src/workers/compute-policy-target-release-target-selector.ts (2)
packages/events/src/dispatch-jobs.ts (1)
  • dispatchQueueJob (117-121)
packages/db/src/schema/policy.ts (1)
  • policyTarget (65-79)
apps/event-worker/src/workers/compute-systems-release-targets.ts (1)
packages/events/src/dispatch-jobs.ts (1)
  • dispatchQueueJob (117-121)
apps/pty-proxy/src/controller/agent-socket.ts (1)
packages/events/src/dispatch-jobs.ts (1)
  • dispatchQueueJob (117-121)
apps/webservice/src/app/api/v1/resources/[resourceId]/route.ts (1)
packages/events/src/dispatch-jobs.ts (1)
  • dispatchQueueJob (117-121)
apps/event-worker/src/workers/new-resource.ts (3)
packages/events/src/dispatch-jobs.ts (1)
  • dispatchQueueJob (117-121)
packages/db/src/schema/environment.ts (1)
  • environment (59-84)
packages/db/src/schema/deployment.ts (1)
  • deployment (69-93)
apps/jobs/src/policy-checker/index.ts (1)
packages/db/src/client.ts (1)
  • db (15-15)
apps/event-worker/src/workers/compute-environment-resource-selector.ts (2)
packages/events/src/dispatch-jobs.ts (1)
  • dispatchQueueJob (117-121)
packages/db/src/schema/environment.ts (1)
  • environment (59-84)
apps/event-worker/src/workers/update-deployment.ts (1)
packages/events/src/dispatch-jobs.ts (1)
  • dispatchQueueJob (117-121)
apps/event-worker/src/workers/update-policy.ts (2)
packages/events/src/dispatch-jobs.ts (1)
  • dispatchQueueJob (117-121)
packages/db/src/schema/policy.ts (1)
  • policyTarget (65-79)
⏰ Context from checks skipped due to timeout of 90000ms (7)
  • GitHub Check: Typecheck
  • GitHub Check: Format
  • GitHub Check: Lint
  • GitHub Check: build (linux/amd64)
  • GitHub Check: build (linux/amd64)
  • GitHub Check: build (linux/amd64)
  • GitHub Check: build (linux/amd64)
🔇 Additional comments (14)
packages/events/src/index.ts (1)

39-40: Barrel export switched to dispatch-jobs – looks correct

The re-export now points to ./dispatch-jobs.js, matching the new unified dispatch API. No issues spotted.

apps/webservice/src/app/api/v1/resources/[resourceId]/route.ts (1)

8-9: Update to unified dispatch API – 👍

The route now imports the barrel-exported dispatchQueueJob and uses the fluent call dispatchQueueJob().toUpdatedResource([res]).
Assuming toUpdatedResource resolves a promise identical to the previous helper, this is a drop-in replacement.

Make sure dispatchQueueJob().toUpdatedResource is covered by existing tests (e.g. that the queue receives one job for the single-element array).

Also applies to: 156-157

apps/pty-proxy/src/controller/agent-socket.ts (1)

15-16: Switch to dispatchQueueJob acknowledged

The agent socket now emits updated-resource events through the new dispatcher. Implementation matches other call sites; nothing else to flag.

Also applies to: 141-142

apps/webservice/src/app/api/github/webhook/handlers/pull_request.ts (1)

7-8: Job dispatch refactor looks good

Import swap and the new await dispatchQueueJob().toUpdatedResource(resources) integrate cleanly with the consolidated API. 👍

Also applies to: 300-300

apps/event-worker/src/workers/new-deployment.ts (1)

7-10: Good use of the fluent dispatch API

Awaiting the chained call ensures the enqueue completes before the worker returns. Implementation is clear and matches the new conventions.

apps/event-worker/src/workers/delete-resource.ts (1)

8-13: Imports & evaluate-dispatch updated correctly

The switch to dispatchQueueJob().toEvaluate().releaseTargets(...) is consistent with the new API and is properly awaited.

Also applies to: 53-53

apps/event-worker/src/workers/update-deployment-variable.ts (1)

29-30: LGTM – new fluent dispatch pattern correctly awaited.

The chain aligns with the new dispatchQueueJob API and the await ensures BullMQ back-pressure semantics remain intact. No action needed.

apps/event-worker/src/workers/update-environment.ts (1)

32-35: LGTM – correct use of the compute-queue builder.

The chained call clearly scopes the job to the environment selector recomputation and is properly awaited; error handling is preserved. 👍

apps/event-worker/src/workers/update-deployment.ts (2)

36-37: LGTM – evaluation jobs correctly awaited

The evaluation dispatch is gated behind the “jobAgent changed” guard and is awaited, preserving transactional semantics.
No further comments.


42-45: LGTM – compute job dispatch follows new API

Chained builder is clear and awaited; matches the refactor goals.

apps/event-worker/src/workers/new-environment.ts (1)

7-10: LGTM – new environment compute dispatch implemented correctly

Adopts the new fluent API and awaits the Promise; concise and readable.

apps/event-worker/src/workers/updated-resources/index.ts (1)

63-65: Redundant await?

If .releaseTargets() internally chunks & enqueues jobs synchronously, the outer await is pointless.
Verify the return type and either:
• keep await (preferred, ensures back-pressure), or
• remove it for non-Promise results.

apps/jobs/src/policy-checker/index.ts (1)

49-53: Pagination offset can skip records if duplicates are removed

If you implement the de-duplication above after fetching, the number of processed items per page may be < PAGE_SIZE, yet you still add PAGE_SIZE to offset, skipping some rows.

Consider using offset += releaseTargets.length or switch to keyset pagination.

apps/event-worker/src/workers/compute-systems-release-targets.ts (1)

226-226: 👍 Correct async usage here

await dispatchQueueJob().toEvaluate().releaseTargets(toEvaluate); keeps the transactional flow deterministic; nice catch.

Comment on lines +14 to +17
dispatchQueueJob()
.toCompute()
.policyTarget(policyTarget)
.releaseTargetSelector();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Await dispatches or batch them to prevent silent failures

releaseTargetSelector() most likely returns a Promise (compare usage in new-deployment where it’s awaited).
Not awaiting here means:

  1. The worker may finish before the enqueue call resolves, losing telemetry/errors.
  2. Unhandled-rejection noise if the enqueue rejects.

Either await inside the loop or batch-dispatch:

-  for (const policyTarget of policyTargets)
-    dispatchQueueJob()
-      .toCompute()
-      .policyTarget(policyTarget)
-      .releaseTargetSelector();
+  await Promise.all(
+    policyTargets.map((policyTarget) =>
+      dispatchQueueJob()
+        .toCompute()
+        .policyTarget(policyTarget)
+        .releaseTargetSelector(),
+    ),
+  );

Keeps semantics while guaranteeing the job completes (or fails) before the worker acknowledges.


🏁 Script executed:

#!/bin/bash
set -e

echo "1) Locate update-policy.ts in the repo:"
rg -l "update-policy.ts"

echo -e "\n2) Show context around dispatchQueueJob() in update-policy.ts:"
rg -n -C3 "dispatchQueueJob" apps/event-worker/src/workers/update-policy.ts

echo -e "\n3) Find all occurrences (definitions and calls) of releaseTargetSelector:"
rg -n -C5 "releaseTargetSelector" .

Length of output: 122


🏁 Script executed:

#!/bin/bash
set -e

# 1. Locate the exact path of update-policy.ts
echo "Finding update-policy.ts file path:"
fd -t f update-policy.ts

# 2. Show the first 50 lines to confirm imports and context
echo -e "\n--- update-policy.ts (first 50 lines) ---"
sed -n '1,50p' apps/event-worker/src/workers/update-policy.ts

# 3. Inspect dispatchQueueJob usage around the loop
echo -e "\n--- dispatchQueueJob calls ---"
rg -n -C3 "dispatchQueueJob" apps/event-worker/src/workers/update-policy.ts

# 4. Inspect releaseTargetSelector calls in the same file
echo -e "\n--- releaseTargetSelector calls ---"
rg -n -C3 "releaseTargetSelector" apps/event-worker/src/workers/update-policy.ts

# 5. Locate definition of releaseTargetSelector to verify its return type
echo -e "\n--- Definition of releaseTargetSelector ---"
rg -R "releaseTargetSelector" -n .

Length of output: 2461


🏁 Script executed:

#!/bin/bash
set -e

# 1. List all workspace packages
echo "Packages directory contents:"
ls packages
echo

# 2. Identify the @ctrlplane/events package path
EVENTS_PKG=$(ls -d packages/* | grep events || true)
echo "Events package path: $EVENTS_PKG"
echo

# 3. List source files in the events package
echo "Listing $EVENTS_PKG/src:"
ls "$EVENTS_PKG"/src
echo

# 4. Search for dispatchQueueJob definition
echo "Searching for dispatchQueueJob in $EVENTS_PKG/src:"
rg -n "dispatchQueueJob" "$EVENTS_PKG"/src
echo

# 5. Search for releaseTargetSelector definition
echo "Searching for releaseTargetSelector in $EVENTS_PKG/src:"
rg -n "releaseTargetSelector" "$EVENTS_PKG"/src

Length of output: 1478


🏁 Script executed:

#!/bin/bash
set -e

# Display implementation of dispatchQueueJob and its methods
echo "----- dispatch-jobs.ts (lines 80-160) -----"
sed -n '80,160p' packages/events/src/dispatch-jobs.ts

Length of output: 1385


🏁 Script executed:

#!/bin/bash
set -e

# Locate the definition and usage of dispatchComputePolicyTargetReleaseTargetSelectorJobs
echo "Searching for dispatchComputePolicyTargetReleaseTargetSelectorJobs definition:"
rg -n -C3 "dispatchComputePolicyTargetReleaseTargetSelectorJobs" -n packages/events/src

Length of output: 1251


Ensure enqueue calls complete by awaiting releaseTargetSelector

In apps/event-worker/src/workers/update-policy.ts,

for (const policyTarget of policyTargets)
  dispatchQueueJob()
    .toCompute()
    .policyTarget(policyTarget)
    .releaseTargetSelector();

does not await the promise returned by releaseTargetSelector(), so the worker callback can finish before jobs are enqueued (losing errors/telemetry) and unhandled rejections may occur.

Update to collect and await all dispatches, for example:

- for (const policyTarget of policyTargets)
-   dispatchQueueJob()
-     .toCompute()
-     .policyTarget(policyTarget)
-     .releaseTargetSelector();
+ await Promise.all(
+   policyTargets.map((policyTarget) =>
+     dispatchQueueJob()
+       .toCompute()
+       .policyTarget(policyTarget)
+       .releaseTargetSelector(),
+   ),
+ );

This guarantees the worker waits for every enqueue (or surfaces failures) before completing.

🤖 Prompt for AI Agents
In apps/event-worker/src/workers/update-policy.ts around lines 14 to 17, the
call to releaseTargetSelector() returns a Promise that is not awaited, causing
potential silent failures and unhandled rejections. Fix this by collecting all
the promises returned by releaseTargetSelector() in an array during the loop and
then awaiting Promise.all on that array after the loop to ensure all dispatches
complete before the worker finishes.

Comment on lines 11 to +15
for (const policyTarget of policyTargets)
dispatchComputePolicyTargetReleaseTargetSelectorJobs(policyTarget);
dispatchQueueJob()
.toCompute()
.policyTarget(policyTarget)
.releaseTargetSelector();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Mirror the awaiting strategy used elsewhere for consistency

Same pattern as in update-policy.ts: the dispatch is fire-and-forget, whereas new-deployment awaits the promise. Aligning behaviour avoids hidden race conditions and makes error-handling uniform across workers.

🤖 Prompt for AI Agents
In apps/event-worker/src/workers/new-policy.ts around lines 11 to 15, the
dispatchQueueJob calls are currently fire-and-forget without awaiting their
promises. To align with the awaiting strategy used in new-deployment and ensure
consistent error handling and avoid race conditions, modify the code to await
each dispatchQueueJob call inside the loop or collect the promises and await
them properly.

Comment on lines +39 to 40
dispatchQueueJob().toEvaluate().releaseTargets(affectedReleaseTargets);
},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Missing await may cause silent dispatch failure

dispatchQueueJob().toEvaluate().releaseTargets() returns a Promise.
Without await, the worker will resolve before the dispatch settles; any rejection becomes an unhandled background promise and BullMQ will mark the job as completed even on failure.

-    dispatchQueueJob().toEvaluate().releaseTargets(affectedReleaseTargets);
+    await dispatchQueueJob()
+      .toEvaluate()
+      .releaseTargets(affectedReleaseTargets);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
dispatchQueueJob().toEvaluate().releaseTargets(affectedReleaseTargets);
},
await dispatchQueueJob()
.toEvaluate()
.releaseTargets(affectedReleaseTargets);
},
🤖 Prompt for AI Agents
In apps/event-worker/src/workers/update-resource-variable.ts around lines 39 to
40, the call to
dispatchQueueJob().toEvaluate().releaseTargets(affectedReleaseTargets) returns a
Promise but is not awaited. To prevent silent dispatch failures and ensure the
worker waits for the dispatch to complete, add the await keyword before this
call so that any rejection is properly handled and the job status reflects the
actual outcome.

Comment on lines +193 to 195
dispatchQueueJob().toEvaluate().releaseTargets([job.data]);
return;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Missing await may swallow dispatch errors

dispatchQueueJob().toEvaluate().releaseTargets(...) is very likely asynchronous (every other call site awaits it).
Not awaiting here means any rejection is detached from the surrounding try/​catch, causing an unhandled-rejection and silent job-loss.

-        dispatchQueueJob().toEvaluate().releaseTargets([job.data]);
+        await dispatchQueueJob().toEvaluate().releaseTargets([job.data]);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
dispatchQueueJob().toEvaluate().releaseTargets([job.data]);
return;
}
await dispatchQueueJob().toEvaluate().releaseTargets([job.data]);
return;
}
🤖 Prompt for AI Agents
In apps/event-worker/src/workers/evaluate-release-target.ts around lines 193 to
195, the call to dispatchQueueJob().toEvaluate().releaseTargets([job.data]) is
asynchronous but lacks an await, which can cause errors to be unhandled and
swallowed. Add the await keyword before this call to ensure any rejections are
caught by the surrounding try/catch and prevent silent job failures.

Comment on lines +43 to +46
dispatchQueueJob()
.toCompute()
.policyTarget(policyTarget)
.releaseTargetSelector();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Retry dispatch should also be awaited

-        dispatchQueueJob()
+        await dispatchQueueJob()
           .toCompute()
           .policyTarget(policyTarget)
           .releaseTargetSelector();
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
dispatchQueueJob()
.toCompute()
.policyTarget(policyTarget)
.releaseTargetSelector();
await dispatchQueueJob()
.toCompute()
.policyTarget(policyTarget)
.releaseTargetSelector();
🤖 Prompt for AI Agents
In
apps/event-worker/src/workers/compute-policy-target-release-target-selector.ts
around lines 43 to 46, the dispatchQueueJob call chain that includes toCompute,
policyTarget, and releaseTargetSelector is missing an await for the retry
dispatch. Modify the code to await the dispatchQueueJob call to ensure the retry
dispatch completes before proceeding.

Comment on lines +211 to +217
dispatchQueueJob()
.toCompute()
.system(system)
.releaseTargets(true, [
...(processedPolicyTargetIds ?? []),
...additionalProcessedPolicyTargetIds,
]);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Fire-and-forget dispatch may swallow Redis / BullMQ errors

dispatchQueueJob().toCompute().system(system).releaseTargets(...) returns a promise from the underlying queue API, but it is not await-ed (nor chained with .catch()).
If Redis is unreachable or BullMQ rejects the add-job call, the exception becomes an unhandled-rejection and the worker will exit silently in many Node runtimes.

-            dispatchQueueJob()
-              .toCompute()
-              .system(system)
-              .releaseTargets(true, [
+            await dispatchQueueJob()
+              .toCompute()
+              .system(system)
+              .releaseTargets(true, [

Apply the same treatment to the analogous call two blocks below (lines 230-231).
If the intention is truly “fire-and-forget”, add an explicit .catch((e) => log.error(e)) to avoid process crashes.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
dispatchQueueJob()
.toCompute()
.system(system)
.releaseTargets(true, [
...(processedPolicyTargetIds ?? []),
...additionalProcessedPolicyTargetIds,
]);
await dispatchQueueJob()
.toCompute()
.system(system)
.releaseTargets(true, [
...(processedPolicyTargetIds ?? []),
...additionalProcessedPolicyTargetIds,
]);
🤖 Prompt for AI Agents
In apps/event-worker/src/workers/compute-systems-release-targets.ts around lines
211 to 217, the dispatchQueueJob call returns a promise that is not awaited or
handled for errors, risking unhandled promise rejections if Redis or BullMQ
fail. Fix this by adding a .catch handler to the promise to log any errors. Also
apply the same fix to the similar dispatchQueueJob call around lines 230 to 231
to ensure all fire-and-forget calls handle errors properly.

Comment on lines +88 to +91
dispatchQueueJob()
.toCompute()
.environment(environment)
.resourceSelector();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add awaiting / error handling for retry dispatch

For the retry path you probably want to make sure the retry job is actually enqueued before returning.

-        dispatchQueueJob()
+        await dispatchQueueJob()
           .toCompute()
           .environment(environment)
           .resourceSelector();
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
dispatchQueueJob()
.toCompute()
.environment(environment)
.resourceSelector();
await dispatchQueueJob()
.toCompute()
.environment(environment)
.resourceSelector();
🤖 Prompt for AI Agents
In apps/event-worker/src/workers/compute-environment-resource-selector.ts around
lines 88 to 91, the dispatchQueueJob call for the retry path is missing awaiting
and error handling. Modify the code to await the dispatchQueueJob chain to
ensure the retry job is enqueued before returning, and add try-catch error
handling to properly handle any errors during dispatch.

Comment on lines +81 to +84
dispatchQueueJob()
.toCompute()
.system(environment.system)
.releaseTargets();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Missing await may drop job if queue add fails

Same pattern as in the system worker: the promise from dispatchQueueJob() is ignored.
Either await it or add an error handler to prevent unobserved rejections.

-      dispatchQueueJob()
+      await dispatchQueueJob()
         .toCompute()
         .system(environment.system)
         .releaseTargets();
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
dispatchQueueJob()
.toCompute()
.system(environment.system)
.releaseTargets();
await dispatchQueueJob()
.toCompute()
.system(environment.system)
.releaseTargets();
🤖 Prompt for AI Agents
In apps/event-worker/src/workers/compute-environment-resource-selector.ts around
lines 81 to 84, the call to dispatchQueueJob() returns a promise that is not
awaited or handled, risking dropped jobs if the queue addition fails. Fix this
by adding an await before dispatchQueueJob() to ensure the promise is resolved
or rejected properly, or attach a catch handler to handle any errors and prevent
unobserved promise rejections.

Comment on lines +8 to +12
const waiting = await q.getWaiting();
const waitingIds = new Set(waiting.map((job) => job.data.id));
const resourcesNotAlreadyQueued = resources.filter(
(resource) => !waitingIds.has(resource.id),
);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Deduplication checks only “waiting” jobs

q.getWaiting() ignores jobs that are delayed, active, or paused.
Duplicates can still slip through, especially under high load.

Options:

  1. Query by jobId – BullMQ rejects duplicates regardless of state.
  2. Or call q.getJobs(['waiting','delayed','active']) to widen the check.

Worth tightening if idempotency is critical.

🤖 Prompt for AI Agents
In packages/events/src/dispatch-jobs.ts around lines 8 to 12, the current
deduplication only checks jobs in the "waiting" state using q.getWaiting(),
which misses duplicates in "delayed", "active", or "paused" states. To fix this,
replace q.getWaiting() with q.getJobs(['waiting', 'delayed', 'active']) to
include these states in the deduplication check, ensuring no duplicates slip
through under high load.

@adityachoudhari26 adityachoudhari26 merged commit a5cce53 into main Jun 13, 2025
9 checks passed
@adityachoudhari26 adityachoudhari26 deleted the move-dispatch-jobs branch June 13, 2025 19:54
This was referenced Oct 15, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants