Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions apps/event-worker/src/workers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { dispatchJobWorker } from "./job-dispatch/index.js";
import { newDeploymentVersionWorker } from "./new-deployment-version.js";
import { newDeploymentWorker } from "./new-deployment.js";
import { policyEvaluate } from "./policy-evaluate.js";
import { processUpsertedResourceWorker } from "./process-upserted-resource/process-upserted-resource.js";
import { resourceScanWorker } from "./resource-scan/index.js";
import { updateDeploymentVariableWorker } from "./update-deployment-variable.js";
import { updateEnvironmentWorker } from "./update-environment.js";
Expand All @@ -26,4 +27,5 @@ export const workers: Workers<keyof ChannelMap> = {
[Channel.EvaluateReleaseTarget]: policyEvaluate,
[Channel.DispatchJob]: dispatchJobWorker,
[Channel.ResourceScan]: resourceScanWorker,
[Channel.ProcessUpsertedResource]: processUpsertedResourceWorker,
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import type { Tx } from "@ctrlplane/db";
import type { ReleaseTargetIdentifier } from "@ctrlplane/rule-engine";

import { and, eq, inArray, isNotNull, notInArray, or } from "@ctrlplane/db";
import * as SCHEMA from "@ctrlplane/db/schema";
import { handleEvent } from "@ctrlplane/job-dispatch";
import { HookAction } from "@ctrlplane/validators/events";

const getSystemsForUnmatchedEnvs = async (
db: Tx,
previousReleaseTargets: ReleaseTargetIdentifier[],
newReleaseTargets: ReleaseTargetIdentifier[],
) => {
const previousEnvIds = new Set<string>(
previousReleaseTargets.map((rt) => rt.environmentId),
);
const newEnvIds = new Set<string>(
newReleaseTargets.map((rt) => rt.environmentId),
);
const unmatchedEnvs = Array.from(previousEnvIds).filter(
(envId) => !newEnvIds.has(envId),
);

const envs = await db.query.environment.findMany({
where: inArray(SCHEMA.environment.id, unmatchedEnvs),
});

return db.query.system.findMany({
where: inArray(
SCHEMA.system.id,
envs.map((e) => e.systemId),
),
with: {
deployments: true,
environments: {
where: and(
isNotNull(SCHEMA.environment.resourceSelector),
notInArray(SCHEMA.environment.id, unmatchedEnvs),
),
},
},
});
};

const dispatchExitHooksIfExitedSystem = async (
db: Tx,
resource: SCHEMA.Resource,
system: {
deployments: SCHEMA.Deployment[];
environments: SCHEMA.Environment[];
},
) => {
const { deployments, environments } = system;
const matchedResource = await db.query.resource.findFirst({
where: and(
eq(SCHEMA.resource.id, resource.id),
or(
...environments.map((e) =>
SCHEMA.resourceMatchesMetadata(db, e.resourceSelector),
),
),
),
});
if (matchedResource == null) return;

const events = deployments.map((deployment) => ({
action: HookAction.DeploymentResourceRemoved,
payload: { deployment, resource },
}));

const handleEventPromises = events.map(handleEvent);
await Promise.allSettled(handleEventPromises);
};

export const dispatchExitHooks = async (
db: Tx,
resource: SCHEMA.Resource,
currentReleaseTargets: ReleaseTargetIdentifier[],
newReleaseTargets: ReleaseTargetIdentifier[],
) => {
const systems = await getSystemsForUnmatchedEnvs(
db,
currentReleaseTargets,
newReleaseTargets,
);
const dispatchExitHooksPromises = systems.map((system) =>
dispatchExitHooksIfExitedSystem(db, resource, system),
);
await Promise.allSettled(dispatchExitHooksPromises);
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import { eq, inArray } from "@ctrlplane/db";
import { db } from "@ctrlplane/db/client";
import * as SCHEMA from "@ctrlplane/db/schema";
import { Channel, createWorker, getQueue } from "@ctrlplane/events";

import { dispatchExitHooks } from "./dispatch-exit-hooks.js";
import { upsertReleaseTargets } from "./upsert-release-targets.js";

export const processUpsertedResourceWorker = createWorker(
Channel.ProcessUpsertedResource,
async ({ data: resource }) => {
const currentReleaseTargets = await db.query.releaseTarget.findMany({
where: eq(SCHEMA.releaseTarget.resourceId, resource.id),
});

const newReleaseTargets = await upsertReleaseTargets(db, resource);
const releaseTargetsToDelete = currentReleaseTargets.filter(
(rt) => !newReleaseTargets.includes(rt),
);
await db.delete(SCHEMA.releaseTarget).where(
inArray(
SCHEMA.releaseTarget.id,
releaseTargetsToDelete.map((rt) => rt.id),
),
);

const dispatchExitHooksPromise = dispatchExitHooks(
db,
resource,
currentReleaseTargets,
newReleaseTargets,
);

const addToEvaluateQueuePromise = getQueue(
Channel.EvaluateReleaseTarget,
).addBulk(
newReleaseTargets.map((rt) => ({
name: `${rt.resourceId}-${rt.environmentId}-${rt.deploymentId}`,
data: rt,
})),
);

await Promise.allSettled([
dispatchExitHooksPromise,
addToEvaluateQueuePromise,
]);
},
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import type { Tx } from "@ctrlplane/db";
import type { ReleaseTargetIdentifier } from "@ctrlplane/rule-engine";
import { isPresent } from "ts-is-present";

import { and, eq } from "@ctrlplane/db";
import * as SCHEMA from "@ctrlplane/db/schema";

const getReleaseTargetInsertsForSystem = async (
db: Tx,
resourceId: string,
system: SCHEMA.System & {
environments: SCHEMA.Environment[];
deployments: SCHEMA.Deployment[];
},
): Promise<ReleaseTargetIdentifier[]> => {
const envs = system.environments.filter((e) => isPresent(e.resourceSelector));
const { deployments } = system;

const maybeTargetsPromises = envs.flatMap((env) =>
deployments.map(async (dep) => {
const resource = await db.query.resource.findFirst({
where: and(
eq(SCHEMA.resource.id, resourceId),
SCHEMA.resourceMatchesMetadata(db, env.resourceSelector),
SCHEMA.resourceMatchesMetadata(db, dep.resourceSelector),
),
});

if (resource == null) return null;
return { environmentId: env.id, deploymentId: dep.id };
}),
);

const targets = await Promise.all(maybeTargetsPromises).then((results) =>
results.filter(isPresent),
);

return targets.map((t) => ({ ...t, resourceId }));
};

export const upsertReleaseTargets = async (
db: Tx,
resource: SCHEMA.Resource,
) => {
const workspace = await db.query.workspace.findFirst({
where: eq(SCHEMA.workspace.id, resource.workspaceId),
with: { systems: { with: { environments: true, deployments: true } } },
});
if (workspace == null) throw new Error("Workspace not found");

const releaseTargetInserts = await Promise.all(
workspace.systems.map((system) =>
getReleaseTargetInsertsForSystem(db, resource.id, system),
),
).then((results) => results.flat());

return db
.insert(SCHEMA.releaseTarget)
.values(releaseTargetInserts)
.onConflictDoNothing()
.returning();
};
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { z } from "zod";

import { and, eq, isNull } from "@ctrlplane/db";
import * as schema from "@ctrlplane/db/schema";
import { deleteResources, upsertResources } from "@ctrlplane/job-dispatch";
import { deleteResource, upsertResources } from "@ctrlplane/job-dispatch";
import { variablesAES256 } from "@ctrlplane/secrets";
import { Permission } from "@ctrlplane/validators/auth";

Expand Down Expand Up @@ -120,6 +120,6 @@ export const DELETE = request()
{ status: 404 },
);

await deleteResources(db, [resource]);
await deleteResource(db, resource.id);
return NextResponse.json({ success: true });
});
Loading
Loading