-
-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Fix concurrent client connections watch mode #1651
Fix concurrent client connections watch mode #1651
Conversation
WalkthroughThe changes update how nil objects are handled in the command module by replacing a static nil response with a dynamically generated one via a new function, Changes
Sequence Diagram(s)sequenceDiagram
participant Caller as Caller
participant CmdGet as cmdResFromObject
participant NilRes as GetNilRes
Caller->>CmdGet: Call cmdResFromObject(obj)
alt obj is nil
CmdGet->>NilRes: Request nil response
NilRes-->>CmdGet: Return CmdRes with Response_VNil
else obj is valid
CmdGet-->>Caller: Processed non-nil response
end
CmdGet-->>Caller: Return final response
sequenceDiagram
participant Thread as Thread
participant WM as WatchManager
Thread->>WM: Call method (e.g., RegisterThread)
WM->>WM: Acquire mu lock (write)
WM-->>WM: Update maps (clientWatchThreadMap, keyFPMap, fpClientMap)
WM->>WM: Defer release of mu lock
WM-->>Thread: Return result
Poem
✨ Finishing Touches
🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (4)
internal/cmd/cmd_get.go (1)
59-62: Improves thread safety by using per-request nil responseThe change from
cmdResNiltoGetNilRes()addresses potential race conditions when multiple goroutines access the same shared object. This ensures that each request gets its own instance of the nil response object, preventing concurrent modifications.Consider adding a small comment explaining why
GetNilRes()is used instead of the staticcmdResNil:if obj == nil { + // Use GetNilRes() instead of cmdResNil to avoid race conditions in concurrent environments return GetNilRes(), nil }internal/cmd/cmds.go (1)
155-159: Good encapsulation of nil response creationThis new function properly encapsulates the creation of a nil response object, which helps maintain code consistency and addresses the concurrency issues mentioned in the PR.
Consider adding documentation to explain the purpose of this function:
+// GetNilRes returns a new CmdRes with a nil value response. +// This function is used to ensure thread safety by creating a new response object +// for each request instead of using a shared static object. func GetNilRes() *CmdRes { return &CmdRes{R: &wire.Response{ Value: &wire.Response_VNil{VNil: true}, }} }internal/server/ironhawk/watch_manager.go (2)
36-42: Proper locking for RegisterThreadAdding mutex locking for the
RegisterThreadmethod ensures thread-safe modifications to theclientWatchThreadMap. The use ofdeferto unlock is a good practice to ensure the mutex is always released.Since this method only writes to the map when
t.Mode == "watch", consider using a read lock first and upgrading to a write lock only when necessary:func (w *WatchManager) RegisterThread(t *IOThread) { - w.mu.Lock() - defer w.mu.Unlock() + if t.Mode == "watch" { + w.mu.Lock() + defer w.mu.Unlock() + w.clientWatchThreadMap[t.ClientID] = t + } - if t.Mode == "watch" { - w.clientWatchThreadMap[t.ClientID] = t - } }
132-135: Thread safety added to NotifyWatchersAdding mutex locking to the
NotifyWatchersmethod ensures thread-safe access to the various maps when notifying watchers. This is a critical method for the watch functionality, and the locking improves reliability.For optimal performance, you might consider using read locks for parts of this method that only read from the maps (before executing commands), as this would allow multiple concurrent reads:
func (w *WatchManager) NotifyWatchers(c *cmd.Cmd, shardManager *shardmanager.ShardManager, t *IOThread) { - w.mu.Lock() - defer w.mu.Unlock() + w.mu.RLock() + defer w.mu.RUnlock() key := c.Key() // Rest of the method... }However, since this method does modify maps in certain conditions (line 156), a more complex locking strategy with upgrading from read to write locks would be needed, which might add unnecessary complexity. The current approach with a single write lock is safer and easier to maintain.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
internal/cmd/cmd_get.go(1 hunks)internal/cmd/cmds.go(2 hunks)internal/server/ironhawk/watch_manager.go(5 hunks)
🧰 Additional context used
🧬 Code Definitions (2)
internal/server/ironhawk/watch_manager.go (2)
internal/server/ironhawk/iothread.go (3)
t(36-84)t(86-89)IOThread(17-22)internal/cmd/cmds.go (6)
c(31-33)c(35-37)c(39-44)c(46-65)c(148-148)Cmd(23-29)
internal/cmd/cmd_get.go (1)
internal/cmd/cmds.go (1)
GetNilRes(155-159)
🔇 Additional comments (5)
internal/cmd/cmds.go (1)
46-48: Thread safety improvement for command executionChanging from the static
cmdResNilto a dynamically generated response throughGetNilRes()is an excellent improvement for thread safety. This prevents race conditions in concurrent environments by ensuring each command execution gets its own fresh response object.internal/server/ironhawk/watch_manager.go (4)
17-19: Thread safety improvement for WatchManagerAdding a mutex to the
WatchManagerstruct is an excellent change that addresses race conditions when multiple goroutines interact with the watch manager's maps. This is a key fix for the concurrency issues described in the PR.
44-47: Thread safety added to HandleWatchAdding mutex locking for the
HandleWatchmethod ensures thread-safe access to the various maps modified in this method. Good use ofdeferto ensure the mutex is always unlocked.
77-80: Thread safety added to HandleUnwatchThe addition of mutex locking here addresses potential race conditions when multiple goroutines attempt to unsubscribe from watches concurrently. The use of
deferensures the mutex is properly released.
114-117: Thread safety added to CleanupThreadWatchSubscriptionsAdding mutex locking to the cleanup method ensures that the removal of watch subscriptions happens safely in concurrent environments. The consistent pattern of using
deferfor unlocking is good practice.
| func GetNilRes() *CmdRes { | ||
| return &CmdRes{R: &wire.Response{ | ||
| Value: &wire.Response_VNil{VNil: true}, | ||
| }} | ||
| } | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why this function? why can't we reuse cmdResNil?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cmdResNil was a shared object used across cmd package. At time multiple go-routines were updating the same object as part of r.R.Attrs.Fields["fingerprint"] = structpb.NewStringValue(strconv.FormatUint(uint64(c.Fingerprint()), 10)) or proto.Marshal which was causing concurrency error.
Example it was returned in cmdResFromObject for different go-routines when there's no value for object
func cmdResFromObject(obj *object.Obj) (*CmdRes, error) {
if obj == nil {
return cmdResNil, nil
}
Can be validated from below logs, they've same object address for different go-routines
2025-03-25T12:38:46+05:30 DBG Response object address address=0x104c9cea0
2025-03-25T12:38:46+05:30 DBG notifying watchers for key key=k1 watchers=141
2025-03-25T12:38:46+05:30 DBG command executed client_id=f94ec20d-28d1-454e-b505-b0e7e0f0ab0d cmd="HANDSHAKE f94ec20d-28d1-454e-b505-b0e7e0f0ab0d command" mode=command took_ns=334
2025-03-25T12:38:46+05:30 DBG command executed client_id=28cd16a8-8f5a-4924-b16f-437a067b9185 cmd="GET.WATCH k1" mode=command took_ns=417
2025-03-25T12:38:46+05:30 DBG Response object address address=0x104c9cea0
fatal error: concurrent map iteration and map write
|
Thanks for the patch merged! |
Issues:
watch_managermap is not thread safe thus, when multiple go-routines forwatchmode are triggered it causes race condition and errors out.cmdResNilwas a shared object used acrosscmdpackage. At time multiple go-routines were updating the same object as part ofr.R.Attrs.Fields["fingerprint"] = structpb.NewStringValue(strconv.FormatUint(uint64(c.Fingerprint()), 10))orproto.Marshalwhich was causing concurrency error.Testing done:
Summary by CodeRabbit
New Features
Refactor