-
Notifications
You must be signed in to change notification settings - Fork 11
chore: move dispatch jobs to events package #588
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
This file was deleted.
This file was deleted.
This file was deleted.
This file was deleted.
This file was deleted.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,10 +1,7 @@ | ||
| import { and, eq, isNull, selector, sql } from "@ctrlplane/db"; | ||
| import { db } from "@ctrlplane/db/client"; | ||
| import * as schema from "@ctrlplane/db/schema"; | ||
| import { Channel, createWorker } from "@ctrlplane/events"; | ||
|
|
||
| import { dispatchComputeDeploymentResourceSelectorJobs } from "../utils/dispatch-compute-deployment-jobs.js"; | ||
| import { dispatchComputeSystemReleaseTargetsJobs } from "../utils/dispatch-compute-system-jobs.js"; | ||
| import { Channel, createWorker, dispatchQueueJob } from "@ctrlplane/events"; | ||
|
|
||
| export const computeDeploymentResourceSelectorWorkerEvent = createWorker( | ||
| Channel.ComputeDeploymentResourceSelector, | ||
|
|
@@ -61,11 +58,14 @@ export const computeDeploymentResourceSelectorWorkerEvent = createWorker( | |
| .onConflictDoNothing(); | ||
| }); | ||
|
|
||
| dispatchComputeSystemReleaseTargetsJobs(deployment.system); | ||
| dispatchQueueJob().toCompute().system(deployment.system).releaseTargets(); | ||
| } catch (e: any) { | ||
| const isRowLocked = e.code === "55P03"; | ||
| if (isRowLocked) { | ||
| dispatchComputeDeploymentResourceSelectorJobs(deployment); | ||
| dispatchQueueJob() | ||
| .toCompute() | ||
| .deployment(deployment) | ||
| .resourceSelector(); | ||
|
Comment on lines
+65
to
+68
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Ditto in retry branch Add 🤖 Prompt for AI Agents |
||
| return; | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -1,10 +1,7 @@ | ||||||||||||||||||
| import { and, eq, isNull, selector, sql } from "@ctrlplane/db"; | ||||||||||||||||||
| import { db } from "@ctrlplane/db/client"; | ||||||||||||||||||
| import * as schema from "@ctrlplane/db/schema"; | ||||||||||||||||||
| import { Channel, createWorker } from "@ctrlplane/events"; | ||||||||||||||||||
|
|
||||||||||||||||||
| import { dispatchComputeEnvironmentResourceSelectorJobs } from "../utils/dispatch-compute-env-jobs.js"; | ||||||||||||||||||
| import { dispatchComputeSystemReleaseTargetsJobs } from "../utils/dispatch-compute-system-jobs.js"; | ||||||||||||||||||
| import { Channel, createWorker, dispatchQueueJob } from "@ctrlplane/events"; | ||||||||||||||||||
|
|
||||||||||||||||||
| /** | ||||||||||||||||||
| * Worker that computes and updates the resources associated with an environment | ||||||||||||||||||
|
|
@@ -81,12 +78,17 @@ export const computeEnvironmentResourceSelectorWorkerEvent = createWorker( | |||||||||||||||||
| .values(computedEnvironmentResources) | ||||||||||||||||||
| .onConflictDoNothing(); | ||||||||||||||||||
| }); | ||||||||||||||||||
|
|
||||||||||||||||||
| dispatchComputeSystemReleaseTargetsJobs(environment.system); | ||||||||||||||||||
| dispatchQueueJob() | ||||||||||||||||||
| .toCompute() | ||||||||||||||||||
| .system(environment.system) | ||||||||||||||||||
| .releaseTargets(); | ||||||||||||||||||
|
Comment on lines
+81
to
+84
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Missing Same pattern as in the system worker: the promise from - dispatchQueueJob()
+ await dispatchQueueJob()
.toCompute()
.system(environment.system)
.releaseTargets();📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||
| } catch (e: any) { | ||||||||||||||||||
| const isRowLocked = e.code === "55P03"; | ||||||||||||||||||
| if (isRowLocked) { | ||||||||||||||||||
| dispatchComputeEnvironmentResourceSelectorJobs(environment); | ||||||||||||||||||
| dispatchQueueJob() | ||||||||||||||||||
| .toCompute() | ||||||||||||||||||
| .environment(environment) | ||||||||||||||||||
| .resourceSelector(); | ||||||||||||||||||
|
Comment on lines
+88
to
+91
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Add awaiting / error handling for retry dispatch For the retry path you probably want to make sure the retry job is actually enqueued before returning. - dispatchQueueJob()
+ await dispatchQueueJob()
.toCompute()
.environment(environment)
.resourceSelector();📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||
| return; | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
|
|
||||||||||||||||||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -2,10 +2,7 @@ import { and, eq, inArray, isNull } from "@ctrlplane/db"; | |||||||||||||||||
| import { db } from "@ctrlplane/db/client"; | ||||||||||||||||||
| import { computePolicyTargets } from "@ctrlplane/db/queries"; | ||||||||||||||||||
| import * as schema from "@ctrlplane/db/schema"; | ||||||||||||||||||
| import { Channel, createWorker } from "@ctrlplane/events"; | ||||||||||||||||||
|
|
||||||||||||||||||
| import { dispatchComputePolicyTargetReleaseTargetSelectorJobs } from "../utils/dispatch-compute-policy-target-selector-jobs.js"; | ||||||||||||||||||
| import { dispatchEvaluateJobs } from "../utils/dispatch-evaluate-jobs.js"; | ||||||||||||||||||
| import { Channel, createWorker, dispatchQueueJob } from "@ctrlplane/events"; | ||||||||||||||||||
|
|
||||||||||||||||||
| export const computePolicyTargetReleaseTargetSelectorWorkerEvent = createWorker( | ||||||||||||||||||
| Channel.ComputePolicyTargetReleaseTargetSelector, | ||||||||||||||||||
|
|
@@ -39,11 +36,14 @@ export const computePolicyTargetReleaseTargetSelectorWorkerEvent = createWorker( | |||||||||||||||||
| ) | ||||||||||||||||||
| .then((rows) => rows.map((row) => row.release_target)); | ||||||||||||||||||
|
|
||||||||||||||||||
| dispatchEvaluateJobs(releaseTargets); | ||||||||||||||||||
| dispatchQueueJob().toEvaluate().releaseTargets(releaseTargets); | ||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Missing - dispatchQueueJob().toEvaluate().releaseTargets(releaseTargets);
+ await dispatchQueueJob().toEvaluate().releaseTargets(releaseTargets);Ensures the enqueue finishes or throws inside the current 📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||
| } catch (e: any) { | ||||||||||||||||||
| const isRowLocked = e.code === "55P03"; | ||||||||||||||||||
| if (isRowLocked) { | ||||||||||||||||||
| dispatchComputePolicyTargetReleaseTargetSelectorJobs(policyTarget); | ||||||||||||||||||
| dispatchQueueJob() | ||||||||||||||||||
| .toCompute() | ||||||||||||||||||
| .policyTarget(policyTarget) | ||||||||||||||||||
| .releaseTargetSelector(); | ||||||||||||||||||
|
Comment on lines
+43
to
+46
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Retry dispatch should also be awaited - dispatchQueueJob()
+ await dispatchQueueJob()
.toCompute()
.policyTarget(policyTarget)
.releaseTargetSelector();📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||
| return; | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
|
|
||||||||||||||||||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -4,12 +4,14 @@ import { and, eq, inArray, isNull, notInArray, or, sql } from "@ctrlplane/db"; | |||||||||||||||||||||||||||||
| import { db } from "@ctrlplane/db/client"; | ||||||||||||||||||||||||||||||
| import { computePolicyTargets } from "@ctrlplane/db/queries"; | ||||||||||||||||||||||||||||||
| import * as schema from "@ctrlplane/db/schema"; | ||||||||||||||||||||||||||||||
| import { Channel, createWorker, getQueue } from "@ctrlplane/events"; | ||||||||||||||||||||||||||||||
| import { | ||||||||||||||||||||||||||||||
| Channel, | ||||||||||||||||||||||||||||||
| createWorker, | ||||||||||||||||||||||||||||||
| dispatchQueueJob, | ||||||||||||||||||||||||||||||
| getQueue, | ||||||||||||||||||||||||||||||
| } from "@ctrlplane/events"; | ||||||||||||||||||||||||||||||
| import { logger } from "@ctrlplane/logger"; | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| import { dispatchComputeSystemReleaseTargetsJobs } from "../utils/dispatch-compute-system-jobs.js"; | ||||||||||||||||||||||||||||||
| import { dispatchEvaluateJobs } from "../utils/dispatch-evaluate-jobs.js"; | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| const log = logger.child({ | ||||||||||||||||||||||||||||||
| component: "computeSystemsReleaseTargetsWorker", | ||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||
|
|
@@ -206,10 +208,13 @@ export const computeSystemsReleaseTargetsWorker = createWorker( | |||||||||||||||||||||||||||||
| additionalProcessedPolicyTargetIds.push(policyTarget.id); | ||||||||||||||||||||||||||||||
| } catch (e: any) { | ||||||||||||||||||||||||||||||
| if (e.code === "55P03") { | ||||||||||||||||||||||||||||||
| dispatchComputeSystemReleaseTargetsJobs(system, true, [ | ||||||||||||||||||||||||||||||
| ...(processedPolicyTargetIds ?? []), | ||||||||||||||||||||||||||||||
| ...additionalProcessedPolicyTargetIds, | ||||||||||||||||||||||||||||||
| ]); | ||||||||||||||||||||||||||||||
| dispatchQueueJob() | ||||||||||||||||||||||||||||||
| .toCompute() | ||||||||||||||||||||||||||||||
| .system(system) | ||||||||||||||||||||||||||||||
| .releaseTargets(true, [ | ||||||||||||||||||||||||||||||
| ...(processedPolicyTargetIds ?? []), | ||||||||||||||||||||||||||||||
| ...additionalProcessedPolicyTargetIds, | ||||||||||||||||||||||||||||||
| ]); | ||||||||||||||||||||||||||||||
|
Comment on lines
+211
to
+217
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Fire-and-forget dispatch may swallow Redis / BullMQ errors
- dispatchQueueJob()
- .toCompute()
- .system(system)
- .releaseTargets(true, [
+ await dispatchQueueJob()
+ .toCompute()
+ .system(system)
+ .releaseTargets(true, [Apply the same treatment to the analogous call two blocks below (lines 230-231). 📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||
| return; | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| throw e; | ||||||||||||||||||||||||||||||
|
|
@@ -218,11 +223,11 @@ export const computeSystemsReleaseTargetsWorker = createWorker( | |||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| const toEvaluate = [...created, ...(redeployAll ? unchanged : [])]; | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| await dispatchEvaluateJobs(toEvaluate); | ||||||||||||||||||||||||||||||
| await dispatchQueueJob().toEvaluate().releaseTargets(toEvaluate); | ||||||||||||||||||||||||||||||
| } catch (e: any) { | ||||||||||||||||||||||||||||||
| const isRowLocked = e.code === "55P03"; | ||||||||||||||||||||||||||||||
| if (isRowLocked) { | ||||||||||||||||||||||||||||||
| dispatchComputeSystemReleaseTargetsJobs(system); | ||||||||||||||||||||||||||||||
| dispatchQueueJob().toCompute().system(system).releaseTargets(); | ||||||||||||||||||||||||||||||
| return; | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| log.error("Failed to compute release targets", { error: e }); | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -5,15 +5,18 @@ import { and, desc, eq, sql, takeFirst } from "@ctrlplane/db"; | |||||||||||||
| import { db } from "@ctrlplane/db/client"; | ||||||||||||||
| import { createReleaseJob } from "@ctrlplane/db/queries"; | ||||||||||||||
| import * as schema from "@ctrlplane/db/schema"; | ||||||||||||||
| import { Channel, createWorker, getQueue } from "@ctrlplane/events"; | ||||||||||||||
| import { | ||||||||||||||
| Channel, | ||||||||||||||
| createWorker, | ||||||||||||||
| dispatchQueueJob, | ||||||||||||||
| getQueue, | ||||||||||||||
| } from "@ctrlplane/events"; | ||||||||||||||
| import { logger, makeWithSpan, trace } from "@ctrlplane/logger"; | ||||||||||||||
| import { | ||||||||||||||
| VariableReleaseManager, | ||||||||||||||
| VersionReleaseManager, | ||||||||||||||
| } from "@ctrlplane/rule-engine"; | ||||||||||||||
|
|
||||||||||||||
| import { dispatchEvaluateJobs } from "../utils/dispatch-evaluate-jobs.js"; | ||||||||||||||
|
|
||||||||||||||
| const tracer = trace.getTracer("evaluate-release-target"); | ||||||||||||||
| const withSpan = makeWithSpan(tracer); | ||||||||||||||
| const log = logger.child({ module: "evaluate-release-target" }); | ||||||||||||||
|
|
@@ -187,7 +190,7 @@ export const evaluateReleaseTargetWorker = createWorker( | |||||||||||||
| const isRowLocked = e.code === "55P03"; | ||||||||||||||
| const isReleaseTargetNotCommittedYet = e.code === "23503"; | ||||||||||||||
| if (isRowLocked || isReleaseTargetNotCommittedYet) { | ||||||||||||||
| dispatchEvaluateJobs([job.data]); | ||||||||||||||
| dispatchQueueJob().toEvaluate().releaseTargets([job.data]); | ||||||||||||||
| return; | ||||||||||||||
| } | ||||||||||||||
|
Comment on lines
+193
to
195
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Missing
- dispatchQueueJob().toEvaluate().releaseTargets([job.data]);
+ await dispatchQueueJob().toEvaluate().releaseTargets([job.data]);📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||
| const isJobAgentError = | ||||||||||||||
|
|
||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,11 +1,12 @@ | ||
| import { Channel, createWorker } from "@ctrlplane/events"; | ||
|
|
||
| import { dispatchComputeDeploymentResourceSelectorJobs } from "../utils/dispatch-compute-deployment-jobs.js"; | ||
| import { Channel, createWorker, dispatchQueueJob } from "@ctrlplane/events"; | ||
|
|
||
| export const newDeploymentWorker = createWorker( | ||
| Channel.NewDeployment, | ||
| async (job) => { | ||
| const { data: deployment } = job; | ||
| await dispatchComputeDeploymentResourceSelectorJobs(deployment); | ||
| await dispatchQueueJob() | ||
| .toCompute() | ||
| .deployment(deployment) | ||
| .resourceSelector(); | ||
| }, | ||
| ); |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,11 +1,12 @@ | ||
| import { Channel, createWorker } from "@ctrlplane/events"; | ||
|
|
||
| import { dispatchComputeEnvironmentResourceSelectorJobs } from "../utils/dispatch-compute-env-jobs.js"; | ||
| import { Channel, createWorker, dispatchQueueJob } from "@ctrlplane/events"; | ||
|
|
||
| export const newEnvironmentWorker = createWorker( | ||
| Channel.NewEnvironment, | ||
| async (job) => { | ||
| const { data: environment } = job; | ||
| await dispatchComputeEnvironmentResourceSelectorJobs(environment); | ||
| await dispatchQueueJob() | ||
| .toCompute() | ||
| .environment(environment) | ||
| .resourceSelector(); | ||
| }, | ||
| ); |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,15 +1,16 @@ | ||
| import { eq } from "@ctrlplane/db"; | ||
| import { db } from "@ctrlplane/db/client"; | ||
| import * as schema from "@ctrlplane/db/schema"; | ||
| import { Channel, createWorker } from "@ctrlplane/events"; | ||
|
|
||
| import { dispatchComputePolicyTargetReleaseTargetSelectorJobs } from "../utils/dispatch-compute-policy-target-selector-jobs.js"; | ||
| import { Channel, createWorker, dispatchQueueJob } from "@ctrlplane/events"; | ||
|
|
||
| export const newPolicyWorker = createWorker(Channel.NewPolicy, async (job) => { | ||
| const policyTargets = await db.query.policyTarget.findMany({ | ||
| where: eq(schema.policyTarget.policyId, job.data.id), | ||
| }); | ||
|
|
||
| for (const policyTarget of policyTargets) | ||
| dispatchComputePolicyTargetReleaseTargetSelectorJobs(policyTarget); | ||
| dispatchQueueJob() | ||
| .toCompute() | ||
| .policyTarget(policyTarget) | ||
| .releaseTargetSelector(); | ||
|
Comment on lines
11
to
+15
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Mirror the awaiting strategy used elsewhere for consistency Same pattern as in 🤖 Prompt for AI Agents |
||
| }); | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Forgot to
awaitthe compute-releaseTargets dispatchSame reasoning as other workers: lack of
awaithides failures and may exit the worker before the dispatch is persisted.📝 Committable suggestion
🤖 Prompt for AI Agents