Skip to content

Commit

Permalink
IGNITE-9657 IGNITE-9657 Fixed socket leak in TcpDiscoverySpi
Browse files Browse the repository at this point in the history
Signed-off-by: Andrey Gura <agura@apache.org>
  • Loading branch information
ezhuravl authored and agura committed Sep 21, 2018
1 parent 28079cd commit 6c3a486
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 15 deletions.
Expand Up @@ -1498,18 +1498,25 @@ protected Socket openSocket(Socket sock, InetSocketAddress remAddr, IgniteSpiOpe

assert remAddr != null;

InetSocketAddress resolved = remAddr.isUnresolved() ?
new InetSocketAddress(InetAddress.getByName(remAddr.getHostName()), remAddr.getPort()) : remAddr;
try {
InetSocketAddress resolved = remAddr.isUnresolved() ?
new InetSocketAddress(InetAddress.getByName(remAddr.getHostName()), remAddr.getPort()) : remAddr;

InetAddress addr = resolved.getAddress();

InetAddress addr = resolved.getAddress();
assert addr != null;

assert addr != null;
sock.connect(resolved, (int)timeoutHelper.nextTimeoutChunk(sockTimeout));

sock.connect(resolved, (int)timeoutHelper.nextTimeoutChunk(sockTimeout));
writeToSocket(sock, null, U.IGNITE_HEADER, timeoutHelper.nextTimeoutChunk(sockTimeout));

writeToSocket(sock, null, U.IGNITE_HEADER, timeoutHelper.nextTimeoutChunk(sockTimeout));
return sock;
} catch (IOException | IgniteSpiOperationTimeoutException e) {
if (sock != null)
U.closeQuiet(sock);

return sock;
throw e;
}
}

/**
Expand All @@ -1519,18 +1526,25 @@ protected Socket openSocket(Socket sock, InetSocketAddress remAddr, IgniteSpiOpe
* @throws IOException If failed.
*/
Socket createSocket() throws IOException {
Socket sock;
Socket sock = null;

if (isSslEnabled())
sock = sslSockFactory.createSocket();
else
sock = new Socket();
try {
if (isSslEnabled())
sock = sslSockFactory.createSocket();
else
sock = new Socket();

sock.bind(new InetSocketAddress(locHost, 0));

sock.bind(new InetSocketAddress(locHost, 0));
sock.setTcpNoDelay(true);

sock.setTcpNoDelay(true);
return sock;
} catch (IOException e) {
if (sock != null)
U.closeQuiet(sock);

return sock;
throw e;
}
}

/**
Expand Down
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.spi.discovery.tcp;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.IgniteSpiOperationTimeoutException;
import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;

/**
* Client-based discovery SPI test with unresolved server hosts.
*/
public class TcpClientDiscoveryUnresolvedHostTest extends GridCommonAbstractTest {
/** */
TestTcpDiscoverySpi spi;

/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

spi = new TestTcpDiscoverySpi();

cfg.setDiscoverySpi(spi.setJoinTimeout(5000).setIpFinder(new TcpDiscoveryVmIpFinder()
.setAddresses(Collections.singletonList("test:47500"))));

cfg.setCacheConfiguration();

cfg.setClientMode(true);

return cfg;
}

/**
* Test that sockets closed after exception.
*
* @throws Exception in case of error.
*/
public void test() throws Exception {
try {
startGrid(0);
} catch (IgniteCheckedException e) {
//Ignore.
}

assertEquals(0, spi.getSockets().size());
}

/**
* TcpDiscoverySpi implementation with additional storing of created sockets.
*/
private static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
/** */
Set<Socket> sockets = new HashSet<>();

/** {@inheritDoc} */
@Override Socket createSocket() throws IOException {
Socket socket = super.createSocket();

sockets.add(socket);

return socket;
}

/** {@inheritDoc} */
@Override protected Socket openSocket(Socket sock, InetSocketAddress remAddr, IgniteSpiOperationTimeoutHelper timeoutHelper)
throws IOException, IgniteSpiOperationTimeoutException {

try {
return super.openSocket(sock, remAddr, timeoutHelper);
}
catch (IgniteSpiOperationTimeoutException | IOException e) {
if (sock.isClosed())
sockets.remove(sock);

throw e;
}
}

/**
* Gets list of sockets opened by this discovery spi.
*
* @return List of sockets.
*/
public Set<Socket> getSockets() {
return sockets;
}
}
}
Expand Up @@ -30,6 +30,7 @@
import org.apache.ignite.spi.discovery.tcp.TcpClientDiscoverySpiFailureTimeoutSelfTest;
import org.apache.ignite.spi.discovery.tcp.TcpClientDiscoverySpiMulticastTest;
import org.apache.ignite.spi.discovery.tcp.TcpClientDiscoverySpiSelfTest;
import org.apache.ignite.spi.discovery.tcp.TcpClientDiscoveryUnresolvedHostTest;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryClientSuspensionSelfTest;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryMarshallerCheckSelfTest;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryMultiThreadedTest;
Expand Down Expand Up @@ -99,6 +100,7 @@ public static TestSuite suite() throws Exception {
suite.addTest(new TestSuite(TcpClientDiscoveryMarshallerCheckSelfTest.class));
suite.addTest(new TestSuite(TcpClientDiscoverySpiMulticastTest.class));
suite.addTest(new TestSuite(TcpClientDiscoverySpiFailureTimeoutSelfTest.class));
suite.addTest(new TestSuite(TcpClientDiscoveryUnresolvedHostTest.class));

suite.addTest(new TestSuite(TcpDiscoveryNodeConsistentIdSelfTest.class));
suite.addTest(new TestSuite(TcpDiscoveryNodeConfigConsistentIdSelfTest.class));
Expand Down

0 comments on commit 6c3a486

Please sign in to comment.