Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

https://issues.apache.org/jira/browse/AMQCPP-473

  • Loading branch information...
commit fd90349e7b242b5a6d770b46ac34594ced160df5 1 parent 95f0712
authored April 05, 2013
5  activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp
@@ -212,8 +212,8 @@ namespace core{
212 212
                 properties->getProperty("connection.optimizedAckScheduledAckInterval", Long::toString(optimizedAckScheduledAckInterval)));
213 213
             this->consumerFailoverRedeliveryWaitPeriod = Long::parseLong(
214 214
                 properties->getProperty("connection.consumerFailoverRedeliveryWaitPeriod", Long::toString(consumerFailoverRedeliveryWaitPeriod)));
215  
-            this->nonBlockingRedelivery = Long::parseLong(
216  
-                properties->getProperty("connection.nonBlockingRedelivery", Long::toString(nonBlockingRedelivery)));
  215
+            this->nonBlockingRedelivery = Boolean::parseBoolean(
  216
+                properties->getProperty("connection.nonBlockingRedelivery", Boolean::toString(nonBlockingRedelivery)));
217 217
 
218 218
             this->defaultPrefetchPolicy->configure(*properties);
219 219
             this->defaultRedeliveryPolicy->configure(*properties);
@@ -407,6 +407,7 @@ void ActiveMQConnectionFactory::configureConnection(ActiveMQConnection* connecti
407 407
     connection->setExclusiveConsumer(this->settings->exclusiveConsumer);
408 408
     connection->setTransactedIndividualAck(this->settings->transactedIndividualAck);
409 409
     connection->setUseRetroactiveConsumer(this->settings->useRetroactiveConsumer);
  410
+    connection->setNonBlockingRedelivery(this->settings->nonBlockingRedelivery);
410 411
     connection->setConsumerFailoverRedeliveryWaitPeriod(this->settings->consumerFailoverRedeliveryWaitPeriod);
411 412
 
412 413
     if (this->settings->defaultListener) {
15  activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
@@ -524,6 +524,8 @@ namespace {
524 524
                 Exception wrapper(ex.clone());
525 525
                 this->session->getConnection()->onAsyncException(wrapper);
526 526
             }
  527
+
  528
+            this->consumer.reset(NULL);
527 529
         }
528 530
     };
529 531
 
@@ -560,7 +562,7 @@ namespace {
560 562
     class OptimizedAckTask : public Runnable {
561 563
     private:
562 564
 
563  
-        ActiveMQConsumerKernel* consumer;
  565
+        Pointer<ActiveMQConsumerKernel> consumer;
564 566
         ActiveMQConsumerKernelConfig* impl;
565 567
 
566 568
     private:
@@ -570,7 +572,7 @@ namespace {
570 572
 
571 573
     public:
572 574
 
573  
-        OptimizedAckTask(ActiveMQConsumerKernel* consumer, ActiveMQConsumerKernelConfig* impl) :
  575
+        OptimizedAckTask(Pointer<ActiveMQConsumerKernel> consumer, ActiveMQConsumerKernelConfig* impl) :
574 576
             Runnable(), consumer(consumer), impl(impl) {}
575 577
         virtual ~OptimizedAckTask() {}
576 578
 
@@ -579,8 +581,11 @@ namespace {
579 581
                 if (impl->optimizeAcknowledge && !impl->unconsumedMessages->isClosed()) {
580 582
                     this->consumer->deliverAcks();
581 583
                 }
  584
+
582 585
             } catch(Exception& ex) {
  586
+                impl->session->getConnection()->onAsyncException(ex);
583 587
             }
  588
+            this->consumer.reset(NULL);
584 589
         }
585 590
     };
586 591
 
@@ -618,6 +623,8 @@ namespace {
618 623
             } catch (Exception& e) {
619 624
                 session->getConnection()->onAsyncException(e);
620 625
             }
  626
+
  627
+            this->consumer.reset(NULL);
621 628
         }
622 629
     };
623 630
 }
@@ -1928,7 +1935,9 @@ void ActiveMQConsumerKernel::setOptimizedAckScheduledAckInterval(long long value
1928 1935
 
1929 1936
     // Should we periodically send out all outstanding acks.
1930 1937
     if (this->internal->optimizeAcknowledge && this->internal->optimizedAckScheduledAckInterval > 0) {
1931  
-        this->internal->optimizedAckTask = new OptimizedAckTask(this, this->internal);
  1938
+        Pointer<ActiveMQConsumerKernel> self =
  1939
+            this->session->lookupConsumerKernel(this->consumerInfo->getConsumerId());
  1940
+        this->internal->optimizedAckTask = new OptimizedAckTask(self, this->internal);
1932 1941
 
1933 1942
         try {
1934 1943
             this->session->getScheduler()->executePeriodically(
2  activemq-cpp/src/test-integration/Makefile.am
@@ -47,6 +47,7 @@ cc_sources = \
47 47
     activemq/test/openwire/OpenwireMapMessageTest.cpp \
48 48
     activemq/test/openwire/OpenwireMessageCompressionTest.cpp \
49 49
     activemq/test/openwire/OpenwireMessagePriorityTest.cpp \
  50
+    activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.cpp \
50 51
     activemq/test/openwire/OpenwireOptimizedAckTest.cpp \
51 52
     activemq/test/openwire/OpenwireQueueBrowserTest.cpp \
52 53
     activemq/test/openwire/OpenwireSimpleRollbackTest.cpp \
@@ -105,6 +106,7 @@ h_sources = \
105 106
     activemq/test/openwire/OpenwireMapMessageTest.h \
106 107
     activemq/test/openwire/OpenwireMessageCompressionTest.h \
107 108
     activemq/test/openwire/OpenwireMessagePriorityTest.h \
  109
+    activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.h \
108 110
     activemq/test/openwire/OpenwireOptimizedAckTest.h \
109 111
     activemq/test/openwire/OpenwireQueueBrowserTest.h \
110 112
     activemq/test/openwire/OpenwireSimpleRollbackTest.h \
2  activemq-cpp/src/test-integration/TestRegistry.cpp
@@ -29,6 +29,7 @@
29 29
 #include "activemq/test/openwire/OpenwireMessageCompressionTest.h"
30 30
 #include "activemq/test/openwire/OpenwireMessagePriorityTest.h"
31 31
 #include "activemq/test/openwire/OpenwireMapMessageTest.h"
  32
+#include "activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.h"
32 33
 #include "activemq/test/openwire/OpenwireOptimizedAckTest.h"
33 34
 #include "activemq/test/openwire/OpenwireQueueBrowserTest.h"
34 35
 #include "activemq/test/openwire/OpenwireSimpleRollbackTest.h"
@@ -65,6 +66,7 @@ CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireJmsRecoverTes
65 66
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireMessageCompressionTest );
66 67
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireMessagePriorityTest );
67 68
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireMapMessageTest );
  69
+CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireNonBlockingRedeliveryTest );
68 70
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireOptimizedAckTest );
69 71
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireQueueBrowserTest );
70 72
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireSimpleRollbackTest );
262  activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.cpp
... ...
@@ -0,0 +1,262 @@
  1
+/*
  2
+ * Licensed to the Apache Software Foundation (ASF) under one or more
  3
+ * contributor license agreements.  See the NOTICE file distributed with
  4
+ * this work for additional information regarding copyright ownership.
  5
+ * The ASF licenses this file to You under the Apache License, Version 2.0
  6
+ * (the "License"); you may not use this file except in compliance with
  7
+ * the License.  You may obtain a copy of the License at
  8
+ *
  9
+ *     http://www.apache.org/licenses/LICENSE-2.0
  10
+ *
  11
+ * Unless required by applicable law or agreed to in writing, software
  12
+ * distributed under the License is distributed on an "AS IS" BASIS,
  13
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14
+ * See the License for the specific language governing permissions and
  15
+ * limitations under the License.
  16
+ */
  17
+
  18
+#include "OpenwireNonBlockingRedeliveryTest.h"
  19
+
  20
+#include <cms/MessageListener.h>
  21
+#include <cms/ExceptionListener.h>
  22
+
  23
+#include <activemq/commands/Message.h>
  24
+#include <activemq/commands/ProducerId.h>
  25
+#include <activemq/commands/MessageId.h>
  26
+#include <activemq/core/ActiveMQConnectionFactory.h>
  27
+#include <activemq/core/ActiveMQConnection.h>
  28
+#include <activemq/core/ActiveMQConsumer.h>
  29
+#include <activemq/core/PrefetchPolicy.h>
  30
+#include <activemq/exceptions/ActiveMQException.h>
  31
+
  32
+#include <decaf/lang/Thread.h>
  33
+#include <decaf/lang/Pointer.h>
  34
+#include <decaf/util/LinkedList.h>
  35
+#include <decaf/util/concurrent/atomic/AtomicInteger.h>
  36
+
  37
+using namespace std;
  38
+using namespace cms;
  39
+using namespace activemq;
  40
+using namespace activemq::commands;
  41
+using namespace activemq::core;
  42
+using namespace activemq::test;
  43
+using namespace activemq::test::openwire;
  44
+using namespace activemq::util;
  45
+using namespace activemq::exceptions;
  46
+using namespace decaf;
  47
+using namespace decaf::lang;
  48
+using namespace decaf::util;
  49
+using namespace decaf::util::concurrent;
  50
+using namespace decaf::util::concurrent::atomic;
  51
+
  52
+////////////////////////////////////////////////////////////////////////////////
  53
+namespace {
  54
+
  55
+    class TestProducer : public Thread {
  56
+    private:
  57
+
  58
+        std::string destinationName;
  59
+        std::string brokerUri;
  60
+        int produceMessages;
  61
+
  62
+    public:
  63
+
  64
+        TestProducer(const std::string& brokerUri,
  65
+                     const std::string& destinationName,
  66
+                     int produceMessages) : Thread(),
  67
+                                            destinationName(destinationName),
  68
+                                            brokerUri(brokerUri),
  69
+                                            produceMessages(produceMessages) {
  70
+        }
  71
+
  72
+        void run() {
  73
+
  74
+            Pointer<ActiveMQConnectionFactory> connectionFactory;
  75
+            Pointer<Connection> connection;
  76
+            Pointer<Session> session;
  77
+            Pointer<Destination> destination;
  78
+
  79
+            try {
  80
+
  81
+                connectionFactory.reset(new ActiveMQConnectionFactory(brokerUri));
  82
+                connection.reset(connectionFactory->createConnection());
  83
+                connection->start();
  84
+                session.reset(connection->createSession(Session::AUTO_ACKNOWLEDGE));
  85
+
  86
+                destination.reset(session->createQueue(destinationName));
  87
+
  88
+                // Create a MessageProducer from the Session to the Topic or Queue
  89
+                Pointer<MessageProducer> producer(session->createProducer(destination.get()));
  90
+                producer->setDeliveryMode(cms::DeliveryMode::NON_PERSISTENT);
  91
+
  92
+                for (int i = 0; i < produceMessages; i++) {
  93
+                    Pointer<TextMessage> message(session->createTextMessage());
  94
+                    message->setLongProperty("TestTime", System::currentTimeMillis());
  95
+                    try {
  96
+                        producer->send(message.get());
  97
+                    } catch (Exception& deeperException) {
  98
+                    }
  99
+
  100
+                    Thread::sleep(50);
  101
+                }
  102
+            } catch (Exception& e) {
  103
+            }
  104
+
  105
+            try {
  106
+                if (connection != NULL) {
  107
+                    connection->close();
  108
+                }
  109
+            } catch (Exception& e) {
  110
+            }
  111
+        }
  112
+    };
  113
+
  114
+    class TestConsumer : public Thread, public MessageListener {
  115
+    private:
  116
+
  117
+        std::string brokerUri;
  118
+        std::string destinationName;
  119
+        CountDownLatch totalMessages;
  120
+        int expected;
  121
+        int receivedCount;
  122
+        bool rolledBack;
  123
+        bool failed;
  124
+        LinkedList<int>* messages;
  125
+        Pointer<ActiveMQConnectionFactory> connectionFactory;
  126
+        Pointer<Connection> connection;
  127
+        Pointer<Session> session;
  128
+        Pointer<MessageConsumer> consumer;
  129
+
  130
+    public:
  131
+
  132
+        TestConsumer(const std::string& brokerUri,
  133
+                     const std::string& destinationName,
  134
+                     LinkedList<int>* messages,
  135
+                     int totalMessages) : Thread(),
  136
+                                          brokerUri(brokerUri),
  137
+                                          destinationName(destinationName),
  138
+                                          totalMessages(totalMessages),
  139
+                                          expected(totalMessages),
  140
+                                          receivedCount(0),
  141
+                                          rolledBack(false),
  142
+                                          failed(false),
  143
+                                          messages(messages),
  144
+                                          connectionFactory(),
  145
+                                          connection(),
  146
+                                          session(),
  147
+                                          consumer() {
  148
+        }
  149
+
  150
+        bool isFailed() const {
  151
+            return this->failed;
  152
+        }
  153
+
  154
+        virtual void run() {
  155
+            try {
  156
+
  157
+                connectionFactory.reset(new ActiveMQConnectionFactory(brokerUri));
  158
+                connection.reset(connectionFactory->createConnection());
  159
+                session.reset(connection->createSession(Session::SESSION_TRANSACTED));
  160
+
  161
+                Pointer<ActiveMQConnection> amqCon = connection.dynamicCast<ActiveMQConnection>();
  162
+
  163
+                RedeliveryPolicy* policy = amqCon->getRedeliveryPolicy();
  164
+                policy->setInitialRedeliveryDelay(1000);
  165
+                policy->setBackOffMultiplier(-1);
  166
+                policy->setRedeliveryDelay(1000);
  167
+                policy->setUseExponentialBackOff(false);
  168
+                policy->setMaximumRedeliveries(10);
  169
+
  170
+                Pointer<Destination> destination(session->createQueue(destinationName));
  171
+                consumer.reset(session->createConsumer(destination.get()));
  172
+                consumer->setMessageListener(this);
  173
+
  174
+                connection->start();
  175
+
  176
+                if (!totalMessages.await(10, TimeUnit::MINUTES)) {
  177
+                    this->failed = true;
  178
+                }
  179
+
  180
+            } catch (Exception& e) {
  181
+            }
  182
+            try {
  183
+                if (connection != NULL) {
  184
+                    connection->close();
  185
+                }
  186
+            } catch (Exception& e) {
  187
+            }
  188
+        }
  189
+
  190
+        virtual void onMessage(const cms::Message* message) {
  191
+            receivedCount++;
  192
+
  193
+            try {
  194
+
  195
+                const commands::Message* amqMessage =
  196
+                    dynamic_cast<const commands::Message*>(message);
  197
+
  198
+                if (!rolledBack) {
  199
+                    if (++receivedCount == expected / 2) {
  200
+                        rolledBack = true;
  201
+                        session->rollback();
  202
+                    }
  203
+                } else {
  204
+                    Pointer<MessageId> msgId = amqMessage->getMessageId();
  205
+                    messages->add((int)msgId->getProducerSequenceId());
  206
+                    session->commit();
  207
+                    totalMessages.countDown();
  208
+                }
  209
+
  210
+            } catch (Exception& ex) {
  211
+                this->failed = true;
  212
+            }
  213
+        }
  214
+    };
  215
+}
  216
+
  217
+////////////////////////////////////////////////////////////////////////////////
  218
+OpenwireNonBlockingRedeliveryTest::OpenwireNonBlockingRedeliveryTest() {
  219
+}
  220
+
  221
+////////////////////////////////////////////////////////////////////////////////
  222
+OpenwireNonBlockingRedeliveryTest::~OpenwireNonBlockingRedeliveryTest() {
  223
+}
  224
+
  225
+////////////////////////////////////////////////////////////////////////////////
  226
+std::string OpenwireNonBlockingRedeliveryTest::getBrokerURL() const {
  227
+    return activemq::util::IntegrationCommon::getInstance().getOpenwireURL() +
  228
+        "?connection.nonBlockingRedelivery=true";
  229
+}
  230
+
  231
+////////////////////////////////////////////////////////////////////////////////
  232
+void OpenwireNonBlockingRedeliveryTest::testConsumerMessagesAreNotOrdered() {
  233
+
  234
+    LinkedList<int> messages;
  235
+
  236
+    const std::string DEST_NAME = "QUEUE.FOO";
  237
+
  238
+    TestProducer producer(getBrokerURL(), DEST_NAME, 500);
  239
+    TestConsumer consumer(getBrokerURL(), DEST_NAME, &messages, 500);
  240
+
  241
+    producer.start();
  242
+    consumer.start();
  243
+
  244
+    producer.join();
  245
+    consumer.join();
  246
+
  247
+    CPPUNIT_ASSERT(!consumer.isFailed());
  248
+
  249
+    bool ordered = true;
  250
+    int lastId = 0;
  251
+    Pointer<Iterator<int> > sequenceIds(messages.iterator());
  252
+    while (sequenceIds->hasNext()) {
  253
+        int id = sequenceIds->next();
  254
+        if (id != (lastId + 1)) {
  255
+            ordered = false;
  256
+        }
  257
+
  258
+        lastId = id;
  259
+    }
  260
+
  261
+    CPPUNIT_ASSERT(!ordered);
  262
+}
46  activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.h
... ...
@@ -0,0 +1,46 @@
  1
+/*
  2
+ * Licensed to the Apache Software Foundation (ASF) under one or more
  3
+ * contributor license agreements.  See the NOTICE file distributed with
  4
+ * this work for additional information regarding copyright ownership.
  5
+ * The ASF licenses this file to You under the Apache License, Version 2.0
  6
+ * (the "License"); you may not use this file except in compliance with
  7
+ * the License.  You may obtain a copy of the License at
  8
+ *
  9
+ *     http://www.apache.org/licenses/LICENSE-2.0
  10
+ *
  11
+ * Unless required by applicable law or agreed to in writing, software
  12
+ * distributed under the License is distributed on an "AS IS" BASIS,
  13
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14
+ * See the License for the specific language governing permissions and
  15
+ * limitations under the License.
  16
+ */
  17
+
  18
+#ifndef _ACTIVEMQ_TEST_OPENWIRE_OPENWIRENONBLOCKINGREDELIVERYTEST_H_
  19
+#define _ACTIVEMQ_TEST_OPENWIRE_OPENWIRENONBLOCKINGREDELIVERYTEST_H_
  20
+
  21
+#include <activemq/test/MessagePriorityTest.h>
  22
+
  23
+namespace activemq {
  24
+namespace test {
  25
+namespace openwire {
  26
+
  27
+    class OpenwireNonBlockingRedeliveryTest : public MessagePriorityTest {
  28
+
  29
+        CPPUNIT_TEST_SUITE( OpenwireNonBlockingRedeliveryTest );
  30
+        CPPUNIT_TEST( testConsumerMessagesAreNotOrdered );
  31
+        CPPUNIT_TEST_SUITE_END();
  32
+
  33
+    public:
  34
+
  35
+        OpenwireNonBlockingRedeliveryTest();
  36
+        virtual ~OpenwireNonBlockingRedeliveryTest();
  37
+
  38
+        virtual std::string getBrokerURL() const;
  39
+
  40
+        void testConsumerMessagesAreNotOrdered();
  41
+
  42
+    };
  43
+
  44
+}}}
  45
+
  46
+#endif /* _ACTIVEMQ_TEST_OPENWIRE_OPENWIRENONBLOCKINGREDELIVERYTEST_H_ */

0 notes on commit fd90349

Please sign in to comment.
Something went wrong with that request. Please try again.