Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Broker] Replicator producer should have blockIfQueueFull=true #15724

Conversation

lhotari
Copy link
Member

@lhotari lhotari commented May 23, 2022

Motivation

In #15691, an exception is seen which isn't properly handled.

2022-05-19T18:35:09,916+0000 [BookKeeperClientWorker-OrderedExecutor-3-0] ERROR org.apache.pulsar.broker.service.persistent.PersistentReplicator - [persistent://tenant/namespace/perftest-partition-33][pulsar-cluster-src -> pulsar-cluster-dst] Error producing on remote broker
org.apache.pulsar.client.api.PulsarClientException$MemoryBufferIsFullError: Client memory buffer is full
	at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:849) ~[com.datastax.oss-pulsar-client-original-2.10.0.3.jar:2.10.0.3]
	at org.apache.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java:429) ~[com.datastax.oss-pulsar-client-original-2.10.0.3.jar:2.10.0.3]
	at org.apache.pulsar.broker.service.persistent.PersistentReplicator.lambda$readEntriesComplete$2(PersistentReplicator.java:369) ~[com.datastax.oss-pulsar-broker-2.10.0.3.jar:2.10.0.3]
	at java.util.concurrent.CompletableFuture.uniAcceptNow(CompletableFuture.java:753) ~[?:?]
	at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:731) ~[?:?]
	at java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2108) ~[?:?]
	at org.apache.pulsar.broker.service.persistent.PersistentReplicator.readEntriesComplete(PersistentReplicator.java:367) ~[com.datastax.oss-pulsar-broker-2.10.0.3.jar:2.10.0.3]
	at org.apache.bookkeeper.mledger.impl.OpReadEntry.lambda$checkReadCompletion$2(OpReadEntry.java:156) ~[com.datastax.oss-managed-ledger-2.10.0.3.jar:2.10.0.3]
	at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) [com.datastax.oss-managed-ledger-2.10.0.3.jar:2.10.0.3]
	at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [com.datastax.oss-bookkeeper-common-4.14.5.1.0.0.jar:4.14.5.1.0.0]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.76.Final.jar:4.1.76.Final]
	at java.lang.Thread.run(Thread.java:829) [?:?]

The reason for this is the memory limit controller which is enabled by default. PR #15723 addresses that and makes it configurable and disables the memory limit for replication clients by default. Besides this, the blockIfQueueFull=true setting should be used in the replicator producer.

  • prevents exceptions when sending gets backpressured by memory limit (or queue size limit)

Modifications

  • set blockIfQueueFull=true for replicator producer

- prevents exceptions when sending gets backpressured by
  memory limit
@lhotari lhotari added type/bug The PR fixed a bug or issue reported a bug area/broker area/geo-replication doc-not-needed Your PR changes do not impact docs labels May 23, 2022
@lhotari lhotari added this to the 2.11.0 milestone May 23, 2022
@lhotari lhotari self-assigned this May 23, 2022
@@ -83,6 +83,7 @@ public AbstractReplicator(String topicName, String replicatorPrefix, String loca
.enableBatching(false)
.sendTimeout(0, TimeUnit.SECONDS) //
.maxPendingMessages(producerQueueSize) //
.blockIfQueueFull(true)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would mean blocking the IO threads (or other shared threads) in the broker causing a deadlock

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to find a better solution to have a broker-wide flow control mechanism for replicators that can be tied to the client memory limit.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for reviewing @merlimat . Yes, blocking would be problematic in this case.

Would you mind also reviewing #15723 since the memory limit is currently causing regressions in geo-replication in 2.10 .

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/broker area/geo-replication doc-not-needed Your PR changes do not impact docs type/bug The PR fixed a bug or issue reported a bug
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants