Skip to content

Commit

Permalink
Issue 937: add CommandGetLastMessageId to make reader know the end of…
Browse files Browse the repository at this point in the history
… topic (#1066)

* add CommandGetLastMessageId to getlastMessageId of topic

* rebase master, change following comments

* add partition index in GetLastMessageIdResponse

* fix rebase error

* bump proot version to v11

* change following comments

* change following comments2

* change following comments3

* change following comments

* get cnx() first
  • Loading branch information
zhaijack authored and merlimat committed Feb 14, 2018
1 parent fbb42e7 commit 7404952
Show file tree
Hide file tree
Showing 16 changed files with 1,457 additions and 57 deletions.
Expand Up @@ -334,4 +334,11 @@ public interface ManagedLedger {
* @param config
*/
void setConfig(ManagedLedgerConfig config);

/**
* Gets last confirmed entry of the managed ledger.
*
* @return the last confirmed entry id
*/
Position getLastConfirmedEntry();
}
Expand Up @@ -1062,7 +1062,7 @@ private void closeAllCursors(CloseCallback callback, final Object ctx) {
Futures.waitForAll(futures).thenRun(() -> {
callback.closeComplete(ctx);
}).exceptionally(exception -> {
callback.closeFailed(getManagedLedgerException(exception.getCause()), ctx);
callback.closeFailed(ManagedLedgerException.getManagedLedgerException(exception.getCause()), ctx);
return null;
});
}
Expand Down Expand Up @@ -1282,7 +1282,7 @@ void asyncReadEntries(OpReadEntry opReadEntry) {
}).exceptionally(ex -> {
log.error("[{}] Error opening ledger for reading at position {} - {}", name, opReadEntry.readPosition,
ex.getMessage());
opReadEntry.readEntriesFailed(getManagedLedgerException(ex.getCause()), opReadEntry.ctx);
opReadEntry.readEntriesFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()), opReadEntry.ctx);
return null;
});
}
Expand Down Expand Up @@ -1351,7 +1351,7 @@ void asyncReadEntry(PositionImpl position, ReadEntryCallback callback, Object ct
entryCache.asyncReadEntry(ledger, position, callback, ctx);
}).exceptionally(ex -> {
log.error("[{}] Error opening ledger for reading at position {} - {}", name, position, ex.getMessage());
callback.readEntryFailed(getManagedLedgerException(ex.getCause()), ctx);
callback.readEntryFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()), ctx);
return null;
});
}
Expand Down Expand Up @@ -2173,7 +2173,8 @@ public int getPendingAddEntriesCount() {
return pendingAddEntries.size();
}

public PositionImpl getLastConfirmedEntry() {
@Override
public Position getLastConfirmedEntry() {
return lastConfirmedEntry;
}

Expand Down
Expand Up @@ -31,10 +31,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;

import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.util.SafeRun;
Expand All @@ -59,6 +57,7 @@
import org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStats;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStatsResponse;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandFlow;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandProducer;
Expand Down Expand Up @@ -110,7 +109,7 @@ public class ServerCnx extends PulsarHandler {
private String originalPrincipal = null;
private Set<String> proxyRoles;
private boolean authenticateOriginalAuthData;

enum State {
Start, Connected, Failed
}
Expand Down Expand Up @@ -192,8 +191,8 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
}

/*
* If authentication and authorization is enabled and if the authRole is one of proxyRoles we want to enforce
* - the originalPrincipal is given while connecting
* If authentication and authorization is enabled and if the authRole is one of proxyRoles we want to enforce
* - the originalPrincipal is given while connecting
* - originalPrincipal is not blank
* - originalPrincipal is not a proxy principal
*/
Expand All @@ -218,7 +217,7 @@ protected void handleLookup(CommandLookupTopic lookup) {
if (topicName == null) {
return;
}

String originalPrincipal = null;
if (authenticateOriginalAuthData && lookup.hasOriginalAuthData()) {
originalPrincipal = validateOriginalPrincipal(
Expand All @@ -233,9 +232,9 @@ protected void handleLookup(CommandLookupTopic lookup) {
} else {
originalPrincipal = lookup.hasOriginalPrincipal() ? lookup.getOriginalPrincipal() : this.originalPrincipal;
}

final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
if (lookupSemaphore.tryAcquire()) {
if (lookupSemaphore.tryAcquire()) {
if (invalidOriginalPrincipal(originalPrincipal)) {
final String msg = "Valid Proxy Client role should be provided for lookup ";
log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic {}", remoteAddress, msg, authRole,
Expand Down Expand Up @@ -319,7 +318,7 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa
} else {
originalPrincipal = partitionMetadata.hasOriginalPrincipal() ? partitionMetadata.getOriginalPrincipal() : this.originalPrincipal;
}

final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
if (lookupSemaphore.tryAcquire()) {
if (invalidOriginalPrincipal(originalPrincipal)) {
Expand Down Expand Up @@ -441,7 +440,7 @@ CommandConsumerStatsResponse.Builder createConsumerStatsResponse(Consumer consum

return commandConsumerStatsResponseBuilder;
}

private String validateOriginalPrincipal(String originalAuthData, String originalAuthMethod, String originalPrincipal, Long requestId, GeneratedMessageLite request) {
ChannelHandler sslHandler = ctx.channel().pipeline().get(PulsarChannelInitializer.TLS_HANDLER);
SSLSession sslSession = null;
Expand All @@ -461,7 +460,7 @@ private String validateOriginalPrincipal(String originalAuthData, String origina
return null;
}
}

private String getOriginalPrincipal(String originalAuthData, String originalAuthMethod, String originalPrincipal,
SSLSession sslSession) throws AuthenticationException {
if (authenticateOriginalAuthData) {
Expand Down Expand Up @@ -532,7 +531,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
DestinationName topicName = validateTopicName(subscribe.getTopic(), requestId, subscribe);
if (topicName == null) {
return;
}
}

if (invalidOriginalPrincipal(originalPrincipal)) {
final String msg = "Valid Proxy Client role should be provided while subscribing ";
Expand Down Expand Up @@ -1104,6 +1103,35 @@ protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
}
}

@Override
protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId) {
checkArgument(state == State.Connected);

CompletableFuture<Consumer> consumerFuture = consumers.get(getLastMessageId.getConsumerId());

if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
Consumer consumer = consumerFuture.getNow(null);
long requestId = getLastMessageId.getRequestId();

Topic topic = consumer.getSubscription().getTopic();
Position position = topic.getLastMessageId();
int partitionIndex = DestinationName.getPartitionIndex(topic.getName());
if (log.isDebugEnabled()) {
log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex {}", remoteAddress,
topic.getName(), consumer.getSubscription().getName(), position, partitionIndex);
}
MessageIdData messageId = MessageIdData.newBuilder()
.setLedgerId(((PositionImpl)position).getLedgerId())
.setEntryId(((PositionImpl)position).getEntryId())
.setPartition(partitionIndex)
.build();

ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, messageId));
} else {
ctx.writeAndFlush(Commands.newError(getLastMessageId.getRequestId(), ServerError.MetadataError, "Consumer not found"));
}
}

@Override
protected boolean isHandshakeCompleted() {
return state == State.Connected;
Expand Down
Expand Up @@ -21,6 +21,7 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.client.api.MessageId;
Expand Down Expand Up @@ -122,4 +123,6 @@ void updateRates(NamespaceStats nsStats, NamespaceBundleStats currentBundleStats
PersistentTopicStats getStats();

PersistentTopicInternalStats getInternalStats();

Position getLastMessageId();
}
Expand Up @@ -34,6 +34,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.pulsar.broker.admin.AdminResource;
Expand Down Expand Up @@ -909,6 +910,11 @@ public CompletableFuture<Void> unsubscribe(String subName) {
return CompletableFuture.completedFuture(null);
}

@Override
public Position getLastMessageId() {
throw new UnsupportedOperationException("getLastMessageId is not supported on non-persistent topic");
}

public void markBatchMessagePublished() {
this.hasBatchMessagePublished = true;
}
Expand Down
Expand Up @@ -1536,5 +1536,10 @@ public long getLastPublishedSequenceId(String producerName) {
return messageDeduplication.getLastPublishedSequenceId(producerName);
}

@Override
public Position getLastMessageId() {
return ledger.getLastConfirmedEntry();
}

private static final Logger log = LoggerFactory.getLogger(PersistentTopic.class);
}
Expand Up @@ -19,7 +19,11 @@
package org.apache.pulsar.client.api;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;

import com.google.common.collect.Sets;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
Expand All @@ -28,8 +32,8 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.policies.data.PersistentTopicStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -38,8 +42,6 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import com.google.common.collect.Sets;

public class TopicReaderTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(TopicReaderTest.class);

Expand Down Expand Up @@ -359,4 +361,103 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe
reader.close();
log.info("-- Exiting {} test --", methodName);
}


@Test
public void testSimpleReaderReachEndofTopic() throws Exception {
ReaderConfiguration conf = new ReaderConfiguration();
Reader reader = pulsarClient.createReader("persistent://my-property/use/my-ns/my-topic1", MessageId.earliest,
conf);
ProducerConfiguration producerConf = new ProducerConfiguration();
Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1", producerConf);

// no data write, should return false
assertFalse(reader.hasMessageAvailable());

// produce message 0 -- 99
for (int i = 0; i < 100; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}

MessageImpl msg = null;
Set<String> messageSet = Sets.newHashSet();
int index = 0;

// read message till end.
while (reader.hasMessageAvailable()) {
msg = (MessageImpl) reader.readNext(1, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
log.debug("Received message: [{}]", receivedMessage);
String expectedMessage = "my-message-" + (index ++);
testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
}

assertEquals(index, 100);
// readNext should return null, after reach the end of topic.
assertNull(reader.readNext(1, TimeUnit.SECONDS));

// produce message again.
for (int i = 100; i < 200; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}

// read message till end again.
while (reader.hasMessageAvailable()) {
msg = (MessageImpl) reader.readNext(1, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
log.debug("Received message: [{}]", receivedMessage);
String expectedMessage = "my-message-" + (index ++);
testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
}

assertEquals(index, 200);
// readNext should return null, after reach the end of topic.
assertNull(reader.readNext(1, TimeUnit.SECONDS));

producer.close();
}

@Test
public void testReaderReachEndofTopicOnMessageWithBatches() throws Exception {
Reader reader = pulsarClient.createReader(
"persistent://my-property/use/my-ns/testReaderReachEndofTopicOnMessageWithBatches", MessageId.earliest,
new ReaderConfiguration());

ProducerConfiguration producerConf = new ProducerConfiguration();
producerConf.setBatchingEnabled(true);
producerConf.setBatchingMaxPublishDelay(100, TimeUnit.MILLISECONDS);
Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/testReaderReachEndofTopicOnMessageWithBatches", producerConf);

// no data write, should return false
assertFalse(reader.hasMessageAvailable());

for (int i = 0; i < 100; i++) {
String message = "my-message-" + i;
producer.sendAsync(message.getBytes());
}

// Write one sync message to ensure everything before got persistend
producer.send("my-message-10".getBytes());

MessageId lastMessageId = null;
int index = 0;
assertTrue(reader.hasMessageAvailable());

if (reader.hasMessageAvailable()) {
Message msg = reader.readNext();
lastMessageId = msg.getMessageId();
assertEquals(lastMessageId.getClass(), BatchMessageIdImpl.class);

while (msg != null) {
index++;
msg = reader.readNext(100, TimeUnit.MILLISECONDS);
}
assertEquals(index, 101);
}

assertFalse(reader.hasMessageAvailable());
producer.close();
}
}
Expand Up @@ -62,4 +62,14 @@ public interface Reader extends Closeable {
* Return true if the topic was terminated and this reader has reached the end of the topic
*/
boolean hasReachedEndOfTopic();

/**
* Check if there is any message available to read from the current position.
*/
boolean hasMessageAvailable() throws PulsarClientException;

/**
* Asynchronously Check if there is message that has been published successfully to the broker in the topic.
*/
CompletableFuture<Boolean> hasMessageAvailableAsync();
}

0 comments on commit 7404952

Please sign in to comment.