Permalink
Browse files

Fixed ArrayOutOfBoundsException in FD_SOCK (https://issues.jboss.org/…

  • Loading branch information...
1 parent 9316177 commit 1bda0216c6c750bee79f1d38168b0869641436c4 @belaban committed Jan 24, 2012
Showing with 51 additions and 58 deletions.
  1. +51 −58 src/org/jgroups/protocols/FD_SOCK.java
View
109 src/org/jgroups/protocols/FD_SOCK.java
@@ -6,14 +6,12 @@
import org.jgroups.stack.IpAddress;
import org.jgroups.stack.Protocol;
import org.jgroups.util.*;
+import org.jgroups.util.ThreadFactory;
import java.io.*;
import java.net.*;
import java.util.*;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
/**
@@ -93,11 +91,11 @@
/* --------------------------------------------- Fields ------------------------------------------------------ */
- private final List<Address> members=new ArrayList<Address>(11); // list of group members (updated on VIEW_CHANGE)
+ private volatile List<Address> members=new ArrayList<Address>(11); // volatile eliminates the lock
- protected final Set<Address> suspected_mbrs=new HashSet<Address>();
+ protected final Set<Address> suspected_mbrs=new CopyOnWriteArraySet<Address>();
- private final List<Address> pingable_mbrs=new ArrayList<Address>(11);
+ private final List<Address> pingable_mbrs=new CopyOnWriteArrayList<Address>();
volatile boolean srv_sock_sent=false; // has own socket been broadcast yet ?
/** Used to rendezvous on GET_CACHE and GET_CACHE_RSP */
@@ -132,9 +130,9 @@ public FD_SOCK() {
@ManagedAttribute(description="Member address")
public String getLocalAddress() {return local_addr != null? local_addr.toString() : "null";}
@ManagedAttribute(description="List of cluster members")
- public String getMembers() {return members != null? members.toString() : "null";}
+ public String getMembers() {return members.toString();}
@ManagedAttribute(description="List of pingable members of a cluster")
- public String getPingableMembers() {return pingable_mbrs != null? pingable_mbrs.toString() : "null";}
+ public String getPingableMembers() {return pingable_mbrs.toString();}
@ManagedAttribute(description="Ping destination")
public String getPingDest() {return ping_dest != null? ping_dest.toString() : "null";}
@ManagedAttribute(description="Number of suspect event generated")
@@ -317,31 +315,28 @@ public Object down(Event evt) {
View v=(View) evt.getArg();
final List<Address> new_mbrs=v.getMembers();
- synchronized(this) {
- members.clear();
- members.addAll(new_mbrs);
- suspected_mbrs.retainAll(new_mbrs);
- cache.keySet().retainAll(members); // remove all entries in 'cache' which are not in the new membership
- bcast_task.adjustSuspectedMembers(members);
- pingable_mbrs.clear();
- pingable_mbrs.addAll(members);
- if(log.isDebugEnabled()) log.debug("VIEW_CHANGE received: " + members);
-
- if(members.size() > 1) {
- if(isPingerThreadRunning()) {
- Address tmp_ping_dest=determinePingDest();
- boolean hasNewPingDest = ping_dest != null && tmp_ping_dest != null && !ping_dest.equals(tmp_ping_dest);
- if(hasNewPingDest) {
- interruptPingerThread(); // allows the thread to use the new socket
- }
+ members=new_mbrs; // volatile write will ensure all reads after this see the new membership
+ suspected_mbrs.retainAll(new_mbrs);
+ cache.keySet().retainAll(new_mbrs); // remove all entries in 'cache' which are not in the new membership
+ bcast_task.adjustSuspectedMembers(new_mbrs);
+ pingable_mbrs.clear();
+ pingable_mbrs.addAll(new_mbrs);
+ if(log.isDebugEnabled()) log.debug("VIEW_CHANGE received: " + new_mbrs);
+
+ if(new_mbrs.size() > 1) {
+ if(isPingerThreadRunning()) {
+ Address tmp_ping_dest=determinePingDest();
+ boolean hasNewPingDest = ping_dest != null && tmp_ping_dest != null && !ping_dest.equals(tmp_ping_dest);
+ if(hasNewPingDest) {
+ interruptPingerThread(); // allows the thread to use the new socket
}
- else
- startPingerThread(); // only starts if not yet running
- }
- else {
- ping_dest=null;
- stopPingerThread();
}
+ else
+ startPingerThread(); // only starts if not yet running
+ }
+ else {
+ ping_dest=null;
+ stopPingerThread();
}
break;
@@ -450,14 +445,11 @@ void suspect(Set<Address> suspects) {
return;
final List<Address> eligible_mbrs=new ArrayList<Address>();
- synchronized(this) {
- for(Address suspect: suspects) {
- suspect_history.add(suspect);
- suspected_mbrs.add(suspect);
- }
- eligible_mbrs.addAll(members);
- eligible_mbrs.removeAll(suspected_mbrs);
- }
+ for(Address suspect: suspects)
+ suspect_history.add(suspect);
+ suspected_mbrs.addAll(suspects);
+ eligible_mbrs.addAll(members);
+ eligible_mbrs.removeAll(suspected_mbrs);
// Check if we're coord, then send up the stack
if(local_addr != null && !eligible_mbrs.isEmpty()) {
@@ -790,8 +782,9 @@ private Address determinePingDest() {
}
- Address determineCoordinator() {
- return !members.isEmpty()? members.get(0) : null;
+ protected Address determineCoordinator() {
+ List<Address> tmp=members;
+ return !tmp.isEmpty()? tmp.get(0) : null;
}
@@ -1123,35 +1116,35 @@ public void run() {
* any longer. Then the task terminates.
*/
private class BroadcastTask implements Runnable {
- final Set<Address> suspected_mbrs=new HashSet<Address>();
+ final Set<Address> suspects=new HashSet<Address>();
Future<?> future;
/** Adds a suspected member. Starts the task if not yet running */
public void addSuspectedMember(Address mbr) {
if(mbr == null) return;
if(!members.contains(mbr)) return;
- synchronized(suspected_mbrs) {
- if(suspected_mbrs.add(mbr))
+ synchronized(suspects) {
+ if(suspects.add(mbr))
startTask();
}
}
public void removeSuspectedMember(Address suspected_mbr) {
if(suspected_mbr == null) return;
- synchronized(suspected_mbrs) {
- suspected_mbrs.remove(suspected_mbr);
- if(suspected_mbrs.isEmpty()) {
+ synchronized(suspects) {
+ suspects.remove(suspected_mbr);
+ if(suspects.isEmpty()) {
stopTask();
}
}
}
public void removeAll() {
- synchronized(suspected_mbrs) {
- suspected_mbrs.clear();
+ synchronized(suspects) {
+ suspects.clear();
stopTask();
}
}
@@ -1182,11 +1175,11 @@ private void stopTask() {
*/
public void adjustSuspectedMembers(List<Address> new_mbrship) {
if(new_mbrship == null || new_mbrship.isEmpty()) return;
- synchronized(suspected_mbrs) {
- boolean modified=suspected_mbrs.retainAll(new_mbrship);
+ synchronized(suspects) {
+ boolean modified=suspects.retainAll(new_mbrship);
if(log.isTraceEnabled() && modified)
- log.trace("adjusted suspected_mbrs: " + suspected_mbrs);
- if(suspected_mbrs.isEmpty())
+ log.trace("adjusted suspected_mbrs: " + suspects);
+ if(suspects.isEmpty())
stopTask();
}
}
@@ -1197,17 +1190,17 @@ public void run() {
FdHeader hdr;
if(log.isTraceEnabled())
- log.trace("broadcasting SUSPECT message (suspected_mbrs=" + suspected_mbrs + ") to group");
+ log.trace("broadcasting SUSPECT message (suspected_mbrs=" + suspects + ") to group");
- synchronized(suspected_mbrs) {
- if(suspected_mbrs.isEmpty()) {
+ synchronized(suspects) {
+ if(suspects.isEmpty()) {
stopTask();
if(log.isTraceEnabled()) log.trace("task done (no suspected members)");
return;
}
hdr=new FdHeader(FdHeader.SUSPECT);
- hdr.mbrs=new HashSet<Address>(suspected_mbrs);
+ hdr.mbrs=new HashSet<Address>(suspects);
}
suspect_msg=new Message(); // mcast SUSPECT to all members
suspect_msg.setFlag(Message.OOB);

0 comments on commit 1bda021

Please sign in to comment.