Skip to content

Commit

Permalink
Increment permits when redelivering messages from incomingMessages (#101
Browse files Browse the repository at this point in the history
)
  • Loading branch information
sschepens authored and merlimat committed Nov 16, 2016
1 parent 72cc677 commit 5e9884c
Show file tree
Hide file tree
Showing 5 changed files with 227 additions and 108 deletions.
Expand Up @@ -1239,6 +1239,65 @@ public void testMutlipleSharedConsumerBlockingWithUnAckedMessages() throws Excep
}
}

@Test
public void testShouldNotBlockConsumerIfRedeliverBeforeReceive() throws Exception {
log.info("-- Starting {} test --", methodName);

int unAckedMessages = pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
int totalReceiveMsg = 0;
try {
final int receiverQueueSize = 20;
final int totalProducedMsgs = 100;

ConsumerConfiguration conf = new ConsumerConfiguration();
conf.setReceiverQueueSize(receiverQueueSize);
conf.setAckTimeout(1, TimeUnit.SECONDS);
conf.setSubscriptionType(SubscriptionType.Shared);
ConsumerImpl consumer = (ConsumerImpl) pulsarClient
.subscribe("persistent://my-property/use/my-ns/unacked-topic", "subscriber-1", conf);

ProducerConfiguration producerConf = new ProducerConfiguration();

Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/unacked-topic",
producerConf);

// (1) Produced Messages
for (int i = 0; i < totalProducedMsgs; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}

// (2) wait for consumer to receive messages
Thread.sleep(200);
assertEquals(consumer.numMessagesInQueue(), receiverQueueSize);

// (3) wait for messages to expire, we should've received more
Thread.sleep(2000);
assertEquals(consumer.numMessagesInQueue(), receiverQueueSize);

for (int i = 0; i < totalProducedMsgs; i++) {
Message msg = consumer.receive(1, TimeUnit.SECONDS);
if (msg != null) {
consumer.acknowledge(msg);
totalReceiveMsg++;
log.info("Received message: " + new String(msg.getData()));
} else {
break;
}
}

// total received-messages should match to produced messages
assertEquals(totalProducedMsgs, totalReceiveMsg);
producer.close();
consumer.close();
log.info("-- Exiting {} test --", methodName);
} catch (Exception e) {
fail();
} finally {
pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
}
}

@Test
public void testUnackBlockRedeliverMessages() throws Exception {
log.info("-- Starting {} test --", methodName);
Expand Down Expand Up @@ -1594,7 +1653,7 @@ public void testBlockUnackedConsumerRedeliverySpecificMessagesProduceWithPause()
}).collect(Collectors.toSet());

// (3) redeliver all consumed messages
consumer.redeliverUnacknowledgedMessages(Lists.newArrayList(redeliveryMessages));
consumer.redeliverUnacknowledgedMessages(Sets.newHashSet(redeliveryMessages));
Thread.sleep(1000);

Set<MessageIdImpl> messages2 = Sets.newHashSet();
Expand Down Expand Up @@ -1686,7 +1745,7 @@ public void testBlockUnackedConsumerRedeliverySpecificMessagesCloseConsumerWhile
}).collect(Collectors.toSet());

// (3) redeliver all consumed messages
consumer.redeliverUnacknowledgedMessages(Lists.newArrayList(redeliveryMessages));
consumer.redeliverUnacknowledgedMessages(Sets.newHashSet(redeliveryMessages));
Thread.sleep(1000);

Set<MessageIdImpl> messages2 = Sets.newHashSet();
Expand Down
Expand Up @@ -16,6 +16,7 @@
package com.yahoo.pulsar.client.impl;

import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -316,5 +317,5 @@ public String getSubscription() {
* the connected consumers. This is a non blocking call and doesn't throw an exception. In case the connection
* breaks, the messages are redelivered after reconnect.
*/
protected abstract void redeliverUnacknowledgedMessages(List<MessageIdImpl> messageIds);
protected abstract void redeliverUnacknowledgedMessages(Set<MessageIdImpl> messageIds);
}

0 comments on commit 5e9884c

Please sign in to comment.