chore: init workspaces from db #674
Conversation
Walkthrough

Adds partition-aware Kafka consumer callbacks and a workspace preloader: Murmur2 hashing, partition filtering, concurrent workspace loading into the in-memory cache, workspace.Exists/Set helpers, and additional logging in DB workspace loading.
Sequence Diagram(s)

```mermaid
sequenceDiagram
autonumber
participant App as Workspace Engine
participant Consumer as Kafka Consumer
participant Broker as Kafka Broker
participant Meta as Kafka Metadata
participant Init as initWorkspaces
participant DB as db
participant Cache as workspace (in-memory)
App->>Consumer: RunConsumer()
Consumer->>Broker: Subscribe(topic, with rebalance callbacks)
rect rgb(240,250,255)
Broker-->>Consumer: AssignedPartitions(partitions)
Consumer->>Consumer: getAssignedPartitions()
Consumer->>Meta: getTopicPartitionCount(topic)
Meta-->>Consumer: partitionCount
Consumer->>Init: initWorkspaces(ctx, assigned, partitionCount)
Init->>DB: GetWorkspaceIDs()
DB-->>Init: [ids...]
loop per id hashed to assigned partition
Init->>Cache: Exists(id)?
alt missing
par concurrent
Init->>DB: LoadWorkspace(id)
DB-->>Init: Workspace
Init->>Cache: Set(id, workspace)
end
end
end
Init-->>Consumer: done / error
end
rect rgb(255,245,240)
Broker-->>Consumer: RevokedPartitions(partitions)
Consumer->>Consumer: unassign partitions
end
```
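For illustration only, here is a minimal sketch of how the rebalance flow above could be wired with confluent-kafka-go. The helper names (`getAssignedPartitions`, `getTopicPartitionCount`, `initWorkspaces`) mirror the diagram, but their exact signatures in the PR and the client import path/version are assumptions, not the PR's code.

```go
// Sketch (assumed shapes, not the PR's exact code).
// Assumes: import "github.com/confluentinc/confluent-kafka-go/v2/kafka"
func newRebalanceCallback(ctx context.Context) kafka.RebalanceCb {
	return func(c *kafka.Consumer, ev kafka.Event) error {
		switch e := ev.(type) {
		case kafka.AssignedPartitions:
			// Take ownership of the partitions first, then warm the in-memory
			// cache with the workspaces that hash onto them.
			if err := c.Assign(e.Partitions); err != nil {
				return err
			}
			assigned, err := getAssignedPartitions(c) // assumed helper, sketched further below
			if err != nil {
				return err
			}
			partitionCount, err := getTopicPartitionCount(c)
			if err != nil {
				return err
			}
			return initWorkspaces(ctx, assigned, partitionCount)
		case kafka.RevokedPartitions:
			// Drop ownership; previously cached workspaces simply stop being
			// updated by this consumer.
			return c.Unassign()
		}
		return nil
	}
}
```

A consumer would register such a callback at subscribe time, e.g. `c.SubscribeTopics([]string{Topic}, newRebalanceCallback(ctx))`.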
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~40 minutes
Pre-merge checks and finishing touches: ❌ Failed checks (1 warning) · ✅ Passed checks (2 passed)
Actionable comments posted: 1
🧹 Nitpick comments (1)
apps/workspace-engine/pkg/kafka/kafka.go (1)
40-56: Consider making the metadata timeout configurable. The hardcoded 5000ms timeout on line 41 may not be suitable for all deployment environments. Consider making this configurable via an environment variable or constant.

Apply this diff to make the timeout configurable:

```diff
+var MetadataTimeoutMs = getEnvInt("KAFKA_METADATA_TIMEOUT_MS", 5000)
+
 func getTopicPartitionCount(c *kafka.Consumer) (int, error) {
-	md, err := c.GetMetadata(&Topic, false, 5000)
+	md, err := c.GetMetadata(&Topic, false, MetadataTimeoutMs)
 	if err != nil {
 		return 0, err
 	}
```

You would also need to add a helper function:

```go
func getEnvInt(varName string, defaultValue int) int {
	v := os.Getenv(varName)
	if v == "" {
		return defaultValue
	}
	if i, err := strconv.Atoi(v); err == nil {
		return i
	}
	return defaultValue
}
```
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (3)
- apps/workspace-engine/pkg/kafka/init_workspace.go (1 hunk)
- apps/workspace-engine/pkg/kafka/kafka.go (3 hunks)
- apps/workspace-engine/pkg/workspace/workspace.go (1 hunk)
🧰 Additional context used
📓 Path-based instructions (1)
apps/workspace-engine/**/*.go
📄 CodeRabbit inference engine (apps/workspace-engine/CLAUDE.md)
apps/workspace-engine/**/*.go: Do not add extraneous inline comments that state the obvious
Do not add comments that simply restate what the code does
Do not add comments for standard Go patterns (e.g., noting WaitGroup or semaphore usage)
Write comments that explain why, document complex logic/algorithms, provide non-obvious context, include TODO/FIXME, and document exported functions/types/methods
Files:
- apps/workspace-engine/pkg/workspace/workspace.go
- apps/workspace-engine/pkg/kafka/init_workspace.go
- apps/workspace-engine/pkg/kafka/kafka.go
🧬 Code graph analysis (1)
apps/workspace-engine/pkg/kafka/init_workspace.go (3)
apps/workspace-engine/pkg/workspace/workspace.go (2)
- Exists (102-105)
- Set (107-109)

apps/workspace-engine/pkg/db/workspace_load.go (1)

- LoadWorkspace (11-86)

apps/workspace-engine/pkg/db/workspaces.go (1)

- GetWorkspaceIDs (9-35)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Lint
- GitHub Check: Typecheck
🔇 Additional comments (7)
apps/workspace-engine/pkg/workspace/workspace.go (1)
102-109: LGTM! Clean API additions for workspace cache management. The `Exists` and `Set` functions provide a clean, thread-safe interface to the in-memory workspace cache, complementing the existing lazy-loading behavior of `GetWorkspace`.

apps/workspace-engine/pkg/kafka/kafka.go (3)
28-38: LGTM! Correctly extracts partition assignments. The function properly builds a set of assigned partition numbers from the consumer's assignment.
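As an illustration (assumed signature and client import; not necessarily the PR's exact code), such an extraction can collect the partition numbers returned by `Consumer.Assignment()` into a set:

```go
// Sketch: build a set of the consumer's currently assigned partition numbers.
// Assumes the confluent-kafka-go client for *kafka.Consumer / TopicPartition.
func getAssignedPartitions(c *kafka.Consumer) (map[int]struct{}, error) {
	tps, err := c.Assignment()
	if err != nil {
		return nil, err
	}
	assigned := make(map[int]struct{}, len(tps))
	for _, tp := range tps {
		assigned[int(tp.Partition)] = struct{}{}
	}
	return assigned, nil
}
```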
58-71: LGTM! Proper orchestration of workspace cache initialization. The function correctly coordinates getting assigned partitions, partition count, and initializing workspaces. Error propagation is appropriate.

88-109: LGTM! Partition lifecycle handled correctly. The callback properly handles partition assignment and revocation events:
- On assignment: assigns partitions then populates workspace cache
- On revocation: unassigns partitions
- Errors are logged and propagated appropriately
apps/workspace-engine/pkg/kafka/init_workspace.go (3)
44-46: LGTM! Simple wrapper for partition calculation. The function correctly wraps the murmur2 hash and maps it to a partition number.

73-88: LGTM! Correct partition-based workspace filtering. The function properly:

- Fetches all workspace IDs
- Filters them by assigned partitions using the partition hash
- Delegates to `loadFullWorkspaces` for loading
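For orientation, here is a hedged sketch of that filtering step, assuming a `partitionForWorkspace` helper (sketched after the Murmur2 note below) and the `db.GetWorkspaceIDs` / `loadFullWorkspaces` functions referenced in this review; the real signatures in the PR may differ:

```go
// Sketch: keep only workspace IDs whose key hashes onto a partition this
// consumer owns, then load those workspaces into the in-memory cache.
func initWorkspaces(ctx context.Context, assigned map[int]struct{}, partitionCount int) error {
	ids, err := db.GetWorkspaceIDs(ctx)
	if err != nil {
		return err
	}
	mine := make([]string, 0, len(ids))
	for _, id := range ids {
		if _, ok := assigned[partitionForWorkspace(id, partitionCount)]; ok {
			mine = append(mine, id)
		}
	}
	return loadFullWorkspaces(ctx, mine)
}
```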
13-42: Murmur2 matches Kafka’s default partitioner
The murmur2 implementation (seed=0x9747b28c, m=0x5bd1e995, r=24, and finalization) and its toPositive mask align exactly with Kafka’s Utils.murmur2 and DefaultPartitioner logic.
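For reference, a self-contained sketch of a Kafka-compatible Murmur2 and the positive-mask partition mapping described above; the function names are illustrative rather than the PR's:

```go
// Kafka-compatible Murmur2 (seed 0x9747b28c, m 0x5bd1e995, r 24).
func murmur2(data []byte) uint32 {
	const (
		seed uint32 = 0x9747b28c
		m    uint32 = 0x5bd1e995
		r           = 24
	)
	h := seed ^ uint32(len(data))
	for i := 0; i+4 <= len(data); i += 4 {
		k := uint32(data[i]) | uint32(data[i+1])<<8 | uint32(data[i+2])<<16 | uint32(data[i+3])<<24
		k *= m
		k ^= k >> r
		k *= m
		h *= m
		h ^= k
	}
	tail := len(data) &^ 3
	switch len(data) % 4 {
	case 3:
		h ^= uint32(data[tail+2]) << 16
		fallthrough
	case 2:
		h ^= uint32(data[tail+1]) << 8
		fallthrough
	case 1:
		h ^= uint32(data[tail])
		h *= m
	}
	h ^= h >> 13
	h *= m
	h ^= h >> 15
	return h
}

// partitionForWorkspace applies the same positive mask as Kafka's default
// partitioner before taking the modulo (illustrative name).
func partitionForWorkspace(id string, partitionCount int) int {
	return int(murmur2([]byte(id))&0x7fffffff) % partitionCount
}
```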
```go
func loadFullWorkspaces(ctx context.Context, workspaceIDs []string) error {
	g, ctx := errgroup.WithContext(ctx)

	for _, workspaceID := range workspaceIDs {
		wsID := workspaceID // capture loop variable
		g.Go(func() error {
			if workspace.Exists(wsID) {
				return nil
			}

			fullWorkspace, err := db.LoadWorkspace(ctx, wsID)
			if err != nil {
				return fmt.Errorf("failed to load workspace %s: %w", wsID, err)
			}
			workspace.Set(wsID, fullWorkspace)
			return nil
		})
	}

	if err := g.Wait(); err != nil {
		return fmt.Errorf("failed to load full workspaces: %w", err)
	}
	return nil
}
```
Limit concurrency to prevent resource exhaustion.
The function spawns an unbounded number of goroutines (one per workspace). For large workspace counts, this could lead to memory exhaustion and database connection pool saturation.
Apply this diff to add concurrency limits:

```diff
 func loadFullWorkspaces(ctx context.Context, workspaceIDs []string) error {
 	g, ctx := errgroup.WithContext(ctx)
+	g.SetLimit(10) // Limit to 10 concurrent workspace loads
 	for _, workspaceID := range workspaceIDs {
```

Additionally, add observability for production debugging.

The loading process lacks logging, making it difficult to diagnose issues in production.

Add logging at key points:

```diff
+	"github.com/charmbracelet/log"

 func loadFullWorkspaces(ctx context.Context, workspaceIDs []string) error {
+	log.Info("Loading workspaces", "count", len(workspaceIDs))
 	g, ctx := errgroup.WithContext(ctx)
 	g.SetLimit(10)
 	for _, workspaceID := range workspaceIDs {
 		wsID := workspaceID
 		g.Go(func() error {
 			if workspace.Exists(wsID) {
 				return nil
 			}
+			log.Debug("Loading workspace", "id", wsID)
 			fullWorkspace, err := db.LoadWorkspace(ctx, wsID)
 			if err != nil {
 				return fmt.Errorf("failed to load workspace %s: %w", wsID, err)
 			}
 			workspace.Set(wsID, fullWorkspace)
+			log.Debug("Loaded workspace", "id", wsID)
 			return nil
 		})
 	}
 	if err := g.Wait(); err != nil {
 		return fmt.Errorf("failed to load full workspaces: %w", err)
 	}
+	log.Info("Successfully loaded all workspaces")
 	return nil
 }
```

Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In apps/workspace-engine/pkg/kafka/init_workspace.go around lines 48 to 71, the
current implementation spawns an unbounded goroutine per workspace which can
exhaust memory and DB connections and also lacks logging; fix it by limiting
concurrency (use a semaphore or buffered channel with a configurable maxWorkers
value and acquire/release around each goroutine body while still using
errgroup.WithContext to capture errors) and add observability by logging when
the overall load starts and finishes, and logging per-workspace start, success,
and error cases (include workspace ID and error details) so production runs can
be diagnosed.
The branch was updated from baa014d to d7deb9d.
Actionable comments posted: 1
🧹 Nitpick comments (1)
apps/workspace-engine/pkg/db/workspace_load.go (1)
33-84: Consider including the workspace ID in the per-section logs. Once multiple workspaces load in parallel, the `log.Info("Loaded …")` lines lose context. Including `workspaceID` alongside the counts would keep these messages actionable during concurrent loads.
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (4)
- apps/workspace-engine/pkg/db/workspace_load.go (6 hunks)
- apps/workspace-engine/pkg/kafka/init_workspace.go (1 hunk)
- apps/workspace-engine/pkg/kafka/kafka.go (3 hunks)
- apps/workspace-engine/pkg/workspace/workspace.go (1 hunk)
🚧 Files skipped from review as they are similar to previous changes (1)
- apps/workspace-engine/pkg/kafka/init_workspace.go
🧰 Additional context used
📓 Path-based instructions (1)
apps/workspace-engine/**/*.go
📄 CodeRabbit inference engine (apps/workspace-engine/CLAUDE.md)
apps/workspace-engine/**/*.go: Do not add extraneous inline comments that state the obvious
Do not add comments that simply restate what the code does
Do not add comments for standard Go patterns (e.g., noting WaitGroup or semaphore usage)
Write comments that explain why, document complex logic/algorithms, provide non-obvious context, include TODO/FIXME, and document exported functions/types/methods
Files:
- apps/workspace-engine/pkg/kafka/kafka.go
- apps/workspace-engine/pkg/workspace/workspace.go
- apps/workspace-engine/pkg/db/workspace_load.go
🧬 Code graph analysis (1)
apps/workspace-engine/pkg/kafka/kafka.go (1)
apps/workspace-engine/pkg/events/events.go (1)
- NewEventHandler (52-54)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Typecheck
- GitHub Check: Lint
🔇 Additional comments (1)
apps/workspace-engine/pkg/workspace/workspace.go (1)
102-109: Public helpers look good. The exposed cache helpers are thin wrappers around the shared cmap and align with current usage patterns.
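As a rough illustration of the pattern being described (not the repo's actual cmap-backed implementation), thread-safe `Exists`/`Set` helpers over an in-memory cache can be as small as:

```go
// Illustration only: a mutex-guarded map standing in for the shared concurrent map.
// Assumes a Workspace type defined elsewhere in the package; requires "sync".
var (
	cacheMu sync.RWMutex
	cache   = map[string]*Workspace{}
)

func Exists(id string) bool {
	cacheMu.RLock()
	defer cacheMu.RUnlock()
	_, ok := cache[id]
	return ok
}

func Set(id string, ws *Workspace) {
	cacheMu.Lock()
	defer cacheMu.Unlock()
	cache[id] = ws
}
```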