Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { Tx } from "@ctrlplane/db";

import { and, eq, isNull, selector, sql } from "@ctrlplane/db";
import { and, eq, inArray, isNull, selector, sql } from "@ctrlplane/db";
import { db } from "@ctrlplane/db/client";
import * as schema from "@ctrlplane/db/schema";
import { Channel, createWorker } from "@ctrlplane/events";
Expand Down Expand Up @@ -66,16 +66,12 @@ export const computePolicyTargetReleaseTargetSelectorWorkerEvent = createWorker(

const policyTarget = await db.query.policyTarget.findFirst({
where: eq(schema.policyTarget.id, id),
with: { policy: true },
});

if (policyTarget == null) throw new Error("Policy target not found");

const { policy } = policyTarget;
const { workspaceId } = policy;

try {
await db.transaction(async (tx) => {
const affectedReleaseTargetIds = await db.transaction(async (tx) => {
await tx.execute(
sql`
SELECT * from ${schema.computedPolicyTargetReleaseTarget}
Expand All @@ -85,43 +81,49 @@ export const computePolicyTargetReleaseTargetSelectorWorkerEvent = createWorker(
`,
);

await tx
const previous = await tx
.delete(schema.computedPolicyTargetReleaseTarget)
.where(
eq(
schema.computedPolicyTargetReleaseTarget.policyTargetId,
policyTarget.id,
),
);
)
.returning();

const releaseTargets = await findMatchingReleaseTargets(
tx,
policyTarget,
);

if (releaseTargets.length === 0) return;
await tx
.insert(schema.computedPolicyTargetReleaseTarget)
.values(releaseTargets)
.onConflictDoNothing();
const prevIds = new Set(previous.map((rt) => rt.releaseTargetId));
const nextIds = new Set(releaseTargets.map((rt) => rt.releaseTargetId));
const unmatched = previous.filter(
({ releaseTargetId }) => !nextIds.has(releaseTargetId),
);
const newReleaseTargets = releaseTargets.filter(
({ releaseTargetId }) => !prevIds.has(releaseTargetId),
);

if (releaseTargets.length > 0)
await tx
.insert(schema.computedPolicyTargetReleaseTarget)
.values(releaseTargets)
.onConflictDoNothing();

return [...unmatched, ...newReleaseTargets].map(
(rt) => rt.releaseTargetId,
);
});

const releaseTargets = await db
if (affectedReleaseTargetIds.length === 0) return;

const affectedReleaseTargets = await db
.select()
.from(schema.releaseTarget)
.innerJoin(
schema.resource,
eq(schema.releaseTarget.resourceId, schema.resource.id),
)
.where(
and(
isNull(schema.resource.deletedAt),
eq(schema.resource.workspaceId, workspaceId),
),
)
.then((rows) => rows.map((row) => row.release_target));

await dispatchEvaluateJobs(releaseTargets);
.where(inArray(schema.releaseTarget.id, affectedReleaseTargetIds));

await dispatchEvaluateJobs(affectedReleaseTargets);
} catch (e: any) {
const isRowLocked = e.code === "55P03";
if (isRowLocked) {
Expand Down
Loading