21 changes: 16 additions & 5 deletions apps/event-worker/src/workers/compute-systems-release-targets.ts
@@ -80,7 +80,7 @@ export const computeSystemsReleaseTargetsWorker = createWorker(
if (deploymentIds.length === 0 || environmentIds.length === 0) return;

try {
const createdReleaseTargets = await db.transaction(async (tx) => {
const releaseTargets = await db.transaction(async (tx) => {
await tx.execute(
sql`
SELECT ${schema.releaseTarget.id} FROM ${schema.releaseTarget}
@@ -129,11 +129,22 @@ export const computeSystemsReleaseTargetsWorker = createWorker(
prevRt.environmentId === rt.environmentId,
),
);
if (created.length === 0) return [];
return tx.insert(schema.releaseTarget).values(created).returning();

if (created.length > 0)
await tx
.insert(schema.releaseTarget)
.values(created)
.onConflictDoNothing();

return tx.query.releaseTarget.findMany({
where: or(
inArray(schema.releaseTarget.deploymentId, deploymentIds),
inArray(schema.releaseTarget.environmentId, environmentIds),
),
});
});

if (createdReleaseTargets.length === 0) return;
if (releaseTargets.length === 0) return;

const policyTargets = await db
.select()
@@ -154,7 +165,7 @@ export const computeSystemsReleaseTargetsWorker = createWorker(
return;
}

await dispatchEvaluateJobs(createdReleaseTargets);
await dispatchEvaluateJobs(releaseTargets);
} catch (e: any) {
Comment on lines +168 to 169
coderabbitai bot (Contributor) commented on Apr 29, 2025

🛠️ Refactor suggestion

Type safety: dispatchEvaluateJobs expects identifiers only

dispatchEvaluateJobs serialises every field of each release target onto the queue.
If tx.query.releaseTarget.findMany includes heavy columns (timestamps, JSON blobs), you’ll bloat the job payload.
Consider projecting only { deploymentId, environmentId, resourceId } before dispatching.

-await dispatchEvaluateJobs(releaseTargets);
+await dispatchEvaluateJobs(
+  releaseTargets.map(({ deploymentId, environmentId, resourceId }) => ({
+    deploymentId,
+    environmentId,
+    resourceId,
+  })),
+);

adityachoudhari26 (Contributor, Author)

this is not really a concern

coderabbitai bot (Contributor)

@adityachoudhari26 Understood! I appreciate the clarification. I'll defer to your judgment on this since you have a better understanding of the system's requirements and performance characteristics.

const isRowLocked = e.code === "55P03";
if (isRowLocked) {
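Stepping back from the hunks, the change in this worker is: insert only the missing release targets with onConflictDoNothing, so a concurrent worker racing on the same rows is harmless, then re-read the full set for the affected deployments and environments instead of relying on .returning(). A condensed sketch of that pattern, not the file verbatim; the created rows and ID lists are assumed to mirror the worker's locals, and the drizzle helpers are assumed to come from @ctrlplane/db as in the other files:

```ts
// Condensed sketch of the upsert-then-reselect pattern used above.
import { inArray, or } from "@ctrlplane/db";
import { db } from "@ctrlplane/db/client";
import * as schema from "@ctrlplane/db/schema";

const upsertAndReadReleaseTargets = (
  created: (typeof schema.releaseTarget.$inferInsert)[],
  deploymentIds: string[],
  environmentIds: string[],
) =>
  db.transaction(async (tx) => {
    // onConflictDoNothing makes the insert idempotent when another worker
    // has already created some of these rows.
    if (created.length > 0)
      await tx
        .insert(schema.releaseTarget)
        .values(created)
        .onConflictDoNothing();

    // Re-query rather than using .returning(): returning() only yields the
    // rows this transaction inserted, which would miss pre-existing targets.
    return tx.query.releaseTarget.findMany({
      where: or(
        inArray(schema.releaseTarget.deploymentId, deploymentIds),
        inArray(schema.releaseTarget.environmentId, environmentIds),
      ),
    });
  });
```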
11 changes: 9 additions & 2 deletions apps/event-worker/src/workers/evaluate-release-target.ts
@@ -1,7 +1,7 @@
import type { Tx } from "@ctrlplane/db";
import _ from "lodash";

import { and, desc, eq, takeFirst } from "@ctrlplane/db";
import { and, desc, eq, sql, takeFirst } from "@ctrlplane/db";
import { db } from "@ctrlplane/db/client";
import * as schema from "@ctrlplane/db/schema";
import { Channel, createWorker, getQueue } from "@ctrlplane/events";
@@ -186,6 +186,14 @@ export const evaluateReleaseTargetWorker = createWorker(
});
if (!releaseTarget) throw new Error("Failed to get release target");

await tx.execute(
sql`
SELECT id FROM ${schema.releaseTarget}
WHERE ${eq(schema.releaseTarget.id, releaseTarget.id)}
FOR UPDATE NOWAIT
`,
);

const existingVersionRelease = await tx.query.versionRelease.findFirst({
where: eq(schema.versionRelease.releaseTargetId, releaseTarget.id),
orderBy: desc(schema.versionRelease.createdAt),
@@ -246,7 +254,6 @@ export const evaluateReleaseTargetWorker = createWorker(
});
return;
}
log.error("Error in evaluateReleaseTarget", { error: e });
throw e;
}
}),
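The new SELECT ... FOR UPDATE NOWAIT above acquires the release target's row lock without waiting: if another worker already holds it, Postgres raises SQLSTATE 55P03 (lock_not_available) immediately instead of blocking the transaction, and the first file in this PR shows the matching catch-side check (e.code === "55P03"). A minimal sketch of that lock-and-back-off pattern, with the re-enqueue step left hypothetical since the handler bodies are truncated in the diff:

```ts
// Minimal sketch: FOR UPDATE NOWAIT fails fast with SQLSTATE 55P03 instead of
// blocking when another worker already holds the row lock.
import { eq, sql } from "@ctrlplane/db";
import { db } from "@ctrlplane/db/client";
import * as schema from "@ctrlplane/db/schema";

const evaluateWithRowLock = async (releaseTargetId: string) => {
  try {
    await db.transaction(async (tx) => {
      await tx.execute(
        sql`
          SELECT id FROM ${schema.releaseTarget}
          WHERE ${eq(schema.releaseTarget.id, releaseTargetId)}
          FOR UPDATE NOWAIT
        `,
      );
      // ...evaluation work runs here while the row lock is held...
    });
    return { locked: false };
  } catch (e: any) {
    const isRowLocked = e.code === "55P03";
    // Hypothetical handling: report "locked" so the caller can re-enqueue the
    // job with a delay rather than treating lock contention as a failure.
    if (isRowLocked) return { locked: true };
    throw e;
  }
};
```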
33 changes: 27 additions & 6 deletions apps/event-worker/src/workers/updated-resources/index.ts
@@ -1,5 +1,8 @@
import _ from "lodash";

import { eq } from "@ctrlplane/db";
import { db } from "@ctrlplane/db/client";
import * as schema from "@ctrlplane/db/schema";
import { Channel, createWorker, getQueue } from "@ctrlplane/events";

import { withSpan } from "./span.js";
@@ -24,14 +27,32 @@ export const updatedResourceWorker = createWorker(
span.setAttribute("resource.name", resource.name);
span.setAttribute("workspace.id", resource.workspaceId);

await getQueue(Channel.ComputeDeploymentResourceSelector).add(
resource.id,
resource,
const workspace = await db.query.workspace.findFirst({
where: eq(schema.workspace.id, resource.workspaceId),
with: { systems: { with: { environments: true, deployments: true } } },
});

if (workspace == null) throw new Error("Workspace not found");

const deploymentJobs = workspace.systems.flatMap((system) =>
system.deployments.map((deployment) => ({
name: deployment.id,
data: deployment,
})),
);

await getQueue(Channel.ComputeEnvironmentResourceSelector).add(
resource.id,
resource,
const environmentJobs = workspace.systems.flatMap((system) =>
system.environments.map((environment) => ({
name: environment.id,
data: environment,
})),
);

await getQueue(Channel.ComputeDeploymentResourceSelector).addBulk(
deploymentJobs,
);
await getQueue(Channel.ComputeEnvironmentResourceSelector).addBulk(
environmentJobs,
);
Comment on lines +30 to 56
coderabbitai bot (Contributor)

💡 Verification agent

🧩 Analysis chain

Consider potential performance impact for large workspaces.

The new implementation fetches all systems, environments, and deployments for a workspace, which could be resource-intensive for workspaces with many entities. Consider implementing pagination or limiting the query if performance becomes an issue.

Additionally, you might want to add some basic logging to indicate the number of jobs being processed, which would be helpful for monitoring.


🏁 Script executed:

#!/bin/bash
# Check if there are workspaces with many systems, deployments, or environments that might cause performance issues

# Find workspaces with large numbers of systems
echo "Workspaces with many systems:"
rg -l "systems:" --type ts | xargs grep -l "findFirst" | xargs grep -l "workspace" | head -n 5

# Check if there are other places in the codebase where similar bulk queries are performed
echo "Similar bulk queries in the codebase:"
rg -A 3 "with: \{ systems: \{ with:" --type ts

Length of output: 769


Add performance guardrails and logging for bulk job processing

This handler eagerly loads every system, environment, and deployment for a workspace—which could exhaust memory or slow down the worker when a workspace grows large. To mitigate:

• In apps/event-worker/src/workers/updated-resources/index.ts (around line 30):
– Add a log before enqueuing to report job counts:
  const deploymentCount = deploymentJobs.length;
  const environmentCount = environmentJobs.length;
  console.log(
    `Enqueuing ${deploymentCount} deployment jobs and ${environmentCount} environment jobs for workspace ${resource.workspaceId}`,
  );
– If either array is large, batch the calls to addBulk in configurable chunks (e.g., 100 jobs per batch) to avoid overwhelming the queue client or worker:
  const chunkSize = 100;
  for (let i = 0; i < deploymentJobs.length; i += chunkSize) {
    await getQueue(Channel.ComputeDeploymentResourceSelector).addBulk(
      deploymentJobs.slice(i, i + chunkSize),
    );
  }
  // Same for environmentJobs…

• For very large workspaces, consider paginating the DB query (using take/skip) or filtering by recent changes rather than loading everything at once.

These changes will keep memory and response times bounded and give you visibility into workload spikes.

}),
);
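On the pagination point above: drizzle's relational queries take limit/offset rather than Prisma-style take/skip, so one bounded-memory variant walks the workspace's systems a page at a time and enqueues each page's jobs before fetching the next. This is an assumption-level sketch; it presumes a system table with a workspaceId column and deployments/environments relations, matching the with clause used in the worker:

```ts
// Assumption-level sketch: page through systems instead of loading the whole
// workspace graph at once, enqueuing each page's jobs before the next fetch.
import { eq } from "@ctrlplane/db";
import { db } from "@ctrlplane/db/client";
import * as schema from "@ctrlplane/db/schema";
import { Channel, getQueue } from "@ctrlplane/events";

const pageSize = 100;

const enqueueSelectorJobsPaged = async (workspaceId: string) => {
  for (let offset = 0; ; offset += pageSize) {
    const systems = await db.query.system.findMany({
      where: eq(schema.system.workspaceId, workspaceId),
      with: { deployments: true, environments: true },
      limit: pageSize,
      offset,
    });
    if (systems.length === 0) break;

    const deploymentJobs = systems.flatMap((s) =>
      s.deployments.map((d) => ({ name: d.id, data: d })),
    );
    const environmentJobs = systems.flatMap((s) =>
      s.environments.map((e) => ({ name: e.id, data: e })),
    );

    await getQueue(Channel.ComputeDeploymentResourceSelector).addBulk(
      deploymentJobs,
    );
    await getQueue(Channel.ComputeEnvironmentResourceSelector).addBulk(
      environmentJobs,
    );
  }
};
```

Adding an orderBy on a stable column (for example the system id) would keep the offset paging deterministic if rows are inserted mid-scan.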
@@ -135,6 +135,9 @@ export const PATCH = request()
...res,
metadata: Object.fromEntries(res.metadata.map((m) => [m.key, m.value])),
};

await getQueue(Channel.UpdatedResource).add(resource.id, resource);

return NextResponse.json(resourceWithMeta);
} catch (err) {
const error = err instanceof Error ? err : new Error(String(err));