Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 135 additions & 54 deletions apps/event-queue/src/events/deployment-variables.ts
Original file line number Diff line number Diff line change
@@ -1,65 +1,146 @@
import type { Event } from "@ctrlplane/events";

import { makeWithSpan, trace } from "@ctrlplane/logger";

import type { Handler } from ".";
import { OperationPipeline } from "../workspace/pipeline.js";
import { WorkspaceManager } from "../workspace/workspace.js";

export const newDeploymentVariable: Handler<
Event.DeploymentVariableCreated
> = async (event) => {
const ws = await WorkspaceManager.getOrLoad(event.workspaceId);
if (ws == null) return;
await OperationPipeline.update(ws)
.deploymentVariable(event.payload)
.dispatch();
};
const newDeploymentVariableTracer = trace.getTracer("new-deployment-variable");
const withNewDeploymentVariableSpan = makeWithSpan(newDeploymentVariableTracer);

export const newDeploymentVariable: Handler<Event.DeploymentVariableCreated> =
withNewDeploymentVariableSpan(
"new-deployment-variable",
async (span, event) => {
span.setAttribute("event.type", event.eventType);
span.setAttribute("deployment-variable.id", event.payload.id);
span.setAttribute("deployment.id", event.payload.deploymentId);
span.setAttribute("workspace.id", event.workspaceId);
const ws = await WorkspaceManager.getOrLoad(event.workspaceId);
if (ws == null) return;
await OperationPipeline.update(ws)
.deploymentVariable(event.payload)
.dispatch();
},
);

const updatedDeploymentVariableTracer = trace.getTracer(
"updated-deployment-variable",
);
const withUpdatedDeploymentVariableSpan = makeWithSpan(
updatedDeploymentVariableTracer,
);

export const updatedDeploymentVariable: Handler<Event.DeploymentVariableUpdated> =
withUpdatedDeploymentVariableSpan(
"updated-deployment-variable",
async (span, event) => {
span.setAttribute("event.type", event.eventType);
span.setAttribute("deployment-variable.id", event.payload.current.id);
span.setAttribute("deployment.id", event.payload.current.deploymentId);
span.setAttribute("workspace.id", event.workspaceId);
const ws = await WorkspaceManager.getOrLoad(event.workspaceId);
if (ws == null) return;
await OperationPipeline.update(ws)
.deploymentVariable(event.payload.current)
.dispatch();
},
);

const deletedDeploymentVariableTracer = trace.getTracer(
"deleted-deployment-variable",
);
const withDeletedDeploymentVariableSpan = makeWithSpan(
deletedDeploymentVariableTracer,
);

export const deletedDeploymentVariable: Handler<Event.DeploymentVariableDeleted> =
withDeletedDeploymentVariableSpan(
"deleted-deployment-variable",
async (span, event) => {
span.setAttribute("event.type", event.eventType);
span.setAttribute("deployment-variable.id", event.payload.id);
span.setAttribute("deployment.id", event.payload.deploymentId);
span.setAttribute("workspace.id", event.workspaceId);
const ws = await WorkspaceManager.getOrLoad(event.workspaceId);
if (ws == null) return;
await OperationPipeline.delete(ws)
.deploymentVariable(event.payload)
.dispatch();
},
);

const newDeploymentVariableValueTracer = trace.getTracer(
"new-deployment-variable-value",
);
const withNewDeploymentVariableValueSpan = makeWithSpan(
newDeploymentVariableValueTracer,
);

export const updatedDeploymentVariable: Handler<
Event.DeploymentVariableUpdated
> = async (event) => {
const ws = await WorkspaceManager.getOrLoad(event.workspaceId);
if (ws == null) return;
await OperationPipeline.update(ws)
.deploymentVariable(event.payload.current)
.dispatch();
};
export const newDeploymentVariableValue: Handler<Event.DeploymentVariableValueCreated> =
withNewDeploymentVariableValueSpan(
"new-deployment-variable-value",
async (span, event) => {
span.setAttribute("deployment-variable-value.id", event.payload.id);
span.setAttribute("deployment-variable.id", event.payload.variableId);
span.setAttribute("workspace.id", event.workspaceId);
const ws = await WorkspaceManager.getOrLoad(event.workspaceId);
if (ws == null) return;
await OperationPipeline.update(ws)
.deploymentVariableValue(event.payload)
.dispatch();
},
);

export const deletedDeploymentVariable: Handler<
Event.DeploymentVariableDeleted
> = async (event) => {
const ws = await WorkspaceManager.getOrLoad(event.workspaceId);
if (ws == null) return;
await OperationPipeline.update(ws)
.deploymentVariable(event.payload)
.dispatch();
};
const updatedDeploymentVariableValueTracer = trace.getTracer(
"updated-deployment-variable-value",
);
const withUpdatedDeploymentVariableValueSpan = makeWithSpan(
updatedDeploymentVariableValueTracer,
);

export const newDeploymentVariableValue: Handler<
Event.DeploymentVariableValueCreated
> = async (event) => {
const ws = await WorkspaceManager.getOrLoad(event.workspaceId);
if (ws == null) return;
await OperationPipeline.update(ws)
.deploymentVariableValue(event.payload)
.dispatch();
};
export const updatedDeploymentVariableValue: Handler<Event.DeploymentVariableValueUpdated> =
withUpdatedDeploymentVariableValueSpan(
"updated-deployment-variable-value",
async (span, event) => {
span.setAttribute("event.type", event.eventType);
span.setAttribute(
"deployment-variable-value.id",
event.payload.current.id,
);
span.setAttribute(
"deployment-variable.id",
event.payload.current.variableId,
);
span.setAttribute("workspace.id", event.workspaceId);
const ws = await WorkspaceManager.getOrLoad(event.workspaceId);
if (ws == null) return;
await OperationPipeline.update(ws)
.deploymentVariableValue(event.payload.current)
.dispatch();
},
);

export const updatedDeploymentVariableValue: Handler<
Event.DeploymentVariableValueUpdated
> = async (event) => {
const ws = await WorkspaceManager.getOrLoad(event.workspaceId);
if (ws == null) return;
await OperationPipeline.update(ws)
.deploymentVariableValue(event.payload.current)
.dispatch();
};
const deletedDeploymentVariableValueTracer = trace.getTracer(
"deleted-deployment-variable-value",
);
const withDeletedDeploymentVariableValueSpan = makeWithSpan(
deletedDeploymentVariableValueTracer,
);

export const deletedDeploymentVariableValue: Handler<
Event.DeploymentVariableValueDeleted
> = async (event) => {
const ws = await WorkspaceManager.getOrLoad(event.workspaceId);
if (ws == null) return;
await OperationPipeline.update(ws)
.deploymentVariableValue(event.payload)
.dispatch();
};
export const deletedDeploymentVariableValue: Handler<Event.DeploymentVariableValueDeleted> =
withDeletedDeploymentVariableValueSpan(
"deleted-deployment-variable-value",
async (span, event) => {
span.setAttribute("event.type", event.eventType);
span.setAttribute("deployment-variable-value.id", event.payload.id);
span.setAttribute("deployment-variable.id", event.payload.variableId);
span.setAttribute("workspace.id", event.workspaceId);
const ws = await WorkspaceManager.getOrLoad(event.workspaceId);
if (ws == null) return;
await OperationPipeline.delete(ws)
.deploymentVariableValue(event.payload)
.dispatch();
},
);
99 changes: 68 additions & 31 deletions apps/event-queue/src/events/deployment-versions.ts
Original file line number Diff line number Diff line change
@@ -1,48 +1,85 @@
import type * as schema from "@ctrlplane/db/schema";
import type { Event } from "@ctrlplane/events";

import { makeWithSpan, trace } from "@ctrlplane/logger";

import type { Handler } from ".";
import { OperationPipeline } from "../workspace/pipeline.js";
import { WorkspaceManager } from "../workspace/workspace.js";

const newDeploymentVersionTracer = trace.getTracer("new-deployment-version");
const withNewDeploymentVersionSpan = makeWithSpan(newDeploymentVersionTracer);

const getDeploymentVersionWithDates = (
deploymentVersion: schema.DeploymentVersion,
) => {
const createdAt = new Date(deploymentVersion.createdAt);
return { ...deploymentVersion, createdAt };
};

export const newDeploymentVersion: Handler<
Event.DeploymentVersionCreated
> = async (event) => {
const ws = await WorkspaceManager.getOrLoad(event.workspaceId);
if (ws == null) return;
const deploymentVersion = getDeploymentVersionWithDates(event.payload);
await OperationPipeline.update(ws)
.deploymentVersion(deploymentVersion)
.dispatch();
};
export const newDeploymentVersion: Handler<Event.DeploymentVersionCreated> =
withNewDeploymentVersionSpan(
"new-deployment-version",
async (span, event) => {
span.setAttribute("event.type", event.eventType);
span.setAttribute("deployment-version.id", event.payload.id);
span.setAttribute("deployment.id", event.payload.deploymentId);
span.setAttribute("workspace.id", event.workspaceId);
const ws = await WorkspaceManager.getOrLoad(event.workspaceId);
if (ws == null) return;
const deploymentVersion = getDeploymentVersionWithDates(event.payload);
await OperationPipeline.update(ws)
.deploymentVersion(deploymentVersion)
.dispatch();
},
);

const updatedDeploymentVersionTracer = trace.getTracer(
"updated-deployment-version",
);
const withUpdatedDeploymentVersionSpan = makeWithSpan(
updatedDeploymentVersionTracer,
);

export const updatedDeploymentVersion: Handler<
Event.DeploymentVersionUpdated
> = async (event) => {
const ws = await WorkspaceManager.getOrLoad(event.workspaceId);
if (ws == null) return;
const deploymentVersion = getDeploymentVersionWithDates(
event.payload.current,
export const updatedDeploymentVersion: Handler<Event.DeploymentVersionUpdated> =
withUpdatedDeploymentVersionSpan(
"updated-deployment-version",
async (span, event) => {
span.setAttribute("event.type", event.eventType);
span.setAttribute("deployment-version.id", event.payload.current.id);
span.setAttribute("deployment.id", event.payload.current.deploymentId);
span.setAttribute("workspace.id", event.workspaceId);
const ws = await WorkspaceManager.getOrLoad(event.workspaceId);
if (ws == null) return;
const deploymentVersion = getDeploymentVersionWithDates(
event.payload.current,
);
await OperationPipeline.update(ws)
.deploymentVersion(deploymentVersion)
.dispatch();
},
);
await OperationPipeline.update(ws)
.deploymentVersion(deploymentVersion)
.dispatch();
};

export const deletedDeploymentVersion: Handler<
Event.DeploymentVersionDeleted
> = async (event) => {
const ws = await WorkspaceManager.getOrLoad(event.workspaceId);
if (ws == null) return;
const deploymentVersion = getDeploymentVersionWithDates(event.payload);
await OperationPipeline.delete(ws)
.deploymentVersion(deploymentVersion)
.dispatch();
};
const deletedDeploymentVersionTracer = trace.getTracer(
"deleted-deployment-version",
);
const withDeletedDeploymentVersionSpan = makeWithSpan(
deletedDeploymentVersionTracer,
);

export const deletedDeploymentVersion: Handler<Event.DeploymentVersionDeleted> =
withDeletedDeploymentVersionSpan(
"deleted-deployment-version",
async (span, event) => {
span.setAttribute("event.type", event.eventType);
span.setAttribute("deployment-version.id", event.payload.id);
span.setAttribute("deployment.id", event.payload.deploymentId);
span.setAttribute("workspace.id", event.workspaceId);
const ws = await WorkspaceManager.getOrLoad(event.workspaceId);
if (ws == null) return;
const deploymentVersion = getDeploymentVersionWithDates(event.payload);
await OperationPipeline.delete(ws)
.deploymentVersion(deploymentVersion)
.dispatch();
},
);
67 changes: 42 additions & 25 deletions apps/event-queue/src/events/deployments.ts
Original file line number Diff line number Diff line change
@@ -1,31 +1,48 @@
import type { Event } from "@ctrlplane/events";

import { makeWithSpan, trace } from "@ctrlplane/logger";

import type { Handler } from ".";
import { OperationPipeline } from "../workspace/pipeline.js";
import { WorkspaceManager } from "../workspace/workspace.js";

export const newDeployment: Handler<Event.DeploymentCreated> = async (
event,
) => {
const ws = await WorkspaceManager.getOrLoad(event.workspaceId);
if (ws == null) return;
await OperationPipeline.update(ws).deployment(event.payload).dispatch();
};

export const updatedDeployment: Handler<Event.DeploymentUpdated> = async (
event,
) => {
const ws = await WorkspaceManager.getOrLoad(event.workspaceId);
if (ws == null) return;
await OperationPipeline.update(ws)
.deployment(event.payload.current)
.dispatch();
};

export const deletedDeployment: Handler<Event.DeploymentDeleted> = async (
event,
) => {
const ws = await WorkspaceManager.getOrLoad(event.workspaceId);
if (ws == null) return;
await OperationPipeline.delete(ws).deployment(event.payload).dispatch();
};
const newDeploymentTracer = trace.getTracer("new-deployment");
const withNewDeploymentSpan = makeWithSpan(newDeploymentTracer);

export const newDeployment: Handler<Event.DeploymentCreated> =
withNewDeploymentSpan("new-deployment", async (span, event) => {
span.setAttribute("event.type", event.eventType);
span.setAttribute("deployment.id", event.payload.id);
span.setAttribute("workspace.id", event.workspaceId);
const ws = await WorkspaceManager.getOrLoad(event.workspaceId);
if (ws == null) return;
await OperationPipeline.update(ws).deployment(event.payload).dispatch();
});
Comment on lines +17 to +20
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Do not silently drop events; fix WorkspaceManager.getOrLoad return bug and set span status on missing workspace.

Right now, the first event for any workspace will be dropped: WorkspaceManager.getOrLoad() returns the pre-load value instead of the loaded instance. Your if (ws == null) return; then exits, skipping the pipeline. This is a functional bug. Also, missing workspaces should mark the span as an error, not silently return. (See apps/event-queue/src/workspace/workspace.ts snippet.)

Apply this in-file safeguard and error status, and separately fix getOrLoad:

@@
-    const ws = await WorkspaceManager.getOrLoad(event.workspaceId);
-    if (ws == null) return;
-    await OperationPipeline.update(ws).deployment(event.payload).dispatch();
+    const ws0 = await WorkspaceManager.getOrLoad(event.workspaceId);
+    const ws = ws0 ?? WorkspaceManager.get(event.workspaceId);
+    if (ws == null) {
+      span.setAttribute("workspace.missing", true);
+      span.setStatus({ code: SpanStatusCode.ERROR, message: `workspace not found: ${event.workspaceId}` });
+      return;
+    }
+    await OperationPipeline.update(ws).deployment(event.payload).dispatch();
@@
-    const ws = await WorkspaceManager.getOrLoad(event.workspaceId);
-    if (ws == null) return;
+    const ws0 = await WorkspaceManager.getOrLoad(event.workspaceId);
+    const ws = ws0 ?? WorkspaceManager.get(event.workspaceId);
+    if (ws == null) {
+      span.setAttribute("workspace.missing", true);
+      span.setStatus({ code: SpanStatusCode.ERROR, message: `workspace not found: ${event.workspaceId}` });
+      return;
+    }
     await OperationPipeline.update(ws)
       .deployment(event.payload.current)
       .dispatch();
@@
-    const ws = await WorkspaceManager.getOrLoad(event.workspaceId);
-    if (ws == null) return;
-    await OperationPipeline.delete(ws).deployment(event.payload).dispatch();
+    const ws0 = await WorkspaceManager.getOrLoad(event.workspaceId);
+    const ws = ws0 ?? WorkspaceManager.get(event.workspaceId);
+    if (ws == null) {
+      span.setAttribute("workspace.missing", true);
+      span.setStatus({ code: SpanStatusCode.ERROR, message: `workspace not found: ${event.workspaceId}` });
+      return;
+    }
+    await OperationPipeline.delete(ws).deployment(event.payload).dispatch();

And add the missing import to support setStatus:

@@
-import { makeWithSpan, trace } from "@ctrlplane/logger";
+import { makeWithSpan, trace } from "@ctrlplane/logger";
+import { SpanStatusCode } from "@opentelemetry/api";

Fix WorkspaceManager.getOrLoad to actually return the loaded instance:

-  static async getOrLoad(id: string) {
-    const workspace = WorkspaceManager.get(id);
-    if (!workspace) {
-      const ws = await Workspace.load(id);
-      WorkspaceManager.set(id, ws);
-    }
-
-    return workspace;
-  }
+  static async getOrLoad(id: string) {
+    let workspace = WorkspaceManager.get(id);
+    if (!workspace) {
+      const ws = await Workspace.load(id);
+      WorkspaceManager.set(id, ws);
+      workspace = ws;
+    }
+    return workspace;
+  }

Run to confirm other callers aren’t affected and that getOrLoad is fixed everywhere:

Also applies to: 30-34, 45-48


🏁 Script executed:

#!/bin/bash
set -euo pipefail
echo "Check getOrLoad definition:"
rg -n "static\s+async\s+getOrLoad" -C2

echo -e "\nCall sites that might rely on the buggy behavior:"
rg -n --type ts -C2 '\bgetOrLoad\s*\(' apps/event-queue/src

Length of output: 14398


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Show full getOrLoad implementation
rg -n -C5 "static async getOrLoad" apps/event-queue/src/workspace/workspace.ts

# Show imports and initial handler logic in deployments.ts
sed -n '1,30p' apps/event-queue/src/events/deployments.ts

Length of output: 1738


Fix getOrLoad return and enforce error spans in event handlers

  • In apps/event-queue/src/workspace/workspace.ts, amend getOrLoad to return the loaded workspace instance:
     static async getOrLoad(id: string) {
  • const workspace = WorkspaceManager.get(id);
  • if (!workspace) {
  •  const ws = await Workspace.load(id);
    
  •  WorkspaceManager.set(id, ws);
    
  • }
  • //← missing return
  • let workspace = WorkspaceManager.get(id);
  • if (!workspace) {
  •  workspace = await Workspace.load(id);
    
  •  WorkspaceManager.set(id, workspace);
    
  • }
  • return workspace;
    }
- In apps/event-queue/src/events/deployments.ts, replace the silent‐drop null‐check with a fallback and error‐span logic as originally diffed (including `import { SpanStatusCode } from "@opentelemetry/api";`).  
- Propagate that same pattern (fallback against WorkspaceManager.get + `span.setStatus` on null) to every handler in apps/event-queue/src/events/** that uses `getOrLoad` to prevent dropping the first event.  

> Committable suggestion skipped: line range outside the PR's diff.

<details>
<summary>🤖 Prompt for AI Agents</summary>

In apps/event-queue/src/events/deployments.ts around lines 17-20, the handler
silently returns when WorkspaceManager.getOrLoad yields null which drops the
first event; update WorkspaceManager.getOrLoad in
apps/event-queue/src/workspace/workspace.ts to actually return the loaded
Workspace instance, then in this file replace the simple null-check with the
original fallback + error-span logic: import { SpanStatusCode } from
"@opentelemetry/api", create/obtain the span, if getOrLoad returns null call
span.setStatus({ code: SpanStatusCode.ERROR, message: "workspace not found" })
(or similar) and proceed to use a fallback WorkspaceManager.get before deciding
to stop; apply the same pattern (fallback against WorkspaceManager.get and
span.setStatus on null) to every event handler file under
apps/event-queue/src/events/** that calls getOrLoad so events aren’t silently
dropped.


</details>

<!-- fingerprinting:phantom:poseidon:chinchilla -->

<!-- This is an auto-generated comment by CodeRabbit -->


const updatedDeploymentTracer = trace.getTracer("updated-deployment");
const withUpdatedDeploymentSpan = makeWithSpan(updatedDeploymentTracer);

export const updatedDeployment: Handler<Event.DeploymentUpdated> =
withUpdatedDeploymentSpan("updated-deployment", async (span, event) => {
span.setAttribute("event.type", event.eventType);
span.setAttribute("deployment.id", event.payload.current.id);
span.setAttribute("workspace.id", event.workspaceId);
const ws = await WorkspaceManager.getOrLoad(event.workspaceId);
if (ws == null) return;
await OperationPipeline.update(ws)
.deployment(event.payload.current)
.dispatch();
});

const deletedDeploymentTracer = trace.getTracer("deleted-deployment");
const withDeletedDeploymentSpan = makeWithSpan(deletedDeploymentTracer);

export const deletedDeployment: Handler<Event.DeploymentDeleted> =
withDeletedDeploymentSpan("deleted-deployment", async (span, event) => {
span.setAttribute("event.type", event.eventType);
span.setAttribute("deployment.id", event.payload.id);
span.setAttribute("workspace.id", event.workspaceId);
const ws = await WorkspaceManager.getOrLoad(event.workspaceId);
if (ws == null) return;
await OperationPipeline.delete(ws).deployment(event.payload).dispatch();
});
Loading
Loading