Skip to content

Commit

Permalink
ARTEMIS-863 parsing spaces properly on network health addresses and a…
Browse files Browse the repository at this point in the history
…voiding loopback on configuration
  • Loading branch information
clebertsuconic committed Nov 28, 2016
1 parent cf1c4d1 commit 8a0b436
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.activemq.artemis.logs.ActiveMQUtilLogger;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.jboss.logging.Logger;
Expand All @@ -56,6 +57,9 @@ public class NetworkHealthCheck extends ActiveMQScheduledComponent {

private String ipv6Command = IPV6_DEFAULT_COMMAND;

// To be used on tests. As we use the loopback as a valid address on tests.
private boolean ignoreLoopback = false;

/**
* The timeout to be used on isReachable
*/
Expand Down Expand Up @@ -88,6 +92,23 @@ public NetworkHealthCheck setNICName(String nicName) {
return this;
}

public boolean isIgnoreLoopback() {
return ignoreLoopback;
}

public NetworkHealthCheck setIgnoreLoopback(boolean ignoreLoopback) {
this.ignoreLoopback = ignoreLoopback;
return this;
}

public Set<InetAddress> getAddresses() {
return addresses;
}

public Set<URL> getUrls() {
return urls;
}

public String getNICName() {
if (networkInterface != null) {
return networkInterface.getName();
Expand All @@ -101,10 +122,12 @@ public NetworkHealthCheck parseAddressList(String addressList) {
String[] addresses = addressList.split(",");

for (String address : addresses) {
try {
this.addAddress(InetAddress.getByName(address));
} catch (Exception e) {
logger.warn(e.getMessage(), e);
if (!address.trim().isEmpty()) {
try {
this.addAddress(InetAddress.getByName(address.trim()));
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
}
}
Expand All @@ -117,10 +140,12 @@ public NetworkHealthCheck parseURIList(String addressList) {
String[] addresses = addressList.split(",");

for (String address : addresses) {
try {
this.addURL(new URL(address));
} catch (Exception e) {
logger.warn(e.getMessage(), e);
if (!address.trim().isEmpty()) {
try {
this.addURL(new URL(address.trim()));
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
}
}
Expand Down Expand Up @@ -180,9 +205,13 @@ public NetworkHealthCheck addAddress(InetAddress address) {
if (!check(address)) {
logger.warn("Ping Address " + address + " wasn't reacheable");
}
addresses.add(address);

checkStart();
if (!ignoreLoopback && address.isLoopbackAddress()) {
ActiveMQUtilLogger.LOGGER.addressloopback(address.toString());
} else {
addresses.add(address);
checkStart();
}
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,9 @@ public interface ActiveMQUtilLogger extends BasicLogger {
@Message(id = 202000, value = "Missing privileges to set Thread Context Class Loader on Thread Factory. Using current Thread Context Class Loader",
format = Message.Format.MESSAGE_FORMAT)
void missingPrivsForClassloader();

@LogMessage(level = Logger.Level.WARN)
@Message(id = 202001, value = "{0} is a loopback address and will be discarded.",
format = Message.Format.MESSAGE_FORMAT)
void addressloopback(String address);
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,32 @@ public void run() {
Assert.assertTrue("just because one took a lot of time, it doesn't mean we can accumulate many, we got " + count + " executions", count.get() < 5);
}

@Test
public void testAccumulationOwnPool() throws Exception {
final AtomicInteger count = new AtomicInteger(0);

final ActiveMQScheduledComponent local = new ActiveMQScheduledComponent(100, TimeUnit.MILLISECONDS, false) {
@Override
public void run() {
if (count.get() == 0) {
try {
Thread.sleep(500);
} catch (Exception e) {
}
}
count.incrementAndGet();
}
};

local.start();

Thread.sleep(1000);

local.stop();

Assert.assertTrue("just because one took a lot of time, it doesn't mean we can accumulate many, we got " + count + " executions", count.get() < 5 && count.get() > 0);
}

@Test
public void testUsingOwnExecutors() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public class NetworkHealthTest {

NetworkHealthCheck addCheck(NetworkHealthCheck check) {
list.add(check);
return check;
return check.setIgnoreLoopback(true);
}

HttpServer httpServer;
Expand Down Expand Up @@ -137,7 +137,26 @@ public void testCheck6() throws Exception {
Assert.assertTrue(check.purePing(address));

Assert.assertTrue(check.check(address));
}

@Test
public void testParseSpaces() throws Exception {
NetworkHealthCheck check = addCheck(new NetworkHealthCheck(null, 100, 100));

// using two addresses for URI and localhost
check.parseAddressList("localhost, , 127.0.0.2").parseURIList("http://www.redhat.com, , http://www.apache.org");
Assert.assertEquals(2, check.getAddresses().size());
Assert.assertEquals(2, check.getUrls().size());
}

@Test
public void testParseLogger() throws Exception {
NetworkHealthCheck check = addCheck(new NetworkHealthCheck(null, 100, 100));

// using two addresses for URI and localhost
check.parseAddressList("localhost, , 127.0.0.2").parseURIList("http://www.redhat.com, , http://www.apache.org");
Assert.assertEquals(2, check.getAddresses().size());
Assert.assertEquals(2, check.getUrls().size());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,24 @@
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class NetworkIsolationReplicationTest extends FailoverTestBase {

private static final Logger logger = Logger.getLogger(NetworkIsolationReplicationTest.class);

@Before
@Override
public void setUp() throws Exception {
this.startBackupServer = false;
super.setUp();
}

@Override
protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) {
return TransportConfigurationUtils.getNettyAcceptor(live, 1);
Expand All @@ -49,37 +61,59 @@ protected ClientSession createSession(ClientSessionFactory sf1,

@Test
public void testDoNotActivateOnIsolation() throws Exception {
ServerLocator locator = getServerLocator();
AssertionLoggerHandler.startCapture();

backupServer.getServer().getNetworkHealthCheck().addAddress(InetAddress.getByName("203.0.113.1"));
try {
ServerLocator locator = getServerLocator();

ClientSessionFactory sf = addSessionFactory(locator.createSessionFactory());
// this block here is just to validate if ignoring loopback addresses logic is in place
{
backupServer.getServer().getNetworkHealthCheck().addAddress(InetAddress.getByName("127.0.0.1"));

ClientSession session = createSession(sf, false, true, true);
Assert.assertTrue(AssertionLoggerHandler.findText("AMQ202001"));

session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
AssertionLoggerHandler.clear();

Assert.assertFalse(backupServer.getServer().getNetworkHealthCheck().check());
backupServer.getServer().getNetworkHealthCheck().setIgnoreLoopback(true).addAddress(InetAddress.getByName("127.0.0.1"));

crash(false, true, session);
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ202001"));

for (int i = 0; i < 1000 && !backupServer.isStarted(); i++) {
Thread.sleep(10);
}
backupServer.getServer().getNetworkHealthCheck().clearAddresses();
}

Assert.assertTrue(backupServer.isStarted());
Assert.assertFalse(backupServer.isActive());
backupServer.getServer().getNetworkHealthCheck().addAddress(InetAddress.getByName("203.0.113.1"));
backupServer.getServer().start();

liveServer.start();
ClientSessionFactory sf = addSessionFactory(locator.createSessionFactory());

for (int i = 0; i < 1000 && backupServer.getServer().getReplicationEndpoint() != null && !backupServer.getServer().getReplicationEndpoint().isStarted(); i++) {
Thread.sleep(10);
}
ClientSession session = createSession(sf, false, true, true);

session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);

Assert.assertFalse(backupServer.getServer().getNetworkHealthCheck().check());

crash(false, true, session);

for (int i = 0; i < 1000 && !backupServer.isStarted(); i++) {
Thread.sleep(10);
}

Assert.assertTrue(backupServer.isStarted());
Assert.assertFalse(backupServer.isActive());

liveServer.start();

backupServer.getServer().getNetworkHealthCheck().clearAddresses();
for (int i = 0; i < 1000 && backupServer.getServer().getReplicationEndpoint() != null && !backupServer.getServer().getReplicationEndpoint().isStarted(); i++) {
Thread.sleep(10);
}

backupServer.getServer().getNetworkHealthCheck().clearAddresses();

// This will make sure the backup got synchronized after the network was activated again
Assert.assertTrue(backupServer.getServer().getReplicationEndpoint().isStarted());
// This will make sure the backup got synchronized after the network was activated again
Assert.assertTrue(backupServer.getServer().getReplicationEndpoint().isStarted());
} finally {
AssertionLoggerHandler.stopCapture();
}
}

@Test
Expand All @@ -90,29 +124,39 @@ public void testLiveIsolated() throws Exception {
liveServer.getServer().getConfiguration().setNetworkCheckList("203.0.113.1").
setNetworkCheckPeriod(100).setNetworkCheckTimeout(100);

liveServer.start();
try {

Assert.assertEquals(100L, liveServer.getServer().getNetworkHealthCheck().getPeriod());
liveServer.start();

liveServer.getServer().getNetworkHealthCheck().setTimeUnit(TimeUnit.MILLISECONDS);
Assert.assertEquals(100L, liveServer.getServer().getNetworkHealthCheck().getPeriod());

Assert.assertFalse(liveServer.getServer().getNetworkHealthCheck().check());
liveServer.getServer().getNetworkHealthCheck().setTimeUnit(TimeUnit.MILLISECONDS);

long timeout = System.currentTimeMillis() + 30000;
while (liveServer.isStarted() && System.currentTimeMillis() < timeout) {
Thread.sleep(100);
}
Assert.assertFalse(liveServer.getServer().getNetworkHealthCheck().check());

long timeout = System.currentTimeMillis() + 30000;
while (liveServer.isStarted() && System.currentTimeMillis() < timeout) {
Thread.sleep(100);
}

Assert.assertFalse(liveServer.isStarted());
Assert.assertFalse(liveServer.isStarted());

liveServer.getServer().getNetworkHealthCheck().addAddress(InetAddress.getByName("127.0.0.1"));
liveServer.getServer().getNetworkHealthCheck().setIgnoreLoopback(true).addAddress(InetAddress.getByName("127.0.0.1"));

timeout = System.currentTimeMillis() + 30000;
while (!liveServer.isStarted() && System.currentTimeMillis() < timeout) {
Thread.sleep(100);
}

timeout = System.currentTimeMillis() + 30000;
while (!liveServer.isStarted() && System.currentTimeMillis() < timeout) {
Thread.sleep(100);
Assert.assertTrue(liveServer.isStarted());
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
throw e;
} finally {
liveServer.getServer().stop();
backupServer.getServer().stop();
}

Assert.assertTrue(liveServer.isStarted());
}

@Override
Expand Down

0 comments on commit 8a0b436

Please sign in to comment.