Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 38 additions & 57 deletions apps/event-worker/src/utils/upsert-release-targets.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import type { Tx } from "@ctrlplane/db";
import type { ReleaseTargetIdentifier } from "@ctrlplane/rule-engine";
import { trace } from "@opentelemetry/api";
import { isPresent } from "ts-is-present";

import { and, eq } from "@ctrlplane/db";
import * as SCHEMA from "@ctrlplane/db/schema";
Expand All @@ -11,72 +9,55 @@ import { makeWithSpan } from "./spans.js";
const tracer = trace.getTracer("upsert-release-targets");
const withSpan = makeWithSpan(tracer);

const getReleaseTargetInsertsForSystem = withSpan(
"getReleaseTargetInsertsForSystem",
async (
span,
db: Tx,
resourceId: string,
system: SCHEMA.System & {
environments: SCHEMA.Environment[];
deployments: SCHEMA.Deployment[];
},
): Promise<ReleaseTargetIdentifier[]> => {
span.setAttribute("resource.id", resourceId);
span.setAttribute("system.id", system.id);
span.setAttribute("system.name", system.name);

const envs = system.environments.filter((e) =>
isPresent(e.resourceSelector),
);
const { deployments } = system;

const maybeTargetsPromises = envs.flatMap((env) =>
deployments.map(async (dep) => {
const resource = await db.query.resource.findFirst({
where: and(
eq(SCHEMA.resource.id, resourceId),
SCHEMA.resourceMatchesMetadata(db, env.resourceSelector),
SCHEMA.resourceMatchesMetadata(db, dep.resourceSelector),
),
});

if (resource == null) return null;
return { environmentId: env.id, deploymentId: dep.id };
}),
);

const targets = await Promise.all(maybeTargetsPromises).then((results) =>
results.filter(isPresent),
);

return targets.map((t) => ({ ...t, resourceId }));
},
);

export const upsertReleaseTargets = withSpan(
"upsertReleaseTargets",
async (span, db: Tx, resource: SCHEMA.Resource) => {
span.setAttribute("resource.id", resource.id);
span.setAttribute("resource.name", resource.name);
span.setAttribute("workspace.id", resource.workspaceId);

const workspace = await db.query.workspace.findFirst({
where: eq(SCHEMA.workspace.id, resource.workspaceId),
with: { systems: { with: { environments: true, deployments: true } } },
});
if (workspace == null) throw new Error("Workspace not found");
const rows = await db
.select()
.from(SCHEMA.computedEnvironmentResource)
.innerJoin(
SCHEMA.environment,
eq(
SCHEMA.computedEnvironmentResource.environmentId,
SCHEMA.environment.id,
),
)
.innerJoin(
SCHEMA.deployment,
eq(SCHEMA.deployment.systemId, SCHEMA.environment.systemId),
)
.leftJoin(
SCHEMA.computedDeploymentResource,
and(
eq(
SCHEMA.computedDeploymentResource.deploymentId,
SCHEMA.deployment.id,
),
eq(SCHEMA.computedDeploymentResource.resourceId, resource.id),
),
)
.where(eq(SCHEMA.computedEnvironmentResource.resourceId, resource.id));

const releaseTargetInserts = await Promise.all(
workspace.systems.map((system) =>
getReleaseTargetInsertsForSystem(db, resource.id, system),
),
).then((results) => results.flat());
const targets = rows
.filter(
(r) =>
r.deployment.resourceSelector == null ||
r.computed_deployment_resource != null,
)
.map((r) => ({
environmentId: r.environment.id,
deploymentId: r.deployment.id,
resourceId: resource.id,
}));

if (releaseTargetInserts.length === 0) return [];
if (targets.length === 0) return [];
return db
.insert(SCHEMA.releaseTarget)
.values(releaseTargetInserts)
.values(targets)
.onConflictDoNothing()
.returning();
},
Expand Down
2 changes: 2 additions & 0 deletions apps/event-worker/src/workers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { newDeploymentWorker } from "./new-deployment.js";
import { newResourceWorker } from "./new-resource.js";
import { resourceScanWorker } from "./resource-scan/index.js";
import { updateDeploymentVariableWorker } from "./update-deployment-variable.js";
import { updateDeploymentWorker } from "./update-deployment.js";
import { updateEnvironmentWorker } from "./update-environment.js";
import { updateResourceVariableWorker } from "./update-resource-variable.js";
import { updatedResourceWorker } from "./updated-resources/index.js";
Expand All @@ -23,6 +24,7 @@ export const workers: Workers<keyof ChannelMap> = {
[Channel.NewDeploymentVersion]: newDeploymentVersionWorker,
[Channel.NewEnvironment]: null,
[Channel.UpdateEnvironment]: updateEnvironmentWorker,
[Channel.UpdateDeployment]: updateDeploymentWorker,
[Channel.UpdateDeploymentVariable]: updateDeploymentVariableWorker,
[Channel.UpdateResourceVariable]: updateResourceVariableWorker,
[Channel.EvaluateReleaseTarget]: evaluateReleaseTargetWorker,
Expand Down
17 changes: 17 additions & 0 deletions apps/event-worker/src/workers/update-deployment.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import _ from "lodash";

import { selector } from "@ctrlplane/db";
import { Channel, createWorker } from "@ctrlplane/events";

export const updateDeploymentWorker = createWorker(
Channel.UpdateDeployment,
async ({ data }) => {
const { oldSelector, resourceSelector } = data;
if (_.isEqual(oldSelector, resourceSelector)) return;
await selector()
.compute()
.deployments([data.id])
.resourceSelectors()
.replace();
},
);
8 changes: 7 additions & 1 deletion apps/event-worker/src/workers/update-environment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import type { ResourceCondition } from "@ctrlplane/validators/resources";
import _ from "lodash";
import { isPresent } from "ts-is-present";

import { and, eq, inArray, isNull } from "@ctrlplane/db";
import { and, eq, inArray, isNull, selector } from "@ctrlplane/db";
import { db } from "@ctrlplane/db/client";
import * as schema from "@ctrlplane/db/schema";
import { Channel, createWorker, getQueue } from "@ctrlplane/events";
Expand Down Expand Up @@ -200,6 +200,12 @@ export const updateEnvironmentWorker = createWorker(
return;
}

await selector(db)
.compute()
.environments([environment.id])
.resourceSelectors()
.replace();

const { workspaceId, deployments } = system;

const { newlyMatchedResources, unmatchedResources } =
Expand Down
14 changes: 13 additions & 1 deletion apps/pty-proxy/src/controller/agent-socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import type WebSocket from "ws";
import type { MessageEvent } from "ws";

import { can, getUser } from "@ctrlplane/auth/utils";
import { eq, upsertResources } from "@ctrlplane/db";
import { eq, selector, upsertResources } from "@ctrlplane/db";
import { db } from "@ctrlplane/db/client";
import * as schema from "@ctrlplane/db/schema";
import { Channel, getQueue } from "@ctrlplane/events";
Expand Down Expand Up @@ -138,6 +138,18 @@ export class AgentSocket {
]);
const res = all.at(0);
if (res == null) throw new Error("Failed to create resource");
await Promise.all([
selector(db)
.compute()
.allEnvironments(this.workspaceId)
.resourceSelectors()
.replace(),
selector(db)
.compute()
.allDeployments(this.workspaceId)
.resourceSelectors()
.replace(),
]);
await getQueue(Channel.UpdatedResource).add(res.id, res);
this.resource = res;
agents.set(res.id, { lastSync: new Date(), agent: this });
Expand Down
15 changes: 13 additions & 2 deletions apps/webservice/src/app/api/v1/resources/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import type * as schema from "@ctrlplane/db/schema";
import { NextResponse } from "next/server";
import { z } from "zod";

import { upsertResources } from "@ctrlplane/db";
import { selector, upsertResources } from "@ctrlplane/db";
import { db } from "@ctrlplane/db/client";
import { createResource } from "@ctrlplane/db/schema";
import { Channel, getQueue } from "@ctrlplane/events";
Expand Down Expand Up @@ -74,7 +74,18 @@ export const POST = request()
data: r,
}));
const updateJobs = updatedResources.map((r) => ({ name: r.id, data: r }));

await Promise.all([
selector(db)
.compute()
.allEnvironments(workspaceId)
.resourceSelectors()
.replace(),
selector(db)
.compute()
.allDeployments(workspaceId)
.resourceSelectors()
.replace(),
]);
await Promise.all([
getQueue(Channel.NewResource).addBulk(insertJobs),
getQueue(Channel.UpdatedResource).addBulk(updateJobs),
Expand Down
35 changes: 35 additions & 0 deletions packages/db/drizzle/0089_robust_diamondback.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
CREATE TABLE IF NOT EXISTS "deployment_selector_computed_resource" (
"deployment_id" uuid NOT NULL,
"resource_id" uuid NOT NULL,
CONSTRAINT "deployment_selector_computed_resource_deployment_id_resource_id_pk" PRIMARY KEY("deployment_id","resource_id")
);
--> statement-breakpoint
CREATE TABLE IF NOT EXISTS "environment_selector_computed_resource" (
"environment_id" uuid NOT NULL,
"resource_id" uuid NOT NULL,
CONSTRAINT "environment_selector_computed_resource_environment_id_resource_id_pk" PRIMARY KEY("environment_id","resource_id")
);
--> statement-breakpoint
DO $$ BEGIN
ALTER TABLE "deployment_selector_computed_resource" ADD CONSTRAINT "deployment_selector_computed_resource_deployment_id_deployment_id_fk" FOREIGN KEY ("deployment_id") REFERENCES "public"."deployment"("id") ON DELETE cascade ON UPDATE no action;
EXCEPTION
WHEN duplicate_object THEN null;
END $$;
--> statement-breakpoint
DO $$ BEGIN
ALTER TABLE "deployment_selector_computed_resource" ADD CONSTRAINT "deployment_selector_computed_resource_resource_id_resource_id_fk" FOREIGN KEY ("resource_id") REFERENCES "public"."resource"("id") ON DELETE cascade ON UPDATE no action;
EXCEPTION
WHEN duplicate_object THEN null;
END $$;
--> statement-breakpoint
DO $$ BEGIN
ALTER TABLE "environment_selector_computed_resource" ADD CONSTRAINT "environment_selector_computed_resource_environment_id_environment_id_fk" FOREIGN KEY ("environment_id") REFERENCES "public"."environment"("id") ON DELETE cascade ON UPDATE no action;
EXCEPTION
WHEN duplicate_object THEN null;
END $$;
--> statement-breakpoint
DO $$ BEGIN
ALTER TABLE "environment_selector_computed_resource" ADD CONSTRAINT "environment_selector_computed_resource_resource_id_resource_id_fk" FOREIGN KEY ("resource_id") REFERENCES "public"."resource"("id") ON DELETE cascade ON UPDATE no action;
EXCEPTION
WHEN duplicate_object THEN null;
END $$;
37 changes: 37 additions & 0 deletions packages/db/drizzle/0090_organic_sinister_six.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
CREATE TABLE IF NOT EXISTS "computed_deployment_resource" (
"deployment_id" uuid NOT NULL,
"resource_id" uuid NOT NULL,
CONSTRAINT "computed_deployment_resource_deployment_id_resource_id_pk" PRIMARY KEY("deployment_id","resource_id")
);
--> statement-breakpoint
CREATE TABLE IF NOT EXISTS "computed_environment_resource" (
"environment_id" uuid NOT NULL,
"resource_id" uuid NOT NULL,
CONSTRAINT "computed_environment_resource_environment_id_resource_id_pk" PRIMARY KEY("environment_id","resource_id")
);
--> statement-breakpoint
DROP TABLE "deployment_selector_computed_resource";--> statement-breakpoint
DROP TABLE "environment_selector_computed_resource";--> statement-breakpoint
DO $$ BEGIN
ALTER TABLE "computed_deployment_resource" ADD CONSTRAINT "computed_deployment_resource_deployment_id_deployment_id_fk" FOREIGN KEY ("deployment_id") REFERENCES "public"."deployment"("id") ON DELETE cascade ON UPDATE no action;
EXCEPTION
WHEN duplicate_object THEN null;
END $$;
--> statement-breakpoint
DO $$ BEGIN
ALTER TABLE "computed_deployment_resource" ADD CONSTRAINT "computed_deployment_resource_resource_id_resource_id_fk" FOREIGN KEY ("resource_id") REFERENCES "public"."resource"("id") ON DELETE cascade ON UPDATE no action;
EXCEPTION
WHEN duplicate_object THEN null;
END $$;
--> statement-breakpoint
DO $$ BEGIN
ALTER TABLE "computed_environment_resource" ADD CONSTRAINT "computed_environment_resource_environment_id_environment_id_fk" FOREIGN KEY ("environment_id") REFERENCES "public"."environment"("id") ON DELETE cascade ON UPDATE no action;
EXCEPTION
WHEN duplicate_object THEN null;
END $$;
--> statement-breakpoint
DO $$ BEGIN
ALTER TABLE "computed_environment_resource" ADD CONSTRAINT "computed_environment_resource_resource_id_resource_id_fk" FOREIGN KEY ("resource_id") REFERENCES "public"."resource"("id") ON DELETE cascade ON UPDATE no action;
EXCEPTION
WHEN duplicate_object THEN null;
END $$;
Loading
Loading