Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions apps/event-worker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ COPY packages/db/package.json ./packages/db/package.json
COPY packages/validators/package.json ./packages/validators/package.json
COPY packages/logger/package.json ./packages/logger/package.json
COPY packages/job-dispatch/package.json ./packages/job-dispatch/package.json
COPY packages/release-manager/package.json ./packages/release-manager/package.json
COPY packages/rule-engine/package.json ./packages/rule-engine/package.json
COPY packages/secrets/package.json ./packages/secrets/package.json

COPY apps/event-worker/package.json ./apps/event-worker/package.json
Expand Down
1 change: 1 addition & 0 deletions apps/event-worker/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
"js-yaml": "^4.1.0",
"lodash": "catalog:",
"ms": "^2.1.3",
"redis-semaphore": "^5.6.2",
"semver": "catalog:",
"ts-is-present": "^1.2.2",
"uuid": "^10.0.0",
Expand Down
14 changes: 10 additions & 4 deletions apps/event-worker/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,23 @@ import { logger } from "@ctrlplane/logger";

import { createDispatchExecutionJobWorker } from "./job-dispatch/index.js";
import { redis } from "./redis.js";
import { createReleaseNewVersionWorker } from "./releases/new-version/index.js";
import { createResourceScanWorker } from "./resource-scan/index.js";

const resourceScanWorker = createResourceScanWorker();
const dispatchExecutionJobWorker = createDispatchExecutionJobWorker();
const releaseNewVersionWorker = createReleaseNewVersionWorker();

const shutdown = () => {
logger.warn("Exiting...");
resourceScanWorker.close();
dispatchExecutionJobWorker.close();
redis.quit();
process.exit(0);
Promise.all([
resourceScanWorker.close(),
dispatchExecutionJobWorker.close(),
releaseNewVersionWorker.close(),
]).then(async () => {
await redis.quit();
process.exit(0);
});
};

process.on("SIGTERM", shutdown);
Expand Down
30 changes: 19 additions & 11 deletions apps/event-worker/src/releases/evaluate/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,30 @@ import { evaluate } from "@ctrlplane/rule-engine";
import { createCtx, getApplicablePolicies } from "@ctrlplane/rule-engine/db";
import { Channel } from "@ctrlplane/validators/events";

import { ReleaseRepositoryMutex } from "../mutex.js";

export const createReleaseEvaluateWorker = () =>
new Worker<ReleaseEvaluateEvent>(Channel.ReleaseEvaluate, async (job) => {
job.log(
`Evaluating release for deployment ${job.data.deploymentId} and resource ${job.data.resourceId}`,
);

const ctx = await createCtx(db, job.data);
if (ctx == null) {
job.log(
`Resource ${job.data.resourceId} not found for deployment ${job.data.deploymentId} and environment ${job.data.environmentId}`,
);
return;
}
const mutex = await ReleaseRepositoryMutex.lock(job.data);

const { workspaceId } = ctx.resource;
const policy = await getApplicablePolicies(db, workspaceId, job.data);
const result = await evaluate(policy, [], ctx);
console.log(result);
try {
const ctx = await createCtx(db, job.data);
if (ctx == null) {
job.log(
`Resource ${job.data.resourceId} not found for deployment ${job.data.deploymentId} and environment ${job.data.environmentId}`,
);
return;
}

const { workspaceId } = ctx.resource;
const policy = await getApplicablePolicies(db, workspaceId, job.data);
const result = await evaluate(policy, [], ctx);
console.log(result);
} finally {
await mutex.unlock();
}
});
30 changes: 30 additions & 0 deletions apps/event-worker/src/releases/mutex.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import type { ReleaseRepository } from "@ctrlplane/rule-engine";
import type { Mutex as RedisMutex } from "redis-semaphore";
import { Mutex as RedisSemaphoreMutex } from "redis-semaphore";

import { redis } from "../redis.js";

export class ReleaseRepositoryMutex {
static async lock(repo: ReleaseRepository) {
const mutex = new ReleaseRepositoryMutex(repo);
await mutex.lock();
return mutex;
}

private mutex: RedisMutex;

constructor(repo: ReleaseRepository) {
const key = `release-repository-mutex-${repo.deploymentId}-${repo.resourceId}-${repo.environmentId}`;
this.mutex = new RedisSemaphoreMutex(redis, key, {});
}

async lock(): Promise<void> {
if (this.mutex.isAcquired) throw new Error("Mutex is already locked");
await this.mutex.acquire();
}

async unlock(): Promise<void> {
if (!this.mutex.isAcquired) throw new Error("Mutex is not locked");
await this.mutex.release();
}
}
26 changes: 26 additions & 0 deletions apps/event-worker/src/releases/new-version/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import type { ReleaseNewVersionEvent } from "@ctrlplane/validators/events";
import { Worker } from "bullmq";
import _ from "lodash";

import { eq } from "@ctrlplane/db";
import { db } from "@ctrlplane/db/client";
import * as schema from "@ctrlplane/db/schema";
import { Channel } from "@ctrlplane/validators/events";

import { getSystemResources } from "./system-resources.js";

export const createReleaseNewVersionWorker = () =>
new Worker<ReleaseNewVersionEvent>(Channel.ReleaseNewVersion, async (job) => {
const version = await db.query.deploymentVersion.findFirst({
where: eq(schema.deploymentVersion.id, job.data.versionId),
with: { deployment: true },
});

if (version == null) throw new Error("Version not found");

const { deployment } = version;
const { systemId } = deployment;

const impactedResources = await getSystemResources(db, systemId);
console.log(impactedResources.length);
});
37 changes: 37 additions & 0 deletions apps/event-worker/src/releases/new-version/system-resources.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import type { Tx } from "@ctrlplane/db";
import _ from "lodash";

import { and, eq } from "@ctrlplane/db";
import * as schema from "@ctrlplane/db/schema";

/**
* Retrieves all resources for a given system by using environment selectors
*/
export const getSystemResources = async (tx: Tx, systemId: string) => {
const system = await tx.query.system.findFirst({
where: eq(schema.system.id, systemId),
with: { environments: true },
});

if (system == null) throw new Error("System not found");

const { environments } = system;

// Simplify the chained operations with standard Promise.all
const resources = await Promise.all(
environments.map(async (env) => {
const res = await tx
.select()
.from(schema.resource)
.where(
and(
eq(schema.resource.workspaceId, system.workspaceId),
schema.resourceMatchesMetadata(tx, env.resourceSelector),
),
);
return res.map((r) => ({ ...r, environment: env }));
}),
).then((arrays) => arrays.flat());

return resources;
};
Empty file.
43 changes: 42 additions & 1 deletion packages/db/src/schema/deployment-variables.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import type { ResourceCondition } from "@ctrlplane/validators/resources";
import type { VariableConfigType } from "@ctrlplane/validators/variables";
import type { InferInsertModel, InferSelectModel } from "drizzle-orm";
import type { AnyPgColumn, ColumnsWithTable } from "drizzle-orm/pg-core";
import { sql } from "drizzle-orm";
import { relations, sql } from "drizzle-orm";
import {
foreignKey,
jsonb,
Expand Down Expand Up @@ -116,3 +116,44 @@ export const deploymentVariableSet = pgTable(
},
(t) => ({ uniq: uniqueIndex().on(t.deploymentId, t.variableSetId) }),
);

export const deploymentVariableRelationships = relations(
deploymentVariable,
({ one, many }) => ({
deployment: one(deployment, {
fields: [deploymentVariable.deploymentId],
references: [deployment.id],
}),

defaultValue: one(deploymentVariableValue, {
fields: [deploymentVariable.defaultValueId],
references: [deploymentVariableValue.id],
}),

values: many(deploymentVariableValue),
}),
);

export const deploymentVariableValueRelationships = relations(
deploymentVariableValue,
({ one }) => ({
variable: one(deploymentVariable, {
fields: [deploymentVariableValue.variableId],
references: [deploymentVariable.id],
}),
}),
);

export const deploymentVariableSetRelationships = relations(
deploymentVariableSet,
({ one }) => ({
deployment: one(deployment, {
fields: [deploymentVariableSet.deploymentId],
references: [deployment.id],
}),
variableSet: one(variableSet, {
fields: [deploymentVariableSet.variableSetId],
references: [variableSet.id],
}),
}),
);
48 changes: 48 additions & 0 deletions packages/db/src/schema/deployment-version.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
not,
notExists,
or,
relations,
sql,
} from "drizzle-orm";
import {
Expand Down Expand Up @@ -286,3 +287,50 @@ export function deploymentVersionMatchesCondition(
? undefined
: buildCondition(tx, condition);
}

export const deploymentVersionRelations = relations(
deploymentVersion,
({ one, many }) => ({
deployment: one(deployment, {
fields: [deploymentVersion.deploymentId],
references: [deployment.id],
}),
metadata: many(deploymentVersionMetadata),
dependencies: many(versionDependency),
channels: many(deploymentVersionChannel),
}),
);

export const deploymentVersionChannelRelations = relations(
deploymentVersionChannel,
({ one }) => ({
deployment: one(deployment, {
fields: [deploymentVersionChannel.deploymentId],
references: [deployment.id],
}),
}),
);

export const versionDependencyRelations = relations(
versionDependency,
({ one }) => ({
version: one(deploymentVersion, {
fields: [versionDependency.versionId],
references: [deploymentVersion.id],
}),
deployment: one(deployment, {
fields: [versionDependency.deploymentId],
references: [deployment.id],
}),
}),
);

export const deploymentVersionMetadataRelations = relations(
deploymentVersionMetadata,
({ one }) => ({
version: one(deploymentVersion, {
fields: [deploymentVersionMetadata.versionId],
references: [deploymentVersion.id],
}),
}),
);
43 changes: 43 additions & 0 deletions packages/db/src/schema/release.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { relations } from "drizzle-orm";
import {
boolean,
json,
Expand Down Expand Up @@ -61,3 +62,45 @@ export const releaseJob = pgTable("release_job", {
.notNull()
.defaultNow(),
});

export const releaseRelations = relations(release, ({ one, many }) => ({
version: one(deploymentVersion, {
fields: [release.versionId],
references: [deploymentVersion.id],
}),
resource: one(resource, {
fields: [release.resourceId],
references: [resource.id],
}),
deployment: one(deployment, {
fields: [release.deploymentId],
references: [deployment.id],
}),
environment: one(environment, {
fields: [release.environmentId],
references: [environment.id],
}),
variables: many(releaseVariable),
jobs: many(releaseJob),
}));

export const releaseVariableRelations = relations(
releaseVariable,
({ one }) => ({
release: one(release, {
fields: [releaseVariable.releaseId],
references: [release.id],
}),
}),
);

export const releaseJobRelations = relations(releaseJob, ({ one }) => ({
release: one(release, {
fields: [releaseJob.releaseId],
references: [release.id],
}),
job: one(job, {
fields: [releaseJob.jobId],
references: [job.id],
}),
}));
Loading
Loading