*: use per-stream Finished signal instead of shared sfin channel#45
Conversation
2b7433e to
cf11131
Compare
779b184 to
341397a
Compare
cf11131 to
b7ec558
Compare
341397a to
dcda07e
Compare
There was a problem hiding this comment.
Pull request overview
This PR updates stream completion signaling to use the per-stream (*drpcstream.Stream).Finished() channel rather than a single shared “stream finished” channel, which is necessary for correct behavior when multiple streams can complete independently (multiplexing).
Changes:
- Removed the shared
sfinchannel fromdrpcmanager.Managerand switched waiting logic tostream.Finished(). - Removed
finplumbing fromdrpcstream.Streamconstruction and finish-path signaling. - Deleted
GetStreamFin/SetStreamFinfrom internal stream options.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| internal/drpcopts/stream.go | Removes accessors for the old shared finish channel in internal stream options. |
| drpcstream/stream.go | Removes the stream-level fin channel and stops emitting to a shared finished notifier. |
| drpcmanager/manager.go | Replaces waits on the shared sfin channel with waits on stream.Finished(). |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // GetStreamTransport returns the drpc.Transport stored in the options. | ||
| func GetStreamTransport(opts *Stream) drpc.Transport { return opts.transport } | ||
|
|
||
| // SetStreamTransport sets the drpc.Transport stored in the options. | ||
| func SetStreamTransport(opts *Stream, tr drpc.Transport) { opts.transport = tr } | ||
|
|
||
| // GetStreamFin returns the chan<- struct{} stored in the options. | ||
| func GetStreamFin(opts *Stream) chan<- struct{} { return opts.fin } | ||
|
|
||
| // SetStreamFin sets the chan<- struct{} stored in the options. | ||
| func SetStreamFin(opts *Stream, fin chan<- struct{}) { opts.fin = fin } | ||
|
|
||
| // GetStreamKind returns the StreamKind stored in the options. | ||
| func GetStreamKind(opts *Stream) drpc.StreamKind { return opts.kind } | ||
|
|
There was a problem hiding this comment.
After removing GetStreamFin/SetStreamFin, the underlying fin chan<- struct{} field still remains in drpcopts.Stream (see internal/drpcopts/stream.go:14) but is now unreachable outside the package and has no remaining references in the repo. Consider deleting the fin field from drpcopts.Stream to avoid dead/unused internal state and confusion about how stream completion is signaled (now via (*drpcstream.Stream).Finished()).
|
I don't believe this change is needed in mux world? The only reason |
| if s.sigs.fin.Set(nil) { | ||
| s.log("FIN", func() string { return "" }) | ||
| s.ctx.sig.Set(context.Canceled) | ||
| if s.fin != nil { |
There was a problem hiding this comment.
This change semantically doesn't match with the current behavior. With s.fin() the caller is unblocked after s.fin <- struct{}{}.
With this change the caller is unblocked immediately after s.sigs.fin.Set(nil).
There was a problem hiding this comment.
[pp] we think we don't need checkFinished
There was a problem hiding this comment.
If above comments from chandra are true, then we will remove this later in multiplexing code. If it's actually needed then we need to revisit the semantic differnece here.
| // checkFinished checks to see if the stream is terminated, and if so, sets the | ||
| // finished flag. This must be called after every read or write is complete, as | ||
| // well as when the stream becomes terminated. | ||
| func (s *Stream) checkFinished() { |
There was a problem hiding this comment.
Also, the main goal of checkFinished() appears to be to ensure the read/write/terminate of one stream happens before any admitting any other stream.
There was a problem hiding this comment.
added to the list and will address in multiplexing
cthumuluru-crdb
left a comment
There was a problem hiding this comment.
I don't see much use of checkFinished() in multiplexing world and I would expect that to be removed. Given that I'm now worried about the semantic difference this change brings to checkFinished(). This change looks good.
b7ec558 to
4705799
Compare
Replace the shared sfin channel with stream.Finished(), giving each stream its own completion signal. The shared channel worked for single-stream-at-a-time. Per-stream signals are required for multiplexing where multiple streams finish independently.
dcda07e to
ce3c970
Compare
Squashed result of the upstream stream-multiplexing branch. See the merged PRs for granular history: - #39 drpcmanager: fix race between manageReader and stream creation - #42 *: move frame assembly from reader to stream - #43 *: extract PacketAssembler for frame-to-packet assembly - #44 drpcmanager: replace manageStreams loop with per-stream goroutines - #45 *: use per-stream Finished signal instead of shared sfin channel - #46 drpcmanager: use atomic counter for client stream ID generation - #47 drpcmanager: replace streamBuffer with a streams registry - #51 drpc: enable stream multiplexing A connection now runs multiple concurrent client and server streams over a single transport. Frames carry stream IDs and are interleaved on the wire by a shared MuxWriter. Each stream owns its own packet ring buffer, Finished signal, and goroutine, and the manager tracks live streams in an activeStreams registry. New: drpcwire.MuxWriter, drpcwire.PacketAssembler, drpcstream.ringBuffer, drpcmanager.activeStreams. Removed: the drpccache package, drpcwire/writer (now MuxWriter), drpcstream/pktbuf (now ringBuffer), drpcmanager/streambuf (now activeStreams), drpcmanager.Options.InactivityTimeout, and the drpcconn shared write buffer plus stats infrastructure (CollectStats, Stats, drpcstats wiring).
Replace the shared sfin channel with stream.Finished(), giving each
stream its own completion signal. The shared channel worked for
single-stream-at-a-time. Per-stream signals are required for
multiplexing where multiple streams finish independently.