Skip to content

Commit

Permalink
Fixed typo in ConsumerBuilder acknowledgmentGroupTime option (#1608)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat committed Apr 23, 2018
1 parent 2ac2b5c commit 9947f74
Show file tree
Hide file tree
Showing 13 changed files with 36 additions and 36 deletions.
Expand Up @@ -1421,7 +1421,7 @@ public void persistentTopicsCursorReset(String topicName) throws Exception {


// create consumer and subscription // create consumer and subscription
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub") Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub")
.subscriptionType(SubscriptionType.Exclusive).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); .subscriptionType(SubscriptionType.Exclusive).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();


assertEquals(admin.persistentTopics().getSubscriptions(topicName), Lists.newArrayList("my-sub")); assertEquals(admin.persistentTopics().getSubscriptions(topicName), Lists.newArrayList("my-sub"));


Expand Down Expand Up @@ -1472,7 +1472,7 @@ public void persistentTopicsCursorResetAfterReset(String topicName) throws Excep


// create consumer and subscription // create consumer and subscription
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub") Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub")
.subscriptionType(SubscriptionType.Exclusive).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); .subscriptionType(SubscriptionType.Exclusive).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();


assertEquals(admin.persistentTopics().getSubscriptions(topicName), Lists.newArrayList("my-sub")); assertEquals(admin.persistentTopics().getSubscriptions(topicName), Lists.newArrayList("my-sub"));


Expand Down Expand Up @@ -1543,7 +1543,7 @@ public void partitionedTopicsCursorReset(String topicName) throws Exception {


// create consumer and subscription // create consumer and subscription
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub") Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub")
.subscriptionType(SubscriptionType.Exclusive).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); .subscriptionType(SubscriptionType.Exclusive).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();


List<String> topics = admin.persistentTopics().getList("prop-xyz/ns1"); List<String> topics = admin.persistentTopics().getList("prop-xyz/ns1");
assertEquals(topics.size(), 4); assertEquals(topics.size(), 4);
Expand Down
Expand Up @@ -1441,7 +1441,7 @@ public void persistentTopicsCursorReset(String topicName) throws Exception {


// create consumer and subscription // create consumer and subscription
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub") Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub")
.subscriptionType(SubscriptionType.Exclusive).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); .subscriptionType(SubscriptionType.Exclusive).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();


assertEquals(admin.persistentTopics().getSubscriptions(topicName), Lists.newArrayList("my-sub")); assertEquals(admin.persistentTopics().getSubscriptions(topicName), Lists.newArrayList("my-sub"));


Expand Down Expand Up @@ -1492,7 +1492,7 @@ public void persistentTopicsCursorResetAfterReset(String topicName) throws Excep


// create consumer and subscription // create consumer and subscription
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub") Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub")
.subscriptionType(SubscriptionType.Exclusive).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); .subscriptionType(SubscriptionType.Exclusive).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();


assertEquals(admin.persistentTopics().getSubscriptions(topicName), Lists.newArrayList("my-sub")); assertEquals(admin.persistentTopics().getSubscriptions(topicName), Lists.newArrayList("my-sub"));


Expand Down Expand Up @@ -1563,7 +1563,7 @@ public void partitionedTopicsCursorReset(String topicName) throws Exception {


// create consumer and subscription // create consumer and subscription
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub") Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub")
.subscriptionType(SubscriptionType.Exclusive).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); .subscriptionType(SubscriptionType.Exclusive).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();


List<String> topics = admin.persistentTopics().getList("prop-xyz/use/ns1"); List<String> topics = admin.persistentTopics().getList("prop-xyz/use/ns1");
assertEquals(topics.size(), 4); assertEquals(topics.size(), 4);
Expand Down
Expand Up @@ -142,7 +142,7 @@ public void testBrokerServicePersistentTopicStats() throws Exception {
SubscriptionStats subStats; SubscriptionStats subStats;


Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName) Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);


PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
Expand Down Expand Up @@ -219,7 +219,7 @@ public void testBrokerServicePersistentRedeliverTopicStats() throws Exception {
SubscriptionStats subStats; SubscriptionStats subStats;


Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName) Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.subscriptionType(SubscriptionType.Shared).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); .subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);


PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
Expand Down
Expand Up @@ -136,12 +136,12 @@ public void testSimpleConsumerEventsWithoutPartition() throws Exception {
TestConsumerStateEventListener listener1 = new TestConsumerStateEventListener(); TestConsumerStateEventListener listener1 = new TestConsumerStateEventListener();
TestConsumerStateEventListener listener2 = new TestConsumerStateEventListener(); TestConsumerStateEventListener listener2 = new TestConsumerStateEventListener();
ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName) ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.acknowledmentGroupTime(0, TimeUnit.SECONDS).subscriptionType(SubscriptionType.Failover); .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscriptionType(SubscriptionType.Failover);




// 1. two consumers on the same subscription // 1. two consumers on the same subscription
ConsumerBuilder<byte[]> consumerBulder1 = consumerBuilder.clone().consumerName("1") ConsumerBuilder<byte[]> consumerBulder1 = consumerBuilder.clone().consumerName("1")
.consumerEventListener(listener1).acknowledmentGroupTime(0, TimeUnit.SECONDS); .consumerEventListener(listener1).acknowledgmentGroupTime(0, TimeUnit.SECONDS);
Consumer<byte[]> consumer1 = consumerBulder1.subscribe(); Consumer<byte[]> consumer1 = consumerBulder1.subscribe();
Consumer<byte[]> consumer2 = consumerBuilder.clone().consumerName("2").consumerEventListener(listener2) Consumer<byte[]> consumer2 = consumerBuilder.clone().consumerName("2").consumerEventListener(listener2)
.subscribe(); .subscribe();
Expand Down
Expand Up @@ -508,7 +508,7 @@ public void testUnackedCountWithRedeliveries() throws Exception {


ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName) ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.receiverQueueSize(10).subscriptionType(SubscriptionType.Shared) .receiverQueueSize(10).subscriptionType(SubscriptionType.Shared)
.acknowledmentGroupTime(0, TimeUnit.SECONDS); .acknowledgmentGroupTime(0, TimeUnit.SECONDS);
ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>) consumerBuilder.subscribe(); ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>) consumerBuilder.subscribe();


for (int i = 0; i < numMsgs; i++) { for (int i = 0; i < numMsgs; i++) {
Expand Down
Expand Up @@ -256,7 +256,7 @@ public void testFailoverSingleAckedNormalTopic() throws Exception {
// 2. Create consumer // 2. Create consumer
ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topicName) ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topicName)
.subscriptionName(subscriptionName).receiverQueueSize(10).subscriptionType(SubscriptionType.Failover) .subscriptionName(subscriptionName).receiverQueueSize(10).subscriptionType(SubscriptionType.Failover)
.acknowledmentGroupTime(0, TimeUnit.SECONDS); .acknowledgmentGroupTime(0, TimeUnit.SECONDS);
Consumer<byte[]> consumer1 = consumerBuilder.clone().consumerName("consumer-1").subscribe(); Consumer<byte[]> consumer1 = consumerBuilder.clone().consumerName("consumer-1").subscribe();
Consumer<byte[]> consumer2 = consumerBuilder.clone().consumerName("consumer-2").subscribe(); Consumer<byte[]> consumer2 = consumerBuilder.clone().consumerName("consumer-2").subscribe();


Expand Down
Expand Up @@ -597,7 +597,7 @@ public void testBrokerSubscriptionRecovery(boolean unloadBundleGracefully) throw
final int totalProducedMsgs = 500; final int totalProducedMsgs = 500;


Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriberName) Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriberName)
.subscriptionType(SubscriptionType.Shared).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); .subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();


Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/unacked-topic") Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/unacked-topic")
.enableBatching(false) .enableBatching(false)
Expand Down Expand Up @@ -703,15 +703,15 @@ public void testBlockBrokerDispatching() throws Exception {


ConsumerImpl<byte[]> consumer1Sub1 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName) ConsumerImpl<byte[]> consumer1Sub1 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName)
.subscriptionName(subscriberName1).receiverQueueSize(receiverQueueSize) .subscriptionName(subscriberName1).receiverQueueSize(receiverQueueSize)
.subscriptionType(SubscriptionType.Shared).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); .subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
// create subscription-2 and 3 // create subscription-2 and 3
ConsumerImpl<byte[]> consumer1Sub2 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName) ConsumerImpl<byte[]> consumer1Sub2 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName)
.subscriptionName(subscriberName2).receiverQueueSize(receiverQueueSize) .subscriptionName(subscriberName2).receiverQueueSize(receiverQueueSize)
.subscriptionType(SubscriptionType.Shared).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); .subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
consumer1Sub2.close(); consumer1Sub2.close();
ConsumerImpl<byte[]> consumer1Sub3 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName) ConsumerImpl<byte[]> consumer1Sub3 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName)
.subscriptionName(subscriberName3).receiverQueueSize(receiverQueueSize) .subscriptionName(subscriberName3).receiverQueueSize(receiverQueueSize)
.subscriptionType(SubscriptionType.Shared).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); .subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
consumer1Sub3.close(); consumer1Sub3.close();


Producer<byte[]> producer = pulsarClient.newProducer() Producer<byte[]> producer = pulsarClient.newProducer()
Expand Down Expand Up @@ -751,7 +751,7 @@ public void testBlockBrokerDispatching() throws Exception {
// (1.b) consumer2 with same sub should not receive any more messages as subscription is blocked // (1.b) consumer2 with same sub should not receive any more messages as subscription is blocked
ConsumerImpl<byte[]> consumer2Sub1 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName) ConsumerImpl<byte[]> consumer2Sub1 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName)
.subscriptionName(subscriberName1).receiverQueueSize(receiverQueueSize) .subscriptionName(subscriberName1).receiverQueueSize(receiverQueueSize)
.subscriptionType(SubscriptionType.Shared).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); .subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
int consumer2Msgs = 0; int consumer2Msgs = 0;
for (int j = 0; j < totalProducedMsgs; j++) { for (int j = 0; j < totalProducedMsgs; j++) {
msg = consumer2Sub1.receive(100, TimeUnit.MILLISECONDS); msg = consumer2Sub1.receive(100, TimeUnit.MILLISECONDS);
Expand All @@ -776,7 +776,7 @@ public void testBlockBrokerDispatching() throws Exception {
**/ **/
ConsumerImpl<byte[]> consumerSub2 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName) ConsumerImpl<byte[]> consumerSub2 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName)
.subscriptionName(subscriberName2).receiverQueueSize(receiverQueueSize) .subscriptionName(subscriberName2).receiverQueueSize(receiverQueueSize)
.subscriptionType(SubscriptionType.Shared).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); .subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
Set<MessageId> messages2 = Sets.newHashSet(); Set<MessageId> messages2 = Sets.newHashSet();
for (int j = 0; j < totalProducedMsgs; j++) { for (int j = 0; j < totalProducedMsgs; j++) {
msg = consumerSub2.receive(100, TimeUnit.MILLISECONDS); msg = consumerSub2.receive(100, TimeUnit.MILLISECONDS);
Expand All @@ -793,7 +793,7 @@ public void testBlockBrokerDispatching() throws Exception {
/** (3) if Subscription3 is acking then it shouldn't be blocked **/ /** (3) if Subscription3 is acking then it shouldn't be blocked **/
consumer1Sub3 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName) consumer1Sub3 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName)
.subscriptionName(subscriberName3).receiverQueueSize(receiverQueueSize) .subscriptionName(subscriberName3).receiverQueueSize(receiverQueueSize)
.subscriptionType(SubscriptionType.Shared).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); .subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
int consumedMsgsSub3 = 0; int consumedMsgsSub3 = 0;
for (int j = 0; j < totalProducedMsgs; j++) { for (int j = 0; j < totalProducedMsgs; j++) {
msg = consumer1Sub3.receive(100, TimeUnit.MILLISECONDS); msg = consumer1Sub3.receive(100, TimeUnit.MILLISECONDS);
Expand Down
Expand Up @@ -958,7 +958,7 @@ public void testSharedConsumerAckDifferentConsumer() throws Exception {
ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer() ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer()
.topic("persistent://my-property/my-ns/my-topic1").subscriptionName("my-subscriber-name") .topic("persistent://my-property/my-ns/my-topic1").subscriptionName("my-subscriber-name")
.receiverQueueSize(1).subscriptionType(SubscriptionType.Shared) .receiverQueueSize(1).subscriptionType(SubscriptionType.Shared)
.acknowledmentGroupTime(0, TimeUnit.SECONDS); .acknowledgmentGroupTime(0, TimeUnit.SECONDS);
Consumer<byte[]> consumer1 = consumerBuilder.subscribe(); Consumer<byte[]> consumer1 = consumerBuilder.subscribe();
Consumer<byte[]> consumer2 = consumerBuilder.subscribe(); Consumer<byte[]> consumer2 = consumerBuilder.subscribe();


Expand Down Expand Up @@ -1143,7 +1143,7 @@ public void testConsumerBlockingWithUnAckedMessagesMultipleIteration() throws Ex
Consumer<byte[]> consumer = pulsarClient.newConsumer() Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1") .topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
.receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared) .receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared)
.acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();


Producer<byte[]> producer = pulsarClient.newProducer() Producer<byte[]> producer = pulsarClient.newProducer()
.topic("persistent://my-property/my-ns/unacked-topic").create(); .topic("persistent://my-property/my-ns/unacked-topic").create();
Expand Down Expand Up @@ -1307,7 +1307,7 @@ public void testShouldNotBlockConsumerIfRedeliverBeforeReceive() throws Exceptio
ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer() ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
.topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1") .topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
.receiverQueueSize(receiverQueueSize).ackTimeout(1, TimeUnit.SECONDS) .receiverQueueSize(receiverQueueSize).ackTimeout(1, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Shared).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); .subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();


Producer<byte[]> producer = pulsarClient.newProducer() Producer<byte[]> producer = pulsarClient.newProducer()
.topic("persistent://my-property/my-ns/unacked-topic") .topic("persistent://my-property/my-ns/unacked-topic")
Expand Down Expand Up @@ -1885,7 +1885,7 @@ public void testSharedSamePriorityConsumer() throws Exception {
ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer() ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer()
.topic("persistent://my-property/my-ns/my-topic2").subscriptionName("my-subscriber-name") .topic("persistent://my-property/my-ns/my-topic2").subscriptionName("my-subscriber-name")
.subscriptionType(SubscriptionType.Shared).receiverQueueSize(queueSize) .subscriptionType(SubscriptionType.Shared).receiverQueueSize(queueSize)
.acknowledmentGroupTime(0, TimeUnit.SECONDS); .acknowledgmentGroupTime(0, TimeUnit.SECONDS);
Consumer<byte[]> c1 = consumerBuilder.subscribe(); Consumer<byte[]> c1 = consumerBuilder.subscribe();
Consumer<byte[]> c2 = consumerBuilder.subscribe(); Consumer<byte[]> c2 = consumerBuilder.subscribe();
Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic2") Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic2")
Expand Down Expand Up @@ -1988,7 +1988,7 @@ public void testRedeliveryFailOverConsumer() throws Exception {
ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer() ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
.topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1") .topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
.receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Failover) .receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Failover)
.acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();


Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/unacked-topic") Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/unacked-topic")
.create(); .create();
Expand Down Expand Up @@ -2284,7 +2284,7 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe
Set<String> messageSet = Sets.newHashSet(); Set<String> messageSet = Sets.newHashSet();
Consumer<byte[]> consumer = pulsarClient.newConsumer() Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("persistent://my-property/use/myenc-ns/myenc-topic1").subscriptionName("my-subscriber-name") .topic("persistent://my-property/use/myenc-ns/myenc-topic1").subscriptionName("my-subscriber-name")
.acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();


// 1. Invalid key name // 1. Invalid key name
try { try {
Expand Down Expand Up @@ -2318,7 +2318,7 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe
consumer.close(); consumer.close();
consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/myenc-ns/myenc-topic1") consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/myenc-ns/myenc-topic1")
.subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME) .subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
.acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();


int msgNum = 0; int msgNum = 0;
try { try {
Expand All @@ -2339,7 +2339,7 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe
// Set keyreader // Set keyreader
consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/myenc-ns/myenc-topic1") consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/myenc-ns/myenc-topic1")
.subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.FAIL) .subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.FAIL)
.cryptoKeyReader(new EncKeyReader()).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); .cryptoKeyReader(new EncKeyReader()).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();


for (int i = msgNum; i < totalMsg - 1; i++) { for (int i = msgNum; i < totalMsg - 1; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS); msg = consumer.receive(5, TimeUnit.SECONDS);
Expand All @@ -2356,7 +2356,7 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe
consumer.close(); consumer.close();
consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/myenc-ns/myenc-topic1") consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/myenc-ns/myenc-topic1")
.subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.DISCARD) .subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.DISCARD)
.acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();


// Receive should proceed and discard encrypted messages // Receive should proceed and discard encrypted messages
msg = consumer.receive(5, TimeUnit.SECONDS); msg = consumer.receive(5, TimeUnit.SECONDS);
Expand All @@ -2382,12 +2382,12 @@ public void testConsumerSubscriptionInitialize() throws Exception {


// 2, create consumer // 2, create consumer
Consumer<byte[]> defaultConsumer = pulsarClient.newConsumer().topic(topicName) Consumer<byte[]> defaultConsumer = pulsarClient.newConsumer().topic(topicName)
.acknowledmentGroupTime(0, TimeUnit.SECONDS).subscriptionName("test-subscription-default").subscribe(); .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscriptionName("test-subscription-default").subscribe();
Consumer<byte[]> latestConsumer = pulsarClient.newConsumer().topic(topicName) Consumer<byte[]> latestConsumer = pulsarClient.newConsumer().topic(topicName)
.acknowledmentGroupTime(0, TimeUnit.SECONDS).subscriptionName("test-subscription-latest") .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscriptionName("test-subscription-latest")
.subscriptionInitialPosition(SubscriptionInitialPosition.Latest).subscribe(); .subscriptionInitialPosition(SubscriptionInitialPosition.Latest).subscribe();
Consumer<byte[]> earliestConsumer = pulsarClient.newConsumer().topic(topicName) Consumer<byte[]> earliestConsumer = pulsarClient.newConsumer().topic(topicName)
.acknowledmentGroupTime(0, TimeUnit.SECONDS).subscriptionName("test-subscription-earliest") .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscriptionName("test-subscription-earliest")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe(); .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();


// 3, produce 5 messages more // 3, produce 5 messages more
Expand Down
Expand Up @@ -105,7 +105,7 @@ public void testTopicInternalStats() throws Exception {
final String topicName = "persistent://my-property/my-ns/my-topic1"; final String topicName = "persistent://my-property/my-ns/my-topic1";
final String subscriptionName = "my-subscriber-name"; final String subscriptionName = "my-subscriber-name";
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName) Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
.acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
final int numberOfMsgs = 1000; final int numberOfMsgs = 1000;
for (int i = 0; i < numberOfMsgs; i++) { for (int i = 0; i < numberOfMsgs; i++) {
Expand Down
Expand Up @@ -49,7 +49,7 @@ public static ConsumerBuilder<byte[]> getConsumerBuilder(PulsarClient client, Pr
} }


if (properties.containsKey(ACKNOWLEDGEMENTS_GROUP_TIME_MILLIS)) { if (properties.containsKey(ACKNOWLEDGEMENTS_GROUP_TIME_MILLIS)) {
consumerBuilder.acknowledmentGroupTime( consumerBuilder.acknowledgmentGroupTime(
Long.parseLong(properties.getProperty(ACKNOWLEDGEMENTS_GROUP_TIME_MILLIS)), TimeUnit.MILLISECONDS); Long.parseLong(properties.getProperty(ACKNOWLEDGEMENTS_GROUP_TIME_MILLIS)), TimeUnit.MILLISECONDS);
} }


Expand Down

0 comments on commit 9947f74

Please sign in to comment.