Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue 833] Fix the availablePermits leak that could cause consumer stuck. #835

Merged
merged 2 commits into from
Oct 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 40 additions & 15 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,15 @@ const (
noMessageEntry = -1
)

type permitsReq int32

const (
// reset the availablePermits of pc
permitsReset permitsReq = iota
// increase the availablePermits
permitsInc
)

type partitionConsumerOpts struct {
topic string
consumerName string
Expand Down Expand Up @@ -127,7 +136,8 @@ type partitionConsumer struct {
messageCh chan ConsumerMessage

// the number of message slots available
availablePermits int32
availablePermits int32
availablePermitsCh chan permitsReq

// the size of the queue channel for buffering messages
queueSize int32
Expand Down Expand Up @@ -223,6 +233,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
dlq: dlq,
metrics: metrics,
schemaInfoCache: newSchemaInfoCache(client, options.topic),
availablePermitsCh: make(chan permitsReq, 10),
}
pc.setConsumerState(consumerInit)
pc.log = client.log.SubLogger(log.Fields{
Expand Down Expand Up @@ -905,7 +916,7 @@ func (pc *partitionConsumer) dispatcher() {
messages = nil

// reset available permits
pc.availablePermits = 0
pc.availablePermitsCh <- permitsReset
initialPermits := uint32(pc.queueSize)

pc.log.Debugf("dispatcher requesting initial permits=%d", initialPermits)
Expand All @@ -928,19 +939,14 @@ func (pc *partitionConsumer) dispatcher() {
messages[0] = nil
messages = messages[1:]

// TODO implement a better flow controller
// send more permits if needed
pc.availablePermits++
flowThreshold := int32(math.Max(float64(pc.queueSize/2), 1))
if pc.availablePermits >= flowThreshold {
availablePermits := pc.availablePermits
requestedPermits := availablePermits
pc.availablePermits = 0
pc.availablePermitsCh <- permitsInc

pc.log.Debugf("requesting more permits=%d available=%d", requestedPermits, availablePermits)
if err := pc.internalFlow(uint32(requestedPermits)); err != nil {
pc.log.WithError(err).Error("unable to send permits")
}
case pr := <-pc.availablePermitsCh:
switch pr {
case permitsInc:
pc.increasePermitsAndRequestMoreIfNeed()
case permitsReset:
pc.availablePermits = 0
}

case clearQueueCb := <-pc.clearQueueCh:
Expand Down Expand Up @@ -971,7 +977,7 @@ func (pc *partitionConsumer) dispatcher() {
messages = nil

// reset available permits
pc.availablePermits = 0
pc.availablePermitsCh <- permitsReset
initialPermits := uint32(pc.queueSize)

pc.log.Debugf("dispatcher requesting initial permits=%d", initialPermits)
Expand Down Expand Up @@ -1403,6 +1409,25 @@ func (pc *partitionConsumer) discardCorruptedMessage(msgID *pb.MessageIdData,
if err != nil {
pc.log.Error("Connection was closed when request ack cmd")
}
pc.availablePermitsCh <- permitsInc
}

func (pc *partitionConsumer) increasePermitsAndRequestMoreIfNeed() {
// TODO implement a better flow controller
// send more permits if needed
flowThreshold := int32(math.Max(float64(pc.queueSize/2), 1))
pc.availablePermits++
ap := pc.availablePermits
if ap >= flowThreshold {
availablePermits := ap
requestedPermits := ap
pc.availablePermitsCh <- permitsReset

pc.log.Debugf("requesting more permits=%d available=%d", requestedPermits, availablePermits)
if err := pc.internalFlow(uint32(requestedPermits)); err != nil {
pc.log.WithError(err).Error("unable to send permits")
}
}
}

// _setConn sets the internal connection field of this partition consumer atomically.
Expand Down
82 changes: 82 additions & 0 deletions pulsar/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package pulsar

import (
"context"
"errors"
"fmt"
"io/ioutil"
"log"
Expand Down Expand Up @@ -3125,3 +3126,84 @@ func TestConsumerSeekByTimeOnPartitionedTopic(t *testing.T) {
consumer.Ack(msg)
}
}

func TestAvailablePermitsLeak(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: serviceURL,
})
assert.Nil(t, err)
client.Close()

topic := fmt.Sprintf("my-topic-test-ap-leak-%v", time.Now().Nanosecond())

// 1. Producer with valid key name
p1, err := client.CreateProducer(ProducerOptions{
Topic: topic,
Encryption: &ProducerEncryptionInfo{
KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
"crypto/testdata/pri_key_rsa.pem"),
Keys: []string{"client-rsa.pem"},
},
Schema: NewStringSchema(nil),
DisableBatching: true,
})
assert.Nil(t, err)
assert.NotNil(t, p1)

subscriptionName := "enc-failure-subcription"
totalMessages := 1000

// 2. KeyReader is not set by the consumer
// Receive should fail since KeyReader is not setup
// because default behaviour of consumer is fail receiving message if error in decryption
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: subscriptionName,
})
assert.Nil(t, err)

messageFormat := "my-message-%v"
for i := 0; i < totalMessages; i++ {
_, err := p1.Send(context.Background(), &ProducerMessage{
Value: fmt.Sprintf(messageFormat, i),
})
assert.Nil(t, err)
}

// 2. Set another producer that send message without crypto.
// The consumer can receive it correct.
p2, err := client.CreateProducer(ProducerOptions{
Topic: topic,
Schema: NewStringSchema(nil),
DisableBatching: true,
})
assert.Nil(t, err)
assert.NotNil(t, p2)

_, err = p2.Send(context.Background(), &ProducerMessage{
Value: fmt.Sprintf(messageFormat, totalMessages),
})
assert.Nil(t, err)

// 3. Discard action on decryption failure. Create a availablePermits leak scenario
consumer.Close()

consumer, err = client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: subscriptionName,
Decryption: &MessageDecryptionInfo{
ConsumerCryptoFailureAction: crypto.ConsumerCryptoFailureActionDiscard,
},
Schema: NewStringSchema(nil),
})
assert.Nil(t, err)
assert.NotNil(t, consumer)

// 4. If availablePermits does not leak, consumer can get the last message which is no crypto.
// The ctx3 will not exceed deadline.
ctx3, cancel3 := context.WithTimeout(context.Background(), 15*time.Second)
_, err = consumer.Receive(ctx3)
cancel3()
assert.NotEqual(t, true, errors.Is(err, context.DeadlineExceeded),
"This means the resource is exhausted. consumer.Receive() will block forever.")
}