BatchingMaxMessages option is ignored #513

Closed
omnilight opened this issue Apr 28, 2021 · 0 comments · Fixed by #528

Expected behavior

When I create a producer like this:

producer, err := client.CreateProducer(pulsar.ProducerOptions{
	BatchingMaxMessages: 2,
})

I expect it to send at most 2 messages per batch, even when I send with SendAsync.

Actual behavior

The parameter is ignored in the batch builder. Both batchContainer and keyBasedBatchContainer call hasSpace internally from their Add method, but hasSpace looks like this:


// for simple batch:
func (bc *batchContainer) hasSpace(payload []byte) bool {
	msgSize := uint32(len(payload))
	return bc.numMessages > 0 && (bc.buffer.ReadableBytes()+msgSize) > uint32(bc.maxBatchSize)
}

// for key based:
func (bc *keyBasedBatchContainer) hasSpace(payload []byte) bool {
	msgSize := uint32(len(payload))
	return bc.numMessages > 0 && (bc.buffer.ReadableBytes()+msgSize) > uint32(bc.maxBatchSize)
}

So it does not check the maxMessages option at all. Moreover, it is obvious that this method always returns false on the first call to Add because of the bc.numMessages > 0 condition, but this bug is compensated for by another bug in method Add itself:

	if replicateTo != nil && bc.numMessages != 0 {
		// If the current batch is not empty and we're trying to set the replication clusters,
		// then we need to force the current batch to flush and send the message individually
		return false
	} else if bc.msgMetadata.ReplicateTo != nil {
		// There's already a message with cluster replication list. need to flush before next
		// message can be sent
		return false
	} else if bc.hasSpace(payload) {
		// The current batch is full. Producer has to call Flush() to
		return false
	}

The call to bc.hasSpace should be !bc.hasSpace.
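
The two changes suggested in this report (checking the message count and negating the hasSpace call) could be sketched as follows. This is a simplified, standalone model, not the actual batch builder code: sketchBatch, newSketchBatch, and the field names are hypothetical, and the real containers also manage a buffer and message metadata that are left out here.

```go
package main

import "fmt"

// sketchBatch is a hypothetical, stripped-down stand-in for the batch
// containers discussed in this issue; it only tracks counts and sizes.
type sketchBatch struct {
	numMessages  uint
	bufferedSize uint32
	maxMessages  uint
	maxBatchSize uint32
}

func newSketchBatch(maxMessages uint, maxBatchSize uint32) *sketchBatch {
	return &sketchBatch{maxMessages: maxMessages, maxBatchSize: maxBatchSize}
}

// hasSpace reports whether the payload still fits. An empty batch always
// accepts one message; otherwise both the count and byte limits apply.
func (b *sketchBatch) hasSpace(payload []byte) bool {
	if b.numMessages == 0 {
		return true
	}
	msgSize := uint32(len(payload))
	return b.numMessages < b.maxMessages && b.bufferedSize+msgSize <= b.maxBatchSize
}

// Add appends the payload and returns true, or returns false when the
// batch is full and the caller has to flush first.
func (b *sketchBatch) Add(payload []byte) bool {
	if !b.hasSpace(payload) {
		return false
	}
	b.numMessages++
	b.bufferedSize += uint32(len(payload))
	return true
}

func main() {
	b := newSketchBatch(2, 1024)
	for i := 0; i < 3; i++ {
		fmt.Println(b.Add([]byte("msg")))
	}
}
```

With a limit of 2 messages, the third Add is rejected even though the byte limit is far from reached, which is the behavior expected from BatchingMaxMessages.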

As a result, limiting batches by message count does not work as expected, and the only workaround available from client code is to set the DisableBatching option.

Steps to reproduce

Set the BatchingMaxMessages option to any value and send more messages than that using SendAsync. The batch is closed only when its size in bytes exceeds the limit, not when the number of messages exceeds the configured maximum.
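
The symptom can be illustrated without a broker by modelling the size-only check quoted above. This is only a sketch under assumptions: sizeOnlyBatch, isFull, and batchCounts are hypothetical names, and the model ignores any time-based flushing the real producer may perform.

```go
package main

import "fmt"

// sizeOnlyBatch is a hypothetical model of the behaviour reported here:
// maxMessages is stored but never consulted when adding a message.
type sizeOnlyBatch struct {
	numMessages  uint
	bufferedSize uint32
	maxMessages  uint // ignored, mirroring the reported bug
	maxBatchSize uint32
}

// isFull mirrors the quoted check: only the byte size is compared.
func (b *sizeOnlyBatch) isFull(payload []byte) bool {
	return b.numMessages > 0 && b.bufferedSize+uint32(len(payload)) > b.maxBatchSize
}

// batchCounts feeds payloads through the model and returns how many
// messages ended up in each batch.
func batchCounts(payloads [][]byte, maxMessages uint, maxBatchSize uint32) []uint {
	b := &sizeOnlyBatch{maxMessages: maxMessages, maxBatchSize: maxBatchSize}
	var counts []uint
	for _, p := range payloads {
		if b.isFull(p) {
			// Close the current batch and start a new one.
			counts = append(counts, b.numMessages)
			b.numMessages, b.bufferedSize = 0, 0
		}
		b.numMessages++
		b.bufferedSize += uint32(len(p))
	}
	if b.numMessages > 0 {
		counts = append(counts, b.numMessages)
	}
	return counts
}

func main() {
	payloads := make([][]byte, 5)
	for i := range payloads {
		payloads[i] = []byte("msg")
	}
	// maxMessages is 2, yet all five messages land in a single batch.
	fmt.Println(batchCounts(payloads, 2, 1024))
}
```

In this model, five 3-byte messages with a limit of 2 messages and 1024 bytes produce one batch of five, while lowering the byte limit to 6 splits them into batches of 2, 2, and 1.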

System configuration

Pulsar version: 2.7
