diff --git a/apps/event-worker/src/utils.ts b/apps/event-worker/src/utils/omit-null-undefined.ts similarity index 100% rename from apps/event-worker/src/utils.ts rename to apps/event-worker/src/utils/omit-null-undefined.ts diff --git a/apps/event-worker/src/utils/upsert-release-targets.ts b/apps/event-worker/src/utils/upsert-release-targets.ts new file mode 100644 index 000000000..8fad01382 --- /dev/null +++ b/apps/event-worker/src/utils/upsert-release-targets.ts @@ -0,0 +1,63 @@ +import type { Tx } from "@ctrlplane/db"; +import type { ReleaseTargetIdentifier } from "@ctrlplane/rule-engine"; +import { isPresent } from "ts-is-present"; + +import { and, eq } from "@ctrlplane/db"; +import * as SCHEMA from "@ctrlplane/db/schema"; + +const getReleaseTargetInsertsForSystem = async ( + db: Tx, + resourceId: string, + system: SCHEMA.System & { + environments: SCHEMA.Environment[]; + deployments: SCHEMA.Deployment[]; + }, +): Promise => { + const envs = system.environments.filter((e) => isPresent(e.resourceSelector)); + const { deployments } = system; + + const maybeTargetsPromises = envs.flatMap((env) => + deployments.map(async (dep) => { + const resource = await db.query.resource.findFirst({ + where: and( + eq(SCHEMA.resource.id, resourceId), + SCHEMA.resourceMatchesMetadata(db, env.resourceSelector), + SCHEMA.resourceMatchesMetadata(db, dep.resourceSelector), + ), + }); + + if (resource == null) return null; + return { environmentId: env.id, deploymentId: dep.id }; + }), + ); + + const targets = await Promise.all(maybeTargetsPromises).then((results) => + results.filter(isPresent), + ); + + return targets.map((t) => ({ ...t, resourceId })); +}; + +export const upsertReleaseTargets = async ( + db: Tx, + resource: SCHEMA.Resource, +) => { + const workspace = await db.query.workspace.findFirst({ + where: eq(SCHEMA.workspace.id, resource.workspaceId), + with: { systems: { with: { environments: true, deployments: true } } }, + }); + if (workspace == null) throw new Error("Workspace not found"); + + const releaseTargetInserts = await Promise.all( + workspace.systems.map((system) => + getReleaseTargetInsertsForSystem(db, resource.id, system), + ), + ).then((results) => results.flat()); + + if (releaseTargetInserts.length === 0) return []; + return db + .insert(SCHEMA.releaseTarget) + .values(releaseTargetInserts) + .onConflictDoNothing() + .returning(); +}; diff --git a/apps/event-worker/src/workers/index.ts b/apps/event-worker/src/workers/index.ts index c9d6423cb..9c79f01dc 100644 --- a/apps/event-worker/src/workers/index.ts +++ b/apps/event-worker/src/workers/index.ts @@ -7,10 +7,12 @@ import { evaluateReleaseTarget } from "./evaluate-release-target.js"; import { dispatchJobWorker } from "./job-dispatch/index.js"; import { newDeploymentVersionWorker } from "./new-deployment-version.js"; import { newDeploymentWorker } from "./new-deployment.js"; +import { newResourceWorker } from "./new-resource.js"; import { resourceScanWorker } from "./resource-scan/index.js"; import { updateDeploymentVariableWorker } from "./update-deployment-variable.js"; import { updateEnvironmentWorker } from "./update-environment.js"; import { updateResourceVariableWorker } from "./update-resource-variable.js"; +import { updatedResourceWorker } from "./updated-resources/index.js"; type Workers = { [K in T]: Worker | null; @@ -26,4 +28,6 @@ export const workers: Workers = { [Channel.EvaluateReleaseTarget]: evaluateReleaseTarget, [Channel.DispatchJob]: dispatchJobWorker, [Channel.ResourceScan]: resourceScanWorker, + [Channel.UpdatedResource]: updatedResourceWorker, + [Channel.NewResource]: newResourceWorker, }; diff --git a/apps/event-worker/src/workers/new-resource.ts b/apps/event-worker/src/workers/new-resource.ts new file mode 100644 index 000000000..b061b0bee --- /dev/null +++ b/apps/event-worker/src/workers/new-resource.ts @@ -0,0 +1,19 @@ +import { db } from "@ctrlplane/db/client"; +import { Channel, createWorker, getQueue } from "@ctrlplane/events"; + +import { upsertReleaseTargets } from "../utils/upsert-release-targets.js"; + +const queue = getQueue(Channel.EvaluateReleaseTarget); +export const newResourceWorker = createWorker( + Channel.NewResource, + ({ data: resource }) => + db.transaction(async (tx) => { + const rts = await upsertReleaseTargets(tx, resource); + await queue.addBulk( + rts.map((rt) => ({ + name: `${rt.resourceId}-${rt.environmentId}-${rt.deploymentId}`, + data: rt, + })), + ); + }), +); diff --git a/apps/event-worker/src/workers/resource-scan/aws/eks.ts b/apps/event-worker/src/workers/resource-scan/aws/eks.ts index 5ceca9588..dd9f96693 100644 --- a/apps/event-worker/src/workers/resource-scan/aws/eks.ts +++ b/apps/event-worker/src/workers/resource-scan/aws/eks.ts @@ -16,7 +16,7 @@ import { ReservedMetadataKey } from "@ctrlplane/validators/conditions"; import { cloudRegionsGeo } from "@ctrlplane/validators/resources"; import type { AwsCredentials } from "./aws.js"; -import { omitNullUndefined } from "../../../utils.js"; +import { omitNullUndefined } from "../../../utils/omit-null-undefined.js"; import { assumeRole, assumeWorkspaceRole } from "./aws.js"; const log = logger.child({ label: "resource-scan/eks" }); diff --git a/apps/event-worker/src/workers/resource-scan/aws/vpc.ts b/apps/event-worker/src/workers/resource-scan/aws/vpc.ts index 7ad372a20..7d7f6435c 100644 --- a/apps/event-worker/src/workers/resource-scan/aws/vpc.ts +++ b/apps/event-worker/src/workers/resource-scan/aws/vpc.ts @@ -14,7 +14,7 @@ import { logger } from "@ctrlplane/logger"; import { ReservedMetadataKey } from "@ctrlplane/validators/conditions"; import type { AwsCredentials } from "./aws.js"; -import { omitNullUndefined } from "../../../utils.js"; +import { omitNullUndefined } from "../../../utils/omit-null-undefined.js"; import { assumeRole, assumeWorkspaceRole } from "./aws.js"; const log = logger.child({ label: "resource-scan/aws/vpc" }); diff --git a/apps/event-worker/src/workers/resource-scan/azure/cluster-to-resource.ts b/apps/event-worker/src/workers/resource-scan/azure/cluster-to-resource.ts index 27c5eab57..55b75ed0a 100644 --- a/apps/event-worker/src/workers/resource-scan/azure/cluster-to-resource.ts +++ b/apps/event-worker/src/workers/resource-scan/azure/cluster-to-resource.ts @@ -11,7 +11,7 @@ import { logger } from "@ctrlplane/logger"; import { ReservedMetadataKey } from "@ctrlplane/validators/conditions"; import { cloudRegionsGeo } from "@ctrlplane/validators/resources"; -import { omitNullUndefined } from "../../../utils.js"; +import { omitNullUndefined } from "../../../utils/omit-null-undefined.js"; const log = logger.child({ module: "resource-scan/azure" }); diff --git a/apps/event-worker/src/workers/resource-scan/google/cluster-to-resource.ts b/apps/event-worker/src/workers/resource-scan/google/cluster-to-resource.ts index 980d8fb6f..1066b0f4c 100644 --- a/apps/event-worker/src/workers/resource-scan/google/cluster-to-resource.ts +++ b/apps/event-worker/src/workers/resource-scan/google/cluster-to-resource.ts @@ -5,7 +5,7 @@ import { SemVer } from "semver"; import { ReservedMetadataKey } from "@ctrlplane/validators/conditions"; import { cloudRegionsGeo } from "@ctrlplane/validators/resources"; -import { omitNullUndefined } from "../../../utils.js"; +import { omitNullUndefined } from "../../../utils/omit-null-undefined.js"; export const clusterToResource = ( workspaceId: string, diff --git a/apps/event-worker/src/workers/resource-scan/google/vm.ts b/apps/event-worker/src/workers/resource-scan/google/vm.ts index 3df339a8c..5fd33d086 100644 --- a/apps/event-worker/src/workers/resource-scan/google/vm.ts +++ b/apps/event-worker/src/workers/resource-scan/google/vm.ts @@ -7,7 +7,7 @@ import _ from "lodash"; import { logger } from "@ctrlplane/logger"; import { ReservedMetadataKey } from "@ctrlplane/validators/conditions"; -import { omitNullUndefined } from "../../../utils.js"; +import { omitNullUndefined } from "../../../utils/omit-null-undefined.js"; import { getGoogleClient } from "./client.js"; const log = logger.child({ module: "resource-scan/gke/vm" }); diff --git a/apps/event-worker/src/workers/resource-scan/google/vpc.ts b/apps/event-worker/src/workers/resource-scan/google/vpc.ts index 8790e3040..056433ad6 100644 --- a/apps/event-worker/src/workers/resource-scan/google/vpc.ts +++ b/apps/event-worker/src/workers/resource-scan/google/vpc.ts @@ -15,7 +15,7 @@ import { isPresent } from "ts-is-present"; import { logger } from "@ctrlplane/logger"; import { ReservedMetadataKey } from "@ctrlplane/validators/conditions"; -import { omitNullUndefined } from "../../../utils.js"; +import { omitNullUndefined } from "../../../utils/omit-null-undefined.js"; import { getGoogleClient } from "./client.js"; const log = logger.child({ label: "resource-scan/google/vpc" }); diff --git a/apps/event-worker/src/workers/resource-scan/index.ts b/apps/event-worker/src/workers/resource-scan/index.ts index fe53a59b4..15b732bf6 100644 --- a/apps/event-worker/src/workers/resource-scan/index.ts +++ b/apps/event-worker/src/workers/resource-scan/index.ts @@ -10,7 +10,7 @@ import { workspace, } from "@ctrlplane/db/schema"; import { Channel, createWorker, getQueue } from "@ctrlplane/events"; -import { upsertResources } from "@ctrlplane/job-dispatch"; +import { handleResourceProviderScan } from "@ctrlplane/job-dispatch"; import { logger } from "@ctrlplane/logger"; import { getEksResources } from "./aws/eks.js"; @@ -96,7 +96,7 @@ export const resourceScanWorker = createWorker( log.info( `Upserting ${resources.length} resources for provider ${rp.resource_provider.id}`, ); - await upsertResources(db, resources); + await handleResourceProviderScan(db, resources); } catch (error: any) { log.error( `Error scanning/upserting resources for provider ${rp.resource_provider.id}: ${error.message}`, diff --git a/apps/event-worker/src/workers/updated-resources/dispatch-exit-hooks.ts b/apps/event-worker/src/workers/updated-resources/dispatch-exit-hooks.ts new file mode 100644 index 000000000..fce914644 --- /dev/null +++ b/apps/event-worker/src/workers/updated-resources/dispatch-exit-hooks.ts @@ -0,0 +1,90 @@ +import type { Tx } from "@ctrlplane/db"; +import type { ReleaseTargetIdentifier } from "@ctrlplane/rule-engine"; + +import { and, eq, inArray, isNotNull, notInArray, or } from "@ctrlplane/db"; +import * as SCHEMA from "@ctrlplane/db/schema"; +import { handleEvent } from "@ctrlplane/job-dispatch"; +import { HookAction } from "@ctrlplane/validators/events"; + +const getSystemsForUnmatchedEnvs = async ( + db: Tx, + previousReleaseTargets: ReleaseTargetIdentifier[], + newReleaseTargets: ReleaseTargetIdentifier[], +) => { + const previousEnvIds = new Set( + previousReleaseTargets.map((rt) => rt.environmentId), + ); + const newEnvIds = new Set( + newReleaseTargets.map((rt) => rt.environmentId), + ); + const unmatchedEnvs = Array.from(previousEnvIds).filter( + (envId) => !newEnvIds.has(envId), + ); + + const envs = await db.query.environment.findMany({ + where: inArray(SCHEMA.environment.id, unmatchedEnvs), + }); + + return db.query.system.findMany({ + where: inArray( + SCHEMA.system.id, + envs.map((e) => e.systemId), + ), + with: { + deployments: true, + environments: { + where: and( + isNotNull(SCHEMA.environment.resourceSelector), + notInArray(SCHEMA.environment.id, unmatchedEnvs), + ), + }, + }, + }); +}; + +const dispatchExitHooksIfExitedSystem = async ( + db: Tx, + resource: SCHEMA.Resource, + system: { + deployments: SCHEMA.Deployment[]; + environments: SCHEMA.Environment[]; + }, +) => { + const { deployments, environments } = system; + const matchedResource = await db.query.resource.findFirst({ + where: and( + eq(SCHEMA.resource.id, resource.id), + or( + ...environments.map((e) => + SCHEMA.resourceMatchesMetadata(db, e.resourceSelector), + ), + ), + ), + }); + if (matchedResource == null) return; + + const events = deployments.map((deployment) => ({ + action: HookAction.DeploymentResourceRemoved, + payload: { deployment, resource }, + })); + + const handleEventPromises = events.map(handleEvent); + await Promise.allSettled(handleEventPromises); +}; + +export const dispatchExitHooks = async ( + db: Tx, + resource: SCHEMA.Resource, + currentReleaseTargets: ReleaseTargetIdentifier[], + newReleaseTargets: ReleaseTargetIdentifier[], +) => { + const systems = await getSystemsForUnmatchedEnvs( + db, + currentReleaseTargets, + newReleaseTargets, + ); + const dispatchExitHooksPromises = systems.map((system) => + dispatchExitHooksIfExitedSystem(db, resource, system), + ); + await Promise.allSettled(dispatchExitHooksPromises); +}; diff --git a/apps/event-worker/src/workers/updated-resources/index.ts b/apps/event-worker/src/workers/updated-resources/index.ts new file mode 100644 index 000000000..9aa698db0 --- /dev/null +++ b/apps/event-worker/src/workers/updated-resources/index.ts @@ -0,0 +1,50 @@ +import { eq, inArray } from "@ctrlplane/db"; +import { db } from "@ctrlplane/db/client"; +import * as SCHEMA from "@ctrlplane/db/schema"; +import { Channel, createWorker, getQueue } from "@ctrlplane/events"; + +import { upsertReleaseTargets } from "../../utils/upsert-release-targets.js"; +import { dispatchExitHooks } from "./dispatch-exit-hooks.js"; + +export const updatedResourceWorker = createWorker( + Channel.UpdatedResource, + async ({ data: resource }) => + db.transaction(async (tx) => { + const currentReleaseTargets = await tx.query.releaseTarget.findMany({ + where: eq(SCHEMA.releaseTarget.resourceId, resource.id), + }); + + const newReleaseTargets = await upsertReleaseTargets(tx, resource); + const releaseTargetsToDelete = currentReleaseTargets.filter( + (rt) => !newReleaseTargets.some((nrt) => nrt.id === rt.id), + ); + + await tx.delete(SCHEMA.releaseTarget).where( + inArray( + SCHEMA.releaseTarget.id, + releaseTargetsToDelete.map((rt) => rt.id), + ), + ); + + const dispatchExitHooksPromise = dispatchExitHooks( + tx, + resource, + currentReleaseTargets, + newReleaseTargets, + ); + + const addToEvaluateQueuePromise = getQueue( + Channel.EvaluateReleaseTarget, + ).addBulk( + newReleaseTargets.map((rt) => ({ + name: `${rt.resourceId}-${rt.environmentId}-${rt.deploymentId}`, + data: rt, + })), + ); + + await Promise.allSettled([ + dispatchExitHooksPromise, + addToEvaluateQueuePromise, + ]); + }), +); diff --git a/apps/pty-proxy/package.json b/apps/pty-proxy/package.json index 547b6d918..2adaf8206 100644 --- a/apps/pty-proxy/package.json +++ b/apps/pty-proxy/package.json @@ -13,6 +13,7 @@ "dependencies": { "@ctrlplane/auth": "workspace:*", "@ctrlplane/db": "workspace:*", + "@ctrlplane/events": "workspace:*", "@ctrlplane/job-dispatch": "workspace:*", "@ctrlplane/logger": "workspace:*", "@ctrlplane/validators": "workspace:*", diff --git a/apps/pty-proxy/src/controller/agent-socket.ts b/apps/pty-proxy/src/controller/agent-socket.ts index 69701cdb5..b7066ee65 100644 --- a/apps/pty-proxy/src/controller/agent-socket.ts +++ b/apps/pty-proxy/src/controller/agent-socket.ts @@ -9,10 +9,10 @@ import type WebSocket from "ws"; import type { MessageEvent } from "ws"; import { can, getUser } from "@ctrlplane/auth/utils"; -import { eq } from "@ctrlplane/db"; +import { eq, upsertResources } from "@ctrlplane/db"; import { db } from "@ctrlplane/db/client"; import * as schema from "@ctrlplane/db/schema"; -import { upsertResources } from "@ctrlplane/job-dispatch"; +import { Channel, getQueue } from "@ctrlplane/events"; import { logger } from "@ctrlplane/logger"; import { Permission } from "@ctrlplane/validators/auth"; import { agentConnect, agentHeartbeat } from "@ctrlplane/validators/session"; @@ -125,7 +125,7 @@ export class AgentSocket { "name" | "version" | "kind" | "identifier" | "workspaceId" >, ) { - const { all } = await upsertResources(db, [ + const all = await upsertResources(db, [ { ...resource, name: this.name, @@ -138,6 +138,7 @@ export class AgentSocket { ]); const res = all.at(0); if (res == null) throw new Error("Failed to create resource"); + await getQueue(Channel.UpdatedResource).add(res.id, res); this.resource = res; agents.set(res.id, { lastSync: new Date(), agent: this }); } diff --git a/apps/webservice/src/app/api/v1/resource-providers/[providerId]/set/route.ts b/apps/webservice/src/app/api/v1/resource-providers/[providerId]/set/route.ts index ef2d68bfb..26167be01 100644 --- a/apps/webservice/src/app/api/v1/resource-providers/[providerId]/set/route.ts +++ b/apps/webservice/src/app/api/v1/resource-providers/[providerId]/set/route.ts @@ -9,7 +9,7 @@ import { resourceProvider, workspace, } from "@ctrlplane/db/schema"; -import { upsertResources } from "@ctrlplane/job-dispatch"; +import { handleResourceProviderScan } from "@ctrlplane/job-dispatch"; import { Permission } from "@ctrlplane/validators/auth"; import { authn, authz } from "~/app/api/v1/auth"; @@ -77,7 +77,7 @@ export const PATCH = request() workspaceId: provider.workspaceId, })); - const resources = await upsertResources( + const resources = await handleResourceProviderScan( db, resourcesToInsert.map((r) => ({ ...r, diff --git a/apps/webservice/src/app/api/v1/resources/[resourceId]/route.ts b/apps/webservice/src/app/api/v1/resources/[resourceId]/route.ts index d485cb343..599553ab6 100644 --- a/apps/webservice/src/app/api/v1/resources/[resourceId]/route.ts +++ b/apps/webservice/src/app/api/v1/resources/[resourceId]/route.ts @@ -2,9 +2,10 @@ import { NextResponse } from "next/server"; import _ from "lodash"; import { z } from "zod"; -import { and, eq, isNull } from "@ctrlplane/db"; +import { and, eq, isNull, upsertResources } from "@ctrlplane/db"; import * as schema from "@ctrlplane/db/schema"; -import { deleteResources, upsertResources } from "@ctrlplane/job-dispatch"; +import { Channel, getQueue } from "@ctrlplane/events"; +import { deleteResources } from "@ctrlplane/job-dispatch"; import { variablesAES256 } from "@ctrlplane/secrets"; import { Permission } from "@ctrlplane/validators/auth"; @@ -93,9 +94,10 @@ export const PATCH = request() { status: 404 }, ); - const { all } = await upsertResources(db, [_.merge(resource, body)]); + const all = await upsertResources(db, [_.merge(resource, body)]); const res = all.at(0); if (res == null) throw new Error("Failed to update resource"); + await getQueue(Channel.UpdatedResource).add(res.id, res); return NextResponse.json(res); }); diff --git a/apps/webservice/src/app/api/v1/resources/route.ts b/apps/webservice/src/app/api/v1/resources/route.ts index 7faab0bc4..520934ad3 100644 --- a/apps/webservice/src/app/api/v1/resources/route.ts +++ b/apps/webservice/src/app/api/v1/resources/route.ts @@ -2,9 +2,11 @@ import type * as schema from "@ctrlplane/db/schema"; import { NextResponse } from "next/server"; import { z } from "zod"; +import { upsertResources } from "@ctrlplane/db"; import { db } from "@ctrlplane/db/client"; import { createResource } from "@ctrlplane/db/schema"; -import { upsertResources } from "@ctrlplane/job-dispatch"; +import { Channel, getQueue } from "@ctrlplane/events"; +import { groupResourcesByHook } from "@ctrlplane/job-dispatch"; import { Permission } from "@ctrlplane/validators/auth"; import { authn, authz } from "../auth"; @@ -55,14 +57,30 @@ export const POST = request() { status: 400 }, ); - const resources = await upsertResources( + // since this endpoint is not scoped to a provider, we will ignore deleted resources + // as someone may be calling this endpoint to do a pure upsert + const { workspaceId } = ctx.body; + const { toInsert, toUpdate } = await groupResourcesByHook( db, - ctx.body.resources.map((t) => ({ - ...t, - workspaceId: ctx.body.workspaceId, - })), + ctx.body.resources.map((r) => ({ ...r, workspaceId })), ); - return NextResponse.json({ count: resources.all.length }); + const [insertedResources, updatedResources] = await Promise.all([ + upsertResources(db, toInsert), + upsertResources(db, toUpdate), + ]); + const insertJobs = insertedResources.map((r) => ({ + name: r.id, + data: r, + })); + const updateJobs = updatedResources.map((r) => ({ name: r.id, data: r })); + + await Promise.all([ + getQueue(Channel.NewResource).addBulk(insertJobs), + getQueue(Channel.UpdatedResource).addBulk(updateJobs), + ]); + + const count = insertedResources.length + updatedResources.length; + return NextResponse.json({ count }); }, ); diff --git a/packages/db/Dockerfile b/packages/db/Dockerfile index 730717be2..2247e5bf9 100644 --- a/packages/db/Dockerfile +++ b/packages/db/Dockerfile @@ -24,6 +24,7 @@ COPY tooling/eslint/package.json ./tooling/eslint/package.json COPY tooling/typescript/package.json ./tooling/typescript/package.json COPY packages/validators/package.json ./packages/validators/package.json +COPY packages/secrets/package.json ./packages/secrets/package.json COPY packages/db/package.json ./packages/db/package.json RUN pnpm install --frozen-lockfile diff --git a/packages/db/package.json b/packages/db/package.json index 888cd6df6..09fb137eb 100644 --- a/packages/db/package.json +++ b/packages/db/package.json @@ -39,10 +39,12 @@ "rest": "pnpm with-env psql $POSTGRES_URL -c \"DROP SCHEMA IF EXISTS public CASCADE; CREATE SCHEMA public;\"" }, "dependencies": { + "@ctrlplane/secrets": "workspace:*", "@ctrlplane/validators": "workspace:*", "@t3-oss/env-core": "catalog:", "drizzle-orm": "^0.33.0", "drizzle-zod": "^0.5.1", + "lodash": "catalog:", "pg": "^8.11.5", "rrule": "^2.8.1", "zod": "catalog:" @@ -51,6 +53,7 @@ "@ctrlplane/eslint-config": "workspace:*", "@ctrlplane/prettier-config": "workspace:*", "@ctrlplane/tsconfig": "workspace:*", + "@types/lodash": "catalog:", "@types/node": "catalog:node22", "@types/pg": "^8.11.6", "dotenv-cli": "catalog:", diff --git a/packages/db/src/index.ts b/packages/db/src/index.ts index 4bd860b0d..9dc1d67e7 100644 --- a/packages/db/src/index.ts +++ b/packages/db/src/index.ts @@ -7,3 +7,4 @@ export { systemSchema, } from "./schema/index.js"; export * from "./upsert-env.js"; +export * from "./upsert-resources.js"; diff --git a/packages/db/src/schema/resource.ts b/packages/db/src/schema/resource.ts index 2acc97f51..780e7ac68 100644 --- a/packages/db/src/schema/resource.ts +++ b/packages/db/src/schema/resource.ts @@ -113,6 +113,10 @@ export const createResource = createInsertSchema(resource, { }).omit({ id: true }); export type InsertResource = InferInsertModel; +export type ResourceToUpsert = InsertResource & { + metadata?: Record; + variables?: Array<{ key: string; value: any; sensitive: boolean }>; +}; export const updateResource = createResource.partial(); diff --git a/packages/db/src/upsert-resources.ts b/packages/db/src/upsert-resources.ts new file mode 100644 index 000000000..ec68716ff --- /dev/null +++ b/packages/db/src/upsert-resources.ts @@ -0,0 +1,155 @@ +import { and, eq, inArray, notInArray, or } from "drizzle-orm"; +import _ from "lodash"; + +import { variablesAES256 } from "@ctrlplane/secrets"; + +import type { Tx } from "./common.js"; +import { buildConflictUpdateColumns } from "./common.js"; +import * as SCHEMA from "./schema/index.js"; + +type ResourceWithMetadata = SCHEMA.Resource & { + metadata?: Record; +}; + +export const updateResourceMetadata = async ( + tx: Tx, + resources: ResourceWithMetadata[], +) => { + const resourceMetadataValues = resources.flatMap((resource) => { + const { id, metadata = {} } = resource; + return Object.entries(metadata).map(([key, value]) => ({ + resourceId: id, + key, + value, + })); + }); + if (resourceMetadataValues.length === 0) return; + + const deletedKeysChecks = _.chain(resourceMetadataValues) + .groupBy((r) => r.resourceId) + .map((groupedMeta) => { + const resourceId = groupedMeta[0]!.resourceId; + const keys = groupedMeta.map((m) => m.key); + return and( + eq(SCHEMA.resourceMetadata.resourceId, resourceId), + notInArray(SCHEMA.resourceMetadata.key, keys), + )!; + }) + .value(); + + await tx.delete(SCHEMA.resourceMetadata).where(or(...deletedKeysChecks)); + + return tx + .insert(SCHEMA.resourceMetadata) + .values(resourceMetadataValues) + .onConflictDoUpdate({ + target: [SCHEMA.resourceMetadata.key, SCHEMA.resourceMetadata.resourceId], + set: buildConflictUpdateColumns(SCHEMA.resourceMetadata, ["value"]), + }); +}; + +type ResourceWithVariables = SCHEMA.Resource & { + variables?: Array<{ key: string; value: any; sensitive: boolean }>; +}; + +export const updateResourceVariables = async ( + tx: Tx, + resources: ResourceWithVariables[], +): Promise> => { + const resourceIds = resources.map(({ id }) => id); + const existingVariables = await tx + .select() + .from(SCHEMA.resourceVariable) + .where(inArray(SCHEMA.resourceVariable.resourceId, resourceIds)); + + const resourceVariablesValues = resources.flatMap(({ id, variables = [] }) => + variables.map(({ key, value, sensitive }) => ({ + resourceId: id, + key, + value: sensitive + ? variablesAES256().encrypt(JSON.stringify(value)) + : value, + sensitive, + })), + ); + + if (resourceVariablesValues.length === 0) return new Set(); + + const updatedVariables = await tx + .insert(SCHEMA.resourceVariable) + .values(resourceVariablesValues) + .onConflictDoUpdate({ + target: [SCHEMA.resourceVariable.key, SCHEMA.resourceVariable.resourceId], + set: buildConflictUpdateColumns(SCHEMA.resourceVariable, [ + "value", + "sensitive", + ]), + }) + .returning(); + + const created = _.differenceWith( + updatedVariables, + existingVariables, + (a, b) => a.resourceId === b.resourceId && a.key === b.key, + ); + + const deleted = _.differenceWith( + existingVariables, + updatedVariables, + (a, b) => a.resourceId === b.resourceId && a.key === b.key, + ); + + const updated = _.intersectionWith( + updatedVariables, + existingVariables, + (a, b) => + a.resourceId === b.resourceId && + a.key === b.key && + (a.value !== b.value || a.sensitive !== b.sensitive), + ); + + const updatedResourceIds = [ + ...created.map((r) => r.resourceId), + ...deleted.map((r) => r.resourceId), + ...updated.map((r) => r.resourceId), + ]; + + return new Set(updatedResourceIds); +}; + +export const upsertResources = async ( + tx: Tx, + resourcesToUpsert: SCHEMA.ResourceToUpsert[], +) => { + if (resourcesToUpsert.length === 0) return []; + const resources = await tx + .insert(SCHEMA.resource) + .values(resourcesToUpsert) + .onConflictDoUpdate({ + target: [SCHEMA.resource.identifier, SCHEMA.resource.workspaceId], + set: { + ...buildConflictUpdateColumns(SCHEMA.resource, [ + "name", + "version", + "kind", + "config", + "providerId", + ]), + updatedAt: new Date(), + deletedAt: null, + }, + }) + .returning(); + + const resourcesWithId = resources.map((r) => ({ + ...r, + ...resourcesToUpsert.find((ri) => ri.identifier === r.identifier), + })); + + await Promise.all([ + updateResourceMetadata(tx, resourcesWithId), + updateResourceVariables(tx, resourcesWithId), + ]); + + return resourcesWithId; +}; diff --git a/packages/events/src/types.ts b/packages/events/src/types.ts index f04361cf4..8b34f5156 100644 --- a/packages/events/src/types.ts +++ b/packages/events/src/types.ts @@ -21,6 +21,9 @@ export enum Channel { UpdateResourceVariable = "update-resource-variable", EvaluateReleaseTarget = "evaluate-release-target", + + UpdatedResource = "updated-resource", + NewResource = "new-resource", } export type EvaluateReleaseTargetJob = { @@ -42,4 +45,6 @@ export type ChannelMap = { [Channel.EvaluateReleaseTarget]: EvaluateReleaseTargetJob; [Channel.DispatchJob]: { jobId: string }; [Channel.ResourceScan]: { resourceProviderId: string }; + [Channel.UpdatedResource]: schema.Resource; + [Channel.NewResource]: schema.Resource; }; diff --git a/packages/job-dispatch/package.json b/packages/job-dispatch/package.json index c677abcdf..a1ac6b553 100644 --- a/packages/job-dispatch/package.json +++ b/packages/job-dispatch/package.json @@ -28,7 +28,6 @@ "@ctrlplane/db": "workspace:*", "@ctrlplane/events": "workspace:*", "@ctrlplane/logger": "workspace:*", - "@ctrlplane/secrets": "workspace:*", "@ctrlplane/validators": "workspace:*", "@t3-oss/env-core": "catalog:", "date-fns": "catalog:", diff --git a/packages/job-dispatch/src/resource/dispatch-resource.ts b/packages/job-dispatch/src/resource/dispatch-resource.ts index 55bacf473..c67da74d4 100644 --- a/packages/job-dispatch/src/resource/dispatch-resource.ts +++ b/packages/job-dispatch/src/resource/dispatch-resource.ts @@ -1,22 +1,10 @@ import type { Tx } from "@ctrlplane/db"; -import type { ResourceCondition } from "@ctrlplane/validators/resources"; import { isPresent } from "ts-is-present"; -import { - and, - desc, - eq, - inArray, - isNotNull, - takeFirstOrNull, -} from "@ctrlplane/db"; -import { db } from "@ctrlplane/db/client"; +import { and, desc, eq, takeFirstOrNull } from "@ctrlplane/db"; import * as SCHEMA from "@ctrlplane/db/schema"; import { logger } from "@ctrlplane/logger"; -import { ComparisonOperator } from "@ctrlplane/validators/conditions"; -import { ResourceConditionType } from "@ctrlplane/validators/resources"; -import { handleEvent } from "../events/index.js"; import { dispatchReleaseJobTriggers } from "../job-dispatch.js"; import { isPassingAllPolicies } from "../policy-checker.js"; import { createJobApprovals } from "../policy-create.js"; @@ -133,110 +121,3 @@ export async function dispatchJobsForAddedResources( triggerCount: releaseJobTriggers.length, }); } - -/** - * Gets all deployments associated with an environment - * @param db - Database transaction - * @param envId - Environment ID to get deployments for - * @returns Promise resolving to array of deployments - */ -const getEnvironmentDeployments = (db: Tx, envId: string) => - db - .select() - .from(SCHEMA.deployment) - .innerJoin(SCHEMA.system, eq(SCHEMA.deployment.systemId, SCHEMA.system.id)) - .innerJoin( - SCHEMA.environment, - eq(SCHEMA.system.id, SCHEMA.environment.systemId), - ) - .where(eq(SCHEMA.environment.id, envId)) - .then((rows) => rows.map((r) => r.deployment)); - -/** - * Gets the not in system filter for a system - * @param systemId - System ID to get the not in system filter for - * @returns Promise resolving to the not in system filter or null if not found - */ -const getNotInSystemFilter = async ( - systemId: string, -): Promise => { - const hasFilter = isNotNull(SCHEMA.environment.resourceSelector); - const system = await db.query.system.findFirst({ - where: eq(SCHEMA.system.id, systemId), - with: { environments: { where: hasFilter } }, - }); - if (system == null) return null; - - const filters = system.environments - .map((e) => e.resourceSelector) - .filter(isPresent); - if (filters.length === 0) return null; - - return { - type: ResourceConditionType.Comparison, - operator: ComparisonOperator.Or, - not: true, - conditions: filters, - }; -}; - -/** - * Dispatches hook events for resources that were removed from an environment - * @param db - Database transaction - * @param resourceIds - IDs of the resources that were removed - * @param env - Environment the resources were removed from - */ -export const dispatchEventsForRemovedResources = async ( - db: Tx, - resourceIds: string[], - env: { id: string; systemId: string }, -): Promise => { - const { id: envId, systemId } = env; - log.info("Dispatching events for removed resources", { resourceIds, envId }); - - const deployments = await getEnvironmentDeployments(db, envId); - if (deployments.length === 0) { - log.info("No deployments found for environment"); - return; - } - - const notInSystemFilter = await getNotInSystemFilter(systemId); - if (notInSystemFilter == null) { - log.warn("No system found for environment", { envId }); - return; - } - - const matchesResources = inArray(SCHEMA.resource.id, resourceIds); - const isRemovedFromSystem = SCHEMA.resourceMatchesMetadata( - db, - notInSystemFilter, - ); - const resources = await db.query.resource.findMany({ - where: and(matchesResources, isRemovedFromSystem), - }); - - log.debug("Creating removal events", { - resourceCount: resources.length, - deploymentCount: deployments.length, - }); - const events = resources.flatMap((resource) => - deployments.map((deployment) => ({ - action: "deployment.resource.removed" as const, - payload: { deployment, resource }, - })), - ); - - log.debug("Handling removal events", { eventCount: events.length }); - const handleEventPromises = events.map(handleEvent); - const results = await Promise.allSettled(handleEventPromises); - - const failures = results.filter((r) => r.status === "rejected").length; - if (failures > 0) - log.warn("Some removal events failed", { failureCount: failures }); - - log.info("Finished dispatching removal events", { - total: events.length, - succeeded: events.length - failures, - failed: failures, - }); -}; diff --git a/packages/job-dispatch/src/resource/insert-resources.ts b/packages/job-dispatch/src/resource/group-resources-by-hook.ts similarity index 66% rename from packages/job-dispatch/src/resource/insert-resources.ts rename to packages/job-dispatch/src/resource/group-resources-by-hook.ts index 09d56c77c..0a750fc09 100644 --- a/packages/job-dispatch/src/resource/insert-resources.ts +++ b/packages/job-dispatch/src/resource/group-resources-by-hook.ts @@ -2,7 +2,7 @@ import type { Tx } from "@ctrlplane/db"; import type { InsertResource, Resource } from "@ctrlplane/db/schema"; import _ from "lodash"; -import { and, buildConflictUpdateColumns, eq, isNull, or } from "@ctrlplane/db"; +import { and, eq, isNull, or } from "@ctrlplane/db"; import { resource } from "@ctrlplane/db/schema"; /** @@ -70,23 +70,23 @@ const findExistingResources = async ( }; /** - * Inserts or updates resources in the database. Note that this function only - * handles the core resource fields - it does not insert/update associated - * metadata or variables. Those must be handled separately. + * Groups resources into categories based on what type of hook operation needs to be performed. + * Compares input resources against existing database records to determine which resources + * need to be created, updated, or deleted. * * @param tx - Database transaction - * @param resourcesToInsert - Array of resources to insert/update. Can include - * metadata and variables but these will not be - * persisted by this function. - * @returns Promise resolving to array of inserted/updated resources, with any - * metadata/variables from the input merged onto the DB records + * @param resourcesToInsert - Array of resources to process and categorize + * @returns {Object} Object containing three arrays of resources: + * - new: Resources that don't exist in the database and need to be created + * - upsert: Resources that exist and need to be updated + * - delete: Existing resources that are no longer present in the input and should be deleted */ -export const insertResources = async ( +export const groupResourcesByHook = async ( tx: Tx, resourcesToInsert: InsertResource[], ) => { const existingResources = await findExistingResources(tx, resourcesToInsert); - const deleted = existingResources.filter( + const toDelete = existingResources.filter( (existing) => !resourcesToInsert.some( (inserted) => @@ -94,24 +94,19 @@ export const insertResources = async ( inserted.workspaceId === existing.workspaceId, ), ); - const insertedResources = await tx - .insert(resource) - .values(resourcesToInsert) - .onConflictDoUpdate({ - target: [resource.identifier, resource.workspaceId], - set: { - ...buildConflictUpdateColumns(resource, [ - "name", - "version", - "kind", - "config", - "providerId", - ]), - updatedAt: new Date(), - deletedAt: null, - }, - }) - .returning(); + const toInsert = resourcesToInsert.filter( + (r) => + !existingResources.some( + (er) => + er.identifier === r.identifier && er.workspaceId === r.workspaceId, + ), + ); + const toUpdate = resourcesToInsert.filter((r) => + existingResources.some( + (er) => + er.identifier === r.identifier && er.workspaceId === r.workspaceId, + ), + ); - return { all: insertedResources, deleted }; + return { toInsert, toUpdate, toDelete }; }; diff --git a/packages/job-dispatch/src/resource/handle-provider-scan.ts b/packages/job-dispatch/src/resource/handle-provider-scan.ts new file mode 100644 index 000000000..b763ab964 --- /dev/null +++ b/packages/job-dispatch/src/resource/handle-provider-scan.ts @@ -0,0 +1,54 @@ +import type { Tx } from "@ctrlplane/db"; +import type { InsertResource } from "@ctrlplane/db/schema"; + +import { upsertResources } from "@ctrlplane/db"; +import { Channel, getQueue } from "@ctrlplane/events"; +import { logger } from "@ctrlplane/logger"; + +import { deleteResources } from "./delete.js"; +import { groupResourcesByHook } from "./group-resources-by-hook.js"; + +const log = logger.child({ label: "upsert-resources" }); + +export type ResourceToInsert = InsertResource & { + metadata?: Record; + variables?: Array<{ key: string; value: any; sensitive: boolean }>; +}; +export const handleResourceProviderScan = async ( + tx: Tx, + resourcesToInsert: ResourceToInsert[], +) => { + log.info("Starting resource upsert", { + count: resourcesToInsert.length, + identifiers: resourcesToInsert.map((r) => r.identifier), + }); + try { + const workspaceId = resourcesToInsert[0]?.workspaceId; + if (workspaceId == null) throw new Error("Workspace ID is required"); + if (!resourcesToInsert.every((r) => r.workspaceId === workspaceId)) + throw new Error("All resources must belong to the same workspace"); + + const { toInsert, toUpdate, toDelete } = await groupResourcesByHook( + tx, + resourcesToInsert, + ); + const [insertedResources, updatedResources] = await Promise.all([ + upsertResources(tx, toInsert), + upsertResources(tx, toUpdate), + ]); + + const insertJobs = insertedResources.map((r) => ({ name: r.id, data: r })); + const updateJobs = updatedResources.map((r) => ({ name: r.id, data: r })); + + await Promise.all([ + getQueue(Channel.NewResource).addBulk(insertJobs), + getQueue(Channel.UpdatedResource).addBulk(updateJobs), + ]); + + const deleted = await deleteResources(tx, toDelete); + return { all: [...insertedResources, ...updatedResources], deleted }; + } catch (error) { + log.error("Error upserting resources", { error }); + throw error; + } +}; diff --git a/packages/job-dispatch/src/resource/index.ts b/packages/job-dispatch/src/resource/index.ts index 40a2163e7..f8079e83f 100644 --- a/packages/job-dispatch/src/resource/index.ts +++ b/packages/job-dispatch/src/resource/index.ts @@ -1,3 +1,4 @@ -export * from "./upsert.js"; +export * from "./handle-provider-scan.js"; export * from "./delete.js"; export * from "./dispatch-resource.js"; +export * from "./group-resources-by-hook.js"; diff --git a/packages/job-dispatch/src/resource/insert-resource-metadata.ts b/packages/job-dispatch/src/resource/insert-resource-metadata.ts deleted file mode 100644 index 29a1d4735..000000000 --- a/packages/job-dispatch/src/resource/insert-resource-metadata.ts +++ /dev/null @@ -1,53 +0,0 @@ -import type { Tx } from "@ctrlplane/db"; -import type { Resource } from "@ctrlplane/db/schema"; -import _ from "lodash"; - -import { - and, - buildConflictUpdateColumns, - eq, - notInArray, - or, -} from "@ctrlplane/db"; -import * as schema from "@ctrlplane/db/schema"; - -export type ResourceWithMetadata = Resource & { - metadata?: Record; -}; - -export const insertResourceMetadata = async ( - tx: Tx, - resources: ResourceWithMetadata[], -) => { - const resourceMetadataValues = resources.flatMap((resource) => { - const { id, metadata = {} } = resource; - return Object.entries(metadata).map(([key, value]) => ({ - resourceId: id, - key, - value, - })); - }); - if (resourceMetadataValues.length === 0) return; - - const deletedKeysChecks = _.chain(resourceMetadataValues) - .groupBy((r) => r.resourceId) - .map((groupedMeta) => { - const resourceId = groupedMeta[0]!.resourceId; - const keys = groupedMeta.map((m) => m.key); - return and( - eq(schema.resourceMetadata.resourceId, resourceId), - notInArray(schema.resourceMetadata.key, keys), - )!; - }) - .value(); - - await tx.delete(schema.resourceMetadata).where(or(...deletedKeysChecks)); - - return tx - .insert(schema.resourceMetadata) - .values(resourceMetadataValues) - .onConflictDoUpdate({ - target: [schema.resourceMetadata.key, schema.resourceMetadata.resourceId], - set: buildConflictUpdateColumns(schema.resourceMetadata, ["value"]), - }); -}; diff --git a/packages/job-dispatch/src/resource/insert-resource-variables.ts b/packages/job-dispatch/src/resource/insert-resource-variables.ts deleted file mode 100644 index 8b15b50f9..000000000 --- a/packages/job-dispatch/src/resource/insert-resource-variables.ts +++ /dev/null @@ -1,76 +0,0 @@ -import type { Tx } from "@ctrlplane/db"; -import type { Resource } from "@ctrlplane/db/schema"; -import _ from "lodash"; - -import { buildConflictUpdateColumns, inArray } from "@ctrlplane/db"; -import * as schema from "@ctrlplane/db/schema"; -import { variablesAES256 } from "@ctrlplane/secrets"; - -export type ResourceWithVariables = Resource & { - variables?: Array<{ key: string; value: any; sensitive: boolean }>; -}; - -export const insertResourceVariables = async ( - tx: Tx, - resources: ResourceWithVariables[], -): Promise> => { - const resourceIds = resources.map(({ id }) => id); - const existingVariables = await tx - .select() - .from(schema.resourceVariable) - .where(inArray(schema.resourceVariable.resourceId, resourceIds)); - - const resourceVariablesValues = resources.flatMap(({ id, variables = [] }) => - variables.map(({ key, value, sensitive }) => ({ - resourceId: id, - key, - value: sensitive - ? variablesAES256().encrypt(JSON.stringify(value)) - : value, - sensitive, - })), - ); - - if (resourceVariablesValues.length === 0) return new Set(); - - const updatedVariables = await tx - .insert(schema.resourceVariable) - .values(resourceVariablesValues) - .onConflictDoUpdate({ - target: [schema.resourceVariable.key, schema.resourceVariable.resourceId], - set: buildConflictUpdateColumns(schema.resourceVariable, [ - "value", - "sensitive", - ]), - }) - .returning(); - - const created = _.differenceWith( - updatedVariables, - existingVariables, - (a, b) => a.resourceId === b.resourceId && a.key === b.key, - ); - - const deleted = _.differenceWith( - existingVariables, - updatedVariables, - (a, b) => a.resourceId === b.resourceId && a.key === b.key, - ); - - const updated = _.intersectionWith( - updatedVariables, - existingVariables, - (a, b) => - a.resourceId === b.resourceId && - a.key === b.key && - (a.value !== b.value || a.sensitive !== b.sensitive), - ); - - const updatedResourceIds = [ - ...created.map((r) => r.resourceId), - ...deleted.map((r) => r.resourceId), - ...updated.map((r) => r.resourceId), - ]; - - return new Set(updatedResourceIds); -}; diff --git a/packages/job-dispatch/src/resource/upsert.ts b/packages/job-dispatch/src/resource/upsert.ts deleted file mode 100644 index aa73f3555..000000000 --- a/packages/job-dispatch/src/resource/upsert.ts +++ /dev/null @@ -1,154 +0,0 @@ -import type { Tx } from "@ctrlplane/db"; -import type { InsertResource } from "@ctrlplane/db/schema"; -import _ from "lodash"; - -import { logger } from "@ctrlplane/logger"; - -import { deleteResources } from "./delete.js"; -import { - dispatchEventsForRemovedResources, - dispatchJobsForAddedResources, -} from "./dispatch-resource.js"; -import { insertResourceMetadata } from "./insert-resource-metadata.js"; -import { insertResourceVariables } from "./insert-resource-variables.js"; -import { insertResources } from "./insert-resources.js"; -import { getEnvironmentsByResourceWithIdentifiers } from "./utils.js"; - -const log = logger.child({ label: "upsert-resources" }); - -export type ResourceToInsert = InsertResource & { - metadata?: Record; - variables?: Array<{ key: string; value: any; sensitive: boolean }>; -}; - -export const upsertResources = async ( - tx: Tx, - resourcesToInsert: ResourceToInsert[], -) => { - log.info("Starting resource upsert", { - count: resourcesToInsert.length, - identifiers: resourcesToInsert.map((r) => r.identifier), - }); - - const workspaceId = resourcesToInsert[0]?.workspaceId; - if (workspaceId == null) throw new Error("Workspace ID is required"); - if (!resourcesToInsert.every((r) => r.workspaceId === workspaceId)) - throw new Error("All resources must belong to the same workspace"); - - try { - const resourceIdentifiers = resourcesToInsert.map((r) => r.identifier); - log.debug("Getting environments before insert", { resourceIdentifiers }); - const envsBeforeInsert = await getEnvironmentsByResourceWithIdentifiers( - tx, - workspaceId, - resourceIdentifiers, - ); - - log.debug("Envs before insert", { - envs: envsBeforeInsert.map((e) => ({ - id: e.id, - resources: e.resources.map((r) => r.identifier), - })), - }); - - log.info("Inserting resources"); - const resources = await insertResources(tx, resourcesToInsert); - const resourcesWithId = resources.all.map((r) => ({ - ...r, - ...resourcesToInsert.find( - (ri) => - ri.identifier === r.identifier && ri.workspaceId === r.workspaceId, - ), - })); - - log.info("Inserting resource metadata and variables"); - const [, updatedVariableResourceIds] = await Promise.all([ - insertResourceMetadata(tx, resourcesWithId), - insertResourceVariables(tx, resourcesWithId), - ]); - - log.info("Getting environments after insert"); - const envsAfterInsert = await getEnvironmentsByResourceWithIdentifiers( - tx, - workspaceId, - resourceIdentifiers, - ); - - log.debug("Envs after insert", { - envs: envsAfterInsert.map((e) => ({ - id: e.id, - resources: e.resources.map((r) => r.identifier), - })), - }); - const envVariableChangePromises = envsAfterInsert.map((env) => - dispatchJobsForAddedResources( - tx, - env.resources - .filter((r) => updatedVariableResourceIds.has(r.id)) - .map((r) => r.id), - env.id, - ), - ); - await Promise.all(envVariableChangePromises); - const changedEnvs = envsAfterInsert.map((env) => { - const beforeEnv = envsBeforeInsert.find((e) => e.id === env.id); - const beforeResources = beforeEnv?.resources ?? []; - const afterResources = env.resources; - const removedResources = beforeResources.filter( - (br) => !afterResources.some((ar) => ar.id === br.id), - ); - const addedResources = afterResources.filter( - (ar) => !beforeResources.some((br) => br.id === ar.id), - ); - return { ...env, removedResources, addedResources }; - }); - - const deletedResourceIds = new Set(resources.deleted.map((r) => r.id)); - if (resources.deleted.length > 0) { - log.info("Deleting resources", { count: resources.deleted.length }); - await deleteResources(tx, resources.deleted).catch((err) => { - log.error("Error deleting resources", { error: err }); - throw err; - }); - } - - for (const env of changedEnvs) { - if (env.addedResources.length > 0) { - log.info("Dispatching jobs for added resources", { - envId: env.id, - count: env.addedResources.length, - }); - await dispatchJobsForAddedResources( - tx, - env.addedResources - .map((r) => r.id) - .filter((r) => !updatedVariableResourceIds.has(r)), - env.id, - ); - } - - if (env.removedResources.length > 0) { - const removedIds = env.removedResources - .map((r) => r.id) - .filter((id) => !deletedResourceIds.has(id)); - - if (removedIds.length > 0) { - log.info("Dispatching hook events for removed resources", { - envId: env.id, - count: removedIds.length, - }); - await dispatchEventsForRemovedResources(tx, removedIds, env); - } - } - } - - log.info("Resource upsert completed successfully", { - added: resources.all.length, - deleted: resources.deleted.length, - }); - return resources; - } catch (err) { - log.error("Error upserting resources", { error: err }); - throw err; - } -}; diff --git a/packages/job-dispatch/src/resource/utils.ts b/packages/job-dispatch/src/resource/utils.ts deleted file mode 100644 index 7b814ba84..000000000 --- a/packages/job-dispatch/src/resource/utils.ts +++ /dev/null @@ -1,107 +0,0 @@ -import type { Tx } from "@ctrlplane/db"; - -import { and, eq, inArray, isNotNull, isNull, or } from "@ctrlplane/db"; -import * as schema from "@ctrlplane/db/schema"; - -/** - * Gets resources for a specific provider - * @param tx - Database transaction - * @param providerId - ID of the provider to get resources for - * @param options - Options object - * @param options.deleted - If true, returns deleted resources. If false, returns non-deleted resources - * @returns Promise resolving to array of resources - */ -export const getResourcesByProvider = ( - tx: Tx, - providerId: string, - options: { deleted: boolean } = { deleted: false }, -) => - tx - .select() - .from(schema.resource) - .where( - and( - eq(schema.resource.providerId, providerId), - options.deleted - ? isNotNull(schema.resource.deletedAt) - : isNull(schema.resource.deletedAt), - ), - ); - -/** - * Gets resources matching the provided workspace IDs and identifiers - * Can filter for either deleted or non-deleted resources - * - * @param tx - Database transaction - * @param resources - Array of objects containing workspaceId and identifier to look up - * @param options - Options object - * @param options.deleted - If true, returns deleted resources. If false, returns non-deleted resources - * @returns Promise resolving to array of matching resources - */ -export const getResourcesByWorkspaceIdAndIdentifier = ( - tx: Tx, - resources: { workspaceId: string; identifier: string }[], - options: { deleted: boolean } = { deleted: false }, -) => - tx - .select() - .from(schema.resource) - .where( - or( - ...resources.map((r) => - and( - eq(schema.resource.workspaceId, r.workspaceId), - eq(schema.resource.identifier, r.identifier), - options.deleted - ? isNotNull(schema.resource.deletedAt) - : isNull(schema.resource.deletedAt), - ), - ), - ), - ); - -/** - * Groups provided resources by workspace environments matching them - * - * @param tx - Database transaction - * @param workspaceId - ID of the workspace to get environments for - * @param resourceIdentifiers - Array of resource identifiers to look up - * @returns Promise resolving to array of environments - */ -export const getEnvironmentsByResourceWithIdentifiers = ( - tx: Tx, - workspaceId: string, - resourceIdentifiers: string[], -) => - tx - .select({ - id: schema.environment.id, - resourceFilter: schema.environment.resourceSelector, - systemId: schema.environment.systemId, - }) - .from(schema.environment) - .innerJoin(schema.system, eq(schema.environment.systemId, schema.system.id)) - .where( - and( - eq(schema.system.workspaceId, workspaceId), - isNotNull(schema.environment.resourceSelector), - ), - ) - .then((envs) => - Promise.all( - envs.map(async (env) => ({ - ...env, - resources: await tx - .select() - .from(schema.resource) - .where( - and( - inArray(schema.resource.identifier, resourceIdentifiers), - eq(schema.resource.workspaceId, workspaceId), - schema.resourceMatchesMetadata(tx, env.resourceFilter), - isNull(schema.resource.deletedAt), - ), - ), - })), - ), - ); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index f0dae0ae5..b640bc877 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -342,6 +342,9 @@ importers: '@ctrlplane/db': specifier: workspace:* version: link:../../packages/db + '@ctrlplane/events': + specifier: workspace:* + version: link:../../packages/events '@ctrlplane/job-dispatch': specifier: workspace:* version: link:../../packages/job-dispatch @@ -1017,6 +1020,9 @@ importers: packages/db: dependencies: + '@ctrlplane/secrets': + specifier: workspace:* + version: link:../secrets '@ctrlplane/validators': specifier: workspace:* version: link:../validators @@ -1029,6 +1035,9 @@ importers: drizzle-zod: specifier: ^0.5.1 version: 0.5.1(drizzle-orm@0.33.0(@opentelemetry/api@1.9.0)(@types/pg@8.11.10)(@types/react@19.0.8)(pg@8.13.1)(react@19.0.0))(zod@3.24.2) + lodash: + specifier: 'catalog:' + version: 4.17.21 pg: specifier: ^8.11.5 version: 8.13.1 @@ -1048,6 +1057,9 @@ importers: '@ctrlplane/tsconfig': specifier: workspace:* version: link:../../tooling/typescript + '@types/lodash': + specifier: 'catalog:' + version: 4.17.12 '@types/node': specifier: catalog:node22 version: 22.13.10 @@ -1194,9 +1206,6 @@ importers: '@ctrlplane/logger': specifier: workspace:* version: link:../logger - '@ctrlplane/secrets': - specifier: workspace:* - version: link:../secrets '@ctrlplane/validators': specifier: workspace:* version: link:../validators