Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 219 additions & 0 deletions apps/event-worker/src/workers/env-selector-update.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
import type { Tx } from "@ctrlplane/db";
import type { ResourceCondition } from "@ctrlplane/validators/resources";
import { isPresent } from "ts-is-present";

import { and, eq, inArray, isNull } from "@ctrlplane/db";
import { db } from "@ctrlplane/db/client";
import * as schema from "@ctrlplane/db/schema";
import { Channel, createWorker } from "@ctrlplane/events";
import { handleEvent } from "@ctrlplane/job-dispatch";
import { logger } from "@ctrlplane/logger";
import {
ComparisonOperator,
ConditionType,
} from "@ctrlplane/validators/conditions";

const log = logger.child({
module: "env-selector-update",
function: "envSelectorUpdateWorker",
});

const getAffectedResources = async (
db: Tx,
workspaceId: string,
oldSelector: ResourceCondition | null,
newSelector: ResourceCondition | null,
) => {
const oldResources =
oldSelector == null
? []
: await db.query.resource.findMany({
where: and(
eq(schema.resource.workspaceId, workspaceId),
schema.resourceMatchesMetadata(db, oldSelector),
isNull(schema.resource.deletedAt),
),
});

const newResources =
newSelector == null
? []
: await db.query.resource.findMany({
where: and(
eq(schema.resource.workspaceId, workspaceId),
schema.resourceMatchesMetadata(db, newSelector),
isNull(schema.resource.deletedAt),
),
});

const newlyMatchedResources = newResources.filter(
(newResource) =>
!oldResources.some((oldResource) => oldResource.id === newResource.id),
);

const unmatchedResources = oldResources.filter(
(oldResource) =>
!newResources.some((newResource) => newResource.id === oldResource.id),
);

return { newlyMatchedResources, unmatchedResources };
};

const createReleaseTargets = (
db: Tx,
newlyMatchedResources: schema.Resource[],
environmentId: string,
deployments: schema.Deployment[],
) =>
db.insert(schema.releaseTarget).values(
newlyMatchedResources.flatMap((resource) =>
deployments.map((deployment) => ({
resourceId: resource.id,
deploymentId: deployment.id,
environmentId: environmentId,
})),
),
);

const removeReleaseTargets = (
db: Tx,
unmatchedResources: schema.Resource[],
environmentId: string,
) =>
db.delete(schema.releaseTarget).where(
and(
eq(schema.releaseTarget.environmentId, environmentId),
inArray(
schema.releaseTarget.resourceId,
unmatchedResources.map((r) => r.id),
),
),
);

type SystemWithDeploymentsAndEnvironments = schema.System & {
deployments: schema.Deployment[];
environments: schema.Environment[];
};

const getNotInSystemCondition = (
environmentId: string,
system: SystemWithDeploymentsAndEnvironments,
): ResourceCondition | null => {
const otherEnvironmentsWithSelector = system.environments.filter(
(e) => e.id !== environmentId && e.resourceSelector != null,
);

if (otherEnvironmentsWithSelector.length === 0) return null;

return {
type: ConditionType.Comparison,
operator: ComparisonOperator.Or,
not: true,
conditions: otherEnvironmentsWithSelector
.map((e) => e.resourceSelector)
.filter(isPresent),
};
};

const dispatchExitHooks = async (
db: Tx,
environmentId: string,
system: SystemWithDeploymentsAndEnvironments,
unmatchedResources: schema.Resource[],
) => {
const notInSystemCondition = getNotInSystemCondition(environmentId, system);

if (notInSystemCondition == null) return;

const exitedResources = await db.query.resource.findMany({
where: and(
eq(schema.resource.workspaceId, system.workspaceId),
isNull(schema.resource.deletedAt),
schema.resourceMatchesMetadata(db, notInSystemCondition),
inArray(
schema.resource.id,
unmatchedResources.map((r) => r.id),
),
),
});

const events = exitedResources.flatMap((resource) =>
system.deployments.map((deployment) => ({
action: "deployment.resource.removed" as const,
payload: { deployment, resource },
})),
);

const handleEventPromises = events.map(handleEvent);
await Promise.allSettled(handleEventPromises);
};

/**
* Worker that handles environment selector updates.
*
* When an environment's resource selector is updated:
* 1. Finds newly matched resources and resources that no longer match the selector
* 2. For each newly matched resource,
* - creates release targets for all deployments in the system associated with the environment
* - inserts the new release targets into the database
* 3. For each unmatched resource,
* - removes the release targets (and consequently the releases) for the resource + environment
* - dispatches exit hooks for the resource per deployment if the resource is no longer in the system
*
* @param {Job<ChannelMap[Channel.EnvironmentSelectorUpdate]>} job - The job containing environment data with old and new selectors
* @returns {Promise<void>} - Resolves when processing is complete
* @throws {Error} - If there's an issue with database operations
*/
export const envSelectorUpdateWorker = createWorker(
Channel.EnvironmentSelectorUpdate,
async (job) => {
const { oldSelector, ...environment } = job.data;
const system = await db.query.environment
.findFirst({
where: eq(schema.environment.id, environment.id),
with: { system: { with: { deployments: true, environments: true } } },
})
.then((res) => res?.system);

if (system == null) {
log.error("System not found", { environmentId: environment.id });
return;
}

const { workspaceId, deployments } = system;

const { newlyMatchedResources, unmatchedResources } =
await getAffectedResources(
db,
workspaceId,
oldSelector,
environment.resourceSelector,
);

const createReleaseTargetsPromise = createReleaseTargets(
db,
newlyMatchedResources,
environment.id,
deployments,
);

const removeReleaseTargetsPromise = removeReleaseTargets(
db,
unmatchedResources,
environment.id,
);

const dispatchExitHooksPromise = dispatchExitHooks(
db,
environment.id,
system,
unmatchedResources,
);

await Promise.all([
createReleaseTargetsPromise,
removeReleaseTargetsPromise,
dispatchExitHooksPromise,
]);
},
);
2 changes: 2 additions & 0 deletions apps/event-worker/src/workers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type { Worker } from "bullmq";

import { Channel } from "@ctrlplane/events";

import { envSelectorUpdateWorker } from "./env-selector-update.js";
import { dispatchJobWorker } from "./job-dispatch/index.js";
import { newDeploymentVersionWorker } from "./new-deployment-version.js";
import { newDeploymentWorker } from "./new-deployment.js";
Expand All @@ -16,6 +17,7 @@ export const workers: Workers<keyof ChannelMap> = {
[Channel.NewDeployment]: newDeploymentWorker,
[Channel.NewDeploymentVersion]: newDeploymentVersionWorker,
[Channel.NewEnvironment]: null,
[Channel.EnvironmentSelectorUpdate]: envSelectorUpdateWorker,
[Channel.ReleaseEvaluate]: null,
[Channel.DispatchJob]: dispatchJobWorker,
[Channel.ResourceScan]: resourceScanWorker,
Expand Down
18 changes: 17 additions & 1 deletion apps/webservice/src/app/api/v1/environments/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ import { NextResponse } from "next/server";
import _ from "lodash";
import { z } from "zod";

import { inArray, upsertEnv } from "@ctrlplane/db";
import { eq, inArray, takeFirstOrNull, upsertEnv } from "@ctrlplane/db";
import * as schema from "@ctrlplane/db/schema";
import { Channel, getQueue } from "@ctrlplane/events";
import { createJobsForNewEnvironment } from "@ctrlplane/job-dispatch";
import { logger } from "@ctrlplane/logger";
import { Permission } from "@ctrlplane/validators/auth";
Expand Down Expand Up @@ -49,11 +50,26 @@ export const POST = request()
})),
);

const existingEnv = await db
.select()
.from(schema.environment)
.where(eq(schema.environment.name, body.name))
.then(takeFirstOrNull);

const environment = await upsertEnv(tx, {
...body,
versionChannels: channels,
});

if (
existingEnv != null &&
!_.isEqual(existingEnv.resourceSelector, body.resourceSelector)
)
getQueue(Channel.EnvironmentSelectorUpdate).add(environment.id, {
...environment,
oldSelector: existingEnv.resourceSelector,
});

await createJobsForNewEnvironment(tx, environment);
const { metadata } = body;
return NextResponse.json({ ...environment, metadata });
Expand Down
6 changes: 6 additions & 0 deletions packages/api/src/router/environment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import {
system,
updateEnvironment,
} from "@ctrlplane/db/schema";
import { Channel, getQueue } from "@ctrlplane/events";
import {
dispatchJobsForAddedResources,
getEventsForEnvironmentDeleted,
Expand Down Expand Up @@ -311,6 +312,11 @@ export const environmentRouter = createTRPCRouter({
const isUpdatingResourceSelector =
resourceSelector != null || oldEnv.environment.resourceSelector != null;
if (isUpdatingResourceSelector) {
getQueue(Channel.EnvironmentSelectorUpdate).add(input.id, {
...updatedEnv,
oldSelector: oldEnv.environment.resourceSelector,
});

const hasResourceSelectorsChanged = !_.isEqual(
oldEnv.environment.resourceSelector,
resourceSelector,
Expand Down
1 change: 1 addition & 0 deletions packages/events/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"dependencies": {
"@ctrlplane/db": "workspace:*",
"@ctrlplane/logger": "workspace:*",
"@ctrlplane/validators": "workspace:*",
"@t3-oss/env-core": "catalog:",
"bullmq": "catalog:",
"date-fns": "^4.1.0",
Expand Down
5 changes: 5 additions & 0 deletions packages/events/src/types.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type * as schema from "@ctrlplane/db/schema";
import type { ResourceCondition } from "@ctrlplane/validators/resources";

export enum Channel {
JobSync = "job-sync",
Expand All @@ -12,6 +13,7 @@ export enum Channel {
NewDeployment = "new-deployment",
NewDeploymentVersion = "new-deployment-version",
NewEnvironment = "new-environment",
EnvironmentSelectorUpdate = "environment-selector-update",
NewRelease = "new-release",
ReleaseEvaluate = "release-evaluate",
}
Expand All @@ -26,6 +28,9 @@ export type ChannelMap = {
[Channel.NewDeployment]: schema.Deployment;
[Channel.NewDeploymentVersion]: schema.DeploymentVersion;
[Channel.NewEnvironment]: typeof schema.environment.$inferSelect;
[Channel.EnvironmentSelectorUpdate]: schema.Environment & {
oldSelector: ResourceCondition | null;
};
[Channel.ReleaseEvaluate]: ReleaseEvaluateJobData;
[Channel.DispatchJob]: { jobId: string };
[Channel.ResourceScan]: { resourceProviderId: string };
Expand Down
3 changes: 3 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading