-
Notifications
You must be signed in to change notification settings - Fork 332
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(kds): Avoid calling Send()
from different goroutines
#2573
Conversation
81bd511
to
04bb599
Compare
Thanks! I just tested this fix (cherry-pick in 1.2.3) and it looks like it is solving my issue #2492 |
pkg/kds/mux/client.go
Outdated
@@ -75,20 +80,23 @@ func (c *client) Start(stop <-chan struct{}) (errs error) { | |||
"client-id", c.clientID, | |||
KDSVersionHeaderKey, KDSVersionV3, | |||
) | |||
log := muxClientLog.WithValues("client-id", c.clientID) | |||
log.Info("initializing Kuma Discovery Service (KDS) stream for remote-zone sync of resources") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: global-zone
@jakubdyszkiewicz noticed an issue with restarts of global which would not have remote reconnect. I need to look into this. |
pkg/kds/mux/session.go
Outdated
} | ||
|
||
func (s *session) close() { | ||
func (s *session) Close() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All these public functions should have godoc comments.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Close isn't idempotent, and I think that it's unsafe for anything except the mux server to call it. Can we make it private and consider renaming it?
pkg/kds/mux/session.go
Outdated
s.clientStream.bufferStream.sendResult <- err | ||
case <-s.closing: | ||
select { | ||
case <-s.serverStream.bufferStream.sendBuffer: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this code is trying to drain the send buffer, but I also don't think that you can actually fix the race like this.
At this point, the buffer stream cancellation has been called. If the other goroutine in Send hasn't checked the context yet, then you are OK, it will error. If it has already checked, then you don't need to poll any channels, just closing sendResult is enough to make it fail.
You still need to deal with closing the sendBuffer channel, which is never closed AFAICT. I think the easiest way to do this is to stuff a lock in the bufferStream and add API so close the buffer. Then the close is atomic WRT the sender.
Suggest wrapping bufferStream API around this to make it clearer. Something like this:
type bufferStream struct {
sendBuffer chan *mesh_proto.Message
sendResult chan error
recvBuffer chan *mesh_proto.Message
lock sync.Mutex // Protects the write side of the buffer, not the read side.
closed bool
}
func (k *bufferStream) close() {
k.lock.Lock()
defer k.lock.Unlock()
k.closed = true
close(k.sendBuffer)
close(k.sendResult)
close(k.recvBuffer)
}
func (k *bufferStream) dequeueMessage() *mesh_proto.Message {
return <-k.sendBuffer
}
func (k *bufferStream) enqueResult(err error) {
k.lock.Lock()
defer k.lock.Unlock()
if !k.closed {
k.sendResult <- err
}
}
func (k *bufferStream) Send(message *mesh_proto.Message) error {
k.lock.Lock()
if k.closed {
return io.EOF
}
k.sendBuffer <- message
k.lock.Unlock()
return <-k.sendResult
}
func (k *bufferStream) Recv() (*mesh_proto.Message, error) {
k.lock.Lock()
if k.closed {
k.lock.Unlock()
return nil, io.EOF
}
k.lock.Unlock()
select {
case r := <-k.recvBuffer:
return r, nil
}
}
Then you don't close the bufferStreams from the goroutines, you close them from the session Close.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thx it makes sense indeed. I was trying to stay away for Mutexes and trying to find the right way to write this.
I think this code achieves what I was trying to
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With a mutex approach, there's a risk of deadlocking if you need the goroutine and the caller to both hold mutexes. The trick I've used here is that the mutex only guards the write side of the stream object, protecting against closed channels.
I haven't forgotten about this PR just want to add some more e2e tests to kds before getting back on this. |
Rewrote most of KDS mux as it wasn't working as intended. KDS mux now ensures: Recv() on underlying streams are always called in the same goroutine Send() on underlying streams are always called in the same goroutine Close() actually waits for some goroutine to terminate Also added extra tests to ensure these invariants Fixes kumahq#2492 Signed-off-by: Charly Molter <charly.molter@konghq.com>
04bb599
to
cefa03e
Compare
@jpeach @bchatelard @jakubdyszkiewicz I've finally got around to updating this. I still want to check a couple of things but I think the code is ready to be reviewed. |
Codecov Report
@@ Coverage Diff @@
## master #2573 +/- ##
==========================================
+ Coverage 51.67% 51.78% +0.10%
==========================================
Files 870 870
Lines 50749 50774 +25
==========================================
+ Hits 26226 26291 +65
+ Misses 22428 22381 -47
- Partials 2095 2102 +7
Continue to review full report at Codecov.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a few nits and small issues. I also think it's worth adding explanatory comments while this is all fresh in your mind :)
"client-id", c.clientID, | ||
KDSVersionHeaderKey, KDSVersionV3, | ||
) | ||
)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: insert paragraph break here :)
start(newPolicySink(zoneName, zoneSyncer, clientStream, &testRuntimeContext{kds: kdsCtx}), stop) | ||
go func() { | ||
_ = newPolicySink(zoneName, zoneSyncer, clientStream, &testRuntimeContext{kds: kdsCtx}).Start() | ||
}() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I get a data race detected here:
WARNING: DATA RACE
Write at 0x00c00098ef70 by goroutine 62:
github.com/kumahq/kuma/pkg/kds/zone_test.glob..func1.3()
/home/jpeach/upstream/konghq/kuma/pkg/kds/zone/components_test.go:89 +0x679
...
Previous read at 0x00c00098ef70 by goroutine 140:
github.com/kumahq/kuma/pkg/kds/zone_test.glob..func1.3.1()
/home/jpeach/upstream/konghq/kuma/pkg/kds/zone/components_test.go:92 +0xbb
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes that's part of the things I still need to check but I think it's a test only issue.
@@ -94,7 +88,9 @@ var _ = Describe("Zone Sync", func() { | |||
zoneStore = memory.NewStore() | |||
zoneSyncer = sync_store.NewResourceSyncer(core.Log.WithName("kds-syncer"), zoneStore) | |||
|
|||
start(newPolicySink(zoneName, zoneSyncer, clientStream, &testRuntimeContext{kds: kdsCtx}), stop) | |||
go func() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should someone be waiting for the wait group above?
pkg/kds/mux/session.go
Outdated
close(s.clientStream.responses) | ||
func (k *bufferStream) put(message *mesh_proto.Message) { | ||
k.recvBuffer <- message | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because this method is only called from handleRecv
and that is the only place that calls close()
, writing to the streamn without holding the lock is OK.
I experimented with locking this, but it makes tests timeout since it serialized WRT Send()
. Probably a rwlock is the right choice (hold the write when setting the closed flag).
pkg/kds/mux/session.go
Outdated
return | ||
} | ||
r := itm.msg | ||
if kdsVersion == KDSVersionV2 && r != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If r
can actually be nil, then it still gets passed to stream.Send
, which doesn't seem right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually forgot to remove the r!=nil
which is not possible anymore
Signed-off-by: Charly Molter <charly.molter@konghq.com>
Signed-off-by: Charly Molter <charly.molter@konghq.com>
Rewrote most of KDS mux as it wasn't working as intended. KDS mux now ensures: Recv() on underlying streams are always called in the same goroutine Send() on underlying streams are always called in the same goroutine Close() actually waits for some goroutine to terminate Also added extra tests to ensure these invariants Fixes #2492 Signed-off-by: Charly Molter <charly.molter@konghq.com> (cherry picked from commit fa4f1bd)
…2744) Rewrote most of KDS mux as it wasn't working as intended. KDS mux now ensures: Recv() on underlying streams are always called in the same goroutine Send() on underlying streams are always called in the same goroutine Close() actually waits for some goroutine to terminate Also added extra tests to ensure these invariants Fixes #2492 Signed-off-by: Charly Molter <charly.molter@konghq.com> (cherry picked from commit fa4f1bd) Co-authored-by: Charly Molter <charly.molter@konghq.com>
Rewrote most of KDS mux as it wasn't working as intended. KDS mux now ensures: Recv() on underlying streams are always called in the same goroutine Send() on underlying streams are always called in the same goroutine Close() actually waits for some goroutine to terminate Also added extra tests to ensure these invariants Fixes kumahq#2492 Signed-off-by: Charly Molter <charly.molter@konghq.com>
Rewrote most of KDS mux as it wasn't working as intended. KDS mux now ensures: Recv() on underlying streams are always called in the same goroutine Send() on underlying streams are always called in the same goroutine Close() actually waits for some goroutine to terminate Also added extra tests to ensure these invariants Fixes kumahq#2492 Signed-off-by: Charly Molter <charly.molter@konghq.com>
Rewrote most of KDS mux as it wasn't working as intended.
KDS mux now ensures:
Also added extra tests to ensure these invariants
Fixes #2492