Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid producer deadlock on connection closing #337

Merged
merged 4 commits into from
Jul 29, 2020

Conversation

merlimat
Copy link
Contributor

Motivation

There's a condition in which a producer can remain deadlocked in the event of a connection failure.

The sequence goes like:

  1. Producer (or multiple producers) have several outstanding request to write on a connection
  2. The channel (used to pass buffers to write) has the buffer full and thus it's blocking
  3. When the connection is closed the channel is not drained
  4. The connection tries to notify the producers that it's time to reconnect
  5. The producer go-routine is not able to process that notification, since it's blocked on the connection channel

@merlimat merlimat added this to the 0.2.0 milestone Jul 29, 2020
@merlimat merlimat self-assigned this Jul 29, 2020
@@ -729,7 +756,7 @@ func (c *connection) Close() {

func (c *connection) changeState(state connectionState) {
c.Lock()
c.state = state
atomic.StoreInt32(&c.state, int32(state))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In here, Is it necessary for us to c.Lock() and c.Unlock() actions on an atomic operation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's still needed to ensure that the mutex condition is broadcasted. To trigger a condition you need to have a lock on the associated mutex.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering if there are other problems with this approach, because for an atomic primitive, we rarely see the operation of Lock and Unlock it.

atomic.StoreInt32(&c.state, int32(state))
c.Lock()
c.cond.Broadcast()
c.Unlock()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are 2 different usages here:

  • On one hand we want other go routines to be able to check the state, without taking a lock
  • On the other hand, we still need to maintain the atomic state update and notification

Copy link
Member

@wolfstudy wolfstudy Jul 29, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, i agree with you, The main difference between us here is whether we need to lock to protect the atomic primitive, I think it is not needed, atomic itself is a synchronization primitive, so here, we can reduce the scope of the lock and only lock sync.cond. The code example is as follows:

atomic.StoreInt32(&c.state, int32(state))
c.Lock()
c.cond.Broadcast()
c.Unlock()

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please ignore me

Copy link
Member

@wolfstudy wolfstudy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM +1

@merlimat merlimat merged commit c0cba32 into apache:master Jul 29, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants