Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions apps/event-worker/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@
"cron": "^3.1.7",
"dotenv": "^16.4.5",
"google-auth-library": "^9.13.0",
"ioredis": "^5.4.1",
"ioredis": "catalog:",
"js-yaml": "^4.1.0",
"lodash": "catalog:",
"ms": "^2.1.3",
"redis-semaphore": "^5.6.2",
"ms": "catalog:",
"redis-semaphore": "catalog:",
"semver": "catalog:",
"ts-is-present": "^1.2.2",
"uuid": "^10.0.0",
Expand All @@ -59,7 +59,7 @@
"@ctrlplane/tsconfig": "workspace:*",
"@types/js-yaml": "^4.0.9",
"@types/lodash": "catalog:",
"@types/ms": "^0.7.34",
"@types/ms": "catalog:",
"@types/node": "catalog:node22",
"@types/semver": "^7.5.8",
"@types/uuid": "^10.0.0",
Expand Down
4 changes: 2 additions & 2 deletions apps/pty-proxy/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
"dotenv": "^16.4.5",
"express": "^4.19.2",
"helmet": "^7.1.0",
"ms": "^2.1.3",
"ms": "catalog:",
"next-auth": "catalog:",
"uuid": "^10.0.0",
"ws": "^8.17.0",
Expand All @@ -37,7 +37,7 @@
"@types/cors": "^2.8.17",
"@types/express": "^4.17.21",
"@types/lodash": "catalog:",
"@types/ms": "^0.7.34",
"@types/ms": "catalog:",
"@types/node": "catalog:node22",
"@types/uuid": "^10.0.0",
"@types/ws": "^8.5.10",
Expand Down
8 changes: 5 additions & 3 deletions apps/pty-proxy/src/controller/agent-socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,14 +125,13 @@ export class AgentSocket {
"name" | "version" | "kind" | "identifier" | "workspaceId"
>,
) {
const all = await upsertResources(db, [
const all = await upsertResources(db, this.workspaceId, [
{
...resource,
name: this.name,
version: "ctrlplane.access/v1",
kind: "AccessNode",
identifier: `ctrlplane/access/access-node/${this.name}`,
workspaceId: this.workspaceId,
updatedAt: new Date(),
},
]);
Expand All @@ -141,7 +140,10 @@ export class AgentSocket {

await getQueue(Channel.UpdatedResource).add(res.id, res);

this.resource = res;
const metadata = Object.fromEntries(
res.metadata.map((m) => [m.key, m.value]),
);
this.resource = { ...res, metadata };
agents.set(res.id, { lastSync: new Date(), agent: this });
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,6 @@ export const EditDeploymentFlow: React.FC<{
);
const versions = versionsQ.data?.items ?? [];

const errors = form.formState.errors;

return (
<Form {...form}>
<form onSubmit={onSubmit} className="space-y-8">
Expand Down
2 changes: 1 addition & 1 deletion e2e/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
"@types/node": "^22.13.10",
"@types/uuid": "^10.0.0",
"eslint": "catalog:",
"ms": "^2.1.3",
"ms": "catalog:",
"openapi-fetch": "^0.13.0",
"prettier": "catalog:"
},
Expand Down
4 changes: 2 additions & 2 deletions packages/api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
"googleapis": "^144.0.0",
"js-yaml": "^4.1.0",
"lodash": "catalog:",
"ms": "^2.1.3",
"ms": "catalog:",
"rrule": "^2.8.1",
"superjson": "catalog:",
"ts-is-present": "catalog:",
Expand All @@ -58,7 +58,7 @@
"@types/bcryptjs": "^2.4.6",
"@types/js-yaml": "^4.0.9",
"@types/lodash": "catalog:",
"@types/ms": "^0.7.34",
"@types/ms": "catalog:",
"@types/pg": "^8.11.12",
"@types/semver": "^7.5.8",
"@types/uuid": "^10.0.0",
Expand Down
4 changes: 4 additions & 0 deletions packages/db/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,11 @@
"@t3-oss/env-core": "catalog:",
"drizzle-orm": "^0.37.0",
"drizzle-zod": "^0.5.1",
"ioredis": "catalog:",
"lodash": "catalog:",
"ms": "catalog:",
"pg": "^8.14.1",
"redis-semaphore": "catalog:",
"rrule": "^2.8.1",
"zod": "catalog:"
},
Expand All @@ -56,6 +59,7 @@
"@ctrlplane/prettier-config": "workspace:*",
"@ctrlplane/tsconfig": "workspace:*",
"@types/lodash": "catalog:",
"@types/ms": "catalog:",
"@types/node": "catalog:node22",
"@types/pg": "^8.11.12",
"dotenv-cli": "catalog:",
Expand Down
2 changes: 2 additions & 0 deletions packages/db/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ export const env = createEnv({
.string()
.default("50")
.transform((value) => parseInt(value)),

REDIS_URL: z.string().url().default("redis://127.0.0.1:6379"),
},
runtimeEnv: process.env,
emptyStringAsUndefined: true,
Expand Down
13 changes: 13 additions & 0 deletions packages/db/src/redis.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import IORedis from "ioredis";

import { logger } from "@ctrlplane/logger";

import { env } from "./config.js";

const log = logger.child({ module: "redis" });

const config = { maxRetriesPerRequest: null };
export const redis = new IORedis(env.REDIS_URL, config);

redis.on("connect", () => log.info("Redis connected"));
redis.on("error", (err) => log.error("Redis error", { err }));
102 changes: 74 additions & 28 deletions packages/db/src/selectors/compute/deployment-builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,14 @@ import {
isNull,
} from "drizzle-orm/pg-core/expressions";

import { logger } from "@ctrlplane/logger";

import type { Tx } from "../../common.js";
import * as SCHEMA from "../../schema/index.js";
import { QueryBuilder } from "../query/builder.js";
import { createAndAcquireMutex, SelectorComputeType } from "./mutex.js";

const log = logger.child({ module: "deployment-builder" });

export class DeploymentBuilder {
constructor(
Expand All @@ -31,11 +36,15 @@ export class DeploymentBuilder {
);
}

private async findMatchingResourcesForDeployments(tx: Tx) {
const deployments = await tx.query.deployment.findMany({
private async getDeployments(tx: Tx) {
return tx.query.deployment.findMany({
where: inArray(SCHEMA.deployment.id, this.deploymentIds),
with: { system: true },
});
}

private async findMatchingResourcesForDeployments(tx: Tx) {
const deployments = await this.getDeployments(tx);

const promises = deployments.map(async (d) => {
const { system } = d;
Expand All @@ -57,18 +66,40 @@ export class DeploymentBuilder {
return fulfilled.flat();
}

resourceSelectors() {
return this.tx.transaction(async (tx) => {
await this.deleteExistingComputedResources(tx);
const computedResourceInserts =
await this.findMatchingResourcesForDeployments(tx);
if (computedResourceInserts.length === 0) return [];
return tx
.insert(SCHEMA.computedDeploymentResource)
.values(computedResourceInserts)
.onConflictDoNothing()
.returning();
});
async resourceSelectors() {
const deployments = await this.getDeployments(this.tx);
if (deployments.length === 0) return [];
const workspaceIds = new Set(deployments.map((d) => d.system.workspaceId));
if (workspaceIds.size !== 1)
throw new Error("All deployments must be in the same workspace");
const workspaceId = Array.from(workspaceIds)[0]!;

const mutex = await createAndAcquireMutex(
SelectorComputeType.DeploymentBuilder,
workspaceId,
);

try {
return await this.tx.transaction(async (tx) => {
await this.deleteExistingComputedResources(tx);
const computedResourceInserts =
await this.findMatchingResourcesForDeployments(tx);
if (computedResourceInserts.length === 0) return [];
return tx
.insert(SCHEMA.computedDeploymentResource)
.values(computedResourceInserts)
.onConflictDoNothing()
.returning();
});
} catch (e) {
log.error("Error computing resource selectors", {
error: e,
workspaceId,
});
throw e;
} finally {
await mutex.unlock();
}
}
}

Expand Down Expand Up @@ -133,19 +164,34 @@ export class WorkspaceDeploymentBuilder {
}

async resourceSelectors() {
return this.tx.transaction(async (tx) => {
const deployments = await this.getDeploymentsInWorkspace(tx);
await this.deleteExistingComputedResources(tx, deployments);
const computedResourceInserts =
await this.findMatchingResourcesForDeployments(tx, deployments);

if (computedResourceInserts.length === 0) return [];

return tx
.insert(SCHEMA.computedDeploymentResource)
.values(computedResourceInserts)
.onConflictDoNothing()
.returning();
});
const mutex = await createAndAcquireMutex(
SelectorComputeType.DeploymentBuilder,
this.workspaceId,
);

try {
return await this.tx.transaction(async (tx) => {
const deployments = await this.getDeploymentsInWorkspace(tx);
await this.deleteExistingComputedResources(tx, deployments);
const computedResourceInserts =
await this.findMatchingResourcesForDeployments(tx, deployments);

if (computedResourceInserts.length === 0) return [];

return tx
.insert(SCHEMA.computedDeploymentResource)
.values(computedResourceInserts)
.onConflictDoNothing()
.returning();
});
} catch (e) {
log.error("Error computing resource selectors", {
error: e,
workspaceId: this.workspaceId,
});
throw e;
} finally {
await mutex.unlock();
}
}
}
Loading
Loading