Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions apps/event-worker/src/utils/dispatch-evaluate-jobs.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import type * as schema from "@ctrlplane/db/schema";
import type { ReleaseTargetIdentifier } from "@ctrlplane/rule-engine";

import { Channel, getQueue } from "@ctrlplane/events";

export const dispatchEvaluateJobs = async (rts: schema.ReleaseTarget[]) => {
export const dispatchEvaluateJobs = async (rts: ReleaseTargetIdentifier[]) => {
const jobs = rts.map((rt) => ({
name: `${rt.resourceId}-${rt.environmentId}-${rt.deploymentId}`,
data: rt,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ export const computeDeploymentResourceSelectorWorkerEvent = createWorker(
await db.transaction(async (tx) => {
await tx.execute(
sql`
SELECT * from ${schema.deployment}
WHERE ${schema.deployment.id} = ${id}
SELECT * from ${schema.computedDeploymentResource}
WHERE ${eq(schema.computedDeploymentResource.deploymentId, deployment.id)}
FOR UPDATE NOWAIT
`,
);
Expand Down Expand Up @@ -58,7 +58,7 @@ export const computeDeploymentResourceSelectorWorkerEvent = createWorker(
.onConflictDoNothing();
});

getQueue(Channel.ComputeSystemsReleaseTargets).add(
await getQueue(Channel.ComputeSystemsReleaseTargets).add(
deployment.system.id,
deployment.system,
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ export const computeEnvironmentResourceSelectorWorkerEvent = createWorker(
// acquire a lock on the environment
await tx.execute(
sql`
SELECT * from ${schema.environment}
WHERE ${schema.environment.id} = ${id}
SELECT * from ${schema.computedEnvironmentResource}
WHERE ${eq(schema.computedEnvironmentResource.environmentId, environment.id)}
FOR UPDATE NOWAIT
`,
);
Expand Down Expand Up @@ -79,7 +79,7 @@ export const computeEnvironmentResourceSelectorWorkerEvent = createWorker(
.onConflictDoNothing();
});

getQueue(Channel.ComputeSystemsReleaseTargets).add(
await getQueue(Channel.ComputeSystemsReleaseTargets).add(
environment.system.id,
environment.system,
);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { Tx } from "@ctrlplane/db";

import { and, eq, inArray, isNull, selector, sql } from "@ctrlplane/db";
import { and, eq, isNull, selector, sql } from "@ctrlplane/db";
import { db } from "@ctrlplane/db/client";
import * as schema from "@ctrlplane/db/schema";
import { Channel, createWorker, getQueue } from "@ctrlplane/events";
Expand Down Expand Up @@ -67,14 +67,12 @@ export const computePolicyTargetReleaseTargetSelectorWorkerEvent = createWorker(
const { workspaceId } = policy;

try {
const rts = await db.transaction(async (tx) => {
await db.transaction(async (tx) => {
await tx.execute(
sql`
SELECT * FROM ${schema.system}
INNER JOIN ${schema.environment} ON ${eq(schema.environment.systemId, schema.system.id)}
INNER JOIN ${schema.deployment} ON ${eq(schema.deployment.systemId, schema.system.id)}
INNER JOIN ${schema.releaseTarget} ON ${eq(schema.releaseTarget.environmentId, schema.environment.id)}
WHERE ${eq(schema.system.workspaceId, workspaceId)}
SELECT * from ${schema.computedPolicyTargetReleaseTarget}
INNER JOIN ${schema.releaseTarget} ON ${eq(schema.releaseTarget.id, schema.computedPolicyTargetReleaseTarget.releaseTargetId)}
WHERE ${eq(schema.computedPolicyTargetReleaseTarget.policyTargetId, policyTarget.id)}
FOR UPDATE NOWAIT
`,
);
Expand All @@ -93,28 +91,41 @@ export const computePolicyTargetReleaseTargetSelectorWorkerEvent = createWorker(
policyTarget,
);

if (releaseTargets.length === 0) return [];
return tx
if (releaseTargets.length === 0) return;
await tx
.insert(schema.computedPolicyTargetReleaseTarget)
.values(releaseTargets)
.onConflictDoNothing()
.returning();
.onConflictDoNothing();
});

if (rts.length === 0) return;
const releaseTargets = await db
.select()
.from(schema.releaseTarget)
.innerJoin(
schema.resource,
eq(schema.releaseTarget.resourceId, schema.resource.id),
)
.where(
and(
isNull(schema.resource.deletedAt),
eq(schema.resource.workspaceId, workspaceId),
),
)
.then((rows) => rows.map((row) => row.release_target));

const releaseTargets = await db.query.releaseTarget.findMany({
where: inArray(
schema.releaseTarget.id,
rts.map((rt) => rt.releaseTargetId),
const queueInsertionPromises = releaseTargets.map((rt) =>
getQueue(Channel.EvaluateReleaseTarget).add(
`${rt.resourceId}-${rt.environmentId}-${rt.deploymentId}`,
rt,
{
deduplication: {
id: `${rt.resourceId}-${rt.environmentId}-${rt.deploymentId}`,
ttl: 500,
},
},
),
});

const jobs = releaseTargets.map((rt) => ({
name: `${rt.resourceId}-${rt.environmentId}-${rt.deploymentId}`,
data: rt,
}));
await getQueue(Channel.EvaluateReleaseTarget).addBulk(jobs);
);
await Promise.all(queueInsertionPromises);
} catch (e: any) {
const isRowLocked = e.code === "55P03";
if (isRowLocked) {
Expand Down
74 changes: 46 additions & 28 deletions apps/event-worker/src/workers/compute-systems-release-targets.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import { db } from "@ctrlplane/db/client";
import * as schema from "@ctrlplane/db/schema";
import { Channel, createWorker, getQueue } from "@ctrlplane/events";

import { dispatchEvaluateJobs } from "../utils/dispatch-evaluate-jobs.js";

const findMatchingEnvironmentDeploymentPairs = (
tx: Tx,
system: { id: string; workspaceId: string },
Expand Down Expand Up @@ -75,50 +77,60 @@ export const computeSystemsReleaseTargetsWorker = createWorker(
const environmentIds = environments.map((e) => e.id);
const { workspaceId } = system;

if (deploymentIds.length === 0 || environmentIds.length === 0) return;

try {
const createdReleaseTargets = await db.transaction(async (tx) => {
await tx.execute(
sql`
SELECT * FROM ${schema.system}
INNER JOIN ${schema.environment} ON ${eq(schema.environment.systemId, schema.system.id)}
INNER JOIN ${schema.deployment} ON ${eq(schema.deployment.systemId, schema.system.id)}
INNER JOIN ${schema.releaseTarget} ON ${eq(schema.releaseTarget.environmentId, schema.environment.id)}
WHERE ${eq(schema.system.id, systemId)}
SELECT ${schema.releaseTarget.id} FROM ${schema.releaseTarget}
WHERE ${or(
inArray(schema.releaseTarget.deploymentId, deploymentIds),
inArray(schema.releaseTarget.environmentId, environmentIds),
)}
FOR UPDATE NOWAIT
`,
);

const previousReleaseTargets = await tx
.delete(schema.releaseTarget)
.where(
or(
inArray(schema.releaseTarget.deploymentId, deploymentIds),
inArray(schema.releaseTarget.environmentId, environmentIds),
),
)
.returning();
await tx.execute(
sql`
SELECT * FROM ${schema.computedEnvironmentResource}
WHERE ${inArray(schema.computedEnvironmentResource.environmentId, environmentIds)}
FOR UPDATE NOWAIT
`,
);

await tx.execute(
sql`
SELECT * FROM ${schema.computedDeploymentResource}
WHERE ${inArray(schema.computedDeploymentResource.deploymentId, deploymentIds)}
FOR UPDATE NOWAIT
`,
);

Comment on lines +86 to +110
Copy link
Contributor

@coderabbitai coderabbitai bot Apr 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Guard against empty IN () clauses when locking rows
inArray(_, deploymentIds) / inArray(_, environmentIds) will generate … IN () if the
corresponding list is empty. PostgreSQL rejects that syntax, so an empty list will make the
whole transaction fail before we reach the retry logic.

- await tx.execute(sql`SELECT * FROM ${schema.computedEnvironmentResource}
-   WHERE ${inArray(schema.computedEnvironmentResource.environmentId, environmentIds)}
-   FOR UPDATE NOWAIT`);
+ if (environmentIds.length > 0) {
+   await tx.execute(sql`
+     SELECT * FROM ${schema.computedEnvironmentResource}
+     WHERE ${inArray(schema.computedEnvironmentResource.environmentId, environmentIds)}
+     FOR UPDATE NOWAIT
+   `);
+ }

Apply the same conditional guard to all three lock statements (releaseTarget,
computedEnvironmentResource, computedDeploymentResource).

Committable suggestion skipped: line range outside the PR's diff.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if (deploymentIds.length === 0 && environmentIds.length === 0) return;
we do an early return befdore the tx even happens if either is empty

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adityachoudhari26 You're right that there's an early return if both arrays are empty. However, there's still a potential issue if only one of the arrays is empty:

  • If deploymentIds is empty but environmentIds is not, the lock statement for computedDeploymentResource would have an empty IN () clause
  • If environmentIds is empty but deploymentIds is not, the lock statement for computedEnvironmentResource would have an empty IN () clause

PostgreSQL rejects empty IN () syntax, so additional checks before each lock statement would prevent SQL errors in these cases. Something like:

if (environmentIds.length > 0) {
  await tx.execute(sql`
    SELECT * FROM ${schema.computedEnvironmentResource}
    WHERE ${inArray(schema.computedEnvironmentResource.environmentId, environmentIds)}
    FOR UPDATE NOWAIT
  `);
}

✏️ Learnings added
Learnt from: adityachoudhari26
PR: ctrlplanedev/ctrlplane#515
File: apps/event-worker/src/workers/compute-systems-release-targets.ts:86-110
Timestamp: 2025-04-28T18:38:21.152Z
Learning: In SQL queries that use `inArray()` with arrays like `deploymentIds` or `environmentIds`, if these arrays are empty, it will generate an invalid `IN ()` clause that PostgreSQL rejects. Adding condition checks (e.g., `if (array.length > 0)`) before executing such queries prevents SQL syntax errors.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch, I should actually be using || for the early return

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adityachoudhari26 Yes, using || for the early return is actually a more elegant solution! Changing to:

if (deploymentIds.length === 0 || environmentIds.length === 0) return;

This would prevent execution of the transaction entirely if either array is empty, which elegantly avoids the empty IN () clause issue altogether. Much cleaner than adding individual checks before each SQL statement.


🧠 Learnings used
Learnt from: adityachoudhari26
PR: ctrlplanedev/ctrlplane#515
File: apps/event-worker/src/workers/compute-systems-release-targets.ts:86-110
Timestamp: 2025-04-28T18:38:21.152Z
Learning: In SQL queries that use `inArray()` with arrays like `deploymentIds` or `environmentIds`, if these arrays are empty, it will generate an invalid `IN ()` clause that PostgreSQL rejects. Adding condition checks (e.g., `if (array.length > 0)`) before executing such queries prevents SQL syntax errors.

const previousReleaseTargets = await tx.query.releaseTarget.findMany({
where: or(
inArray(schema.releaseTarget.deploymentId, deploymentIds),
inArray(schema.releaseTarget.environmentId, environmentIds),
),
});

const releaseTargets = await findMatchingEnvironmentDeploymentPairs(
tx,
system,
);

if (releaseTargets.length > 0)
await tx
.insert(schema.releaseTarget)
.values(releaseTargets)
.onConflictDoNothing();

const created = releaseTargets.filter(
(rt) =>
!previousReleaseTargets.some(
(prevRt) =>
prevRt.deploymentId === rt.deploymentId &&
prevRt.resourceId === rt.resourceId,
prevRt.resourceId === rt.resourceId &&
prevRt.environmentId === rt.environmentId,
),
);

return created;
if (created.length === 0) return [];
return tx.insert(schema.releaseTarget).values(created).returning();
});

if (createdReleaseTargets.length === 0) return;
Expand All @@ -132,18 +144,24 @@ export const computeSystemsReleaseTargetsWorker = createWorker(
)
.where(eq(schema.policy.workspaceId, workspaceId));

for (const { policy_target: policyTarget } of policyTargets) {
getQueue(Channel.ComputePolicyTargetReleaseTargetSelector).add(
policyTarget.id,
policyTarget,
);
if (policyTargets.length > 0) {
for (const { policy_target: policyTarget } of policyTargets) {
getQueue(Channel.ComputePolicyTargetReleaseTargetSelector).add(
policyTarget.id,
policyTarget,
);
}
return;
}

await dispatchEvaluateJobs(createdReleaseTargets);
} catch (e: any) {
const isRowLocked = e.code === "55P03";
if (isRowLocked) {
await getQueue(Channel.ComputeSystemsReleaseTargets).add(
job.name,
job.data,
{ delay: 500 },
);
return;
}
Expand Down
Loading
Loading