Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARTEMIS-863 parsing spaces properly & avoiding loopback on configuration #899

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -31,6 +31,7 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.activemq.artemis.logs.ActiveMQUtilLogger;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.jboss.logging.Logger;
Expand All @@ -56,6 +57,9 @@ public class NetworkHealthCheck extends ActiveMQScheduledComponent {

private String ipv6Command = IPV6_DEFAULT_COMMAND;

// To be used on tests. As we use the loopback as a valid address on tests.
private boolean ignoreLoopback = false;

/**
* The timeout to be used on isReachable
*/
Expand Down Expand Up @@ -88,6 +92,23 @@ public NetworkHealthCheck setNICName(String nicName) {
return this;
}

public boolean isIgnoreLoopback() {
return ignoreLoopback;
}

public NetworkHealthCheck setIgnoreLoopback(boolean ignoreLoopback) {
this.ignoreLoopback = ignoreLoopback;
return this;
}

public Set<InetAddress> getAddresses() {
return addresses;
}

public Set<URL> getUrls() {
return urls;
}

public String getNICName() {
if (networkInterface != null) {
return networkInterface.getName();
Expand All @@ -101,10 +122,12 @@ public NetworkHealthCheck parseAddressList(String addressList) {
String[] addresses = addressList.split(",");

for (String address : addresses) {
try {
this.addAddress(InetAddress.getByName(address));
} catch (Exception e) {
logger.warn(e.getMessage(), e);
if (!address.trim().isEmpty()) {
try {
this.addAddress(InetAddress.getByName(address.trim()));
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
}
}
Expand All @@ -117,10 +140,12 @@ public NetworkHealthCheck parseURIList(String addressList) {
String[] addresses = addressList.split(",");

for (String address : addresses) {
try {
this.addURL(new URL(address));
} catch (Exception e) {
logger.warn(e.getMessage(), e);
if (!address.trim().isEmpty()) {
try {
this.addURL(new URL(address.trim()));
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
}
}
Expand Down Expand Up @@ -180,9 +205,13 @@ public NetworkHealthCheck addAddress(InetAddress address) {
if (!check(address)) {
logger.warn("Ping Address " + address + " wasn't reacheable");
}
addresses.add(address);

checkStart();
if (!ignoreLoopback && address.isLoopbackAddress()) {
ActiveMQUtilLogger.LOGGER.addressloopback(address.toString());
} else {
addresses.add(address);
checkStart();
}
return this;
}

Expand Down
Expand Up @@ -48,4 +48,9 @@ public interface ActiveMQUtilLogger extends BasicLogger {
@Message(id = 202000, value = "Missing privileges to set Thread Context Class Loader on Thread Factory. Using current Thread Context Class Loader",
format = Message.Format.MESSAGE_FORMAT)
void missingPrivsForClassloader();

@LogMessage(level = Logger.Level.WARN)
@Message(id = 202001, value = "{0} is a loopback address and will be discarded.",
format = Message.Format.MESSAGE_FORMAT)
void addressloopback(String address);
}
Expand Up @@ -78,6 +78,32 @@ public void run() {
Assert.assertTrue("just because one took a lot of time, it doesn't mean we can accumulate many, we got " + count + " executions", count.get() < 5);
}

@Test
public void testAccumulationOwnPool() throws Exception {
final AtomicInteger count = new AtomicInteger(0);

final ActiveMQScheduledComponent local = new ActiveMQScheduledComponent(100, TimeUnit.MILLISECONDS, false) {
@Override
public void run() {
if (count.get() == 0) {
try {
Thread.sleep(500);
} catch (Exception e) {
}
}
count.incrementAndGet();
}
};

local.start();

Thread.sleep(1000);

local.stop();

Assert.assertTrue("just because one took a lot of time, it doesn't mean we can accumulate many, we got " + count + " executions", count.get() < 5 && count.get() > 0);
}

@Test
public void testUsingOwnExecutors() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
Expand Down
Expand Up @@ -57,7 +57,7 @@ public class NetworkHealthTest {

NetworkHealthCheck addCheck(NetworkHealthCheck check) {
list.add(check);
return check;
return check.setIgnoreLoopback(true);
}

HttpServer httpServer;
Expand Down Expand Up @@ -137,7 +137,26 @@ public void testCheck6() throws Exception {
Assert.assertTrue(check.purePing(address));

Assert.assertTrue(check.check(address));
}

@Test
public void testParseSpaces() throws Exception {
NetworkHealthCheck check = addCheck(new NetworkHealthCheck(null, 100, 100));

// using two addresses for URI and localhost
check.parseAddressList("localhost, , 127.0.0.2").parseURIList("http://www.redhat.com, , http://www.apache.org");
Assert.assertEquals(2, check.getAddresses().size());
Assert.assertEquals(2, check.getUrls().size());
}

@Test
public void testParseLogger() throws Exception {
NetworkHealthCheck check = addCheck(new NetworkHealthCheck(null, 100, 100));

// using two addresses for URI and localhost
check.parseAddressList("localhost, , 127.0.0.2").parseURIList("http://www.redhat.com, , http://www.apache.org");
Assert.assertEquals(2, check.getAddresses().size());
Assert.assertEquals(2, check.getUrls().size());
}

@Test
Expand Down
Expand Up @@ -24,12 +24,24 @@
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class NetworkIsolationReplicationTest extends FailoverTestBase {

private static final Logger logger = Logger.getLogger(NetworkIsolationReplicationTest.class);

@Before
@Override
public void setUp() throws Exception {
this.startBackupServer = false;
super.setUp();
}

@Override
protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) {
return TransportConfigurationUtils.getNettyAcceptor(live, 1);
Expand All @@ -49,37 +61,59 @@ protected ClientSession createSession(ClientSessionFactory sf1,

@Test
public void testDoNotActivateOnIsolation() throws Exception {
ServerLocator locator = getServerLocator();
AssertionLoggerHandler.startCapture();

backupServer.getServer().getNetworkHealthCheck().addAddress(InetAddress.getByName("203.0.113.1"));
try {
ServerLocator locator = getServerLocator();

ClientSessionFactory sf = addSessionFactory(locator.createSessionFactory());
// this block here is just to validate if ignoring loopback addresses logic is in place
{
backupServer.getServer().getNetworkHealthCheck().addAddress(InetAddress.getByName("127.0.0.1"));

ClientSession session = createSession(sf, false, true, true);
Assert.assertTrue(AssertionLoggerHandler.findText("AMQ202001"));

session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
AssertionLoggerHandler.clear();

Assert.assertFalse(backupServer.getServer().getNetworkHealthCheck().check());
backupServer.getServer().getNetworkHealthCheck().setIgnoreLoopback(true).addAddress(InetAddress.getByName("127.0.0.1"));

crash(false, true, session);
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ202001"));

for (int i = 0; i < 1000 && !backupServer.isStarted(); i++) {
Thread.sleep(10);
}
backupServer.getServer().getNetworkHealthCheck().clearAddresses();
}

Assert.assertTrue(backupServer.isStarted());
Assert.assertFalse(backupServer.isActive());
backupServer.getServer().getNetworkHealthCheck().addAddress(InetAddress.getByName("203.0.113.1"));
backupServer.getServer().start();

liveServer.start();
ClientSessionFactory sf = addSessionFactory(locator.createSessionFactory());

for (int i = 0; i < 1000 && backupServer.getServer().getReplicationEndpoint() != null && !backupServer.getServer().getReplicationEndpoint().isStarted(); i++) {
Thread.sleep(10);
}
ClientSession session = createSession(sf, false, true, true);

session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);

Assert.assertFalse(backupServer.getServer().getNetworkHealthCheck().check());

crash(false, true, session);

for (int i = 0; i < 1000 && !backupServer.isStarted(); i++) {
Thread.sleep(10);
}

Assert.assertTrue(backupServer.isStarted());
Assert.assertFalse(backupServer.isActive());

liveServer.start();

backupServer.getServer().getNetworkHealthCheck().clearAddresses();
for (int i = 0; i < 1000 && backupServer.getServer().getReplicationEndpoint() != null && !backupServer.getServer().getReplicationEndpoint().isStarted(); i++) {
Thread.sleep(10);
}

backupServer.getServer().getNetworkHealthCheck().clearAddresses();

// This will make sure the backup got synchronized after the network was activated again
Assert.assertTrue(backupServer.getServer().getReplicationEndpoint().isStarted());
// This will make sure the backup got synchronized after the network was activated again
Assert.assertTrue(backupServer.getServer().getReplicationEndpoint().isStarted());
} finally {
AssertionLoggerHandler.stopCapture();
}
}

@Test
Expand All @@ -90,29 +124,39 @@ public void testLiveIsolated() throws Exception {
liveServer.getServer().getConfiguration().setNetworkCheckList("203.0.113.1").
setNetworkCheckPeriod(100).setNetworkCheckTimeout(100);

liveServer.start();
try {

Assert.assertEquals(100L, liveServer.getServer().getNetworkHealthCheck().getPeriod());
liveServer.start();

liveServer.getServer().getNetworkHealthCheck().setTimeUnit(TimeUnit.MILLISECONDS);
Assert.assertEquals(100L, liveServer.getServer().getNetworkHealthCheck().getPeriod());

Assert.assertFalse(liveServer.getServer().getNetworkHealthCheck().check());
liveServer.getServer().getNetworkHealthCheck().setTimeUnit(TimeUnit.MILLISECONDS);

long timeout = System.currentTimeMillis() + 30000;
while (liveServer.isStarted() && System.currentTimeMillis() < timeout) {
Thread.sleep(100);
}
Assert.assertFalse(liveServer.getServer().getNetworkHealthCheck().check());

long timeout = System.currentTimeMillis() + 30000;
while (liveServer.isStarted() && System.currentTimeMillis() < timeout) {
Thread.sleep(100);
}

Assert.assertFalse(liveServer.isStarted());
Assert.assertFalse(liveServer.isStarted());

liveServer.getServer().getNetworkHealthCheck().addAddress(InetAddress.getByName("127.0.0.1"));
liveServer.getServer().getNetworkHealthCheck().setIgnoreLoopback(true).addAddress(InetAddress.getByName("127.0.0.1"));

timeout = System.currentTimeMillis() + 30000;
while (!liveServer.isStarted() && System.currentTimeMillis() < timeout) {
Thread.sleep(100);
}

timeout = System.currentTimeMillis() + 30000;
while (!liveServer.isStarted() && System.currentTimeMillis() < timeout) {
Thread.sleep(100);
Assert.assertTrue(liveServer.isStarted());
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
throw e;
} finally {
liveServer.getServer().stop();
backupServer.getServer().stop();
}

Assert.assertTrue(liveServer.isStarted());
}

@Override
Expand Down