Skip to content

Commit

Permalink
This closes #2247
Browse files Browse the repository at this point in the history
  • Loading branch information
clebertsuconic committed Aug 13, 2018
2 parents 4dd116e + 87fdff5 commit 22b62b5
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 9 deletions.
Expand Up @@ -237,7 +237,7 @@ public Lock lockFailover() {
public void connect(final int initialConnectAttempts,
final boolean failoverOnInitialConnection) throws ActiveMQException {
// Get the connection
getConnectionWithRetry(initialConnectAttempts);
getConnectionWithRetry(initialConnectAttempts, null);

if (connection == null) {
StringBuilder msg = new StringBuilder("Unable to connect to server using configuration ").append(currentConnectorConfig);
Expand Down Expand Up @@ -743,7 +743,7 @@ private void reconnectSessions(final RemotingConnection oldConnection,
session.preHandleFailover(connection);
}

getConnectionWithRetry(reconnectAttempts);
getConnectionWithRetry(reconnectAttempts, oldConnection);

if (connection == null) {
if (!clientProtocolManager.isAlive())
Expand Down Expand Up @@ -774,7 +774,7 @@ private void reconnectSessions(final RemotingConnection oldConnection,
}
}

private void getConnectionWithRetry(final int reconnectAttempts) {
private void getConnectionWithRetry(final int reconnectAttempts, RemotingConnection oldConnection) {
if (!clientProtocolManager.isAlive())
return;
if (logger.isTraceEnabled()) {
Expand All @@ -795,6 +795,10 @@ private void getConnectionWithRetry(final int reconnectAttempts) {
}

if (getConnection() != null) {
if (oldConnection != null && oldConnection instanceof CoreRemotingConnection) {
// transferring old connection version into the new connection
((CoreRemotingConnection)connection).setChannelVersion(((CoreRemotingConnection)oldConnection).getChannelVersion());
}
if (logger.isDebugEnabled()) {
logger.debug("Reconnection successful");
}
Expand Down
Expand Up @@ -22,9 +22,9 @@ import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
import org.apache.activemq.artemis.tests.compatibility.GroovyRun;

if (serverArg[0].startsWith("HORNETQ")) {
cf = new ActiveMQConnectionFactory("tcp://localhost:61616?protocolManagerFactoryStr=org.apache.activemq.artemis.core.protocol.hornetq.client.HornetQClientProtocolManagerFactory&confirmationWindowSize=1048576&blockOnDurableSend=false");
cf = new ActiveMQConnectionFactory("tcp://localhost:61616?protocolManagerFactoryStr=org.apache.activemq.artemis.core.protocol.hornetq.client.HornetQClientProtocolManagerFactory&confirmationWindowSize=1048576&blockOnDurableSend=false&reconnectAttempts=-1&retryInterval=100");
} else {
cf = new ActiveMQConnectionFactory("tcp://localhost:61616?confirmationWindowSize=1048576&blockOnDurableSend=false");
cf = new ActiveMQConnectionFactory("tcp://localhost:61616?confirmationWindowSize=1048576&blockOnDurableSend=false&ha=true&reconnectAttempts=-1&retryInterval=100");
}


Expand Down
@@ -0,0 +1,41 @@
package clients

import org.apache.activemq.artemis.api.core.ActiveMQException
import org.apache.activemq.artemis.api.core.client.FailoverEventListener
import org.apache.activemq.artemis.api.core.client.FailoverEventType
import org.apache.activemq.artemis.jms.client.ActiveMQConnection

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Create a client connection factory

import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
import org.apache.activemq.artemis.tests.compatibility.GroovyRun

import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit;

CountDownLatch latch = new CountDownLatch(1);
((ActiveMQConnection)connectionToFail).setFailoverListener(new FailoverEventListener() {
@Override
void failoverEvent(FailoverEventType eventType) {
latch.countDown();
}
})
((ActiveMQConnection)connectionToFail).getSessionFactory().getConnection().fail(new ActiveMQException("fail"));
GroovyRun.assertTrue(latch.await(10, TimeUnit.SECONDS));
Expand Up @@ -26,7 +26,6 @@ String serverType = arg[0];
String clientType = arg[1];
String operation = arg[2];


try {
legacyOption = legacy;
} catch (Throwable e) {
Expand Down Expand Up @@ -127,8 +126,60 @@ if (operation.equals("sendAckMessages") || operation.equals("sendTopic")) {
plain.setStringProperty("plain", "doce");
plain.setIntProperty("order", 15)
producer.send(plain);

session.commit();
session.close();

Session newSession = connection.createSession(true, Session.SESSION_TRANSACTED);
connectionToFail = connection;
if (clientType.equals("ARTEMIS-SNAPSHOT")) {
// this is validating a bug that could only be fixed in snapshot
GroovyRun.evaluate("clients/artemisFail.groovy", "serverArg", serverType);
}
MessageProducer newProducer = newSession.createProducer(destination);
for (int i = 0 ; i < 10; i++) {
String bodyText = "This is message " + i;
TextMessage textMessage = newSession.createTextMessage(bodyText);
int size = 5 + i % 10;
StringBuffer variableSize = new StringBuffer();
for (int s = 0; s < size; s++) {
variableSize.append(" " + i);
}
textMessage.setStringProperty("inMessageId", variableSize.toString());
newProducer.send(textMessage);
newSession.commit();

newSession.close();
newSession = connection.createSession(true, Session.SESSION_TRANSACTED);
newProducer = newSession.createProducer(destination);
if (i % 2 == 0) {
// failing half of the sessions for the snapshots
if (clientType.equals("ARTEMIS-SNAPSHOT")) {
// this is validating a bug that could only be fixed in snapshot
GroovyRun.evaluate("clients/artemisFail.groovy", "serverArg", serverType);
}
}

}

// even if topic, will send a few on queue
newProducer = newSession.createProducer(queue);

for (int i = 0; i < 7; i++) {
String bodyText = "This is message " + i;
TextMessage textMessage = newSession.createTextMessage(bodyText);
int size = 5 + i % 10;
StringBuffer variableSize = new StringBuffer();
for (int s = 0; s < size; s++) {
variableSize.append(" " + i);
}
textMessage.setStringProperty("inMessageId", variableSize.toString());
newProducer.send(textMessage);
newSession.commit();
}

newSession.commit();
newSession.close();


connection.close();
}
Expand Down Expand Up @@ -194,7 +245,26 @@ if (operation.equals("receiveMessages") || operation.equals("receiveNonDurableSu
GroovyRun.assertNotNull(plain);
GroovyRun.assertEquals("doce", plain.getStringProperty("plain"));


for (int i = 0 ; i < 10; i++) {
TextMessage recMessage = consumer.receive(5000);
GroovyRun.assertNotNull(recMessage);
GroovyRun.assertEquals("This is message " + i, recMessage.getText());
}

session.commit();

consumer.close();

// force a few on the queue even if the test is for topics
consumer = session.createConsumer(queue);

for (int i = 0; i < 7; i++) {
TextMessage recMessage = consumer.receive(5000);
GroovyRun.assertNotNull(recMessage);
GroovyRun.assertEquals("This is message " + i, recMessage.getText());
}

connection.close();
}

Expand Down
Expand Up @@ -32,6 +32,6 @@ for (Object o : queueControls) {
QueueControl c = (QueueControl) o;
GroovyRun.assertTrue(c.getPersistentSize() > 0);
GroovyRun.assertTrue(c.getDurablePersistentSize() > 0);
GroovyRun.assertEquals(16l, c.getMessageCount());
GroovyRun.assertEquals(16l, c.getDurableMessageCount());
GroovyRun.assertEquals(33l, c.getMessageCount());
GroovyRun.assertEquals(33l, c.getDurableMessageCount());
}

0 comments on commit 22b62b5

Please sign in to comment.