Skip to content

Commit

Permalink
JGRP-1463: Avoid ArrayIndexOutOfBoundsException
Browse files Browse the repository at this point in the history
  • Loading branch information
dimbleby committed May 14, 2012
1 parent 58de897 commit a9e05fe
Showing 1 changed file with 78 additions and 64 deletions.
142 changes: 78 additions & 64 deletions src/org/jgroups/protocols/FD_SOCK.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,20 +54,20 @@ public class FD_SOCK extends Protocol implements Runnable {
@Property(description="Used to map the internal port (bind_port) to an external port. Only used if > 0",
systemProperty=Global.EXTERNAL_PORT,writable=false)
protected int external_port=0;
@Property(name="bind_interface", converter=PropertyConverters.BindInterface.class,
description="The interface (NIC) which should be used by this transport", dependsUpon="bind_addr")

@Property(name="bind_interface", converter=PropertyConverters.BindInterface.class,
description="The interface (NIC) which should be used by this transport", dependsUpon="bind_addr")
protected String bind_interface_str=null;

@Property(description="Timeout for getting socket cache from coordinator. Default is 1000 msec")
long get_cache_timeout=1000;
long get_cache_timeout=1000;

@Property(description="Interval for broadcasting suspect messages. Default is 5000 msec")
long suspect_msg_interval=5000;
long suspect_msg_interval=5000;

@Property(description="Number of attempts coordinator is solicited for socket cache until we give up. Default is 3")
int num_tries=3;
int num_tries=3;

@Property(description="Start port for server socket. Default value of 0 picks a random port")
int start_port=0;

Expand All @@ -76,25 +76,25 @@ public class FD_SOCK extends Protocol implements Runnable {

@Property(description="Number of ports to probe for start_port and client_bind_port")
int port_range=50;

@Property(description="Whether to use KEEP_ALIVE on the ping socket or not. Default is true")
private boolean keep_alive=true;

@Property(description="Max time in millis to wait for ping Socket.connect() to return")
private int sock_conn_timeout=1000;


/* --------------------------------------------- JMX ------------------------------------------------------ */


private int num_suspect_events=0;

private final BoundedList<Address> suspect_history=new BoundedList<Address>(20);


/* --------------------------------------------- Fields ------------------------------------------------------ */


private volatile List<Address> members=new ArrayList<Address>(11); // volatile eliminates the lock

protected final Set<Address> suspected_mbrs=new CopyOnWriteArraySet<Address>();
Expand Down Expand Up @@ -128,7 +128,7 @@ public class FD_SOCK extends Protocol implements Runnable {
private boolean log_suspected_msgs=true;


public FD_SOCK() {
public FD_SOCK() {
}

@ManagedAttribute(description="Member address")
Expand Down Expand Up @@ -158,7 +158,7 @@ public String printSuspectHistory() {
}
return sb.toString();
}

@ManagedOperation
public String printCache() {
StringBuilder sb=new StringBuilder();
Expand All @@ -167,7 +167,7 @@ public String printCache() {
}
return sb.toString();
}

public void init() throws Exception {
srv_sock_handler=new ServerSocketHandler();
timer=getTransport().getTimer();
Expand All @@ -180,8 +180,8 @@ public void start() throws Exception {
super.start();
}

public void stop() {
stopPingerThread();
public void stop() {
stopPingerThread();
stopServerSocket(true); // graceful close
bcast_task.removeAll();
suspected_mbrs.clear();
Expand Down Expand Up @@ -297,9 +297,9 @@ public Object down(Event evt) {
break;

case Event.CONNECT:
case Event.CONNECT_WITH_STATE_TRANSFER:
case Event.CONNECT_WITH_STATE_TRANSFER:
case Event.CONNECT_USE_FLUSH:
case Event.CONNECT_WITH_STATE_TRANSFER_USE_FLUSH:
case Event.CONNECT_WITH_STATE_TRANSFER_USE_FLUSH:
Object ret=down_prot.down(evt);
try {
startServerSocket();
Expand All @@ -309,7 +309,7 @@ public Object down(Event evt) {
}
return ret;

case Event.DISCONNECT:
case Event.DISCONNECT:
stopServerSocket(true); // graceful close
break;

Expand Down Expand Up @@ -361,7 +361,7 @@ public Object down(Event evt) {
* nothing happens. In both cases, a new member to be monitored will be chosen and monitoring continues (unless
* there are fewer than 2 members).
*/
public void run() {
public void run() {

// 1. Broadcast my own addr:sock to all members so they can update their cache
if(!srv_sock_sent) {
Expand All @@ -383,21 +383,21 @@ public void run() {
while(isPingerThreadRunning()) {
regular_sock_close=false;
ping_dest=determinePingDest(); // gets the neighbor to our right

if(log.isDebugEnabled())
log.debug("ping_dest is " + ping_dest + ", pingable_mbrs=" + pingable_mbrs);
log.debug("ping_dest is " + ping_dest + ", pingable_mbrs=" + pingable_mbrs);

if(ping_dest == null || !isPingerThreadRunning())
break;
IpAddress ping_addr=fetchPingAddress(ping_dest);
if(ping_addr == null) {

IpAddress ping_addr=fetchPingAddress(ping_dest);

if(ping_addr == null) {
if(log.isTraceEnabled()) log.trace("socket address for " + ping_dest + " could not be fetched, retrying");
Util.sleep(1000);
continue;
}
}

if(!setupPingSocket(ping_addr) && isPingerThreadRunning()) {
// covers use cases #7 and #8 in ManualTests.txt
if(log.isDebugEnabled()) log.debug("could not create socket to " + ping_dest);
Expand Down Expand Up @@ -435,9 +435,9 @@ public void run() {
}
if(log.isTraceEnabled()) log.trace("pinger thread terminated");
}

private synchronized boolean isPingerThreadRunning(){
return pinger_thread != null && pinger_thread.isAlive() && !pinger_thread.isInterrupted();
return pinger_thread != null && pinger_thread.isAlive() && !pinger_thread.isInterrupted();
}


Expand Down Expand Up @@ -490,12 +490,12 @@ void handleSocketClose(Exception ex) {
/**
* Does *not* need to be synchronized on pinger_mutex because the caller (down()) already has the mutex acquired
*/
private synchronized void startPingerThread() {
private synchronized void startPingerThread() {
if(!isPingerThreadRunning()) {
ThreadFactory factory=getThreadFactory();
pinger_thread=factory.newThread(this, "FD_SOCK pinger");
pinger_thread.setDaemon(true);
pinger_thread.start();
pinger_thread=factory.newThread(this, "FD_SOCK pinger");
pinger_thread.setDaemon(true);
pinger_thread.start();
}
}

Expand Down Expand Up @@ -530,7 +530,7 @@ private synchronized void stopPingerThread() {

ping_addr_promise.setResult(null);
get_cache_promise.setResult(null);

sendPingTermination(); // PATCH by Bruce Schuchardt (http://jira.jboss.com/jira/browse/JGRP-246)
teardownPingSocket();
}
Expand Down Expand Up @@ -569,7 +569,7 @@ void startServerSocket() throws Exception {
"jgroups.fd_sock.srv_sock", bind_addr, start_port, start_port+port_range); // grab a random unused port above 10000
srv_sock_addr=new IpAddress(external_addr != null? external_addr : bind_addr, external_port > 0? external_port : srv_sock.getLocalPort());
if(srv_sock_handler != null) {
srv_sock_handler.start(); // won't start if already running
srv_sock_handler.start(); // won't start if already running
}
}

Expand Down Expand Up @@ -743,22 +743,22 @@ private IpAddress fetchPingAddress(Address mbr) {
return ret;

if(!isPingerThreadRunning()) return null;

// 2. Try to get the server socket address from mbr
ping_addr_promise.reset();
ping_addr_req=new Message(mbr, null, null); // unicast
ping_addr_req.setFlag(Message.OOB);
hdr=new FdHeader(FdHeader.WHO_HAS_SOCK);
hdr.mbr=mbr;
ping_addr_req.putHeader(this.id, hdr);
down_prot.down(new Event(Event.MSG, ping_addr_req));
down_prot.down(new Event(Event.MSG, ping_addr_req));
ret=ping_addr_promise.getResult(500);
if(ret != null) {
return ret;
}

if(!isPingerThreadRunning()) return null;

// 3. Try to get the server socket address from all members
ping_addr_req=new Message(null); // multicast
ping_addr_req.setFlag(Message.OOB);
Expand All @@ -772,19 +772,33 @@ private IpAddress fetchPingAddress(Address mbr) {


private Address determinePingDest() {
Address tmp;
Address first_mbr = null;
boolean several_mbrs = false;
boolean found_local_addr = false;

if(pingable_mbrs == null || pingable_mbrs.size() < 2 || local_addr == null)
if(pingable_mbrs == null || local_addr == null)
return null;
for(int i=0; i < pingable_mbrs.size(); i++) {
tmp=pingable_mbrs.get(i);
if(local_addr.equals(tmp)) {
if(i + 1 >= pingable_mbrs.size())
return pingable_mbrs.get(0);
else
return pingable_mbrs.get(i + 1);

// Look for the pingable member who follows the local_addr
for(Address tmp: pingable_mbrs) {
if(found_local_addr)
return tmp;

if(first_mbr == null) {
first_mbr = tmp;
}
else {
several_mbrs = true;
}

if (tmp.equals(local_addr))
found_local_addr = true;
}

// If the local address was the last in the list, then wrap.
if (found_local_addr && several_mbrs)
return first_mbr;

return null;
}

Expand Down Expand Up @@ -841,7 +855,7 @@ public FdHeader(byte type, Address mbr, IpAddress sock_addr) {
this.mbr=mbr;
this.sock_addr=sock_addr;
}

public FdHeader(byte type, Set<Address> mbrs) {
this.type=type;
this.mbrs=mbrs;
Expand Down Expand Up @@ -890,12 +904,12 @@ public int size() {
int retval=Global.BYTE_SIZE; // type
retval+=Util.size(mbr);

// use of Util.size(Address) with IpAddress overestimates size by one byte.
// use of Util.size(Address) with IpAddress overestimates size by one byte.
// replace: retval+=Util.size(sock_addr); with the following:
int ipaddr_size = 0 ;
ipaddr_size += Global.BYTE_SIZE ; // presence byte
ipaddr_size += Global.BYTE_SIZE ; // presence byte
if (sock_addr != null)
ipaddr_size += sock_addr.size(); // IpAddress size
ipaddr_size += sock_addr.size(); // IpAddress size
retval += ipaddr_size ;

retval+=Global.INT_SIZE; // cachedAddrs size
Expand Down Expand Up @@ -990,15 +1004,15 @@ private class ServerSocketHandler implements Runnable {
String getName() {
return acceptor != null? acceptor.getName() : null;
}

ServerSocketHandler() {
start();
}

final void start() {
if(acceptor == null) {
acceptor=getThreadFactory().newThread(this, "FD_SOCK server socket acceptor");
acceptor.setDaemon(true);
acceptor=getThreadFactory().newThread(this, "FD_SOCK server socket acceptor");
acceptor.setDaemon(true);
acceptor.start();
}
}
Expand Down Expand Up @@ -1036,8 +1050,8 @@ public void run() {
log.trace("accepted connection from " + client_sock.getInetAddress() + ':' + client_sock.getPort());
ClientConnectionHandler client_conn_handler=new ClientConnectionHandler(client_sock, clients);
Thread t = getThreadFactory().newThread(client_conn_handler, "FD_SOCK client connection handler");
t.setDaemon(true);
t.setDaemon(true);

synchronized(clients) {
clients.add(client_conn_handler);
}
Expand All @@ -1060,7 +1074,7 @@ private static class ClientConnectionHandler implements Runnable {
final Object mutex=new Object();
final List<ClientConnectionHandler> clients;

ClientConnectionHandler(Socket client_sock, List<ClientConnectionHandler> clients) {
ClientConnectionHandler(Socket client_sock, List<ClientConnectionHandler> clients) {
this.client_sock=client_sock;
this.clients=clients;
}
Expand Down Expand Up @@ -1155,7 +1169,7 @@ public void removeAll() {
stopTask();
}
}


private void startTask() {
if(future == null || future.isDone()) {
Expand Down

0 comments on commit a9e05fe

Please sign in to comment.