From 61895223d07ebc58cde504bd57d3b95c0d27b677 Mon Sep 17 00:00:00 2001 From: Thiago Kronig Date: Wed, 10 Jun 2015 13:22:08 -0300 Subject: [PATCH 1/5] ARTEMIS-127 Fixing activemq-unit-tests pom version --- tests/activemq5-unit-tests/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/activemq5-unit-tests/pom.xml b/tests/activemq5-unit-tests/pom.xml index d6d87e8e925..da44d3eb959 100644 --- a/tests/activemq5-unit-tests/pom.xml +++ b/tests/activemq5-unit-tests/pom.xml @@ -20,7 +20,7 @@ org.apache.activemq.tests artemis-tests-pom - 1.0.0-SNAPSHOT + 1.0.1-SNAPSHOT activemq5-unit-tests From aab34e5bd01757c3305691e80d7d3588e5ca0cbe Mon Sep 17 00:00:00 2001 From: Thiago Kronig Date: Wed, 10 Jun 2015 13:23:03 -0300 Subject: [PATCH 2/5] ARTEMIS-127 Fixing sync on non-final object for ArtemisBrokerWrapper --- .../activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java index 86580e1c75e..b7397301d2f 100644 --- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java +++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java @@ -38,7 +38,7 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase { - protected Map testQueues = new HashMap(); + protected final Map testQueues = new HashMap(); public ArtemisBrokerWrapper(BrokerService brokerService) { From 64ecb9565dea42c7381e9a70a08339cafff7c960 Mon Sep 17 00:00:00 2001 From: Thiago Kronig Date: Wed, 10 Jun 2015 13:24:37 -0300 Subject: [PATCH 3/5] ARTEMIS-127 Use L as suffix for long constants --- ...bRestartJDBCQueueMasterSlaveLeaseTest.java | 2 +- .../broker/region/QueuePurgeTest.java | 2 +- .../cursors/StoreQueueCursorOrderTest.java | 46 +++++++++---------- .../broker/scheduler/JmsSchedulerTest.java | 8 ++-- .../org/apache/activemq/bugs/AMQ2314Test.java | 2 +- .../activemq/bugs/AMQ4485LowLimitTest.java | 8 ++-- ...rokersWithNDestsFanoutTransactionTest.java | 4 +- .../org/apache/activemq/bugs/AMQ4636Test.java | 4 +- .../activemq/bugs/AMQ5266SingleDestTest.java | 2 +- .../bugs/TrapMessageInJDBCStoreTest.java | 4 +- .../command/ActiveMQStreamMessageTest.java | 2 +- .../jdbc/JDBCIOExceptionHandlerTest.java | 12 ++--- .../store/jdbc/JDBCMessagePriorityTest.java | 2 +- .../store/jdbc/LeaseDatabaseLockerTest.java | 2 +- .../store/kahadb/KahaDBFastEnqueueTest.java | 2 +- .../usecases/BacklogNetworkCrossTalkTest.java | 2 +- .../BrokerQueueNetworkWithDisconnectTest.java | 2 +- ...ConcurrentProducerDurableConsumerTest.java | 2 +- .../ConcurrentProducerQueueConsumerTest.java | 2 +- .../usecases/JdbcDurableSubDupTest.java | 2 +- ...MessageGroupReconnectDistributionTest.java | 4 +- ...TempDestDemandSubscriptionCleanupTest.java | 2 +- 22 files changed, 59 insertions(+), 59 deletions(-) diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java index cf4929a5519..422b5ffe44f 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java @@ -43,7 +43,7 @@ protected void configureBroker(BrokerService brokerService) { LeaseLockerIOExceptionHandler ioExceptionHandler = new LeaseLockerIOExceptionHandler(); ioExceptionHandler.setIgnoreSQLExceptions(false); ioExceptionHandler.setStopStartConnectors(false); - ioExceptionHandler.setResumeCheckSleepPeriod(500l); + ioExceptionHandler.setResumeCheckSleepPeriod(500L); brokerService.setIoExceptionHandler(ioExceptionHandler); } diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java index 91209379ad5..d02085aed68 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java @@ -67,7 +67,7 @@ protected void setUp() throws Exception { broker.setDataDirectoryFile(testDataDir); broker.setUseJmx(true); broker.setDeleteAllMessagesOnStartup(true); - broker.getSystemUsage().getMemoryUsage().setLimit(1024l*1024*64); + broker.getSystemUsage().getMemoryUsage().setLimit(1024L*1024*64); KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter(); persistenceAdapter.setDirectory(new File(testDataDir, "kahadb")); broker.setPersistenceAdapter(persistenceAdapter); diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java index f8fab10a131..e5431c065d4 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java @@ -106,7 +106,7 @@ public void tesBlockedFuture() throws Exception { @Override public void run() { } - }, 2l) {}; + }, 2L) {}; msg.getMessageId().setFutureOrSequenceLong(future); underTest.addMessageLast(msg); @@ -116,12 +116,12 @@ public void run() { msg = getMessage(1); messages[0] = msg; msg.setMemoryUsage(systemUsage.getMemoryUsage()); - msg.getMessageId().setFutureOrSequenceLong(1l); + msg.getMessageId().setFutureOrSequenceLong(1L); underTest.addMessageLast(msg); assertTrue("cache is disabled as limit reached", !underTest.isCacheEnabled()); - assertEquals("setBatch unset", 0l, queueMessageStore.batch.get()); + assertEquals("setBatch unset", 0L, queueMessageStore.batch.get()); int dequeueCount = 0; @@ -171,9 +171,9 @@ public void testNoSetBatchWithUnOrderedFutureCurrentSync() throws Exception { FutureTask future = new FutureTask(new Runnable() { @Override public void run() { - msgRef.getMessageId().setFutureOrSequenceLong(1l); + msgRef.getMessageId().setFutureOrSequenceLong(1L); } - }, 1l) {}; + }, 1L) {}; msg.getMessageId().setFutureOrSequenceLong(future); Executors.newSingleThreadExecutor().submit(future); underTest.addMessageLast(msg); @@ -184,12 +184,12 @@ public void run() { msg = getMessage(1); messages[0] = msg; msg.setMemoryUsage(systemUsage.getMemoryUsage()); - msg.getMessageId().setFutureOrSequenceLong(1l); + msg.getMessageId().setFutureOrSequenceLong(1L); underTest.addMessageLast(msg); assertTrue("cache is disabled as limit reached", !underTest.isCacheEnabled()); - assertEquals("setBatch unset", 0l, queueMessageStore.batch.get()); + assertEquals("setBatch unset", 0L, queueMessageStore.batch.get()); int dequeueCount = 0; @@ -239,9 +239,9 @@ public void testSetBatchWithOrderedFutureCurrentFuture() throws Exception { FutureTask future = new FutureTask(new Runnable() { @Override public void run() { - msgRef.getMessageId().setFutureOrSequenceLong(0l); + msgRef.getMessageId().setFutureOrSequenceLong(0L); } - }, 0l) {}; + }, 0L) {}; msg.getMessageId().setFutureOrSequenceLong(future); Executors.newSingleThreadExecutor().submit(future); underTest.addMessageLast(msg); @@ -257,16 +257,16 @@ public void run() { FutureTask future2 = new FutureTask(new Runnable() { @Override public void run() { - msgRe2f.getMessageId().setFutureOrSequenceLong(1l); + msgRe2f.getMessageId().setFutureOrSequenceLong(1L); } - }, 1l) {}; + }, 1L) {}; msg.getMessageId().setFutureOrSequenceLong(future2); Executors.newSingleThreadExecutor().submit(future2); underTest.addMessageLast(msg); assertTrue("cache is disabled as limit reached", !underTest.isCacheEnabled()); - assertEquals("setBatch set", 1l, queueMessageStore.batch.get()); + assertEquals("setBatch set", 1L, queueMessageStore.batch.get()); int dequeueCount = 0; @@ -316,9 +316,9 @@ public void testSetBatchWithFuture() throws Exception { FutureTask future0 = new FutureTask(new Runnable() { @Override public void run() { - msgRef.getMessageId().setFutureOrSequenceLong(0l); + msgRef.getMessageId().setFutureOrSequenceLong(0L); } - }, 0l) {}; + }, 0L) {}; msg.getMessageId().setFutureOrSequenceLong(future0); underTest.addMessageLast(msg); Executors.newSingleThreadExecutor().submit(future0); @@ -332,9 +332,9 @@ public void run() { FutureTask future1 = new FutureTask(new Runnable() { @Override public void run() { - msgRef1.getMessageId().setFutureOrSequenceLong(3l); + msgRef1.getMessageId().setFutureOrSequenceLong(3L); } - }, 3l) {}; + }, 3L) {}; msg.getMessageId().setFutureOrSequenceLong(future1); underTest.addMessageLast(msg); @@ -342,7 +342,7 @@ public void run() { msg = getMessage(2); messages[1] = msg; msg.setMemoryUsage(systemUsage.getMemoryUsage()); - msg.getMessageId().setFutureOrSequenceLong(1l); + msg.getMessageId().setFutureOrSequenceLong(1L); underTest.addMessageLast(msg); assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled()); @@ -354,12 +354,12 @@ public void run() { msg = getMessage(3); messages[2] = msg; msg.setMemoryUsage(systemUsage.getMemoryUsage()); - msg.getMessageId().setFutureOrSequenceLong(3l); + msg.getMessageId().setFutureOrSequenceLong(3L); underTest.addMessageLast(msg); assertTrue("cache is disabled as limit reached", !underTest.isCacheEnabled()); - assertEquals("setBatch set", 2l, queueMessageStore.batch.get()); + assertEquals("setBatch set", 2L, queueMessageStore.batch.get()); int dequeueCount = 0; @@ -405,13 +405,13 @@ public void testSetBatch() throws Exception { ActiveMQTextMessage msg = getMessage(0); messages[0] = msg; msg.setMemoryUsage(systemUsage.getMemoryUsage()); - msg.getMessageId().setFutureOrSequenceLong(0l); + msg.getMessageId().setFutureOrSequenceLong(0L); underTest.addMessageLast(msg); msg = getMessage(1); messages[1] = msg; msg.setMemoryUsage(systemUsage.getMemoryUsage()); - msg.getMessageId().setFutureOrSequenceLong(1l); + msg.getMessageId().setFutureOrSequenceLong(1L); underTest.addMessageLast(msg); assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled()); @@ -419,12 +419,12 @@ public void testSetBatch() throws Exception { msg = getMessage(2); messages[2] = msg; msg.setMemoryUsage(systemUsage.getMemoryUsage()); - msg.getMessageId().setFutureOrSequenceLong(2l); + msg.getMessageId().setFutureOrSequenceLong(2L); underTest.addMessageLast(msg); assertTrue("cache is disabled as limit reached", !underTest.isCacheEnabled()); - assertEquals("setBatch set", 2l, queueMessageStore.batch.get()); + assertEquals("setBatch set", 2L, queueMessageStore.batch.get()); int dequeueCount = 0; diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java index 0ce584d1e59..41b0f0dc02a 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java @@ -260,7 +260,7 @@ public void onMessage(Message message) { // wait for the producer to block, which should happen immediately, and also wait long // enough for the delay to elapse. We should see no deliveries as the send should block // on the first message. - Thread.sleep(10000l); + Thread.sleep(10000L); assertEquals(100, latch.getCount()); @@ -268,7 +268,7 @@ public void onMessage(Message message) { broker.getSystemUsage().getJobSchedulerUsage().setLimit(1024 * 1024 * 33); // Wait long enough that the messages are enqueued and the delivery delay has elapsed. - Thread.sleep(10000l); + Thread.sleep(10000L); // Make sure we sent all the messages we expected to send Wait.waitFor(new Wait.Condition() { @@ -276,12 +276,12 @@ public void onMessage(Message message) { public boolean isSatisified() throws Exception { return producer.getSentCount() == producer.getMessageCount(); } - }, 20000l); + }, 20000L); assertEquals("Producer didn't send all messages", producer.getMessageCount(), producer.getSentCount()); // Make sure we got all the messages we expected to get - latch.await(20000l, TimeUnit.MILLISECONDS); + latch.await(20000L, TimeUnit.MILLISECONDS); assertEquals("Consumer did not receive all messages.", 0, latch.getCount()); } diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2314Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2314Test.java index 369385c8b3f..08be5db1e6f 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2314Test.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2314Test.java @@ -157,7 +157,7 @@ public void setUp() throws Exception { broker.setUseJmx(true); broker.setAdvisorySupport(false); broker.setDeleteAllMessagesOnStartup(true); - broker.getSystemUsage().getMemoryUsage().setLimit(1024l*1024*64); + broker.getSystemUsage().getMemoryUsage().setLimit(1024L*1024*64); broker.addConnector("tcp://localhost:0").setName("Default"); broker.start(); diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitTest.java index 21c389f3907..7c549b4ba0c 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitTest.java @@ -101,14 +101,14 @@ protected BrokerService createBroker(int brokerid, boolean addToNetwork) throws addNetworkConnector(broker); } broker.setSchedulePeriodForDestinationPurge(0); - broker.getSystemUsage().getMemoryUsage().setLimit(256 * 1024 * 1024l); + broker.getSystemUsage().getMemoryUsage().setLimit(256 * 1024 * 1024L); PolicyMap policyMap = new PolicyMap(); PolicyEntry policyEntry = new PolicyEntry(); policyEntry.setExpireMessagesPeriod(0); policyEntry.setQueuePrefetch(1000); - policyEntry.setMemoryLimit(2 * 1024 * 1024l); + policyEntry.setMemoryLimit(2 * 1024 * 1024L); policyEntry.setProducerFlowControl(false); policyEntry.setEnableAudit(true); policyEntry.setUseCache(true); @@ -117,7 +117,7 @@ protected BrokerService createBroker(int brokerid, boolean addToNetwork) throws PolicyEntry inPolicyEntry = new PolicyEntry(); inPolicyEntry.setExpireMessagesPeriod(0); inPolicyEntry.setQueuePrefetch(1000); - inPolicyEntry.setMemoryLimit(5 * 1024 * 1024l); + inPolicyEntry.setMemoryLimit(5 * 1024 * 1024L); inPolicyEntry.setProducerFlowControl(true); inPolicyEntry.setEnableAudit(true); inPolicyEntry.setUseCache(true); @@ -252,7 +252,7 @@ public boolean isSatisified() throws Exception { } return true; } - }, 1000 * 60 * 1000l, 20*1000)); + }, 1000 * 60 * 1000L, 20*1000)); assertTrue("No exceptions:" + exceptions, exceptions.isEmpty()); diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest.java index c2cf53ae2a5..8e4e4b7a968 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest.java @@ -98,7 +98,7 @@ protected BrokerService createBroker(int brokerid) throws Exception { PolicyEntry policyEntry = new PolicyEntry(); policyEntry.setExpireMessagesPeriod(0); policyEntry.setQueuePrefetch(1000); - policyEntry.setMemoryLimit(1024 * 1024l); + policyEntry.setMemoryLimit(1024 * 1024L); policyEntry.setOptimizedDispatch(false); policyEntry.setProducerFlowControl(false); policyEntry.setEnableAudit(true); @@ -171,7 +171,7 @@ public boolean isSatisified() throws Exception { } return true; } - }, 1000 * 60 * 1000l)); + }, 1000 * 60 * 1000L)); assertTrue("No exceptions:" + exceptions, exceptions.isEmpty()); diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java index b9246fbe005..1e0ccb91275 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java @@ -101,9 +101,9 @@ protected BrokerService createBroker() throws Exception { JDBCPersistenceAdapter jdbc = new TestJDBCPersistenceAdapter(); jdbc.setDataSource(embeddedDataSource); - jdbc.setLockKeepAlivePeriod(1000l); + jdbc.setLockKeepAlivePeriod(1000L); LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker(); - leaseDatabaseLocker.setLockAcquireSleepInterval(2000l); + leaseDatabaseLocker.setLockAcquireSleepInterval(2000L); jdbc.setLocker(leaseDatabaseLocker); broker = new BrokerService(); diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java index 0d7f44bfee2..ba7ee4dd4da 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java @@ -227,7 +227,7 @@ public void test() throws Exception { } // verify empty dlq - assertEquals("No pending messages", 0l, ((RegionBroker) brokerService.getRegionBroker()).getDestinationStatistics().getMessages().getCount()); + assertEquals("No pending messages", 0L, ((RegionBroker) brokerService.getRegionBroker()).getDestinationStatistics().getMessages().getCount()); } public class ExportQueuePublisher { diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java index 688d066d2ae..ad6df7fc000 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java @@ -82,9 +82,9 @@ protected BrokerService createBroker(boolean withJMX) throws Exception { jdbc.setCleanupPeriod(0); testTransactionContext = new TestTransactionContext(jdbc); - jdbc.setLockKeepAlivePeriod(1000l); + jdbc.setLockKeepAlivePeriod(1000L); LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker(); - leaseDatabaseLocker.setLockAcquireSleepInterval(2000l); + leaseDatabaseLocker.setLockAcquireSleepInterval(2000L); jdbc.setLocker(leaseDatabaseLocker); broker.setPersistenceAdapter(jdbc); diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQStreamMessageTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQStreamMessageTest.java index 9e0f4684161..e042217cedc 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQStreamMessageTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQStreamMessageTest.java @@ -979,7 +979,7 @@ public void testWriteObject() { message.writeObject(new Byte((byte) 2)); message.writeObject(new Short((short) 2)); message.writeObject(new Integer(2)); - message.writeObject(new Long(2l)); + message.writeObject(new Long(2L)); message.writeObject(new Float(2.0f)); message.writeObject(new Double(2.0d)); }catch(Exception e) { diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerTest.java index df10d73e891..2502110a6e7 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerTest.java @@ -71,17 +71,17 @@ protected BrokerService createBroker(String name, boolean withJMX, boolean lease JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter(); jdbc.setDataSource(dataSource); - jdbc.setLockKeepAlivePeriod(1000l); + jdbc.setLockKeepAlivePeriod(1000L); if (leaseLocker) { LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker(); leaseDatabaseLocker.setHandleStartException(true); - leaseDatabaseLocker.setLockAcquireSleepInterval(2000l); + leaseDatabaseLocker.setLockAcquireSleepInterval(2000L); jdbc.setLocker(leaseDatabaseLocker); } broker.setPersistenceAdapter(jdbc); LeaseLockerIOExceptionHandler ioExceptionHandler = new LeaseLockerIOExceptionHandler(); - ioExceptionHandler.setResumeCheckSleepPeriod(1000l); + ioExceptionHandler.setResumeCheckSleepPeriod(1000L); ioExceptionHandler.setStopStartConnectors(startStopConnectors); broker.setIoExceptionHandler(ioExceptionHandler); String connectionUri = broker.addConnector(TRANSPORT_URL).getPublishableConnectString(); @@ -129,18 +129,18 @@ public void run() { JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter(); jdbc.setDataSource(dataSource); - jdbc.setLockKeepAlivePeriod(1000l); + jdbc.setLockKeepAlivePeriod(1000L); if (lease) { LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker(); leaseDatabaseLocker.setHandleStartException(true); - leaseDatabaseLocker.setLockAcquireSleepInterval(2000l); + leaseDatabaseLocker.setLockAcquireSleepInterval(2000L); jdbc.setLocker(leaseDatabaseLocker); } broker.setPersistenceAdapter(jdbc); LeaseLockerIOExceptionHandler ioExceptionHandler = new LeaseLockerIOExceptionHandler(); - ioExceptionHandler.setResumeCheckSleepPeriod(1000l); + ioExceptionHandler.setResumeCheckSleepPeriod(1000L); ioExceptionHandler.setStopStartConnectors(false); broker.setIoExceptionHandler(ioExceptionHandler); slave.set(broker); diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java index 34796a4a3b5..2399738ea96 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java @@ -150,7 +150,7 @@ public void testConcurrentDurableSubsReconnectWithXLevels() throws Exception { for (int priority = 0; priority < maxPriority; priority++) { producers.add(new ProducerThread(topic, MSG_NUM, priority)); messageCounts[priority] = new AtomicInteger(0); - messageIds[priority] = 1l; + messageIds[priority] = 1L; } for (ProducerThread producer : producers) { diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/LeaseDatabaseLockerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/LeaseDatabaseLockerTest.java index 774d172bb1c..6042ae6e539 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/LeaseDatabaseLockerTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/jdbc/LeaseDatabaseLockerTest.java @@ -188,7 +188,7 @@ public void run() { statement = connection.prepareStatement(jdbc.getStatements().getLeaseUpdateStatement()); statement.setString(1, null); - statement.setLong(2, 0l); + statement.setLong(2, 0L); statement.setString(3, fakeId); assertEquals("we released " + fakeId, 1, statement.executeUpdate()); LOG.info("released " + fakeId); diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java index 15abe3d7643..57530da7e26 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java @@ -189,7 +189,7 @@ private void publishMessages(AtomicLong count, int expiry) throws Exception { MessageProducer producer = session.createProducer(destination); Long start = System.currentTimeMillis(); - long i = 0l; + long i = 0L; while ( (i=count.getAndDecrement()) > 0) { Message message = null; if (useBytesMessage) { diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/BacklogNetworkCrossTalkTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/BacklogNetworkCrossTalkTest.java index 3e9b91362ae..b56c5b01342 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/BacklogNetworkCrossTalkTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/BacklogNetworkCrossTalkTest.java @@ -69,7 +69,7 @@ public void testProduceConsume() throws Exception { MessageConsumer clientB = createConsumer("B", destA); - final long maxWait = 5 * 60 * 1000l; + final long maxWait = 5 * 60 * 1000L; MessageIdList listA = getConsumerMessages("A", clientA); listA.setMaximumDuration(maxWait); listA.waitForMessagesToArrive(numMessages); diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java index 0b8de2905a0..e32a1bc80c7 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java @@ -88,7 +88,7 @@ public void testSendOnAReceiveOnBWithTransportDisconnect() throws Exception { @SuppressWarnings("unchecked") public void testNoStuckConnectionsWithTransportDisconnect() throws Exception { - inactiveDuration=60000l; + inactiveDuration=60000L; useDuplexNetworkBridge = true; bridgeBrokers(SPOKE, HUB); diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java index a5233ee14be..e1035a6350c 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java @@ -405,7 +405,7 @@ class TimedMessageListener implements MessageListener { final int batchSize = 1000; CountDownLatch firstReceiptLatch = new CountDownLatch(1); long mark = System.currentTimeMillis(); - long firstReceipt = 0l; + long firstReceipt = 0L; long receiptAccumulator = 0; long batchReceiptAccumulator = 0; long maxReceiptTime = 0; diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ConcurrentProducerQueueConsumerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ConcurrentProducerQueueConsumerTest.java index e9e3b54dea4..34807c64f2f 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ConcurrentProducerQueueConsumerTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ConcurrentProducerQueueConsumerTest.java @@ -362,7 +362,7 @@ static class TimedMessageListener implements MessageListener { final CountDownLatch firstReceiptLatch = new CountDownLatch(1); long mark = System.currentTimeMillis(); - long firstReceipt = 0l; + long firstReceipt = 0L; long receiptAccumulator = 0; long batchReceiptAccumulator = 0; long maxReceiptTime = 0; diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/JdbcDurableSubDupTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/JdbcDurableSubDupTest.java index 0cb2648bf2f..1b3be069c28 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/JdbcDurableSubDupTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/JdbcDurableSubDupTest.java @@ -239,7 +239,7 @@ public void run() { ActiveMQConnectionFactory factory; MessageProducer messageProducer; - long timeToLive = 0l; + long timeToLive = 0L; TextMessage message = null; diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/MessageGroupReconnectDistributionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/MessageGroupReconnectDistributionTest.java index da2f367c48c..bbc3a7aec55 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/MessageGroupReconnectDistributionTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/MessageGroupReconnectDistributionTest.java @@ -122,8 +122,8 @@ public void testReconnect() throws Exception { final ArrayList batchCounters = new ArrayList(numConsumers); for (int i = 0; i < numConsumers; i++) { - consumedCounters.add(new AtomicLong(0l)); - batchCounters.add(new AtomicLong(0l)); + consumedCounters.add(new AtomicLong(0L)); + batchCounters.add(new AtomicLong(0L)); final int id = i; executorService.submit(new Runnable() { diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ThreeBrokerTempDestDemandSubscriptionCleanupTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ThreeBrokerTempDestDemandSubscriptionCleanupTest.java index 08435df1b25..aa1b89b35d6 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ThreeBrokerTempDestDemandSubscriptionCleanupTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ThreeBrokerTempDestDemandSubscriptionCleanupTest.java @@ -122,7 +122,7 @@ public void run() { threadService.submit(tester); threadService.shutdown(); - assertTrue("executor done on time", threadService.awaitTermination(30l, TimeUnit.SECONDS)); + assertTrue("executor done on time", threadService.awaitTermination(30L, TimeUnit.SECONDS)); // for the real test... we should not have any subscriptions left on broker C for the temp dests BrokerItem brokerC = brokers.get(BROKER_C); From ae6a2b87eaa9325d2437db2ccd5152f6db0d2ad3 Mon Sep 17 00:00:00 2001 From: Thiago Kronig Date: Wed, 10 Jun 2015 20:37:58 -0300 Subject: [PATCH 4/5] ARTEMIS-127 Fix some concurrency idioms for ActimeMQ Tests --- .../JmsCreateConsumerInOnMessageTest.java | 6 ++-- .../JmsMultipleClientsTestSupport.java | 4 +-- .../activemq/LargeMessageTestSupport.java | 8 ++--- .../OnePrefetchAsyncConsumerTest.java | 2 +- .../region/QueueResendDuringShutdownTest.java | 4 +-- .../org/apache/activemq/bugs/AMQ2149Test.java | 2 +- .../org/apache/activemq/bugs/AMQ4607Test.java | 2 +- .../apache/activemq/bugs/CraigsBugTest.java | 7 ++-- .../activemq/bugs/amq1974/TryJmsClient.java | 5 ++- .../activemq/bugs/amq1974/TryJmsManager.java | 5 ++- .../apache/activemq/spring/ConsumerBean.java | 33 +++++++++++-------- .../activemq/spring/SpringConsumer.java | 8 ++--- .../store/kahadb/plist/PListTest.java | 2 +- .../activemq/streams/JMSInputStreamTest.java | 2 +- .../activemq/transport/TopicClusterTest.java | 4 +-- .../transport/failover/AMQ1925Test.java | 27 ++++----------- .../transport/udp/UdpTestSupport.java | 6 ++-- ...ConcurrentProducerDurableConsumerTest.java | 14 +++----- .../ConcurrentProducerQueueConsumerTest.java | 14 +++----- .../MultiBrokersMultiClientsTest.java | 2 +- .../NoDuplicateOnTopicNetworkTest.java | 2 +- .../usecases/ReliableReconnectTest.java | 8 ++--- .../VerifyNetworkConsumersDisconnectTest.java | 2 +- .../apache/activemq/util/MessageIdList.java | 32 +++++++++++------- 24 files changed, 95 insertions(+), 106 deletions(-) diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsCreateConsumerInOnMessageTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsCreateConsumerInOnMessageTest.java index 7a219e2cc36..c0a4f5fd683 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsCreateConsumerInOnMessageTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsCreateConsumerInOnMessageTest.java @@ -36,7 +36,7 @@ public class JmsCreateConsumerInOnMessageTest extends TestSupport implements Mes private MessageConsumer testConsumer; private MessageProducer producer; private Topic topic; - private Object lock = new Object(); + private final Object lock = new Object(); /* * @see junit.framework.TestCase#setUp() @@ -71,8 +71,8 @@ protected void tearDown() throws Exception { public void testCreateConsumer() throws Exception { Message msg = super.createMessage(); producer.send(msg); - if (testConsumer == null) { - synchronized (lock) { + synchronized (lock) { + while(testConsumer == null) { lock.wait(3000); } } diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java index 5eaab8dda78..5c73a6e4e05 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java @@ -83,8 +83,6 @@ public class JmsMultipleClientsTestSupport { protected List connections = Collections.synchronizedList(new ArrayList()); protected MessageIdList allMessagesList = new MessageIdList(); - private AtomicInteger producerLock; - protected void startProducers(Destination dest, int msgCount) throws Exception { startProducers(createConnectionFactory(), dest, msgCount); } @@ -92,7 +90,7 @@ protected void startProducers(Destination dest, int msgCount) throws Exception { protected void startProducers(final ConnectionFactory factory, final Destination dest, final int msgCount) throws Exception { // Use concurrent send if (useConcurrentSend) { - producerLock = new AtomicInteger(producerCount); + final AtomicInteger producerLock = new AtomicInteger(producerCount); for (int i = 0; i < producerCount; i++) { Thread t = new Thread(new Runnable() { diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LargeMessageTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LargeMessageTestSupport.java index fc772185cf1..d1ab8a5ba7f 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LargeMessageTestSupport.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LargeMessageTestSupport.java @@ -61,7 +61,7 @@ public class LargeMessageTestSupport extends ClientTestSupport implements Messag protected int deliveryMode = DeliveryMode.PERSISTENT; protected IdGenerator idGen = new IdGenerator(); protected boolean validMessageConsumption = true; - protected AtomicInteger messageCount = new AtomicInteger(0); + protected final AtomicInteger messageCount = new AtomicInteger(0); protected int prefetchValue = 10000000; @@ -182,9 +182,9 @@ public void testLargeMessages() throws Exception { producer.send(msg); } long now = System.currentTimeMillis(); - while (now + 60000 > System.currentTimeMillis() && messageCount.get() < MESSAGE_COUNT) { - LOG.info("message count = " + messageCount); - synchronized (messageCount) { + synchronized (messageCount) { + while (now + 60000 > System.currentTimeMillis() && messageCount.get() < MESSAGE_COUNT) { + LOG.info("message count = " + messageCount); messageCount.wait(1000); } } diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java index 085119847f2..26c6bf102aa 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java @@ -154,7 +154,7 @@ public ServerSession getServerSession() throws JMSException { } private class TestServerSession implements ServerSession { - TestServerSessionPool pool; + final TestServerSessionPool pool; Session session; public TestServerSession(TestServerSessionPool pool) throws JMSException { diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueResendDuringShutdownTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueResendDuringShutdownTest.java index 0439fa847c5..c7154a97681 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueResendDuringShutdownTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueResendDuringShutdownTest.java @@ -54,7 +54,7 @@ public class QueueResendDuringShutdownTest { private Connection producerConnection; private Queue queue; - private Object messageReceiveSync = new Object(); + private final Object messageReceiveSync = new Object(); private int receiveCount; @Before @@ -239,7 +239,7 @@ protected void delay (long delayMs, String desc) { protected void waitForMessage (long delayMs) { try { synchronized ( this.messageReceiveSync ) { - if ( this.receiveCount == 0 ) { + while ( this.receiveCount == 0 ) { this.messageReceiveSync.wait(delayMs); } } diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java index b2eba613665..c28d3ad2951 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java @@ -563,7 +563,7 @@ private void verifyOrderedMessageReceipt(byte destinationType, int concurrentPai } class TeardownTask implements Callable { - private Object brokerLock; + private final Object brokerLock; private BrokerService broker; public TeardownTask(Object brokerLock, BrokerService broker) { diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java index 265b692e6f3..b567c93c051 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java @@ -49,7 +49,7 @@ public class AMQ4607Test extends JmsMultipleBrokersTestSupport implements Uncaug public boolean duplex = true; protected Map consumerMap; - Map unhandeledExceptions = new HashMap(); + final Map unhandeledExceptions = new HashMap(); private void assertNoUnhandeledExceptions() { for( Entry e: unhandeledExceptions.entrySet()) { diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java index f956da6d75c..d71a9e42a6c 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java @@ -25,6 +25,9 @@ import org.apache.activemq.EmbeddedBrokerTestSupport; import org.apache.activemq.command.ActiveMQQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + public class CraigsBugTest extends EmbeddedBrokerTestSupport { private String connectionUri; @@ -49,9 +52,7 @@ public void run() { conn.start(); try { - synchronized (this) { - wait(3000); - } + new CountDownLatch(1).await(3, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsClient.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsClient.java index c8b4503c4d3..1f25109d5cd 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsClient.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsClient.java @@ -25,6 +25,7 @@ import javax.jms.*; import java.io.File; import java.net.URISyntaxException; +import java.util.concurrent.CountDownLatch; public class TryJmsClient { @@ -59,9 +60,7 @@ private void start() throws Exception { startMessageSend(); - synchronized(this) { - this.wait(); - } + new CountDownLatch(1).await(); } private void startUsageMonitor(final BrokerService brokerService) { diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsManager.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsManager.java index 3f5898719d6..c8eb7b34d84 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsManager.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsManager.java @@ -25,6 +25,7 @@ import javax.jms.*; import java.io.File; import java.net.URISyntaxException; +import java.util.concurrent.CountDownLatch; public class TryJmsManager { @@ -59,9 +60,7 @@ private void start() throws Exception { startMessageConsumer(); - synchronized(this) { - this.wait(); - } + new CountDownLatch(1).await(); } private void startUsageMonitor(final BrokerService brokerService) { diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/spring/ConsumerBean.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/spring/ConsumerBean.java index 8f22c33d403..4e1ab59b32b 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/spring/ConsumerBean.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/spring/ConsumerBean.java @@ -73,14 +73,19 @@ public void waitForMessageToArrive() { long start = System.currentTimeMillis(); - try { - if (hasReceivedMessage()) { - synchronized (messages) { + synchronized(messages) + { + try + { + while (hasReceivedMessage()) + { messages.wait(4000); } } - } catch (InterruptedException e) { - LOG.info("Caught: " + e); + catch (InterruptedException e) + { + LOG.info("Caught: " + e); + } } long end = System.currentTimeMillis() - start; @@ -101,18 +106,18 @@ public void waitForMessagesToArrive(int messageCount,long maxWaitTime) { LOG.info("Waiting for (" + maxRemainingMessageCount + ") message(s) to arrive"); long start = System.currentTimeMillis(); long endTime = start + maxWaitTime; - while (maxRemainingMessageCount > 0) { - try { - synchronized (messages) { + synchronized (messages) { + while (maxRemainingMessageCount > 0) { + try { messages.wait(1000); + if (hasReceivedMessages(messageCount) || System.currentTimeMillis() > endTime) { + break; + } + } catch (InterruptedException e) { + LOG.info("Caught: " + e); } - if (hasReceivedMessages(messageCount) || System.currentTimeMillis() > endTime) { - break; - } - } catch (InterruptedException e) { - LOG.info("Caught: " + e); + maxRemainingMessageCount = Math.max(0, messageCount - messages.size()); } - maxRemainingMessageCount = Math.max(0, messageCount - messages.size()); } long end = System.currentTimeMillis() - start; LOG.info("End of wait for " + end + " millis"); diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/spring/SpringConsumer.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/spring/SpringConsumer.java index 118e0361c83..ed0a48af8b7 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/spring/SpringConsumer.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/spring/SpringConsumer.java @@ -43,13 +43,13 @@ public void start() throws JMSException { try { ConnectionFactory factory = template.getConnectionFactory(); - connection = factory.createConnection(); + final Connection c = connection = factory.createConnection(); // we might be a reusable connection in spring // so lets only set the client ID once if its not set - synchronized (connection) { - if (connection.getClientID() == null) { - connection.setClientID(myId); + synchronized (c) { + if (c.getClientID() == null) { + c.setClientID(myId); } } diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java index 555503e8e79..71e46186916 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java @@ -617,7 +617,7 @@ public void run() { } } - Map locks = new HashMap(); + final Map locks = new HashMap(); private Object plistLocks(PList plist) { Object lock = null; diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java index f3926628f1e..b07c8cce6be 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java @@ -250,7 +250,7 @@ public void run() { } out.flush(); synchronized (complete) { - if (!complete.get()) { + while (!complete.get()) { complete.wait(30000); } } diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/TopicClusterTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/TopicClusterTest.java index 4db7c239c8d..26c215a47ba 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/TopicClusterTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/TopicClusterTest.java @@ -55,7 +55,7 @@ public class TopicClusterTest extends TestCase implements MessageListener { protected Destination destination; protected boolean topic = true; - protected AtomicInteger receivedMessageCount = new AtomicInteger(0); + protected final AtomicInteger receivedMessageCount = new AtomicInteger(0); protected int deliveryMode = DeliveryMode.NON_PERSISTENT; protected MessageProducer[] producers; protected Connection[] connections; @@ -166,7 +166,7 @@ public void testSendReceive() throws Exception { } } synchronized (receivedMessageCount) { - if (receivedMessageCount.get() < expectedReceiveCount()) { + while (receivedMessageCount.get() < expectedReceiveCount()) { receivedMessageCount.wait(20000); } } diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java index dfb5dfde47a..d03dbcd41c7 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.Connection; @@ -73,14 +74,12 @@ public void XtestAMQ1925_TXInProgress() throws Exception { // The runnable is likely to interrupt during the session#commit, since // this takes the longest - final Object starter = new Object(); + final CountDownLatch starter = new CountDownLatch(1); final AtomicBoolean restarted = new AtomicBoolean(); new Thread(new Runnable() { public void run() { try { - synchronized (starter) { - starter.wait(); - } + starter.await(); // Simulate broker failure & restart bs.stop(); @@ -97,9 +96,6 @@ public void run() { } }).start(); - synchronized (starter) { - starter.notifyAll(); - } for (int i = 0; i < MESSAGE_COUNT; i++) { Message message = consumer.receive(500); assertNotNull("No Message " + i + " found", message); @@ -108,9 +104,7 @@ public void run() { assertFalse("Timing problem, restarted too soon", restarted .get()); if (i == 10) { - synchronized (starter) { - starter.notifyAll(); - } + starter.countDown(); } if (i > MESSAGE_COUNT - 100) { assertTrue("Timing problem, restarted too late", restarted @@ -143,14 +137,12 @@ public void XtestAMQ1925_TXInProgress_TwoConsumers() throws Exception { // The runnable is likely to interrupt during the session#commit, since // this takes the longest - final Object starter = new Object(); + final CountDownLatch starter = new CountDownLatch(1); final AtomicBoolean restarted = new AtomicBoolean(); new Thread(new Runnable() { public void run() { try { - synchronized (starter) { - starter.wait(); - } + starter.await(); // Simulate broker failure & restart bs.stop(); @@ -167,9 +159,6 @@ public void run() { } }).start(); - synchronized (starter) { - starter.notifyAll(); - } Collection results = new ArrayList(MESSAGE_COUNT); for (int i = 0; i < MESSAGE_COUNT; i++) { Message message1 = consumer1.receive(20); @@ -191,9 +180,7 @@ public void run() { assertFalse("Timing problem, restarted too soon", restarted .get()); if (i == 10) { - synchronized (starter) { - starter.notifyAll(); - } + starter.countDown(); } if (i > MESSAGE_COUNT - 50) { assertTrue("Timing problem, restarted too late", restarted diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java index 1d770de24e8..7defe954ce5 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java @@ -46,7 +46,7 @@ public abstract class UdpTestSupport extends TestCase implements TransportListen protected Transport producer; protected Transport consumer; - protected Object lock = new Object(); + protected final Object lock = new Object(); protected Command receivedCommand; protected TransportServer server; protected boolean large; @@ -251,10 +251,10 @@ protected Command assertCommandReceived() throws InterruptedException { Command answer = null; synchronized (lock) { answer = receivedCommand; - if (answer == null) { + while (answer == null) { lock.wait(waitForCommandTimeout); + answer = receivedCommand; } - answer = receivedCommand; } assertNotNull("Should have received a Command by now!", answer); diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java index e1035a6350c..0e71dfea3a0 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java @@ -107,7 +107,7 @@ public void testSendRateWithActivatingConsumers() throws Exception { // periodically start a durable sub that has a backlog final int consumersToActivate = 5; - final Object addConsumerSignal = new Object(); + final CountDownLatch addConsumerSignal = new CountDownLatch(1); Executors.newCachedThreadPool(new ThreadFactory() { @Override public Thread newThread(Runnable r) { @@ -120,9 +120,7 @@ public void run() { MessageConsumer consumer = null; for (int i = 0; i < consumersToActivate; i++) { LOG.info("Waiting for add signal from producer..."); - synchronized (addConsumerSignal) { - addConsumerSignal.wait(30 * 60 * 1000); - } + addConsumerSignal.await(30, TimeUnit.MINUTES); TimedMessageListener listener = new TimedMessageListener(); consumer = createDurableSubscriber(factory.createConnection(), destination, "consumer" + (i + 1)); LOG.info("Created consumer " + consumer); @@ -254,7 +252,7 @@ private double[] produceMessages(Destination destination, final int numIterations, Session session, MessageProducer producer, - Object addConsumerSignal) throws Exception { + CountDownLatch addConsumerSignal) throws Exception { long start; long count = 0; double batchMax = 0, max = 0, sum = 0; @@ -269,10 +267,8 @@ private double[] produceMessages(Destination destination, max = Math.max(max, (System.currentTimeMillis() - singleSendstart)); if (++count % 500 == 0) { if (addConsumerSignal != null) { - synchronized (addConsumerSignal) { - addConsumerSignal.notifyAll(); - LOG.info("Signalled add consumer"); - } + addConsumerSignal.countDown(); + LOG.info("Signalled add consumer"); } }; if (count % 5000 == 0) { diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ConcurrentProducerQueueConsumerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ConcurrentProducerQueueConsumerTest.java index 34807c64f2f..931fb55f08d 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ConcurrentProducerQueueConsumerTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ConcurrentProducerQueueConsumerTest.java @@ -95,7 +95,7 @@ public void testSendRateWithActivatingConsumers() throws Exception { // periodically start a queue consumer final int consumersToActivate = 5; - final Object addConsumerSignal = new Object(); + final CountDownLatch addConsumerSignal = new CountDownLatch(1); Executors.newCachedThreadPool(new ThreadFactory() { @Override public Thread newThread(Runnable r) { @@ -108,9 +108,7 @@ public void run() { MessageConsumer consumer = null; for (int i = 0; i < consumersToActivate; i++) { LOG.info("Waiting for add signal from producer..."); - synchronized (addConsumerSignal) { - addConsumerSignal.wait(30 * 60 * 1000); - } + addConsumerSignal.await(30, TimeUnit.MINUTES); TimedMessageListener listener = new TimedMessageListener(); consumer = createConsumer(factory.createConnection(), destination); LOG.info("Created consumer " + consumer); @@ -241,7 +239,7 @@ private double[] produceMessages(Destination destination, final int numIterations, Session session, MessageProducer producer, - Object addConsumerSignal) throws Exception { + CountDownLatch addConsumerSignal) throws Exception { long start; long count = 0; double batchMax = 0, max = 0, sum = 0; @@ -257,10 +255,8 @@ private double[] produceMessages(Destination destination, max = Math.max(max, (System.currentTimeMillis() - singleSendstart)); if (++count % 500 == 0) { if (addConsumerSignal != null) { - synchronized (addConsumerSignal) { - addConsumerSignal.notifyAll(); - LOG.info("Signalled add consumer"); - } + addConsumerSignal.countDown(); + LOG.info("Signalled add consumer"); } } ; diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java index bd5c4c89ce7..df02d9e7ee0 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java @@ -44,7 +44,7 @@ public class MultiBrokersMultiClientsTest extends JmsMultipleBrokersTestSupport private static final Logger LOG = LoggerFactory.getLogger(MultiBrokersMultiClientsTest.class); protected Map consumerMap; - Map unhandeledExceptions = new HashMap(); + final Map unhandeledExceptions = new HashMap(); public void testTopicAllConnected() throws Exception { bridgeAllBrokers(); diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java index 2aa614d9c14..6d28f3483b0 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java @@ -265,7 +265,7 @@ class TopicWithDuplicateMessages { private MessageConsumer consumer; private final String durableID = "DURABLE_ID"; - private List receivedStrings = Collections.synchronizedList(new ArrayList()); + private final List receivedStrings = Collections.synchronizedList(new ArrayList()); private int numMessages = 10; private CountDownLatch recievedLatch = new CountDownLatch(numMessages); diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ReliableReconnectTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ReliableReconnectTest.java index 05fd5f847a8..9dc00328166 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ReliableReconnectTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ReliableReconnectTest.java @@ -47,8 +47,8 @@ public class ReliableReconnectTest extends org.apache.activemq.TestSupport { protected int deliveryMode = DeliveryMode.PERSISTENT; protected String consumerClientId; protected Destination destination; - protected AtomicBoolean closeBroker = new AtomicBoolean(false); - protected AtomicInteger messagesReceived = new AtomicInteger(0); + protected final AtomicBoolean closeBroker = new AtomicBoolean(false); + protected final AtomicInteger messagesReceived = new AtomicInteger(0); protected BrokerService broker; protected int firstBatch = MESSAGE_COUNT / 10; private IdGenerator idGen = new IdGenerator(); @@ -159,7 +159,7 @@ public void testReconnect() throws Exception { connection.close(); spawnConsumer(); synchronized (closeBroker) { - if (!closeBroker.get()) { + while (!closeBroker.get()) { closeBroker.wait(); } } @@ -168,7 +168,7 @@ public void testReconnect() throws Exception { startBroker(false); // System.err.println("Started Broker again"); synchronized (messagesReceived) { - if (messagesReceived.get() < MESSAGE_COUNT) { + while (messagesReceived.get() < MESSAGE_COUNT) { messagesReceived.wait(60000); } } diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/VerifyNetworkConsumersDisconnectTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/VerifyNetworkConsumersDisconnectTest.java index 9eeb28c5272..eb5a3e28c6d 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/VerifyNetworkConsumersDisconnectTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/VerifyNetworkConsumersDisconnectTest.java @@ -51,7 +51,7 @@ public class VerifyNetworkConsumersDisconnectTest extends JmsMultipleBrokersTest public static final int TIMEOUT = 30000; protected Map consumerMap; - Map unhandledExceptions = new HashMap(); + final Map unhandledExceptions = new HashMap(); private void assertNoUnhandledExceptions() { for( Entry e: unhandledExceptions.entrySet()) { diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/MessageIdList.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/MessageIdList.java index 7140a8650c8..c644c67dd72 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/MessageIdList.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/MessageIdList.java @@ -140,20 +140,28 @@ public void waitForMessagesToArrive(int messageCount) { long start = System.currentTimeMillis(); - for (int i = 0; i < messageCount; i++) { - try { - if (hasReceivedMessages(messageCount)) { - break; - } - long duration = System.currentTimeMillis() - start; - if (duration >= maximumDuration) { - break; - } - synchronized (semaphore) { + synchronized (semaphore) + { + for (int i = 0; i < messageCount; i++) + { + try + { + if (hasReceivedMessages(messageCount)) + { + break; + } + long duration = System.currentTimeMillis() - start; + if (duration >= maximumDuration) + { + break; + } + semaphore.wait(maximumDuration - duration); } - } catch (InterruptedException e) { - LOG.info("Caught: " + e); + catch (InterruptedException e) + { + LOG.info("Caught: " + e); + } } } long end = System.currentTimeMillis() - start; From 4dd54080a1e91463ad4b6693fb1e62ad3a81124a Mon Sep 17 00:00:00 2001 From: Thiago Kronig Date: Fri, 12 Jun 2015 01:27:54 -0300 Subject: [PATCH 5/5] ARTEMIS-127 Fix Array.toString(), nonatomic update on volatiles --- .../broker/policy/AbortSlowConsumer1Test.java | 4 +- .../org/apache/activemq/bugs/AMQ2149Test.java | 112 ++++++++++-------- .../org/apache/activemq/bugs/AMQ3779Test.java | 2 +- .../command/ActiveMQTextMessageTest.java | 2 +- .../store/kahadb/KahaDBFastEnqueueTest.java | 4 +- .../store/kahadb/plist/PListTest.java | 8 +- .../activemq/transport/StubTransport.java | 7 +- .../DurableSubscriptionOffline4Test.java | 3 +- .../activemq/usecases/MemoryLimitTest.java | 4 +- 9 files changed, 78 insertions(+), 68 deletions(-) diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer1Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer1Test.java index e17b362cf6b..4368f7927a1 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer1Test.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer1Test.java @@ -82,7 +82,7 @@ public void testAbortAlreadyClosedConsumers() throws Exception { consumer.close(); TimeUnit.SECONDS.sleep(5); - assertTrue("no exceptions : " + exceptions.toArray(), exceptions.isEmpty()); + assertTrue("no exceptions : " + exceptions, exceptions.isEmpty()); } @Test(timeout = 60 * 1000) @@ -99,6 +99,6 @@ public void testAbortAlreadyClosedConnection() throws Exception { conn.close(); TimeUnit.SECONDS.sleep(5); - assertTrue("no exceptions : " + exceptions.toArray(), exceptions.isEmpty()); + assertTrue("no exceptions : " + exceptions, exceptions.isEmpty()); } } diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java index c28d3ad2951..f172a919f33 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java @@ -30,6 +30,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; import javax.jms.*; @@ -65,22 +66,22 @@ public class AMQ2149Test { private static final String BROKER_CONNECTOR = "tcp://localhost:61617"; private static final String DEFAULT_BROKER_URL = "failover:("+ BROKER_CONNECTOR +")?maxReconnectDelay=1000&useExponentialBackOff=false"; - + private final String SEQ_NUM_PROPERTY = "seqNum"; final int MESSAGE_LENGTH_BYTES = 75 * 1024; final long SLEEP_BETWEEN_SEND_MS = 25; final int NUM_SENDERS_AND_RECEIVERS = 10; final Object brokerLock = new Object(); - + private static final long DEFAULT_BROKER_STOP_PERIOD = 10 * 1000; private static final long DEFAULT_NUM_TO_SEND = 1400; - + long brokerStopPeriod = DEFAULT_BROKER_STOP_PERIOD; long numtoSend = DEFAULT_NUM_TO_SEND; long sleepBetweenSend = SLEEP_BETWEEN_SEND_MS; String brokerURL = DEFAULT_BROKER_URL; - + int numBrokerRestarts = 0; final static int MAX_BROKER_RESTARTS = 4; BrokerService broker; @@ -88,15 +89,15 @@ public class AMQ2149Test { protected File dataDirFile; final LoggingBrokerPlugin[] plugins = new LoggingBrokerPlugin[]{new LoggingBrokerPlugin()}; - - + + public void createBroker(Configurer configurer) throws Exception { broker = new BrokerService(); configurePersistenceAdapter(broker); - + broker.getSystemUsage().getMemoryUsage().setLimit(MESSAGE_LENGTH_BYTES * 200 * NUM_SENDERS_AND_RECEIVERS); - broker.addConnector(BROKER_CONNECTOR); + broker.addConnector(BROKER_CONNECTOR); broker.setBrokerName(testName.getMethodName()); broker.setDataDirectoryFile(dataDirFile); if (configurer != null) { @@ -104,7 +105,7 @@ public void createBroker(Configurer configurer) throws Exception { } broker.start(); } - + protected void configurePersistenceAdapter(BrokerService brokerService) throws Exception { } @@ -135,7 +136,7 @@ public void tearDown() throws Exception { executor.shutdownNow(); exceptions.clear(); } - + private String buildLongString() { final StringBuilder stringBuilder = new StringBuilder( MESSAGE_LENGTH_BYTES); @@ -156,8 +157,8 @@ private class Receiver implements MessageListener { private final MessageConsumer messageConsumer; - private volatile long nextExpectedSeqNum = 0; - + private AtomicLong nextExpectedSeqNum = new AtomicLong(); + private final boolean transactional; private String lastId = null; @@ -182,11 +183,11 @@ public Receiver(javax.jms.Destination dest, boolean transactional) throws JMSExc public void close() throws JMSException { connection.close(); } - + public long getNextExpectedSeqNo() { - return nextExpectedSeqNum; + return nextExpectedSeqNum.get(); } - + final int TRANSACITON_BATCH = 500; boolean resumeOnNextOrPreviousIsOk = false; public void onMessage(Message message) { @@ -194,7 +195,7 @@ public void onMessage(Message message) { final long seqNum = message.getLongProperty(SEQ_NUM_PROPERTY); if ((seqNum % TRANSACITON_BATCH) == 0) { LOG.info(dest + " received " + seqNum); - + if (transactional) { LOG.info("committing.."); session.commit(); @@ -202,25 +203,26 @@ public void onMessage(Message message) { } if (resumeOnNextOrPreviousIsOk) { // after an indoubt commit we need to accept what we get (within reason) - if (seqNum != nextExpectedSeqNum) { - if (seqNum == nextExpectedSeqNum - (TRANSACITON_BATCH -1)) { - nextExpectedSeqNum -= (TRANSACITON_BATCH -1); + if (seqNum != nextExpectedSeqNum.get()) { + final long l = nextExpectedSeqNum.get(); + if (seqNum == l - (TRANSACITON_BATCH -1)) { + nextExpectedSeqNum.compareAndSet(l, l - (TRANSACITON_BATCH -1) ); LOG.info("In doubt commit failed, getting replay at:" + nextExpectedSeqNum); } } resumeOnNextOrPreviousIsOk = false; } - if (seqNum != nextExpectedSeqNum) { + if (seqNum != nextExpectedSeqNum.get()) { LOG.warn(dest + " received " + seqNum + " in msg: " + message.getJMSMessageID() + " expected " + nextExpectedSeqNum - + ", lastId: " + lastId + + ", lastId: " + lastId + ", message:" + message); fail(dest + " received " + seqNum + " expected " + nextExpectedSeqNum); } - ++nextExpectedSeqNum; + nextExpectedSeqNum.incrementAndGet(); lastId = message.getJMSMessageID(); } catch (TransactionRolledBackException expectedSometimesOnFailoverRecovery) { LOG.info("got rollback: " + expectedSometimesOnFailoverRecovery); @@ -228,12 +230,12 @@ public void onMessage(Message message) { // in doubt - either commit command or reply missing // don't know if we will get a replay resumeOnNextOrPreviousIsOk = true; - nextExpectedSeqNum++; + nextExpectedSeqNum.incrementAndGet(); LOG.info("in doubt transaction completion: ok to get next or previous batch. next:" + nextExpectedSeqNum); } else { resumeOnNextOrPreviousIsOk = false; // batch will be replayed - nextExpectedSeqNum -= (TRANSACITON_BATCH -1); + nextExpectedSeqNum.addAndGet(-(TRANSACITON_BATCH - 1)); } } catch (Throwable e) { @@ -255,6 +257,7 @@ private class Sender implements Runnable { private final MessageProducer messageProducer; private volatile long nextSequenceNumber = 0; + private final Object guard = new Object(); public Sender(javax.jms.Destination dest) throws JMSException { this.dest = dest; @@ -269,15 +272,24 @@ public Sender(javax.jms.Destination dest) throws JMSException { public void run() { final String longString = buildLongString(); + long nextSequenceNumber = this.nextSequenceNumber; while (nextSequenceNumber < numtoSend) { try { final Message message = session .createTextMessage(longString); message.setLongProperty(SEQ_NUM_PROPERTY, nextSequenceNumber); - ++nextSequenceNumber; - messageProducer.send(message); - + synchronized (guard) + { + if (nextSequenceNumber == this.nextSequenceNumber) + { + this.nextSequenceNumber = nextSequenceNumber + 1; + messageProducer.send(message); + } else { + continue; + } + } + if ((nextSequenceNumber % 500) == 0) { LOG.info(dest + " sent " + nextSequenceNumber); } @@ -353,13 +365,13 @@ public void configure(BrokerService broker) throws Exception { // no need to run this unless there are some issues with the others public void vanilaVerify_testOrder() throws Exception { - + createBroker(new Configurer() { public void configure(BrokerService broker) throws Exception { - broker.deleteAllMessages(); + broker.deleteAllMessages(); } }); - + verifyOrderedMessageReceipt(); verifyStats(false); } @@ -368,22 +380,22 @@ public void configure(BrokerService broker) throws Exception { public void testOrderWithRestart() throws Exception { createBroker(new Configurer() { public void configure(BrokerService broker) throws Exception { - broker.deleteAllMessages(); + broker.deleteAllMessages(); } }); - + final Timer timer = new Timer(); schedualRestartTask(timer, new Configurer() { - public void configure(BrokerService broker) throws Exception { + public void configure(BrokerService broker) throws Exception { } }); - + try { verifyOrderedMessageReceipt(); } finally { timer.cancel(); } - + verifyStats(true); } @@ -394,16 +406,16 @@ public void configure(BrokerService broker) throws Exception { broker.deleteAllMessages(); } }); - + final Timer timer = new Timer(); schedualRestartTask(timer, null); - + try { verifyOrderedMessageReceipt(ActiveMQDestination.TOPIC_TYPE); } finally { timer.cancel(); } - + verifyStats(true); } @@ -416,33 +428,33 @@ public void testQueueTransactionalOrderWithRestart() throws Exception { public void testTopicTransactionalOrderWithRestart() throws Exception { doTestTransactionalOrderWithRestart(ActiveMQDestination.TOPIC_TYPE); } - + public void doTestTransactionalOrderWithRestart(byte destinationType) throws Exception { numtoSend = 10000; sleepBetweenSend = 3; brokerStopPeriod = 10 * 1000; - + createBroker(new Configurer() { public void configure(BrokerService broker) throws Exception { broker.deleteAllMessages(); } }); - + final Timer timer = new Timer(); schedualRestartTask(timer, null); - + try { verifyOrderedMessageReceipt(destinationType, 1, true); } finally { timer.cancel(); } - + verifyStats(true); } private void verifyStats(boolean brokerRestarts) throws Exception { RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker(); - + for (Destination dest : regionBroker.getQueueRegion().getDestinationMap().values()) { DestinationStatistics stats = dest.getDestinationStatistics(); if (brokerRestarts) { @@ -453,7 +465,7 @@ private void verifyStats(boolean brokerRestarts) throws Exception { + " " + stats.getEnqueues().getCount() + " <= " +stats.getDequeues().getCount()); } else { assertEquals("qneue/dequeue match for: " + dest.getName(), - stats.getEnqueues().getCount(), stats.getDequeues().getCount()); + stats.getEnqueues().getCount(), stats.getDequeues().getCount()); } } } @@ -496,20 +508,20 @@ public void run() { } return task; } - + private void verifyOrderedMessageReceipt(byte destinationType) throws Exception { verifyOrderedMessageReceipt(destinationType, NUM_SENDERS_AND_RECEIVERS, false); } - + private void verifyOrderedMessageReceipt() throws Exception { verifyOrderedMessageReceipt(ActiveMQDestination.QUEUE_TYPE, NUM_SENDERS_AND_RECEIVERS, false); } - + private void verifyOrderedMessageReceipt(byte destinationType, int concurrentPairs, boolean transactional) throws Exception { Vector threads = new Vector(); Vector receivers = new Vector(); - + for (int i = 0; i < concurrentPairs; ++i) { final javax.jms.Destination destination = ActiveMQDestination.createDestination("test.dest." + i, destinationType); @@ -518,7 +530,7 @@ private void verifyOrderedMessageReceipt(byte destinationType, int concurrentPai thread.start(); threads.add(thread); } - + final long expiry = System.currentTimeMillis() + 1000 * 60 * 4; while(!threads.isEmpty() && exceptions.isEmpty() && System.currentTimeMillis() < expiry) { Thread sendThread = threads.firstElement(); diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3779Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3779Test.java index 5a410e87a40..c7a486f60f7 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3779Test.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3779Test.java @@ -47,7 +47,7 @@ public void doAppend(LoggingEvent event) { } } }; - logger.getRootLogger().addAppender(appender); + Logger.getRootLogger().addAppender(appender); try { diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQTextMessageTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQTextMessageTest.java index 28fc307c5ec..13c71847c51 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQTextMessageTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQTextMessageTest.java @@ -49,7 +49,7 @@ public void testShallowCopy() throws JMSException { String string = "str"; msg.setText(string); Message copy = msg.copy(); - assertTrue(msg.getText() == ((ActiveMQTextMessage) copy).getText()); + assertSame(msg.getText(), ((ActiveMQTextMessage) copy).getText()); } public void testSetText() { diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java index 57530da7e26..4ccb51efe6b 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java @@ -238,8 +238,8 @@ public void startBroker(boolean deleteAllMessages, int checkPointPeriod) throws public void testRollover() throws Exception { byte flip = 0x1; for (long i=0; i exceptions = new Vector(); ExecutorService executor; diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/StubTransport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/StubTransport.java index 8fb70ec966f..a11d45a5c2b 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/StubTransport.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/StubTransport.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.util.ServiceStopper; @@ -29,7 +30,7 @@ public class StubTransport extends TransportSupport { private Queue queue = new ConcurrentLinkedQueue(); - private volatile int receiveCounter; + private AtomicInteger receiveCounter; protected void doStop(ServiceStopper stopper) throws Exception { } @@ -38,7 +39,7 @@ protected void doStart() throws Exception { } public void oneway(Object command) throws IOException { - receiveCounter++; + receiveCounter.incrementAndGet(); queue.add(command); } @@ -51,7 +52,7 @@ public String getRemoteAddress() { } public int getReceiveCounter() { - return receiveCounter; + return receiveCounter.get(); } } diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline4Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline4Test.java index 09c50d01076..bc31444d065 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline4Test.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline4Test.java @@ -30,6 +30,7 @@ import javax.jms.MessageProducer; import javax.jms.Session; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.List; @@ -80,7 +81,7 @@ public void testCleanupDeletedSubAfterRestart() throws Exception { MessageProducer producer = session.createProducer(null); final int toSend = 500; - final String payload = new byte[40*1024].toString(); + final String payload = Arrays.toString(new byte[40 * 1024]); int sent = 0; for (int i = sent; i < toSend; i++) { Message message = session.createTextMessage(payload); diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java index 49026bd6a70..deb9cde7742 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java @@ -168,7 +168,7 @@ public void testLimit() throws Exception { final ProducerThread producer = new ProducerThread(sess, sess.createQueue("STORE.1")) { @Override protected Message createMessage(int i) throws Exception { - return sess.createTextMessage(payload + "::" + i); + return sess.createTextMessage(Arrays.toString(payload) + "::" + i); } }; producer.setMessageCount(1000); @@ -176,7 +176,7 @@ protected Message createMessage(int i) throws Exception { final ProducerThread producer2 = new ProducerThread(sess, sess.createQueue("STORE.2")) { @Override protected Message createMessage(int i) throws Exception { - return sess.createTextMessage(payload + "::" + i); + return sess.createTextMessage(Arrays.toString(payload) + "::" + i); } }; producer2.setMessageCount(1000);