*: extract PacketAssembler for frame-to-packet assembly#43
*: extract PacketAssembler for frame-to-packet assembly#43shubhamdhama wants to merge 6 commits intostream-multiplexingfrom
Conversation
54baa4d to
2e5ff8e
Compare
There was a problem hiding this comment.
Pull request overview
This PR refactors wire handling to separate frame parsing from frame-to-packet assembly by introducing a reusable PacketAssembler in drpcwire. This centralizes assembly invariants and lets both drpcstream.Stream and drpcmanager.Manager assemble packets from frames in a consistent way, while drpcwire.Reader now returns individual frames.
Changes:
drpcwire.Readernow exposesReadFrame(frame parsing only) and tests were updated accordingly.- Added
drpcwire.PacketAssemblerto assemble frames into packets with monotonicity/kind invariants and stream-ID consistency. - Updated
drpcstream.Streamanddrpcmanager.Managerto ingest frames and assemble/route complete packets; expanded test coverage around frame handling.
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| drpcwire/reader.go | Convert reader from packet reconstruction to single-frame reads (ReadFrame). |
| drpcwire/reader_test.go | Update tests to validate frame-by-frame reading and buffering behavior. |
| drpcwire/packet_builder.go | Introduce PacketAssembler for reusable frame-to-packet assembly and invariants. |
| drpcstream/stream.go | Replace HandlePacket entrypoint with HandleFrame + internal handlePacket. |
| drpcstream/stream_test.go | Update existing tests for frame handling; add a large suite of HandleFrame tests. |
| drpcmanager/manager.go | Switch manager reader loop to ReadFrame, add invoke-sequence assembly and routing updates. |
| drpcmanager/manager_test.go | Add tests for global monotonicity, invoke sequencing, and frame delivery behavior. |
| drpcconn/conn_test.go | Update conn tests to use ReadFrame for verifying on-wire output. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // SetStreamID sets the expected stream ID. Frames for a different stream will | ||
| // be rejected. If not called, the stream ID is inferred from the first frame. | ||
| func (pa *PacketAssembler) SetStreamID(streamID uint64) { | ||
| pa.pk.ID.Stream = streamID | ||
| } | ||
|
|
||
| // Reset clears all assembly state, preparing the assembler for a new stream. | ||
| func (pa *PacketAssembler) Reset() { | ||
| pa.pk = Packet{ | ||
| ID: ID{Stream: 0, Message: 1}, | ||
| } | ||
| pa.assembling = false | ||
| } | ||
|
|
||
| // AppendFrame adds a frame to the in-progress packet. It returns the completed | ||
| // packet and true when a frame with Done=true is received. It returns false | ||
| // when more frames are needed to complete the packet. | ||
| func (pa *PacketAssembler) AppendFrame(fr Frame) (packet Packet, packetReady bool, err error) { | ||
| // Enforce stream ID consistency: infer from first frame or reject mismatches. | ||
| if pa.pk.ID.Stream == 0 { | ||
| pa.pk.ID.Stream = fr.ID.Stream | ||
| } else if fr.ID.Stream != pa.pk.ID.Stream { | ||
| return Packet{}, false, drpc.ProtocolError.New( | ||
| "frame stream mismatch: got stream %d, expected %d", fr.ID.Stream, pa.pk.ID.Stream) | ||
| } |
There was a problem hiding this comment.
PacketAssembler uses pk.ID.Stream==0 as the sentinel for “stream ID not set yet / infer from first frame”. That makes SetStreamID(0) indistinguishable from not calling SetStreamID at all, so a stream with ID 0 cannot enforce stream-ID matching and may accept frames for the wrong stream (it will just infer the first frame’s ID). If stream ID 0 is valid in this codebase, track “stream ID set” with a separate boolean; if stream ID 0 is invalid, consider rejecting it explicitly in SetStreamID/AppendFrame (and updating call sites/tests accordingly).
There was a problem hiding this comment.
I don't feel strongly about this comment. If other agree let me know.
There was a problem hiding this comment.
I prefer we differentiate between uninitialized vs initialized but to default value.
While it is not important just yet, once we introduce flow control we have to dedicate a stream for control messages that is not subjected to flow control. In general that stream will be stream 0.
Extract the frame assembly logic from Stream.HandleFrame into a reusable PacketAssembler type in drpcwire. Both the stream and the manager now use their own PacketAssembler instance, keeping assembly logic in one place. The manager's assembler handles the invoke sequence (metadata + invoke), which removes the restriction that only KindMessage packets could be split across frames. This also simplifies NewServerStream from a packet-at-a-time loop into a single receive.
2e5ff8e to
0f1fbbe
Compare
| switch curr := m.sbuf.Get(); { | ||
| // If the frame is for the current stream, deliver it. | ||
| case curr != nil && incomingFrame.ID.Stream == curr.ID(): | ||
| if err := curr.HandleFrame(incomingFrame); err != nil { |
There was a problem hiding this comment.
When assembling a KindMessage packet, since Frame has Kind too, we can have some sanity checks even before calling AppendFrame in HandleFrame to bail out early when we get a invalid frames in between, instead of waiting till handlePacket to do the validation.
Example:
Stream s1 was successfully created and curr.ID = s1.
manageReader() gets the below frames for this stream after KindInvoke:
[s1, m2, KindInvokeMetadata, d=f]
--> Invalid frame but we don't detect it because stream id is valid (curr != nil && incomingFrame.ID.Stream == curr.ID()). curr.HandleFrame is called, goes to the fr.ID.Message > pa.pk.ID.Message condition in AppendFrame, and a new packet is started.
[s1, m3, KindMessage, d=f]
--> curr.HandleFrame is called, HandleFrame starts a new packet again
[s1, m3, KindMessage, d=t]
--> Packet completed, so HandleFrame calls handlePacket
In this case, we go ahead and assemble the KindMessage packet even though the KindInvokeMetadata frame before it was invalid. We may eventually error out whenever the continuation frame for the KindInvokeMetadata packet arrives but the error could have been detected much earlier, something like this.
func (s *Stream) HandleFrame(fr drpcwire.Frame) (err error) {
if s.sigs.term.IsSet() {
return nil
}
if fr.Kind == drpcwire.KindInvoke || fr.Kind == drpcwire.KindInvokeMetadata {
err := drpc.ProtocolError.New("invoke on existing stream")
s.terminate(err)
return err
}
packet, packetReady, err := s.pb.AppendFrame(fr)
if err != nil {
return err
}
if !packetReady {
return nil
}
return s.handlePacket(packet)
}
There was a problem hiding this comment.
Also, I think we need some guard against never getting a done=true frame in the code. A max limit on the packet size can do it I guess.
There was a problem hiding this comment.
Updated the above comment with a clearer example
There was a problem hiding this comment.
s1, m1, KindMessage, d=f
s1, m2, KindInvokeMetadata, d=f (starts a new packet)
I guess this state may not be possible?
There was a problem hiding this comment.
I actually think there are more such cases to think about here - mainly related to validating frames by kind early. We can discuss this on Monday.
There was a problem hiding this comment.
Sure. One thing to keep in mind is, since the transport is TCP unless the packets are written from different goroutines (ex: cancel), some combinations are not even possible.
There was a problem hiding this comment.
Sure. One thing to keep in mind is, since the transport is TCP unless the packets are written from different goroutines (ex: cancel), some combinations are not even possible.
Yes, these fall under protocol violations - they cannot happen unless the client is buggy. The existing code already guards against KindInvokeMetadata packets coming out of turn (fails with a protocol violation "invoke on existing stream") - I'm just suggesting that for such errors, there are cases where we know as soon as the frame is read that it's an invalid frame, we don't need to wait to read the whole packet before erroring out. There is also a possibility of handling valid packet kinds (KindError, KindCancel) earlier, which I wanted to discuss on Monday.
There was a problem hiding this comment.
[discussed offline too]
I see your point. But I'm not convinced that we should move it above. I certainly see this as an improvement to avoid doing unnecessary work but this is a rare case and likely only happen while development. Keeping it along with other switch cases keep things more readable without any performance hit.
There was a problem hiding this comment.
Can one of you please summarize the discussion on resolve this comment if this is fully addressed?
| switch curr := m.sbuf.Get(); { | ||
| // If the frame is for the current stream, deliver it. | ||
| case curr != nil && incomingFrame.ID.Stream == curr.ID(): | ||
| if err := curr.HandleFrame(incomingFrame); err != nil { |
There was a problem hiding this comment.
s1, m1, KindMessage, d=f
s1, m2, KindInvokeMetadata, d=f (starts a new packet)
I guess this state may not be possible?
0b8ee3d to
162c5ab
Compare
|
I've moved the invoke packet assembler and metadata from from manageReader to Manager as a field and clearly mentioned that I'm think we may move manageReader to its own struct. |
| switch curr := m.sbuf.Get(); { | ||
| // If the frame is for the current stream, deliver it. | ||
| case curr != nil && incomingFrame.ID.Stream == curr.ID(): | ||
| if err := curr.HandleFrame(incomingFrame); err != nil { |
There was a problem hiding this comment.
Can one of you please summarize the discussion on resolve this comment if this is fully addressed?
| // SetStreamID sets the expected stream ID. Frames for a different stream will | ||
| // be rejected. If not called, the stream ID is inferred from the first frame. | ||
| func (pa *PacketAssembler) SetStreamID(streamID uint64) { | ||
| pa.pk.ID.Stream = streamID | ||
| } | ||
|
|
||
| // Reset clears all assembly state, preparing the assembler for a new stream. | ||
| func (pa *PacketAssembler) Reset() { | ||
| pa.pk = Packet{ | ||
| ID: ID{Stream: 0, Message: 1}, | ||
| } | ||
| pa.assembling = false | ||
| } | ||
|
|
||
| // AppendFrame adds a frame to the in-progress packet. It returns the completed | ||
| // packet and true when a frame with Done=true is received. It returns false | ||
| // when more frames are needed to complete the packet. | ||
| func (pa *PacketAssembler) AppendFrame(fr Frame) (packet Packet, packetReady bool, err error) { | ||
| // Enforce stream ID consistency: infer from first frame or reject mismatches. | ||
| if pa.pk.ID.Stream == 0 { | ||
| pa.pk.ID.Stream = fr.ID.Stream | ||
| } else if fr.ID.Stream != pa.pk.ID.Stream { | ||
| return Packet{}, false, drpc.ProtocolError.New( | ||
| "frame stream mismatch: got stream %d, expected %d", fr.ID.Stream, pa.pk.ID.Stream) | ||
| } |
There was a problem hiding this comment.
I prefer we differentiate between uninitialized vs initialized but to default value.
While it is not important just yet, once we introduce flow control we have to dedicate a stream for control messages that is not subjected to flow control. In general that stream will be stream 0.
Rename packet_builder.go to packet_assembler.go for clarity. Move frame-to-packet assembly tests from drpcstream/stream_test.go to drpcwire/packet_assembler_test.go, testing PacketAssembler directly without Stream dependencies. Stream tests now focus on handlePacket behavior (delivery, termination, invoke rejection).
04265e8 to
6ea6efb
Compare
Extract the frame assembly logic from Stream.HandleFrame into a
reusable PacketAssembler type in drpcwire. Both the stream and the
manager now use their own PacketAssembler instance, keeping assembly
logic in one place.
The manager's assembler handles the invoke sequence (metadata +
invoke), which removes the restriction that only KindMessage packets
could be split across frames. This also simplifies NewServerStream
from a packet-at-a-time loop into a single receive.