Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-21830 Fixed logging of failed attempts to connect to the previous node in the ring. #11327

Merged
merged 24 commits into from
May 21, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -6770,7 +6770,6 @@ else if (req.changeTopology()) {

if (previous != null && !previous.id().equals(nodeId) &&
(req.checkPreviousNodeId() == null || previous.id().equals(req.checkPreviousNodeId()))) {
Collection<InetSocketAddress> nodeAddrs = spi.getEffectiveNodeAddresses(previous);

// The connection recovery connection to one node is connCheckTick.
// We need to suppose network delays. So we use half of this time.
Expand All @@ -6781,13 +6780,16 @@ else if (req.changeTopology()) {
"previous [" + previous + "] with timeout " + backwardCheckTimeout);
}

liveAddr = checkConnection(new ArrayList<>(nodeAddrs), backwardCheckTimeout);
liveAddr = checkConnection(previous, backwardCheckTimeout);

if (log.isInfoEnabled()) {
log.info("Connection check to previous node done: [liveAddr=" + liveAddr
+ ", previousNode=" + U.toShortString(previous) + ", addressesToCheck=" +
nodeAddrs + ", connectingNodeId=" + nodeId + ']');
}
String logMsg = "Connection check to previous node " + (liveAddr == null ? "failed" : "done")
+ ". ConnectingNodeId=" + nodeId + ". PreviousNode=" + U.toShortString(previous)
+ ", aliveAddr=" + liveAddr + "].";

if (liveAddr == null)
U.warn(log, logMsg);
else if (log.isInfoEnabled())
log.info(logMsg);
}

ok = liveAddr != null;
Expand Down Expand Up @@ -7254,9 +7256,11 @@ private void ringMessageReceived() {
}

/** @return Alive address if was able to connected to. {@code Null} otherwise. */
private InetSocketAddress checkConnection(List<InetSocketAddress> addrs, int timeout) {
private InetSocketAddress checkConnection(TcpDiscoveryNode node, int timeout) {
AtomicReference<InetSocketAddress> liveAddrHolder = new AtomicReference<>();

List<InetSocketAddress> addrs = new ArrayList<>(spi.getEffectiveNodeAddresses(node));

CountDownLatch latch = new CountDownLatch(addrs.size());

int addrLeft = addrs.size();
Expand All @@ -7282,17 +7286,29 @@ private InetSocketAddress checkConnection(List<InetSocketAddress> addrs, int tim
for (int i = 0; i < addrsToCheck; ++i) {
InetSocketAddress addr = addrs.get(addrIdx.getAndIncrement());

String logMsg = "Checking connection to node [nodeId=" + node.id() + ", address=" + addr + "], result=";
String failReason = null;

try (Socket sock = new Socket()) {
if (liveAddrHolder.get() == null) {
sock.connect(addr, perAddrTimeout);

liveAddrHolder.compareAndSet(null, addr);

logMsg += "success.";
}
else
logMsg += "skipped, cause='Another alive address is already found'.";
}
catch (Exception ignored) {
// No-op.
catch (Exception e) {
failReason = e.getMessage();
}
finally {
if (failReason != null)
U.warn(log, logMsg + "failed, cause='" + failReason + "'.");
else if (log.isInfoEnabled())
log.info(logMsg);

latch.countDown();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.configuration.IgniteConfiguration;
Expand All @@ -56,6 +57,8 @@
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeRequest;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeResponse;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.LogListener;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;

Expand Down Expand Up @@ -109,6 +112,9 @@ public class TcpDiscoveryNetworkIssuesTest extends GridCommonAbstractTest {
/** */
private TcpDiscoverySpi specialSpi;

/** */
private ListeningTestLogger testLog;

/** */
private boolean usePortFromNodeName;

Expand Down Expand Up @@ -153,6 +159,9 @@ public class TcpDiscoveryNetworkIssuesTest extends GridCommonAbstractTest {

cfg.setLocalHost(localhost);

if (testLog != null)
cfg.setGridLogger(testLog);

return cfg;
}

Expand Down Expand Up @@ -234,7 +243,7 @@ public void testServerGetsSegmentedOnBecomeDangling() throws Exception {
*/
@Test
public void testBackwardNodeCheckWithSameLoopbackSingleLocalAddress() throws Exception {
doTestBackwardNodeCheckWithSameLoopback("127.0.0.1");
doTestBackwardNodeCheckWithSameLoopback("127.0.0.1", false);
}

/**
Expand All @@ -243,14 +252,26 @@ public void testBackwardNodeCheckWithSameLoopbackSingleLocalAddress() throws Exc
*/
@Test
public void testBackwardNodeCheckWithSameLoopbackSeveralLocalAddresses() throws Exception {
doTestBackwardNodeCheckWithSameLoopback("0.0.0.0");
maksaska marked this conversation as resolved.
Show resolved Hide resolved
doTestBackwardNodeCheckWithSameLoopback("0.0.0.0", true);
maksaska marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Performs Tests backward node ping if {@link TcpDiscoveryNode#socketAddresses()} contains same loopback address as of local node.
* Assumes several local address are resolved.
*/
private void doTestBackwardNodeCheckWithSameLoopback(String localhost) throws Exception {
private void doTestBackwardNodeCheckWithSameLoopback(String localhost, boolean withSkippedLogs) throws Exception {
ListeningTestLogger testMethodLog = new ListeningTestLogger(log);

String startLogMsg = "Checking connection to node";

Collection<LogListener> lsnrs = new ArrayList<>();

lsnrs.add(LogListener.matches(startLogMsg).andMatches("result=success").times(1).build());
lsnrs.add(LogListener.matches(startLogMsg).andMatches("result=skipped").atLeast(withSkippedLogs ? 1 : 0).build());
lsnrs.add(LogListener.matches("Connection check to previous node done").times(1).build());

lsnrs.forEach(testMethodLog::registerListener);

this.localhost = localhost;

specialSpi = new TestDiscoverySpi();
Expand All @@ -262,9 +283,12 @@ private void doTestBackwardNodeCheckWithSameLoopback(String localhost) throws Ex
Ignite node1 = startGrid(1);

specialSpi = new TestDiscoverySpi();
testLog = testMethodLog;

Ignite node2 = startGrid(2);
maksaska marked this conversation as resolved.
Show resolved Hide resolved

testLog = null;

CountDownLatch handshakeToNode2 = new CountDownLatch(1);

// Listener of handshake request from node0 to node2. Activates simulation of same localhost address of node1
Expand Down Expand Up @@ -313,6 +337,11 @@ private void doTestBackwardNodeCheckWithSameLoopback(String localhost) throws Ex
// Node 1 must not be kicked.
for (Ignite ig : G.allGrids())
assertEquals(3, ig.cluster().nodes().size());

for (LogListener lsnr : lsnrs)
waitForCondition(lsnr::check, getTestTimeout());

testMethodLog.clearListeners();
}

/**
Expand Down Expand Up @@ -383,6 +412,54 @@ private void simulateFailureOfTwoNodes(boolean sequentionally) throws Exception
}
}

/**
* This test uses node failure by stopping service threads, which makes the node unresponsive and results in
maksaska marked this conversation as resolved.
Show resolved Hide resolved
* failing connection to the server.
*
* @throws Exception If failed.
* @see TcpDiscoverySpi#simulateNodeFailure()
maksaska marked this conversation as resolved.
Show resolved Hide resolved
*/
@Test
public void testCheckNodeFailureSocketConnectionLogMessage() throws Exception {
maksaska marked this conversation as resolved.
Show resolved Hide resolved
ListeningTestLogger testLog = new ListeningTestLogger(log);

Collection<LogListener> lsnrs = new ArrayList<>();

lsnrs.add(LogListener.matches("Checking connection to node").andMatches("result=failed").times(1).build());
lsnrs.add(LogListener.matches("Connection check to previous node failed").times(1).build());

lsnrs.forEach(testLog::registerListener);

TcpDiscoverySpi spi0 = new TcpDiscoverySpi();
maksaska marked this conversation as resolved.
Show resolved Hide resolved

startGrid(getTestConfigWithSpi(spi0, "ignite-0"));
maksaska marked this conversation as resolved.
Show resolved Hide resolved

IgniteConfiguration cfg1 = getTestConfigWithSpi(new TcpDiscoverySpi(), "ignite-1");
maksaska marked this conversation as resolved.
Show resolved Hide resolved
cfg1.setGridLogger(testLog);

startGrid(cfg1);

startGrid(getTestConfigWithSpi(new TcpDiscoverySpi(), "ignite-2"));

spi0.simulateNodeFailure();

for (LogListener lsnr : lsnrs)
waitForCondition(lsnr::check, getTestTimeout());
maksaska marked this conversation as resolved.
Show resolved Hide resolved

testLog.clearListeners();
maksaska marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Returns default {@link IgniteConfiguration} with specified ignite instance name and {@link TcpDiscoverySpi}.
* @param spi {@link TcpDiscoverySpi}
* @param igniteInstanceName ignite instance name
* @return {@link IgniteConfiguration}
* @throws Exception If failed.
*/
private IgniteConfiguration getTestConfigWithSpi(TcpDiscoverySpi spi, String igniteInstanceName) throws Exception {
maksaska marked this conversation as resolved.
Show resolved Hide resolved
return getConfiguration(igniteInstanceName).setDiscoverySpi(spi);
}

/**
* @param ig Ignite instance to get failedNodes collection from.
*/
Expand Down