Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base fork: belaban/JGroups
base: 0977fb3
...
head fork: belaban/JGroups
compare: 88e3228
Checking mergeability… Don't worry, you can still create the pull request.
  • 2 commits
  • 1 file changed
  • 0 commit comments
  • 2 contributors
Commits on May 23, 2012
@dimbleby dimbleby Send upwards message to the upwards protocol 6e37c51
Commits on May 29, 2012
@belaban Merge pull request #47 from dimbleby/trivia
Send upwards message to the upwards protocol
88e3228
Showing with 37 additions and 37 deletions.
  1. +37 −37 src/org/jgroups/protocols/pbcast/GMS.java
View
74 src/org/jgroups/protocols/pbcast/GMS.java
@@ -24,7 +24,7 @@
* Group membership protocol. Handles joins/leaves/crashes (suspicions) and
* emits new views accordingly. Use VIEW_ENFORCER on top of this layer to make
* sure new members don't receive any messages until they are members
- *
+ *
* @author Bela Ban
*/
@MBean(description="Group membership protocol")
@@ -40,7 +40,7 @@
@Property(description="Leave timeout")
long leave_timeout=5000;
-
+
@Property(description="Timeout (in ms) to complete merge")
long merge_timeout=5000; // time to wait for all MERGE_RSPS
@@ -49,7 +49,7 @@
@Property(description="Print physical address(es) on startup")
private boolean print_physical_addrs=true;
-
+
/**
* Setting this to false disables concurrent startups. This is only used by
* unit testing code for testing merging. To everybody else: don't change it
@@ -57,7 +57,7 @@
*/
@Property(description="Temporary switch. Default is true and should not be changed")
boolean handle_concurrent_startup=true;
-
+
/**
* Whether view bundling (http://jira.jboss.com/jira/browse/JGRP-144) should
* be enabled or not. Setting this to false forces each JOIN/LEAVE/SUPSECT
@@ -66,25 +66,25 @@
*/
@Property(description="View bundling toggle")
private boolean view_bundling=true;
-
+
@Property(description="Max view bundling timeout if view bundling is turned on. Default is 50 msec")
private long max_bundling_time=50; // 50ms max to wait for other JOIN, LEAVE or SUSPECT requests
-
+
@Property(description="Max number of old members to keep in history. Default is 50")
protected int num_prev_mbrs=50;
@Property(description="Number of views to store in history")
int num_prev_views=20;
-
+
@Property(description="Time in ms to wait for all VIEW acks (0 == wait forever. Default is 2000 msec" )
long view_ack_collection_timeout=2000;
-
+
@Property(description="Timeout to resume ViewHandler")
long resume_task_timeout=20000;
@Property(description="Use flush for view changes. Default is true")
boolean use_flush_if_present=true;
-
+
@Property(description="Logs failures for collecting all view acks if true")
boolean log_collect_msgs=true;
@@ -93,8 +93,8 @@
/* --------------------------------------------- JMX ---------------------------------------------- */
-
-
+
+
private int num_views=0;
/** Stores the last 20 views */
@@ -105,7 +105,7 @@
@Property(converter=PropertyConverters.FlushInvoker.class,name="flush_invoker_class")
protected Class<Callable<Boolean>> flushInvokerClass;
-
+
private GmsImpl impl=null;
private final Object impl_mutex=new Object(); // synchronizes event entry into impl
private final Map<String,GmsImpl> impls=new HashMap<String,GmsImpl>(3);
@@ -115,7 +115,7 @@
protected Address local_addr=null;
protected final Membership members=new Membership(); // real membership
-
+
private final Membership tmp_members=new Membership(); // base for computing next view
/** Members joined but for which no view has been received yet */
@@ -128,10 +128,10 @@
private BoundedList<Address> prev_members=null;
protected View view=null;
-
+
protected long ltime=0;
- protected TimeScheduler timer=null;
+ protected TimeScheduler timer=null;
/** Class to process JOIN, LEAVE and MERGE requests */
private final ViewHandler view_handler=new ViewHandler();
@@ -305,7 +305,7 @@ public void resetStats() {
public List<Integer> providedDownServices() {
return Arrays.asList(Event.IS_MERGE_IN_PROGRESS);
}
-
+
public void setImpl(GmsImpl new_impl) {
synchronized(impl_mutex) {
if(impl == new_impl) // unnecessary ?
@@ -479,8 +479,8 @@ public void castViewChange(View new_view, Digest digest, JoinRsp jr, Collection<
// Send down a local TMP_VIEW event. This is needed by certain layers (e.g. NAKACK) to compute correct digest
// in case client's next request (e.g. getState()) reaches us *before* our own view change multicast.
- // Check NAKACK's TMP_VIEW handling for details
- down_prot.up(new Event(Event.TMP_VIEW, new_view));
+ // Check NAKACK's TMP_VIEW handling for details
+ up_prot.up(new Event(Event.TMP_VIEW, new_view));
down_prot.down(new Event(Event.TMP_VIEW, new_view));
List<Address> ackMembers=new ArrayList<Address>(new_view.getMembers());
@@ -543,10 +543,10 @@ public void castViewChange(View new_view, Digest digest, JoinRsp jr, Collection<
}
public void sendJoinResponse(JoinRsp rsp, Address dest) {
- Message m=new Message(dest, null, null);
+ Message m=new Message(dest, null, null);
GMS.GmsHeader hdr=new GMS.GmsHeader(GMS.GmsHeader.JOIN_RSP, rsp);
m.putHeader(this.id, hdr);
- getDownProtocol().down(new Event(Event.MSG, m));
+ getDownProtocol().down(new Event(Event.MSG, m));
}
@@ -748,7 +748,7 @@ void stopFlush() {
up_prot.up(new Event(Event.RESUME));
}
}
-
+
void stopFlush(List<Address> members) {
if(log.isTraceEnabled()){
log.trace(local_addr + ": sending RESUME event");
@@ -771,7 +771,7 @@ public Object up(Event evt) {
break;
case GmsHeader.JOIN_REQ_WITH_STATE_TRANSFER:
view_handler.add(new Request(Request.JOIN_WITH_STATE_TRANSFER, hdr.mbr, false, null, hdr.useFlushIfPresent));
- break;
+ break;
case GmsHeader.JOIN_RSP:
impl.handleJoinResponse(hdr.join_rsp);
break;
@@ -813,7 +813,7 @@ public Object up(Event evt) {
if(log.isTraceEnabled()) {
log.trace(local_addr + ": got merge response from " + msg.getSrc() +
", merge_id=" + hdr.merge_id + ", merge data is "+ merge_data);
- }
+ }
impl.handleMergeResponse(merge_data, hdr.merge_id);
break;
@@ -825,14 +825,14 @@ public Object up(Event evt) {
Digest tmp=hdr.my_digest;
down_prot.down(new Event(Event.MERGE_DIGEST, tmp));
break;
-
- case GmsHeader.INSTALL_MERGE_VIEW_OK:
+
+ case GmsHeader.INSTALL_MERGE_VIEW_OK:
//[JGRP-700] - FLUSH: flushing should span merge
- merge_ack_collector.ack(msg.getSrc());
- break;
+ merge_ack_collector.ack(msg.getSrc());
+ break;
case GmsHeader.CANCEL_MERGE:
- //[JGRP-524] - FLUSH and merge: flush doesn't wrap entire merge process
+ //[JGRP-524] - FLUSH and merge: flush doesn't wrap entire merge process
impl.handleMergeCancelled(hdr.merge_id);
break;
@@ -878,7 +878,7 @@ public Object up(Event evt) {
view_handler.add(new Request(Request.SUSPECT, suspected, true));
ack_collector.suspect(suspected);
merge_ack_collector.suspect(suspected);
- return retval;
+ return retval;
case Event.UNSUSPECT:
impl.unsuspect((Address)evt.getArg());
@@ -896,7 +896,7 @@ public Object up(Event evt) {
-
+
@SuppressWarnings("unchecked")
public Object down(Event evt) {
int type=evt.getType();
@@ -939,7 +939,7 @@ public Object down(Event evt) {
return e;
}
return null; // don't pass down: event has already been passed down
-
+
case Event.DISCONNECT:
impl.leave((Address)evt.getArg());
if(!(impl instanceof CoordGmsImpl)) {
@@ -951,7 +951,7 @@ public Object down(Event evt) {
case Event.CONFIG :
Map<String,Object> config=(Map<String,Object>)evt.getArg();
if((config != null && config.containsKey("flush_supported"))){
- flushProtocolInStack=true;
+ flushProtocolInStack=true;
}
break;
@@ -1047,9 +1047,9 @@ public GmsHeader(byte type, Address mbr,boolean useFlushIfPresent) {
this.mbr=mbr;
this.useFlushIfPresent = useFlushIfPresent;
}
-
+
public GmsHeader(byte type, Address mbr) {
- this(type,mbr,true);
+ this(type,mbr,true);
}
public GmsHeader(byte type, Collection<Address> mbrs) {
@@ -1204,7 +1204,7 @@ public int size() {
retval+=Global.BYTE_SIZE; // presence for merge_id
if(merge_id != null)
retval+=merge_id.size();
-
+
retval+=Global.BYTE_SIZE; // boolean useFlushIfPresent
return retval;
}
@@ -1392,12 +1392,12 @@ private void process(List<Request> requests) {
case Request.JOIN:
case Request.JOIN_WITH_STATE_TRANSFER:
case Request.LEAVE:
- case Request.SUSPECT:
+ case Request.SUSPECT:
impl.handleMembershipChange(requests);
break;
case Request.MERGE:
impl.merge(firstReq.views);
- break;
+ break;
default:
log.error("request " + firstReq.type + " is unknown; discarded");
}

No commit comments for this range

Something went wrong with that request. Please try again.