From d8879786de083d97c962ecc3589fae3c3c9f62a0 Mon Sep 17 00:00:00 2001 From: Aditya Choudhari Date: Wed, 7 May 2025 23:12:49 -0700 Subject: [PATCH 1/3] perf: only dispatch actionable release targets to eval queue in compute policy target job --- ...e-policy-target-release-target-selector.ts | 59 ++++++++++--------- 1 file changed, 32 insertions(+), 27 deletions(-) diff --git a/apps/event-worker/src/workers/compute-policy-target-release-target-selector.ts b/apps/event-worker/src/workers/compute-policy-target-release-target-selector.ts index a0794da2b..716b30c2c 100644 --- a/apps/event-worker/src/workers/compute-policy-target-release-target-selector.ts +++ b/apps/event-worker/src/workers/compute-policy-target-release-target-selector.ts @@ -1,6 +1,6 @@ import type { Tx } from "@ctrlplane/db"; -import { and, eq, isNull, selector, sql } from "@ctrlplane/db"; +import { and, eq, inArray, isNull, selector, sql } from "@ctrlplane/db"; import { db } from "@ctrlplane/db/client"; import * as schema from "@ctrlplane/db/schema"; import { Channel, createWorker } from "@ctrlplane/events"; @@ -66,16 +66,12 @@ export const computePolicyTargetReleaseTargetSelectorWorkerEvent = createWorker( const policyTarget = await db.query.policyTarget.findFirst({ where: eq(schema.policyTarget.id, id), - with: { policy: true }, }); if (policyTarget == null) throw new Error("Policy target not found"); - const { policy } = policyTarget; - const { workspaceId } = policy; - try { - await db.transaction(async (tx) => { + const actionableReleaseTargetIds = await db.transaction(async (tx) => { await tx.execute( sql` SELECT * from ${schema.computedPolicyTargetReleaseTarget} @@ -85,43 +81,52 @@ export const computePolicyTargetReleaseTargetSelectorWorkerEvent = createWorker( `, ); - await tx + const previous = await tx .delete(schema.computedPolicyTargetReleaseTarget) .where( eq( schema.computedPolicyTargetReleaseTarget.policyTargetId, policyTarget.id, ), - ); + ) + .returning(); const releaseTargets = await findMatchingReleaseTargets( tx, policyTarget, ); - if (releaseTargets.length === 0) return; - await tx - .insert(schema.computedPolicyTargetReleaseTarget) - .values(releaseTargets) - .onConflictDoNothing(); + const unmatched = previous.filter( + (previousRt) => + !releaseTargets.some( + (rt) => rt.releaseTargetId === previousRt.releaseTargetId, + ), + ); + + const newReleaseTargets = releaseTargets.filter( + (rt) => + !previous.some( + (previousRt) => previousRt.releaseTargetId === rt.releaseTargetId, + ), + ); + + if (releaseTargets.length > 0) + await tx + .insert(schema.computedPolicyTargetReleaseTarget) + .values(releaseTargets) + .onConflictDoNothing(); + + return [...unmatched, ...newReleaseTargets].map( + (rt) => rt.releaseTargetId, + ); }); - const releaseTargets = await db + const actionableReleaseTargets = await db .select() .from(schema.releaseTarget) - .innerJoin( - schema.resource, - eq(schema.releaseTarget.resourceId, schema.resource.id), - ) - .where( - and( - isNull(schema.resource.deletedAt), - eq(schema.resource.workspaceId, workspaceId), - ), - ) - .then((rows) => rows.map((row) => row.release_target)); - - await dispatchEvaluateJobs(releaseTargets); + .where(inArray(schema.releaseTarget.id, actionableReleaseTargetIds)); + + await dispatchEvaluateJobs(actionableReleaseTargets); } catch (e: any) { const isRowLocked = e.code === "55P03"; if (isRowLocked) { From f52edbc5fdb278319e8e8bed0f8e3301468a4add Mon Sep 17 00:00:00 2001 From: Aditya Choudhari Date: Wed, 7 May 2025 23:14:10 -0700 Subject: [PATCH 2/3] rename --- .../compute-policy-target-release-target-selector.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/apps/event-worker/src/workers/compute-policy-target-release-target-selector.ts b/apps/event-worker/src/workers/compute-policy-target-release-target-selector.ts index 716b30c2c..fbe3fafe9 100644 --- a/apps/event-worker/src/workers/compute-policy-target-release-target-selector.ts +++ b/apps/event-worker/src/workers/compute-policy-target-release-target-selector.ts @@ -71,7 +71,7 @@ export const computePolicyTargetReleaseTargetSelectorWorkerEvent = createWorker( if (policyTarget == null) throw new Error("Policy target not found"); try { - const actionableReleaseTargetIds = await db.transaction(async (tx) => { + const affectedReleaseTargetIds = await db.transaction(async (tx) => { await tx.execute( sql` SELECT * from ${schema.computedPolicyTargetReleaseTarget} @@ -121,12 +121,12 @@ export const computePolicyTargetReleaseTargetSelectorWorkerEvent = createWorker( ); }); - const actionableReleaseTargets = await db + const affectedReleaseTargets = await db .select() .from(schema.releaseTarget) - .where(inArray(schema.releaseTarget.id, actionableReleaseTargetIds)); + .where(inArray(schema.releaseTarget.id, affectedReleaseTargetIds)); - await dispatchEvaluateJobs(actionableReleaseTargets); + await dispatchEvaluateJobs(affectedReleaseTargets); } catch (e: any) { const isRowLocked = e.code === "55P03"; if (isRowLocked) { From 5f9306bdd0dd416d021d6136740f6d2611550ab9 Mon Sep 17 00:00:00 2001 From: Aditya Choudhari Date: Wed, 7 May 2025 23:26:16 -0700 Subject: [PATCH 3/3] rabbit comments --- ...mpute-policy-target-release-target-selector.ts | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/apps/event-worker/src/workers/compute-policy-target-release-target-selector.ts b/apps/event-worker/src/workers/compute-policy-target-release-target-selector.ts index fbe3fafe9..b129b7144 100644 --- a/apps/event-worker/src/workers/compute-policy-target-release-target-selector.ts +++ b/apps/event-worker/src/workers/compute-policy-target-release-target-selector.ts @@ -96,18 +96,13 @@ export const computePolicyTargetReleaseTargetSelectorWorkerEvent = createWorker( policyTarget, ); + const prevIds = new Set(previous.map((rt) => rt.releaseTargetId)); + const nextIds = new Set(releaseTargets.map((rt) => rt.releaseTargetId)); const unmatched = previous.filter( - (previousRt) => - !releaseTargets.some( - (rt) => rt.releaseTargetId === previousRt.releaseTargetId, - ), + ({ releaseTargetId }) => !nextIds.has(releaseTargetId), ); - const newReleaseTargets = releaseTargets.filter( - (rt) => - !previous.some( - (previousRt) => previousRt.releaseTargetId === rt.releaseTargetId, - ), + ({ releaseTargetId }) => !prevIds.has(releaseTargetId), ); if (releaseTargets.length > 0) @@ -121,6 +116,8 @@ export const computePolicyTargetReleaseTargetSelectorWorkerEvent = createWorker( ); }); + if (affectedReleaseTargetIds.length === 0) return; + const affectedReleaseTargets = await db .select() .from(schema.releaseTarget)