Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue 833] Fix the availablePermits leak that could cause consumer stuck. #835

Merged
merged 2 commits into from
Oct 13, 2022

Conversation

Gleiphir2769
Copy link
Contributor

Fixes #833

Motivation

The availablePermits may leak when message is discarded. Please refer to #833 for details.

Modifications

  • Wraps a struct availablePermits to atomic increase and reset.
  • Add the permits increasing in discardCorruptedMessage.

Verifying this change

  • Make sure that the change passes the CI checks.

This change added tests and can be verified as follows: pulsar/consumer_test.go:TestAvailablePermitsLeak()

@Gleiphir2769 Gleiphir2769 changed the title [issue 833] Fix for availablePermits leak [Issue 833] Fix the availablePermits leak that could cause consumer stuck. Aug 28, 2022
Copy link
Contributor

@zzzming zzzming left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good finding to fix the availablePermits leak.

Can we let dipatcher() to manage the availablePermits since most cases already covered by various signals in dispatcher()'s select cases. The benefits are 1) centralize one function to manage availablePermits 2) to avoid using Atomic or any synchronizing because avialablePermits can only updated one at a time.
To achieve that, can we have failure cases from MessageReceived() to send a signal to a new channel that dispatcher() listens to that will call your new function increasePermitsAndRequestMoreIfNeed()

There might be other cases leaking availablePermits. So in the future, those cases can send a sig to this new increasePermits channel.

What do you think? I think the statement management (i.e. availablePermits) should be done by dispatcher() instead of leaking all over the places then it forces to use synchronization (i.e. atomic) and better use of channel.

@Gleiphir2769
Copy link
Contributor Author

Can we let dipatcher() to manage the availablePermits since most cases already covered by various signals in dispatcher()'s select cases. The benefits are 1) centralize one function to manage availablePermits 2) to avoid using Atomic or any synchronizing because avialablePermits can only updated one at a time. To achieve that, can we have failure cases from MessageReceived() to send a signal to a new channel that dispatcher() listens to that will call your new function increasePermitsAndRequestMoreIfNeed()

Hi @zzzming. I agree with you that handle the availablePermits in dispatcher().

There might be other cases leaking availablePermits. So in the future, those cases can send a sig to this new increasePermits channel.

You are right. Chunking also need to increase availablePermits when all the chunks haven't all arrived yet. So this is also a case where need to request more availablePermits after return early from MessageReceived().

@Gleiphir2769
Copy link
Contributor Author

Gleiphir2769 commented Sep 5, 2022

The modify for dispatcher has done. CC @zzzming

By the way, what is the difference between availablePermits and requestedPermits?

pc.availablePermits++
flowThreshold := int32(math.Max(float64(pc.queueSize/2), 1))
if pc.availablePermits >= flowThreshold {
availablePermits := pc.availablePermits
requestedPermits := availablePermits
pc.availablePermits = 0
pc.log.Debugf("requesting more permits=%d available=%d", requestedPermits, availablePermits)
if err := pc.internalFlow(uint32(requestedPermits)); err != nil {
pc.log.WithError(err).Error("unable to send permits")
}
}

I think the code here is redundant. availablePermits always equal with the requestedPermits. What do you think?

@zzzming
Copy link
Contributor

zzzming commented Oct 10, 2022

@Gleiphir2769 availablePermits is consumer counter to keep track how many messages the broker can send. Upon the initial connection of a consumer, the consumer sets the initial permits to the queue size and sends to the broker; because of that, the available permits reduces to 0. That is in the dispatcher() function.
For every messages it receives, the available permit is increased by one. This allows the consumer to request more messages once the number of permits over the threshold. This is where you have changed the code in this PR.

requestPermits is the number of messages the consumer communicates with the broker that it allows to send.

These two are very close terminologies. Ultimately, we need one counter to keep track the number of availablePermits. The requestPermits is just an intermediary variable hold the availablePermits value and send to the broker.

The rest of PR looks good to me.

@merlimat @michaeljmarshall @wolfstudy Can we merge this soon?

Copy link
Member

@wolfstudy wolfstudy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice work, LGTM +1

@wolfstudy wolfstudy merged commit a013ff0 into apache:master Oct 13, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[BUG] The availablePermits of consumer may leak
3 participants