Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/event-worker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ COPY packages/logger/package.json ./packages/logger/package.json
COPY packages/job-dispatch/package.json ./packages/job-dispatch/package.json
COPY packages/release-manager/package.json ./packages/release-manager/package.json
COPY packages/rule-engine/package.json ./packages/rule-engine/package.json
COPY packages/secrets/package.json ./packages/secrets/package.json
COPY packages/events/package.json ./packages/events/package.json

COPY apps/event-worker/package.json ./apps/event-worker/package.json

Expand Down
1 change: 1 addition & 0 deletions apps/event-worker/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"@azure/arm-containerservice": "^21.3.0",
"@azure/identity": "^4.5.0",
"@ctrlplane/db": "workspace:*",
"@ctrlplane/events": "workspace:*",
"@ctrlplane/job-dispatch": "workspace:*",
"@ctrlplane/logger": "workspace:*",
"@ctrlplane/release-manager": "workspace:*",
Expand Down
19 changes: 9 additions & 10 deletions apps/event-worker/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,19 @@ import { redis } from "./redis.js";
import { createReleaseNewVersionWorker } from "./releases/new-version/index.js";
import { createReleaseVariableChangeWorker } from "./releases/variable-change/index.js";
import { createResourceScanWorker } from "./resource-scan/index.js";
import { workers } from "./workers/index.js";

const resourceScanWorker = createResourceScanWorker();
const dispatchExecutionJobWorker = createDispatchExecutionJobWorker();
const releaseNewVersionWorker = createReleaseNewVersionWorker();
const releaseVariableChangeWorker = createReleaseVariableChangeWorker();
const allWorkers = [
createResourceScanWorker(),
createDispatchExecutionJobWorker(),
createReleaseNewVersionWorker(),
createReleaseVariableChangeWorker(),
...Object.values(workers),
];

const shutdown = () => {
logger.warn("Exiting...");
Promise.all([
resourceScanWorker.close(),
dispatchExecutionJobWorker.close(),
releaseNewVersionWorker.close(),
releaseVariableChangeWorker.close(),
]).then(async () => {
Promise.all(allWorkers.map((w) => w?.close())).then(async () => {
await redis.quit();
process.exit(0);
});
Expand Down
14 changes: 14 additions & 0 deletions apps/event-worker/src/workers/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import type { ChannelMap } from "@ctrlplane/events";
import type { Worker } from "bullmq";

import { Channel } from "@ctrlplane/events";

type Workers<T extends keyof ChannelMap> = {
[K in T]: Worker<ChannelMap[K]> | null;
};

export const workers: Workers<keyof ChannelMap> = {
[Channel.NewDeployment]: null,
[Channel.NewEnvironment]: null,
[Channel.ReleaseEvaluate]: null,
};
67 changes: 67 additions & 0 deletions apps/event-worker/src/workers/new-deployment.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import type { Tx } from "@ctrlplane/db";
import type { ResourceCondition } from "@ctrlplane/validators/resources";
import _ from "lodash";

import { and, eq, isNull } from "@ctrlplane/db";
import { db } from "@ctrlplane/db/client";
import * as schema from "@ctrlplane/db/schema";
import { Channel, createWorker, getQueue } from "@ctrlplane/events";

const getDeploymentResources = async (
tx: Tx,
deployment: {
id: string;
systemId: string;
resourceSelector?: ResourceCondition | null;
},
) => {
const system = await tx.query.system.findFirst({
where: eq(schema.system.id, deployment.systemId),
with: { environments: true },
});

if (system == null) throw new Error("System or deployment not found");

const { environments } = system;

// Simplify the chained operations with standard Promise.all
const resources = await Promise.all(
environments.map(async (env) => {
if (env.resourceSelector == null) return [];

const res = await tx
.select()
.from(schema.resource)
.where(
and(
eq(schema.resource.workspaceId, system.workspaceId),
isNull(schema.resource.deletedAt),
schema.resourceMatchesMetadata(tx, env.resourceSelector),
schema.resourceMatchesMetadata(tx, deployment.resourceSelector),
),
);
return res.map((r) => ({ ...r, environment: env }));
}),
).then((arrays) => arrays.flat());

return resources;
};

const evaluatedQueue = getQueue(Channel.ReleaseEvaluate);

export const newDeploymentWorker = createWorker(
Channel.NewDeployment,
async (job) => {
const resources = await getDeploymentResources(db, job.data);
const jobData = resources.map((r) => {
const resourceId = r.id;
const environmentId = r.environment.id;
const deploymentId = job.data.id;
return {
name: `${resourceId}-${environmentId}-${deploymentId}`,
data: { resourceId, environmentId, deploymentId },
};
});
await evaluatedQueue.addBulk(jobData);
},
);
2 changes: 2 additions & 0 deletions packages/events/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
dist/
node_modules/
1 change: 1 addition & 0 deletions packages/events/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Events
17 changes: 17 additions & 0 deletions packages/events/eslint.config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import baseConfig, {
requireJsSuffix,
vitestEslintConfig,
} from "@ctrlplane/eslint-config/base";

/** @type {import('typescript-eslint').Config} */
export default [
{
ignores: ["dist/**"],
rules: {
"@typescript-eslint/require-await": "off",
},
},
...vitestEslintConfig,
...requireJsSuffix,
...baseConfig,
];
40 changes: 40 additions & 0 deletions packages/events/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
{
"name": "@ctrlplane/events",
"private": true,
"version": "0.1.0",
"type": "module",
"exports": {
".": {
"types": "./src/index.ts",
"default": "./dist/index.js"
}
},
"license": "MIT",
"scripts": {
"build": "tsc",
"dev": "tsc --watch",
"clean": "rm -rf .turbo node_modules",
"format": "prettier --check . --ignore-path ../../.gitignore",
"lint": "eslint",
"typecheck": "tsc --noEmit --emitDeclarationOnly false"
},
"dependencies": {
"@ctrlplane/db": "workspace:*",
"@ctrlplane/logger": "workspace:*",
"@t3-oss/env-core": "catalog:",
"bullmq": "catalog:",
"date-fns": "^4.1.0",
"lodash": "catalog:"
},
"devDependencies": {
"@ctrlplane/eslint-config": "workspace:*",
"@ctrlplane/prettier-config": "workspace:*",
"@ctrlplane/tsconfig": "workspace:*",
"@types/lodash": "catalog:",
"@types/node": "catalog:node22",
"eslint": "catalog:",
"prettier": "catalog:",
"typescript": "catalog:"
},
"prettier": "@ctrlplane/prettier-config"
}
14 changes: 14 additions & 0 deletions packages/events/src/config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { createEnv } from "@t3-oss/env-core";
import { z } from "zod";

export const env = createEnv({
server: {
REDIS_URL: z.string(),
},
runtimeEnv: process.env,
emptyStringAsUndefined: true,
skipValidation:
!!process.env.CI ||
!!process.env.SKIP_ENV_VALIDATION ||
process.env.npm_lifecycle_event === "lint",
});
30 changes: 30 additions & 0 deletions packages/events/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import type { Job, WorkerOptions } from "bullmq";
import { Queue, Worker } from "bullmq";

import type { ChannelMap } from "./types.js";
import { bullmqRedis } from "./redis.js";

export const createWorker = <T extends keyof ChannelMap>(
name: T,
handler: (job: Job<ChannelMap[T]>) => Promise<void>,
opts?: WorkerOptions,
) =>
new Worker(String(name), handler, {
connection: bullmqRedis,
removeOnComplete: { age: 1 * 60 * 60, count: 5000 },
removeOnFail: { age: 12 * 60 * 60, count: 5000 },
concurrency: 100,
...opts,
});

const _queues = new Map<keyof ChannelMap, Queue>();
export const getQueue = <T extends keyof ChannelMap>(name: T) => {
if (!_queues.has(name)) {
_queues.set(name, new Queue(String(name), { connection: bullmqRedis }));
}

return _queues.get(name) as Queue<ChannelMap[T]>;
};

export * from "./types.js";
export * from "./redis.js";
7 changes: 7 additions & 0 deletions packages/events/src/redis.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import IORedis from "ioredis";

import { env } from "./config.js";

export const bullmqRedis = new IORedis(env.REDIS_URL, {
maxRetriesPerRequest: null,
});
29 changes: 29 additions & 0 deletions packages/events/src/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import type * as schema from "@ctrlplane/db/schema";

export enum Channel {
JobSync = "job-sync",
DispatchJob = "dispatch-job",
ResourceScan = "resource-scan",

ReleaseNewVersion = "release-new-version",
ReleaseNewRepository = "release-new-repository",
ReleaseVariableChange = "release-variable-change",

NewDeployment = "new-deployment",
NewEnvironment = "new-environment",
NewRelease = "new-release",
ReleaseEvaluate = "release-evaluate",
}

export type ReleaseEvaluateJobData = {
environmentId: string;
resourceId: string;
deploymentId: string;
};

export type ChannelMap = {
// [Channel.UpsertRelease]: typeof schema.release.$inferInsert;
[Channel.NewDeployment]: typeof schema.deployment.$inferSelect;
[Channel.NewEnvironment]: typeof schema.environment.$inferSelect;
[Channel.ReleaseEvaluate]: ReleaseEvaluateJobData;
};
12 changes: 12 additions & 0 deletions packages/events/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"extends": "@ctrlplane/tsconfig/internal-package.json",
"compilerOptions": {
"outDir": "dist",
"baseUrl": ".",
"incremental": true,
"lib": ["es2017"],
"tsBuildInfoFile": "node_modules/.cache/tsbuildinfo.json"
},
"include": ["src/**/*"],
"exclude": ["node_modules", "dist"]
}
1 change: 1 addition & 0 deletions packages/validators/src/events/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export enum Channel {
ResourceScan = "resource-scan",
ReleaseEvaluate = "release-evaluate",
ReleaseNewVersion = "release-new-version",
ReleaseNewRepository = "release-new-repository",
ReleaseVariableChange = "release-variable-change",
}

Expand Down
51 changes: 50 additions & 1 deletion pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading