Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 78 additions & 41 deletions apps/event-worker/src/workers/evaluate-release-target.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,99 +7,136 @@ import * as schema from "@ctrlplane/db/schema";
import { Channel, createWorker, getQueue } from "@ctrlplane/events";
import { logger } from "@ctrlplane/logger";
import {
DatabaseReleaseRepository,
evaluateRepository,
VariableReleaseManager,
VersionReleaseManager,
} from "@ctrlplane/rule-engine";

import { env } from "../config.js";
import { ReleaseTargetMutex } from "../releases/mutex.js";

const log = logger.child({
worker: "policy-evaluate",
});
const log = logger.child({ worker: "policy-evaluate" });

const createJobForRelease = async (tx: Tx, chosenReleaseId: string) => {
const release = await tx.query.release.findFirst({
where: eq(schema.release.id, chosenReleaseId),
const createRelease = async (
tx: Tx,
release: { id: string; versionReleaseId: string; variableReleaseId: string },
) => {
// Get version release and related data
const versionRelease = await tx.query.versionRelease.findFirst({
where: eq(schema.versionRelease.id, release.versionReleaseId),
with: {
variables: true,
version: { with: { deployment: { with: { jobAgent: true } } } },
},
});
if (!versionRelease) throw new Error("Failed to get release");

if (release == null) throw new Error("Failed to get release");
// Extract job agent info
const { jobAgent, jobAgentConfig: deploymentJobAgentConfig } =
versionRelease.version.deployment;
if (!jobAgent) throw new Error("Deployment has no Job Agent");

const { version } = release;
const { deployment } = version;
const { jobAgent, jobAgentConfig: deploymentJobAgentConfig } = deployment;
if (jobAgent == null) throw new Error("Deployment has no Job Agent");

const jobAgentId = jobAgent.id;
const jobAgentConfig = _.merge(jobAgent.config, deploymentJobAgentConfig);

// Get variable release data
const variableRelease = await tx.query.variableSetRelease.findFirst({
where: eq(schema.variableSetRelease.id, release.variableReleaseId),
with: { values: { with: { variableValueSnapshot: true } } },
});
if (!variableRelease) throw new Error("Failed to get variable release");

// Create job
const job = await tx
.insert(schema.job)
.values({
jobAgentId,
jobAgentId: jobAgent.id,
jobAgentConfig,
status: "pending",
reason: "policy_passing",
})
.returning()
.then(takeFirst);

if (release.variables.length > 0)
// Add job variables if any exist
if (variableRelease.values.length > 0) {
await tx.insert(schema.jobVariable).values(
release.variables.map((v) => ({
variableRelease.values.map((v) => ({
jobId: job.id,
key: v.key,
sensitive: v.sensitive,
value: v.value,
key: v.variableValueSnapshot.key,
sensitive: v.variableValueSnapshot.sensitive,
value: v.variableValueSnapshot.value,
Comment on lines +61 to +65
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Potential security concern for sensitive values.
Storing sensitive variables in plain text might pose risks. Consider encrypting or masking sensitive values in the database or restricting access to these fields.

 // Example approach for encryption (pseudo-code illustration)
- value: v.variableValueSnapshot.value,
+ value: encrypt(v.variableValueSnapshot.value),

Also applies to: 68-68

})),
);
}

// Create release record
await tx.insert(schema.releaseJob).values({
releaseId: release.id,
jobId: job.id,
releaseId: chosenReleaseId,
});

return job;
};

const handleVersionRelease = async (releaseTarget: any) => {
const vrm = new VersionReleaseManager(db, {
...releaseTarget,
workspaceId: releaseTarget.resource.workspaceId,
});

const { chosenCandidate } = await vrm.evaluate();
if (!chosenCandidate) throw new Error("Failed to get chosen release");

const { release: versionRelease } = await vrm.upsertRelease(
chosenCandidate.id,
);

return versionRelease;
};

const handleVariableRelease = async (releaseTarget: any) => {
const varrm = new VariableReleaseManager(db, {
...releaseTarget,
workspaceId: releaseTarget.resource.workspaceId,
});
Comment on lines +95 to +99
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add proper typing to function parameter

Similar to handleVersionRelease, this function uses any for the releaseTarget parameter, which reduces type safety. Use the same interface to ensure consistent typing across functions.

-const handleVariableRelease = async (releaseTarget: any) => {
+const handleVariableRelease = async (releaseTarget: schema.ReleaseTarget & {
+  resource: schema.Resource;
+  environment: schema.Environment;
+  deployment: schema.Deployment;
+}) => {
  const varrm = new VariableReleaseManager(db, {
    ...releaseTarget,
    workspaceId: releaseTarget.resource.workspaceId,
  });
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const handleVariableRelease = async (releaseTarget: any) => {
const varrm = new VariableReleaseManager(db, {
...releaseTarget,
workspaceId: releaseTarget.resource.workspaceId,
});
const handleVariableRelease = async (releaseTarget: schema.ReleaseTarget & {
resource: schema.Resource;
environment: schema.Environment;
deployment: schema.Deployment;
}) => {
const varrm = new VariableReleaseManager(db, {
...releaseTarget,
workspaceId: releaseTarget.resource.workspaceId,
});


const { chosenCandidate: variableValues } = await varrm.evaluate();
const { release: variableRelease } =
await varrm.upsertRelease(variableValues);

return variableRelease;
};

export const evaluateReleaseTarget = createWorker(
Channel.EvaluateReleaseTarget,
async (job) => {
const mutex = await ReleaseTargetMutex.lock(job.data);

try {
// Get release target
const releaseTarget = await db.query.releaseTarget.findFirst({
where: and(
eq(schema.releaseTarget.resourceId, job.data.resourceId),
eq(schema.releaseTarget.environmentId, job.data.environmentId),
eq(schema.releaseTarget.deploymentId, job.data.deploymentId),
),
with: {
resource: true,
environment: true,
deployment: true,
},
with: { resource: true, environment: true, deployment: true },
});
if (releaseTarget == null)
throw new Error("Failed to get release target");
if (!releaseTarget) throw new Error("Failed to get release target");

const releaseRepository = await DatabaseReleaseRepository.create({
...releaseTarget,
workspaceId: releaseTarget.resource.workspaceId,
});
const versionRelease = await handleVersionRelease(releaseTarget);
const variableRelease = await handleVariableRelease(releaseTarget);

const { chosenRelease } = await evaluateRepository(releaseRepository);
if (chosenRelease == null)
throw new Error("Failed to get chosen release");
const release = await db
.insert(schema.release)
.values({
versionReleaseId: versionRelease.id,
variableReleaseId: variableRelease.id,
})
.onConflictDoNothing()
.returning()
.then(takeFirst);

if (env.NODE_ENV === "development") {
// In development dispatch the job immediately
const job = await db.transaction((tx) =>
createJobForRelease(tx, chosenRelease.id),
);
const job = await db.transaction((tx) => createRelease(tx, release));
getQueue(Channel.DispatchJob).add(job.id, { jobId: job.id });
}
} catch (e) {
Expand Down
34 changes: 17 additions & 17 deletions apps/event-worker/src/workers/job-dispatch/github.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ const getGithubEntity = async (
eq(SCHEMA.releaseJobTrigger.jobId, jobId),
),
)
.then(takeFirstOrNull);
.then(takeFirstOrNull)
.then((r) => r?.github_entity);

const runbookGhEntityPromise = db
.select()
Expand All @@ -69,18 +70,23 @@ const getGithubEntity = async (
eq(SCHEMA.runbookJobTrigger.jobId, jobId),
),
)
.then(takeFirstOrNull);
.then(takeFirstOrNull)
.then((r) => r?.github_entity);

const releaseJobEntityPromise = db
.select()
.from(SCHEMA.releaseJob)
.from(SCHEMA.release)
.innerJoin(
SCHEMA.releaseJob,
eq(SCHEMA.release.id, SCHEMA.releaseJob.releaseId),
)
.innerJoin(
SCHEMA.release,
eq(SCHEMA.releaseJob.releaseId, SCHEMA.release.id),
SCHEMA.versionRelease,
eq(SCHEMA.release.versionReleaseId, SCHEMA.versionRelease.id),
)
.innerJoin(
SCHEMA.releaseTarget,
eq(SCHEMA.release.releaseTargetId, SCHEMA.releaseTarget.id),
eq(SCHEMA.versionRelease.releaseTargetId, SCHEMA.releaseTarget.id),
)
.innerJoin(
SCHEMA.resource,
Expand All @@ -97,19 +103,13 @@ const getGithubEntity = async (
eq(SCHEMA.releaseJob.jobId, jobId),
),
)
.then(takeFirstOrNull);

const [releaseGhEntityResult, runbookGhEntityResult, releaseJobEntityResult] =
await Promise.all([
releaseGhEntityPromise,
runbookGhEntityPromise,
releaseJobEntityPromise,
]);
.then(takeFirstOrNull)
.then((r) => r?.github_entity);

return (
releaseGhEntityResult?.github_entity ??
runbookGhEntityResult?.github_entity ??
releaseJobEntityResult?.github_entity
(await releaseGhEntityPromise) ??
(await runbookGhEntityPromise) ??
(await releaseJobEntityPromise)
);
};

Expand Down
13 changes: 0 additions & 13 deletions apps/event-worker/src/workers/new-deployment-version.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import { and, eq, isNull } from "@ctrlplane/db";
import { db } from "@ctrlplane/db/client";
import * as schema from "@ctrlplane/db/schema";
import { Channel, createWorker, getQueue } from "@ctrlplane/events";
import { DatabaseReleaseRepository } from "@ctrlplane/rule-engine";

const getDeploymentResources = async (
tx: Tx,
Expand Down Expand Up @@ -79,18 +78,6 @@ export const newDeploymentVersionWorker = createWorker(
where: eq(schema.releaseTarget.deploymentId, version.deploymentId),
});

const { system } = deployment;
const { workspaceId } = system;

const updateReleaseVersionPromises = releaseTargets.map(async (rt) => {
const releaseRepository = await DatabaseReleaseRepository.create({
...rt,
workspaceId,
});
await releaseRepository.updateReleaseVersion(version.id);
});
await Promise.all(updateReleaseVersionPromises);

await getQueue(Channel.EvaluateReleaseTarget).addBulk(
releaseTargets.map((rt) => ({
name: `${rt.resourceId}-${rt.environmentId}-${rt.deploymentId}`,
Expand Down
15 changes: 0 additions & 15 deletions apps/event-worker/src/workers/update-deployment-variable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { eq } from "@ctrlplane/db";
import { db } from "@ctrlplane/db/client";
import * as schema from "@ctrlplane/db/schema";
import { Channel, createWorker, getQueue } from "@ctrlplane/events";
import { DatabaseReleaseRepository } from "@ctrlplane/rule-engine";

/**
* Worker that handles deployment variable changes by triggering evaluations for
Expand All @@ -26,20 +25,6 @@ export const updateDeploymentVariableWorker = createWorker(
where: eq(schema.releaseTarget.deploymentId, variable.deploymentId),
});

const { deployment } = variable;
const { system } = deployment;
const { workspaceId } = system;

const updateReleaseVariablePromises = releaseTargets.map(async (rt) => {
const releaseRepository = await DatabaseReleaseRepository.create({
...rt,
workspaceId,
});
const variables = await releaseRepository.getLatestVariables();
await releaseRepository.updateReleaseVariables(variables);
});
await Promise.all(updateReleaseVariablePromises);

await getQueue(Channel.EvaluateReleaseTarget).addBulk(
releaseTargets.map((rt) => ({
name: `${rt.resourceId}-${rt.environmentId}-${rt.deploymentId}`,
Expand Down
12 changes: 0 additions & 12 deletions apps/event-worker/src/workers/update-resource-variable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { and, eq, inArray } from "@ctrlplane/db";
import { db } from "@ctrlplane/db/client";
import * as schema from "@ctrlplane/db/schema";
import { Channel, createWorker, getQueue } from "@ctrlplane/events";
import { DatabaseReleaseRepository } from "@ctrlplane/rule-engine";

export const updateResourceVariableWorker = createWorker(
Channel.UpdateResourceVariable,
Expand Down Expand Up @@ -34,7 +33,6 @@ export const updateResourceVariableWorker = createWorker(
const resource = rows[0]?.resource;
if (!resource) throw new Error("Resource not found");

const { workspaceId } = resource;
const deploymentIds = rows.map((row) => row.deployment.id);

const releaseTargets = await db.query.releaseTarget.findMany({
Expand All @@ -44,16 +42,6 @@ export const updateResourceVariableWorker = createWorker(
),
});

const updateReleaseVariablePromises = releaseTargets.map(async (rt) => {
const releaseRepository = await DatabaseReleaseRepository.create({
...rt,
workspaceId,
});
const variables = await releaseRepository.getLatestVariables();
await releaseRepository.updateReleaseVariables(variables);
});
await Promise.all(updateReleaseVariablePromises);

await getQueue(Channel.EvaluateReleaseTarget).addBulk(
releaseTargets.map((rt) => ({
name: `${rt.resourceId}-${rt.environmentId}-${rt.deploymentId}`,
Expand Down
Loading
Loading