diff --git a/apps/event-worker/src/workers/compute-policy-target-release-target-selector.ts b/apps/event-worker/src/workers/compute-policy-target-release-target-selector.ts index b129b7144..afbe4312c 100644 --- a/apps/event-worker/src/workers/compute-policy-target-release-target-selector.ts +++ b/apps/event-worker/src/workers/compute-policy-target-release-target-selector.ts @@ -1,7 +1,6 @@ -import type { Tx } from "@ctrlplane/db"; - -import { and, eq, inArray, isNull, selector, sql } from "@ctrlplane/db"; +import { and, eq, inArray, isNull } from "@ctrlplane/db"; import { db } from "@ctrlplane/db/client"; +import { computePolicyTargets } from "@ctrlplane/db/queries"; import * as schema from "@ctrlplane/db/schema"; import { Channel, createWorker } from "@ctrlplane/events"; import { logger } from "@ctrlplane/logger"; @@ -13,52 +12,6 @@ const log = logger.child({ worker: "compute-policy-target-release-target-selector", }); -const findMatchingReleaseTargets = ( - tx: Tx, - policyTarget: schema.PolicyTarget, -) => - tx - .select() - .from(schema.releaseTarget) - .innerJoin( - schema.resource, - eq(schema.releaseTarget.resourceId, schema.resource.id), - ) - .innerJoin( - schema.deployment, - eq(schema.releaseTarget.deploymentId, schema.deployment.id), - ) - .innerJoin( - schema.environment, - eq(schema.releaseTarget.environmentId, schema.environment.id), - ) - .where( - and( - isNull(schema.resource.deletedAt), - selector() - .query() - .resources() - .where(policyTarget.resourceSelector) - .sql(), - selector() - .query() - .deployments() - .where(policyTarget.deploymentSelector) - .sql(), - selector() - .query() - .environments() - .where(policyTarget.environmentSelector) - .sql(), - ), - ) - .then((rt) => - rt.map((rt) => ({ - policyTargetId: policyTarget.id, - releaseTargetId: rt.release_target.id, - })), - ); - export const computePolicyTargetReleaseTargetSelectorWorkerEvent = createWorker( Channel.ComputePolicyTargetReleaseTargetSelector, async (job) => { @@ -71,59 +24,27 @@ export const computePolicyTargetReleaseTargetSelectorWorkerEvent = createWorker( if (policyTarget == null) throw new Error("Policy target not found"); try { - const affectedReleaseTargetIds = await db.transaction(async (tx) => { - await tx.execute( - sql` - SELECT * from ${schema.computedPolicyTargetReleaseTarget} - INNER JOIN ${schema.releaseTarget} ON ${eq(schema.releaseTarget.id, schema.computedPolicyTargetReleaseTarget.releaseTargetId)} - WHERE ${eq(schema.computedPolicyTargetReleaseTarget.policyTargetId, policyTarget.id)} - FOR UPDATE NOWAIT - `, - ); - - const previous = await tx - .delete(schema.computedPolicyTargetReleaseTarget) - .where( - eq( - schema.computedPolicyTargetReleaseTarget.policyTargetId, - policyTarget.id, - ), - ) - .returning(); - - const releaseTargets = await findMatchingReleaseTargets( - tx, - policyTarget, - ); - - const prevIds = new Set(previous.map((rt) => rt.releaseTargetId)); - const nextIds = new Set(releaseTargets.map((rt) => rt.releaseTargetId)); - const unmatched = previous.filter( - ({ releaseTargetId }) => !nextIds.has(releaseTargetId), - ); - const newReleaseTargets = releaseTargets.filter( - ({ releaseTargetId }) => !prevIds.has(releaseTargetId), - ); + const changedReleaseTaretIds = await computePolicyTargets( + db, + policyTarget, + ); - if (releaseTargets.length > 0) - await tx - .insert(schema.computedPolicyTargetReleaseTarget) - .values(releaseTargets) - .onConflictDoNothing(); - - return [...unmatched, ...newReleaseTargets].map( - (rt) => rt.releaseTargetId, - ); - }); - - if (affectedReleaseTargetIds.length === 0) return; - - const affectedReleaseTargets = await db + const releaseTargets = await db .select() .from(schema.releaseTarget) - .where(inArray(schema.releaseTarget.id, affectedReleaseTargetIds)); - - await dispatchEvaluateJobs(affectedReleaseTargets); + .innerJoin( + schema.resource, + eq(schema.releaseTarget.resourceId, schema.resource.id), + ) + .where( + and( + inArray(schema.releaseTarget.id, changedReleaseTaretIds), + isNull(schema.resource.deletedAt), + ), + ) + .then((rows) => rows.map((row) => row.release_target)); + + dispatchEvaluateJobs(releaseTargets); } catch (e: any) { const isRowLocked = e.code === "55P03"; if (isRowLocked) { diff --git a/apps/event-worker/src/workers/compute-systems-release-targets.ts b/apps/event-worker/src/workers/compute-systems-release-targets.ts index 7968bd1a3..6ed2f636d 100644 --- a/apps/event-worker/src/workers/compute-systems-release-targets.ts +++ b/apps/event-worker/src/workers/compute-systems-release-targets.ts @@ -2,11 +2,11 @@ import type { Tx } from "@ctrlplane/db"; import { and, eq, inArray, isNull, or, sql } from "@ctrlplane/db"; import { db } from "@ctrlplane/db/client"; +import { computePolicyTargets } from "@ctrlplane/db/queries"; import * as schema from "@ctrlplane/db/schema"; import { Channel, createWorker, getQueue } from "@ctrlplane/events"; import { logger } from "@ctrlplane/logger"; -import { dispatchComputePolicyTargetReleaseTargetSelectorJobs } from "../utils/dispatch-compute-policy-target-selector-jobs.js"; import { dispatchComputeSystemReleaseTargetsJobs } from "../utils/dispatch-compute-system-jobs.js"; import { dispatchEvaluateJobs } from "../utils/dispatch-evaluate-jobs.js"; @@ -179,11 +179,11 @@ export const computeSystemsReleaseTargetsWorker = createWorker( ) .where(eq(schema.policy.workspaceId, workspaceId)); - if (policyTargets.length > 0) { - for (const { policy_target: policyTarget } of policyTargets) - dispatchComputePolicyTargetReleaseTargetSelectorJobs(policyTarget); - return; - } + await Promise.all( + policyTargets.map(({ policy_target: policyTarget }) => + computePolicyTargets(db, policyTarget), + ), + ); await dispatchEvaluateJobs(created); } catch (e: any) { diff --git a/e2e/tests/api/release.spec.ts b/e2e/tests/api/release.spec.ts index 6da98517f..3aeee437c 100644 --- a/e2e/tests/api/release.spec.ts +++ b/e2e/tests/api/release.spec.ts @@ -332,6 +332,137 @@ test.describe("Release Creation", () => { expect(variable.value).toBe("test-a"); }); + test("should create a release when a new resource is created that does not match any policy target", async ({ + api, + page, + workspace, + }) => { + const policyName = faker.string.alphanumeric(10); + const policyResponse = await api.POST("/v1/policies", { + body: { + name: policyName, + description: "Test Policy Description", + workspaceId: workspace.id, + targets: [ + { + resourceSelector: { + type: "identifier", + operator: "equals", + value: `${policyName}`, + }, + }, + ], + }, + }); + expect(policyResponse.response.status).toBe(200); + + const systemPrefix = importedEntities.system.slug.split("-")[0]!; + const deploymentName = `${systemPrefix}-${faker.string.alphanumeric(10)}`; + const deploymentCreateResponse = await api.POST("/v1/deployments", { + body: { + name: deploymentName, + slug: deploymentName, + systemId: importedEntities.system.id, + }, + }); + expect(deploymentCreateResponse.response.status).toBe(201); + const deploymentId = deploymentCreateResponse.data?.id ?? ""; + + const versionTag = faker.string.alphanumeric(10); + + const versionResponse = await api.POST("/v1/deployment-versions", { + body: { + deploymentId, + tag: versionTag, + }, + }); + expect(versionResponse.response.status).toBe(201); + + const variableCreateResponse = await api.POST( + "/v1/deployments/{deploymentId}/variables", + { + params: { + path: { deploymentId }, + }, + body: { + key: "test", + description: "test", + config: { + type: "string", + inputType: "text", + }, + values: [ + { + value: "test-a", + default: true, + }, + { value: "test-b" }, + ], + }, + }, + ); + expect(variableCreateResponse.response.status).toBe(201); + + const resourceName = `${systemPrefix}-${faker.string.alphanumeric(10)}`; + const resourceCreateResponse = await api.POST("/v1/resources", { + body: { + name: resourceName, + kind: "service", + identifier: resourceName, + version: "1.0.0", + config: {}, + metadata: {}, + variables: [], + workspaceId: workspace.id, + }, + }); + expect(resourceCreateResponse.response.status).toBe(200); + const resourceId = resourceCreateResponse.data?.id ?? ""; + + await page.waitForTimeout(26_000); + + const releaseTargetResponse = await api.GET( + "/v1/resources/{resourceId}/release-targets", + { + params: { + path: { resourceId }, + }, + }, + ); + expect(releaseTargetResponse.response.status).toBe(200); + const releaseTargets = releaseTargetResponse.data ?? []; + const releaseTarget = releaseTargets.find( + (rt) => rt.deployment.id === deploymentId, + ); + expect(releaseTarget).toBeDefined(); + + const releaseResponse = await api.GET( + "/v1/release-targets/{releaseTargetId}/releases", + { + params: { + path: { + releaseTargetId: releaseTarget?.id ?? "", + }, + }, + }, + ); + + expect(releaseResponse.response.status).toBe(200); + const releases = releaseResponse.data ?? []; + for (const release of releases) { + console.log(release); + } + expect(releases.length).toBe(1); + + const latestRelease = releases.at(0)!; + expect(latestRelease.version.tag).toBe(versionTag); + const variables = latestRelease.variables ?? []; + expect(variables.length).toBe(1); + const variable = variables[0]!; + expect(variable.key).toBe("test"); + expect(variable.value).toBe("test-a"); + }); + test("should not create a release when an existing resource is updated", async ({ api, page, diff --git a/packages/db/src/queries/compute-policy-targets.ts b/packages/db/src/queries/compute-policy-targets.ts new file mode 100644 index 000000000..78461c3af --- /dev/null +++ b/packages/db/src/queries/compute-policy-targets.ts @@ -0,0 +1,94 @@ +import { and, eq, isNull, sql } from "drizzle-orm"; + +import type { Tx } from "../common.js"; +import * as schema from "../schema/index.js"; +import { selector } from "../selectors/index.js"; + +const findMatchingReleaseTargets = ( + tx: Tx, + policyTarget: schema.PolicyTarget, +) => + tx + .select() + .from(schema.releaseTarget) + .innerJoin( + schema.resource, + eq(schema.releaseTarget.resourceId, schema.resource.id), + ) + .innerJoin( + schema.deployment, + eq(schema.releaseTarget.deploymentId, schema.deployment.id), + ) + .innerJoin( + schema.environment, + eq(schema.releaseTarget.environmentId, schema.environment.id), + ) + .where( + and( + isNull(schema.resource.deletedAt), + selector() + .query() + .resources() + .where(policyTarget.resourceSelector) + .sql(), + selector() + .query() + .deployments() + .where(policyTarget.deploymentSelector) + .sql(), + selector() + .query() + .environments() + .where(policyTarget.environmentSelector) + .sql(), + ), + ) + .then((rt) => + rt.map((rt) => ({ + policyTargetId: policyTarget.id, + releaseTargetId: rt.release_target.id, + })), + ); + +export const computePolicyTargets = async ( + db: Tx, + policyTarget: schema.PolicyTarget, +) => { + return db.transaction(async (tx) => { + await tx.execute( + sql` + SELECT * from ${schema.computedPolicyTargetReleaseTarget} + INNER JOIN ${schema.releaseTarget} ON ${eq(schema.releaseTarget.id, schema.computedPolicyTargetReleaseTarget.releaseTargetId)} + WHERE ${eq(schema.computedPolicyTargetReleaseTarget.policyTargetId, policyTarget.id)} + FOR UPDATE NOWAIT + `, + ); + + const previous = await tx + .delete(schema.computedPolicyTargetReleaseTarget) + .where( + eq( + schema.computedPolicyTargetReleaseTarget.policyTargetId, + policyTarget.id, + ), + ) + .returning(); + + const releaseTargets = await findMatchingReleaseTargets(tx, policyTarget); + + const prevIds = new Set(previous.map((rt) => rt.releaseTargetId)); + const nextIds = new Set(releaseTargets.map((rt) => rt.releaseTargetId)); + const deleted = previous.filter((rt) => !nextIds.has(rt.releaseTargetId)); + const created = releaseTargets.filter( + (rt) => !prevIds.has(rt.releaseTargetId), + ); + + if (releaseTargets.length > 0) + await tx + .insert(schema.computedPolicyTargetReleaseTarget) + .values(releaseTargets) + .onConflictDoNothing(); + + return [...created, ...deleted].map((rt) => rt.releaseTargetId); + }); +}; diff --git a/packages/db/src/queries/index.ts b/packages/db/src/queries/index.ts index 5bcc96c4d..d6c7cdfdb 100644 --- a/packages/db/src/queries/index.ts +++ b/packages/db/src/queries/index.ts @@ -1,2 +1,3 @@ export * from "./get-resource-parents.js"; export * from "./create-release-job.js"; +export * from "./compute-policy-targets.js";