diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index b2f00b2e2bb02..2609060f972f0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -97,7 +97,6 @@ public class ProducerImpl extends ProducerBase implements TimerTask, Conne private final BlockingQueue pendingCallbacks; private final Semaphore semaphore; private volatile Timeout sendTimeout = null; - private volatile Timeout batchMessageAndSendTimeout = null; private long createProducerTimeout; private final BatchMessageContainerBase batchMessageContainer; private CompletableFuture lastSendFuture = CompletableFuture.completedFuture(null); @@ -133,6 +132,8 @@ public class ProducerImpl extends ProducerBase implements TimerTask, Conne private final ConnectionHandler connectionHandler; + private ScheduledFuture batchTimerTask; + @SuppressWarnings("rawtypes") private static final AtomicLongFieldUpdater msgIdGeneratorUpdater = AtomicLongFieldUpdater .newUpdater(ProducerImpl.class, "msgIdGenerator"); @@ -406,7 +407,7 @@ public void sendAsync(Message message, SendCallback callback) { return; } } - + try { synchronized (this) { int readStartIndex = 0; @@ -795,10 +796,10 @@ public CompletableFuture closeAsync() { sendTimeout = null; } - Timeout batchTimeout = batchMessageAndSendTimeout; - if (batchTimeout != null) { - batchTimeout.cancel(); - batchMessageAndSendTimeout = null; + ScheduledFuture batchTimerTask = this.batchTimerTask; + if (batchTimerTask != null) { + batchTimerTask.cancel(false); + this.batchTimerTask = null; } if (keyGeneratorTask != null && !keyGeneratorTask.isCancelled()) { @@ -1229,8 +1230,24 @@ public void connectionOpened(final ClientCnx cnx) { if (!producerCreatedFuture.isDone() && isBatchMessagingEnabled()) { // schedule the first batch message task - client.timer().newTimeout(batchMessageAndSendTask, conf.getBatchingMaxPublishDelayMicros(), - TimeUnit.MICROSECONDS); + batchTimerTask = cnx.ctx().executor().scheduleAtFixedRate(() -> { + if (log.isTraceEnabled()) { + log.trace( + "[{}] [{}] Batching the messages from the batch container from timer thread", + topic, + producerName); + } + // semaphore acquired when message was enqueued to container + synchronized (ProducerImpl.this) { + // If it's closing/closed we need to ignore the send batch timer and not + // schedule next timeout. + if (getState() == State.Closing || getState() == State.Closed) { + return; + } + + batchMessageAndSend(); + } + }, 0, conf.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS); } resendMessages(cnx); } @@ -1504,32 +1521,6 @@ private void failPendingBatchMessages(PulsarClientException ex) { semaphore.release(numMessagesInBatch); } - TimerTask batchMessageAndSendTask = new TimerTask() { - - @Override - public void run(Timeout timeout) throws Exception { - if (timeout.isCancelled()) { - return; - } - if (log.isTraceEnabled()) { - log.trace("[{}] [{}] Batching the messages from the batch container from timer thread", topic, - producerName); - } - // semaphore acquired when message was enqueued to container - synchronized (ProducerImpl.this) { - // If it's closing/closed we need to ignore the send batch timer and not schedule next timeout. - if (getState() == State.Closing || getState() == State.Closed) { - return; - } - - batchMessageAndSend(); - // schedule the next batch message task - batchMessageAndSendTimeout = client.timer() - .newTimeout(this, conf.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS); - } - } - }; - @Override public CompletableFuture flushAsync() { CompletableFuture lastSendFuture;