diff --git a/apps/event-worker/src/workers/env-selector-update.ts b/apps/event-worker/src/workers/env-selector-update.ts new file mode 100644 index 000000000..068870952 --- /dev/null +++ b/apps/event-worker/src/workers/env-selector-update.ts @@ -0,0 +1,219 @@ +import type { Tx } from "@ctrlplane/db"; +import type { ResourceCondition } from "@ctrlplane/validators/resources"; +import { isPresent } from "ts-is-present"; + +import { and, eq, inArray, isNull } from "@ctrlplane/db"; +import { db } from "@ctrlplane/db/client"; +import * as schema from "@ctrlplane/db/schema"; +import { Channel, createWorker } from "@ctrlplane/events"; +import { handleEvent } from "@ctrlplane/job-dispatch"; +import { logger } from "@ctrlplane/logger"; +import { + ComparisonOperator, + ConditionType, +} from "@ctrlplane/validators/conditions"; + +const log = logger.child({ + module: "env-selector-update", + function: "envSelectorUpdateWorker", +}); + +const getAffectedResources = async ( + db: Tx, + workspaceId: string, + oldSelector: ResourceCondition | null, + newSelector: ResourceCondition | null, +) => { + const oldResources = + oldSelector == null + ? [] + : await db.query.resource.findMany({ + where: and( + eq(schema.resource.workspaceId, workspaceId), + schema.resourceMatchesMetadata(db, oldSelector), + isNull(schema.resource.deletedAt), + ), + }); + + const newResources = + newSelector == null + ? [] + : await db.query.resource.findMany({ + where: and( + eq(schema.resource.workspaceId, workspaceId), + schema.resourceMatchesMetadata(db, newSelector), + isNull(schema.resource.deletedAt), + ), + }); + + const newlyMatchedResources = newResources.filter( + (newResource) => + !oldResources.some((oldResource) => oldResource.id === newResource.id), + ); + + const unmatchedResources = oldResources.filter( + (oldResource) => + !newResources.some((newResource) => newResource.id === oldResource.id), + ); + + return { newlyMatchedResources, unmatchedResources }; +}; + +const createReleaseTargets = ( + db: Tx, + newlyMatchedResources: schema.Resource[], + environmentId: string, + deployments: schema.Deployment[], +) => + db.insert(schema.releaseTarget).values( + newlyMatchedResources.flatMap((resource) => + deployments.map((deployment) => ({ + resourceId: resource.id, + deploymentId: deployment.id, + environmentId: environmentId, + })), + ), + ); + +const removeReleaseTargets = ( + db: Tx, + unmatchedResources: schema.Resource[], + environmentId: string, +) => + db.delete(schema.releaseTarget).where( + and( + eq(schema.releaseTarget.environmentId, environmentId), + inArray( + schema.releaseTarget.resourceId, + unmatchedResources.map((r) => r.id), + ), + ), + ); + +type SystemWithDeploymentsAndEnvironments = schema.System & { + deployments: schema.Deployment[]; + environments: schema.Environment[]; +}; + +const getNotInSystemCondition = ( + environmentId: string, + system: SystemWithDeploymentsAndEnvironments, +): ResourceCondition | null => { + const otherEnvironmentsWithSelector = system.environments.filter( + (e) => e.id !== environmentId && e.resourceSelector != null, + ); + + if (otherEnvironmentsWithSelector.length === 0) return null; + + return { + type: ConditionType.Comparison, + operator: ComparisonOperator.Or, + not: true, + conditions: otherEnvironmentsWithSelector + .map((e) => e.resourceSelector) + .filter(isPresent), + }; +}; + +const dispatchExitHooks = async ( + db: Tx, + environmentId: string, + system: SystemWithDeploymentsAndEnvironments, + unmatchedResources: schema.Resource[], +) => { + const notInSystemCondition = getNotInSystemCondition(environmentId, system); + + if (notInSystemCondition == null) return; + + const exitedResources = await db.query.resource.findMany({ + where: and( + eq(schema.resource.workspaceId, system.workspaceId), + isNull(schema.resource.deletedAt), + schema.resourceMatchesMetadata(db, notInSystemCondition), + inArray( + schema.resource.id, + unmatchedResources.map((r) => r.id), + ), + ), + }); + + const events = exitedResources.flatMap((resource) => + system.deployments.map((deployment) => ({ + action: "deployment.resource.removed" as const, + payload: { deployment, resource }, + })), + ); + + const handleEventPromises = events.map(handleEvent); + await Promise.allSettled(handleEventPromises); +}; + +/** + * Worker that handles environment selector updates. + * + * When an environment's resource selector is updated: + * 1. Finds newly matched resources and resources that no longer match the selector + * 2. For each newly matched resource, + * - creates release targets for all deployments in the system associated with the environment + * - inserts the new release targets into the database + * 3. For each unmatched resource, + * - removes the release targets (and consequently the releases) for the resource + environment + * - dispatches exit hooks for the resource per deployment if the resource is no longer in the system + * + * @param {Job} job - The job containing environment data with old and new selectors + * @returns {Promise} - Resolves when processing is complete + * @throws {Error} - If there's an issue with database operations + */ +export const envSelectorUpdateWorker = createWorker( + Channel.EnvironmentSelectorUpdate, + async (job) => { + const { oldSelector, ...environment } = job.data; + const system = await db.query.environment + .findFirst({ + where: eq(schema.environment.id, environment.id), + with: { system: { with: { deployments: true, environments: true } } }, + }) + .then((res) => res?.system); + + if (system == null) { + log.error("System not found", { environmentId: environment.id }); + return; + } + + const { workspaceId, deployments } = system; + + const { newlyMatchedResources, unmatchedResources } = + await getAffectedResources( + db, + workspaceId, + oldSelector, + environment.resourceSelector, + ); + + const createReleaseTargetsPromise = createReleaseTargets( + db, + newlyMatchedResources, + environment.id, + deployments, + ); + + const removeReleaseTargetsPromise = removeReleaseTargets( + db, + unmatchedResources, + environment.id, + ); + + const dispatchExitHooksPromise = dispatchExitHooks( + db, + environment.id, + system, + unmatchedResources, + ); + + await Promise.all([ + createReleaseTargetsPromise, + removeReleaseTargetsPromise, + dispatchExitHooksPromise, + ]); + }, +); diff --git a/apps/event-worker/src/workers/index.ts b/apps/event-worker/src/workers/index.ts index 885efd837..ab67b499f 100644 --- a/apps/event-worker/src/workers/index.ts +++ b/apps/event-worker/src/workers/index.ts @@ -3,6 +3,7 @@ import type { Worker } from "bullmq"; import { Channel } from "@ctrlplane/events"; +import { envSelectorUpdateWorker } from "./env-selector-update.js"; import { dispatchJobWorker } from "./job-dispatch/index.js"; import { newDeploymentVersionWorker } from "./new-deployment-version.js"; import { newDeploymentWorker } from "./new-deployment.js"; @@ -16,6 +17,7 @@ export const workers: Workers = { [Channel.NewDeployment]: newDeploymentWorker, [Channel.NewDeploymentVersion]: newDeploymentVersionWorker, [Channel.NewEnvironment]: null, + [Channel.EnvironmentSelectorUpdate]: envSelectorUpdateWorker, [Channel.ReleaseEvaluate]: null, [Channel.DispatchJob]: dispatchJobWorker, [Channel.ResourceScan]: resourceScanWorker, diff --git a/apps/webservice/src/app/api/v1/environments/route.ts b/apps/webservice/src/app/api/v1/environments/route.ts index a9f05d7ad..db6348c8f 100644 --- a/apps/webservice/src/app/api/v1/environments/route.ts +++ b/apps/webservice/src/app/api/v1/environments/route.ts @@ -4,8 +4,9 @@ import { NextResponse } from "next/server"; import _ from "lodash"; import { z } from "zod"; -import { inArray, upsertEnv } from "@ctrlplane/db"; +import { eq, inArray, takeFirstOrNull, upsertEnv } from "@ctrlplane/db"; import * as schema from "@ctrlplane/db/schema"; +import { Channel, getQueue } from "@ctrlplane/events"; import { createJobsForNewEnvironment } from "@ctrlplane/job-dispatch"; import { logger } from "@ctrlplane/logger"; import { Permission } from "@ctrlplane/validators/auth"; @@ -49,11 +50,26 @@ export const POST = request() })), ); + const existingEnv = await db + .select() + .from(schema.environment) + .where(eq(schema.environment.name, body.name)) + .then(takeFirstOrNull); + const environment = await upsertEnv(tx, { ...body, versionChannels: channels, }); + if ( + existingEnv != null && + !_.isEqual(existingEnv.resourceSelector, body.resourceSelector) + ) + getQueue(Channel.EnvironmentSelectorUpdate).add(environment.id, { + ...environment, + oldSelector: existingEnv.resourceSelector, + }); + await createJobsForNewEnvironment(tx, environment); const { metadata } = body; return NextResponse.json({ ...environment, metadata }); diff --git a/packages/api/src/router/environment.ts b/packages/api/src/router/environment.ts index 186f049c9..6495fc1c6 100644 --- a/packages/api/src/router/environment.ts +++ b/packages/api/src/router/environment.ts @@ -29,6 +29,7 @@ import { system, updateEnvironment, } from "@ctrlplane/db/schema"; +import { Channel, getQueue } from "@ctrlplane/events"; import { dispatchJobsForAddedResources, getEventsForEnvironmentDeleted, @@ -311,6 +312,11 @@ export const environmentRouter = createTRPCRouter({ const isUpdatingResourceSelector = resourceSelector != null || oldEnv.environment.resourceSelector != null; if (isUpdatingResourceSelector) { + getQueue(Channel.EnvironmentSelectorUpdate).add(input.id, { + ...updatedEnv, + oldSelector: oldEnv.environment.resourceSelector, + }); + const hasResourceSelectorsChanged = !_.isEqual( oldEnv.environment.resourceSelector, resourceSelector, diff --git a/packages/events/package.json b/packages/events/package.json index 9a066913b..a72819598 100644 --- a/packages/events/package.json +++ b/packages/events/package.json @@ -21,6 +21,7 @@ "dependencies": { "@ctrlplane/db": "workspace:*", "@ctrlplane/logger": "workspace:*", + "@ctrlplane/validators": "workspace:*", "@t3-oss/env-core": "catalog:", "bullmq": "catalog:", "date-fns": "^4.1.0", diff --git a/packages/events/src/types.ts b/packages/events/src/types.ts index c6740d29a..95b929d8c 100644 --- a/packages/events/src/types.ts +++ b/packages/events/src/types.ts @@ -1,4 +1,5 @@ import type * as schema from "@ctrlplane/db/schema"; +import type { ResourceCondition } from "@ctrlplane/validators/resources"; export enum Channel { JobSync = "job-sync", @@ -12,6 +13,7 @@ export enum Channel { NewDeployment = "new-deployment", NewDeploymentVersion = "new-deployment-version", NewEnvironment = "new-environment", + EnvironmentSelectorUpdate = "environment-selector-update", NewRelease = "new-release", ReleaseEvaluate = "release-evaluate", } @@ -26,6 +28,9 @@ export type ChannelMap = { [Channel.NewDeployment]: schema.Deployment; [Channel.NewDeploymentVersion]: schema.DeploymentVersion; [Channel.NewEnvironment]: typeof schema.environment.$inferSelect; + [Channel.EnvironmentSelectorUpdate]: schema.Environment & { + oldSelector: ResourceCondition | null; + }; [Channel.ReleaseEvaluate]: ReleaseEvaluateJobData; [Channel.DispatchJob]: { jobId: string }; [Channel.ResourceScan]: { resourceProviderId: string }; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 6770aeb5e..9ce92774f 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1127,6 +1127,9 @@ importers: '@ctrlplane/logger': specifier: workspace:* version: link:../logger + '@ctrlplane/validators': + specifier: workspace:* + version: link:../validators '@t3-oss/env-core': specifier: 'catalog:' version: 0.11.1(typescript@5.8.2)(zod@3.24.2)