fix: preserve BrainBar MCP initialize handshake under backpressure#247
fix: preserve BrainBar MCP initialize handshake under backpressure#247
Conversation
There was a problem hiding this comment.
Your free trial has ended. If you'd like to continue receiving code reviews, you can add a payment method here.
|
Warning Rate limit exceeded
Your organization is not enrolled in usage-based pricing. Contact your admin to enable usage-based pricing to continue reviews beyond the rate limit, or try again in 21 minutes and 46 seconds. ⌛ How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: Organization UI Review profile: ASSERTIVE Plan: Pro Run ID: 📒 Files selected for processing (3)
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
|
Follow-up handshake fix is pushed on ddbd400. @codex review @cursor @BugBot re-review @coderabbitai review |
|
🧠 Learnings used✅ Actions performedReview triggered.
|
| private func readMCPMessage(fd: Int32, timeout: TimeInterval = 5.0) throws -> [String: Any] { | ||
| var buffer = Data() | ||
| var buffer = bufferedMessagesByFD[fd] ?? Data() | ||
| var readBuf = [UInt8](repeating: 0, count: 65536) | ||
| let deadline = Date().addingTimeInterval(timeout) | ||
|
|
||
| while Date() < deadline { | ||
| if let message = try decodeBufferedMCPMessage(fd: fd, buffer: &buffer) { | ||
| return message | ||
| } | ||
|
|
||
| let n = read(fd, &readBuf, readBuf.count) | ||
| if n > 0 { | ||
| buffer.append(contentsOf: readBuf[0..<n]) | ||
| // Try to parse Content-Length framed response | ||
| if let headerEnd = buffer.range(of: Data("\r\n\r\n".utf8)) { | ||
| let headerStr = String(data: buffer[buffer.startIndex..<headerEnd.lowerBound], encoding: .utf8) ?? "" | ||
| if let clLine = headerStr.split(separator: "\r\n").first(where: { $0.hasPrefix("Content-Length:") }) { | ||
| let cl = Int(clLine.split(separator: ":")[1].trimmingCharacters(in: .whitespaces)) ?? 0 | ||
| let bodyStart = headerEnd.upperBound | ||
| if buffer.count >= bodyStart + cl { | ||
| let bodyData = buffer[bodyStart..<(bodyStart + cl)] | ||
| return try JSONSerialization.jsonObject(with: bodyData) as? [String: Any] ?? [:] | ||
| } | ||
| } | ||
| if let message = try decodeBufferedMCPMessage(fd: fd, buffer: &buffer) { | ||
| return message | ||
| } | ||
| } else if n == 0 { | ||
| bufferedMessagesByFD.removeValue(forKey: fd) | ||
| break // EOF | ||
| } else if errno != EAGAIN && errno != EINTR && errno != EWOULDBLOCK { | ||
| bufferedMessagesByFD.removeValue(forKey: fd) | ||
| break | ||
| } | ||
| Thread.sleep(forTimeInterval: 0.01) | ||
| } | ||
|
|
||
| bufferedMessagesByFD[fd] = buffer | ||
| throw NSError(domain: "test", code: 4, userInfo: [NSLocalizedDescriptionKey: "Timeout reading response"]) | ||
| } |
There was a problem hiding this comment.
🟢 Low BrainBarTests/SocketIntegrationTests.swift:718
After the readMCPMessage function breaks from the while loop on EOF (line 736) or error (line 739), it removes the fd's buffer from bufferedMessagesByFD. However, line 744 then unconditionally re-adds buffer to the dictionary, restoring the stale data that was just cleaned up. If a subsequent test reuses the same fd number, this stale data will be prepended to its first read, potentially causing message parsing failures or cross-test data contamination. Consider moving the save operation so it only executes on timeout, not on EOF/error.
} else if n == 0 {
bufferedMessagesByFD.removeValue(forKey: fd)
break // EOF
} else if errno != EAGAIN && errno != EINTR && errno != EWOULDBLOCK {
bufferedMessagesByFD.removeValue(forKey: fd)
break
}
Thread.sleep(forTimeInterval: 0.01)
}
- bufferedMessagesByFD[fd] = buffer
- throw NSError(domain: "test", code: 4, userInfo: [NSLocalizedDescriptionKey: "Timeout reading response"])
+ throw NSError(domain: "test", code: 4, userInfo: [NSLocalizedDescriptionKey: "Timeout reading response"])
}
private func decodeBufferedMCPMessage🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file brain-bar/Tests/BrainBarTests/SocketIntegrationTests.swift around lines 718-746:
After the `readMCPMessage` function breaks from the while loop on EOF (line 736) or error (line 739), it removes the fd's buffer from `bufferedMessagesByFD`. However, line 744 then unconditionally re-adds `buffer` to the dictionary, restoring the stale data that was just cleaned up. If a subsequent test reuses the same fd number, this stale data will be prepended to its first read, potentially causing message parsing failures or cross-test data contamination. Consider moving the save operation so it only executes on timeout, not on EOF/error.
Evidence trail:
brain-bar/Tests/BrainBarTests/SocketIntegrationTests.swift lines 710-755 at REVIEWED_COMMIT. Line 736 removes buffer on EOF (`bufferedMessagesByFD.removeValue(forKey: fd)`), line 739 removes it on error with same call, both followed by `break`. Line 744 (`bufferedMessagesByFD[fd] = buffer`) executes after any loop exit, unconditionally re-adding the buffer that was just cleaned up.
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: ddbd4000aa
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| guard var state = clients[fd] else { return false } | ||
| state.pendingWrites.append( | ||
| PendingWrite( | ||
| data: data, | ||
| totalWritten: 0, | ||
| lastProgressAt: DispatchTime.now().uptimeNanoseconds, | ||
| onDelivered: onDelivered | ||
| ) | ||
| ) | ||
| clients[fd] = state |
There was a problem hiding this comment.
Prevent unbounded pending-write growth for slow readers
This change queues every outbound frame in pendingWrites but never caps queue length/bytes, and the server continues processing incoming requests while writes are backpressured. In a slow-reader scenario (or a client that periodically drains just enough to avoid the 250ms stall timeout), enqueueWrite keeps appending responses indefinitely, so memory can grow without bound and eventually destabilize/kill BrainBar. Please add per-client queue limits (or stop reading/handling new requests once a write backlog threshold is crossed) so backpressured clients cannot cause unbounded buffering.
Useful? React with 👍 / 👎.
Summary
initializehandshakeEntityCard.Relationparity fixture so the BrainBar test target compiles onmainTest plan
swift test --package-path brain-bar --filter 'SocketIntegrationTests/testInitializeHandshakeSurvivesBriefBackpressureBurst|SocketIntegrationTests/testServerDisconnectsStalledClient|SocketIntegrationTests/testStdioAdapterBridgesInitializeAndSubscribe'swift test --package-path brain-bar --filter 'SocketIntegrationTests|MCPFramingTests|MCPRouterTests|TextFormatterParityTests'(socket/MCP framing coverage passed; unrelated baseline failures remain inMCPRouterTestsandTextFormatterParityTestson currentmainexpectations)bash brain-bar/build-app.sh(build and bundle completed; LaunchAgent bootstrap failed locally withBootstrap failed: 5: Input/output error)Note
Fix BrainBar MCP initialize handshake to survive backpressure by replacing synchronous writes with async chunked flush
BrainBarServer.sendResponsewith an async, non-blocking queue: outbound data is enqueued viaenqueueWriteand flushed in 4 KB chunks byflushPendingWrites.EAGAIN/EWOULDBLOCK, the server schedules a 2 ms retry instead of busy-waiting; clients are only disconnected after a 250 ms stall with no write progress.markDeliveredcalls are deferred until the full payload has been written to the socket via anonDeliveredcallback, preventing premature delivery acknowledgment.testInitializeHandshakeSurvivesBriefBackpressureBurst) that constrains the client receive buffer to trigger backpressure and verifies the initialize response and subsequent requests succeed.📊 Macroscope summarized ddbd400. 3 files reviewed, 1 issue evaluated, 0 issues filtered, 1 comment posted
🗂️ Filtered Issues