*: move frame assembly from reader to stream#42
Conversation
|
0625d9c to
e1a35fa
Compare
e1a35fa to
ee07ef0
Compare
There was a problem hiding this comment.
Pull request overview
This PR refactors the wire receive path to support future stream multiplexing by making drpcwire.Reader return individual frames and moving packet assembly/validation responsibilities into drpcstream.Stream and drpcmanager.Manager.
Changes:
- Replace packet reconstruction in
drpcwire.ReaderwithReadFrame()that returns single frames. - Add
Stream.HandleFrameto assemble multi-frame packets and feed completed packets into the stream state machine. - Update
Manager.manageReaderto dispatch frames, enforce global frame-ID monotonicity, and adapt tests/call sites.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| drpcwire/reader.go | Reader now parses/returns single frames (ReadFrame) instead of assembling packets. |
| drpcwire/reader_test.go | Updates and expands tests to validate frame-level reader behavior. |
| drpcstream/stream.go | Introduces frame assembly (HandleFrame) and adjusts write splitting behavior. |
| drpcstream/stream_test.go | Adds extensive HandleFrame/write-side tests and updates existing stream tests to use frames. |
| drpcmanager/manager.go | Updates manager read loop to read/route frames and enforce global monotonicity. |
| drpcmanager/manager_test.go | Adds manager-level frame/monotonicity/routing tests. |
| drpcconn/conn_test.go | Updates conn tests to read frames instead of packets. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
cthumuluru-crdb
left a comment
There was a problem hiding this comment.
I've a few comments for you to address. I didn't take a deeper look at the tests yet but would really like if the tests also include unit tests.
ee07ef0 to
13c4b53
Compare
13c4b53 to
e1648dc
Compare
Previously, Reader assembled wire frames into complete packets before handing them to the manager. This change makes Reader return individual frames (ReadFrame), and the stream handles frame assembly itself (HandleFrame). The manager now enforces global frame ID monotonicity and other validation that are beyond a stream's scope. This is groundwork for stream multiplexing, where frames from different streams will be interleaved on the wire and must be routed to the correct stream before assembly.
e1648dc to
53e0b30
Compare
suj-krishnan
left a comment
There was a problem hiding this comment.
LGTM! My comments are all mostly around the reader code trying to handle both client and server logic and as discussed, we should refactor it to reduce the cognitive load.
| // if an old message has been sent, just ignore it. | ||
| case curr != nil && pkt.ID.Stream < curr.ID(): | ||
| // If a frame arrives for an old stream, just ignore it. | ||
| case curr != nil && incomingFrame.ID.Stream < curr.ID(): |
There was a problem hiding this comment.
This case is redundant for the server. A frame that passes the monotonicity check above can never fail this check for the server since incomingFrame.Stream >= lastFrameID.Stream >= curr.ID() will always be true on the server side. However, it is needed for the client. It will help to add a comment here explaining why this is needed.
| // new stream must be KindInvoke or KindInvokeMetadata. | ||
| // A non-invoke frame arrived for a stream that doesn't exist yet | ||
| // (curr is nil or incomingFrame.ID.Stream > curr.ID). The first | ||
| // frame of a new stream must be KindInvoke or KindInvokeMetadata. |
There was a problem hiding this comment.
This comment and the error is true for the server but doesn't make sense for the client side. Can we make it generic or clarify client vs server explicitly.
|
|
||
| func (m *Manager) checkStreamMonotonicity(incomingFrame drpcwire.Frame) bool { | ||
| ok := incomingFrame.ID.Stream >= m.lastFrameID.Stream | ||
| m.lastFrameKind = incomingFrame.Kind |
There was a problem hiding this comment.
nit: This is not being used anywhere - I'm guessing for future use.
| // TODO(shubham): add buf reuse | ||
| s.pktBuf = append(s.pktBuf, fr.Data...) | ||
|
|
||
| s.pktKind = fr.Kind |
There was a problem hiding this comment.
nit: only needs to be set for the first frame right? Can be moved to the else if block on line 238
Stacked on #39
Previously, Reader assembled wire frames into complete packets before
handing them to the manager. This change makes Reader return individual
frames (ReadFrame), and the stream handles frame assembly itself
(HandleFrame). The manager now enforces global frame ID monotonicity and
other validation that are beyond a stream's scope.
This is groundwork for stream multiplexing, where frames from different
streams will be interleaved on the wire and must be routed to the correct
stream before assembly.