Skip to content

Commit

Permalink
Make client services clean up properly, so that sockets stay fresh ov…
Browse files Browse the repository at this point in the history
…er multiple test runs.
  • Loading branch information
willscott committed May 2, 2012
1 parent 9631601 commit 469e3d2
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,40 @@

package com.aelitis.azureus.core.networkmanager.impl.tcp;

import java.net.*;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.*;

import org.gudy.azureus2.core3.config.*;
import org.gudy.azureus2.core3.logging.*;
import org.gudy.azureus2.core3.util.*;

import com.aelitis.azureus.core.networkmanager.*;
import java.util.Arrays;
import java.util.Date;

import org.gudy.azureus2.core3.config.COConfigurationManager;
import org.gudy.azureus2.core3.config.ParameterListener;
import org.gudy.azureus2.core3.logging.LogAlert;
import org.gudy.azureus2.core3.logging.LogEvent;
import org.gudy.azureus2.core3.logging.LogIDs;
import org.gudy.azureus2.core3.logging.Logger;
import org.gudy.azureus2.core3.util.AEMonitor;
import org.gudy.azureus2.core3.util.Debug;
import org.gudy.azureus2.core3.util.RandomUtils;
import org.gudy.azureus2.core3.util.SimpleTimer;
import org.gudy.azureus2.core3.util.SystemTime;
import org.gudy.azureus2.core3.util.TimerEvent;
import org.gudy.azureus2.core3.util.TimerEventPerformer;
import org.gudy.azureus2.core3.util.TimerEventPeriodic;

import com.aelitis.azureus.core.networkmanager.ConnectionEndpoint;
import com.aelitis.azureus.core.networkmanager.Transport;
import com.aelitis.azureus.core.networkmanager.VirtualServerChannelSelector;
import com.aelitis.azureus.core.networkmanager.VirtualServerChannelSelectorFactory;
import com.aelitis.azureus.core.networkmanager.admin.NetworkAdmin;
import com.aelitis.azureus.core.networkmanager.admin.NetworkAdminPropertyChangeListener;
import com.aelitis.azureus.core.networkmanager.impl.IncomingConnectionManager;
import com.aelitis.azureus.core.networkmanager.impl.ProtocolDecoder;
import com.aelitis.azureus.core.networkmanager.impl.TransportHelperFilter;
import com.aelitis.azureus.core.networkmanager.impl.TransportCryptoManager;
import com.aelitis.azureus.core.networkmanager.impl.TransportHelperFilter;


/**
Expand All @@ -52,6 +69,12 @@ public class IncomingSocketChannelManager
private final String port_config_key;
private final String port_enable_config_key;

private final ParameterListener port_config_listener;
private final ParameterListener port_enable_config_listener;
private final ParameterListener rcvbuf_listener;
private final NetworkAdminPropertyChangeListener bind_address_listener;
private final TimerEventPeriodic periodic_listener;

private int tcp_listen_port;

private int so_rcvbuf_size = COConfigurationManager.getIntParameter( "network.tcp.socket.SO_RCVBUF" );
Expand All @@ -63,7 +86,7 @@ public class IncomingSocketChannelManager
private VirtualServerChannelSelector[] serverSelectors = new VirtualServerChannelSelector[0];
private int listenFailCounts[] = new int[0];

private IncomingConnectionManager incoming_manager = IncomingConnectionManager.getSingleton();
private final IncomingConnectionManager incoming_manager = IncomingConnectionManager.getSingleton();

protected AEMonitor this_mon = new AEMonitor( "IncomingSocketChannelManager" );

Expand All @@ -80,56 +103,62 @@ public IncomingSocketChannelManager( String _port_config_key, String _port_enabl

tcp_listen_port = COConfigurationManager.getIntParameter( port_config_key );

//allow dynamic port number changes
COConfigurationManager.addParameterListener( port_config_key, new ParameterListener() {
public void parameterChanged(String parameterName) {
int port = COConfigurationManager.getIntParameter( port_config_key );
if( port != tcp_listen_port ) {
tcp_listen_port = port;
restart();
}
}
});
// allow dynamic port number changes
port_config_listener = new ParameterListener() {
@Override
public void parameterChanged(String parameterName) {
int port = COConfigurationManager.getIntParameter(port_config_key);
if (port != tcp_listen_port) {
tcp_listen_port = port;
restart();
}
}
};
COConfigurationManager.addParameterListener(port_config_key, port_config_listener);

COConfigurationManager.addParameterListener( port_enable_config_key, new ParameterListener() {
public void parameterChanged(String parameterName) {
restart();
}
});
port_enable_config_listener = new ParameterListener() {
@Override
public void parameterChanged(String parameterName) {
restart();
}
};
COConfigurationManager.addParameterListener(port_enable_config_key,
port_enable_config_listener);

//allow dynamic receive buffer size changes
COConfigurationManager.addParameterListener( "network.tcp.socket.SO_RCVBUF", new ParameterListener() {
public void parameterChanged(String parameterName) {
int size = COConfigurationManager.getIntParameter( "network.tcp.socket.SO_RCVBUF" );
if( size != so_rcvbuf_size ) {
so_rcvbuf_size = size;
restart();
}
}
});
// allow dynamic receive buffer size changes
rcvbuf_listener = new ParameterListener() {
@Override
public void parameterChanged(String parameterName) {
int size = COConfigurationManager.getIntParameter("network.tcp.socket.SO_RCVBUF");
if (size != so_rcvbuf_size) {
so_rcvbuf_size = size;
restart();
}
}
};
COConfigurationManager
.addParameterListener("network.tcp.socket.SO_RCVBUF", rcvbuf_listener);

//allow dynamic bind address changes

NetworkAdmin.getSingleton().addPropertyChangeListener(
new NetworkAdminPropertyChangeListener()
{
public void
propertyChanged(
String property )
{
if ( property == NetworkAdmin.PR_DEFAULT_BIND_ADDRESS ){

InetAddress[] addresses = NetworkAdmin.getSingleton().getMultiHomedServiceBindAddresses();

if ( !Arrays.equals(addresses, default_bind_addresses)) {

default_bind_addresses = addresses;

restart();
}
}
}
});
bind_address_listener = new NetworkAdminPropertyChangeListener() {
@Override
public void
propertyChanged(String property) {
if (property == NetworkAdmin.PR_DEFAULT_BIND_ADDRESS) {

InetAddress[] addresses = NetworkAdmin.getSingleton()
.getMultiHomedServiceBindAddresses();

if (!Arrays.equals(addresses, default_bind_addresses)) {

default_bind_addresses = addresses;

restart();
}
}
}
};
NetworkAdmin.getSingleton().addPropertyChangeListener(bind_address_listener);


//start processing
Expand All @@ -140,9 +169,11 @@ public void parameterChanged(String parameterName) {
//it seems that sometimes under OSX that listen server sockets sometimes stop accepting incoming connections for some unknown reason
//this checker tests to make sure the listen socket is still accepting connections, and if not, recreates the socket

SimpleTimer.addPeriodicEvent("IncomingSocketChannelManager:concheck", 60 * 1000, new TimerEventPerformer()
periodic_listener = SimpleTimer.addPeriodicEvent("IncomingSocketChannelManager:concheck",
60 * 1000, new TimerEventPerformer()
{
public void
@Override
public void
perform(
TimerEvent ev )
{
Expand Down Expand Up @@ -198,6 +229,20 @@ public void parameterChanged(String parameterName) {
});
}

/**
* Stop listeners associated with this Channel Manager, so that it is safe
* to dispose of.
*/
public void close() {
COConfigurationManager.removeParameterListener(port_config_key, port_config_listener);
COConfigurationManager.removeParameterListener(port_enable_config_key,
port_enable_config_listener);
COConfigurationManager.removeParameterListener("network.tcp.socket.SO_RCVBUF",
rcvbuf_listener);
NetworkAdmin.getSingleton().removePropertyChangeListener(bind_address_listener);
periodic_listener.cancel();
}

public boolean
isEnabled()
{
Expand Down Expand Up @@ -258,7 +303,8 @@ private final class
TcpSelectListener implements
VirtualServerChannelSelector.SelectListener
{
public void newConnectionAccepted( final ServerSocketChannel server, final SocketChannel channel ) {
@Override
public void newConnectionAccepted( final ServerSocketChannel server, final SocketChannel channel ) {

InetAddress remote_ia = channel.socket().getInetAddress();

Expand All @@ -271,11 +317,13 @@ public void newConnectionAccepted( final ServerSocketChannel server, final Socke
final TCPTransportHelper helper = new TCPTransportHelper( channel );

TransportCryptoManager.getSingleton().manageCrypto( helper, null, true, null, new TransportCryptoManager.HandshakeListener() {
public void handshakeSuccess( ProtocolDecoder decoder, ByteBuffer remaining_initial_data ) {
@Override
public void handshakeSuccess( ProtocolDecoder decoder, ByteBuffer remaining_initial_data ) {
process( server.socket().getLocalPort(), decoder.getFilter());
}

public void
@Override
public void
handshakeFailure(
Throwable failure_msg )
{
Expand Down Expand Up @@ -306,19 +354,22 @@ public void handshakeSuccess( ProtocolDecoder decoder, ByteBuffer remaining_init
helper.close( "Handshake failure: " + Debug.getNestedExceptionMessage( failure_msg ));
}

public void
@Override
public void
gotSecret(
byte[] session_secret )
{
}

public int
@Override
public int
getMaximumPlainHeaderLength()
{
return( incoming_manager.getMaxMinMatchBufferSize());
}

public int
@Override
public int
matchPlainHeader(
ByteBuffer buffer )
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,20 @@

package com.aelitis.azureus.core.networkmanager.impl.tcp;

import java.net.*;
import java.nio.channels.*;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

import org.gudy.azureus2.core3.logging.*;
import org.gudy.azureus2.core3.util.*;
import org.gudy.azureus2.core3.logging.LogAlert;
import org.gudy.azureus2.core3.logging.LogEvent;
import org.gudy.azureus2.core3.logging.LogIDs;
import org.gudy.azureus2.core3.logging.Logger;
import org.gudy.azureus2.core3.util.AEMonitor;
import org.gudy.azureus2.core3.util.AEThread;
import org.gudy.azureus2.core3.util.Debug;
import org.gudy.azureus2.core3.util.SystemTime;

import com.aelitis.azureus.core.networkmanager.VirtualServerChannelSelector;

Expand Down Expand Up @@ -65,7 +74,8 @@ public VirtualBlockingServerChannelSelector( InetSocketAddress _bind_address, in
* Start the server and begin accepting incoming connections.
*
*/
public void start() {
@Override
public void start() {
try{
this_mon.enter();

Expand All @@ -81,7 +91,8 @@ public void start() {
if (Logger.isEnabled()) Logger.log(new LogEvent(LOGID, "TCP incoming server socket " + bind_address));

AEThread accept_thread = new AEThread( "VServerSelector:port" + bind_address.getPort() ) {
public void runSupport() {
@Override
public void runSupport() {
accept_loop();
}
};
Expand All @@ -105,12 +116,14 @@ public void runSupport() {
/**
* Stop the server.
*/
public void stop() {
@Override
public void stop() {
try{
this_mon.enter();

if( server_channel != null ) {
try {
server_channel.socket().close();
server_channel.close();
server_channel = null;
}
Expand Down Expand Up @@ -147,21 +160,24 @@ protected void accept_loop() {
* Is this selector actively running
* @return true if enabled, false if not running
*/
public boolean isRunning() {
@Override
public boolean isRunning() {
if( server_channel != null && server_channel.isOpen() ) return true;
return false;
}


public InetAddress getBoundToAddress() {
@Override
public InetAddress getBoundToAddress() {
if( server_channel != null ) {
return server_channel.socket().getInetAddress();
}
return null;
}


public long getTimeOfLastAccept() {
@Override
public long getTimeOfLastAccept() {
return last_accept_time;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.junit.BeforeClass;
import org.junit.Test;

import edu.washington.cs.oneswarm.f2f.datagram.DatagramConnection;
import edu.washington.cs.oneswarm.f2f.servicesharing.EchoServer;
import edu.washington.cs.oneswarm.f2f.servicesharing.ServiceSharingManager;
import edu.washington.cs.oneswarm.test.util.TestUtils;
Expand All @@ -39,7 +40,7 @@ public void setupLogging() {
logFinest(logger);
logFinest(ServiceSharingSingleProcessTest.logger);
logFinest(EchoServer.logger);
// logFinest(DatagramConnection.logger);
logFinest(DatagramConnection.logger);
// logFinest(DatagramRateLimitedChannelQueue.logger);
// logFinest(ReadController.logger);

Expand Down
Loading

0 comments on commit 469e3d2

Please sign in to comment.