Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import type { Tx } from "@ctrlplane/db";

import { and, eq, inArray, isNull, selector, sql } from "@ctrlplane/db";
import { and, eq, inArray, isNull } from "@ctrlplane/db";
import { db } from "@ctrlplane/db/client";
import { computePolicyTargets } from "@ctrlplane/db/queries";
import * as schema from "@ctrlplane/db/schema";
import { Channel, createWorker } from "@ctrlplane/events";
import { logger } from "@ctrlplane/logger";
Expand All @@ -13,52 +12,6 @@ const log = logger.child({
worker: "compute-policy-target-release-target-selector",
});

const findMatchingReleaseTargets = (
tx: Tx,
policyTarget: schema.PolicyTarget,
) =>
tx
.select()
.from(schema.releaseTarget)
.innerJoin(
schema.resource,
eq(schema.releaseTarget.resourceId, schema.resource.id),
)
.innerJoin(
schema.deployment,
eq(schema.releaseTarget.deploymentId, schema.deployment.id),
)
.innerJoin(
schema.environment,
eq(schema.releaseTarget.environmentId, schema.environment.id),
)
.where(
and(
isNull(schema.resource.deletedAt),
selector()
.query()
.resources()
.where(policyTarget.resourceSelector)
.sql(),
selector()
.query()
.deployments()
.where(policyTarget.deploymentSelector)
.sql(),
selector()
.query()
.environments()
.where(policyTarget.environmentSelector)
.sql(),
),
)
.then((rt) =>
rt.map((rt) => ({
policyTargetId: policyTarget.id,
releaseTargetId: rt.release_target.id,
})),
);

export const computePolicyTargetReleaseTargetSelectorWorkerEvent = createWorker(
Channel.ComputePolicyTargetReleaseTargetSelector,
async (job) => {
Expand All @@ -71,59 +24,27 @@ export const computePolicyTargetReleaseTargetSelectorWorkerEvent = createWorker(
if (policyTarget == null) throw new Error("Policy target not found");

try {
const affectedReleaseTargetIds = await db.transaction(async (tx) => {
await tx.execute(
sql`
SELECT * from ${schema.computedPolicyTargetReleaseTarget}
INNER JOIN ${schema.releaseTarget} ON ${eq(schema.releaseTarget.id, schema.computedPolicyTargetReleaseTarget.releaseTargetId)}
WHERE ${eq(schema.computedPolicyTargetReleaseTarget.policyTargetId, policyTarget.id)}
FOR UPDATE NOWAIT
`,
);

const previous = await tx
.delete(schema.computedPolicyTargetReleaseTarget)
.where(
eq(
schema.computedPolicyTargetReleaseTarget.policyTargetId,
policyTarget.id,
),
)
.returning();

const releaseTargets = await findMatchingReleaseTargets(
tx,
policyTarget,
);

const prevIds = new Set(previous.map((rt) => rt.releaseTargetId));
const nextIds = new Set(releaseTargets.map((rt) => rt.releaseTargetId));
const unmatched = previous.filter(
({ releaseTargetId }) => !nextIds.has(releaseTargetId),
);
const newReleaseTargets = releaseTargets.filter(
({ releaseTargetId }) => !prevIds.has(releaseTargetId),
);
const changedReleaseTaretIds = await computePolicyTargets(
db,
policyTarget,
);

if (releaseTargets.length > 0)
await tx
.insert(schema.computedPolicyTargetReleaseTarget)
.values(releaseTargets)
.onConflictDoNothing();

return [...unmatched, ...newReleaseTargets].map(
(rt) => rt.releaseTargetId,
);
});

if (affectedReleaseTargetIds.length === 0) return;

const affectedReleaseTargets = await db
const releaseTargets = await db
.select()
.from(schema.releaseTarget)
.where(inArray(schema.releaseTarget.id, affectedReleaseTargetIds));

await dispatchEvaluateJobs(affectedReleaseTargets);
.innerJoin(
schema.resource,
eq(schema.releaseTarget.resourceId, schema.resource.id),
)
.where(
and(
inArray(schema.releaseTarget.id, changedReleaseTaretIds),
isNull(schema.resource.deletedAt),
),
)
.then((rows) => rows.map((row) => row.release_target));

dispatchEvaluateJobs(releaseTargets);
} catch (e: any) {
const isRowLocked = e.code === "55P03";
if (isRowLocked) {
Expand Down
12 changes: 6 additions & 6 deletions apps/event-worker/src/workers/compute-systems-release-targets.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ import type { Tx } from "@ctrlplane/db";

import { and, eq, inArray, isNull, or, sql } from "@ctrlplane/db";
import { db } from "@ctrlplane/db/client";
import { computePolicyTargets } from "@ctrlplane/db/queries";
import * as schema from "@ctrlplane/db/schema";
import { Channel, createWorker, getQueue } from "@ctrlplane/events";
import { logger } from "@ctrlplane/logger";

import { dispatchComputePolicyTargetReleaseTargetSelectorJobs } from "../utils/dispatch-compute-policy-target-selector-jobs.js";
import { dispatchComputeSystemReleaseTargetsJobs } from "../utils/dispatch-compute-system-jobs.js";
import { dispatchEvaluateJobs } from "../utils/dispatch-evaluate-jobs.js";

Expand Down Expand Up @@ -179,11 +179,11 @@ export const computeSystemsReleaseTargetsWorker = createWorker(
)
.where(eq(schema.policy.workspaceId, workspaceId));

if (policyTargets.length > 0) {
for (const { policy_target: policyTarget } of policyTargets)
dispatchComputePolicyTargetReleaseTargetSelectorJobs(policyTarget);
return;
}
await Promise.all(
policyTargets.map(({ policy_target: policyTarget }) =>
computePolicyTargets(db, policyTarget),
),
);

await dispatchEvaluateJobs(created);
} catch (e: any) {
Expand Down
131 changes: 131 additions & 0 deletions e2e/tests/api/release.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,137 @@ test.describe("Release Creation", () => {
expect(variable.value).toBe("test-a");
});

test("should create a release when a new resource is created that does not match any policy target", async ({
api,
page,
workspace,
}) => {
const policyName = faker.string.alphanumeric(10);
const policyResponse = await api.POST("/v1/policies", {
body: {
name: policyName,
description: "Test Policy Description",
workspaceId: workspace.id,
targets: [
{
resourceSelector: {
type: "identifier",
operator: "equals",
value: `${policyName}`,
},
},
],
},
});
expect(policyResponse.response.status).toBe(200);

const systemPrefix = importedEntities.system.slug.split("-")[0]!;
const deploymentName = `${systemPrefix}-${faker.string.alphanumeric(10)}`;
const deploymentCreateResponse = await api.POST("/v1/deployments", {
body: {
name: deploymentName,
slug: deploymentName,
systemId: importedEntities.system.id,
},
});
expect(deploymentCreateResponse.response.status).toBe(201);
const deploymentId = deploymentCreateResponse.data?.id ?? "";

const versionTag = faker.string.alphanumeric(10);

const versionResponse = await api.POST("/v1/deployment-versions", {
body: {
deploymentId,
tag: versionTag,
},
});
expect(versionResponse.response.status).toBe(201);

const variableCreateResponse = await api.POST(
"/v1/deployments/{deploymentId}/variables",
{
params: {
path: { deploymentId },
},
body: {
key: "test",
description: "test",
config: {
type: "string",
inputType: "text",
},
values: [
{
value: "test-a",
default: true,
},
{ value: "test-b" },
],
},
},
);
expect(variableCreateResponse.response.status).toBe(201);

const resourceName = `${systemPrefix}-${faker.string.alphanumeric(10)}`;
const resourceCreateResponse = await api.POST("/v1/resources", {
body: {
name: resourceName,
kind: "service",
identifier: resourceName,
version: "1.0.0",
config: {},
metadata: {},
variables: [],
workspaceId: workspace.id,
},
});
expect(resourceCreateResponse.response.status).toBe(200);
const resourceId = resourceCreateResponse.data?.id ?? "";

await page.waitForTimeout(26_000);

const releaseTargetResponse = await api.GET(
"/v1/resources/{resourceId}/release-targets",
{
params: {
path: { resourceId },
},
},
);
expect(releaseTargetResponse.response.status).toBe(200);
const releaseTargets = releaseTargetResponse.data ?? [];
const releaseTarget = releaseTargets.find(
(rt) => rt.deployment.id === deploymentId,
);
expect(releaseTarget).toBeDefined();

const releaseResponse = await api.GET(
"/v1/release-targets/{releaseTargetId}/releases",
{
params: {
path: {
releaseTargetId: releaseTarget?.id ?? "",
},
},
},
);

expect(releaseResponse.response.status).toBe(200);
const releases = releaseResponse.data ?? [];
for (const release of releases) {
console.log(release);
}
expect(releases.length).toBe(1);

const latestRelease = releases.at(0)!;
expect(latestRelease.version.tag).toBe(versionTag);
const variables = latestRelease.variables ?? [];
expect(variables.length).toBe(1);
const variable = variables[0]!;
expect(variable.key).toBe("test");
expect(variable.value).toBe("test-a");
});

test("should not create a release when an existing resource is updated", async ({
api,
page,
Expand Down
Loading
Loading