drpcmanager: replace streamBuffer with a streams registry#47
drpcmanager: replace streamBuffer with a streams registry#47shubhamdhama merged 1 commit intostream-multiplexingfrom
Conversation
f5869a5 to
f204e04
Compare
86bb186 to
e16d021
Compare
f204e04 to
6abe605
Compare
e16d021 to
0129d8f
Compare
drpcmanager/registry.go
Outdated
| r.mu.RLock() | ||
| defer r.mu.RUnlock() | ||
|
|
||
| s, ok := r.streams[id] |
There was a problem hiding this comment.
Don't you have to check if registry is closed?
There was a problem hiding this comment.
We can do that now since Close means remove all the entries. I can do it in next PR.
drpcmanager/registry.go
Outdated
| func (r *streamRegistry) Unregister(id uint64) { | ||
| r.mu.Lock() | ||
| defer r.mu.Unlock() | ||
|
|
There was a problem hiding this comment.
Don't you have to check if registry is closed?
There was a problem hiding this comment.
I don't think so. If registry is closed the entry would have most likely removed and doing this is noop.
drpcmanager/manager.go
Outdated
| m.log("TERM", func() string { return fmt.Sprint(err) }) | ||
| m.sigs.tport.Set(m.tr.Close()) | ||
| m.sbuf.Close() | ||
| m.reg.ForEach(func(s *drpcstream.Stream) { s.Close() }) |
There was a problem hiding this comment.
Should this be s.Cancel() instead of s.Close()? s.Close() writes to the transport, and the current code already holds the lock and writes to the transport for every stream. I'm assuming this path isn't intended for mux streams?
There was a problem hiding this comment.
This is just closing the registry. Anyways, Close will Cancel the streams now.
| case <-m.sigs.term.Signal(): | ||
| err := m.sigs.term.Err() | ||
| if errors.Is(err, io.EOF) { | ||
| err = context.Canceled | ||
| if stream.Kind() == drpc.StreamKindClient { | ||
| err = drpc.ClosedError.New("connection closed") | ||
| } | ||
| } | ||
| stream.Cancel(err) | ||
| <-stream.Finished() | ||
| m.sem.Recv() |
There was a problem hiding this comment.
[pp] we can remove it and cancel all the streams in reg.Close().
| @@ -217,7 +220,7 @@ | |||
| if m.sigs.term.Set(err) { | |||
There was a problem hiding this comment.
[pp] Let's depend on .term in the registry.Register. And close the streams in reg.Close and remove the .term dependency from manageStreams
6abe605 to
e3212b7
Compare
a4092e3 to
9155272
Compare
shubhamdhama
left a comment
There was a problem hiding this comment.
I'm going to address some of the feedback in a separate PR.
drpcmanager/registry.go
Outdated
| r.mu.RLock() | ||
| defer r.mu.RUnlock() | ||
|
|
||
| s, ok := r.streams[id] |
There was a problem hiding this comment.
We can do that now since Close means remove all the entries. I can do it in next PR.
| @@ -217,7 +220,7 @@ func (m *Manager) terminate(err error) { | |||
| if m.sigs.term.Set(err) { | |||
drpcmanager/manager.go
Outdated
| m.log("TERM", func() string { return fmt.Sprint(err) }) | ||
| m.sigs.tport.Set(m.tr.Close()) | ||
| m.sbuf.Close() | ||
| m.reg.ForEach(func(s *drpcstream.Stream) { s.Close() }) |
There was a problem hiding this comment.
This is just closing the registry. Anyways, Close will Cancel the streams now.
Replace the single-stream streamBuffer with a stream registry that maps stream IDs to stream objects. The registry currently holds at most one active stream (two briefly during handoff), but provides the foundation for stream multiplexing where callers will look up streams by ID directly.
9155272 to
e14c876
Compare
cthumuluru-crdb
left a comment
There was a problem hiding this comment.
Just added one comment about a test that's removed. Otherwise changes look good to me.
| r.mu.Lock() | ||
| defer r.mu.Unlock() | ||
|
|
||
| if r.closed { |
There was a problem hiding this comment.
(for future commits), if you make this an atomic boolean, you can do this check outside. You can also move this check outside once you rely on manager closed signal.
Replace the single-stream streamBuffer with a stream registry that maps
stream IDs to stream objects. The registry currently holds at most one
active stream (two briefly during handoff), but provides the foundation
for stream multiplexing where callers will look up streams by ID directly.