Fix TAPA concurrency issue #346
Conversation
I just found a similar problem with the conduit handling, I will add a fix as part of this PR
/reverify
```
@@ -193,7 +204,10 @@ func (c *Conduit) GetIPs() []string {
 func (c *Conduit) SetVIPs(vips []string) error {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 	if !c.isConnected() {
 	c.ctxMu.Lock()
```
Should be: `if c.ctx != nil { }`
And why the double mutexes in the first place? `c.mu` is already held, which ensures a single thread.
```
@@ -137,9 +144,12 @@ func (c *Conduit) Connect(ctx context.Context) error {
 // Disconnect closes the connection from NSM, closes all streams
 // and stop the VIP watcher
 func (c *Conduit) Disconnect(ctx context.Context) error {
 	c.isDisconnecting.Store(true)
```
Move the `c.mu.Lock()` a few lines up and drop the unnecessary extra `ctxMu` lock.
```go
// Set cancelOpen, so the close function could cancel this Open function.
sr.ctxMu.Lock()
if sr.cancelOpen != nil { // a previous open is still running
```
This seems to be an attempt to ensure a single thread in this function?
If so, the creation of the ctx (below) must be done with the lock held!
Also, if a single thread is ensured, there is no need for the `sr.mu.Lock()` in the go-routine below, since it seems to do the same thing in a worse way.
```go
var ctx context.Context
ctx, sr.cancelOpen = context.WithCancel(context.TODO())
sr.ctxMu.Unlock()
go func() {
```
A golden rule for synchronization primitives is:
Lock data, never lock code!
This locks code.
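For illustration (not from the PR, hypothetical names), a minimal sketch of the difference. The first method locks data: the mutex guards the field for the shortest possible time. The second locks code: the mutex is held across a long-running operation, so every other caller queues up even though no shared data is being touched.

```go
package main

import (
	"sync"
	"time"
)

type counter struct {
	mu sync.Mutex
	n  int
}

// inc locks data: the mutex is held only while touching c.n.
func (c *counter) inc() {
	c.mu.Lock()
	c.n++
	c.mu.Unlock()
}

// incSlow locks code: the mutex is held across a slow operation,
// serializing all callers behind it.
func (c *counter) incSlow() {
	c.mu.Lock()
	defer c.mu.Unlock()
	time.Sleep(100 * time.Millisecond) // simulated long-running work
	c.n++
}

func main() {
	c := &counter{}
	c.inc()
	c.incSlow()
}
```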
```go
sr.mu.Lock()
defer sr.mu.Unlock()
defer func() {
```
Recode without defer. And ask "are two mutexes really needed?"

In general, the mutex strategy must be improved. Or rather, there must be a mutex strategy. The code gives the impression that a number of "gotchas" have triggered a number of mutex hacks. Golden rule repeated: Lock data, never lock code!
All the comments seem to be around the double-mutex pattern, so I answer as a new comment: the reason to have 2 mutexes is to be able to force disconnecting while an NSM request is in progress (in the SetVIPs / Connect functions). This pattern has been applied in the conduit handler (/pkg/ambassador/tap/conduit/conduit.go) and the stream manager (/pkg/ambassador/tap/conduit/stream.go). After discussions, this kind of problem must instead be fixed using channels and a goroutine.
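As an aside, here is a minimal sketch (hypothetical names, not the PR's code) of the channels-and-goroutine approach: a single owner goroutine serializes all operations, so the double-mutex pattern disappears, while Disconnect can still cancel an NSM request that is in flight.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
)

type Owner struct {
	ops    chan func(ctx context.Context)
	cancel atomic.Value // holds the context.CancelFunc of the running operation
}

func NewOwner() *Owner {
	o := &Owner{ops: make(chan func(ctx context.Context))}
	go func() {
		// Single owner goroutine: operations are serialized here, so the
		// state they touch needs no mutex at all.
		for op := range o.ops {
			ctx, cancel := context.WithCancel(context.Background())
			o.cancel.Store(cancel)
			op(ctx) // must honor ctx and return promptly when cancelled
			cancel()
		}
	}()
	return o
}

// SetVIPs queues the operation and waits for it; calls are naturally
// serialized by the channel.
func (o *Owner) SetVIPs(vips []string) {
	done := make(chan struct{})
	o.ops <- func(ctx context.Context) {
		defer close(done)
		// The NSM request would go here, honoring ctx.
		select {
		case <-ctx.Done():
			fmt.Println("SetVIPs cancelled")
		default:
			fmt.Println("SetVIPs applied:", vips)
		}
	}
	<-done
}

// Disconnect cancels whatever operation is in flight without waiting for it.
func (o *Owner) Disconnect() {
	if c, ok := o.cancel.Load().(context.CancelFunc); ok {
		c()
	}
}

func main() {
	o := NewOwner()
	o.SetVIPs([]string{"10.0.0.1/32"})
	o.Disconnect()
}
```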
Here is a small struct with the logic removed but with the same behavior:

```go
package main

import (
	"context"
	"sync"
	"sync/atomic"
)

func main() {}

type MyType struct {
	ctx             context.Context
	cancel          context.CancelFunc
	IsConnected     bool
	mu              sync.Mutex // needed to lock IsConnected
	muCtx           sync.Mutex // needed to lock cancel
	isDisconnecting atomic.Bool
}

func (mt *MyType) Connect() {
	mt.mu.Lock()
	defer mt.mu.Unlock()
	mt.muCtx.Lock()
	mt.ctx, mt.cancel = context.WithCancel(context.Background())
	mt.muCtx.Unlock()
	mt.IsConnected = true
}

// Cannot be called if is not connected or if it is disconnecting
func (mt *MyType) Update() {
	mt.mu.Lock()
	defer mt.mu.Unlock()
	if !mt.IsConnected || mt.isDisconnecting.Load() {
		return
	}
	mt.muCtx.Lock()
	mt.ctx, mt.cancel = context.WithCancel(context.Background())
	mt.muCtx.Unlock()
	<-mt.ctx.Done() // Simulate infinite update
	mt.IsConnected = true
}

// Will close the Connect/Update calls
func (mt *MyType) Delete() {
	mt.isDisconnecting.Store(true)
	mt.muCtx.Lock()
	mt.cancel()
	mt.muCtx.Unlock()
	mt.mu.Lock()
	defer mt.mu.Unlock()
	mt.ctx = nil
	mt.IsConnected = false
	mt.isDisconnecting.Store(false)
}
```
First, you don't have to lock the call to cancel since there is a lock in the function already, so what remains is the function pointer. But it's never cleared (set to nil), which it really should be BTW, so it's not a problem either. Suggested Delete without muCtx:

```go
func (mt *MyType) Delete() {
	mt.mu.Lock()
	defer mt.mu.Unlock()
	mt.isDisconnecting.Store(true)
	if mt.cancel != nil {
		mt.cancel()
	}
	mt.ctx = nil
	mt.cancel = nil
	mt.IsConnected = false
	mt.isDisconnecting.Store(false)
}
```

I don't know if mt.isDisconnecting.Store should be called with or without the lock held. In the original example it was set to true without the lock and to false with the lock. That may not be harmful, but it is inconsistent. (I have not compiled the code, so there may be bugs.)
More flaws, or inconsistencies, with the double mutexes:

```go
func (mt *MyType) Update() {
	mt.mu.Lock()
	defer mt.mu.Unlock()
	if !mt.IsConnected || mt.isDisconnecting.Load() {
		return
	}
	// If Delete is called here the old ctx will be cancelled, but not the new one
	mt.muCtx.Lock()
	mt.ctx, mt.cancel = context.WithCancel(context.Background())
	mt.muCtx.Unlock()
	// If Delete is called here the new ctx will be cancelled, but not the old one
	<-mt.ctx.Done() // Simulate infinite update
	mt.IsConnected = true
}
```
One more:

```go
func (mt *MyType) Connect() {
	mt.mu.Lock()
	defer mt.mu.Unlock()
	// If Delete is called here you will have a SEGV
	mt.muCtx.Lock()
	mt.ctx, mt.cancel = context.WithCancel(context.Background())
	mt.muCtx.Unlock()
	mt.IsConnected = true
}
```
Not for data locking! For that, mutexes should be used! But blocking (and enqueuing) multiple calls to a go-routine by using a mutex should not be done in Go. That is a different (worse) approach, traditionally done with integer semaphores. In Go, a channel should be used.
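A tiny illustration of the point (illustrative only, not the PR's code): instead of letting callers pile up on a mutex, requests are sent to a single goroutine over a channel, which serializes them without any lock.

```go
package main

import "fmt"

func main() {
	requests := make(chan string)
	done := make(chan struct{})
	go func() {
		// Single consumer: requests are handled one at a time, in order.
		for r := range requests {
			fmt.Println("handled:", r) // real work would go here
		}
		close(done)
	}()
	requests <- "open"
	requests <- "close"
	close(requests)
	<-done
}
```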
Some notes from https://pkg.go.dev/context should be considered:

> Failing to call the CancelFunc leaks the child and its children until the parent is canceled or the timer fires.

This will happen in the normal Update() case. The old ctx is simply overwritten. No cancel called.

> Do not store Contexts inside a struct type; instead, pass a Context explicitly to each function that needs it.

Now, I don't think it's good to be too medieval, so exceptions may be necessary. Nevertheless, this is a recommendation from the guys designing this stuff.
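A quick illustration of the first note (hypothetical type, not the PR's code): overwriting the stored ctx/cancel without calling the previous cancel leaks the old child context until the parent is cancelled.

```go
package main

import "context"

type holder struct {
	ctx    context.Context
	cancel context.CancelFunc
}

func (h *holder) refreshLeaky() {
	// BUG: the previous child context is never cancelled, so it is leaked
	// until the parent (here Background) would be cancelled.
	h.ctx, h.cancel = context.WithCancel(context.Background())
}

func (h *holder) refreshOK() {
	if h.cancel != nil {
		h.cancel() // release the old child before replacing it
	}
	h.ctx, h.cancel = context.WithCancel(context.Background())
}

func main() {
	h := &holder{}
	h.refreshLeaky()
	h.refreshOK()
}
```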
Thank you for your comments.

Yes, it should, since the

Right, it must be locked using muCtx so there is no problem with multiple Delete calls.

Right, in that case. I checked: it can simply be removed from the struct since the ctx is not used anywhere else. Only the cancel is important.
I still don't really see how to implement this. This is how I rewrote it:

```go
package main

import (
	"context"
	"sync"
)

func main() {}

type MyType struct {
	cancel          context.CancelFunc
	IsConnected     bool
	mu              sync.Mutex // needed to lock IsConnected and Connect/Update/Disconnect logic
	muCtx           sync.Mutex // needed to lock cancel and isDisconnecting
	isDisconnecting bool
}

func (mt *MyType) Connect() {
	mt.mu.Lock()
	defer mt.mu.Unlock()
	mt.muCtx.Lock()
	if mt.isDisconnecting {
		mt.muCtx.Unlock()
		return
	}
	if mt.cancel != nil {
		mt.cancel()
	}
	_, cancel := context.WithCancel(context.Background())
	defer cancel()
	mt.cancel = cancel
	mt.muCtx.Unlock()
	// Connect logic here
	mt.IsConnected = true
}

// Cannot be called if is not connected or if it is disconnecting
func (mt *MyType) Update() {
	mt.mu.Lock()
	defer mt.mu.Unlock()
	if !mt.IsConnected {
		return
	}
	mt.muCtx.Lock()
	if !mt.IsConnected || mt.isDisconnecting {
		mt.muCtx.Unlock()
		return
	}
	if mt.cancel != nil {
		mt.cancel()
	}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	mt.cancel = cancel
	mt.muCtx.Unlock()
	<-ctx.Done() // Update logic here
	mt.IsConnected = true
}

// Will close the Connect/Update calls
func (mt *MyType) Delete() {
	mt.muCtx.Lock()
	if mt.isDisconnecting {
		mt.muCtx.Unlock()
		return
	}
	mt.isDisconnecting = true
	if mt.cancel != nil {
		mt.cancel()
	}
	mt.muCtx.Unlock()
	mt.mu.Lock()
	defer mt.mu.Unlock()
	if !mt.IsConnected {
		return
	}
	// Disconnect logic here
	mt.IsConnected = false
	mt.muCtx.Lock()
	mt.isDisconnecting = false
	mt.muCtx.Unlock()
}
```
There are several bugs in the code above, mostly multiple calls to cancel().
The point here is to allow the caller to decide about cancels or timeouts. The called function should honor the passed ctx and clean up and return if the context is canceled.

```go
package main

import (
	"context"
	"sync"
)

func main() {}

type connState int

const (
	idle connState = iota
	connected
	deleted
	canceled
	failed
)

type MyType struct {
	mu    sync.Mutex
	state connState
}

// The context should be passed by the caller to allow cancel (or
// timeouts) in the way the caller wishes.
func (mt *MyType) Connect(ctx context.Context) {
	mt.mu.Lock()
	defer mt.mu.Unlock()
	if mt.state != idle {
		return // Can only connect once
	}
	mt.state = connected
	// Connect logic here which honors the ctx
	// (the state can be connected (happy path) or canceled/failed on return)
}

func (mt *MyType) Update(ctx context.Context) {
	mt.mu.Lock()
	defer mt.mu.Unlock()
	if mt.state != connected {
		return // Can only update in connected state
	}
	// What was the logic here? I write as I assume it works below.
	//<-mt.ctx.Done()
	// The Update logic. Report outcome via a channel (not a context)
	// if it is in a go-routine. The caller may cancel the context at
	// any time.
	res := make(chan int)
	// Update logic here which honors the ctx
	select {
	case <-ctx.Done():
		mt.state = canceled
		return
	case rc := <-res:
		if rc != 0 {
			mt.state = failed
			// (return an error here?)
		}
	}
	// (the state can be connected (happy path) or canceled/failed on return)
}

// Will close the Connect/Update calls
func (mt *MyType) Delete(ctx context.Context) {
	mt.mu.Lock()
	defer mt.mu.Unlock()
	if mt.state != connected {
		return // Can only delete in connected state
	}
	mt.state = deleted
	// Disconnect logic here which honors the ctx, and a select a'la Update()
	// (if the state is deleted, canceled or failed on return probably
	// doesn't matter for Delete())
}
```

The functions should probably return an error.
Ok, it's simpler, but it's also very different from the behavior I wanted to reach. My goal was to cancel the Connect or Update when Delete was called.
I see. But the muCtx will not work as intended! Example:

```go
func (mt *MyType) Connect() {
	mt.mu.Lock()
	defer mt.mu.Unlock()
	// Delete() called here. Cancel will NOT be called
	mt.muCtx.Lock()
	mt.ctx, mt.cancel = context.WithCancel(context.Background())
	mt.muCtx.Unlock()
	// Delete called here works as intended
	mt.IsConnected = true
}
```
Here Delete() calls cancel. A ctx child (and its cancel()) is stored in the struct, but it's created once in NewMyType():

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

func main() {}

type connState int

const (
	idle connState = iota
	connected
	deleted
	canceled
	failed
)

type MyType struct {
	mu     sync.Mutex
	state  connState
	ctx    context.Context // Might contain a logger
	cancel context.CancelFunc
}

func NewMyType(ctx context.Context) *MyType {
	var mt MyType
	mt.ctx, mt.cancel = context.WithCancel(ctx)
	return &mt
}

func (mt *MyType) Connect() error {
	mt.mu.Lock()
	defer mt.mu.Unlock()
	if mt.state != idle {
		return fmt.Errorf("Already connected (well, in-use at least)")
	}
	// Connect logic here which honors the mt.ctx
	mt.state = connected
	return nil
}

func (mt *MyType) Update() error {
	mt.mu.Lock()
	defer mt.mu.Unlock()
	if mt.state != connected {
		return fmt.Errorf("Not connected")
	}
	// Update logic here which honors the mt.ctx
	return nil
}

// Delete MUST be called after Connect() to ensure cancel()
func (mt *MyType) Delete(ctx context.Context) {
	// Cancel any on-going Connect or Update
	// NO SYNC IS NEEDED HERE! MULTIPLE CALLS WORKS! THE POINTER IS !=nil
	mt.cancel()
	mt.mu.Lock()
	defer mt.mu.Unlock()
	if mt.state == deleted {
		return // Already deleted
	}
	// Here the state can be
	// idle - do nothing (shouldn't happen)
	// connected - disconnect. Normal case
	// canceled - a Connect or Update was canceled (might be difficult)
	// failed - a Connect or Update has failed. Do nothing?
	// Disconnect logic here which doesn't care about the mt.ctx
	// (since it's canceled), but honors the passed ctx.
	mt.state = deleted
}
```
Still, you delete (my bad, I called it like that in my example, while it must be disconnect), but re-connect doesn't work because the ctx is set in the New function.
IMO you should create a new object on re-connect. But if you really want to re-use the object you may refresh the ctx,cancel in Delete() and set state=idle. (And rename Delete -> Disconnect.)
```go
// Disconnect MUST be called after Connect() to ensure cancel()
func (mt *MyType) Disconnect(ctx context.Context) {
	// Cancel any on-going Connect or Update
	// NO SYNC IS NEEDED HERE! MULTIPLE CALLS WORKS! THE POINTER IS !=nil
	mt.cancel()
	mt.mu.Lock()
	defer mt.mu.Unlock()
	if mt.state == idle {
		return // Not connected
	}
	// Here the state can be
	// connected - disconnect. Normal case
	// canceled - a Connect or Update was canceled (might be difficult)
	// failed - a Connect or Update has failed. Do nothing?
	// Disconnect logic here which doesn't care about the mt.ctx
	// (since it's canceled), but honors the passed ctx.
	mt.ctx, mt.cancel = context.WithCancel(ctx)
	mt.state = idle
}
```
I think it's best to clean up everything necessary in Connect() or Update() when cancelled or failed. Then Disconnect() can disconnect if state==connected, and do nothing for the cancelled or failed states.
I think my example was confusing; I tried to make it simpler, so I removed things, but maybe too many things. Callers of Connect/Update must also be able to cancel their calls via the context. In the TAPA, that object contains the stream manager, which keeps the state of the streams. The streams also need a reference to that object because they need functions to get the local IPs (IPs are set after Connect/Update calls).
The stored ctx is a child ctx, meaning that cancel() on the parent also cancels the child. Cancel on the child in Disconnect() becomes a no-op, and I am unsure if it's required to prevent the child leak. Probably not. If the parent is cancelled, a check may be added in Disconnect(), since the state may still be connected but the mt.ctx cancelled. I can't figure out a reason why that would be necessary though. Disconnect should probably disconnect anyway.
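A tiny runnable demonstration of these semantics (standard library only):

```go
package main

import (
	"context"
	"fmt"
)

func main() {
	parent, cancelParent := context.WithCancel(context.Background())
	child, cancelChild := context.WithCancel(parent)

	cancelParent() // cancelling the parent also cancels the child
	<-child.Done()
	fmt.Println(child.Err()) // context.Canceled

	cancelChild() // no-op: the child is already cancelled
}
```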
The delay was 50ms before, even in the unit tests. It is now 1 second by default and 1ms in the unit tests.
If multiple events occurred (AddStream, SetStreams), a stack of Open calls would pile up waiting for the mutex to be unlocked. When the mutex was unlocked (on a close call), the close would be executed first, and afterwards the piled-up Open calls would execute again, which would cause the stream to be re-opened.
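A hedged sketch (illustrative only, not the PR's code) of one way to avoid such a pile-up: coalesce Open requests in a 1-slot channel, so at most one Open is pending at any time instead of a stack of goroutines queued on a mutex.

```go
package main

import (
	"fmt"
	"time"
)

type opener struct {
	pending chan struct{}
}

func newOpener() *opener {
	o := &opener{pending: make(chan struct{}, 1)}
	go func() {
		for range o.pending {
			// Open logic would run here; a burst of requests is
			// collapsed into a single execution.
			fmt.Println("open executed")
		}
	}()
	return o
}

// requestOpen never blocks: if an Open is already pending, the new
// request is coalesced with it.
func (o *opener) requestOpen() {
	select {
	case o.pending <- struct{}{}:
	default:
	}
}

func main() {
	o := newOpener()
	for i := 0; i < 5; i++ {
		o.requestOpen()
	}
	time.Sleep(50 * time.Millisecond) // give the worker time to run
}
```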
SetVIPs could be called even after the conduit had been disconnected, so the NSM connection would have been requested again and the interface would remain in the pod. The VIP watcher has been moved from the conduit handler to the conduit manager (conduitRetry), so SetVIPs won't block any other request and can be cancelled easily when disconnecting the conduit.
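A sketch of the watcher-in-a-goroutine idea (the names and channel are assumptions, not the PR's API): the VIP watcher runs under its own context, so disconnecting just cancels the context and the watcher stops without blocking other requests.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func watchVIPs(ctx context.Context, updates <-chan []string) {
	for {
		select {
		case <-ctx.Done():
			return // cancelled on disconnect
		case vips := <-updates:
			// A SetVIPs request would be issued here, honoring ctx,
			// so it too is cancelled on disconnect.
			fmt.Println("applying VIPs:", vips)
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	updates := make(chan []string)
	go watchVIPs(ctx, updates)
	updates <- []string{"20.0.0.1/32"}
	cancel() // disconnect: stops the watcher
	time.Sleep(10 * time.Millisecond)
}
```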
I fixed it based on your comments. I had to refactor some things, so now it follows the solution described here: #346 (comment). The VIP watcher has been moved from the conduit handler to the conduit manager (conduitRetry). I tried with these unit tests (#343) and they are all passing.
```go
cc.ctxMu.Lock()
defer cc.ctxMu.Unlock()
if ctx.Err() == nil {
	cc.configuration.Watch()
```
Is disconnect() supposed to work here without blocking? It will be blocked until cc.configuration.Watch() returns.
This is fine; Watch is not blocking.
Please check the comment about blocking disconnect(). If that's as intended, I approve.

I have not reviewed the tests, and I do not intend to 😄
Description
Issue link
#316
#343 (stream-max-targets test)
Checklist