Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions apps/event-worker/src/releases/evaluate/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import type { ReleaseEvaluateEvent } from "@ctrlplane/validators/events";
import { Worker } from "bullmq";
import _ from "lodash";

import { db } from "@ctrlplane/db/client";
import { evaluate } from "@ctrlplane/rule-engine";
import { createCtx, getApplicablePolicies } from "@ctrlplane/rule-engine/db";
import { Channel } from "@ctrlplane/validators/events";

export const createReleaseEvaluateWorker = () =>
new Worker<ReleaseEvaluateEvent>(Channel.ReleaseEvaluate, async (job) => {
job.log(
`Evaluating release for deployment ${job.data.deploymentId} and resource ${job.data.resourceId}`,
);

const ctx = await createCtx(db, job.data);
if (ctx == null) {
job.log(
`Resource ${job.data.resourceId} not found for deployment ${job.data.deploymentId} and environment ${job.data.environmentId}`,
);
return;
}

const { workspaceId } = ctx.resource;
const policy = await getApplicablePolicies(db, workspaceId, job.data);
const result = await evaluate(policy, [], ctx);
console.log(result);
});
Comment on lines +10 to +28
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add error handling to worker implementation

The worker function is well-structured, but lacks explicit error handling for the database operations and policy evaluation. A try/catch block would make the worker more resilient against unexpected errors.

Also, consider using a structured logger instead of console.log for the evaluation result.

 export const createReleaseEvaluateWorker = () =>
   new Worker<ReleaseEvaluateEvent>(Channel.ReleaseEvaluate, async (job) => {
     job.log(
       `Evaluating release for deployment ${job.data.deploymentId} and resource ${job.data.resourceId}`,
     );
 
+    try {
       const ctx = await createCtx(db, job.data);
       if (ctx == null) {
         job.log(
           `Resource ${job.data.resourceId} not found for deployment ${job.data.deploymentId} and environment ${job.data.environmentId}`,
         );
         return;
       }
 
       const { workspaceId } = ctx.resource;
       const policy = await getApplicablePolicies(db, workspaceId, job.data);
       const result = await evaluate(policy, [], ctx);
-      console.log(result);
+      job.log(`Evaluation result: ${JSON.stringify(result)}`);
+    } catch (error) {
+      job.log(`Error during release evaluation: ${error instanceof Error ? error.message : String(error)}`);
+      throw error; // Rethrow to let BullMQ handle the job failure
+    }
   });
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
export const createReleaseEvaluateWorker = () =>
new Worker<ReleaseEvaluateEvent>(Channel.ReleaseEvaluate, async (job) => {
job.log(
`Evaluating release for deployment ${job.data.deploymentId} and resource ${job.data.resourceId}`,
);
const ctx = await createCtx(db, job.data);
if (ctx == null) {
job.log(
`Resource ${job.data.resourceId} not found for deployment ${job.data.deploymentId} and environment ${job.data.environmentId}`,
);
return;
}
const { workspaceId } = ctx.resource;
const policy = await getApplicablePolicies(db, workspaceId, job.data);
const result = await evaluate(policy, [], ctx);
console.log(result);
});
export const createReleaseEvaluateWorker = () =>
new Worker<ReleaseEvaluateEvent>(Channel.ReleaseEvaluate, async (job) => {
job.log(
`Evaluating release for deployment ${job.data.deploymentId} and resource ${job.data.resourceId}`,
);
try {
const ctx = await createCtx(db, job.data);
if (ctx == null) {
job.log(
`Resource ${job.data.resourceId} not found for deployment ${job.data.deploymentId} and environment ${job.data.environmentId}`,
);
return;
}
const { workspaceId } = ctx.resource;
const policy = await getApplicablePolicies(db, workspaceId, job.data);
const result = await evaluate(policy, [], ctx);
job.log(`Evaluation result: ${JSON.stringify(result)}`);
} catch (error) {
job.log(`Error during release evaluation: ${error instanceof Error ? error.message : String(error)}`);
throw error; // Rethrow to let BullMQ handle the job failure
}
});

25 changes: 23 additions & 2 deletions packages/db/src/common.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import type { SQL } from "drizzle-orm";
import type { PgTable } from "drizzle-orm/pg-core";
import { getTableColumns, sql } from "drizzle-orm";
import { eq, getTableColumns, ilike, sql } from "drizzle-orm";

import { ColumnOperator } from "@ctrlplane/validators/conditions";

import type { db } from "./client";

Expand All @@ -19,9 +21,15 @@ export const takeFirstOrNull = <T extends any[]>(

export type Tx = Omit<typeof db, "$client">;

type ColumnKey<T extends PgTable> = keyof T["_"]["columns"];
type ColumnType<
T extends PgTable,
Q extends ColumnKey<T>,
> = T["_"]["columns"][Q];

export const buildConflictUpdateColumns = <
T extends PgTable,
Q extends keyof T["_"]["columns"],
Q extends ColumnKey<T>,
>(
table: T,
columns: Q[],
Expand All @@ -42,3 +50,16 @@ export function enumToPgEnum<T extends Record<string, any>>(
): [T[keyof T], ...T[keyof T][]] {
return Object.values(myEnum).map((value: any) => `${value}`) as any;
}

export const ColumnOperatorFn: Record<
ColumnOperator,
<T extends PgTable, Q extends ColumnKey<T>>(
column: ColumnType<T, Q>,
value: string,
) => SQL<unknown>
> = {
[ColumnOperator.Equals]: (column, value) => eq(column, value),
[ColumnOperator.StartsWith]: (column, value) => ilike(column, `${value}%`),
[ColumnOperator.EndsWith]: (column, value) => ilike(column, `%${value}`),
[ColumnOperator.Contains]: (column, value) => ilike(column, `%${value}%`),
};
37 changes: 26 additions & 11 deletions packages/db/src/schema/deployment.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { DeploymentCondition } from "@ctrlplane/validators/jobs";
import type { DeploymentCondition } from "@ctrlplane/validators/deployments";
import type { ResourceCondition } from "@ctrlplane/validators/resources";
import type { InferSelectModel, SQL } from "drizzle-orm";
import { relations, sql } from "drizzle-orm";
import { and, eq, not, or, relations, sql } from "drizzle-orm";
import {
integer,
jsonb,
Expand All @@ -13,12 +13,13 @@ import {
import { createInsertSchema } from "drizzle-zod";
import { z } from "zod";

import { ComparisonOperator } from "@ctrlplane/validators/conditions";
import {
isValidResourceCondition,
resourceCondition,
} from "@ctrlplane/validators/resources";

import type { Tx } from "../common.js";
import { ColumnOperatorFn } from "../common.js";
import { jobAgent } from "./job-agent.js";
import { system } from "./system.js";

Expand Down Expand Up @@ -117,11 +118,25 @@ export const deploymentDependency = pgTable(
(t) => ({ uniq: uniqueIndex().on(t.dependsOnId, t.deploymentId) }),
);

export function deploymentMatchSelector(
// eslint-disable-next-line @typescript-eslint/no-unused-vars
tx: Tx,
// eslint-disable-next-line @typescript-eslint/no-unused-vars
metadata?: DeploymentCondition | null,
): SQL<unknown> | undefined {
throw new Error("Not implemented");
}
const buildCondition = (cond: DeploymentCondition): SQL<unknown> => {
if (cond.type === "name")
return ColumnOperatorFn[cond.operator](deployment.name, cond.value);
if (cond.type === "slug")
return ColumnOperatorFn[cond.operator](deployment.slug, cond.value);
if (cond.type === "system") return eq(deployment.systemId, cond.value);
if (cond.type === "id") return eq(deployment.id, cond.value);

if (cond.conditions.length === 0) return sql`FALSE`;

const subCon = cond.conditions.map((c) => buildCondition(c));
const con =
cond.operator === ComparisonOperator.And ? and(...subCon)! : or(...subCon)!;
return cond.not ? not(con) : con;
};

export const deploymentMatchSelector = (
condition?: DeploymentCondition | null,
): SQL<unknown> | undefined =>
condition == null || Object.keys(condition).length === 0
? undefined
: buildCondition(condition);
121 changes: 115 additions & 6 deletions packages/db/src/schema/environment.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import type { EnvironmentCondition } from "@ctrlplane/validators/jobs";
import type { MetadataCondition } from "@ctrlplane/validators/conditions";
import type { EnvironmentCondition } from "@ctrlplane/validators/environments";
import type { ResourceCondition } from "@ctrlplane/validators/resources";
import type { InferSelectModel, SQL } from "drizzle-orm";
import type { AnyPgColumn, ColumnsWithTable } from "drizzle-orm/pg-core";
import { sql } from "drizzle-orm";
import { and, eq, exists, ilike, not, notExists, or, sql } from "drizzle-orm";
import {
bigint,
foreignKey,
Expand All @@ -18,12 +19,17 @@ import {
import { createInsertSchema } from "drizzle-zod";
import { z } from "zod";

import {
ComparisonOperator,
MetadataOperator,
} from "@ctrlplane/validators/conditions";
import {
isValidResourceCondition,
resourceCondition,
} from "@ctrlplane/validators/resources";

import type { Tx } from "../common.js";
import { ColumnOperatorFn } from "../common.js";
import { user } from "./auth.js";
import { deploymentVersion } from "./deployment-version.js";
import { system } from "./system.js";
Expand Down Expand Up @@ -280,11 +286,114 @@ export type EnvironmentPolicyApproval = InferSelectModel<
typeof environmentPolicyApproval
>;

const buildMetadataCondition = (tx: Tx, cond: MetadataCondition): SQL => {
if (cond.operator === MetadataOperator.Null)
return notExists(
tx
.select({ value: sql<number>`1` })
.from(environmentMetadata)
.where(
and(
eq(environmentMetadata.environmentId, environment.id),
eq(environmentMetadata.key, cond.key),
),
),
);

if (cond.operator === MetadataOperator.StartsWith)
return exists(
tx
.select({ value: sql<number>`1` })
.from(environmentMetadata)
.where(
and(
eq(environmentMetadata.environmentId, environment.id),
eq(environmentMetadata.key, cond.key),
ilike(environmentMetadata.value, `${cond.value}%`),
),
),
);

if (cond.operator === MetadataOperator.EndsWith)
return exists(
tx
.select({ value: sql<number>`1` })
.from(environmentMetadata)
.where(
and(
eq(environmentMetadata.environmentId, environment.id),
eq(environmentMetadata.key, cond.key),
ilike(environmentMetadata.value, `%${cond.value}`),
),
),
);

if (cond.operator === MetadataOperator.Contains)
return exists(
tx
.select({ value: sql<number>`1` })
.from(environmentMetadata)
.where(
and(
eq(environmentMetadata.environmentId, environment.id),
eq(environmentMetadata.key, cond.key),
ilike(environmentMetadata.value, `%${cond.value}%`),
),
),
);

if ("value" in cond)
return exists(
tx
.select({ value: sql<number>`1` })
.from(environmentMetadata)
.where(
and(
eq(environmentMetadata.environmentId, environment.id),
eq(environmentMetadata.key, cond.key),
eq(environmentMetadata.value, cond.value),
),
),
);

throw Error("invalid metadata conditions");
};

const buildCondition = (
tx: Tx,
condition: EnvironmentCondition,
): SQL<unknown> => {
if (condition.type === "name")
return ColumnOperatorFn[condition.operator](
environment.name,
condition.value,
);
if (condition.type === "directory")
return ColumnOperatorFn[condition.operator](
environment.directory,
condition.value,
);
if (condition.type === "system")
return eq(environment.systemId, condition.value);
if (condition.type === "id") return eq(environment.id, condition.value);
if (condition.type === "metadata")
return buildMetadataCondition(tx, condition);

if (condition.conditions.length === 0) return sql`FALSE`;

const subCon = condition.conditions.map((c) => buildCondition(tx, c));
const con =
condition.operator === ComparisonOperator.And
? and(...subCon)!
: or(...subCon)!;
return condition.not ? not(con) : con;
};

export function environmentMatchSelector(
// eslint-disable-next-line @typescript-eslint/no-unused-vars
tx: Tx,
// eslint-disable-next-line @typescript-eslint/no-unused-vars
metadata?: EnvironmentCondition | null,
condition?: EnvironmentCondition | null,
): SQL<unknown> | undefined {
throw new Error("Not implemented");
return condition == null || Object.keys(condition).length === 0
? undefined
: buildCondition(tx, condition);
}
17 changes: 13 additions & 4 deletions packages/db/src/schema/policy.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import type { DeploymentCondition } from "@ctrlplane/validators/deployments";
import type { EnvironmentCondition } from "@ctrlplane/validators/environments";
import type { InferSelectModel } from "drizzle-orm";
import type { Options } from "rrule";
import { sql } from "drizzle-orm";
Expand All @@ -13,6 +15,9 @@ import {
import { createInsertSchema } from "drizzle-zod";
import { z } from "zod";

import { deploymentCondition } from "@ctrlplane/validators/deployments";
import { environmentCondition } from "@ctrlplane/validators/environments";

import { workspace } from "./workspace.js";

export const policy = pgTable("policy", {
Expand All @@ -38,8 +43,12 @@ export const policyTarget = pgTable("policy_target", {
policyId: uuid("policy_id")
.notNull()
.references(() => policy.id, { onDelete: "cascade" }),
deploymentSelector: jsonb("deployment_selector").default(sql`NULL`),
environmentSelector: jsonb("environment_selector").default(sql`NULL`),
deploymentSelector: jsonb("deployment_selector")
.default(sql`NULL`)
.$type<DeploymentCondition | null>(),
environmentSelector: jsonb("environment_selector")
.default(sql`NULL`)
.$type<EnvironmentCondition | null>(),
});

export const policyRuleDenyWindow = pgTable("policy_rule_deny_window", {
Expand Down Expand Up @@ -74,8 +83,8 @@ const policyInsertSchema = createInsertSchema(policy, {

const policyTargetInsertSchema = createInsertSchema(policyTarget, {
policyId: z.string().uuid(),
deploymentSelector: z.record(z.any()).optional(),
environmentSelector: z.record(z.any()).optional(),
deploymentSelector: deploymentCondition.nullable(),
environmentSelector: environmentCondition.nullable(),
}).omit({ id: true });

// Define the structure of RRule Options for Zod validation
Expand Down
1 change: 1 addition & 0 deletions packages/rule-engine/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
"date-fns": "^4.1.0",
"lodash": "catalog:",
"rrule": "^2.8.1",
"ts-is-present": "catalog:",
"zod": "catalog:"
},
"devDependencies": {
Expand Down
Loading
Loading