Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions apps/event-worker/src/utils/upsert-release-targets.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import type { Tx } from "@ctrlplane/db";
import type { ReleaseTargetIdentifier } from "@ctrlplane/rule-engine";
import { isPresent } from "ts-is-present";

import { and, eq } from "@ctrlplane/db";
import * as SCHEMA from "@ctrlplane/db/schema";

const getReleaseTargetInsertsForSystem = async (
db: Tx,
resourceId: string,
system: SCHEMA.System & {
environments: SCHEMA.Environment[];
deployments: SCHEMA.Deployment[];
},
): Promise<ReleaseTargetIdentifier[]> => {
const envs = system.environments.filter((e) => isPresent(e.resourceSelector));
const { deployments } = system;

const maybeTargetsPromises = envs.flatMap((env) =>
deployments.map(async (dep) => {
const resource = await db.query.resource.findFirst({
where: and(
eq(SCHEMA.resource.id, resourceId),
SCHEMA.resourceMatchesMetadata(db, env.resourceSelector),
SCHEMA.resourceMatchesMetadata(db, dep.resourceSelector),
),
});

if (resource == null) return null;
return { environmentId: env.id, deploymentId: dep.id };
}),
);

const targets = await Promise.all(maybeTargetsPromises).then((results) =>
results.filter(isPresent),
);

return targets.map((t) => ({ ...t, resourceId }));
};

export const upsertReleaseTargets = async (
db: Tx,
resource: SCHEMA.Resource,
) => {
const workspace = await db.query.workspace.findFirst({
where: eq(SCHEMA.workspace.id, resource.workspaceId),
with: { systems: { with: { environments: true, deployments: true } } },
});
if (workspace == null) throw new Error("Workspace not found");

const releaseTargetInserts = await Promise.all(
workspace.systems.map((system) =>
getReleaseTargetInsertsForSystem(db, resource.id, system),
),
).then((results) => results.flat());

if (releaseTargetInserts.length === 0) return [];
return db
.insert(SCHEMA.releaseTarget)
.values(releaseTargetInserts)
.onConflictDoNothing()
.returning();
};
4 changes: 4 additions & 0 deletions apps/event-worker/src/workers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import { evaluateReleaseTarget } from "./evaluate-release-target.js";
import { dispatchJobWorker } from "./job-dispatch/index.js";
import { newDeploymentVersionWorker } from "./new-deployment-version.js";
import { newDeploymentWorker } from "./new-deployment.js";
import { newResourceWorker } from "./new-resource.js";
import { resourceScanWorker } from "./resource-scan/index.js";
import { updateDeploymentVariableWorker } from "./update-deployment-variable.js";
import { updateEnvironmentWorker } from "./update-environment.js";
import { updateResourceVariableWorker } from "./update-resource-variable.js";
import { updatedResourceWorker } from "./updated-resources/index.js";

type Workers<T extends keyof ChannelMap> = {
[K in T]: Worker<ChannelMap[K]> | null;
Expand All @@ -26,4 +28,6 @@ export const workers: Workers<keyof ChannelMap> = {
[Channel.EvaluateReleaseTarget]: evaluateReleaseTarget,
[Channel.DispatchJob]: dispatchJobWorker,
[Channel.ResourceScan]: resourceScanWorker,
[Channel.UpdatedResource]: updatedResourceWorker,
[Channel.NewResource]: newResourceWorker,
};
19 changes: 19 additions & 0 deletions apps/event-worker/src/workers/new-resource.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { db } from "@ctrlplane/db/client";
import { Channel, createWorker, getQueue } from "@ctrlplane/events";

import { upsertReleaseTargets } from "../utils/upsert-release-targets.js";

const queue = getQueue(Channel.EvaluateReleaseTarget);
export const newResourceWorker = createWorker(
Channel.NewResource,
({ data: resource }) =>
db.transaction(async (tx) => {
const rts = await upsertReleaseTargets(tx, resource);
await queue.addBulk(
rts.map((rt) => ({
name: `${rt.resourceId}-${rt.environmentId}-${rt.deploymentId}`,
data: rt,
})),
);
}),
);
2 changes: 1 addition & 1 deletion apps/event-worker/src/workers/resource-scan/aws/eks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import { ReservedMetadataKey } from "@ctrlplane/validators/conditions";
import { cloudRegionsGeo } from "@ctrlplane/validators/resources";

import type { AwsCredentials } from "./aws.js";
import { omitNullUndefined } from "../../../utils.js";
import { omitNullUndefined } from "../../../utils/omit-null-undefined.js";
import { assumeRole, assumeWorkspaceRole } from "./aws.js";

const log = logger.child({ label: "resource-scan/eks" });
Expand Down
2 changes: 1 addition & 1 deletion apps/event-worker/src/workers/resource-scan/aws/vpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import { logger } from "@ctrlplane/logger";
import { ReservedMetadataKey } from "@ctrlplane/validators/conditions";

import type { AwsCredentials } from "./aws.js";
import { omitNullUndefined } from "../../../utils.js";
import { omitNullUndefined } from "../../../utils/omit-null-undefined.js";
import { assumeRole, assumeWorkspaceRole } from "./aws.js";

const log = logger.child({ label: "resource-scan/aws/vpc" });
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { logger } from "@ctrlplane/logger";
import { ReservedMetadataKey } from "@ctrlplane/validators/conditions";
import { cloudRegionsGeo } from "@ctrlplane/validators/resources";

import { omitNullUndefined } from "../../../utils.js";
import { omitNullUndefined } from "../../../utils/omit-null-undefined.js";

const log = logger.child({ module: "resource-scan/azure" });

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { SemVer } from "semver";
import { ReservedMetadataKey } from "@ctrlplane/validators/conditions";
import { cloudRegionsGeo } from "@ctrlplane/validators/resources";

import { omitNullUndefined } from "../../../utils.js";
import { omitNullUndefined } from "../../../utils/omit-null-undefined.js";

export const clusterToResource = (
workspaceId: string,
Expand Down
2 changes: 1 addition & 1 deletion apps/event-worker/src/workers/resource-scan/google/vm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import _ from "lodash";
import { logger } from "@ctrlplane/logger";
import { ReservedMetadataKey } from "@ctrlplane/validators/conditions";

import { omitNullUndefined } from "../../../utils.js";
import { omitNullUndefined } from "../../../utils/omit-null-undefined.js";
import { getGoogleClient } from "./client.js";

const log = logger.child({ module: "resource-scan/gke/vm" });
Expand Down
2 changes: 1 addition & 1 deletion apps/event-worker/src/workers/resource-scan/google/vpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import { isPresent } from "ts-is-present";
import { logger } from "@ctrlplane/logger";
import { ReservedMetadataKey } from "@ctrlplane/validators/conditions";

import { omitNullUndefined } from "../../../utils.js";
import { omitNullUndefined } from "../../../utils/omit-null-undefined.js";
import { getGoogleClient } from "./client.js";

const log = logger.child({ label: "resource-scan/google/vpc" });
Expand Down
4 changes: 2 additions & 2 deletions apps/event-worker/src/workers/resource-scan/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
workspace,
} from "@ctrlplane/db/schema";
import { Channel, createWorker, getQueue } from "@ctrlplane/events";
import { upsertResources } from "@ctrlplane/job-dispatch";
import { handleResourceProviderScan } from "@ctrlplane/job-dispatch";
import { logger } from "@ctrlplane/logger";

import { getEksResources } from "./aws/eks.js";
Expand Down Expand Up @@ -96,7 +96,7 @@ export const resourceScanWorker = createWorker(
log.info(
`Upserting ${resources.length} resources for provider ${rp.resource_provider.id}`,
);
await upsertResources(db, resources);
await handleResourceProviderScan(db, resources);
} catch (error: any) {
log.error(
`Error scanning/upserting resources for provider ${rp.resource_provider.id}: ${error.message}`,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import type { Tx } from "@ctrlplane/db";
import type { ReleaseTargetIdentifier } from "@ctrlplane/rule-engine";

import { and, eq, inArray, isNotNull, notInArray, or } from "@ctrlplane/db";
import * as SCHEMA from "@ctrlplane/db/schema";
import { handleEvent } from "@ctrlplane/job-dispatch";
import { HookAction } from "@ctrlplane/validators/events";

const getSystemsForUnmatchedEnvs = async (
db: Tx,
previousReleaseTargets: ReleaseTargetIdentifier[],
newReleaseTargets: ReleaseTargetIdentifier[],
) => {
const previousEnvIds = new Set<string>(
previousReleaseTargets.map((rt) => rt.environmentId),
);
const newEnvIds = new Set<string>(
newReleaseTargets.map((rt) => rt.environmentId),
);
const unmatchedEnvs = Array.from(previousEnvIds).filter(
(envId) => !newEnvIds.has(envId),
);

const envs = await db.query.environment.findMany({
where: inArray(SCHEMA.environment.id, unmatchedEnvs),
});

return db.query.system.findMany({
where: inArray(
SCHEMA.system.id,
envs.map((e) => e.systemId),
),
with: {
deployments: true,
environments: {
where: and(
isNotNull(SCHEMA.environment.resourceSelector),
notInArray(SCHEMA.environment.id, unmatchedEnvs),
),
},
},
});
};

const dispatchExitHooksIfExitedSystem = async (
db: Tx,
resource: SCHEMA.Resource,
system: {
deployments: SCHEMA.Deployment[];
environments: SCHEMA.Environment[];
},
) => {
const { deployments, environments } = system;
const matchedResource = await db.query.resource.findFirst({
where: and(
eq(SCHEMA.resource.id, resource.id),
or(
...environments.map((e) =>
SCHEMA.resourceMatchesMetadata(db, e.resourceSelector),
),
),
),
});
if (matchedResource == null) return;

const events = deployments.map((deployment) => ({
action: HookAction.DeploymentResourceRemoved,
payload: { deployment, resource },
}));

const handleEventPromises = events.map(handleEvent);
await Promise.allSettled(handleEventPromises);
};

export const dispatchExitHooks = async (
db: Tx,
resource: SCHEMA.Resource,
currentReleaseTargets: ReleaseTargetIdentifier[],
newReleaseTargets: ReleaseTargetIdentifier[],
) => {
const systems = await getSystemsForUnmatchedEnvs(
db,
currentReleaseTargets,
newReleaseTargets,
);
const dispatchExitHooksPromises = systems.map((system) =>
dispatchExitHooksIfExitedSystem(db, resource, system),
);
await Promise.allSettled(dispatchExitHooksPromises);
};
50 changes: 50 additions & 0 deletions apps/event-worker/src/workers/updated-resources/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import { eq, inArray } from "@ctrlplane/db";
import { db } from "@ctrlplane/db/client";
import * as SCHEMA from "@ctrlplane/db/schema";
import { Channel, createWorker, getQueue } from "@ctrlplane/events";

import { upsertReleaseTargets } from "../../utils/upsert-release-targets.js";
import { dispatchExitHooks } from "./dispatch-exit-hooks.js";

export const updatedResourceWorker = createWorker(
Channel.UpdatedResource,
async ({ data: resource }) =>
db.transaction(async (tx) => {
const currentReleaseTargets = await tx.query.releaseTarget.findMany({
where: eq(SCHEMA.releaseTarget.resourceId, resource.id),
});

const newReleaseTargets = await upsertReleaseTargets(tx, resource);
const releaseTargetsToDelete = currentReleaseTargets.filter(
(rt) => !newReleaseTargets.some((nrt) => nrt.id === rt.id),
);

await tx.delete(SCHEMA.releaseTarget).where(
inArray(
SCHEMA.releaseTarget.id,
releaseTargetsToDelete.map((rt) => rt.id),
),
);

const dispatchExitHooksPromise = dispatchExitHooks(
tx,
resource,
currentReleaseTargets,
newReleaseTargets,
);

const addToEvaluateQueuePromise = getQueue(
Channel.EvaluateReleaseTarget,
).addBulk(
newReleaseTargets.map((rt) => ({
name: `${rt.resourceId}-${rt.environmentId}-${rt.deploymentId}`,
data: rt,
})),
);

await Promise.allSettled([
dispatchExitHooksPromise,
addToEvaluateQueuePromise,
]);
}),
);
1 change: 1 addition & 0 deletions apps/pty-proxy/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"dependencies": {
"@ctrlplane/auth": "workspace:*",
"@ctrlplane/db": "workspace:*",
"@ctrlplane/events": "workspace:*",
"@ctrlplane/job-dispatch": "workspace:*",
"@ctrlplane/logger": "workspace:*",
"@ctrlplane/validators": "workspace:*",
Expand Down
7 changes: 4 additions & 3 deletions apps/pty-proxy/src/controller/agent-socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import type WebSocket from "ws";
import type { MessageEvent } from "ws";

import { can, getUser } from "@ctrlplane/auth/utils";
import { eq } from "@ctrlplane/db";
import { eq, upsertResources } from "@ctrlplane/db";
import { db } from "@ctrlplane/db/client";
import * as schema from "@ctrlplane/db/schema";
import { upsertResources } from "@ctrlplane/job-dispatch";
import { Channel, getQueue } from "@ctrlplane/events";
import { logger } from "@ctrlplane/logger";
import { Permission } from "@ctrlplane/validators/auth";
import { agentConnect, agentHeartbeat } from "@ctrlplane/validators/session";
Expand Down Expand Up @@ -125,7 +125,7 @@ export class AgentSocket {
"name" | "version" | "kind" | "identifier" | "workspaceId"
>,
) {
const { all } = await upsertResources(db, [
const all = await upsertResources(db, [
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider wrapping upsert in a transaction if partial updates are a concern.
Currently, the call uses db directly. If a failure happens later in the upsert flow, partial updates may remain in the database. You may want to ensure an all-or-nothing update by using a transaction, if atomicity is needed.

{
...resource,
name: this.name,
Expand All @@ -138,6 +138,7 @@ export class AgentSocket {
]);
const res = all.at(0);
if (res == null) throw new Error("Failed to create resource");
await getQueue(Channel.UpdatedResource).add(res.id, res);
this.resource = res;
agents.set(res.id, { lastSync: new Date(), agent: this });
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
resourceProvider,
workspace,
} from "@ctrlplane/db/schema";
import { upsertResources } from "@ctrlplane/job-dispatch";
import { handleResourceProviderScan } from "@ctrlplane/job-dispatch";
import { Permission } from "@ctrlplane/validators/auth";

import { authn, authz } from "~/app/api/v1/auth";
Expand Down Expand Up @@ -77,7 +77,7 @@ export const PATCH = request()
workspaceId: provider.workspaceId,
}));

const resources = await upsertResources(
const resources = await handleResourceProviderScan(
db,
resourcesToInsert.map((r) => ({
...r,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ import { NextResponse } from "next/server";
import _ from "lodash";
import { z } from "zod";

import { and, eq, isNull } from "@ctrlplane/db";
import { and, eq, isNull, upsertResources } from "@ctrlplane/db";
import * as schema from "@ctrlplane/db/schema";
import { deleteResources, upsertResources } from "@ctrlplane/job-dispatch";
import { Channel, getQueue } from "@ctrlplane/events";
import { deleteResources } from "@ctrlplane/job-dispatch";
import { variablesAES256 } from "@ctrlplane/secrets";
import { Permission } from "@ctrlplane/validators/auth";

Expand Down Expand Up @@ -93,9 +94,10 @@ export const PATCH = request()
{ status: 404 },
);

const { all } = await upsertResources(db, [_.merge(resource, body)]);
const all = await upsertResources(db, [_.merge(resource, body)]);
const res = all.at(0);
if (res == null) throw new Error("Failed to update resource");
await getQueue(Channel.UpdatedResource).add(res.id, res);
return NextResponse.json(res);
});

Expand Down
Loading
Loading