diff --git a/apps/event-worker/src/workers/index.ts b/apps/event-worker/src/workers/index.ts index 5b162807a..3468e73c6 100644 --- a/apps/event-worker/src/workers/index.ts +++ b/apps/event-worker/src/workers/index.ts @@ -7,6 +7,7 @@ import { dispatchJobWorker } from "./job-dispatch/index.js"; import { newDeploymentVersionWorker } from "./new-deployment-version.js"; import { newDeploymentWorker } from "./new-deployment.js"; import { policyEvaluate } from "./policy-evaluate.js"; +import { processUpsertedResourceWorker } from "./process-upserted-resource/process-upserted-resource.js"; import { resourceScanWorker } from "./resource-scan/index.js"; import { updateDeploymentVariableWorker } from "./update-deployment-variable.js"; import { updateEnvironmentWorker } from "./update-environment.js"; @@ -26,4 +27,5 @@ export const workers: Workers = { [Channel.EvaluateReleaseTarget]: policyEvaluate, [Channel.DispatchJob]: dispatchJobWorker, [Channel.ResourceScan]: resourceScanWorker, + [Channel.ProcessUpsertedResource]: processUpsertedResourceWorker, }; diff --git a/apps/event-worker/src/workers/process-upserted-resource/dispatch-exit-hooks.ts b/apps/event-worker/src/workers/process-upserted-resource/dispatch-exit-hooks.ts new file mode 100644 index 000000000..fce914644 --- /dev/null +++ b/apps/event-worker/src/workers/process-upserted-resource/dispatch-exit-hooks.ts @@ -0,0 +1,90 @@ +import type { Tx } from "@ctrlplane/db"; +import type { ReleaseTargetIdentifier } from "@ctrlplane/rule-engine"; + +import { and, eq, inArray, isNotNull, notInArray, or } from "@ctrlplane/db"; +import * as SCHEMA from "@ctrlplane/db/schema"; +import { handleEvent } from "@ctrlplane/job-dispatch"; +import { HookAction } from "@ctrlplane/validators/events"; + +const getSystemsForUnmatchedEnvs = async ( + db: Tx, + previousReleaseTargets: ReleaseTargetIdentifier[], + newReleaseTargets: ReleaseTargetIdentifier[], +) => { + const previousEnvIds = new Set( + previousReleaseTargets.map((rt) => rt.environmentId), + ); + const newEnvIds = new Set( + newReleaseTargets.map((rt) => rt.environmentId), + ); + const unmatchedEnvs = Array.from(previousEnvIds).filter( + (envId) => !newEnvIds.has(envId), + ); + + const envs = await db.query.environment.findMany({ + where: inArray(SCHEMA.environment.id, unmatchedEnvs), + }); + + return db.query.system.findMany({ + where: inArray( + SCHEMA.system.id, + envs.map((e) => e.systemId), + ), + with: { + deployments: true, + environments: { + where: and( + isNotNull(SCHEMA.environment.resourceSelector), + notInArray(SCHEMA.environment.id, unmatchedEnvs), + ), + }, + }, + }); +}; + +const dispatchExitHooksIfExitedSystem = async ( + db: Tx, + resource: SCHEMA.Resource, + system: { + deployments: SCHEMA.Deployment[]; + environments: SCHEMA.Environment[]; + }, +) => { + const { deployments, environments } = system; + const matchedResource = await db.query.resource.findFirst({ + where: and( + eq(SCHEMA.resource.id, resource.id), + or( + ...environments.map((e) => + SCHEMA.resourceMatchesMetadata(db, e.resourceSelector), + ), + ), + ), + }); + if (matchedResource == null) return; + + const events = deployments.map((deployment) => ({ + action: HookAction.DeploymentResourceRemoved, + payload: { deployment, resource }, + })); + + const handleEventPromises = events.map(handleEvent); + await Promise.allSettled(handleEventPromises); +}; + +export const dispatchExitHooks = async ( + db: Tx, + resource: SCHEMA.Resource, + currentReleaseTargets: ReleaseTargetIdentifier[], + newReleaseTargets: ReleaseTargetIdentifier[], +) => { + const systems = await getSystemsForUnmatchedEnvs( + db, + currentReleaseTargets, + newReleaseTargets, + ); + const dispatchExitHooksPromises = systems.map((system) => + dispatchExitHooksIfExitedSystem(db, resource, system), + ); + await Promise.allSettled(dispatchExitHooksPromises); +}; diff --git a/apps/event-worker/src/workers/process-upserted-resource/process-upserted-resource.ts b/apps/event-worker/src/workers/process-upserted-resource/process-upserted-resource.ts new file mode 100644 index 000000000..9fdb2c12b --- /dev/null +++ b/apps/event-worker/src/workers/process-upserted-resource/process-upserted-resource.ts @@ -0,0 +1,48 @@ +import { eq, inArray } from "@ctrlplane/db"; +import { db } from "@ctrlplane/db/client"; +import * as SCHEMA from "@ctrlplane/db/schema"; +import { Channel, createWorker, getQueue } from "@ctrlplane/events"; + +import { dispatchExitHooks } from "./dispatch-exit-hooks.js"; +import { upsertReleaseTargets } from "./upsert-release-targets.js"; + +export const processUpsertedResourceWorker = createWorker( + Channel.ProcessUpsertedResource, + async ({ data: resource }) => { + const currentReleaseTargets = await db.query.releaseTarget.findMany({ + where: eq(SCHEMA.releaseTarget.resourceId, resource.id), + }); + + const newReleaseTargets = await upsertReleaseTargets(db, resource); + const releaseTargetsToDelete = currentReleaseTargets.filter( + (rt) => !newReleaseTargets.includes(rt), + ); + await db.delete(SCHEMA.releaseTarget).where( + inArray( + SCHEMA.releaseTarget.id, + releaseTargetsToDelete.map((rt) => rt.id), + ), + ); + + const dispatchExitHooksPromise = dispatchExitHooks( + db, + resource, + currentReleaseTargets, + newReleaseTargets, + ); + + const addToEvaluateQueuePromise = getQueue( + Channel.EvaluateReleaseTarget, + ).addBulk( + newReleaseTargets.map((rt) => ({ + name: `${rt.resourceId}-${rt.environmentId}-${rt.deploymentId}`, + data: rt, + })), + ); + + await Promise.allSettled([ + dispatchExitHooksPromise, + addToEvaluateQueuePromise, + ]); + }, +); diff --git a/apps/event-worker/src/workers/process-upserted-resource/upsert-release-targets.ts b/apps/event-worker/src/workers/process-upserted-resource/upsert-release-targets.ts new file mode 100644 index 000000000..8ea5dc0ea --- /dev/null +++ b/apps/event-worker/src/workers/process-upserted-resource/upsert-release-targets.ts @@ -0,0 +1,62 @@ +import type { Tx } from "@ctrlplane/db"; +import type { ReleaseTargetIdentifier } from "@ctrlplane/rule-engine"; +import { isPresent } from "ts-is-present"; + +import { and, eq } from "@ctrlplane/db"; +import * as SCHEMA from "@ctrlplane/db/schema"; + +const getReleaseTargetInsertsForSystem = async ( + db: Tx, + resourceId: string, + system: SCHEMA.System & { + environments: SCHEMA.Environment[]; + deployments: SCHEMA.Deployment[]; + }, +): Promise => { + const envs = system.environments.filter((e) => isPresent(e.resourceSelector)); + const { deployments } = system; + + const maybeTargetsPromises = envs.flatMap((env) => + deployments.map(async (dep) => { + const resource = await db.query.resource.findFirst({ + where: and( + eq(SCHEMA.resource.id, resourceId), + SCHEMA.resourceMatchesMetadata(db, env.resourceSelector), + SCHEMA.resourceMatchesMetadata(db, dep.resourceSelector), + ), + }); + + if (resource == null) return null; + return { environmentId: env.id, deploymentId: dep.id }; + }), + ); + + const targets = await Promise.all(maybeTargetsPromises).then((results) => + results.filter(isPresent), + ); + + return targets.map((t) => ({ ...t, resourceId })); +}; + +export const upsertReleaseTargets = async ( + db: Tx, + resource: SCHEMA.Resource, +) => { + const workspace = await db.query.workspace.findFirst({ + where: eq(SCHEMA.workspace.id, resource.workspaceId), + with: { systems: { with: { environments: true, deployments: true } } }, + }); + if (workspace == null) throw new Error("Workspace not found"); + + const releaseTargetInserts = await Promise.all( + workspace.systems.map((system) => + getReleaseTargetInsertsForSystem(db, resource.id, system), + ), + ).then((results) => results.flat()); + + return db + .insert(SCHEMA.releaseTarget) + .values(releaseTargetInserts) + .onConflictDoNothing() + .returning(); +}; diff --git a/apps/webservice/src/app/api/v1/resources/[resourceId]/route.ts b/apps/webservice/src/app/api/v1/resources/[resourceId]/route.ts index d485cb343..c5584abe6 100644 --- a/apps/webservice/src/app/api/v1/resources/[resourceId]/route.ts +++ b/apps/webservice/src/app/api/v1/resources/[resourceId]/route.ts @@ -4,7 +4,7 @@ import { z } from "zod"; import { and, eq, isNull } from "@ctrlplane/db"; import * as schema from "@ctrlplane/db/schema"; -import { deleteResources, upsertResources } from "@ctrlplane/job-dispatch"; +import { deleteResource, upsertResources } from "@ctrlplane/job-dispatch"; import { variablesAES256 } from "@ctrlplane/secrets"; import { Permission } from "@ctrlplane/validators/auth"; @@ -120,6 +120,6 @@ export const DELETE = request() { status: 404 }, ); - await deleteResources(db, [resource]); + await deleteResource(db, resource.id); return NextResponse.json({ success: true }); }); diff --git a/packages/api/src/router/environment.ts b/packages/api/src/router/environment.ts index d9c913678..5ff297957 100644 --- a/packages/api/src/router/environment.ts +++ b/packages/api/src/router/environment.ts @@ -1,21 +1,8 @@ -import type { ResourceCondition } from "@ctrlplane/validators/resources"; import _ from "lodash"; import { isPresent } from "ts-is-present"; import { z } from "zod"; -import { - and, - count, - eq, - ilike, - inArray, - isNotNull, - isNull, - ne, - not, - takeFirst, - upsertEnv, -} from "@ctrlplane/db"; +import { and, count, eq, ilike, takeFirst, upsertEnv } from "@ctrlplane/db"; import { createEnvironment, deploymentVersionChannel, @@ -24,22 +11,15 @@ import { environmentPolicy, environmentPolicyDeploymentVersionChannel, environmentPolicyReleaseWindow, - resource, - resourceMatchesMetadata, system, updateEnvironment, } from "@ctrlplane/db/schema"; import { Channel, getQueue } from "@ctrlplane/events"; import { - dispatchJobsForAddedResources, getEventsForEnvironmentDeleted, handleEvent, } from "@ctrlplane/job-dispatch"; import { Permission } from "@ctrlplane/validators/auth"; -import { - ComparisonOperator, - ConditionType, -} from "@ctrlplane/validators/conditions"; import { createTRPCRouter, protectedProcedure } from "../trpc"; import { environmentPageRouter } from "./environment-page/router"; @@ -308,120 +288,11 @@ export const environmentRouter = createTRPCRouter({ .returning() .then(takeFirst); - const { resourceSelector } = input.data; - const isUpdatingResourceSelector = - resourceSelector != null || oldEnv.environment.resourceSelector != null; - getQueue(Channel.UpdateEnvironment).add(input.id, { ...updatedEnv, oldSelector: oldEnv.environment.resourceSelector, }); - if (isUpdatingResourceSelector) { - const hasResourceSelectorsChanged = !_.isEqual( - oldEnv.environment.resourceSelector, - resourceSelector, - ); - - if (hasResourceSelectorsChanged) { - const isOtherEnv = and( - isNotNull(environment.resourceSelector), - ne(environment.id, input.id), - ); - const sys = await ctx.db.query.system.findFirst({ - where: eq(system.id, oldEnv.system.id), - with: { - environments: { where: isOtherEnv }, - deployments: true, - }, - }); - - const otherEnvFilters = - sys?.environments - .map((e) => e.resourceSelector) - .filter(isPresent) ?? []; - - const oldQuery = resourceMatchesMetadata( - ctx.db, - oldEnv.environment.resourceSelector, - ); - const newQuery = resourceMatchesMetadata(ctx.db, resourceSelector); - - const newResources = - newQuery != null - ? await ctx.db - .select({ id: resource.id }) - .from(resource) - .where( - and( - eq(resource.workspaceId, oldEnv.system.workspaceId), - isNull(resource.deletedAt), - newQuery, - oldQuery && not(oldQuery), - ), - ) - : []; - - const removedResources = - oldQuery != null - ? await ctx.db.query.resource.findMany({ - where: and( - eq(resource.workspaceId, oldEnv.system.workspaceId), - isNull(resource.deletedAt), - oldQuery, - newQuery && not(newQuery), - ), - }) - : []; - - if (removedResources.length > 0) { - const sysFilter: ResourceCondition = { - type: ConditionType.Comparison, - operator: ComparisonOperator.Or, - not: true, - conditions: otherEnvFilters, - }; - - const isRemovedFromEnv = inArray( - resource.id, - removedResources.map((r) => r.id), - ); - - const isRemovedFromSystem = - otherEnvFilters.length > 0 - ? resourceMatchesMetadata(ctx.db, sysFilter) - : undefined; - const isNotDeleted = isNull(resource.deletedAt); - - const removedFromSystemResources = - await ctx.db.query.resource.findMany({ - where: and(isRemovedFromEnv, isRemovedFromSystem, isNotDeleted), - }); - - const events = removedFromSystemResources.flatMap((resource) => - (sys?.deployments ?? []).map((deployment) => ({ - action: "deployment.resource.removed" as const, - payload: { deployment, resource }, - })), - ); - - const handleEventPromises = events.map(handleEvent); - await Promise.allSettled(handleEventPromises); - } - - if (newResources.length > 0) { - await dispatchJobsForAddedResources( - ctx.db, - newResources.map((r) => r.id), - input.id, - ); - console.log( - `Found ${newResources.length} new resources for environment ${input.id}`, - ); - } - } - } - return updatedEnv; }), diff --git a/packages/api/src/router/resources.ts b/packages/api/src/router/resources.ts index f3f6e4e18..8efc3ef7c 100644 --- a/packages/api/src/router/resources.ts +++ b/packages/api/src/router/resources.ts @@ -22,7 +22,7 @@ import { cancelOldReleaseJobTriggersOnJobDispatch, createJobApprovals, createReleaseJobTriggers, - deleteResources, + deleteResource, dispatchReleaseJobTriggers, isPassingAllPoliciesExceptNewerThanLastActive, isPassingChannelSelectorPolicy, @@ -695,12 +695,14 @@ export const resourceRouter = createTRPCRouter({ ), }) .input(z.array(z.string().uuid())) - .mutation(async ({ ctx, input }) => + .mutation(({ ctx, input }) => ctx.db.query.resource .findMany({ where: and(inArray(schema.resource.id, input), isNotDeleted), }) - .then((resources) => deleteResources(ctx.db, resources)), + .then((resources) => + Promise.all(resources.map((r) => deleteResource(ctx.db, r.id))), + ), ), metadataKeys: protectedProcedure diff --git a/packages/db/src/schema/resource.ts b/packages/db/src/schema/resource.ts index 2acc97f51..147b7c234 100644 --- a/packages/db/src/schema/resource.ts +++ b/packages/db/src/schema/resource.ts @@ -171,6 +171,8 @@ export const resourceMetadataRelations = relations( }), ); +export type ResourceMetadata = InferSelectModel; + const buildMetadataCondition = (tx: Tx, cond: MetadataCondition): SQL => { if (cond.operator === MetadataOperator.Null) return notExists( diff --git a/packages/events/src/types.ts b/packages/events/src/types.ts index f04361cf4..d0e1aa067 100644 --- a/packages/events/src/types.ts +++ b/packages/events/src/types.ts @@ -21,6 +21,8 @@ export enum Channel { UpdateResourceVariable = "update-resource-variable", EvaluateReleaseTarget = "evaluate-release-target", + + ProcessUpsertedResource = "process-upserted-resource", } export type EvaluateReleaseTargetJob = { @@ -42,4 +44,5 @@ export type ChannelMap = { [Channel.EvaluateReleaseTarget]: EvaluateReleaseTargetJob; [Channel.DispatchJob]: { jobId: string }; [Channel.ResourceScan]: { resourceProviderId: string }; + [Channel.ProcessUpsertedResource]: schema.Resource; }; diff --git a/packages/job-dispatch/src/index.ts b/packages/job-dispatch/src/index.ts index ce6b10ba9..2c075dd38 100644 --- a/packages/job-dispatch/src/index.ts +++ b/packages/job-dispatch/src/index.ts @@ -23,4 +23,5 @@ export * from "./policies/release-window.js"; export * from "./environment-creation.js"; export * from "./pending-job-checker.js"; export * from "./events/index.js"; -export * from "./resource/index.js"; +export * from "./resource/delete-resource.js"; +export * from "./resource/upsert.js"; diff --git a/packages/job-dispatch/src/resource/db-upsert-resource.ts b/packages/job-dispatch/src/resource/db-upsert-resource.ts new file mode 100644 index 000000000..9799a00eb --- /dev/null +++ b/packages/job-dispatch/src/resource/db-upsert-resource.ts @@ -0,0 +1,125 @@ +import type { Tx } from "@ctrlplane/db"; + +import { + and, + buildConflictUpdateColumns, + eq, + inArray, + takeFirst, +} from "@ctrlplane/db"; +import * as SCHEMA from "@ctrlplane/db/schema"; + +type VariableInsert = { + key: string; + value: any; + sensitive: boolean; +}; + +type ResourceToInsert = SCHEMA.InsertResource & { + metadata?: Record; + variables?: Array; +}; + +const upsertMetadata = async ( + db: Tx, + resourceId: string, + oldKeys: string[], + newMetadata: Record, +) => { + const newKeys = Object.keys(newMetadata); + const deletedKeys = oldKeys.filter((key) => !newKeys.includes(key)); + + await db + .insert(SCHEMA.resourceMetadata) + .values( + Object.entries(newMetadata).map(([key, value]) => ({ + resourceId, + key, + value, + })), + ) + .onConflictDoUpdate({ + target: [SCHEMA.resourceMetadata.key, SCHEMA.resourceMetadata.resourceId], + set: buildConflictUpdateColumns(SCHEMA.resourceMetadata, ["value"]), + }); + + await db + .delete(SCHEMA.resourceMetadata) + .where(inArray(SCHEMA.resourceMetadata.key, deletedKeys)); +}; + +const upsertVariables = async ( + db: Tx, + resourceId: string, + oldKeys: string[], + newVariables: VariableInsert[], +) => { + const newKeys = newVariables.map((v) => v.key); + const deletedKeys = oldKeys.filter((key) => !newKeys.includes(key)); + + await db + .insert(SCHEMA.resourceVariable) + .values(newVariables.map((v) => ({ ...v, resourceId }))) + .onConflictDoUpdate({ + target: [SCHEMA.resourceVariable.key, SCHEMA.resourceVariable.resourceId], + set: buildConflictUpdateColumns(SCHEMA.resourceVariable, [ + "value", + "sensitive", + ]), + }); + + await db + .delete(SCHEMA.resourceVariable) + .where(inArray(SCHEMA.resourceVariable.key, deletedKeys)); +}; + +export const dbUpsertResource = async ( + db: Tx, + resourceInsert: ResourceToInsert, +) => { + const existingResource = await db.query.resource.findFirst({ + where: and( + eq(SCHEMA.resource.identifier, resourceInsert.identifier), + eq(SCHEMA.resource.workspaceId, resourceInsert.workspaceId), + ), + with: { variables: true, metadata: true }, + }); + + const resource = await db + .insert(SCHEMA.resource) + .values(resourceInsert) + .onConflictDoUpdate({ + target: [SCHEMA.resource.identifier, SCHEMA.resource.workspaceId], + set: { + ...buildConflictUpdateColumns(SCHEMA.resource, [ + "name", + "version", + "kind", + "config", + "providerId", + ]), + updatedAt: new Date(), + deletedAt: null, + }, + }) + .returning() + .then(takeFirst); + + const upsertMetadataPromise = upsertMetadata( + db, + resource.id, + existingResource?.metadata.map((m) => m.key) ?? [], + resourceInsert.metadata ?? {}, + ); + + const upsertVariablesPromise = upsertVariables( + db, + resource.id, + existingResource?.variables.map((v) => v.key) ?? [], + resourceInsert.variables ?? [], + ); + + await Promise.allSettled([upsertMetadataPromise, upsertVariablesPromise]); + + return resource; +}; diff --git a/packages/job-dispatch/src/resource/delete-resource.ts b/packages/job-dispatch/src/resource/delete-resource.ts new file mode 100644 index 000000000..140aa3250 --- /dev/null +++ b/packages/job-dispatch/src/resource/delete-resource.ts @@ -0,0 +1,72 @@ +import type { Tx } from "@ctrlplane/db"; + +import { and, eq, inArray, or, takeFirst } from "@ctrlplane/db"; +import * as SCHEMA from "@ctrlplane/db/schema"; +import { + getEventsForResourceDeleted, + handleEvent, + updateJob, +} from "@ctrlplane/job-dispatch"; +import { JobStatus } from "@ctrlplane/validators/jobs"; + +const deleteObjectsAssociatedWithResource = ( + db: Tx, + resource: SCHEMA.Resource, +) => + db + .delete(SCHEMA.resourceRelationship) + .where( + or( + eq(SCHEMA.resourceRelationship.fromIdentifier, resource.identifier), + eq(SCHEMA.resourceRelationship.toIdentifier, resource.identifier), + ), + ); + +const cancelJobs = async (db: Tx, resource: SCHEMA.Resource) => { + const jobs = await db + .select() + .from(SCHEMA.job) + .innerJoin(SCHEMA.releaseJob, eq(SCHEMA.releaseJob.jobId, SCHEMA.job.id)) + .innerJoin( + SCHEMA.release, + eq(SCHEMA.releaseJob.releaseId, SCHEMA.release.id), + ) + .innerJoin( + SCHEMA.releaseTarget, + eq(SCHEMA.release.releaseTargetId, SCHEMA.releaseTarget.id), + ) + .where( + and( + eq(SCHEMA.releaseTarget.resourceId, resource.id), + inArray(SCHEMA.job.status, [JobStatus.Pending, JobStatus.InProgress]), + ), + ); + + await Promise.all( + jobs.map((job) => + updateJob(db, job.job.id, { status: JobStatus.Cancelled }), + ), + ); +}; + +export const deleteResource = async (db: Tx, resourceId: string) => { + const where = eq(SCHEMA.resource.id, resourceId); + const resource = await db.query.resource.findFirst({ where }); + if (resource == null) throw new Error(`Resource not found: ${resourceId}`); + + const events = await getEventsForResourceDeleted(resource); + const eventPromises = events.map(handleEvent); + const deleteObjectsPromise = deleteObjectsAssociatedWithResource( + db, + resource, + ); + const cancelJobsPromise = cancelJobs(db, resource); + + await Promise.all([ + ...eventPromises, + deleteObjectsPromise, + cancelJobsPromise, + ]); + + return db.delete(SCHEMA.resource).where(where).returning().then(takeFirst); +}; diff --git a/packages/job-dispatch/src/resource/delete.ts b/packages/job-dispatch/src/resource/delete.ts deleted file mode 100644 index 32e555c7a..000000000 --- a/packages/job-dispatch/src/resource/delete.ts +++ /dev/null @@ -1,74 +0,0 @@ -import type { Tx } from "@ctrlplane/db"; -import type { Resource } from "@ctrlplane/db/schema"; -import _ from "lodash"; - -import { and, eq, inArray, or } from "@ctrlplane/db"; -import * as SCHEMA from "@ctrlplane/db/schema"; -import { JobStatus } from "@ctrlplane/validators/jobs"; - -import { getEventsForResourceDeleted, handleEvent } from "../events/index.js"; -import { updateJob } from "../job-update.js"; - -const deleteObjectsAssociatedWithResource = (tx: Tx, resource: Resource) => - tx - .delete(SCHEMA.resourceRelationship) - .where( - or( - eq(SCHEMA.resourceRelationship.fromIdentifier, resource.identifier), - eq(SCHEMA.resourceRelationship.toIdentifier, resource.identifier), - ), - ); - -const cancelJobsForDeletedResources = (tx: Tx, resources: Resource[]) => - tx - .select() - .from(SCHEMA.releaseJobTrigger) - .innerJoin(SCHEMA.job, eq(SCHEMA.releaseJobTrigger.jobId, SCHEMA.job.id)) - .innerJoin( - SCHEMA.resource, - eq(SCHEMA.releaseJobTrigger.resourceId, SCHEMA.resource.id), - ) - .where( - and( - inArray( - SCHEMA.resource.id, - resources.map((r) => r.id), - ), - inArray(SCHEMA.job.status, [JobStatus.Pending, JobStatus.InProgress]), - ), - ) - .then((rjt) => rjt.map((rjt) => rjt.job.id)) - .then((jobIds) => - Promise.all( - jobIds.map((jobId) => - updateJob(tx, jobId, { status: JobStatus.Cancelled }), - ), - ), - ); - -/** - * Delete resources from the database. - * - * @param tx - The transaction to use. - * @param resourceIds - The ids of the resources to delete. - */ -export const deleteResources = async (tx: Tx, resources: Resource[]) => { - const eventsPromises = Promise.all( - resources.map(getEventsForResourceDeleted), - ); - const events = await eventsPromises.then((res) => res.flat()); - await Promise.all(events.map(handleEvent)); - const resourceIds = resources.map((r) => r.id); - const deleteAssociatedObjects = Promise.all( - resources.map((r) => deleteObjectsAssociatedWithResource(tx, r)), - ); - const cancelJobs = cancelJobsForDeletedResources(tx, resources); - await Promise.all([ - deleteAssociatedObjects, - cancelJobs, - tx - .update(SCHEMA.resource) - .set({ deletedAt: new Date() }) - .where(inArray(SCHEMA.resource.id, resourceIds)), - ]); -}; diff --git a/packages/job-dispatch/src/resource/dispatch-resource.ts b/packages/job-dispatch/src/resource/dispatch-resource.ts deleted file mode 100644 index 55bacf473..000000000 --- a/packages/job-dispatch/src/resource/dispatch-resource.ts +++ /dev/null @@ -1,242 +0,0 @@ -import type { Tx } from "@ctrlplane/db"; -import type { ResourceCondition } from "@ctrlplane/validators/resources"; -import { isPresent } from "ts-is-present"; - -import { - and, - desc, - eq, - inArray, - isNotNull, - takeFirstOrNull, -} from "@ctrlplane/db"; -import { db } from "@ctrlplane/db/client"; -import * as SCHEMA from "@ctrlplane/db/schema"; -import { logger } from "@ctrlplane/logger"; -import { ComparisonOperator } from "@ctrlplane/validators/conditions"; -import { ResourceConditionType } from "@ctrlplane/validators/resources"; - -import { handleEvent } from "../events/index.js"; -import { dispatchReleaseJobTriggers } from "../job-dispatch.js"; -import { isPassingAllPolicies } from "../policy-checker.js"; -import { createJobApprovals } from "../policy-create.js"; -import { createReleaseJobTriggers } from "../release-job-trigger.js"; - -const log = logger.child({ label: "dispatch-resource" }); - -/** - * Gets an environment with its associated release channels, policy, and system information - * @param db - Database transaction - * @param envId - Environment ID to look up - * @returns Promise resolving to the environment with its relationships or null if not found - */ -const getEnvironmentWithVersionChannels = (db: Tx, envId: string) => - db.query.environment.findFirst({ - where: eq(SCHEMA.environment.id, envId), - with: { - policy: { - with: { - environmentPolicyDeploymentVersionChannels: { - with: { deploymentVersionChannel: true }, - }, - }, - }, - system: { with: { deployments: true } }, - }, - }); - -/** - * Dispatches jobs for newly added resources in an environment - * @param db - Database transaction - * @param resourceIds - IDs of the resources that were added - * @param envId - ID of the environment the resources were added to - */ -export async function dispatchJobsForAddedResources( - db: Tx, - resourceIds: string[], - envId: string, -): Promise { - if (resourceIds.length === 0) return; - log.info("Dispatching jobs for added resources", { resourceIds, envId }); - - const environment = await getEnvironmentWithVersionChannels(db, envId); - if (environment == null) { - log.warn("Environment not found", { envId }); - return; - } - - const { policy, system } = environment; - const { deployments } = system; - const { environmentPolicyDeploymentVersionChannels } = policy; - const deploymentsWithVersionSelector = deployments.map((deployment) => { - const policy = environmentPolicyDeploymentVersionChannels.find( - (prc) => prc.deploymentId === deployment.id, - ); - - const { versionSelector } = policy?.deploymentVersionChannel ?? {}; - return { ...deployment, versionSelector }; - }); - - log.debug("Fetching latest versions", { - deploymentCount: deployments.length, - }); - const versionPromises = deploymentsWithVersionSelector.map( - ({ id, versionSelector }) => - db - .select() - .from(SCHEMA.deploymentVersion) - .where( - and( - eq(SCHEMA.deploymentVersion.deploymentId, id), - SCHEMA.deploymentVersionMatchesCondition( - db, - versionSelector ?? undefined, - ), - ), - ) - .orderBy(desc(SCHEMA.deploymentVersion.createdAt)) - .limit(1) - .then(takeFirstOrNull), - ); - - const versions = await Promise.all(versionPromises).then((rows) => - rows.filter(isPresent), - ); - if (versions.length === 0) { - log.info("No versions found for deployments"); - return; - } - - log.debug("Creating release job triggers"); - const releaseJobTriggers = await createReleaseJobTriggers(db, "new_resource") - .resources(resourceIds) - .environments([envId]) - .versions(versions.map((v) => v.id)) - .then(createJobApprovals) - .insert(); - - if (releaseJobTriggers.length === 0) { - log.info("No job triggers created"); - return; - } - - log.debug("Dispatching release job triggers", { - count: releaseJobTriggers.length, - }); - await dispatchReleaseJobTriggers(db) - .filter(isPassingAllPolicies) - .releaseTriggers(releaseJobTriggers) - .dispatch(); - - log.info("Successfully dispatched jobs for added resources", { - resourceCount: resourceIds.length, - triggerCount: releaseJobTriggers.length, - }); -} - -/** - * Gets all deployments associated with an environment - * @param db - Database transaction - * @param envId - Environment ID to get deployments for - * @returns Promise resolving to array of deployments - */ -const getEnvironmentDeployments = (db: Tx, envId: string) => - db - .select() - .from(SCHEMA.deployment) - .innerJoin(SCHEMA.system, eq(SCHEMA.deployment.systemId, SCHEMA.system.id)) - .innerJoin( - SCHEMA.environment, - eq(SCHEMA.system.id, SCHEMA.environment.systemId), - ) - .where(eq(SCHEMA.environment.id, envId)) - .then((rows) => rows.map((r) => r.deployment)); - -/** - * Gets the not in system filter for a system - * @param systemId - System ID to get the not in system filter for - * @returns Promise resolving to the not in system filter or null if not found - */ -const getNotInSystemFilter = async ( - systemId: string, -): Promise => { - const hasFilter = isNotNull(SCHEMA.environment.resourceSelector); - const system = await db.query.system.findFirst({ - where: eq(SCHEMA.system.id, systemId), - with: { environments: { where: hasFilter } }, - }); - if (system == null) return null; - - const filters = system.environments - .map((e) => e.resourceSelector) - .filter(isPresent); - if (filters.length === 0) return null; - - return { - type: ResourceConditionType.Comparison, - operator: ComparisonOperator.Or, - not: true, - conditions: filters, - }; -}; - -/** - * Dispatches hook events for resources that were removed from an environment - * @param db - Database transaction - * @param resourceIds - IDs of the resources that were removed - * @param env - Environment the resources were removed from - */ -export const dispatchEventsForRemovedResources = async ( - db: Tx, - resourceIds: string[], - env: { id: string; systemId: string }, -): Promise => { - const { id: envId, systemId } = env; - log.info("Dispatching events for removed resources", { resourceIds, envId }); - - const deployments = await getEnvironmentDeployments(db, envId); - if (deployments.length === 0) { - log.info("No deployments found for environment"); - return; - } - - const notInSystemFilter = await getNotInSystemFilter(systemId); - if (notInSystemFilter == null) { - log.warn("No system found for environment", { envId }); - return; - } - - const matchesResources = inArray(SCHEMA.resource.id, resourceIds); - const isRemovedFromSystem = SCHEMA.resourceMatchesMetadata( - db, - notInSystemFilter, - ); - const resources = await db.query.resource.findMany({ - where: and(matchesResources, isRemovedFromSystem), - }); - - log.debug("Creating removal events", { - resourceCount: resources.length, - deploymentCount: deployments.length, - }); - const events = resources.flatMap((resource) => - deployments.map((deployment) => ({ - action: "deployment.resource.removed" as const, - payload: { deployment, resource }, - })), - ); - - log.debug("Handling removal events", { eventCount: events.length }); - const handleEventPromises = events.map(handleEvent); - const results = await Promise.allSettled(handleEventPromises); - - const failures = results.filter((r) => r.status === "rejected").length; - if (failures > 0) - log.warn("Some removal events failed", { failureCount: failures }); - - log.info("Finished dispatching removal events", { - total: events.length, - succeeded: events.length - failures, - failed: failures, - }); -}; diff --git a/packages/job-dispatch/src/resource/insert-resources.ts b/packages/job-dispatch/src/resource/find-existing-resources.ts similarity index 54% rename from packages/job-dispatch/src/resource/insert-resources.ts rename to packages/job-dispatch/src/resource/find-existing-resources.ts index 09d56c77c..0072ce243 100644 --- a/packages/job-dispatch/src/resource/insert-resources.ts +++ b/packages/job-dispatch/src/resource/find-existing-resources.ts @@ -2,7 +2,7 @@ import type { Tx } from "@ctrlplane/db"; import type { InsertResource, Resource } from "@ctrlplane/db/schema"; import _ from "lodash"; -import { and, buildConflictUpdateColumns, eq, isNull, or } from "@ctrlplane/db"; +import { and, eq, isNull, or } from "@ctrlplane/db"; import { resource } from "@ctrlplane/db/schema"; /** @@ -49,7 +49,7 @@ const getResourcesByWorkspaceIdAndIdentifier = ( * @param resourcesToInsert - Array of resources to be inserted * @returns Promise resolving to array of existing resources */ -const findExistingResources = async ( +export const findExistingResources = async ( tx: Tx, resourcesToInsert: InsertResource[], ): Promise => { @@ -68,50 +68,3 @@ const findExistingResources = async ( const results = await Promise.all(promises); return results.flat(); }; - -/** - * Inserts or updates resources in the database. Note that this function only - * handles the core resource fields - it does not insert/update associated - * metadata or variables. Those must be handled separately. - * - * @param tx - Database transaction - * @param resourcesToInsert - Array of resources to insert/update. Can include - * metadata and variables but these will not be - * persisted by this function. - * @returns Promise resolving to array of inserted/updated resources, with any - * metadata/variables from the input merged onto the DB records - */ -export const insertResources = async ( - tx: Tx, - resourcesToInsert: InsertResource[], -) => { - const existingResources = await findExistingResources(tx, resourcesToInsert); - const deleted = existingResources.filter( - (existing) => - !resourcesToInsert.some( - (inserted) => - inserted.identifier === existing.identifier && - inserted.workspaceId === existing.workspaceId, - ), - ); - const insertedResources = await tx - .insert(resource) - .values(resourcesToInsert) - .onConflictDoUpdate({ - target: [resource.identifier, resource.workspaceId], - set: { - ...buildConflictUpdateColumns(resource, [ - "name", - "version", - "kind", - "config", - "providerId", - ]), - updatedAt: new Date(), - deletedAt: null, - }, - }) - .returning(); - - return { all: insertedResources, deleted }; -}; diff --git a/packages/job-dispatch/src/resource/index.ts b/packages/job-dispatch/src/resource/index.ts deleted file mode 100644 index 40a2163e7..000000000 --- a/packages/job-dispatch/src/resource/index.ts +++ /dev/null @@ -1,3 +0,0 @@ -export * from "./upsert.js"; -export * from "./delete.js"; -export * from "./dispatch-resource.js"; diff --git a/packages/job-dispatch/src/resource/insert-resource-metadata.ts b/packages/job-dispatch/src/resource/insert-resource-metadata.ts deleted file mode 100644 index 29a1d4735..000000000 --- a/packages/job-dispatch/src/resource/insert-resource-metadata.ts +++ /dev/null @@ -1,53 +0,0 @@ -import type { Tx } from "@ctrlplane/db"; -import type { Resource } from "@ctrlplane/db/schema"; -import _ from "lodash"; - -import { - and, - buildConflictUpdateColumns, - eq, - notInArray, - or, -} from "@ctrlplane/db"; -import * as schema from "@ctrlplane/db/schema"; - -export type ResourceWithMetadata = Resource & { - metadata?: Record; -}; - -export const insertResourceMetadata = async ( - tx: Tx, - resources: ResourceWithMetadata[], -) => { - const resourceMetadataValues = resources.flatMap((resource) => { - const { id, metadata = {} } = resource; - return Object.entries(metadata).map(([key, value]) => ({ - resourceId: id, - key, - value, - })); - }); - if (resourceMetadataValues.length === 0) return; - - const deletedKeysChecks = _.chain(resourceMetadataValues) - .groupBy((r) => r.resourceId) - .map((groupedMeta) => { - const resourceId = groupedMeta[0]!.resourceId; - const keys = groupedMeta.map((m) => m.key); - return and( - eq(schema.resourceMetadata.resourceId, resourceId), - notInArray(schema.resourceMetadata.key, keys), - )!; - }) - .value(); - - await tx.delete(schema.resourceMetadata).where(or(...deletedKeysChecks)); - - return tx - .insert(schema.resourceMetadata) - .values(resourceMetadataValues) - .onConflictDoUpdate({ - target: [schema.resourceMetadata.key, schema.resourceMetadata.resourceId], - set: buildConflictUpdateColumns(schema.resourceMetadata, ["value"]), - }); -}; diff --git a/packages/job-dispatch/src/resource/insert-resource-variables.ts b/packages/job-dispatch/src/resource/insert-resource-variables.ts deleted file mode 100644 index 8b15b50f9..000000000 --- a/packages/job-dispatch/src/resource/insert-resource-variables.ts +++ /dev/null @@ -1,76 +0,0 @@ -import type { Tx } from "@ctrlplane/db"; -import type { Resource } from "@ctrlplane/db/schema"; -import _ from "lodash"; - -import { buildConflictUpdateColumns, inArray } from "@ctrlplane/db"; -import * as schema from "@ctrlplane/db/schema"; -import { variablesAES256 } from "@ctrlplane/secrets"; - -export type ResourceWithVariables = Resource & { - variables?: Array<{ key: string; value: any; sensitive: boolean }>; -}; - -export const insertResourceVariables = async ( - tx: Tx, - resources: ResourceWithVariables[], -): Promise> => { - const resourceIds = resources.map(({ id }) => id); - const existingVariables = await tx - .select() - .from(schema.resourceVariable) - .where(inArray(schema.resourceVariable.resourceId, resourceIds)); - - const resourceVariablesValues = resources.flatMap(({ id, variables = [] }) => - variables.map(({ key, value, sensitive }) => ({ - resourceId: id, - key, - value: sensitive - ? variablesAES256().encrypt(JSON.stringify(value)) - : value, - sensitive, - })), - ); - - if (resourceVariablesValues.length === 0) return new Set(); - - const updatedVariables = await tx - .insert(schema.resourceVariable) - .values(resourceVariablesValues) - .onConflictDoUpdate({ - target: [schema.resourceVariable.key, schema.resourceVariable.resourceId], - set: buildConflictUpdateColumns(schema.resourceVariable, [ - "value", - "sensitive", - ]), - }) - .returning(); - - const created = _.differenceWith( - updatedVariables, - existingVariables, - (a, b) => a.resourceId === b.resourceId && a.key === b.key, - ); - - const deleted = _.differenceWith( - existingVariables, - updatedVariables, - (a, b) => a.resourceId === b.resourceId && a.key === b.key, - ); - - const updated = _.intersectionWith( - updatedVariables, - existingVariables, - (a, b) => - a.resourceId === b.resourceId && - a.key === b.key && - (a.value !== b.value || a.sensitive !== b.sensitive), - ); - - const updatedResourceIds = [ - ...created.map((r) => r.resourceId), - ...deleted.map((r) => r.resourceId), - ...updated.map((r) => r.resourceId), - ]; - - return new Set(updatedResourceIds); -}; diff --git a/packages/job-dispatch/src/resource/upsert.ts b/packages/job-dispatch/src/resource/upsert.ts index aa73f3555..deb3dbcf1 100644 --- a/packages/job-dispatch/src/resource/upsert.ts +++ b/packages/job-dispatch/src/resource/upsert.ts @@ -1,18 +1,12 @@ import type { Tx } from "@ctrlplane/db"; import type { InsertResource } from "@ctrlplane/db/schema"; -import _ from "lodash"; +import { Channel, getQueue } from "@ctrlplane/events"; import { logger } from "@ctrlplane/logger"; -import { deleteResources } from "./delete.js"; -import { - dispatchEventsForRemovedResources, - dispatchJobsForAddedResources, -} from "./dispatch-resource.js"; -import { insertResourceMetadata } from "./insert-resource-metadata.js"; -import { insertResourceVariables } from "./insert-resource-variables.js"; -import { insertResources } from "./insert-resources.js"; -import { getEnvironmentsByResourceWithIdentifiers } from "./utils.js"; +import { dbUpsertResource } from "./db-upsert-resource.js"; +import { deleteResource } from "./delete-resource.js"; +import { findExistingResources } from "./find-existing-resources.js"; const log = logger.child({ label: "upsert-resources" }); @@ -35,120 +29,37 @@ export const upsertResources = async ( if (!resourcesToInsert.every((r) => r.workspaceId === workspaceId)) throw new Error("All resources must belong to the same workspace"); - try { - const resourceIdentifiers = resourcesToInsert.map((r) => r.identifier); - log.debug("Getting environments before insert", { resourceIdentifiers }); - const envsBeforeInsert = await getEnvironmentsByResourceWithIdentifiers( - tx, - workspaceId, - resourceIdentifiers, - ); - - log.debug("Envs before insert", { - envs: envsBeforeInsert.map((e) => ({ - id: e.id, - resources: e.resources.map((r) => r.identifier), - })), - }); - - log.info("Inserting resources"); - const resources = await insertResources(tx, resourcesToInsert); - const resourcesWithId = resources.all.map((r) => ({ - ...r, - ...resourcesToInsert.find( - (ri) => - ri.identifier === r.identifier && ri.workspaceId === r.workspaceId, - ), - })); - - log.info("Inserting resource metadata and variables"); - const [, updatedVariableResourceIds] = await Promise.all([ - insertResourceMetadata(tx, resourcesWithId), - insertResourceVariables(tx, resourcesWithId), - ]); - - log.info("Getting environments after insert"); - const envsAfterInsert = await getEnvironmentsByResourceWithIdentifiers( - tx, - workspaceId, - resourceIdentifiers, - ); - - log.debug("Envs after insert", { - envs: envsAfterInsert.map((e) => ({ - id: e.id, - resources: e.resources.map((r) => r.identifier), - })), - }); - const envVariableChangePromises = envsAfterInsert.map((env) => - dispatchJobsForAddedResources( - tx, - env.resources - .filter((r) => updatedVariableResourceIds.has(r.id)) - .map((r) => r.id), - env.id, + const existingResources = await findExistingResources(tx, resourcesToInsert); + const resourcesToDelete = existingResources.filter( + (existing) => + !resourcesToInsert.some( + (inserted) => + inserted.identifier === existing.identifier && + inserted.workspaceId === existing.workspaceId, ), - ); - await Promise.all(envVariableChangePromises); - const changedEnvs = envsAfterInsert.map((env) => { - const beforeEnv = envsBeforeInsert.find((e) => e.id === env.id); - const beforeResources = beforeEnv?.resources ?? []; - const afterResources = env.resources; - const removedResources = beforeResources.filter( - (br) => !afterResources.some((ar) => ar.id === br.id), - ); - const addedResources = afterResources.filter( - (ar) => !beforeResources.some((br) => br.id === ar.id), - ); - return { ...env, removedResources, addedResources }; - }); - - const deletedResourceIds = new Set(resources.deleted.map((r) => r.id)); - if (resources.deleted.length > 0) { - log.info("Deleting resources", { count: resources.deleted.length }); - await deleteResources(tx, resources.deleted).catch((err) => { - log.error("Error deleting resources", { error: err }); - throw err; - }); - } - - for (const env of changedEnvs) { - if (env.addedResources.length > 0) { - log.info("Dispatching jobs for added resources", { - envId: env.id, - count: env.addedResources.length, - }); - await dispatchJobsForAddedResources( - tx, - env.addedResources - .map((r) => r.id) - .filter((r) => !updatedVariableResourceIds.has(r)), - env.id, - ); - } - - if (env.removedResources.length > 0) { - const removedIds = env.removedResources - .map((r) => r.id) - .filter((id) => !deletedResourceIds.has(id)); - - if (removedIds.length > 0) { - log.info("Dispatching hook events for removed resources", { - envId: env.id, - count: removedIds.length, - }); - await dispatchEventsForRemovedResources(tx, removedIds, env); - } - } - } - - log.info("Resource upsert completed successfully", { - added: resources.all.length, - deleted: resources.deleted.length, - }); - return resources; - } catch (err) { - log.error("Error upserting resources", { error: err }); - throw err; - } + ); + + const resources = await Promise.all( + resourcesToInsert.map((r) => dbUpsertResource(tx, r)), + ); + + const addToUpsertQueuePromise = getQueue( + Channel.ProcessUpsertedResource, + ).addBulk( + resources.map((r) => ({ + name: r.id, + data: r, + })), + ); + + const deleteResourcesPromise = Promise.all( + resourcesToDelete.map((r) => deleteResource(tx, r.id)), + ); + + const [, deletedResources] = await Promise.all([ + addToUpsertQueuePromise, + deleteResourcesPromise, + ]); + + return { all: resources, deleted: deletedResources }; }; diff --git a/packages/job-dispatch/src/resource/utils.ts b/packages/job-dispatch/src/resource/utils.ts deleted file mode 100644 index 7b814ba84..000000000 --- a/packages/job-dispatch/src/resource/utils.ts +++ /dev/null @@ -1,107 +0,0 @@ -import type { Tx } from "@ctrlplane/db"; - -import { and, eq, inArray, isNotNull, isNull, or } from "@ctrlplane/db"; -import * as schema from "@ctrlplane/db/schema"; - -/** - * Gets resources for a specific provider - * @param tx - Database transaction - * @param providerId - ID of the provider to get resources for - * @param options - Options object - * @param options.deleted - If true, returns deleted resources. If false, returns non-deleted resources - * @returns Promise resolving to array of resources - */ -export const getResourcesByProvider = ( - tx: Tx, - providerId: string, - options: { deleted: boolean } = { deleted: false }, -) => - tx - .select() - .from(schema.resource) - .where( - and( - eq(schema.resource.providerId, providerId), - options.deleted - ? isNotNull(schema.resource.deletedAt) - : isNull(schema.resource.deletedAt), - ), - ); - -/** - * Gets resources matching the provided workspace IDs and identifiers - * Can filter for either deleted or non-deleted resources - * - * @param tx - Database transaction - * @param resources - Array of objects containing workspaceId and identifier to look up - * @param options - Options object - * @param options.deleted - If true, returns deleted resources. If false, returns non-deleted resources - * @returns Promise resolving to array of matching resources - */ -export const getResourcesByWorkspaceIdAndIdentifier = ( - tx: Tx, - resources: { workspaceId: string; identifier: string }[], - options: { deleted: boolean } = { deleted: false }, -) => - tx - .select() - .from(schema.resource) - .where( - or( - ...resources.map((r) => - and( - eq(schema.resource.workspaceId, r.workspaceId), - eq(schema.resource.identifier, r.identifier), - options.deleted - ? isNotNull(schema.resource.deletedAt) - : isNull(schema.resource.deletedAt), - ), - ), - ), - ); - -/** - * Groups provided resources by workspace environments matching them - * - * @param tx - Database transaction - * @param workspaceId - ID of the workspace to get environments for - * @param resourceIdentifiers - Array of resource identifiers to look up - * @returns Promise resolving to array of environments - */ -export const getEnvironmentsByResourceWithIdentifiers = ( - tx: Tx, - workspaceId: string, - resourceIdentifiers: string[], -) => - tx - .select({ - id: schema.environment.id, - resourceFilter: schema.environment.resourceSelector, - systemId: schema.environment.systemId, - }) - .from(schema.environment) - .innerJoin(schema.system, eq(schema.environment.systemId, schema.system.id)) - .where( - and( - eq(schema.system.workspaceId, workspaceId), - isNotNull(schema.environment.resourceSelector), - ), - ) - .then((envs) => - Promise.all( - envs.map(async (env) => ({ - ...env, - resources: await tx - .select() - .from(schema.resource) - .where( - and( - inArray(schema.resource.identifier, resourceIdentifiers), - eq(schema.resource.workspaceId, workspaceId), - schema.resourceMatchesMetadata(tx, env.resourceFilter), - isNull(schema.resource.deletedAt), - ), - ), - })), - ), - );