chore: init gcs store #694
Conversation
Walkthrough

Adds persistent workspace snapshots with file and GCS backends, refactors the Kafka consumer to load workspaces at startup and persist snapshots per message, removes Kafka progress tracking, adds DB snapshot metadata, and introduces new loader helpers and comprehensive e2e persistence tests.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
participant Kafka as Kafka Consumer
participant Storage as Storage (File/GCS)
participant DB as Database
participant Handler as Event Handler
participant WS as Workspace
Kafka->>Storage: Initialize (NewFileStorage or NewGCSStorageClient)
Kafka->>DB: Query assigned workspace IDs (startup)
loop Load each workspace
Kafka->>DB: GetWorkspaceSnapshot(workspaceID)
DB-->>Kafka: snapshot metadata or none
alt snapshot exists
Kafka->>Storage: Get(snapshot.Path)
Storage-->>WS: decoded workspace state
else no snapshot
Kafka->>WS: PopulateWorkspaceWithInitialState
end
end
loop For each Kafka message
Kafka->>Handler: ListenAndRoute(message)
Handler->>WS: Apply event (uses RawEvent.Timestamp)
WS->>Storage: Save encoded state (path)
WS->>DB: WriteWorkspaceSnapshot(metadata)
Kafka->>Kafka: Commit offset
end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs
Suggested reviewers
Poem
Pre-merge checks and finishing touches
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Actionable comments posted: 8
🧹 Nitpick comments (3)
apps/workspace-engine/pkg/db/workspaces.go (2)
90-95: Prefer time.Time for Timestamp (avoid string parsing issues).
Switch the struct's Timestamp to time.Time so pgx handles conversions natively; this keeps ORDER BY timestamp stable and locale-safe. It will require adjusting Scan/Exec call sites.

```diff
 type WorkspaceSnapshot struct {
 	Path          string
-	Timestamp     string
+	Timestamp     time.Time
 	Partition     int32
 	NumPartitions int32
 }
```

Import time and update callers in this file and workspace/storage_gcs.go accordingly.
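For reference, a minimal sketch of what the read path could look like once Timestamp is a time.Time, assuming pgx/v5 and a timestamptz column; query and field names mirror the snippet above, everything else is illustrative:

```go
package db

import (
	"context"
	"errors"
	"time"

	"github.com/jackc/pgx/v5"
)

type WorkspaceSnapshot struct {
	Path          string
	Timestamp     time.Time
	Partition     int32
	NumPartitions int32
}

// getLatestSnapshot scans the timestamptz column directly into time.Time,
// so no string parsing or locale handling is involved.
func getLatestSnapshot(ctx context.Context, conn *pgx.Conn, workspaceID string) (*WorkspaceSnapshot, error) {
	var s WorkspaceSnapshot
	err := conn.QueryRow(ctx,
		`SELECT path, timestamp, partition, num_partitions
		   FROM workspace_snapshot
		  WHERE workspace_id = $1
		  ORDER BY timestamp DESC
		  LIMIT 1`, workspaceID).
		Scan(&s.Path, &s.Timestamp, &s.Partition, &s.NumPartitions)
	if errors.Is(err, pgx.ErrNoRows) {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}
	return &s, nil
}
```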
97-99: Add supporting index for select-by-workspace ordered by timestamp.
To keep this query efficient at scale, add an index like:

```sql
CREATE INDEX ON workspace_snapshot (workspace_id, timestamp DESC);
```

apps/workspace-engine/pkg/events/handler/handler.go (1)
132-137: Telemetry nit: use Int64 for timestamp attribute.
Avoid precision loss in OTel attributes.

```diff
-attribute.Float64("event.timestamp", float64(msg.Timestamp.Unix())),
+attribute.Int64("event.timestamp", msg.Timestamp.Unix()),
```
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
⛔ Files ignored due to path filters (1)
apps/workspace-engine/go.sum is excluded by !**/*.sum
📒 Files selected for processing (7)
- apps/workspace-engine/go.mod (5 hunks)
- apps/workspace-engine/pkg/db/workspaces.go (2 hunks)
- apps/workspace-engine/pkg/events/events.go (2 hunks)
- apps/workspace-engine/pkg/events/handler/handler.go (4 hunks)
- apps/workspace-engine/pkg/kafka/kafka.go (3 hunks)
- apps/workspace-engine/pkg/workspace/storage_gcs.go (1 hunks)
- apps/workspace-engine/pkg/workspace/workspace.go (2 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
apps/workspace-engine/**/*.go
📄 CodeRabbit inference engine (apps/workspace-engine/CLAUDE.md)
apps/workspace-engine/**/*.go: Do not add extraneous inline comments that state the obvious
Do not add comments that simply restate what the code does
Do not add comments for standard Go patterns (e.g., noting WaitGroup or semaphore usage)
Write comments that explain why, document complex logic/algorithms, provide non-obvious context, include TODO/FIXME, and document exported functions/types/methods
Files:
- apps/workspace-engine/pkg/db/workspaces.go
- apps/workspace-engine/pkg/events/handler/handler.go
- apps/workspace-engine/pkg/workspace/storage_gcs.go
- apps/workspace-engine/pkg/events/events.go
- apps/workspace-engine/pkg/kafka/kafka.go
- apps/workspace-engine/pkg/workspace/workspace.go
🧬 Code graph analysis (6)
apps/workspace-engine/pkg/db/workspaces.go (2)
apps/workspace-engine/pkg/workspace/storage_gcs.go (1)
GetWorkspaceSnapshot(24-59)apps/workspace-engine/pkg/db/client.go (1)
GetDB(60-70)
apps/workspace-engine/pkg/events/handler/handler.go (1)
apps/workspace-engine/pkg/workspace/workspace.go (1)
WorkspaceSaver(297-297)
apps/workspace-engine/pkg/workspace/storage_gcs.go (1)
apps/workspace-engine/pkg/db/workspaces.go (3)
GetWorkspaceSnapshot(101-122)WorkspaceSnapshot(90-95)WriteWorkspaceSnapshot(129-141)
apps/workspace-engine/pkg/events/events.go (2)
apps/workspace-engine/pkg/workspace/workspace.go (1)
WorkspaceSaver(297-297)apps/workspace-engine/pkg/events/handler/handler.go (2)
EventListener(97-100)NewEventListenerWithWorkspaceSaver(107-109)
apps/workspace-engine/pkg/kafka/kafka.go (4)
apps/workspace-engine/pkg/events/handler/handler.go (1)
EventListener(97-100)apps/workspace-engine/pkg/workspace/storage_gcs.go (1)
IsGCSStorageEnabled(14-16)apps/workspace-engine/pkg/events/events.go (2)
NewEventHandlerWithWorkspaceSaver(84-86)NewEventHandler(80-82)apps/workspace-engine/pkg/workspace/workspace.go (3)
CreateGCSWorkspaceSaver(299-308)CreateGCSWorkspaceLoader(354-374)WorkspaceLoader(310-310)
apps/workspace-engine/pkg/workspace/workspace.go (2)
apps/workspace-engine/pkg/workspace/storage_gcs.go (3)
IsGCSStorageEnabled(14-16)PutWorkspaceSnapshot(63-97)GetWorkspaceSnapshot(24-59)apps/workspace-engine/pkg/workspace/kafka/state.go (3)
PartitionForWorkspace(41-45)WorkspaceIDDiscoverer(59-59)GetAssignedWorkspaceIDs(61-80)
🔇 Additional comments (8)
apps/workspace-engine/go.mod (1)
8-31: Dependency additions look solid.
The direct pulls for cloud.google.com/go/storage and google.golang.org/api match the new GCS store implementation, and the expanded indirect set is consistent with what those modules transitively require. No issues from my side.

apps/workspace-engine/pkg/events/events.go (1)

19-19: Import looks correct.
No cycles; aligns with the new constructor.

apps/workspace-engine/pkg/db/workspaces.go (2)

101-122: Good no-rows handling.
Mapping pgx.ErrNoRows to (nil, nil) is correct for "no snapshot yet."

136-137: Fix WORKSPACE_SNAPSHOT_INSERT_QUERY to include all five placeholders ($1, $2, $3, $4, $5).

apps/workspace-engine/pkg/kafka/kafka.go (3)

33-38: Handler factory is fine; passes numPartitions into saver creation.
Works with the new saver/loader wiring. Note: depends on the handler flushing changes even in saver mode (see handler comment).

124-124: Initialize handler once outside the loop — good.
Avoids per-message allocations.

49-52: Nil discoverer is safe: the default fallback is used.
getAssignedWorkspaceIDs explicitly checks for a nil discoverer and invokes the internal default logic when none is provided, so passing nil to CreateGCSWorkspaceLoader is handled safely.

apps/workspace-engine/pkg/events/handler/handler.go (1)
87-87: Document expected timestamp unit and add fallback
Verify all event producers emit seconds-based timestamps and update the handler to default to the current time if the field is missing or in milliseconds.
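If a fallback is added, one possibility is a small normalization helper along these lines; the field name, the ms-vs-s threshold, and the function itself are assumptions for illustration, not the project's current API:

```go
package handler

import "time"

// eventTime normalizes an event timestamp that may be absent, in seconds, or
// in milliseconds, falling back to the Kafka message time when unset.
func eventTime(rawTimestamp int64, kafkaTime time.Time) time.Time {
	switch {
	case rawTimestamp <= 0:
		return kafkaTime
	case rawTimestamp > 1_000_000_000_000: // ~1e12 is already past 2001 in ms; treat as milliseconds
		return time.UnixMilli(rawTimestamp)
	default:
		return time.Unix(rawTimestamp, 0)
	}
}
```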
```go
// getBucketURL parses a GCS URL like "gcs://bucket-name/base-path"
// Returns bucket name and base path.
func getBucketURL() string {
	return strings.TrimPrefix(os.Getenv("WORKSPACE_STATES_BUCKET_URL"), "gcs://")
}
```
Bucket parsing bug: using “gcs://bucket/prefix” as bucket name and ignoring prefix.
cloud.google.com/go/storage expects just the bucket name. Parse bucket and optional prefix; include prefix in the object path you store. Also accept gs://.
Apply this refactor:
@@
-import (
+import (
"context"
"fmt"
"io"
"os"
+ "path"
"strings"
"workspace-engine/pkg/db"
@@
-func IsGCSStorageEnabled() bool {
- return strings.HasPrefix(os.Getenv("WORKSPACE_STATES_BUCKET_URL"), "gcs://")
-}
+func IsGCSStorageEnabled() bool {
+ url := os.Getenv("WORKSPACE_STATES_BUCKET_URL")
+ return strings.HasPrefix(url, "gcs://") || strings.HasPrefix(url, "gs://")
+}
@@
-// getBucketURL parses a GCS URL like "gcs://bucket-name/base-path"
-// Returns bucket name and base path.
-func getBucketURL() string {
- return strings.TrimPrefix(os.Getenv("WORKSPACE_STATES_BUCKET_URL"), "gcs://")
-}
+// Helpers to parse WORKSPACE_STATES_BUCKET_URL, e.g. gcs://bucket[/prefix]
+func getBucketName() string {
+ raw := os.Getenv("WORKSPACE_STATES_BUCKET_URL")
+ raw = strings.TrimPrefix(raw, "gcs://")
+ raw = strings.TrimPrefix(raw, "gs://")
+ if i := strings.IndexByte(raw, '/'); i >= 0 {
+ return raw[:i]
+ }
+ return raw
+}
+
+func getObjectPrefix() string {
+ raw := os.Getenv("WORKSPACE_STATES_BUCKET_URL")
+ raw = strings.TrimPrefix(raw, "gcs://")
+ raw = strings.TrimPrefix(raw, "gs://")
+ if i := strings.IndexByte(raw, '/'); i >= 0 {
+ return strings.TrimPrefix(raw[i+1:], "/")
+ }
+ return ""
+}

[Based on learnings]
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In apps/workspace-engine/pkg/workspace/storage_gcs.go around lines 18–23, the
current getBucketURL returns the entire env value with the "gcs://" prefix
trimmed, which treats "bucket/prefix" as the bucket name and drops the object
prefix; change this to parse the environment variable (accepting both "gcs://"
and "gs://") into two return values: the bucket name and an optional base
path/prefix (empty if none). Trim the scheme, split on the first '/' to separate
bucket and prefix, URL-decode or normalize the prefix (no leading '/'), and
update all callers to use the bucket for storage.NewClient/handle and prepend
the prefix to object paths when composing object names.
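For illustration, a hedged sketch of how the parsed bucket and prefix might be used with cloud.google.com/go/storage. getBucketName and getObjectPrefix are the helpers proposed in the diff above, putObject is a hypothetical name, and the context, fmt, path, and storage imports are assumed to already be present in this file:

```go
// putObject writes data under the configured prefix; an empty prefix is
// handled by path.Join, which simply returns the object name unchanged.
func putObject(ctx context.Context, client *storage.Client, name string, data []byte) error {
	objectPath := path.Join(getObjectPrefix(), name)
	w := client.Bucket(getBucketName()).Object(objectPath).NewWriter(ctx)
	if _, err := w.Write(data); err != nil {
		_ = w.Close()
		return fmt.Errorf("write %s: %w", objectPath, err)
	}
	return w.Close()
}
```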
Actionable comments posted: 1
♻️ Duplicate comments (2)
apps/workspace-engine/pkg/events/handler/handler.go (2)
107-110: Document saver behavior and ordering guarantees.
Please expand this GoDoc to explain that providing a workspaceSaver enables per-event snapshot persistence and spell out the ordering/flush semantics (e.g., change sets are flushed before the saver runs, what happens on saver errors). Callers need that context to understand durability guarantees.

Apply this diff to clarify the documentation:

```diff
-// NewEventListenerWithWorkspaceSaver creates a new event listener with the provided handlers and workspace saver
+// NewEventListenerWithWorkspaceSaver constructs an EventListener that flushes each processed event and then invokes
+// the supplied WorkspaceSaver to persist a snapshot. Callers should expect saver invocations to follow handler
+// execution order synchronously; if the saver returns an error the event is treated as failed and no snapshot is recorded.
```
196-214: Flush changes before snapshotting and normalize the timestamp.
The early return still skips FlushChangeset, so DB persistence is bypassed whenever a saver is configured—exactly the critical path we flagged earlier. On top of that, time.Unix(rawEvent.Timestamp, 0) interprets millisecond payloads as future seconds and emits 1970-... when the field is absent. Please flush first and derive a stable timestamp (fall back to Kafka's timestamp, coerce ms vs s, emit UTC) before calling the saver.

Apply this diff to persist and timestamp safely:

```diff
-	if el.workspaceSaver != nil {
-		timestampStr := time.Unix(rawEvent.Timestamp, 0).Format(time.RFC3339)
-		if err := el.workspaceSaver(ctx, ws.ID, timestampStr); err != nil {
-			span.RecordError(err)
-			span.SetStatus(codes.Error, "failed to save workspace")
-			log.Error("Failed to save workspace", "error", err, "workspaceID", ws.ID)
-			return nil, fmt.Errorf("failed to save workspace: %w", err)
-		}
-
-		span.SetStatus(codes.Ok, "event processed successfully")
-		log.Debug("Successfully processed event",
-			"eventType", rawEvent.EventType)
-
-		return ws, nil
-	}
-
-	if !workspace.IsGCSStorageEnabled() {
-		if err := ws.ChangesetConsumer().FlushChangeset(ctx, changeSet); err != nil {
-			span.RecordError(err)
-			span.SetStatus(codes.Error, "failed to flush changeset")
-			log.Error("Failed to flush changeset", "error", err)
-			return nil, fmt.Errorf("failed to flush changeset: %w", err)
-		}
-	}
+	if err := ws.ChangesetConsumer().FlushChangeset(ctx, changeSet); err != nil {
+		span.RecordError(err)
+		span.SetStatus(codes.Error, "failed to flush changeset")
+		log.Error("Failed to flush changeset", "error", err)
+		return nil, fmt.Errorf("failed to flush changeset: %w", err)
+	}
+
+	if el.workspaceSaver != nil {
+		eventTime := msg.Timestamp
+		if rawEvent.Timestamp > 0 {
+			if rawEvent.Timestamp > 1_000_000_000_000 {
+				eventTime = time.Unix(0, rawEvent.Timestamp*int64(time.Millisecond))
+			} else {
+				eventTime = time.Unix(rawEvent.Timestamp, 0)
+			}
+		}
+		timestampStr := eventTime.UTC().Format(time.RFC3339)
+		if err := el.workspaceSaver(ctx, ws.ID, timestampStr); err != nil {
+			span.RecordError(err)
+			span.SetStatus(codes.Error, "failed to save workspace")
+			log.Error("Failed to save workspace", "error", err, "workspaceID", ws.ID)
+			return nil, fmt.Errorf("failed to save workspace: %w", err)
+		}
+	}
```
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (4)
apps/workspace-engine/pkg/events/events.go(2 hunks)apps/workspace-engine/pkg/events/handler/handler.go(5 hunks)apps/workspace-engine/pkg/workspace/storage_gcs.go(1 hunks)apps/workspace-engine/pkg/workspace/workspace.go(2 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- apps/workspace-engine/pkg/workspace/storage_gcs.go
- apps/workspace-engine/pkg/events/events.go
🧰 Additional context used
📓 Path-based instructions (1)
apps/workspace-engine/**/*.go
📄 CodeRabbit inference engine (apps/workspace-engine/CLAUDE.md)
apps/workspace-engine/**/*.go: Do not add extraneous inline comments that state the obvious
Do not add comments that simply restate what the code does
Do not add comments for standard Go patterns (e.g., noting WaitGroup or semaphore usage)
Write comments that explain why, document complex logic/algorithms, provide non-obvious context, include TODO/FIXME, and document exported functions/types/methods
Files:
apps/workspace-engine/pkg/events/handler/handler.goapps/workspace-engine/pkg/workspace/workspace.go
🧬 Code graph analysis (2)
apps/workspace-engine/pkg/events/handler/handler.go (6)
apps/workspace-engine/pkg/workspace/workspace.go (5)
WorkspaceSaver(298-298)GetWorkspace(205-212)Exists(192-195)New(19-31)Set(197-199)apps/workspace-engine/pkg/workspace/storage_gcs.go (1)
IsGCSStorageEnabled(14-16)apps/workspace-engine/pkg/workspace/populate_workspace.go (1)
PopulateWorkspaceWithInitialState(8-76)apps/workspace-engine/pkg/changeset/context.go (1)
WithChangeSet(13-15)apps/workspace-engine/pkg/changeset/changeset_consumer.go (1)
ChangesetConsumer(7-9)apps/workspace-engine/pkg/db/changeset.go (1)
FlushChangeset(15-61)
apps/workspace-engine/pkg/workspace/workspace.go (3)
apps/workspace-engine/pkg/workspace/storage_gcs.go (3)
IsGCSStorageEnabled(14-16)PutWorkspaceSnapshot(83-124)GetWorkspaceSnapshot(38-79)apps/workspace-engine/pkg/db/workspaces.go (1)
GetWorkspaceSnapshot(101-122)apps/workspace-engine/pkg/workspace/kafka/state.go (3)
PartitionForWorkspace(41-45)WorkspaceIDDiscoverer(59-59)GetAssignedWorkspaceIDs(61-80)
🪛 GitHub Actions: Go Tests
apps/workspace-engine/pkg/workspace/workspace.go
[error] 156-156: Data race observed in GobDecode path during Workspace.LoadFromStorage execution.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build (linux/amd64)
Actionable comments posted: 0
♻️ Duplicate comments (3)
apps/workspace-engine/pkg/events/handler/handler.go (3)
107-110: Add GoDoc for exported constructor.
Briefly document saver semantics (when it runs, ordering vs flush, error behavior) so callers understand persistence guarantees. As per coding guidelines.

Apply:

```diff
-// NewEventListenerWithWorkspaceSaver creates a new event listener with the provided handlers and workspace saver
+// NewEventListenerWithWorkspaceSaver returns an EventListener that, after each
+// event is handled, can persist a workspace snapshot using the provided saver.
+// Snapshots are invoked after processing the event and after changes are flushed
+// (see ListenAndRoute). Saver is called synchronously; on saver error the event
+// processing is treated as failed and the error is returned. Ordering follows
+// event processing order.
 func NewEventListenerWithWorkspaceSaver(handlers HandlerRegistry, workspaceSaver workspace.WorkspaceSaver) *EventListener {
 	return &EventListener{handlers: handlers, workspaceSaver: workspaceSaver}
 }
```
153-156: ChangeSet missing from context when GCS is enabled → handlers drop mutations.
WithChangeSet is only applied in the non‑GCS branch, so every handler on GCS runs without a bound ChangeSet. This causes no-ops for the DB consumer/release manager.

Apply:

```diff
-	changeSet := changeset.NewChangeSet[any]()
-	if !workspace.IsGCSStorageEnabled() {
+	changeSet := changeset.NewChangeSet[any]()
+	// Always attach the ChangeSet so handlers can record mutations.
+	ctx = changeset.WithChangeSet(ctx, changeSet)
+	if !workspace.IsGCSStorageEnabled() {
@@
-		changeSet.IsInitialLoad = true
+		changeSet.IsInitialLoad = true
-	}
-	ctx = changeset.WithChangeSet(ctx, changeSet)
+	}
 }
```

Also applies to: 158-175
196-210: Early return before FlushChangeset risks lost/unstable state; also handle ms/s timestamps.
Flush changes first, then snapshot (if saver set). Derive a stable timestamp (payload → ms/s → Kafka).

Apply:

```diff
-	span.SetAttributes(attribute.Int("release-target.changed", len(releaseTargetChanges.Keys())))
-
-	if el.workspaceSaver != nil {
-		timestampStr := time.Unix(rawEvent.Timestamp, 0).Format(time.RFC3339)
-		if err := el.workspaceSaver(ctx, ws.ID, timestampStr); err != nil {
-			span.RecordError(err)
-			span.SetStatus(codes.Error, "failed to save workspace")
-			log.Error("Failed to save workspace", "error", err, "workspaceID", ws.ID)
-			return nil, fmt.Errorf("failed to save workspace: %w", err)
-		}
-
-		span.SetStatus(codes.Ok, "event processed successfully")
-		log.Debug("Successfully processed event",
-			"eventType", rawEvent.EventType)
-
-		return ws, nil
-	}
-
-	if err := ws.ChangesetConsumer().FlushChangeset(ctx, changeSet); err != nil {
+	span.SetAttributes(attribute.Int("release-target.changed", len(releaseTargetChanges.Keys())))
+
+	// Always flush accumulated changes before any snapshotting.
+	if err := ws.ChangesetConsumer().FlushChangeset(ctx, changeSet); err != nil {
 		span.RecordError(err)
 		span.SetStatus(codes.Error, "failed to flush changeset")
 		log.Error("Failed to flush changeset", "error", err)
 		return nil, fmt.Errorf("failed to flush changeset: %w", err)
 	}
+
+	// Optionally persist a snapshot with a stable timestamp.
+	if el.workspaceSaver != nil {
+		evtTS := msg.Timestamp.Unix()
+		if rawEvent.Timestamp > 0 {
+			// Heuristic: treat large values as milliseconds.
+			if rawEvent.Timestamp > 1_000_000_000_000 {
+				evtTS = rawEvent.Timestamp / 1000
+			} else {
+				evtTS = rawEvent.Timestamp
+			}
+		}
+		timestampStr := time.Unix(evtTS, 0).UTC().Format(time.RFC3339)
+		if err := el.workspaceSaver(ctx, ws.ID, timestampStr); err != nil {
+			span.RecordError(err)
+			span.SetStatus(codes.Error, "failed to save workspace")
+			log.Error("Failed to save workspace", "error", err, "workspaceID", ws.ID)
+			return nil, fmt.Errorf("failed to save workspace: %w", err)
+		}
+	}
```

Also applies to: 212-217
🧹 Nitpick comments (2)
apps/workspace-engine/pkg/events/handler/handler.go (1)
137-137: Use Int64 attribute for Kafka timestamp.
Avoid lossy float conversion; keep full precision.

Apply:

```diff
-	attribute.Float64("event.timestamp", float64(msg.Timestamp.Unix())),
+	attribute.Int64("event.timestamp", msg.Timestamp.Unix()),
```

apps/workspace-engine/pkg/kafka/kafka.go (1)
124-124: Avoid shadowing imported package name ‘handler’.
The local var shadows the imported package identifier; rename for clarity.

Apply:

```diff
-	handler := getEventHandler(numPartitions)
+	el := getEventHandler(numPartitions)
```

And update usage:

```diff
-	if err := processMessage(ctx, consumer, handler, msg); err != nil {
+	if err := processMessage(ctx, consumer, el, msg); err != nil {
```
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (2)
apps/workspace-engine/pkg/events/handler/handler.go(5 hunks)apps/workspace-engine/pkg/kafka/kafka.go(3 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
apps/workspace-engine/**/*.go
📄 CodeRabbit inference engine (apps/workspace-engine/CLAUDE.md)
apps/workspace-engine/**/*.go: Do not add extraneous inline comments that state the obvious
Do not add comments that simply restate what the code does
Do not add comments for standard Go patterns (e.g., noting WaitGroup or semaphore usage)
Write comments that explain why, document complex logic/algorithms, provide non-obvious context, include TODO/FIXME, and document exported functions/types/methods
Files:
apps/workspace-engine/pkg/events/handler/handler.goapps/workspace-engine/pkg/kafka/kafka.go
🧬 Code graph analysis (2)
apps/workspace-engine/pkg/events/handler/handler.go (4)
apps/workspace-engine/pkg/workspace/workspace.go (5)
WorkspaceSaver(298-298)GetWorkspace(205-212)Exists(192-195)New(19-31)Set(197-199)apps/workspace-engine/pkg/workspace/storage_gcs.go (1)
IsGCSStorageEnabled(14-16)apps/workspace-engine/pkg/workspace/populate_workspace.go (1)
PopulateWorkspaceWithInitialState(8-76)apps/workspace-engine/pkg/changeset/context.go (1)
WithChangeSet(13-15)
apps/workspace-engine/pkg/kafka/kafka.go (4)
apps/workspace-engine/pkg/events/handler/handler.go (1)
EventListener(97-100)apps/workspace-engine/pkg/workspace/storage_gcs.go (1)
IsGCSStorageEnabled(14-16)apps/workspace-engine/pkg/events/events.go (2)
NewEventHandlerWithWorkspaceSaver(85-87)NewEventHandler(80-82)apps/workspace-engine/pkg/workspace/workspace.go (2)
CreateGCSWorkspaceSaver(301-310)CreateGCSWorkspaceLoader(357-377)
🔇 Additional comments (4)
apps/workspace-engine/pkg/kafka/kafka.go (4)
10-10: Import looks fine.
33-38: Dynamic handler selection LGTM.
Cleanly injects GCS saver when enabled.

43-45: GCS route from RunConsumer LGTM.
Keeps legacy path unchanged.

49-52: Confirm nil discoverer is supported by CreateGCSWorkspaceLoader.
If nil isn't explicitly handled, pass a concrete discoverer (e.g., the wskafka implementation) to avoid a nil deref.
Would you verify CreateGCSWorkspaceLoader(nil) handles nil? If not, prefer wiring a default discoverer here.
Actionable comments posted: 0
♻️ Duplicate comments (2)
apps/workspace-engine/pkg/events/handler/handler.go (2)
107-110: Add GoDoc for the exported constructor.
Per coding guidelines and the previous review comment, this exported function needs documentation explaining that providing a WorkspaceSaver enables snapshot persistence after event processing, and any ordering or error-handling semantics.

Apply this diff to add documentation:

```diff
+// NewEventListenerWithWorkspaceSaver creates a new event listener with workspace snapshot persistence.
+// When workspaceSaver is non-nil, the workspace state is persisted after each event is processed.
+// Snapshot persistence occurs synchronously; if saving fails, the event processing fails.
 func NewEventListenerWithWorkspaceSaver(handlers HandlerRegistry, workspaceSaver workspace.WorkspaceSaver) *EventListener {
 	return &EventListener{handlers: handlers, workspaceSaver: workspaceSaver}
 }
```

As per coding guidelines.
198-212: CRITICAL: Early return skips FlushChangeset, causing data loss.
When a workspaceSaver is provided, the function returns early at line 211 without calling ws.ChangesetConsumer().FlushChangeset(ctx, changeSet) at line 214. This means all accumulated changes in the changeset are never persisted to the database, leading to data loss and inconsistent state between the GCS snapshot and the DB.

Apply this diff to fix the issue:

```diff
+	// Flush accumulated changes to DB before snapshotting
+	if err := ws.ChangesetConsumer().FlushChangeset(ctx, changeSet); err != nil {
+		span.RecordError(err)
+		span.SetStatus(codes.Error, "failed to flush changeset")
+		log.Error("Failed to flush changeset", "error", err)
+		return nil, fmt.Errorf("failed to flush changeset: %w", err)
+	}
+
+	// Optionally persist snapshot when saver is configured
 	if el.workspaceSaver != nil {
 		timestampStr := time.Unix(rawEvent.Timestamp, 0).Format(time.RFC3339)
 		if err := el.workspaceSaver(ctx, ws.ID, timestampStr); err != nil {
 			span.RecordError(err)
 			span.SetStatus(codes.Error, "failed to save workspace")
 			log.Error("Failed to save workspace", "error", err, "workspaceID", ws.ID)
 			return nil, fmt.Errorf("failed to save workspace: %w", err)
 		}
-
-		span.SetStatus(codes.Ok, "event processed successfully")
-		log.Debug("Successfully processed event",
-			"eventType", rawEvent.EventType)
-
-		return ws, nil
 	}
-
-	if err := ws.ChangesetConsumer().FlushChangeset(ctx, changeSet); err != nil {
-		span.RecordError(err)
-		span.SetStatus(codes.Error, "failed to flush changeset")
-		log.Error("Failed to flush changeset", "error", err)
-		return nil, fmt.Errorf("failed to flush changeset: %w", err)
-	}
 	span.SetStatus(codes.Ok, "event processed successfully")
 	log.Debug("Successfully processed event",
```

This ensures changes are persisted to the DB before snapshotting, regardless of whether a saver is configured.
🧹 Nitpick comments (4)
apps/workspace-engine/pkg/db/workspaces.go (1)
129-141: Consider idempotency for snapshot writes.
The function performs a raw INSERT without checking for existing snapshots with the same workspace_id and timestamp. If a write is retried (e.g., after a transient error), duplicate rows will be created. While this doesn't break reads (GetWorkspaceSnapshot uses LIMIT 1), it could bloat the table over time.

If idempotency is desired, consider using an UPSERT pattern or adding a unique constraint on (workspace_id, timestamp):

```diff
 const WORKSPACE_SNAPSHOT_INSERT_QUERY = `
-	INSERT INTO workspace_snapshot (workspace_id, path, timestamp, partition, num_partitions)
-	VALUES ($1, $2, $3, $4, $5)
+	INSERT INTO workspace_snapshot (workspace_id, path, timestamp, partition, num_partitions)
+	VALUES ($1, $2, $3, $4, $5)
+	ON CONFLICT (workspace_id, timestamp) DO UPDATE SET
+		path = EXCLUDED.path,
+		partition = EXCLUDED.partition,
+		num_partitions = EXCLUDED.num_partitions
 `
```

Note: This requires a unique index on (workspace_id, timestamp). If append-only behavior is intentional, this suggestion can be ignored.
apps/workspace-engine/pkg/workspace/storage_gcs.go (3)
14-16: Consider supporting both "gs://" and "gcs://" schemes.
The current implementation only checks for the standard "gs://" prefix. While this is correct, supporting "gcs://" as well would provide defensive handling if the URL format varies.

Apply this diff if broader scheme support is desired:

```diff
 func IsGCSStorageEnabled() bool {
-	return strings.HasPrefix(os.Getenv("WORKSPACE_STATES_BUCKET_URL"), "gs://")
+	url := os.Getenv("WORKSPACE_STATES_BUCKET_URL")
+	return strings.HasPrefix(url, "gs://") || strings.HasPrefix(url, "gcs://")
 }
```
18-59: Validate bucket name is non-empty.
After parsing the snapshot path at lines 33-38, the bucket name could theoretically be empty if the path is malformed (e.g., "/objectpath"). While the len(parts) check prevents index errors, passing an empty bucket name to the GCS client could cause runtime errors.

Apply this diff to add validation:

```diff
 parts := strings.SplitN(snapshot.Path, "/", 2)
 if len(parts) != 2 {
 	return nil, fmt.Errorf("invalid GCS path: %s", snapshot.Path)
 }
 bucket := parts[0]
+if bucket == "" {
+	return nil, fmt.Errorf("invalid GCS path (empty bucket): %s", snapshot.Path)
+}
 objectPath := parts[1]
```
61-115: Validate bucket name is non-empty.
After parsing the WORKSPACE_STATES_BUCKET_URL at lines 67-74, the bucket name could be empty if the environment variable is malformed (e.g., "gs://" or "gs:///prefix"). Passing an empty bucket to the GCS client would cause runtime errors.

Apply this diff to add validation:

```diff
 gsPath := strings.TrimPrefix(bucketURL, "gs://")
 parts := strings.SplitN(gsPath, "/", 2)
 bucket := parts[0]
+if bucket == "" {
+	return fmt.Errorf("WORKSPACE_STATES_BUCKET_URL is invalid (empty bucket): %s", bucketURL)
+}
 prefix := ""
 if len(parts) > 1 {
 	prefix = parts[1]
 }
```
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (3)
apps/workspace-engine/pkg/db/workspaces.go(2 hunks)apps/workspace-engine/pkg/events/handler/handler.go(5 hunks)apps/workspace-engine/pkg/workspace/storage_gcs.go(1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
apps/workspace-engine/**/*.go
📄 CodeRabbit inference engine (apps/workspace-engine/CLAUDE.md)
apps/workspace-engine/**/*.go: Do not add extraneous inline comments that state the obvious
Do not add comments that simply restate what the code does
Do not add comments for standard Go patterns (e.g., noting WaitGroup or semaphore usage)
Write comments that explain why, document complex logic/algorithms, provide non-obvious context, include TODO/FIXME, and document exported functions/types/methods
Files:
apps/workspace-engine/pkg/db/workspaces.goapps/workspace-engine/pkg/events/handler/handler.goapps/workspace-engine/pkg/workspace/storage_gcs.go
🧬 Code graph analysis (3)
apps/workspace-engine/pkg/db/workspaces.go (2)
apps/workspace-engine/pkg/workspace/storage_gcs.go (1)
GetWorkspaceSnapshot(18-59)apps/workspace-engine/pkg/db/client.go (1)
GetDB(60-70)
apps/workspace-engine/pkg/events/handler/handler.go (4)
apps/workspace-engine/pkg/workspace/workspace.go (5)
WorkspaceSaver(298-298)GetWorkspace(205-212)Exists(192-195)New(19-31)Set(197-199)apps/workspace-engine/pkg/changeset/changeset.go (1)
NewChangeSet(32-36)apps/workspace-engine/pkg/workspace/storage_gcs.go (1)
IsGCSStorageEnabled(14-16)apps/workspace-engine/pkg/workspace/populate_workspace.go (1)
PopulateWorkspaceWithInitialState(8-76)
apps/workspace-engine/pkg/workspace/storage_gcs.go (1)
apps/workspace-engine/pkg/db/workspaces.go (3)
GetWorkspaceSnapshot(101-122)WorkspaceSnapshot(90-95)WriteWorkspaceSnapshot(129-141)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build (linux/amd64)
🔇 Additional comments (9)
apps/workspace-engine/pkg/db/workspaces.go (5)
3-7: LGTM! The pgx/v5 import is correctly added to support ErrNoRows handling in GetWorkspaceSnapshot.

90-95: LGTM! The WorkspaceSnapshot struct definition is clean and appropriate for storing snapshot metadata.

97-99: LGTM! The SELECT query correctly retrieves the latest snapshot for a workspace using ORDER BY timestamp DESC LIMIT 1.

101-122: LGTM! GetWorkspaceSnapshot correctly handles the ErrNoRows case by returning (nil, nil), which is an idiomatic pattern for distinguishing "not found" from errors.

124-127: LGTM! The INSERT query now correctly provides 5 placeholders for 5 columns, addressing the previously reported issue.

apps/workspace-engine/pkg/events/handler/handler.go (4)

7-7: LGTM! The time import is correctly added to support timestamp formatting in the saver logic.

83-88: LGTM! The Timestamp field addition to RawEvent is appropriate. Note that line 199 assumes the timestamp is in seconds; if upstream events provide milliseconds, conversion will be needed.

152-177: LGTM! The changeset is now correctly attached to the context unconditionally at line 177, regardless of storage mode. This addresses the previously reported issue.

188-196: LGTM! The release manager change processing is correctly integrated with the changeset.
Actionable comments posted: 0
🧹 Nitpick comments (3)
apps/workspace-engine/pkg/workspace/workspace.go (3)
260-275: Consider validating workspaceID matches w.ID.
The method accepts workspaceID as a parameter while the workspace already has w.ID. Although the parameter is necessary for the storage key, there's no validation that they match. A mismatch could lead to workspace data being saved under an incorrect key.

Consider adding a validation check:

```diff
 func (w *Workspace) SaveToGCS(ctx context.Context, workspaceID string, timestamp string, partition int32, numPartitions int32) error {
+	if workspaceID != w.ID {
+		return fmt.Errorf("workspaceID parameter (%s) does not match workspace ID (%s)", workspaceID, w.ID)
+	}
+
 	if !IsGCSStorageEnabled() {
 		return nil
 	}
```

Alternatively, remove the parameter and use w.ID directly if they're always expected to match.

279-298: Consider validating workspaceID matches w.ID.
Similar to SaveToGCS, this method accepts workspaceID as a parameter while w.ID already exists. Consider adding validation to ensure consistency or using w.ID directly.

```diff
 func (w *Workspace) LoadFromGCS(ctx context.Context, workspaceID string) error {
+	if workspaceID != w.ID {
+		return fmt.Errorf("workspaceID parameter (%s) does not match workspace ID (%s)", workspaceID, w.ID)
+	}
+
 	if !IsGCSStorageEnabled() {
 		return nil
 	}
```
369-386: The explicit Set calls may be redundant.
After GetWorkspace(workspaceID) at line 370, the workspace is already in the map (either it existed before, or GetWorkspace just created and registered it). Since LoadFromGCS modifies the workspace in-place and the map stores pointers, the explicit Set calls at lines 373 and 381 are storing the same object reference that's already in the map.

While these calls are harmless and make the intent explicit, they can be removed if desired:

```diff
 for _, workspaceID := range allWorkspaceIDs {
 	ws := GetWorkspace(workspaceID)
 	err := ws.LoadFromGCS(ctx, workspaceID)
 	if err == nil {
-		Set(workspaceID, ws)
 		continue
 	}
 	if err == ErrWorkspaceSnapshotNotFound {
 		if err := PopulateWorkspaceWithInitialState(ctx, ws); err != nil {
 			return fmt.Errorf("failed to populate workspace %s with initial state: %w", workspaceID, err)
 		}
-		Set(workspaceID, ws)
 		continue
 	}
```

Alternatively, keep them for explicitness and clarity.
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
apps/workspace-engine/pkg/workspace/workspace.go(3 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
apps/workspace-engine/**/*.go
📄 CodeRabbit inference engine (apps/workspace-engine/CLAUDE.md)
apps/workspace-engine/**/*.go: Do not add extraneous inline comments that state the obvious
Do not add comments that simply restate what the code does
Do not add comments for standard Go patterns (e.g., noting WaitGroup or semaphore usage)
Write comments that explain why, document complex logic/algorithms, provide non-obvious context, include TODO/FIXME, and document exported functions/types/methods
Files:
apps/workspace-engine/pkg/workspace/workspace.go
🧬 Code graph analysis (1)
apps/workspace-engine/pkg/workspace/workspace.go (3)
apps/workspace-engine/pkg/workspace/storage_gcs.go (3)
IsGCSStorageEnabled(14-16)PutWorkspaceSnapshot(62-115)GetWorkspaceSnapshot(18-59)apps/workspace-engine/pkg/workspace/kafka/state.go (3)
PartitionForWorkspace(41-45)WorkspaceIDDiscoverer(59-59)GetAssignedWorkspaceIDs(61-80)apps/workspace-engine/pkg/workspace/populate_workspace.go (1)
PopulateWorkspaceWithInitialState(8-76)
🔇 Additional comments (4)
apps/workspace-engine/pkg/workspace/workspace.go (4)
277-278: LGTM! Good use of a sentinel error to distinguish "snapshot not found" from other error conditions. This enables graceful fallback to initial state population in CreateGCSWorkspaceLoader.

300-313: LGTM! The saver correctly calculates the partition using the same hashing function as the loader, ensuring consistency. Error messages correctly reference "GCS".

318-354: LGTM! The helper function provides good flexibility by accepting an optional discoverer while falling back to the default implementation. The use of a map for deduplication when collecting IDs across multiple partitions is efficient and correct.

401-403: LGTM! Good refactoring to use the shared getAssignedWorkspaceIDs helper. This eliminates code duplication and ensures consistent behavior between GCS and non-GCS loaders.
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
apps/workspace-engine/pkg/workspace/workspace.go (1)
140-146: Initialize store via constructor; trim stray whitespace.
Use store.New() to ensure internal invariants before GobDecode; remove trailing whitespace on Line 140.

```diff
-	w.ID = wsData.ID 
+	w.ID = wsData.ID
@@
-	if w.store == nil {
-		w.store = &store.Store{}
-	}
+	if w.store == nil {
+		w.store = store.New()
+	}
```
🧹 Nitpick comments (7)
apps/workspace-engine/pkg/kafka/kafka.go (1)
97-108: Redundant nil check on ws; GetWorkspace never returns nil.
Simplify the preload loop; optionally log when a workspace was newly created vs loaded.

```diff
-	ws := workspace.GetWorkspace(workspaceID)
-	if ws == nil {
-		log.Error("Workspace not found", "workspaceID", workspaceID)
-		continue
-	}
+	ws := workspace.GetWorkspace(workspaceID)
```

apps/workspace-engine/pkg/events/handler/handler.go (1)
147-151: Refactor nil check or clarify auto-create intent.
The review comment is verified as correct.
GetWorkspace()at line 197 inworkspace.goalways returns a non-nil*Workspace—it auto-creates withNew(id)if the workspace doesn't exist. The nil check at lines 147–151 inhandler.gois unreachable dead code.Choose one approach:
- Option A: Remove the dead check if auto-create is the intended behavior.
- Option B: Replace with
workspace.Exists()check (line 184) if you require the workspace to exist prior without auto-creation.apps/workspace-engine/pkg/workspace/loader.go (5)
61-67: Improve error context and consistency in Load.
- Error mentions “disk” though StorageClient may be GCS.
- GobDecode error is not wrapped with context.
- data, err := storage.Get(ctx, dbSnapshotPath) - if err != nil { - return fmt.Errorf("failed to read workspace from disk: %w", err) - } - - return workspace.GobDecode(data) + data, err := storage.Get(ctx, dbSnapshotPath) + if err != nil { + return fmt.Errorf("failed to read workspace from storage: %w", err) + } + if err := workspace.GobDecode(data); err != nil { + return fmt.Errorf("failed to decode workspace gob: %w", err) + } + return nil
14-24: Consider loading all workspaces with partial failure tolerance.
Fail‑fast stops all subsequent workspaces on the first error. For resilience at startup, log/collect errors per workspace and continue; return a joined error at the end. Also consider honoring ctx cancellation inside the loop.
Would you prefer fail‑fast semantics here, or should we aggregate errors (e.g., using errors.Join) and proceed with the rest?
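A rough sketch of the aggregated variant, under the assumption that LoadAll, Load, GetAllWorkspaceIds, and GetWorkspace keep the signatures referenced elsewhere in this PR; errors.Join needs Go 1.20+, and the errors/fmt imports are assumed:

```go
// LoadAll, error-aggregating variant: every workspace is attempted, failures
// are collected per workspace, and a combined error is returned at the end.
func LoadAll(ctx context.Context, storage StorageClient) error {
	var errs []error
	for _, workspaceID := range GetAllWorkspaceIds() {
		if err := ctx.Err(); err != nil {
			return errors.Join(append(errs, err)...)
		}
		ws := GetWorkspace(workspaceID)
		if err := Load(ctx, storage, ws); err != nil {
			errs = append(errs, fmt.Errorf("workspace %s: %w", workspaceID, err))
		}
	}
	return errors.Join(errs...) // nil when every workspace loaded
}
```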
10-12: Optional: path layout for better organization and listing performance.
Using an underscore‑joined filename works, but a hierarchical path improves list/GC routines and human navigation (e.g., "/.gob"). Safe to defer.
-func getPath(workspaceID string, timestamp string) string { - return fmt.Sprintf("%s_%s.gob", workspaceID, timestamp) -} +func getPath(workspaceID, timestamp string) string { + return fmt.Sprintf("%s/%s.gob", workspaceID, timestamp) +}
14-14: Add doc comments for exported APIs (per guidelines).
Brief GoDoc for LoadAll, Save, and Load helps callers understand behavior (e.g., initial‑state population when no snapshot, snapshot semantics).
- func LoadAll(ctx context.Context, storage StorageClient) error { + // LoadAll loads each known workspace from its latest snapshot if present, + // otherwise populates it with initial state. Returns the first/aggregated error. + func LoadAll(ctx context.Context, storage StorageClient) error { @@ - func Save(ctx context.Context, storage StorageClient, workspace *Workspace, snapshot *db.WorkspaceSnapshot) error { + // Save encodes the workspace, writes it to storage, and persists the snapshot metadata. + // Path and timestamp are aligned so storage and DB reference the same object. + func Save(ctx context.Context, storage StorageClient, workspace *Workspace, snapshot *db.WorkspaceSnapshot) error { @@ - func Load(ctx context.Context, storage StorageClient, workspace *Workspace) error { + // Load restores the workspace from the latest snapshot if available; otherwise it + // populates the workspace with initial state from the DB. + func Load(ctx context.Context, storage StorageClient, workspace *Workspace) error {As per coding guidelines.
Also applies to: 26-26, 46-46
52-57: Optional: persist an initial snapshot after first population.
After PopulateWorkspaceWithInitialState, consider saving an initial snapshot to avoid repeating the full DB load on subsequent starts.
Do you intend to defer the first snapshot until after initial Kafka catch‑up, or should loader write it immediately?
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (10)
apps/workspace-engine/pkg/events/handler/handler.go(3 hunks)apps/workspace-engine/pkg/kafka/kafka.go(4 hunks)apps/workspace-engine/pkg/kafka/message.go(0 hunks)apps/workspace-engine/pkg/kafka/offset.go(0 hunks)apps/workspace-engine/pkg/workspace/kafka/state.go(0 hunks)apps/workspace-engine/pkg/workspace/loader.go(1 hunks)apps/workspace-engine/pkg/workspace/storage.go(1 hunks)apps/workspace-engine/pkg/workspace/storage_file.go(1 hunks)apps/workspace-engine/pkg/workspace/storage_gcs.go(1 hunks)apps/workspace-engine/pkg/workspace/workspace.go(1 hunks)
💤 Files with no reviewable changes (3)
- apps/workspace-engine/pkg/workspace/kafka/state.go
- apps/workspace-engine/pkg/kafka/offset.go
- apps/workspace-engine/pkg/kafka/message.go
🚧 Files skipped from review as they are similar to previous changes (1)
- apps/workspace-engine/pkg/workspace/storage_gcs.go
🧰 Additional context used
📓 Path-based instructions (1)
apps/workspace-engine/**/*.go
📄 CodeRabbit inference engine (apps/workspace-engine/CLAUDE.md)
apps/workspace-engine/**/*.go: Do not add extraneous inline comments that state the obvious
Do not add comments that simply restate what the code does
Do not add comments for standard Go patterns (e.g., noting WaitGroup or semaphore usage)
Write comments that explain why, document complex logic/algorithms, provide non-obvious context, include TODO/FIXME, and document exported functions/types/methods
Files:
apps/workspace-engine/pkg/workspace/storage.goapps/workspace-engine/pkg/workspace/storage_file.goapps/workspace-engine/pkg/kafka/kafka.goapps/workspace-engine/pkg/events/handler/handler.goapps/workspace-engine/pkg/workspace/loader.goapps/workspace-engine/pkg/workspace/workspace.go
🧬 Code graph analysis (3)
apps/workspace-engine/pkg/kafka/kafka.go (5)
apps/workspace-engine/pkg/workspace/storage_file.go (1)
NewFileStorage(20-22)apps/workspace-engine/pkg/workspace/storage_gcs.go (2)
IsGCSStorageEnabled(15-17)NewGCSStorageClient(27-48)apps/workspace-engine/pkg/workspace/workspace.go (1)
GetWorkspace(197-204)apps/workspace-engine/pkg/workspace/loader.go (2)
Load(46-67)Save(26-44)apps/workspace-engine/pkg/db/workspaces.go (1)
WorkspaceSnapshot(90-95)
apps/workspace-engine/pkg/events/handler/handler.go (2)
apps/workspace-engine/pkg/workspace/workspace.go (1)
GetWorkspace(197-204)apps/workspace-engine/pkg/server/openapi/utils/utils.go (1)
GetWorkspace(11-30)
apps/workspace-engine/pkg/workspace/loader.go (4)
apps/workspace-engine/pkg/workspace/storage_file.go (1)
StorageClient(9-12)apps/workspace-engine/pkg/workspace/workspace.go (3)
GetAllWorkspaceIds(215-217)GetWorkspace(197-204)Workspace(42-48)apps/workspace-engine/pkg/db/workspaces.go (3)
WorkspaceSnapshot(90-95)WriteWorkspaceSnapshot(129-141)GetWorkspaceSnapshot(101-122)apps/workspace-engine/pkg/workspace/populate_workspace.go (1)
PopulateWorkspaceWithInitialState(8-76)
🔇 Additional comments (1)
apps/workspace-engine/pkg/workspace/loader.go (1)
3-8: The import path is correct; no issues found.The verification confirms that
"workspace-engine/pkg/db"is the correct import path. The module is declared asworkspace-engineinapps/workspace-engine/go.mod, and all internal package imports consistently use this module name as the prefix. The concern about needing a full repository path prefix (e.g.,github.com/ctrlplanedev/ctrlplane/apps/...) does not apply here—the codebase uses local module declarations, not repository-level paths.Likely an incorrect or invalid review comment.
```go
		ws, err := handler.ListenAndRoute(ctx, msg)
		if err != nil {
			log.Error("Failed to route message", "error", err)
			continue
		}

		// Commit offset to Kafka
		if _, err := consumer.CommitMessage(msg); err != nil {
			log.Error("Failed to commit message", "error", err)
			continue
		}

		snapshot := &db.WorkspaceSnapshot{
			Timestamp:     time.Now().Format(time.RFC3339),
			Partition:     int32(msg.TopicPartition.Partition),
			NumPartitions: numPartitions,
		}

		workspace.Save(ctx, storage, ws, snapshot)
	}
```
Don’t commit before persisting; handle Save error and use event time.
Current flow commits offsets, then ignores workspace.Save error — risking data loss. Persist first; on failure, skip commit so the message is retried. Also prefer msg.Timestamp for snapshot time.
- ws, err := handler.ListenAndRoute(ctx, msg)
+ ws, err := handler.ListenAndRoute(ctx, msg)
if err != nil {
log.Error("Failed to route message", "error", err)
continue
}
-
- // Commit offset to Kafka
- if _, err := consumer.CommitMessage(msg); err != nil {
- log.Error("Failed to commit message", "error", err)
- continue
- }
-
- snapshot := &db.WorkspaceSnapshot{
- Timestamp: time.Now().Format(time.RFC3339),
- Partition: int32(msg.TopicPartition.Partition),
- NumPartitions: numPartitions,
- }
-
- workspace.Save(ctx, storage, ws, snapshot)
+
+ snapshot := &db.WorkspaceSnapshot{
+ Timestamp: msg.Timestamp.UTC().Format(time.RFC3339),
+ Partition: int32(msg.TopicPartition.Partition),
+ NumPartitions: numPartitions,
+ }
+
+ if err := workspace.Save(ctx, storage, ws, snapshot); err != nil {
+ log.Error("Failed to save workspace snapshot", "workspaceID", ws.ID, "error", err)
+ // Do not commit; let the message be retried.
+ continue
+ }
+
+ // Commit offset only after durable save succeeds.
+ if _, err := consumer.CommitMessage(msg); err != nil {
+ log.Error("Failed to commit message", "error", err)
+ continue
+ }

Also applies to: 141-145
🤖 Prompt for AI Agents
In apps/workspace-engine/pkg/kafka/kafka.go around lines 129-148 (and likewise
141-145), the code currently commits the Kafka offset before ensuring
workspace.Save succeeded and uses time.Now() for the snapshot; change the flow
to first build the snapshot using the Kafka message event time (msg.Timestamp or
msg.Time depending on the Kafka client), call workspace.Save(ctx, storage, ws,
snapshot) and check its error, and only if Save returns nil then call
consumer.CommitMessage(msg); on Save error log the failure with context and skip
committing so the message will be retried. Ensure all error paths
return/continue appropriately and remove/adjust the earlier commit-before-save
logic.
```diff
 // NewFileStorage returns a FileStorage rooted at the given base directory.
-func NewFileStorage(baseDir string) *FileStorage {
+func NewFileStorage(baseDir string) StorageClient {
```
Path traversal risk in Get/Put; harden path handling.
User‑controlled or computed paths can escape BaseDir via “..”. Clean and enforce prefix, and prefer absolute BaseDir. Update comment to reflect interface return.
Apply:
-// NewFileStorage returns a FileStorage rooted at the given base directory.
-func NewFileStorage(baseDir string) StorageClient {
- return &FileStorage{BaseDir: baseDir}
-}
+// NewFileStorage returns a StorageClient rooted at baseDir.
+func NewFileStorage(baseDir string) StorageClient {
+ abs, err := filepath.Abs(baseDir)
+ if err != nil {
+ abs = baseDir
+ }
+ return &FileStorage{BaseDir: abs}
+}
@@
-func (fs *FileStorage) Get(ctx context.Context, path string) ([]byte, error) {
- fullPath := filepath.Join(fs.BaseDir, path)
- return os.ReadFile(fullPath)
-}
+func (fs *FileStorage) Get(ctx context.Context, path string) ([]byte, error) {
+ clean := filepath.Clean(path)
+ fullPath := filepath.Join(fs.BaseDir, clean)
+ base := fs.BaseDir
+ if !strings.HasSuffix(base, string(os.PathSeparator)) {
+ base += string(os.PathSeparator)
+ }
+ if !strings.HasPrefix(fullPath, base) {
+ return nil, fmt.Errorf("invalid path outside base dir: %s", clean)
+ }
+ return os.ReadFile(fullPath)
+}
@@
-func (fs *FileStorage) Put(ctx context.Context, path string, data []byte) error {
- fullPath := filepath.Join(fs.BaseDir, path)
+func (fs *FileStorage) Put(ctx context.Context, path string, data []byte) error {
+ clean := filepath.Clean(path)
+ fullPath := filepath.Join(fs.BaseDir, clean)
+ base := fs.BaseDir
+ if !strings.HasSuffix(base, string(os.PathSeparator)) {
+ base += string(os.PathSeparator)
+ }
+ if !strings.HasPrefix(fullPath, base) {
+ return fmt.Errorf("invalid path outside base dir: %s", clean)
+ }Note: add imports “fmt” and “strings”.
Also applies to: 25-38
🤖 Prompt for AI Agents
In apps/workspace-engine/pkg/workspace/storage_file.go around lines 20 and
25-38, NewFileStorage currently accepts baseDir and subsequent Get/Put build
paths that can escape baseDir via “..”; make baseDir absolute (use filepath.Abs)
on creation, store it, and in Get/Put clean and join the user path with that
absolute baseDir using filepath.Join then call filepath.Clean and verify the
resulting path has the absolute baseDir as a prefix (use strings.HasPrefix or a
safe equivalent) before any file operations to prevent path traversal; update
the function comment to state it returns a StorageClient interface, and add
imports "fmt" and "strings" as requested; when the prefix check fails return a
descriptive error (fmt.Errorf) rather than performing IO.
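If the hardening above is applied, a small test along these lines (hypothetical, not part of this PR) would lock the behavior in; it assumes the NewFileStorage/Get/Put signatures shown in this file plus the context and testing imports:

```go
func TestFileStorageRejectsPathTraversal(t *testing.T) {
	fs := NewFileStorage(t.TempDir())
	// A path containing ".." must not resolve outside the base directory.
	if _, err := fs.Get(context.Background(), "../outside.gob"); err == nil {
		t.Fatal("expected an error for a read that escapes the base directory")
	}
	if err := fs.Put(context.Background(), "../outside.gob", []byte("x")); err == nil {
		t.Fatal("expected an error for a write that escapes the base directory")
	}
}
```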
```go
	"errors"
)

var ErrWorkspaceSnapshotNotFound = errors.New("workspace snapshot not found")
```
Exported symbols need GoDoc; clarify gob compatibility.
Please add short comments for ErrWorkspaceSnapshotNotFound and WorkspaceStorageObject to document semantics and backward-compat expectations for gob (e.g., unknown/removed fields are ignored on decode). This aids API users and avoids misuse across storage backends.
Example:
```go
// ErrWorkspaceSnapshotNotFound is returned when no snapshot metadata exists
// for the given workspace in the snapshot store.
var ErrWorkspaceSnapshotNotFound = errors.New("workspace snapshot not found")

// WorkspaceStorageObject is the gob-encoded wire form of a workspace snapshot.
// Fields may evolve; gob ignores unknown fields on decode for backward compat.
type WorkspaceStorageObject struct { ... }
```

Also applies to: 9-12
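To make the gob-compatibility note concrete, a small self-contained example (deliberately using throwaway types rather than the PR's) showing that a field present only in the encoder is ignored and a field present only in the decoder is left at its zero value:

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

type snapshotV1 struct { // older wire form
	ID   string
	Path string
}

type snapshotV2 struct { // newer receiver: Path removed, Note added
	ID   string
	Note string
}

func main() {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(snapshotV1{ID: "ws-1", Path: "ws-1.gob"}); err != nil {
		panic(err)
	}
	var v2 snapshotV2
	if err := gob.NewDecoder(&buf).Decode(&v2); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", v2) // {ID:ws-1 Note:} (Path dropped, Note left zero-valued)
}
```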
🤖 Prompt for AI Agents
In apps/workspace-engine/pkg/workspace/storage.go around lines 7 and 9-12, add
GoDoc comments for the exported ErrWorkspaceSnapshotNotFound and
WorkspaceStorageObject symbols: document the semantics (e.g.,
ErrWorkspaceSnapshotNotFound is returned when no snapshot metadata exists for a
workspace) and explicitly state gob compatibility expectations (that
WorkspaceStorageObject is the gob-encoded wire form, that fields may evolve and
gob ignores unknown/removed fields on decode to support backward/forward
compatibility). Keep comments short, one- or two-line GoDoc style sentences
placed immediately above each exported declaration.
Actionable comments posted: 1
♻️ Duplicate comments (2)
apps/workspace-engine/pkg/workspace/loader.go (1)
9-25: Good: storage path now sourced from snapshot.
Using snapshot.Path consistently removes the metadata/path drift risk flagged earlier.
apps/workspace-engine/pkg/kafka/kafka.go (1)
135-149: Critical: don’t commit before persisting; handle Save error.
Commit happens before Save and the Save error is ignored, risking data loss. Persist first; only commit after a successful Save. Also normalize the timestamp to UTC.

```diff
-	// Commit offset to Kafka
-	if _, err := consumer.CommitMessage(msg); err != nil {
-		log.Error("Failed to commit message", "error", err)
-		continue
-	}
-
-	snapshot := &db.WorkspaceSnapshot{
-		Path:          fmt.Sprintf("%s_%s.gob", ws.ID, msg.Timestamp.Format(time.RFC3339Nano)),
-		Timestamp:     msg.Timestamp.Format(time.RFC3339Nano),
-		Partition:     int32(msg.TopicPartition.Partition),
-		NumPartitions: numPartitions,
-	}
-
-	workspace.Save(ctx, storage, ws, snapshot)
+	// Build snapshot metadata (UTC)
+	ts := msg.Timestamp.UTC().Format(time.RFC3339Nano)
+	snapshot := &db.WorkspaceSnapshot{
+		Path:          fmt.Sprintf("%s_%s.gob", ws.ID, ts),
+		Timestamp:     ts,
+		Partition:     int32(msg.TopicPartition.Partition),
+		NumPartitions: numPartitions,
+	}
+	if err := workspace.Save(ctx, storage, ws, snapshot); err != nil {
+		log.Error("Failed to save workspace snapshot", "workspaceID", ws.ID, "error", err)
+		continue // do not commit; let it retry
+	}
+	if _, err := consumer.CommitMessage(msg); err != nil {
+		log.Error("Failed to commit message", "error", err)
+		continue
+	}
```
🧹 Nitpick comments (7)
.gitignore (1)
62-63: Clarify scope of ignore and mirror in .dockerignore.
If you only mean the repo‑root dir, prefer "/state/". If the intent is any nested "state/", the current rule is fine. Consider adding a .gitkeep if you want the dir present locally, and mirror this in .dockerignore to avoid bloating images.
apps/workspace-engine/pkg/workspace/loader.go (3)
15-18: Make storage wording neutral and drop the redundant comment.
"disk" is misleading with GCS; also the inline comment adds no value. Suggest: remove the comment and adjust the error to say "storage".
- // Write to file with appropriate permissions - if err := storage.Put(ctx, snapshot.Path, data); err != nil { - return fmt.Errorf("failed to write workspace to disk: %w", err) + if err := storage.Put(ctx, snapshot.Path, data); err != nil { + return fmt.Errorf("failed to write workspace to storage: %w", err) }
42-45: Neutralize "disk" in read error.
Keep the error transport‑agnostic.
- return fmt.Errorf("failed to read workspace from disk: %w", err) + return fmt.Errorf("failed to read workspace from storage: %w", err)
9-25: Optional: handle Put success + DB write failure to reduce orphaned blobs.If DB write fails after Put, the object is orphaned. Options: best‑effort delete, or make WriteWorkspaceSnapshot idempotent/upsert and retry on transient errors.
Can you confirm whether WriteWorkspaceSnapshot is idempotent (unique key on (workspace_id, path))?
apps/workspace-engine/pkg/kafka/kafka.go (3)
89-95: Make local storage base dir configurable.Use an env var for the base directory so ops can relocate state without code changes.
- storage := workspace.NewFileStorage("./state") + baseDir := getEnv("WORKSPACE_STATES_DIR", "./state") + storage := workspace.NewFileStorage(baseDir)
97-108: Remove unreachable nil check.workspace.GetWorkspace always returns a non‑nil workspace.
- ws := workspace.GetWorkspace(workspaceID) - if ws == nil { - log.Error("Workspace not found", "workspaceID", workspaceID) - continue - } + ws := workspace.GetWorkspace(workspaceID)
141-146: Prefer filesystem‑safe, directory‑scoped snapshot paths.Colons in RFC3339 can be problematic on some filesystems; nesting by workspace eases listing/GC.
- ts := msg.Timestamp.UTC().Format(time.RFC3339Nano) + // Filesystem‑safe, sortable timestamp + ts := msg.Timestamp.UTC().Format("20060102T150405.000000000Z") - snapshot := &db.WorkspaceSnapshot{ - Path: fmt.Sprintf("%s_%s.gob", ws.ID, ts), + snapshot := &db.WorkspaceSnapshot{ + Path: fmt.Sprintf("%s/%s.gob", ws.ID, ts), Timestamp: ts, Partition: int32(msg.TopicPartition.Partition), NumPartitions: numPartitions, }
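A sketch of what an idempotent WriteWorkspaceSnapshot could look like, assuming a unique constraint on (workspace_id, path) and pgx-style Exec; the table and column names here are guesses, not taken from the actual migration:

```go
// In pkg/db: upsert so a retried Kafka message does not fail on a duplicate row.
func WriteWorkspaceSnapshot(ctx context.Context, workspaceID string, s *WorkspaceSnapshot) error {
	db, err := GetDB(ctx)
	if err != nil {
		return err
	}
	_, err = db.Exec(ctx, `
		INSERT INTO workspace_snapshot (workspace_id, path, timestamp, partition, num_partitions)
		VALUES ($1, $2, $3, $4, $5)
		ON CONFLICT (workspace_id, path) DO UPDATE
		SET timestamp      = EXCLUDED.timestamp,
		    partition      = EXCLUDED.partition,
		    num_partitions = EXCLUDED.num_partitions`,
		workspaceID, s.Path, s.Timestamp, s.Partition, s.NumPartitions)
	return err
}
```

With an upsert in place, the consumer can safely replay a message whose snapshot row was already recorded.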
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (3)
- .gitignore (1 hunks)
- apps/workspace-engine/pkg/kafka/kafka.go (4 hunks)
- apps/workspace-engine/pkg/workspace/loader.go (1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
apps/workspace-engine/**/*.go
📄 CodeRabbit inference engine (apps/workspace-engine/CLAUDE.md)
apps/workspace-engine/**/*.go: Do not add extraneous inline comments that state the obvious
Do not add comments that simply restate what the code does
Do not add comments for standard Go patterns (e.g., noting WaitGroup or semaphore usage)
Write comments that explain why, document complex logic/algorithms, provide non-obvious context, include TODO/FIXME, and document exported functions/types/methods
Files:
- apps/workspace-engine/pkg/workspace/loader.go
- apps/workspace-engine/pkg/kafka/kafka.go
🧬 Code graph analysis (2)
apps/workspace-engine/pkg/workspace/loader.go (4)
apps/workspace-engine/pkg/workspace/storage_file.go (1)
StorageClient(9-12)apps/workspace-engine/pkg/workspace/workspace.go (1)
Workspace(42-48)apps/workspace-engine/pkg/db/workspaces.go (3)
WorkspaceSnapshot(90-95)WriteWorkspaceSnapshot(129-141)GetWorkspaceSnapshot(101-122)apps/workspace-engine/pkg/workspace/populate_workspace.go (1)
PopulateWorkspaceWithInitialState(8-76)
apps/workspace-engine/pkg/kafka/kafka.go (5)
apps/workspace-engine/pkg/workspace/storage_file.go (1)
NewFileStorage(20-22)apps/workspace-engine/pkg/workspace/storage_gcs.go (2)
IsGCSStorageEnabled(15-17)NewGCSStorageClient(27-48)apps/workspace-engine/pkg/workspace/workspace.go (1)
GetWorkspace(197-204)apps/workspace-engine/pkg/workspace/loader.go (2)
Load(27-48)Save(9-25)apps/workspace-engine/pkg/db/workspaces.go (1)
WorkspaceSnapshot(90-95)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build (linux/amd64)
| func Save(ctx context.Context, storage StorageClient, workspace *Workspace, snapshot *db.WorkspaceSnapshot) error { | ||
| data, err := workspace.GobEncode() | ||
| if err != nil { | ||
| return fmt.Errorf("failed to encode workspace: %w", err) | ||
| } | ||
|
|
||
| // Write to file with appropriate permissions | ||
| if err := storage.Put(ctx, snapshot.Path, data); err != nil { | ||
| return fmt.Errorf("failed to write workspace to disk: %w", err) | ||
| } | ||
|
|
||
| if err := db.WriteWorkspaceSnapshot(ctx, workspace.ID, snapshot); err != nil { | ||
| return fmt.Errorf("failed to write workspace snapshot: %w", err) | ||
| } | ||
|
|
||
| return nil | ||
| } |
Validate snapshot.Path before writing.
Guard against empty/invalid paths to avoid clobbering a base dir or writing to an unintended key.
func Save(ctx context.Context, storage StorageClient, workspace *Workspace, snapshot *db.WorkspaceSnapshot) error {
+ if snapshot == nil || snapshot.Path == "" {
+ return fmt.Errorf("invalid snapshot: missing Path")
+ }
	data, err := workspace.GobEncode()

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| func Save(ctx context.Context, storage StorageClient, workspace *Workspace, snapshot *db.WorkspaceSnapshot) error { | |
| data, err := workspace.GobEncode() | |
| if err != nil { | |
| return fmt.Errorf("failed to encode workspace: %w", err) | |
| } | |
| // Write to file with appropriate permissions | |
| if err := storage.Put(ctx, snapshot.Path, data); err != nil { | |
| return fmt.Errorf("failed to write workspace to disk: %w", err) | |
| } | |
| if err := db.WriteWorkspaceSnapshot(ctx, workspace.ID, snapshot); err != nil { | |
| return fmt.Errorf("failed to write workspace snapshot: %w", err) | |
| } | |
| return nil | |
| } | |
| func Save(ctx context.Context, storage StorageClient, workspace *Workspace, snapshot *db.WorkspaceSnapshot) error { | |
| if snapshot == nil || snapshot.Path == "" { | |
| return fmt.Errorf("invalid snapshot: missing Path") | |
| } | |
| data, err := workspace.GobEncode() | |
| if err != nil { | |
| return fmt.Errorf("failed to encode workspace: %w", err) | |
| } | |
| // Write to file with appropriate permissions | |
| if err := storage.Put(ctx, snapshot.Path, data); err != nil { | |
| return fmt.Errorf("failed to write workspace to disk: %w", err) | |
| } | |
| if err := db.WriteWorkspaceSnapshot(ctx, workspace.ID, snapshot); err != nil { | |
| return fmt.Errorf("failed to write workspace snapshot: %w", err) | |
| } | |
| return nil | |
| } |
🤖 Prompt for AI Agents
In apps/workspace-engine/pkg/workspace/loader.go around lines 9 to 25, validate
snapshot.Path before calling storage.Put: ensure snapshot != nil and
snapshot.Path is non-empty, not "/" or ".", not an absolute path (doesn't start
with "/"), and does not contain path traversal segments like ".."; normalize
with filepath.Clean and reject the value if cleaning produces an empty path or
starts with ".." or "/" or equals "."; if invalid, return a clear error (e.g.
fmt.Errorf("invalid snapshot path: %q", snapshot.Path)) so you never write to
unintended locations.
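A minimal sketch of the validation the prompt describes, factored into a helper; the helper name is illustrative and not part of the codebase:

```go
import (
	"fmt"
	"path/filepath"
	"strings"
)

// validateSnapshotPath rejects empty, absolute, and traversal-prone paths
// before they are handed to the storage backend.
func validateSnapshotPath(p string) error {
	if p == "" {
		return fmt.Errorf("invalid snapshot path: empty")
	}
	cleaned := filepath.Clean(p)
	if cleaned == "." || cleaned == ".." || filepath.IsAbs(cleaned) ||
		strings.HasPrefix(cleaned, ".."+string(filepath.Separator)) {
		return fmt.Errorf("invalid snapshot path: %q", p)
	}
	return nil
}
```

Both Save and Load could call this before handing the path to the storage client.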
Actionable comments posted: 5
♻️ Duplicate comments (2)
apps/workspace-engine/pkg/workspace/loader.go (2)
9-25: Validate snapshot.Path before writing.
As flagged in the previous review, snapshot.Path should be validated to prevent writing to unintended locations (e.g., empty paths, path traversal with "..", absolute paths).

9-25: Validate inputs to prevent nil pointer dereferences.
The function doesn't check whether workspace or snapshot are nil before dereferencing them. Calling workspace.GobEncode() or accessing snapshot.Path with nil values will cause a panic. Apply this diff to add defensive checks:

 func Save(ctx context.Context, storage StorageClient, workspace *Workspace, snapshot *db.WorkspaceSnapshot) error {
+	if workspace == nil {
+		return fmt.Errorf("workspace cannot be nil")
+	}
+	if snapshot == nil {
+		return fmt.Errorf("snapshot cannot be nil")
+	}
+	if snapshot.Path == "" {
+		return fmt.Errorf("snapshot.Path cannot be empty")
+	}
 	data, err := workspace.GobEncode()
🧹 Nitpick comments (1)
apps/workspace-engine/pkg/workspace/loader.go (1)
45-45: Wrap GobDecode error for consistency.
The error from GobDecode is returned directly without wrapping, which is inconsistent with the rest of the function. Consider wrapping it for context. Apply this diff:

-	return workspace.GobDecode(data)
+	if err := workspace.GobDecode(data); err != nil {
+		return fmt.Errorf("failed to decode workspace: %w", err)
+	}
+	return nil
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
apps/workspace-engine/pkg/workspace/loader.go(1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
apps/workspace-engine/**/*.go
📄 CodeRabbit inference engine (apps/workspace-engine/CLAUDE.md)
apps/workspace-engine/**/*.go: Do not add extraneous inline comments that state the obvious
Do not add comments that simply restate what the code does
Do not add comments for standard Go patterns (e.g., noting WaitGroup or semaphore usage)
Write comments that explain why, document complex logic/algorithms, provide non-obvious context, include TODO/FIXME, and document exported functions/types/methods
Files:
apps/workspace-engine/pkg/workspace/loader.go
🧬 Code graph analysis (1)
apps/workspace-engine/pkg/workspace/loader.go (4)
apps/workspace-engine/pkg/workspace/storage_file.go (1)
StorageClient(9-12)apps/workspace-engine/pkg/workspace/workspace.go (1)
Workspace(42-48)apps/workspace-engine/pkg/db/workspaces.go (3)
WorkspaceSnapshot(90-95)WriteWorkspaceSnapshot(129-141)GetWorkspaceSnapshot(101-122)apps/workspace-engine/pkg/workspace/populate_workspace.go (1)
PopulateWorkspaceWithInitialState(8-76)
| if err := storage.Put(ctx, snapshot.Path, data); err != nil { | ||
| return fmt.Errorf("failed to write workspace to disk: %w", err) | ||
| } | ||
|
|
||
| if err := db.WriteWorkspaceSnapshot(ctx, workspace.ID, snapshot); err != nil { | ||
| return fmt.Errorf("failed to write workspace snapshot: %w", err) | ||
| } |
Risk of inconsistent state: storage write succeeds but DB write fails.
If storage.Put succeeds but db.WriteWorkspaceSnapshot fails, the storage will contain data that isn't tracked in the database. Consider either:
- Writing to DB first, then storage (so storage is authoritative and DB is a cache/index)
- Implementing rollback/cleanup if DB write fails after storage write
Without this, orphaned snapshots can accumulate in storage.
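A minimal sketch of the second option (cleanup on DB failure), assuming the StorageClient interface exposes a Delete method like the GCS client does; adjust if the interface differs:

```go
func Save(ctx context.Context, storage StorageClient, ws *Workspace, snapshot *db.WorkspaceSnapshot) error {
	data, err := ws.GobEncode()
	if err != nil {
		return fmt.Errorf("failed to encode workspace: %w", err)
	}
	if err := storage.Put(ctx, snapshot.Path, data); err != nil {
		return fmt.Errorf("failed to write workspace to storage: %w", err)
	}
	if err := db.WriteWorkspaceSnapshot(ctx, ws.ID, snapshot); err != nil {
		// Best-effort: remove the blob we just wrote so it does not become orphaned.
		if delErr := storage.Delete(ctx, snapshot.Path); delErr != nil {
			return fmt.Errorf("failed to record snapshot: %w (blob cleanup also failed: %v)", err, delErr)
		}
		return fmt.Errorf("failed to record snapshot: %w", err)
	}
	return nil
}
```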
|
|
||
| // Write to file with appropriate permissions | ||
| if err := storage.Put(ctx, snapshot.Path, data); err != nil { | ||
| return fmt.Errorf("failed to write workspace to disk: %w", err) |
Error message references "disk" but storage may be GCS.
The error message "failed to write workspace to disk" is misleading when using GCS storage. Use a generic term like "storage" instead.
Apply this diff:
- return fmt.Errorf("failed to write workspace to disk: %w", err)
+ return fmt.Errorf("failed to write workspace to storage: %w", err)

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| return fmt.Errorf("failed to write workspace to disk: %w", err) | |
| return fmt.Errorf("failed to write workspace to storage: %w", err) |
🤖 Prompt for AI Agents
In apps/workspace-engine/pkg/workspace/loader.go around line 17, update the
error message that currently says "failed to write workspace to disk" to use a
neutral term such as "storage" so it’s accurate for GCS and other backends;
modify the fmt.Errorf call to return fmt.Errorf("failed to write workspace to
storage: %w", err) (or equivalent) so the underlying error is preserved while
the message is storage-agnostic.
| func Load(ctx context.Context, storage StorageClient, workspace *Workspace) error { | ||
| dbSnapshot, err := db.GetWorkspaceSnapshot(ctx, workspace.ID) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to get workspace snapshot: %w", err) | ||
| } | ||
|
|
||
| if dbSnapshot == nil { | ||
| if err := PopulateWorkspaceWithInitialState(ctx, workspace); err != nil { | ||
| return fmt.Errorf("failed to populate workspace with initial state: %w", err) | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| data, err := storage.Get(ctx, dbSnapshot.Path) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to read workspace from disk: %w", err) | ||
| } | ||
|
|
||
| return workspace.GobDecode(data) | ||
| } No newline at end of file |
Validate workspace input to prevent nil pointer dereferences.
The function doesn't check whether workspace is nil before accessing workspace.ID. This will cause a panic if nil is passed.
Apply this diff:
func Load(ctx context.Context, storage StorageClient, workspace *Workspace) error {
+ if workspace == nil {
+ return fmt.Errorf("workspace cannot be nil")
+ }
	dbSnapshot, err := db.GetWorkspaceSnapshot(ctx, workspace.ID)

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| func Load(ctx context.Context, storage StorageClient, workspace *Workspace) error { | |
| dbSnapshot, err := db.GetWorkspaceSnapshot(ctx, workspace.ID) | |
| if err != nil { | |
| return fmt.Errorf("failed to get workspace snapshot: %w", err) | |
| } | |
| if dbSnapshot == nil { | |
| if err := PopulateWorkspaceWithInitialState(ctx, workspace); err != nil { | |
| return fmt.Errorf("failed to populate workspace with initial state: %w", err) | |
| } | |
| return nil | |
| } | |
| data, err := storage.Get(ctx, dbSnapshot.Path) | |
| if err != nil { | |
| return fmt.Errorf("failed to read workspace from disk: %w", err) | |
| } | |
| return workspace.GobDecode(data) | |
| } | |
| func Load(ctx context.Context, storage StorageClient, workspace *Workspace) error { | |
| if workspace == nil { | |
| return fmt.Errorf("workspace cannot be nil") | |
| } | |
| dbSnapshot, err := db.GetWorkspaceSnapshot(ctx, workspace.ID) | |
| if err != nil { | |
| return fmt.Errorf("failed to get workspace snapshot: %w", err) | |
| } | |
| if dbSnapshot == nil { | |
| if err := PopulateWorkspaceWithInitialState(ctx, workspace); err != nil { | |
| return fmt.Errorf("failed to populate workspace with initial state: %w", err) | |
| } | |
| return nil | |
| } | |
| data, err := storage.Get(ctx, dbSnapshot.Path) | |
| if err != nil { | |
| return fmt.Errorf("failed to read workspace from disk: %w", err) | |
| } | |
| return workspace.GobDecode(data) | |
| } |
🤖 Prompt for AI Agents
In apps/workspace-engine/pkg/workspace/loader.go around lines 27 to 46, add a
nil-check for the workspace parameter at the top of Load to avoid dereferencing
workspace.ID when workspace is nil; if workspace == nil return a descriptive
error (e.g. fmt.Errorf("workspace is nil")) so callers receive a clear error
instead of panicking, then proceed with the existing logic unchanged.
| data, err := storage.Get(ctx, dbSnapshot.Path) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to read workspace from disk: %w", err) | ||
| } |
Validate dbSnapshot.Path before using it.
dbSnapshot.Path is passed directly to storage.Get without validation. If the path is malformed or empty, it could cause unexpected behavior or errors. Consider validating the path similar to the validation needed in the Save function.
Apply this diff:
+ if dbSnapshot.Path == "" {
+ return fmt.Errorf("snapshot path is empty")
+ }
	data, err := storage.Get(ctx, dbSnapshot.Path)

🤖 Prompt for AI Agents
In apps/workspace-engine/pkg/workspace/loader.go around lines 40 to 43, validate
dbSnapshot.Path before calling storage.Get: ensure it's non-empty, run
filepath.Clean on it, reject paths containing path traversal components (e.g.
"..") or other invalid forms consistent with the Save function's validation, and
return a clear error if validation fails; then call storage.Get with the
cleaned/validated path.
|
|
||
| data, err := storage.Get(ctx, dbSnapshot.Path) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to read workspace from disk: %w", err) |
Error message references "disk" but storage may be GCS.
The error message "failed to read workspace from disk" is misleading when using GCS storage. Use a generic term like "storage" instead.
Apply this diff:
- return fmt.Errorf("failed to read workspace from disk: %w", err)
+ return fmt.Errorf("failed to read workspace from storage: %w", err)

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| return fmt.Errorf("failed to read workspace from disk: %w", err) | |
| return fmt.Errorf("failed to read workspace from storage: %w", err) |
🤖 Prompt for AI Agents
In apps/workspace-engine/pkg/workspace/loader.go around line 42, the error
message uses the term "disk" which is misleading for non-disk backends like GCS;
update the formatted error string to use a neutral term such as "storage" (e.g.,
change "failed to read workspace from disk" to "failed to read workspace from
storage") so the error accurately reflects any backend and preserve the %w error
wrapping.
Actionable comments posted: 3
🧹 Nitpick comments (6)
apps/workspace-engine/test/e2e/engine_workspace_persistence_test.go (6)
3-15: Add imports required by refactors (reflect, sort).
Needed for deep-equality on configs and deterministic job ID ordering.

 import (
 	"context"
 	"fmt"
 	"os"
+	"reflect"
+	"sort"
 	"sync"
 	"testing"
 	"time"

 	"workspace-engine/pkg/oapi"
 	"workspace-engine/pkg/workspace"
 	"workspace-engine/test/integration"

 	"github.com/google/uuid"
 )

54-58: Deep-compare Resource.Config instead of nil-only check.
Nil-only check can miss serialization regressions. Use reflect.DeepEqual.

-	// Verify config (deep comparison would require reflection or JSON marshaling)
-	if (expected.Config == nil) != (actual.Config == nil) {
-		t.Errorf("%s: config nil mismatch", context)
-	}
+	// Verify config deeply
+	if !reflect.DeepEqual(expected.Config, actual.Config) {
+		t.Errorf("%s: config mismatch: expected %#v, got %#v", context, expected.Config, actual.Config)
+	}

132-150: Also assert Deployment.Slug.
The DeploymentName helper sets Slug = name; ensure it round-trips.

 if actual.Name != expected.Name {
 	t.Errorf("%s: deployment name mismatch: expected %s, got %s", context, expected.Name, actual.Name)
 }
+if actual.Slug != expected.Slug {
+	t.Errorf("%s: deployment slug mismatch: expected %s, got %s", context, expected.Slug, actual.Slug)
+}

934-937: Handle tzdata absence to avoid spurious CI failures.
time.LoadLocation can fail in minimal environments. Skip the timezone-specific test when tzdata is unavailable.

 utcLoc := time.UTC
-estLoc, _ := time.LoadLocation("America/New_York")
-pstLoc, _ := time.LoadLocation("America/Los_Angeles")
+estLoc, err := time.LoadLocation("America/New_York")
+if err != nil {
+	t.Skip("tzdata not available; skipping timezone-precision checks")
+}
+pstLoc, err := time.LoadLocation("America/Los_Angeles")
+if err != nil {
+	t.Skip("tzdata not available; skipping timezone-precision checks")
+}

1004-1004: Prefer strconv.Itoa(i) for clarity.
string(rune('0'+i)) is cryptic and breaks above 9. Use strconv.Itoa(i). Requires a strconv import.

-	Metadata: map[string]string{"test": ts.name, "index": string(rune('0' + i))},
+	Metadata: map[string]string{"test": ts.name, "index": strconv.Itoa(i)},

17-21: Align header comment with actual tests or gate a GCS test by env.
This file currently tests file-based storage only. Either remove "GCS Put/Get" from the header or add an env-guarded GCS test (skipped unless bucket/creds are set).
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (3)
- apps/workspace-engine/test/e2e/engine_workspace_loader_test.go (0 hunks)
- apps/workspace-engine/test/e2e/engine_workspace_persistence_test.go (1 hunks)
- apps/workspace-engine/test/e2e/engine_workspace_storage_test.go (0 hunks)
💤 Files with no reviewable changes (2)
- apps/workspace-engine/test/e2e/engine_workspace_loader_test.go
- apps/workspace-engine/test/e2e/engine_workspace_storage_test.go
🧰 Additional context used
📓 Path-based instructions (2)
apps/workspace-engine/**/*.go
📄 CodeRabbit inference engine (apps/workspace-engine/CLAUDE.md)
apps/workspace-engine/**/*.go: Do not add extraneous inline comments that state the obvious
Do not add comments that simply restate what the code does
Do not add comments for standard Go patterns (e.g., noting WaitGroup or semaphore usage)
Write comments that explain why, document complex logic/algorithms, provide non-obvious context, include TODO/FIXME, and document exported functions/types/methods
Files:
apps/workspace-engine/test/e2e/engine_workspace_persistence_test.go
apps/workspace-engine/**/*_test.go
📄 CodeRabbit inference engine (apps/workspace-engine/CLAUDE.md)
Follow the existing test structure used in *_test.go files
Files:
apps/workspace-engine/test/e2e/engine_workspace_persistence_test.go
🧠 Learnings (1)
📚 Learning: 2025-08-12T18:13:54.630Z
Learnt from: CR
PR: ctrlplanedev/ctrlplane#0
File: apps/workspace-engine/CLAUDE.md:0-0
Timestamp: 2025-08-12T18:13:54.630Z
Learning: Applies to apps/workspace-engine/**/*_test.go : Follow the existing test structure used in *_test.go files
Applied to files:
apps/workspace-engine/test/e2e/engine_workspace_persistence_test.go
🧬 Code graph analysis (1)
apps/workspace-engine/test/e2e/engine_workspace_persistence_test.go (4)
apps/workspace-engine/test/integration/workspace.go (1)
NewTestWorkspace(35-74)apps/workspace-engine/test/integration/opts.go (23)
WithResource(107-132)ResourceID(455-459)ResourceName(449-453)WithJobAgent(134-150)JobAgentID(525-529)JobAgentName(519-523)WithSystem(84-105)SystemID(253-257)SystemName(241-245)WithDeployment(259-277)DeploymentID(310-314)DeploymentName(297-302)DeploymentJobAgent(316-320)WithWorkspaceID(77-82)ResourceConfig(479-483)WithDeploymentVersion(353-367)DeploymentVersionID(383-387)DeploymentVersionTag(377-381)WithEnvironment(279-293)EnvironmentID(427-431)EnvironmentName(415-419)PolicyID(551-555)PolicyName(539-543)apps/workspace-engine/pkg/workspace/storage_file.go (1)
NewFileStorage(20-22)apps/workspace-engine/pkg/workspace/workspace.go (1)
NewNoFlush(29-40)
🪛 GitHub Actions: Go Tests
apps/workspace-engine/test/e2e/engine_workspace_persistence_test.go
[error] 407-407: TestEngine_Persistence_EmptyWorkspace failed due to data race detected in workspace persistence flow. See GobDecode/ReleaseTargets concurrency traces for reproduction.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build (linux/amd64)
🔇 Additional comments (1)
apps/workspace-engine/test/e2e/engine_workspace_persistence_test.go (1)
290-290: Review comment is inaccurate — references incorrect line and omits one that needs change.
The verification reveals:
- Line 1179: Already uses workspace.NewNoFlush("temp") — no change needed
- Line 398: Uses workspace.New(workspaceID) — needs change but not mentioned in the review
The complete list of lines requiring the change from workspace.New() to workspace.NewNoFlush() for the newWs assignment is: 290, 398, 490, 602, 814, 1041 (not 1179 as claimed).
Likely an incorrect or invalid review comment.
| } | ||
|
|
||
| // Load into new workspace | ||
| newWs := workspace.New(workspaceID) |
🧩 Analysis chain
Fix race: load into a no-flush workspace before GobDecode.
Race in CI points to background activity from workspace.New(...). Use workspace.NewNoFlush(...) for pure decode/verify tests.
- newWs := workspace.New(workspaceID)
+ newWs := workspace.NewNoFlush(workspaceID)

Run a quick scan to spot other loads that still use workspace.New(:
🏁 Script executed:
#!/usr/bin/env bash
rg -nP -C1 '\bworkspace\.New\(' apps/workspace-engine/test/e2e

Length of output: 1730

Fix race conditions: use workspace.NewNoFlush() for all persistence verification loads.
All 6 workspace loads in this test file trigger the same race during GobDecode/ReleaseTargets due to background activity in workspace.New(). Use workspace.NewNoFlush() for pure decode/verify scenarios to block that activity.

- newWs := workspace.New(workspaceID)
+ newWs := workspace.NewNoFlush(workspaceID)

Apply to lines: 290, 398, 490, 602, 814, 1041.
+ newWs := workspace.NewNoFlush(workspaceID)Apply to lines: 290, 398, 490, 602, 814, 1041.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| newWs := workspace.New(workspaceID) | |
| newWs := workspace.NewNoFlush(workspaceID) |
🤖 Prompt for AI Agents
In apps/workspace-engine/test/e2e/engine_workspace_persistence_test.go around
lines 290, 398, 490, 602, 814, and 1041, the test uses workspace.New(...) which
starts background activity and causes race conditions during
GobDecode/ReleaseTargets; replace each call to workspace.New(workspaceID) with
workspace.NewNoFlush(workspaceID) for these pure decode/verify loads to block
background flushing and eliminate the race. Ensure each of the six locations is
updated consistently and run tests to confirm races are resolved.
| // Get all jobs created by the deployment version | ||
| allJobs := ws.Jobs().Items() | ||
| if len(allJobs) == 0 { | ||
| t.Fatal("expected at least one job to be created") | ||
| } | ||
|
|
||
| // Update job statuses | ||
| jobsByStatus := make(map[oapi.JobStatus]string) | ||
| testStatuses := []oapi.JobStatus{ | ||
| oapi.Pending, | ||
| oapi.InProgress, | ||
| oapi.Successful, | ||
| } | ||
|
|
||
| jobIndex := 0 | ||
| for _, status := range testStatuses { | ||
| var jobId string | ||
| if jobIndex < len(allJobs) { | ||
| // Use existing job | ||
| for id := range allJobs { | ||
| jobId = id | ||
| break | ||
| } | ||
| delete(allJobs, jobId) | ||
| } else { | ||
| // Create new job | ||
| jobId = uuid.New().String() | ||
| releaseId := uuid.New().String() | ||
|
|
||
| job := &oapi.Job{ | ||
| Id: jobId, | ||
| Status: status, | ||
| JobAgentId: jobAgentId, | ||
| ReleaseId: releaseId, | ||
| CreatedAt: time.Now(), | ||
| UpdatedAt: time.Now(), | ||
| JobAgentConfig: make(map[string]interface{}), | ||
| Metadata: make(map[string]string), | ||
| } | ||
| ws.Jobs().Upsert(ctx, job) | ||
| } | ||
|
|
||
| // Update job status | ||
| job, _ := ws.Jobs().Get(jobId) | ||
| job.Status = status | ||
| ws.Jobs().Upsert(ctx, job) | ||
| jobsByStatus[status] = jobId | ||
| jobIndex++ | ||
| } | ||
|
|
Do not mutate Items() map; make selection deterministic.
Deleting from the map returned by Items() risks mutating internal state and is nondeterministic. Copy IDs to a slice, sort, then index.
- allJobs := ws.Jobs().Items()
+ allJobs := ws.Jobs().Items()
@@
- jobIndex := 0
- for _, status := range testStatuses {
+ // Build a stable, deterministic list of existing job IDs
+ existingIDs := make([]string, 0, len(allJobs))
+ for id := range allJobs {
+ existingIDs = append(existingIDs, id)
+ }
+ sort.Strings(existingIDs)
+
+ jobIndex := 0
+ for _, status := range testStatuses {
var jobId string
- if jobIndex < len(allJobs) {
- // Use existing job
- for id := range allJobs {
- jobId = id
- break
- }
- delete(allJobs, jobId)
+ if jobIndex < len(existingIDs) {
+ // Use existing job by stable index
+ jobId = existingIDs[jobIndex]
} else {
// Create new job
jobId = uuid.New().String()
releaseId := uuid.New().String()
@@
jobIndex++
 }

Note: add sort to imports.
🤖 Prompt for AI Agents
In apps/workspace-engine/test/e2e/engine_workspace_persistence_test.go around
lines 742 to 791, the test mutates the map returned by ws.Jobs().Items() and
selects IDs nondeterministically; instead copy the keys into a slice, sort the
slice (add "sort" to imports), then iterate deterministically by index to pick
existing job IDs without deleting from the original map; when needing a new job
beyond existing IDs, generate a UUID and upsert as before, then use
ws.Jobs().Get/upsert for status updates and populate jobsByStatus.
| // Launch multiple goroutines that simultaneously write to the same file | ||
| const numGoroutines = 10 | ||
| var wg sync.WaitGroup | ||
| errChan := make(chan error, numGoroutines) | ||
|
|
||
| for i := 0; i < numGoroutines; i++ { | ||
| wg.Add(1) | ||
| go func(index int) { | ||
| defer wg.Done() | ||
|
|
||
| // Each goroutine writes the same data | ||
| if err := storage.Put(ctx, "concurrent.gob", data); err != nil { | ||
| errChan <- err | ||
| } | ||
| }(i) | ||
| } |
🧩 Analysis chain
Concurrent writes may be flaky without atomic Put.
If FileStorage.Put doesn’t use write-to-temp + fsync + rename, concurrent writers can corrupt bytes. Either:
- make Put atomic, or
- vary filenames per goroutine and verify the final file via a separate deterministic pass.
Please confirm apps/workspace-engine/pkg/workspace/storage_file.go:Put uses atomic rename semantics. If not, consider updating it. You can locate it with:
🏁 Script executed:
#!/usr/bin/env bash
rg -nP 'type FileStorage|func\s+\(.*\)\s+Put\(' apps/workspace-engine/pkg/workspace

Length of output: 357
Now let me examine the actual FileStorage.Put implementation by reading the file directly:
---
🏁 Script executed:
```shell
#!/bin/bash
cat apps/workspace-engine/pkg/workspace/storage_file.go
```

Length of output: 1115
FileStorage.Put does not use atomic semantics and concurrent writes to the same file can cause corruption.
The implementation at apps/workspace-engine/pkg/workspace/storage_file.go:31-39 directly calls os.WriteFile() without atomic writes (temp file + fsync + rename) or synchronization. The concurrent test at lines 1151-1166 launches 10 goroutines writing simultaneously to the same file path—a scenario that can result in file corruption under POSIX systems.
Either:
- Add a mutex to FileStorage to serialize writes, or
- Implement atomic writes using temp file + fsync + atomic rename.
🤖 Prompt for AI Agents
In apps/workspace-engine/pkg/workspace/storage_file.go around lines 31-39,
FileStorage.Put currently calls os.WriteFile() allowing concurrent writers to
corrupt the file; replace that with an atomic write: create a temp file in the
destination directory, write the contents to the temp file, call File.Sync()
after writing, close it, then perform an atomic os.Rename(tempPath, finalPath),
and finally sync the containing directory (open dir and call Sync) to ensure
durability; ensure proper permissions are preserved, return or wrap errors from
each step, and remove the temp file on any error.
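A minimal sketch of that approach, not the repository's actual implementation; the baseDir field name is assumed and the directory fsync is omitted for brevity:

```go
// Requires "context", "os", and "path/filepath".
func (s *FileStorage) Put(_ context.Context, name string, data []byte) error {
	final := filepath.Join(s.baseDir, name)
	if err := os.MkdirAll(filepath.Dir(final), 0o755); err != nil {
		return err
	}
	tmp, err := os.CreateTemp(filepath.Dir(final), ".snapshot-*")
	if err != nil {
		return err
	}
	defer os.Remove(tmp.Name()) // harmless after a successful rename
	if _, err := tmp.Write(data); err != nil {
		tmp.Close()
		return err
	}
	if err := tmp.Sync(); err != nil { // flush to stable storage before the rename
		tmp.Close()
		return err
	}
	if err := tmp.Close(); err != nil {
		return err
	}
	if err := os.Chmod(tmp.Name(), 0o644); err != nil {
		return err
	}
	// Atomic replace: readers observe either the old or the new file, never a partial write.
	return os.Rename(tmp.Name(), final)
}
```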
Actionable comments posted: 0
♻️ Duplicate comments (1)
apps/workspace-engine/pkg/kafka/kafka.go (1)
135-148: Commit after durable save; handle Save error; use explicit int32 cast.
Offsets are committed before persisting and the Save error is ignored, risking data loss. Persist first; on Save failure, skip commit to trigger retry. Also ensure the NumPartitions type matches int32.

-	// Commit offset to Kafka
-	if _, err := consumer.CommitMessage(msg); err != nil {
-		log.Error("Failed to commit message", "error", err)
-		continue
-	}
-
-	snapshot := &db.WorkspaceSnapshot{
-		Path:          fmt.Sprintf("%s_%s.gob", ws.ID, msg.Timestamp.Format(time.RFC3339Nano)),
-		Timestamp:     msg.Timestamp,
-		Partition:     int32(msg.TopicPartition.Partition),
-		NumPartitions: numPartitions,
-	}
-
-	workspace.Save(ctx, storage, ws, snapshot)
+	snapshot := &db.WorkspaceSnapshot{
+		Path:          fmt.Sprintf("%s_%s.gob", ws.ID, msg.Timestamp.Format(time.RFC3339Nano)),
+		Timestamp:     msg.Timestamp.UTC(),
+		Partition:     int32(msg.TopicPartition.Partition),
+		NumPartitions: int32(numPartitions),
+	}
+
+	if err := workspace.Save(ctx, storage, ws, snapshot); err != nil {
+		log.Error("Failed to save workspace snapshot", "workspaceID", ws.ID, "error", err)
+		// Do not commit; let the message be retried.
+		continue
+	}
+
+	// Commit offset only after durable save succeeds.
+	if _, err := consumer.CommitMessage(msg); err != nil {
+		log.Error("Failed to commit message", "error", err)
+		continue
+	}

🧹 Nitpick comments (4)
apps/workspace-engine/pkg/db/workspaces.go (3)
90-96: Add GoDoc for exported type and funcs; document nil-on-miss semantics.
Exported identifiers lack comments. Please add brief doc comments explaining fields and that GetWorkspaceSnapshot returns (nil, nil) when no snapshot exists so callers must nil-check.

+// WorkspaceSnapshot describes a durable pointer to a workspace's serialized state
+// stored in blob storage. Timestamp is the event time of the message that produced
+// the snapshot; Partition and NumPartitions capture the Kafka topology at save time.
 type WorkspaceSnapshot struct {
 	Path          string
 	Timestamp     time.Time
 	Partition     int32
 	NumPartitions int32
 }

+// GetWorkspaceSnapshot returns the latest snapshot for the workspace.
+// It returns (nil, nil) if no snapshot exists.
 func GetWorkspaceSnapshot(ctx context.Context, workspaceID string) (*WorkspaceSnapshot, error) { ... }

+// WriteWorkspaceSnapshot persists a snapshot row associated with a workspace.
 func WriteWorkspaceSnapshot(ctx context.Context, workspaceID string, snapshot *WorkspaceSnapshot) error { ... }

Also applies to: 102-123, 130-142

98-101: Index hint for SELECT latest snapshot.
Query scans by workspace_id and ORDER BY timestamp DESC. Ensure a supporting index exists to keep this O(log N): e.g., CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_ws_snap_ws_ts ON workspace_snapshot (workspace_id, timestamp DESC);

130-142: Guard against nil snapshot arg in WriteWorkspaceSnapshot.
A defensive check avoids panics and gives clearer error semantics.

-import (
-	"context"
-	"time"
-
-	"github.com/jackc/pgx/v5"
-)
+import (
+	"context"
+	"errors"
+	"time"
+
+	"github.com/jackc/pgx/v5"
+)

 func WriteWorkspaceSnapshot(ctx context.Context, workspaceID string, snapshot *WorkspaceSnapshot) error {
+	if snapshot == nil {
+		return errors.New("nil WorkspaceSnapshot")
+	}
 	db, err := GetDB(ctx)
 	if err != nil {
 		return err
 	}

apps/workspace-engine/pkg/kafka/kafka.go (1)
100-103: Remove unreachable nil-check for workspace.GetWorkspace.
GetWorkspace always returns a non-nil instance (creates if missing); the nil branch never runs.

-	if ws == nil {
-		log.Error("Workspace not found", "workspaceID", workspaceID)
-		continue
-	}
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (3)
- apps/workspace-engine/pkg/db/workspaces.go (2 hunks)
- apps/workspace-engine/pkg/kafka/kafka.go (4 hunks)
- apps/workspace-engine/pkg/workspace/loader.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- apps/workspace-engine/pkg/workspace/loader.go
🧰 Additional context used
📓 Path-based instructions (1)
apps/workspace-engine/**/*.go
📄 CodeRabbit inference engine (apps/workspace-engine/CLAUDE.md)
apps/workspace-engine/**/*.go: Do not add extraneous inline comments that state the obvious
Do not add comments that simply restate what the code does
Do not add comments for standard Go patterns (e.g., noting WaitGroup or semaphore usage)
Write comments that explain why, document complex logic/algorithms, provide non-obvious context, include TODO/FIXME, and document exported functions/types/methods
Files:
- apps/workspace-engine/pkg/kafka/kafka.go
- apps/workspace-engine/pkg/db/workspaces.go
🧬 Code graph analysis (2)
apps/workspace-engine/pkg/kafka/kafka.go (5)
apps/workspace-engine/pkg/workspace/storage_file.go (1)
NewFileStorage(20-22)apps/workspace-engine/pkg/workspace/storage_gcs.go (2)
IsGCSStorageEnabled(15-17)NewGCSStorageClient(27-48)apps/workspace-engine/pkg/workspace/workspace.go (1)
GetWorkspace(197-204)apps/workspace-engine/pkg/workspace/loader.go (2)
Load(30-49)Save(11-28)apps/workspace-engine/pkg/db/workspaces.go (1)
WorkspaceSnapshot(91-96)
apps/workspace-engine/pkg/db/workspaces.go (1)
apps/workspace-engine/pkg/db/client.go (1)
GetDB(60-70)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build (linux/amd64)
🔇 Additional comments (1)
apps/workspace-engine/pkg/kafka/kafka.go (1)
141-146: The review comment is based on an incorrect assumption and should be dismissed.
The getTopicPartitionCount function already returns int32 (not int), so the assignment at line 145 is type-correct. No casting is needed and there is no compile error. The code is already properly typed.
Likely an incorrect or invalid review comment.
📊 DB Package Test Coverage
pkg/db coverage:
📊 DB Package Test Coverage
pkg/db coverage:
📊 DB Package Test Coverage
pkg/db coverage:
Actionable comments posted: 2
♻️ Duplicate comments (3)
apps/workspace-engine/test/e2e/engine_workspace_disk_persistence_test.go (2)
541-589: Deterministic job selection; don't mutate the map returned by Items().
Copy keys, sort, index — avoid deleting from the original map and nondeterministic iteration.

 @@
-	allJobs := ws.Jobs().Items()
+	allJobs := ws.Jobs().Items()
 @@
-	jobIndex := 0
-	for _, status := range testStatuses {
+	// Build a stable, deterministic list of existing job IDs
+	existingIDs := make([]string, 0, len(allJobs))
+	for id := range allJobs {
+		existingIDs = append(existingIDs, id)
+	}
+	sort.Strings(existingIDs)
+
+	jobIndex := 0
+	for _, status := range testStatuses {
 		var jobId string
-		if jobIndex < len(allJobs) {
-			// Use existing job
-			for id := range allJobs {
-				jobId = id
-				break
-			}
-			delete(allJobs, jobId)
+		if jobIndex < len(existingIDs) {
+			// Use existing job by stable index
+			jobId = existingIDs[jobIndex]
 		} else {
 			// Create new job
 			jobId = uuid.New().String()
 			releaseId := uuid.New().String()
 @@
 		jobIndex++
 	}

And add import:

 import (
 	"context"
 	"fmt"
 	"os"
 	"sync"
+	"sort"
 	"testing"

Also applies to: 3-15

927-992: Concurrent writes test may be flaky unless FileStorage.Put is atomic.
Confirm Put uses temp file + fsync + rename (and dir fsync). Otherwise, simultaneous writers can corrupt data.

#!/usr/bin/env bash
# Inspect FileStorage.Put implementation
rg -nP 'type\s+FileStorage\b|func\s+\(\s*\*FileStorage\s*\)\s+Put\(' apps/workspace-engine/pkg/workspace
sed -n '1,160p' apps/workspace-engine/pkg/workspace/storage_file.go

apps/workspace-engine/pkg/workspace/storage_gcs.go (1)
15-17: Enable both gs:// and gcs://; validate bucket parsing and empty bucket.
Currently only gs:// is recognized and the bucket is not validated. Parse both schemes and fail fast if the bucket is empty.

 @@
-import (
+import (
 	"context"
 	"errors"
 	"fmt"
 	"io"
 	"os"
-	"path/filepath"
+	"path"
 	"strings"
 @@
-func IsGCSStorageEnabled() bool {
-	return strings.HasPrefix(os.Getenv("WORKSPACE_STATES_BUCKET_URL"), "gs://")
-}
+func IsGCSStorageEnabled() bool {
+	u := os.Getenv("WORKSPACE_STATES_BUCKET_URL")
+	return strings.HasPrefix(u, "gs://") || strings.HasPrefix(u, "gcs://")
+}
 @@
-	bucketURL := os.Getenv("WORKSPACE_STATES_BUCKET_URL")
+	bucketURL := os.Getenv("WORKSPACE_STATES_BUCKET_URL")
 @@
-	gsPath := strings.TrimPrefix(bucketURL, "gs://")
+	gsPath := strings.TrimPrefix(strings.TrimPrefix(bucketURL, "gs://"), "gcs://")
 	parts := strings.SplitN(gsPath, "/", 2)
 	bucket := parts[0]
 	prefix := ""
 	if len(parts) > 1 {
 		prefix = parts[1]
 	}
+	if bucket == "" {
+		return nil, fmt.Errorf("WORKSPACE_STATES_BUCKET_URL must be gs://bucket[/prefix]")
+	}

Also applies to: 35-41
🧹 Nitpick comments (8)
apps/workspace-engine/pkg/workspace/storage_gcs.go (2)
68-76: Avoid double round-trip; rely on NewReader and map not-found via errors.Is.
Attrs() before NewReader() adds latency. NewReader returns ErrObjectNotExist, which you can map directly.

-	// Check if the object exists before trying to read
-	_, err := obj.Attrs(ctx)
-	if err != nil {
-		if err == storage.ErrObjectNotExist {
-			return nil, ErrWorkspaceSnapshotNotFound
-		}
-		return nil, fmt.Errorf("failed to stat GCS object: %w", err)
-	}
-
-	reader, err := obj.NewReader(ctx)
+	reader, err := obj.NewReader(ctx)
 	if err != nil {
-		return nil, fmt.Errorf("failed to read snapshot: %w", err)
+		if errors.Is(err, storage.ErrObjectNotExist) {
+			return nil, ErrWorkspaceSnapshotNotFound
+		}
+		return nil, fmt.Errorf("failed to read snapshot: %w", err)
 	}

Also applies to: 77-80

50-61: Optional: set content type for clarity.
Consider setting writer.ContentType = "application/octet-stream".
apps/workspace-engine/test/e2e/engine_workspace_persistence_helpers_test.go (2)
3-7: Deep-compare Resource.Config using reflect.DeepEqual.
Currently only nil-ness is checked; values can differ undetected.

 import (
 	"testing"
 	"time"
+	"reflect"

 	"workspace-engine/pkg/oapi"
 )
 @@
 	// Verify config (deep comparison would require reflection or JSON marshaling)
-	if (expected.Config == nil) != (actual.Config == nil) {
+	if (expected.Config == nil) != (actual.Config == nil) {
 		t.Errorf("%s: config nil mismatch", context)
 	}
+	if expected.Config != nil && actual.Config != nil && !reflect.DeepEqual(actual.Config, expected.Config) {
+		t.Errorf("%s: config mismatch", context)
+	}

Also applies to: 42-46

212-221: Verify Policy.Rules and Policy.Selectors contents, not just lengths.
Length-only checks can miss reordered or altered elements.

 // Verify rules array
 if len(actual.Rules) != len(expected.Rules) {
 	t.Errorf("%s: rules length mismatch: expected %d, got %d", context, len(expected.Rules), len(actual.Rules))
 }
+// Optionally compare contents (order-sensitive)
+// if !reflect.DeepEqual(actual.Rules, expected.Rules) {
+//	t.Errorf("%s: rules content mismatch", context)
+// }
 // Verify selectors array
 if len(actual.Selectors) != len(expected.Selectors) {
 	t.Errorf("%s: selectors length mismatch: expected %d, got %d", context, len(expected.Selectors), len(actual.Selectors))
 }
+// if !reflect.DeepEqual(actual.Selectors, expected.Selectors) {
+//	t.Errorf("%s: selectors content mismatch", context)
+// }

apps/workspace-engine/test/e2e/engine_workspace_disk_persistence_test.go (2)
733-735: Handle LoadLocation errors in tests.
Avoid silently ignoring timezone load failures.

-	estLoc, _ := time.LoadLocation("America/New_York")
-	pstLoc, _ := time.LoadLocation("America/Los_Angeles")
+	estLoc, err := time.LoadLocation("America/New_York")
+	if err != nil { t.Fatalf("load EST location: %v", err) }
+	pstLoc, err := time.LoadLocation("America/Los_Angeles")
+	if err != nil { t.Fatalf("load PST location: %v", err) }

802-803: Tiny nit: prefer strconv.Itoa for numeric string metadata.
Clearer than converting via rune arithmetic.

-	Metadata: map[string]string{"test": ts.name, "index": string(rune('0' + i))},
+	Metadata: map[string]string{"test": ts.name, "index": strconv.Itoa(i)},

(Remember to import strconv.)
apps/workspace-engine/test/e2e/engine_workspace_gcs_persistence_test.go (2)
1091-1114: Use strings.Contains for readability; simplify helpers.
Reimplementing substring search is unnecessary.

 import (
 	"context"
 	"fmt"
 	"testing"
 	"time"
+	"strings"

 	"workspace-engine/pkg/oapi"
 	"workspace-engine/pkg/workspace"
 	"workspace-engine/test/integration"
 @@
 func isNotFoundError(err error) bool {
 	if err == nil {
 		return false
 	}
-	errMsg := err.Error()
-	return contains(errMsg, "not found") ||
-		contains(errMsg, "does not exist") ||
-		contains(errMsg, "ErrWorkspaceSnapshotNotFound")
+	errMsg := err.Error()
+	return strings.Contains(errMsg, "not found") ||
+		strings.Contains(errMsg, "does not exist") ||
+		strings.Contains(errMsg, "workspace snapshot not found")
 }

-func contains(s, substr string) bool {
-	return len(s) >= len(substr) && (s == substr || len(substr) == 0 ||
-		(len(s) > 0 && len(substr) > 0 && indexOfString(s, substr) >= 0))
-}
-
-func indexOfString(s, substr string) int {
-	for i := 0; i <= len(s)-len(substr); i++ {
-		if s[i:i+len(substr)] == substr {
-			return i
-		}
-	}
-	return -1
-}
+// contains/indexOfString helpers removed

Also applies to: 3-13

20-32: Minor: consider skipping when GCS is unavailable instead of fatal.
If credentials/bucket are not configured, use t.Skip to keep local runs green.
20-32: Minor: consider skipping when GCS is unavailable instead of fatal.If credentials/bucket are not configured, use t.Skip to keep local runs green.
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (4)
- apps/workspace-engine/pkg/workspace/storage_gcs.go (1 hunks)
- apps/workspace-engine/test/e2e/engine_workspace_disk_persistence_test.go (1 hunks)
- apps/workspace-engine/test/e2e/engine_workspace_gcs_persistence_test.go (1 hunks)
- apps/workspace-engine/test/e2e/engine_workspace_persistence_helpers_test.go (1 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
apps/workspace-engine/**/*.go
📄 CodeRabbit inference engine (apps/workspace-engine/CLAUDE.md)
apps/workspace-engine/**/*.go: Do not add extraneous inline comments that state the obvious
Do not add comments that simply restate what the code does
Do not add comments for standard Go patterns (e.g., noting WaitGroup or semaphore usage)
Write comments that explain why, document complex logic/algorithms, provide non-obvious context, include TODO/FIXME, and document exported functions/types/methods
Files:
- apps/workspace-engine/test/e2e/engine_workspace_disk_persistence_test.go
- apps/workspace-engine/test/e2e/engine_workspace_persistence_helpers_test.go
- apps/workspace-engine/test/e2e/engine_workspace_gcs_persistence_test.go
apps/workspace-engine/**/*_test.go
📄 CodeRabbit inference engine (apps/workspace-engine/CLAUDE.md)
Follow the existing test structure used in *_test.go files
Files:
- apps/workspace-engine/test/e2e/engine_workspace_disk_persistence_test.go
- apps/workspace-engine/test/e2e/engine_workspace_persistence_helpers_test.go
- apps/workspace-engine/test/e2e/engine_workspace_gcs_persistence_test.go
🧠 Learnings (1)
📚 Learning: 2025-08-12T18:13:54.630Z
Learnt from: CR
PR: ctrlplanedev/ctrlplane#0
File: apps/workspace-engine/CLAUDE.md:0-0
Timestamp: 2025-08-12T18:13:54.630Z
Learning: Applies to apps/workspace-engine/**/*_test.go : Follow the existing test structure used in *_test.go files
Applied to files:
- apps/workspace-engine/test/e2e/engine_workspace_disk_persistence_test.go
- apps/workspace-engine/test/e2e/engine_workspace_persistence_helpers_test.go
- apps/workspace-engine/test/e2e/engine_workspace_gcs_persistence_test.go
🧬 Code graph analysis (3)
apps/workspace-engine/test/e2e/engine_workspace_disk_persistence_test.go (5)
apps/workspace-engine/test/integration/workspace.go (1)
NewTestWorkspace(35-74)apps/workspace-engine/test/integration/opts.go (23)
WithResource(107-132)ResourceID(455-459)ResourceName(449-453)WithJobAgent(134-150)JobAgentID(525-529)JobAgentName(519-523)WithSystem(84-105)SystemID(253-257)SystemName(241-245)WithDeployment(259-277)DeploymentID(310-314)DeploymentName(297-302)DeploymentJobAgent(316-320)WithWorkspaceID(77-82)ResourceConfig(479-483)WithDeploymentVersion(353-367)DeploymentVersionID(383-387)DeploymentVersionTag(377-381)WithEnvironment(279-293)EnvironmentID(427-431)EnvironmentName(415-419)PolicyID(551-555)PolicyName(539-543)apps/workspace-engine/pkg/workspace/storage_file.go (1)
NewFileStorage(20-22)apps/workspace-engine/pkg/oapi/oapi.gen.go (6)
Pending(41-41)InProgress(38-38)Successful(43-43)Id(51-51)Status(57-57)JobAgentId(53-53)apps/workspace-engine/pkg/workspace/workspace.go (1)
NewNoFlush(29-40)
apps/workspace-engine/test/e2e/engine_workspace_gcs_persistence_test.go (5)
apps/workspace-engine/pkg/workspace/storage_file.go (1)
StorageClient(9-12)apps/workspace-engine/pkg/workspace/storage_gcs.go (2)
NewGCSStorageClient(27-48)GCSStorageClient(21-25)apps/workspace-engine/test/integration/workspace.go (1)
NewTestWorkspace(35-74)apps/workspace-engine/test/integration/opts.go (19)
WithResource(107-132)ResourceID(455-459)ResourceName(449-453)WithJobAgent(134-150)JobAgentID(525-529)JobAgentName(519-523)WithSystem(84-105)WithDeployment(259-277)DeploymentID(310-314)DeploymentName(297-302)DeploymentJobAgent(316-320)ResourceConfig(479-483)WorkspaceOption(11-11)WithDeploymentVersion(353-367)WithEnvironment(279-293)EnvironmentID(427-431)EnvironmentName(415-419)PolicyID(551-555)PolicyName(539-543)apps/workspace-engine/pkg/workspace/workspace.go (1)
NewNoFlush(29-40)
apps/workspace-engine/pkg/workspace/storage_gcs.go (2)
apps/workspace-engine/pkg/workspace/storage_file.go (1)
StorageClient(9-12)apps/workspace-engine/pkg/workspace/storage.go (1)
ErrWorkspaceSnapshotNotFound(7-7)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
- GitHub Check: workspace-engine-tests
- GitHub Check: Format
- GitHub Check: Typecheck
- GitHub Check: Lint
- GitHub Check: build (linux/amd64)
- GitHub Check: workspace-engine-tests
🔇 Additional comments (1)
apps/workspace-engine/test/e2e/engine_workspace_disk_persistence_test.go (1)
88-88: The review comment is incorrect and should be ignored.
The premise that "workspace.New starts background processes" is false. After examining the codebase:
- workspace.New() constructor simply initializes struct fields; it doesn't spawn any goroutines
- Goroutines in the store (e.g., resources.go) are only started by explicit method calls like Upsert(), Remove(), or ProcessChanges()
- The test cases in this file only perform deserialization via GobDecode(); they never invoke these methods that would trigger background activity
- Therefore, no race condition exists and no background activity occurs during test execution
The difference between New() and NewNoFlush() is architecturally real (real consumer vs. noop consumer), but irrelevant to pure deserialization operations.
Likely an incorrect or invalid review comment.
| func (c *GCSStorageClient) Put(ctx context.Context, path string, data []byte) error { | ||
| path = filepath.Join(c.prefix, path) | ||
| obj := c.client.Bucket(c.bucket).Object(path) | ||
| writer := obj.NewWriter(ctx) | ||
| if _, err := writer.Write(data); err != nil { | ||
| writer.Close() | ||
| return fmt.Errorf("failed to write snapshot: %w", err) | ||
| } | ||
| if err := writer.Close(); err != nil { | ||
| return fmt.Errorf("failed to close writer: %w", err) | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| func (c *GCSStorageClient) Get(ctx context.Context, path string) ([]byte, error) { | ||
| path = filepath.Join(c.prefix, path) | ||
| obj := c.client.Bucket(c.bucket).Object(path) |
Use path.Join for GCS object keys, not filepath.Join.
GCS keys are slash-delimited; filepath.Join is OS-dependent and can introduce backslashes on Windows.
-func (c *GCSStorageClient) Put(ctx context.Context, path string, data []byte) error {
- path = filepath.Join(c.prefix, path)
- obj := c.client.Bucket(c.bucket).Object(path)
+func (c *GCSStorageClient) Put(ctx context.Context, objectPath string, data []byte) error {
+ objectPath = path.Join(c.prefix, objectPath)
+ obj := c.client.Bucket(c.bucket).Object(objectPath)
@@
-func (c *GCSStorageClient) Get(ctx context.Context, path string) ([]byte, error) {
- path = filepath.Join(c.prefix, path)
- obj := c.client.Bucket(c.bucket).Object(path)
+func (c *GCSStorageClient) Get(ctx context.Context, objectPath string) ([]byte, error) {
+ objectPath = path.Join(c.prefix, objectPath)
+ obj := c.client.Bucket(c.bucket).Object(objectPath)
@@
-func (c *GCSStorageClient) Delete(ctx context.Context, path string) error {
- path = filepath.Join(c.prefix, path)
- obj := c.client.Bucket(c.bucket).Object(path)
+func (c *GCSStorageClient) Delete(ctx context.Context, objectPath string) error {
+ objectPath = path.Join(c.prefix, objectPath)
+ obj := c.client.Bucket(c.bucket).Object(objectPath)

Also applies to: 91-93
🤖 Prompt for AI Agents
In apps/workspace-engine/pkg/workspace/storage_gcs.go around lines 50-66 (and
also at lines 91-93), replace usage of filepath.Join when composing GCS object
keys with path.Join to ensure slash-delimited, OS-independent object names;
update the Put and Get (and any other occurrences at 91-93) to use
path.Join(c.prefix, path) and import the standard "path" package instead of or
in addition to "path/filepath".
| defer cleanupGCSFile(t, ctx, storage, testPath) | ||
|
|
||
| // Create a new workspace and load from GCS | ||
| newWs := workspace.New(workspaceID) |
🧩 Analysis chain
Use NewNoFlush for decode-only loads to avoid races.
Same rationale as disk tests; swap workspace.New(...) with NewNoFlush(...).
- newWs := workspace.New(workspaceID)
+ newWs := workspace.NewNoFlush(workspaceID)

Also applies to: 332-332, 495-495, 603-603, 748-748, 984-984
🏁 Script executed:
#!/usr/bin/env bash
rg -nP -C1 '\bworkspace\.New\(' apps/workspace-engine/test/e2e/engine_workspace_gcs_persistence_test.go

Length of output: 483
I'll verify that workspace.NewNoFlush exists and understand the context better.
Let me search the codebase directly for workspace.NewNoFlush and compare with disk tests.
🏁 Script executed:
#!/bin/bash
# Find workspace package and check for NewNoFlush function
rg -nP 'NewNoFlush' apps/workspace-engine --type=go -A2 -B2

Length of output: 8306
🏁 Script executed:
#!/bin/bash
# Search for disk persistence test file mentioned in the comment
fd 'disk.*persistence.*test\.go' apps/workspace-engine

Length of output: 72
🏁 Script executed:
#!/bin/bash
# Check workspace package for New and NewNoFlush function definitions
rg -nP '^\s*func\s+\(.*\)\s+New(NoFlush)?\s*\(' apps/workspace-engine --type=go -A5

Length of output: 0
Let me check the function signatures and verify the context of these specific lines.
🏁 Script executed:
#!/bin/bash
# Get workspace.New and NewNoFlush function signatures
rg -nP '^\s*func\s+(New|NewNoFlush)' apps/workspace-engine/pkg/workspace/workspace.go -A8

Length of output: 541
🏁 Script executed:
#!/bin/bash
# Get the exact context for the 6 lines in question to see if they're decode-only loads
rg -nP '(108|332|495|603|748|984):' apps/workspace-engine/test/e2e/engine_workspace_gcs_persistence_test.go -B5 -A5

Length of output: 0
🏁 Script executed:
#!/bin/bash
# View context around the identified lines using sed
sed -n '103,113p; 327,337p; 490,500p; 598,608p; 743,753p; 979,989p' apps/workspace-engine/test/e2e/engine_workspace_gcs_persistence_test.go

Length of output: 1816
Replace workspace.New() with workspace.NewNoFlush() at all 6 locations for decode-only loads to avoid races.
The 6 identified lines (108, 332, 495, 603, 748, 984) all follow the same decode-only pattern: create a workspace, then immediately load persisted data from GCS via storage.Get() followed by GobDecode(). Using workspace.New() unnecessarily initializes database integration through db.NewChangesetConsumer(), whereas workspace.NewNoFlush() uses changeset.NewNoopChangesetConsumer() to avoid this interaction. This pattern is already established in disk persistence tests and prevents potential race conditions during loading.
- newWs := workspace.New(workspaceID)
+ newWs := workspace.NewNoFlush(workspaceID)

Apply this change to all 6 lines: 108, 332, 495, 603, 748, 984.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| newWs := workspace.New(workspaceID) | |
| newWs := workspace.NewNoFlush(workspaceID) |
🤖 Prompt for AI Agents
In apps/workspace-engine/test/e2e/engine_workspace_gcs_persistence_test.go
around lines 108, 332, 495, 603, 748, and 984, replace calls to
workspace.New(workspaceID) with workspace.NewNoFlush(workspaceID) at each of
those six locations so the test creates decode-only workspaces that use a noop
changeset consumer instead of initializing DB integration; ensure each
occurrence is changed consistently to avoid races during GobDecode-based loads
from GCS.
Summary by CodeRabbit
New Features
Behavior Changes
Tests