Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Perform producer compression from IO threads #7733

Merged
merged 6 commits into from
Aug 4, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
private final BlockingQueue<OpSendMsg> pendingCallbacks;
private final Semaphore semaphore;
private volatile Timeout sendTimeout = null;
private volatile Timeout batchMessageAndSendTimeout = null;
private long createProducerTimeout;
private final BatchMessageContainerBase batchMessageContainer;
private CompletableFuture<MessageId> lastSendFuture = CompletableFuture.completedFuture(null);
Expand Down Expand Up @@ -133,6 +132,8 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne

private final ConnectionHandler connectionHandler;

private ScheduledFuture<?> batchTimerTask;

@SuppressWarnings("rawtypes")
private static final AtomicLongFieldUpdater<ProducerImpl> msgIdGeneratorUpdater = AtomicLongFieldUpdater
.newUpdater(ProducerImpl.class, "msgIdGenerator");
Expand Down Expand Up @@ -406,7 +407,7 @@ public void sendAsync(Message<?> message, SendCallback callback) {
return;
}
}

try {
synchronized (this) {
int readStartIndex = 0;
Expand Down Expand Up @@ -795,10 +796,10 @@ public CompletableFuture<Void> closeAsync() {
sendTimeout = null;
}

Timeout batchTimeout = batchMessageAndSendTimeout;
if (batchTimeout != null) {
batchTimeout.cancel();
batchMessageAndSendTimeout = null;
ScheduledFuture<?> batchTimerTask = this.batchTimerTask;
if (batchTimerTask != null) {
batchTimerTask.cancel(false);
this.batchTimerTask = null;
}

if (keyGeneratorTask != null && !keyGeneratorTask.isCancelled()) {
Expand Down Expand Up @@ -1229,8 +1230,24 @@ public void connectionOpened(final ClientCnx cnx) {

if (!producerCreatedFuture.isDone() && isBatchMessagingEnabled()) {
// schedule the first batch message task
client.timer().newTimeout(batchMessageAndSendTask, conf.getBatchingMaxPublishDelayMicros(),
TimeUnit.MICROSECONDS);
batchTimerTask = cnx.ctx().executor().scheduleAtFixedRate(() -> {
if (log.isTraceEnabled()) {
log.trace(
"[{}] [{}] Batching the messages from the batch container from timer thread",
topic,
producerName);
}
// semaphore acquired when message was enqueued to container
synchronized (ProducerImpl.this) {
// If it's closing/closed we need to ignore the send batch timer and not
// schedule next timeout.
if (getState() == State.Closing || getState() == State.Closed) {
return;
}

batchMessageAndSend();
}
}, 0, conf.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS);
}
resendMessages(cnx);
}
Expand Down Expand Up @@ -1504,32 +1521,6 @@ private void failPendingBatchMessages(PulsarClientException ex) {
semaphore.release(numMessagesInBatch);
}

TimerTask batchMessageAndSendTask = new TimerTask() {

@Override
public void run(Timeout timeout) throws Exception {
if (timeout.isCancelled()) {
return;
}
if (log.isTraceEnabled()) {
log.trace("[{}] [{}] Batching the messages from the batch container from timer thread", topic,
producerName);
}
// semaphore acquired when message was enqueued to container
synchronized (ProducerImpl.this) {
// If it's closing/closed we need to ignore the send batch timer and not schedule next timeout.
if (getState() == State.Closing || getState() == State.Closed) {
return;
}

batchMessageAndSend();
// schedule the next batch message task
batchMessageAndSendTimeout = client.timer()
.newTimeout(this, conf.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS);
}
}
};

@Override
public CompletableFuture<Void> flushAsync() {
CompletableFuture<MessageId> lastSendFuture;
Expand Down