-
Notifications
You must be signed in to change notification settings - Fork 336
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Clean callbacks of connection after run loop stopped (#239) #248
Clean callbacks of connection after run loop stopped (#239) #248
Conversation
a6b5467
to
38a16c2
Compare
pulsar/internal/connection.go
Outdated
c.pingTicker.Stop() | ||
c.pingCheckTicker.Stop() | ||
|
||
for _, listener := range c.listeners { | ||
listener.ConnectionClosed() | ||
} | ||
|
||
for _, req := range c.pendingReqs { | ||
if c.runLoopStoppedCh != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you reuse the close channel here? Why create another channel?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean reusing the closeCh
?
It seems that the closeCh
is a signal to instruct the run()
loop to exit. However we would not know when the run()
has finished its job without a dedicated signal.
Therefore, the runLoopStoppedCh
is the dedicated signal and is responsible to indicating the run()
loop stopped.
When runLoopStoppedCh
closed, we are able do further cleanup resources shared with the run()
loop.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why does the run loop have to exit before removing the pending callback? How could a callback be called twice?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As @wolfstudy pointed out, the problem comes from theinternalWriteData
may call on Close()
https://github.com/apache/pulsar-client-go/blob/master/pulsar/internal/connection.go#L331
While the run loop will go to next iteration no matter that the Close() is called inside the loop or from another goroutine. At that point, the run loop can go into incomingCmdCh
branch even if the c.closeCh
had been closed.
If the run loop goes into the incomingCmdCh branch, then the callbacks in the pendingReqs
being called in the Close() will be called second time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So the issue is connection.Close() is called multiple times?
After Close() is the connection struct ever used again?
If we want to ensure Close() is only called once can we use a sync.Once and do something similar to what the consumers do?
https://github.com/apache/pulsar-client-go/blob/master/pulsar/consumer_impl.go#L362
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure if order is needed (pendingReqs close callbacks must happen before the others c.listeners and c.consumerHandlers callbacks) maybe @merlimat would know better.
I think moving just the pendingReqs into the run loops is needed. The other structures are protected with locks. Have you tried moving the pendingReqs code from Close() to the run loop and then running some tests?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, moving just the pendingReqs into the run loops will still pass the existing tests, but it is really hard to write new race tests without to be flaky.
I would suggest we keep cleanup code at only one place, and they would be better triggered by the run loop. However the Close()
would become an async operation.
Hi @merlimat, would you have any feedback?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After tracing the call path of the Close()
, I found that it is called directly by the Close()
in the client_impl.go
.
When performing a graceful shutdown, although normally we will first call the producer.Close()/consumer.Close()
and then call client.Close()
at the end. I think it is still a good idea to keep the Close()
as a sync operation to remain similar behavior between the producer.Close()/consumer.Close()
.
We could move the cleanup code of c.pendingReqs
, c.listeners
and c.consumerHandlers
into the run loop. However in that case, I would suggest keeping the runLoopStoppedCh
channel to inform the Close()
that the cleanup process has finished.
What do you think? @cckellogg
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you see the run loop ever getting stuck for a period of time when close is called? I think we should probably only pendingReqs cleanup in the run loop since that map is only accessed in that go routine . We could keep the others where they are since they are protected by locks. We just need to add some comments on why this is the case and why the runLoopStoppedCh is needed.
Thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you see the run loop ever getting stuck for a period of time when close is called?
No, they are separated originally. To keep the original behavior, the runLoopStoppedCh is not needed if we move the pendingReqs cleanup at the end of run loop.
I just updated the PR which moves pendingReqs cleanup into the run loop and adds some notices.
Please let me know if it looks good to you.
38a16c2
to
9702aff
Compare
The change LGTM +1, ping @cckellogg @merlimat PTAL |
if c.cnx != nil { | ||
c.cnx.Close() | ||
} | ||
c.TriggerClose() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If a client calls this won't it trigger another call to Close()
if the run loop is still going. This would cause all the listeners to be triggered again. Are there potential issues with that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is wrapped with a mutex and a c.state == connectionClosed
check.
I think another call to Close()
will have no effect.
Move the milestones to 0.2.0 |
Fixes #239
Motivation
As @wolfstudy pointed out here #239 (comment)
There is a race on callbacks of
pendingReqs
when closing the connection while the run loop is still running, which will lead to calling a callback up to 2 times:pulsar-client-go/pulsar/internal/connection.go
Lines 669 to 671 in e7f1673
Modifications
Introducing a
runLoopStoppedCh
to make sure that the run loop has already stopped when cleaning callbacks ofpendingReqs
in theClose()