Skip to content

Commit

Permalink
[feat] Expose the chunk config of consumer to the reader (#987)
Browse files Browse the repository at this point in the history
* [feat] Expose the chunk config of consumer to the reader

* add test for reader's chunk config

* refactoring some code

* Update pulsar/reader.go

Co-authored-by: Zike Yang <zike@apache.org>

---------

Co-authored-by: Zike Yang <zike@apache.org>
  • Loading branch information
CrazyCollin and RobertIndie committed Mar 9, 2023
1 parent 5277f3f commit e269c42
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 15 deletions.
10 changes: 10 additions & 0 deletions pulsar/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,16 @@ type ReaderOptions struct {
// BackoffPolicy parameterize the following options in the reconnection logic to
// allow users to customize the reconnection logic (minBackoff, maxBackoff and jitterPercentage)
BackoffPolicy internal.BackoffPolicy

// MaxPendingChunkedMessage sets the maximum pending chunked messages. (default: 100)
MaxPendingChunkedMessage int

// ExpireTimeOfIncompleteChunk sets the expiry time of discarding incomplete chunked message. (default: 60 seconds)
ExpireTimeOfIncompleteChunk time.Duration

// AutoAckIncompleteChunk sets whether reader auto acknowledges incomplete chunked message when it should
// be removed (e.g.the chunked message pending queue is full). (default: false)
AutoAckIncompleteChunk bool
}

// Reader can be used to scan through all the messages currently available in a topic.
Expand Down
41 changes: 26 additions & 15 deletions pulsar/reader_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,22 +90,33 @@ func newReader(client *client, options ReaderOptions) (Reader, error) {
options.Decryption.MessageCrypto = messageCrypto
}

if options.MaxPendingChunkedMessage == 0 {
options.MaxPendingChunkedMessage = 100
}

if options.ExpireTimeOfIncompleteChunk == 0 {
options.ExpireTimeOfIncompleteChunk = time.Minute
}

consumerOptions := &partitionConsumerOpts{
topic: options.Topic,
consumerName: options.Name,
subscription: subscriptionName,
subscriptionType: Exclusive,
receiverQueueSize: receiverQueueSize,
startMessageID: startMessageID,
startMessageIDInclusive: options.StartMessageIDInclusive,
subscriptionMode: nonDurable,
readCompacted: options.ReadCompacted,
metadata: options.Properties,
nackRedeliveryDelay: defaultNackRedeliveryDelay,
replicateSubscriptionState: false,
decryption: options.Decryption,
schema: options.Schema,
backoffPolicy: options.BackoffPolicy,
topic: options.Topic,
consumerName: options.Name,
subscription: subscriptionName,
subscriptionType: Exclusive,
receiverQueueSize: receiverQueueSize,
startMessageID: startMessageID,
startMessageIDInclusive: options.StartMessageIDInclusive,
subscriptionMode: nonDurable,
readCompacted: options.ReadCompacted,
metadata: options.Properties,
nackRedeliveryDelay: defaultNackRedeliveryDelay,
replicateSubscriptionState: false,
decryption: options.Decryption,
schema: options.Schema,
backoffPolicy: options.BackoffPolicy,
maxPendingChunkedMessage: options.MaxPendingChunkedMessage,
expireTimeOfIncompleteChunk: options.ExpireTimeOfIncompleteChunk,
autoAckIncompleteChunk: options.AutoAckIncompleteChunk,
}

reader := &reader{
Expand Down
39 changes: 39 additions & 0 deletions pulsar/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,45 @@ func TestReaderConfigSubscribeName(t *testing.T) {
assert.NotNil(t, consumer)
}

func TestReaderConfigChunk(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
if err != nil {
t.Fatal(err)
}
defer client.Close()

r1, err := client.CreateReader(ReaderOptions{
Topic: "my-topic1",
StartMessageID: EarliestMessageID(),
MaxPendingChunkedMessage: 50,
ExpireTimeOfIncompleteChunk: 30 * time.Second,
AutoAckIncompleteChunk: true,
})
assert.Nil(t, err)
defer r1.Close()

// verify specified chunk options
pcOpts := r1.(*reader).pc.options
assert.Equal(t, 50, pcOpts.maxPendingChunkedMessage)
assert.Equal(t, 30*time.Second, pcOpts.expireTimeOfIncompleteChunk)
assert.True(t, pcOpts.autoAckIncompleteChunk)

r2, err := client.CreateReader(ReaderOptions{
Topic: "my-topic2",
StartMessageID: EarliestMessageID(),
})
assert.Nil(t, err)
defer r2.Close()

// verify default chunk options
pcOpts = r2.(*reader).pc.options
assert.Equal(t, 100, pcOpts.maxPendingChunkedMessage)
assert.Equal(t, time.Minute, pcOpts.expireTimeOfIncompleteChunk)
assert.False(t, pcOpts.autoAckIncompleteChunk)
}

func TestReader(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
Expand Down

0 comments on commit e269c42

Please sign in to comment.