Skip to content

Commit

Permalink
Issue #1517: make getLastConfirmedEntry in ManagedLedgerImpl return r…
Browse files Browse the repository at this point in the history
…eal LAC (#1550)
  • Loading branch information
zhaijack authored and sijie committed Apr 12, 2018
1 parent 199961e commit 2de50a7
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 18 deletions.
Expand Up @@ -359,7 +359,19 @@ public void operationFailed(MetaStoreException e) {
STATE_UPDATER.set(this, State.LedgerOpened); STATE_UPDATER.set(this, State.LedgerOpened);
lastLedgerCreatedTimestamp = System.currentTimeMillis(); lastLedgerCreatedTimestamp = System.currentTimeMillis();
currentLedger = lh; currentLedger = lh;

lastConfirmedEntry = new PositionImpl(lh.getId(), -1); lastConfirmedEntry = new PositionImpl(lh.getId(), -1);
// bypass empty ledgers, find last ledger with Message if possible.
while (lastConfirmedEntry.getEntryId() == -1) {
Map.Entry<Long, LedgerInfo> formerLedger = ledgers.lowerEntry(lastConfirmedEntry.getLedgerId());
if (formerLedger != null) {
LedgerInfo ledgerInfo = formerLedger.getValue();
lastConfirmedEntry = PositionImpl.get(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1);
} else {
break;
}
}

LedgerInfo info = LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build(); LedgerInfo info = LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build();
ledgers.put(lh.getId(), info); ledgers.put(lh.getId(), info);


Expand Down
Expand Up @@ -60,10 +60,10 @@ protected void cleanup() throws Exception {


@Test @Test
public void testSimpleReader() throws Exception { public void testSimpleReader() throws Exception {
Reader<byte[]> reader = pulsarClient.newReader().topic("persistent://my-property/use/my-ns/my-topic1") Reader<byte[]> reader = pulsarClient.newReader().topic("persistent://my-property/use/my-ns/testSimpleReader")
.startMessageId(MessageId.earliest).create(); .startMessageId(MessageId.earliest).create();


Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1") Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/testSimpleReader")
.create(); .create();
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
String message = "my-message-" + i; String message = "my-message-" + i;
Expand All @@ -88,14 +88,14 @@ public void testSimpleReader() throws Exception {


@Test @Test
public void testReaderAfterMessagesWerePublished() throws Exception { public void testReaderAfterMessagesWerePublished() throws Exception {
Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1") Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/testReaderAfterMessagesWerePublished")
.create(); .create();
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
String message = "my-message-" + i; String message = "my-message-" + i;
producer.send(message.getBytes()); producer.send(message.getBytes());
} }


Reader<byte[]> reader = pulsarClient.newReader().topic("persistent://my-property/use/my-ns/my-topic1") Reader<byte[]> reader = pulsarClient.newReader().topic("persistent://my-property/use/my-ns/testReaderAfterMessagesWerePublished")
.startMessageId(MessageId.earliest).create(); .startMessageId(MessageId.earliest).create();


Message<byte[]> msg = null; Message<byte[]> msg = null;
Expand All @@ -116,17 +116,17 @@ public void testReaderAfterMessagesWerePublished() throws Exception {


@Test @Test
public void testMultipleReaders() throws Exception { public void testMultipleReaders() throws Exception {
Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1") Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/testMultipleReaders")
.create(); .create();
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
String message = "my-message-" + i; String message = "my-message-" + i;
producer.send(message.getBytes()); producer.send(message.getBytes());
} }


Reader<byte[]> reader1 = pulsarClient.newReader().topic("persistent://my-property/use/my-ns/my-topic1") Reader<byte[]> reader1 = pulsarClient.newReader().topic("persistent://my-property/use/my-ns/testMultipleReaders")
.startMessageId(MessageId.earliest).create(); .startMessageId(MessageId.earliest).create();


Reader<byte[]> reader2 = pulsarClient.newReader().topic("persistent://my-property/use/my-ns/my-topic1") Reader<byte[]> reader2 = pulsarClient.newReader().topic("persistent://my-property/use/my-ns/testMultipleReaders")
.startMessageId(MessageId.earliest).create(); .startMessageId(MessageId.earliest).create();


Message<byte[]> msg = null; Message<byte[]> msg = null;
Expand Down Expand Up @@ -157,7 +157,7 @@ public void testMultipleReaders() throws Exception {


@Test @Test
public void testTopicStats() throws Exception { public void testTopicStats() throws Exception {
String topicName = "persistent://my-property/use/my-ns/my-topic1"; String topicName = "persistent://my-property/use/my-ns/testTopicStats";


Reader<byte[]> reader1 = pulsarClient.newReader().topic(topicName).startMessageId(MessageId.earliest).create(); Reader<byte[]> reader1 = pulsarClient.newReader().topic(topicName).startMessageId(MessageId.earliest).create();


Expand All @@ -178,14 +178,14 @@ public void testTopicStats() throws Exception {


@Test @Test
public void testReaderOnLastMessage() throws Exception { public void testReaderOnLastMessage() throws Exception {
Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1") Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/testReaderOnLastMessage")
.create(); .create();
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
String message = "my-message-" + i; String message = "my-message-" + i;
producer.send(message.getBytes()); producer.send(message.getBytes());
} }


Reader<byte[]> reader = pulsarClient.newReader().topic("persistent://my-property/use/my-ns/my-topic1") Reader<byte[]> reader = pulsarClient.newReader().topic("persistent://my-property/use/my-ns/testReaderOnLastMessage")
.startMessageId(MessageId.latest).create(); .startMessageId(MessageId.latest).create();


for (int i = 10; i < 20; i++) { for (int i = 10; i < 20; i++) {
Expand Down Expand Up @@ -213,15 +213,15 @@ public void testReaderOnLastMessage() throws Exception {


@Test @Test
public void testReaderOnSpecificMessage() throws Exception { public void testReaderOnSpecificMessage() throws Exception {
Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1") Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/testReaderOnSpecificMessage")
.create(); .create();
List<MessageId> messageIds = new ArrayList<>(); List<MessageId> messageIds = new ArrayList<>();
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
String message = "my-message-" + i; String message = "my-message-" + i;
messageIds.add(producer.send(message.getBytes())); messageIds.add(producer.send(message.getBytes()));
} }


Reader<byte[]> reader = pulsarClient.newReader().topic("persistent://my-property/use/my-ns/my-topic1") Reader<byte[]> reader = pulsarClient.newReader().topic("persistent://my-property/use/my-ns/testReaderOnSpecificMessage")
.startMessageId(messageIds.get(4)).create(); .startMessageId(messageIds.get(4)).create();


// Publish more messages and verify the readers only sees messages starting from the intended message // Publish more messages and verify the readers only sees messages starting from the intended message
Expand Down Expand Up @@ -354,10 +354,10 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe
} }


@Test @Test
public void testSimpleReaderReachEndofTopic() throws Exception { public void testSimpleReaderReachEndOfTopic() throws Exception {
Reader<byte[]> reader = pulsarClient.newReader().topic("persistent://my-property/use/my-ns/my-topic1") Reader<byte[]> reader = pulsarClient.newReader().topic("persistent://my-property/use/my-ns/testSimpleReaderReachEndOfTopic")
.startMessageId(MessageId.earliest).create(); .startMessageId(MessageId.earliest).create();
Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1") Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/testSimpleReaderReachEndOfTopic")
.create(); .create();


// no data write, should return false // no data write, should return false
Expand Down Expand Up @@ -409,13 +409,13 @@ public void testSimpleReaderReachEndofTopic() throws Exception {
} }


@Test @Test
public void testReaderReachEndofTopicOnMessageWithBatches() throws Exception { public void testReaderReachEndOfTopicOnMessageWithBatches() throws Exception {
Reader<byte[]> reader = pulsarClient.newReader() Reader<byte[]> reader = pulsarClient.newReader()
.topic("persistent://my-property/use/my-ns/testReaderReachEndofTopicOnMessageWithBatches") .topic("persistent://my-property/use/my-ns/testReaderReachEndOfTopicOnMessageWithBatches")
.startMessageId(MessageId.earliest).create(); .startMessageId(MessageId.earliest).create();


Producer<byte[]> producer = pulsarClient.newProducer() Producer<byte[]> producer = pulsarClient.newProducer()
.topic("persistent://my-property/use/my-ns/testReaderReachEndofTopicOnMessageWithBatches") .topic("persistent://my-property/use/my-ns/testReaderReachEndOfTopicOnMessageWithBatches")
.enableBatching(true).batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS).create(); .enableBatching(true).batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS).create();


// no data write, should return false // no data write, should return false
Expand Down Expand Up @@ -448,4 +448,40 @@ public void testReaderReachEndofTopicOnMessageWithBatches() throws Exception {
assertFalse(reader.hasMessageAvailable()); assertFalse(reader.hasMessageAvailable());
producer.close(); producer.close();
} }

@Test
public void testMessageAvailableAfterRestart() throws Exception {
String topic = "persistent://my-property/use/my-ns/testMessageAvailableAfterRestart";
String content = "my-message-1";

// stop retention from cleaning up
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close();

try (Reader<byte[]> reader = pulsarClient.newReader().topic(topic)
.startMessageId(MessageId.earliest).create()) {
assertFalse(reader.hasMessageAvailable());
}

try (Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create()) {
producer.send(content.getBytes());
}

try (Reader<byte[]> reader = pulsarClient.newReader().topic(topic)
.startMessageId(MessageId.earliest).create()) {
assertTrue(reader.hasMessageAvailable());
}

// cause broker to drop topic. Will be loaded next time we access it
pulsar.getBrokerService().getTopicReference(topic).get().close().get();

try (Reader<byte[]> reader = pulsarClient.newReader().topic(topic)
.startMessageId(MessageId.earliest).create()) {
assertTrue(reader.hasMessageAvailable());

String readOut = new String(reader.readNext().getData());
assertTrue(readOut.equals(content));
assertFalse(reader.hasMessageAvailable());
}

}
} }

0 comments on commit 2de50a7

Please sign in to comment.