Skip to content

Commit

Permalink
Issue 1455: MessageID has always batch index 0 when sending messages …
Browse files Browse the repository at this point in the history
…in a batch (#2099)

*Motivation*

Fixes #1455.

Pulsar uses a callback chain for completing the list of callbacks for a batch. However the callback chain doesn't reference the message instance for completing the callback.
so when callback chain is triggered, it always uses the first message id to complete the chain of callbacks.

*Changes*

Introduce a field to keep message instance in the callback chain. So when the chain is invoked, each callback can use the right message instance to complete the callback.

Added an integration test to ensure it works correctly.
  • Loading branch information
sijie committed Jul 6, 2018
1 parent 0420e59 commit 83e3157
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ protected ProducerSendCallback newObject(Handle<ProducerSendCallback> handle) {
};

@Override
public void addCallback(SendCallback scb) {
public void addCallback(MessageImpl<?> msg, SendCallback scb) {
// noop
}

Expand All @@ -221,6 +221,11 @@ public SendCallback getNextSendCallback() {
return null;
}

@Override
public MessageImpl<?> getNextMessage() {
return null;
}

@Override
public CompletableFuture<MessageId> getFuture() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ protected ProducerSendCallback newObject(Handle<ProducerSendCallback> handle) {
};

@Override
public void addCallback(SendCallback scb) {
public void addCallback(MessageImpl<?> msg, SendCallback scb) {
// noop
}

Expand All @@ -390,6 +390,11 @@ public SendCallback getNextSendCallback() {
return null;
}

@Override
public MessageImpl<?> getNextMessage() {
return null;
}

@Override
public CompletableFuture<MessageId> getFuture() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ void add(MessageImpl<?> msg, SendCallback callback) {
}

if (previousCallback != null) {
previousCallback.addCallback(callback);
previousCallback.addCallback(msg, callback);
}
previousCallback = callback;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ CompletableFuture<MessageId> internalSendAsync(Message<T> message) {

sendAsync(message, new SendCallback() {
SendCallback nextCallback = null;
MessageImpl<?> nextMsg = null;
long createdAt = System.nanoTime();

@Override
Expand All @@ -219,6 +220,11 @@ public SendCallback getNextSendCallback() {
return nextCallback;
}

@Override
public MessageImpl<?> getNextMessage() {
return nextMsg;
}

@Override
public void sendComplete(Exception e) {
if (e != null) {
Expand All @@ -230,20 +236,22 @@ public void sendComplete(Exception e) {
}
while (nextCallback != null) {
SendCallback sendCallback = nextCallback;
MessageImpl<?> msg = nextMsg;
if (e != null) {
stats.incrementSendFailed();
sendCallback.getFuture().completeExceptionally(e);
} else {
sendCallback.getFuture().complete(message.getMessageId());
sendCallback.getFuture().complete(msg.getMessageId());
stats.incrementNumAcksReceived(System.nanoTime() - createdAt);
}
nextMsg = nextCallback.getNextMessage();
nextCallback = nextCallback.getNextSendCallback();
sendCallback = null;
}
}

@Override
public void addCallback(SendCallback scb) {
public void addCallback(MessageImpl<?> msg, SendCallback scb) {
nextMsg = msg;
nextCallback = scb;
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,25 @@ public interface SendCallback {
/**
* used to specify a callback to be invoked on completion of a send operation for individual messages sent in a
* batch. Callbacks for messages in a batch get chained
*
* @param scb
*
* @param msg message sent
* @param scb callback associated with the message
*/
void addCallback(SendCallback scb);
void addCallback(MessageImpl<?> msg, SendCallback scb);

/**
*
* @return next callback in chain
*/
SendCallback getNextSendCallback();

/**
* Return next message in chain
*
* @return next message in chain
*/
MessageImpl<?> getNextMessage();

/**
*
* @return future associated with callback
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,25 @@
package org.apache.pulsar.tests.integration.semantics;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.tests.topologies.PulsarClusterTestBase;
import org.testng.annotations.Test;
import org.testng.collections.Lists;

/**
* Test pulsar produce/consume semantics
Expand Down Expand Up @@ -184,4 +191,61 @@ private static void checkMessagesIdempotencyEnabled(Consumer<String> consumer) t
receiveAndAssertMessage(consumer, 1L, "message-1");
receiveAndAssertMessage(consumer, 2L, "message-2");
}

@Test(dataProvider = "ServiceUrls")
public void testBatchProducing(String serviceUrl) throws Exception {
String topicName = generateTopicName("testbatchproducing", true);

int numMessages = 10;

List<MessageId> producedMsgIds;

try (PulsarClient client = PulsarClient.builder()
.serviceUrl(serviceUrl)
.build()) {

try (Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic(topicName)
.subscriptionName("my-sub")
.subscribe()) {

try (Producer<String> producer = client.newProducer(Schema.STRING)
.topic(topicName)
.enableBatching(true)
.batchingMaxMessages(5)
.batchingMaxPublishDelay(1, TimeUnit.HOURS)
.create()) {

List<CompletableFuture<MessageId>> sendFutures = Lists.newArrayList();
for (int i = 0; i < numMessages; i++) {
sendFutures.add(producer.sendAsync("batch-message-" + i));
}
CompletableFuture.allOf(sendFutures.toArray(new CompletableFuture[numMessages])).get();
producedMsgIds = sendFutures.stream().map(future -> future.join()).collect(Collectors.toList());
}

for (int i = 0; i < numMessages; i++) {
Message<String> m = consumer.receive();
assertEquals(producedMsgIds.get(i), m.getMessageId());
assertEquals("batch-message-" + i, m.getValue());
}
}
}

// inspect the message ids
for (int i = 0; i < 5; i++) {
assertTrue(producedMsgIds.get(i) instanceof BatchMessageIdImpl);
BatchMessageIdImpl mid = (BatchMessageIdImpl) producedMsgIds.get(i);
log.info("Message {} id : {}", i, mid);

assertEquals(i, mid.getBatchIndex());
}
for (int i = 5; i < 10; i++) {
assertTrue(producedMsgIds.get(i) instanceof BatchMessageIdImpl);
BatchMessageIdImpl mid = (BatchMessageIdImpl) producedMsgIds.get(i);
log.info("Message {} id : {}", i, mid);

assertEquals(i - 5, mid.getBatchIndex());
}
}
}

0 comments on commit 83e3157

Please sign in to comment.