Permalink
Browse files

- Added INTERNAL flag (https://issues.jboss.org/browse/JGRP-1599)

- Added internal thread pool to TP (https://issues.jboss.org/browse/JGRP-1599)
- Replaced Message.OOB with Message.Flag.OOB
- Replaced Message.DONT_BUNDLE with Message.Flag.DONT_BUNDLE
- Failure detection protocols now use INTERNAL flag
- Flow control protocols now use INTERNAL flag
- Membership protocols now use INTERNAL flag
- Stability protocol now uses INTERNAL flag
- MERGE3 protocol now uses INTERNAL flag
- Reliable retransmission protocols now use INTERNAL flag
- Sending discovery messages with DONT_BUNDLE (required)
  • Loading branch information...
1 parent 1cf79de commit 329f08bbff00f0f8a71123c68a3200804effb045 @belaban committed Feb 28, 2013
Showing with 467 additions and 545 deletions.
  1. +35 −65 src/org/jgroups/Message.java
  2. +1 −1 src/org/jgroups/auth/DemoToken.java
  3. +1 −1 src/org/jgroups/blocks/RequestOptions.java
  4. +2 −6 src/org/jgroups/protocols/AUTH.java
  5. +6 −9 src/org/jgroups/protocols/COUNTER.java
  6. +1 −1 src/org/jgroups/protocols/DAISYCHAIN.java
  7. +8 −5 src/org/jgroups/protocols/Discovery.java
  8. +4 −6 src/org/jgroups/protocols/ENCRYPT.java
  9. +15 −44 src/org/jgroups/protocols/Executing.java
  10. +4 −2 src/org/jgroups/protocols/FC.java
  11. +3 −3 src/org/jgroups/protocols/FD.java
  12. +1 −1 src/org/jgroups/protocols/FD_ALL.java
  13. +7 −18 src/org/jgroups/protocols/FD_SOCK.java
  14. +1 −4 src/org/jgroups/protocols/FORWARD_TO_COORD.java
  15. +6 −6 src/org/jgroups/protocols/FlowControl.java
  16. +19 −36 src/org/jgroups/protocols/Locking.java
  17. +5 −9 src/org/jgroups/protocols/MERGE3.java
  18. +2 −2 src/org/jgroups/protocols/PRIO.java
  19. +4 −9 src/org/jgroups/protocols/RELAY.java
  20. +3 −5 src/org/jgroups/protocols/RSVP.java
  21. +2 −2 src/org/jgroups/protocols/SCOPE.java
  22. +7 −12 src/org/jgroups/protocols/SEQUENCER.java
  23. +2 −3 src/org/jgroups/protocols/STOMP.java
  24. +128 −35 src/org/jgroups/protocols/TP.java
  25. +2 −2 src/org/jgroups/protocols/UDP.java
  26. +6 −7 src/org/jgroups/protocols/UNICAST.java
  27. +11 −25 src/org/jgroups/protocols/UNICAST2.java
  28. +8 −9 src/org/jgroups/protocols/UNICAST3.java
  29. +3 −4 src/org/jgroups/protocols/VERIFY_SUSPECT.java
  30. +2 −3 src/org/jgroups/protocols/pbcast/ClientGmsImpl.java
  31. +2 −4 src/org/jgroups/protocols/pbcast/CoordGmsImpl.java
  32. +10 −17 src/org/jgroups/protocols/pbcast/FLUSH.java
  33. +5 −10 src/org/jgroups/protocols/pbcast/GMS.java
  34. +1 −1 src/org/jgroups/protocols/pbcast/GmsImpl.java
  35. +8 −14 src/org/jgroups/protocols/pbcast/Merger.java
  36. +6 −9 src/org/jgroups/protocols/pbcast/NAKACK.java
  37. +5 −5 src/org/jgroups/protocols/pbcast/NAKACK2.java
  38. +2 −5 src/org/jgroups/protocols/pbcast/ParticipantGmsImpl.java
  39. +3 −6 src/org/jgroups/protocols/pbcast/STABLE.java
  40. +1 −2 src/org/jgroups/protocols/pbcast/STATE.java
  41. +3 −6 src/org/jgroups/protocols/pbcast/STATE_TRANSFER.java
  42. +3 −6 src/org/jgroups/protocols/pbcast/StreamingStateTransfer.java
  43. +3 −4 src/org/jgroups/protocols/relay/RELAY2.java
  44. +11 −27 src/org/jgroups/protocols/tom/TOA.java
  45. +12 −3 src/org/jgroups/util/MessageBatch.java
  46. +6 −1 tests/junit-functional/org/jgroups/blocks/RpcDispatcherTest.java
  47. +1 −1 tests/junit-functional/org/jgroups/protocols/NAKACK_Delivery_Test.java
  48. +1 −1 tests/junit-functional/org/jgroups/protocols/NAKACK_StressTest.java
  49. +1 −1 tests/junit-functional/org/jgroups/protocols/UNICAST_OOB_Test.java
  50. +1 −1 tests/junit-functional/org/jgroups/tests/DynamicDiscardTest.java
  51. +40 −40 tests/junit-functional/org/jgroups/tests/MessageTest.java
  52. +1 −1 tests/junit-functional/org/jgroups/tests/NakReceiverWindowTest.java
  53. +2 −2 tests/junit/org/jgroups/blocks/executor/ExecutingServiceTest.java
  54. +2 −2 tests/junit/org/jgroups/tests/ConnectStressTest.java
  55. +1 −1 tests/junit/org/jgroups/tests/Deadlock2Test.java
  56. +2 −2 tests/junit/org/jgroups/tests/DuplicateTest.java
  57. +1 −1 tests/junit/org/jgroups/tests/NAKACK_Test.java
  58. +10 −10 tests/junit/org/jgroups/tests/OOBTest.java
  59. +5 −8 tests/junit/org/jgroups/tests/UUIDCacheClearTest.java
  60. +1 −1 tests/other/org/jgroups/tests/MessageDispatcherSpeedTest.java
  61. +2 −2 tests/other/org/jgroups/tests/PingPong.java
  62. +2 −2 tests/other/org/jgroups/tests/RpcDispatcherSpeedTest.java
  63. +1 −1 tests/other/org/jgroups/tests/UnicastTest.java
  64. +2 −2 tests/other/org/jgroups/tests/UnicastTestRpc.java
  65. +7 −7 tests/perf/org/jgroups/tests/perf/UPerf.java
  66. +4 −4 tests/perf/org/jgroups/tests/perf/UUPerf.java
View
100 src/org/jgroups/Message.java
@@ -62,23 +62,24 @@
SCOPED( (short)(1 << 3)), // when a message has a scope
NO_RELIABILITY((short)(1 << 4)), // bypass UNICAST(2) and NAKACK
NO_TOTAL_ORDER((short)(1 << 5)), // bypass total order (e.g. SEQUENCER)
- NO_RELAY((short) (1 << 6)), // bypass relaying (RELAY)
- RSVP((short) (1 << 7)); // ack of a multicast (https://issues.jboss.org/browse/JGRP-1389)
+ NO_RELAY( (short)(1 << 6)), // bypass relaying (RELAY)
+ RSVP( (short)(1 << 7)), // ack of a multicast (https://issues.jboss.org/browse/JGRP-1389)
+ INTERNAL( (short)(1 << 8)); // for internal use by JGroups only, don't use !
final short value;
Flag(short value) {this.value=value;}
public short value() {return value;}
}
- public static final Flag OOB=Flag.OOB;
- public static final Flag DONT_BUNDLE=Flag.DONT_BUNDLE;
- public static final Flag NO_FC=Flag.NO_FC;
- public static final Flag SCOPED=Flag.SCOPED;
- public static final Flag NO_RELIABILITY=Flag.NO_RELIABILITY;
- public static final Flag NO_TOTAL_ORDER=Flag.NO_TOTAL_ORDER;
- public static final Flag NO_RELAY=Flag.NO_RELAY;
- public static final Flag RSVP=Flag.RSVP;
+ @Deprecated public static final Flag OOB=Flag.OOB;
+ @Deprecated public static final Flag DONT_BUNDLE=Flag.DONT_BUNDLE;
+ @Deprecated public static final Flag NO_FC=Flag.NO_FC;
+ @Deprecated public static final Flag SCOPED=Flag.SCOPED;
+ @Deprecated public static final Flag NO_RELIABILITY=Flag.NO_RELIABILITY;
+ @Deprecated public static final Flag NO_TOTAL_ORDER=Flag.NO_TOTAL_ORDER;
+ @Deprecated public static final Flag NO_RELAY=Flag.NO_RELAY;
+ @Deprecated public static final Flag RSVP=Flag.RSVP;
@@ -91,7 +92,8 @@
public short value() {return value;}
}
-
+
+ @Deprecated
public static final TransientFlag OOB_DELIVERED=TransientFlag.OOB_DELIVERED; // OOB which has already been delivered up the stack
@@ -837,67 +839,35 @@ public long size() {
public static String flagsToString(short flags) {
StringBuilder sb=new StringBuilder();
boolean first=true;
- if(isFlagSet(flags, Flag.OOB)) {
- first=false;
- sb.append("OOB");
- }
- if(isFlagSet(flags, Flag.DONT_BUNDLE)) {
- if(!first)
- sb.append("|");
- else
- first=false;
- sb.append("DONT_BUNDLE");
- }
- if(isFlagSet(flags, Flag.NO_FC)) {
- if(!first)
- sb.append("|");
- else
- first=false;
- sb.append("NO_FC");
- }
- if(isFlagSet(flags, Flag.SCOPED)) {
- if(!first)
- sb.append("|");
- else
- first=false;
- sb.append("SCOPED");
- }
- if(isFlagSet(flags, Flag.NO_RELIABILITY)) {
- if(!first)
- sb.append("|");
- else
- first=false;
- sb.append("NO_RELIABILITY");
- }
- if(isFlagSet(flags, Flag.NO_TOTAL_ORDER)) {
- if(!first)
- sb.append("|");
- else
- first=false;
- sb.append("NO_TOTAL_ORDER");
- }
- if(isFlagSet(flags, Flag.NO_RELAY)) {
- if(!first)
- sb.append("|");
- else
- first=false;
- sb.append("NO_RELAY");
- }
- if(isFlagSet(flags, Flag.RSVP)) {
- if(!first)
- sb.append("|");
- else
- first=false;
- sb.append("RSVP");
+
+ Flag[] all_flags=Flag.values();
+ for(Flag flag: all_flags) {
+ if(isFlagSet(flags, flag)) {
+ if(first)
+ first=false;
+ else
+ sb.append("|");
+ sb.append(flag);
+ }
}
return sb.toString();
}
public String transientFlagsToString() {
StringBuilder sb=new StringBuilder();
- if(isTransientFlagSet(TransientFlag.OOB_DELIVERED))
- sb.append("OOB_DELIVERED");
+ boolean first=true;
+
+ TransientFlag[] all_flags=TransientFlag.values();
+ for(TransientFlag flag: all_flags) {
+ if(isTransientFlagSet(flag)) {
+ if(first)
+ first=false;
+ else
+ sb.append("|");
+ sb.append(flag);
+ }
+ }
return sb.toString();
}
View
2 src/org/jgroups/auth/DemoToken.java
@@ -42,7 +42,7 @@ public boolean authenticate(AuthToken token, Message msg) {
Address sender=msg.getSrc();
// 1. send a challenge to the sender
- Message challenge=new Message(sender).setFlag(Message.OOB);
+ Message challenge=new Message(sender).setFlag(Message.Flag.OOB);
byte[] buf=generateRandomBytes();
DemoHeader hdr=new DemoHeader(buf);
challenge.putHeader(ID, hdr);
View
2 src/org/jgroups/blocks/RequestOptions.java
@@ -31,7 +31,7 @@
protected short scope;
/** The flags set in the message in which a request is sent */
- protected short flags; // Message.OOB, Message.DONT_BUNDLE etc
+ protected short flags; // Message.Flag.OOB, Message.Flag.DONT_BUNDLE etc
/** A list of members which should be excluded from a call */
protected Set<Address> exclusion_list;
View
8 src/org/jgroups/protocols/AUTH.java
@@ -185,17 +185,13 @@ protected void sendJoinRejectionMessage(Address dest, String error_msg) {
if(dest == null)
return;
- Message msg = new Message(dest, null, null);
JoinRsp joinRes=new JoinRsp(error_msg); // specify the error message on the JoinRsp
-
- GMS.GmsHeader gmsHeader=new GMS.GmsHeader(GMS.GmsHeader.JOIN_RSP, joinRes);
- msg.putHeader(gms_id, gmsHeader);
+ Message msg = new Message(dest).putHeader(gms_id, new GMS.GmsHeader(GMS.GmsHeader.JOIN_RSP, joinRes));
down_prot.down(new Event(Event.MSG, msg));
}
protected void sendMergeRejectionMessage(Address dest) {
- Message msg=new Message(dest, null, null);
- msg.setFlag(Message.OOB);
+ Message msg=new Message(dest).setFlag(Message.Flag.OOB);
GMS.GmsHeader hdr=new GMS.GmsHeader(GMS.GmsHeader.MERGE_RSP);
hdr.setMergeRejected(true);
msg.putHeader(gms_id, hdr);
View
15 src/org/jgroups/protocols/COUNTER.java
@@ -431,10 +431,9 @@ protected Owner getOwner() {
protected void sendRequest(Address dest, Request req) {
try {
Buffer buffer=requestToBuffer(req);
- Message msg=new Message(dest, null, buffer);
- msg.putHeader(id, new CounterHeader());
+ Message msg=new Message(dest, buffer).putHeader(id, new CounterHeader());
if(bypass_bundling)
- msg.setFlag(Message.DONT_BUNDLE);
+ msg.setFlag(Message.Flag.DONT_BUNDLE);
if(log.isTraceEnabled())
log.trace("[" + local_addr + "] --> [" + (dest == null? "ALL" : dest) + "] " + req);
@@ -449,10 +448,9 @@ protected void sendRequest(Address dest, Request req) {
protected void sendResponse(Address dest, Response rsp) {
try {
Buffer buffer=responseToBuffer(rsp);
- Message rsp_msg=new Message(dest, null, buffer);
- rsp_msg.putHeader(id, new CounterHeader());
+ Message rsp_msg=new Message(dest, buffer).putHeader(id, new CounterHeader());
if(bypass_bundling)
- rsp_msg.setFlag(Message.DONT_BUNDLE);
+ rsp_msg.setFlag(Message.Flag.DONT_BUNDLE);
if(log.isTraceEnabled())
log.trace("[" + local_addr + "] --> [" + dest + "] " + rsp);
@@ -480,10 +478,9 @@ protected void updateBackups(String name, long value, long version) {
protected void send(Address dest, Buffer buffer) {
try {
- Message rsp_msg=new Message(dest, null, buffer);
- rsp_msg.putHeader(id, new CounterHeader());
+ Message rsp_msg=new Message(dest, buffer).putHeader(id, new CounterHeader());
if(bypass_bundling)
- rsp_msg.setFlag(Message.DONT_BUNDLE);
+ rsp_msg.setFlag(Message.Flag.DONT_BUNDLE);
down_prot.down(new Event(Event.MSG, rsp_msg));
}
catch(Exception ex) {
View
2 src/org/jgroups/protocols/DAISYCHAIN.java
@@ -101,7 +101,7 @@ public Object down(final Event evt) {
if(msg.getSrc() == null)
msg.setSrc(local_addr);
- Executor pool=msg.isFlagSet(Message.OOB)? oob_pool : default_pool;
+ Executor pool=msg.isFlagSet(Message.Flag.OOB)? oob_pool : default_pool;
pool.execute(new Runnable() {
public void run() {
up_prot.up(evt);
View
13 src/org/jgroups/protocols/Discovery.java
@@ -259,8 +259,10 @@ public void sendDiscoveryRequest(String cluster_name, Promise promise, ViewId vi
Collection<PhysicalAddress> cluster_members=fetchClusterMembers(cluster_name);
if(cluster_members == null) {
- // multicast msg
- Message msg=new Message(null).setFlag(Message.OOB, Message.Flag.DONT_BUNDLE).putHeader(getId(), hdr);
+ // message needs to have DONT_BUNDLE flag: if A sends message M to B, and we need to fetch B's physical
+ // address, then the bundler thread blocks until the discovery request has returned. However, we cannot send
+ // the discovery *request* until the bundler thread has returned from sending M
+ Message msg=new Message(null).setFlag(Message.Flag.INTERNAL, Message.Flag.DONT_BUNDLE).putHeader(getId(), hdr);
sendMcastDiscoveryRequest(msg);
}
else {
@@ -281,7 +283,9 @@ public void sendDiscoveryRequest(String cluster_name, Promise promise, ViewId vi
for(final Address addr: cluster_members) {
if(addr.equals(physical_addr)) // no need to send the request to myself
continue;
- final Message msg=new Message(addr).setFlag(Message.OOB, Message.Flag.DONT_BUNDLE).putHeader(this.id, hdr);
+ // the message needs to be DONT_BUNDLE, see explanation above
+ final Message msg=new Message(addr).setFlag(Message.Flag.INTERNAL, Message.Flag.DONT_BUNDLE)
+ .putHeader(this.id, hdr);
if(log.isTraceEnabled())
log.trace(local_addr + ": sending discovery request to " + msg.getDest());
if(!sendDiscoveryRequestsInParallel()) {
@@ -634,9 +638,8 @@ protected void sendDiscoveryResponse(Address logical_addr, List<PhysicalAddress>
data=new PingData(logical_addr, null, view_id, is_server, logical_name, physical_addrs);
}
- final Message rsp_msg=new Message(sender).setFlag(Message.OOB);
final PingHeader rsp_hdr=new PingHeader(PingHeader.GET_MBRS_RSP, data);
- rsp_msg.putHeader(this.id, rsp_hdr);
+ final Message rsp_msg=new Message(sender).setFlag(Message.Flag.INTERNAL).putHeader(this.id, rsp_hdr);
if(stagger_timeout > 0) {
int view_size=view != null? view.size() : 10;
View
10 src/org/jgroups/protocols/ENCRYPT.java
@@ -842,9 +842,8 @@ private void sendSecretKey(SecretKey secret, PublicKey pubKey, Address source) t
//encrypt current secret key
byte[] encryptedKey=tmp.doFinal(secret.getEncoded());
- newMsg=new Message(source, local_addr, encryptedKey);
-
- newMsg.putHeader(this.id, new EncryptHeader(EncryptHeader.SECRETKEY, getSymVersion()));
+ newMsg=new Message(source, local_addr, encryptedKey)
+ .putHeader(this.id, new EncryptHeader(EncryptHeader.SECRETKEY, getSymVersion()));
if(log.isDebugEnabled())
log.debug(" Sending version " + getSymVersion() + " encoded key to client");
@@ -860,9 +859,8 @@ private Message sendKeyRequest() {
// send client's public key to server and request
// server's public key
- Message newMsg=new Message(keyServerAddr, local_addr, Kpair.getPublic().getEncoded());
-
- newMsg.putHeader(this.id,new EncryptHeader(EncryptHeader.KEY_REQUEST,getSymVersion()));
+ Message newMsg=new Message(keyServerAddr, local_addr, Kpair.getPublic().getEncoded())
+ .putHeader(this.id,new EncryptHeader(EncryptHeader.KEY_REQUEST,getSymVersion()));
passItDown(new Event(Event.MSG,newMsg));
return newMsg;
}
View
59 src/org/jgroups/protocols/Executing.java
@@ -1,43 +1,6 @@
package org.jgroups.protocols;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.NotSerializableException;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.BrokenBarrierException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.RunnableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.jgroups.Address;
-import org.jgroups.Event;
-import org.jgroups.Header;
-import org.jgroups.Message;
-import org.jgroups.View;
+import org.jgroups.*;
import org.jgroups.annotations.MBean;
import org.jgroups.annotations.ManagedAttribute;
import org.jgroups.annotations.Property;
@@ -48,6 +11,16 @@
import org.jgroups.util.Streamable;
import org.jgroups.util.Util;
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.*;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
/**
* This is the base protocol used for executions.
* @author wburns
@@ -901,10 +874,9 @@ protected void handleRemoveConsumer(Owner sender) {
protected void sendRequest(Address dest, Type type, long requestId, Object object) {
Request req=new Request(type, object, requestId);
- Message msg=new Message(dest, null, req);
- msg.putHeader(id, new ExecutorHeader());
+ Message msg=new Message(dest, req).putHeader(id, new ExecutorHeader());
if(bypass_bundling)
- msg.setFlag(Message.DONT_BUNDLE);
+ msg.setFlag(Message.Flag.DONT_BUNDLE);
if(log.isTraceEnabled())
log.trace("[" + local_addr + "] --> [" + (dest == null? "ALL" : dest) + "] " + req);
try {
@@ -918,10 +890,9 @@ protected void sendRequest(Address dest, Type type, long requestId, Object objec
protected void sendThreadRequest(Address dest, long threadId, Type type, long requestId,
Object object) {
RequestWithThread req=new RequestWithThread(type, object, requestId, threadId);
- Message msg=new Message(dest, null, req);
- msg.putHeader(id, new ExecutorHeader());
+ Message msg=new Message(dest, req).putHeader(id, new ExecutorHeader());
if(bypass_bundling)
- msg.setFlag(Message.DONT_BUNDLE);
+ msg.setFlag(Message.Flag.DONT_BUNDLE);
if(log.isTraceEnabled())
log.trace("[" + local_addr + "] --> [" + (dest == null? "ALL" : dest) + "] " + req);
try {
View
6 src/org/jgroups/protocols/FC.java
@@ -781,7 +781,8 @@ private void sendCredit(Address dest, long credit) {
number=(int)credit;
else
number=credit;
- Message msg=new Message(dest, number).setFlag(Message.OOB, Message.Flag.DONT_BUNDLE).putHeader(this.id,REPLENISH_HDR);
+ Message msg=new Message(dest, number).setFlag(Message.Flag.OOB, Message.Flag.INTERNAL, Message.Flag.DONT_BUNDLE)
+ .putHeader(this.id,REPLENISH_HDR);
down_prot.down(new Event(Event.MSG, msg));
num_credit_responses_sent++;
}
@@ -795,7 +796,8 @@ private void sendCredit(Address dest, long credit) {
private void sendCreditRequest(final Address dest, Long credits_left) {
if(log.isTraceEnabled())
log.trace("sending credit request to " + dest);
- Message msg=new Message(dest, credits_left).setFlag(Message.Flag.DONT_BUNDLE).putHeader(this.id,CREDIT_REQUEST_HDR);
+ Message msg=new Message(dest, credits_left).setFlag(Message.Flag.DONT_BUNDLE, Message.Flag.INTERNAL)
+ .putHeader(this.id,CREDIT_REQUEST_HDR);
down_prot.down(new Event(Event.MSG, msg));
num_credit_requests_sent++;
}
View
6 src/org/jgroups/protocols/FD.java
@@ -309,7 +309,7 @@ else if(!isMonitorRunning())
protected void sendHeartbeatResponse(Address dest) {
- Message hb_ack=new Message(dest).setFlag(Message.OOB);
+ Message hb_ack=new Message(dest).setFlag(Message.Flag.INTERNAL);
FdHeader tmp_hdr=new FdHeader(FdHeader.HEARTBEAT_ACK);
tmp_hdr.from=local_addr;
hb_ack.putHeader(this.id, tmp_hdr);
@@ -437,7 +437,7 @@ public void run() {
}
// 1. send heartbeat request
- Message hb_req=new Message(dest).setFlag(Message.OOB).putHeader(id, new FdHeader(FdHeader.HEARTBEAT));
+ Message hb_req=new Message(dest).setFlag(Message.Flag.INTERNAL).putHeader(id, new FdHeader(FdHeader.HEARTBEAT));
if(log.isDebugEnabled())
log.debug(local_addr + ": sending are-you-alive msg to " + dest);
down_prot.down(new Event(Event.MSG, hb_req));
@@ -599,7 +599,7 @@ public void run() {
hdr.mbrs=new ArrayList<Address>(suspected_members);
hdr.from=local_addr;
}
- Message suspect_msg=new Message().setFlag(Message.OOB).putHeader(id, hdr);
+ Message suspect_msg=new Message().setFlag(Message.Flag.INTERNAL).putHeader(id, hdr);
if(log.isDebugEnabled())
log.debug(local_addr + ": broadcasting SUSPECT message (suspects=" + suspected_members + ")");
down_prot.down(new Event(Event.MSG, suspect_msg));
View
2 src/org/jgroups/protocols/FD_ALL.java
@@ -386,7 +386,7 @@ public void readFrom(DataInput in) throws Exception {}
*/
class HeartbeatSender implements Runnable {
public void run() {
- Message heartbeat=new Message().setFlag(Message.OOB).putHeader(id, new HeartbeatHeader());
+ Message heartbeat=new Message().setFlag(Message.Flag.INTERNAL).putHeader(id, new HeartbeatHeader());
down_prot.down(new Event(Event.MSG, heartbeat));
num_heartbeats_sent++;
}
View
25 src/org/jgroups/protocols/FD_SOCK.java
@@ -256,9 +256,7 @@ public Object up(Event evt) {
case FdHeader.GET_CACHE:
Address sender=msg.getSrc(); // guaranteed to be non-null
hdr=new FdHeader(FdHeader.GET_CACHE_RSP,new HashMap<Address,IpAddress>(cache));
- msg=new Message(sender, null, null);
- msg.setFlag(Message.OOB);
- msg.putHeader(this.id, hdr);
+ msg=new Message(sender).setFlag(Message.Flag.INTERNAL).putHeader(this.id, hdr);
down_prot.down(new Event(Event.MSG, msg));
break;
@@ -656,9 +654,7 @@ void getCacheFromCoordinator() {
return;
}
hdr=new FdHeader(FdHeader.GET_CACHE);
- msg=new Message(coord, null, null);
- msg.setFlag(Message.OOB);
- msg.putHeader(this.id, hdr);
+ msg=new Message(coord).setFlag(Message.Flag.INTERNAL).putHeader(this.id, hdr);
down_prot.down(new Event(Event.MSG, msg));
result=get_cache_promise.getResult(get_cache_timeout);
if(result != null) {
@@ -694,9 +690,7 @@ void broadcastSuspectMessage(Address suspected_mbr) {
hdr=new FdHeader(FdHeader.SUSPECT);
hdr.mbrs=new HashSet<Address>(1);
hdr.mbrs.add(suspected_mbr);
- suspect_msg=new Message();
- suspect_msg.setFlag(Message.OOB);
- suspect_msg.putHeader(this.id, hdr);
+ suspect_msg=new Message().setFlag(Message.Flag.INTERNAL).putHeader(this.id, hdr);
down_prot.down(new Event(Event.MSG, suspect_msg));
// 2. Add to broadcast task and start latter (if not yet running). The task will end when
@@ -716,8 +710,7 @@ void broadcastSuspectMessage(Address suspected_mbr) {
it will be unicast back to the requester
*/
void sendIHaveSockMessage(Address dst, Address mbr, IpAddress addr) {
- Message msg=new Message(dst, null, null);
- msg.setFlag(Message.OOB);
+ Message msg=new Message(dst).setFlag(Message.Flag.INTERNAL);
FdHeader hdr=new FdHeader(FdHeader.I_HAVE_SOCK);
hdr.mbr=mbr;
hdr.sock_addr=addr;
@@ -746,8 +739,7 @@ private IpAddress fetchPingAddress(Address mbr) {
// 2. Try to get the server socket address from mbr
ping_addr_promise.reset();
- ping_addr_req=new Message(mbr, null, null); // unicast
- ping_addr_req.setFlag(Message.OOB);
+ ping_addr_req=new Message(mbr).setFlag(Message.Flag.INTERNAL);
hdr=new FdHeader(FdHeader.WHO_HAS_SOCK);
hdr.mbr=mbr;
ping_addr_req.putHeader(this.id, hdr);
@@ -760,8 +752,7 @@ private IpAddress fetchPingAddress(Address mbr) {
if(!isPingerThreadRunning()) return null;
// 3. Try to get the server socket address from all members
- ping_addr_req=new Message(null); // multicast
- ping_addr_req.setFlag(Message.OOB);
+ ping_addr_req=new Message(null).setFlag(Message.Flag.INTERNAL);
hdr=new FdHeader(FdHeader.WHO_HAS_SOCK);
hdr.mbr=mbr;
ping_addr_req.putHeader(this.id, hdr);
@@ -1224,9 +1215,7 @@ public void run() {
hdr=new FdHeader(FdHeader.SUSPECT);
hdr.mbrs=new HashSet<Address>(suspects);
}
- suspect_msg=new Message(); // mcast SUSPECT to all members
- suspect_msg.setFlag(Message.OOB);
- suspect_msg.putHeader(id, hdr);
+ suspect_msg=new Message().setFlag(Message.Flag.INTERNAL).putHeader(id, hdr); // mcast SUSPECT to all members
down_prot.down(new Event(Event.MSG, suspect_msg));
if(log.isTraceEnabled()) log.trace("task done");
}
View
5 src/org/jgroups/protocols/FORWARD_TO_COORD.java
@@ -180,10 +180,7 @@ protected void sendNotCoord(Address target, long ack_id) {
}
protected void send(Address target, long ack_id, byte type) {
- Message msg=new Message(target);
- ForwardHeader hdr=new ForwardHeader(type, ack_id);
- msg.putHeader(id, hdr);
- down_prot.down(new Event(Event.MSG, msg));
+ down_prot.down(new Event(Event.MSG, new Message(target).putHeader(id, new ForwardHeader(type, ack_id))));
}
View
12 src/org/jgroups/protocols/FlowControl.java
@@ -528,24 +528,24 @@ protected void handleCreditRequest(Map<Address,Credit> map, Address sender, long
protected void sendCredit(Address dest, long credits) {
if(log.isTraceEnabled())
if(log.isTraceEnabled()) log.trace("sending " + credits + " credits to " + dest);
- Message msg=new Message(dest, null,credits);
- msg.setFlag(Message.OOB, Message.Flag.DONT_BUNDLE);
- msg.putHeader(this.id, REPLENISH_HDR);
+ Message msg=new Message(dest, credits)
+ .setFlag(Message.Flag.OOB, Message.Flag.INTERNAL)
+ .putHeader(this.id,REPLENISH_HDR);
down_prot.down(new Event(Event.MSG, msg));
num_credit_responses_sent++;
}
/**
- * We cannot send this request as OOB messages, as the credit request needs to queue up behind the regular messages;
+ * We cannot send this request as OOB message, as the credit request needs to queue up behind the regular messages;
* if a receiver cannot process the regular messages, that is a sign that the sender should be throttled !
* @param dest The member to which we send the credit request
* @param credits_needed The number of bytes (of credits) left for dest
*/
protected void sendCreditRequest(final Address dest, Long credits_needed) {
if(log.isTraceEnabled())
log.trace("sending request for " + credits_needed + " credits to " + dest);
- Message msg=new Message(dest, null, credits_needed).setFlag(Message.Flag.DONT_BUNDLE);
- msg.putHeader(this.id, CREDIT_REQUEST_HDR);
+ Message msg=new Message(dest, credits_needed).setFlag(Message.Flag.INTERNAL)
+ .putHeader(this.id, CREDIT_REQUEST_HDR);
down_prot.down(new Event(Event.MSG, msg));
num_credit_requests_sent++;
}
View
55 src/org/jgroups/protocols/Locking.java
@@ -1,43 +1,29 @@
package org.jgroups.protocols;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.LockSupport;
-
-import org.jgroups.Address;
-import org.jgroups.Event;
-import org.jgroups.Header;
-import org.jgroups.Message;
-import org.jgroups.View;
+import org.jgroups.*;
import org.jgroups.annotations.MBean;
import org.jgroups.annotations.ManagedAttribute;
import org.jgroups.annotations.ManagedOperation;
import org.jgroups.annotations.Property;
import org.jgroups.blocks.locking.AwaitInfo;
import org.jgroups.blocks.locking.LockInfo;
import org.jgroups.blocks.locking.LockNotification;
-import org.jgroups.util.Owner;
import org.jgroups.stack.Protocol;
+import org.jgroups.util.Owner;
import org.jgroups.util.Streamable;
import org.jgroups.util.Util;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.util.*;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.LockSupport;
+
/**
@@ -360,10 +346,9 @@ protected Owner getOwner() {
protected void sendRequest(Address dest, Type type, String lock_name, Owner owner, long timeout, boolean is_trylock) {
Request req=new Request(type, lock_name, owner, timeout, is_trylock);
- Message msg=new Message(dest, null, req);
- msg.putHeader(id, new LockingHeader());
+ Message msg=new Message(dest, req).putHeader(id, new LockingHeader());
if(bypass_bundling)
- msg.setFlag(Message.DONT_BUNDLE);
+ msg.setFlag(Message.Flag.DONT_BUNDLE);
if(log.isTraceEnabled())
log.trace("[" + local_addr + "] --> [" + (dest == null? "ALL" : dest) + "] " + req);
try {
@@ -377,10 +362,9 @@ protected void sendRequest(Address dest, Type type, String lock_name, Owner owne
protected void sendLockResponse(Type type, Owner dest, String lock_name) {
Request rsp=new Request(type, lock_name, dest, 0);
- Message lock_granted_rsp=new Message(dest.getAddress(), null, rsp);
- lock_granted_rsp.putHeader(id, new LockingHeader());
+ Message lock_granted_rsp=new Message(dest.getAddress(), rsp).putHeader(id, new LockingHeader());
if(bypass_bundling)
- lock_granted_rsp.setFlag(Message.DONT_BUNDLE);
+ lock_granted_rsp.setFlag(Message.Flag.DONT_BUNDLE);
if(log.isTraceEnabled())
log.trace("[" + local_addr + "] --> [" + dest.getAddress() + "] " + rsp);
@@ -396,10 +380,9 @@ protected void sendLockResponse(Type type, Owner dest, String lock_name) {
protected void sendSignalResponse(Owner dest, String lock_name) {
Request rsp=new Request(Type.SIG_RET, lock_name, dest, 0);
- Message lock_granted_rsp=new Message(dest.getAddress(), null, rsp);
- lock_granted_rsp.putHeader(id, new LockingHeader());
+ Message lock_granted_rsp=new Message(dest.getAddress(), rsp).putHeader(id, new LockingHeader());
if(bypass_bundling)
- lock_granted_rsp.setFlag(Message.DONT_BUNDLE);
+ lock_granted_rsp.setFlag(Message.Flag.DONT_BUNDLE);
if(log.isTraceEnabled())
log.trace("[" + local_addr + "] --> [" + dest.getAddress() + "] " + rsp);
View
14 src/org/jgroups/protocols/MERGE3.java
@@ -274,8 +274,7 @@ public Object up(Event evt) {
case VIEW_REQ:
View tmp_view=view != null? view.copy() : null;
Header tmphdr=MergeHeader.createViewResponse(tmp_view);
- Message view_rsp=new Message(sender);
- view_rsp.putHeader(getId(), tmphdr);
+ Message view_rsp=new Message(sender).setFlag(Message.Flag.INTERNAL).putHeader(getId(),tmphdr);
down_prot.down(new Event(Event.MSG, view_rsp));
break;
case VIEW_RSP:
@@ -318,8 +317,7 @@ public void run() {
MergeHeader hdr=MergeHeader.createInfo(view_id, logical_name, Arrays.asList(physical_addr));
if(transport_supports_multicasting) {
- Message msg=new Message();
- msg.putHeader(getId(), hdr);
+ Message msg=new Message().setFlag(Message.Flag.INTERNAL).putHeader(getId(), hdr);
down_prot.down(new Event(Event.MSG, msg));
return;
}
@@ -339,8 +337,7 @@ public void run() {
log.trace("discovery protocol " + discovery_protocol.getName() + " returned " + physical_addrs.size() +
" physical addresses: " + Util.printListWithDelimiter(physical_addrs, ", ", 10));
for(Address addr: physical_addrs) {
- Message info=new Message(addr);
- info.putHeader(getId(), hdr);
+ Message info=new Message(addr).setFlag(Message.Flag.INTERNAL).putHeader(getId(), hdr);
down_prot.down(new Event(Event.MSG, info));
}
}
@@ -428,9 +425,8 @@ protected void _run() {
view_rsps.add(local_addr, view.copy());
continue;
}
- Message view_req=new Message(target);
- Header hdr=MergeHeader.createViewRequest();
- view_req.putHeader(getId(), hdr);
+ Message view_req=new Message(target).setFlag(Message.Flag.INTERNAL)
+ .putHeader(getId(), MergeHeader.createViewRequest());
down_prot.down(new Event(Event.MSG, view_req));
}
view_rsps.waitForAllResponses(check_interval / 10);
View
4 src/org/jgroups/protocols/PRIO.java
@@ -98,7 +98,7 @@ public Object up(Event evt) {
switch(evt.getType()) {
case Event.MSG:
Message message = (Message)evt.getArg();
- if ( message.isFlagSet( Message.OOB ) ) {
+ if ( message.isFlagSet( Message.Flag.OOB ) ) {
return up_prot.up(evt);
}
else {
@@ -146,7 +146,7 @@ public Object down(Event evt) {
switch(evt.getType()) {
case Event.MSG:
Message message = (Message)evt.getArg();
- if ( message.isFlagSet( Message.OOB ) ) {
+ if ( message.isFlagSet( Message.Flag.OOB ) ) {
return down_prot.down(evt);
}
else {
View
13 src/org/jgroups/protocols/RELAY.java
@@ -323,8 +323,7 @@ protected Object installView(byte[] buf, int offset, int length) {
/** Forwards the message across the TCP link to the other local cluster */
protected void forward(byte[] buffer, int offset, int length) {
- Message msg=new Message(null, null, buffer, offset, length);
- msg.putHeader(id, new RelayHeader(RelayHeader.Type.FORWARD));
+ Message msg=new Message(null, null, buffer, offset, length).putHeader(id, new RelayHeader(RelayHeader.Type.FORWARD));
if(bridge != null) {
try {
bridge.send(msg);
@@ -366,8 +365,7 @@ protected void sendViewToRemote(ViewData view_data, boolean use_seperate_thread)
try {
if(bridge != null && bridge.isConnected()) {
byte[] buf=Util.streamableToByteBuffer(view_data);
- final Message msg=new Message(null, null, buf);
- msg.putHeader(id, RelayHeader.create(RelayHeader.Type.VIEW));
+ final Message msg=new Message(null, buf).putHeader(id, RelayHeader.create(RelayHeader.Type.VIEW));
if(use_seperate_thread) {
timer.execute(new Runnable() {
public void run() {
@@ -503,9 +501,7 @@ public void run() {
protected void sendViewOnLocalCluster(final List<Address> destinations, final byte[] buffer) {
for(Address dest: destinations) {
- Message view_msg=new Message(dest, null, buffer);
- RelayHeader hdr=RelayHeader.create(RelayHeader.Type.VIEW);
- view_msg.putHeader(id, hdr);
+ Message view_msg=new Message(dest, buffer).putHeader(id, RelayHeader.create(RelayHeader.Type.VIEW));
down_prot.down(new Event(Event.MSG, view_msg));
}
}
@@ -614,8 +610,7 @@ public void viewAccepted(View view) {
public void run() {
if(bridge == null || !bridge.isConnected() || remote_view != null)
return;
- Message msg=new Message();
- msg.putHeader(id, RelayHeader.create(RELAY.RelayHeader.Type.BROADCAST_VIEW));
+ Message msg=new Message().putHeader(id, RelayHeader.create(RELAY.RelayHeader.Type.BROADCAST_VIEW));
try {
bridge.send(msg);
}
View
8 src/org/jgroups/protocols/RSVP.java
@@ -241,10 +241,9 @@ protected void handleResponse(Address member, short id) {
protected void sendResponse(Address dest, short id) {
try {
- Message msg=new Message(dest);
- msg.setFlag(Message.Flag.RSVP, Message.Flag.DONT_BUNDLE);
RsvpHeader hdr=new RsvpHeader(RsvpHeader.RSP,id);
- msg.putHeader(this.id, hdr);
+ Message msg=new Message(dest).setFlag(Message.Flag.RSVP, Message.Flag.INTERNAL, Message.Flag.DONT_BUNDLE)
+ .putHeader(this.id, hdr);
if(log.isTraceEnabled())
log.trace(local_addr + ": " + hdr.typeToString() + " --> " + dest);
down_prot.down(new Event(Event.MSG, msg));
@@ -286,9 +285,8 @@ public void run() {
cancelTask();
return;
}
- Message msg=new Message(target).setFlag(Message.Flag.RSVP);
RsvpHeader hdr=new RsvpHeader(RsvpHeader.REQ_ONLY, rsvp_id);
- msg.putHeader(id, hdr);
+ Message msg=new Message(target).setFlag(Message.Flag.RSVP).putHeader(id,hdr);
if(log.isTraceEnabled())
log.trace(local_addr + ": " + hdr.typeToString() + " --> " + target);
down_prot.down(new Event(Event.MSG, msg));
View
4 src/org/jgroups/protocols/SCOPE.java
@@ -217,7 +217,7 @@ public Object up(Event evt) {
Message msg=(Message)evt.getArg();
// we don't handle unscoped or OOB messages
- if(!msg.isFlagSet(Message.SCOPED) || msg.isFlagSet(Message.OOB))
+ if(!msg.isFlagSet(Message.SCOPED) || msg.isFlagSet(Message.Flag.OOB))
break;
ScopeHeader hdr=(ScopeHeader)msg.getHeader(id);
@@ -248,7 +248,7 @@ public Object up(Event evt) {
public void up(MessageBatch batch) {
for(Message msg: batch) {
- if(!msg.isFlagSet(Message.SCOPED) || msg.isFlagSet(Message.OOB)) // we don't handle unscoped or OOB messages
+ if(!msg.isFlagSet(Message.SCOPED) || msg.isFlagSet(Message.Flag.OOB)) // we don't handle unscoped or OOB messages
continue;
ScopeHeader hdr=(ScopeHeader)msg.getHeader(id);
View
19 src/org/jgroups/protocols/SEQUENCER.java
@@ -144,7 +144,7 @@ public Object down(Event evt) {
switch(evt.getType()) {
case Event.MSG:
Message msg=(Message)evt.getArg();
- if(msg.getDest() != null || msg.isFlagSet(Message.NO_TOTAL_ORDER) || msg.isFlagSet(Message.OOB))
+ if(msg.getDest() != null || msg.isFlagSet(Message.NO_TOTAL_ORDER) || msg.isFlagSet(Message.Flag.OOB))
break;
if(msg.getSrc() == null)
@@ -202,7 +202,7 @@ public Object up(Event evt) {
switch(evt.getType()) {
case Event.MSG:
msg=(Message)evt.getArg();
- if(msg.isFlagSet(Message.NO_TOTAL_ORDER) || msg.isFlagSet(Message.OOB))
+ if(msg.isFlagSet(Message.NO_TOTAL_ORDER) || msg.isFlagSet(Message.Flag.OOB))
break;
hdr=(SequencerHeader)msg.getHeader(this.id);
if(hdr == null)
@@ -255,7 +255,7 @@ public Object up(Event evt) {
public void up(MessageBatch batch) {
for(Message msg: batch) {
- if(msg.isFlagSet(Message.NO_TOTAL_ORDER) || msg.isFlagSet(Message.OOB) || msg.getHeader(id) == null)
+ if(msg.isFlagSet(Message.NO_TOTAL_ORDER) || msg.isFlagSet(Message.Flag.OOB) || msg.getHeader(id) == null)
continue;
batch.remove(msg);
@@ -349,9 +349,8 @@ protected void flushMessagesInForwardTable() {
Long key=entry.getKey();
byte[] val=entry.getValue();
- Message forward_msg=new Message(null, val);
SequencerHeader hdr=new SequencerHeader(SequencerHeader.WRAPPED_BCAST, key);
- forward_msg.putHeader(this.id,hdr);
+ Message forward_msg=new Message(null, val).putHeader(this.id, hdr);
if(log.isTraceEnabled())
log.trace(local_addr + ": flushing (broadcasting) " + local_addr + "::" + key);
down_prot.down(new Event(Event.MSG, forward_msg));
@@ -376,10 +375,8 @@ protected void flushMessagesInForwardTable() {
byte[] val=entry.getValue();
while(flushing && running && !forward_table.isEmpty()) {
- Message forward_msg=new Message(coord, val);
SequencerHeader hdr=new SequencerHeader(SequencerHeader.FLUSH, key);
- forward_msg.putHeader(this.id,hdr);
- forward_msg.setFlag(Message.Flag.DONT_BUNDLE);
+ Message forward_msg=new Message(coord, val).putHeader(this.id,hdr).setFlag(Message.Flag.DONT_BUNDLE);
if(log.isTraceEnabled())
log.trace(local_addr + ": flushing (forwarding) " + local_addr + "::" + key + " to coord " + coord);
ack_promise.reset();
@@ -426,10 +423,9 @@ protected void forward(final byte[] marshalled_msg, long seqno, boolean flush) {
Address target=coord;
if(target == null)
return;
- Message forward_msg=new Message(target, marshalled_msg);
byte type=flush? SequencerHeader.FLUSH : SequencerHeader.FORWARD;
SequencerHeader hdr=new SequencerHeader(type, seqno);
- forward_msg.putHeader(this.id,hdr);
+ Message forward_msg=new Message(target, marshalled_msg).putHeader(this.id,hdr);
down_prot.down(new Event(Event.MSG, forward_msg));
forwarded_msgs++;
}
@@ -441,9 +437,8 @@ protected void broadcast(final Message msg, boolean copy, Address original_sende
bcast_msg=msg; // no need to add a header, message already has one
}
else {
- bcast_msg=new Message(null, msg.getRawBuffer(), msg.getOffset(), msg.getLength());
SequencerHeader new_hdr=new SequencerHeader(SequencerHeader.WRAPPED_BCAST, seqno);
- bcast_msg.putHeader(this.id, new_hdr);
+ bcast_msg=new Message(null, msg.getRawBuffer(), msg.getOffset(), msg.getLength()).putHeader(this.id, new_hdr);
if(resend) {
new_hdr.flush_ack=true;
bcast_msg.setFlag(Message.Flag.DONT_BUNDLE);
View
5 src/org/jgroups/protocols/STOMP.java
@@ -342,8 +342,7 @@ protected String getAllEndpoints() {
protected void broadcastEndpoint() {
if(endpoint != null) {
- Message msg=new Message();
- msg.putHeader(id, StompHeader.createHeader(StompHeader.Type.ENDPOINT, "endpoint", endpoint));
+ Message msg=new Message().putHeader(id, StompHeader.createHeader(StompHeader.Type.ENDPOINT, "endpoint", endpoint));
down_prot.down(new Event(Event.MSG, msg));
}
}
@@ -479,7 +478,7 @@ protected void handleFrame(Frame frame) {
headers.put("sender", session_id.toString());
}
- Message msg=new Message(null, null, frame.getBody());
+ Message msg=new Message(null, frame.getBody());
Header hdr=StompHeader.createHeader(StompHeader.Type.MESSAGE, headers);
msg.putHeader(id, hdr);
down_prot.down(new Event(Event.MSG, msg));
View
163 src/org/jgroups/protocols/TP.java
@@ -53,7 +53,6 @@
protected static final byte LIST=1; // we have a list of messages rather than a single message when set
protected static final byte MULTICAST=2; // message is a multicast (versus a unicast) message when set
- protected static final byte OOB=4; // message has OOB flag set (Message.OOB)
protected static final boolean can_bind_to_mcast_addr; // are we running on Linux ?
@@ -158,33 +157,59 @@
@Property(name="oob_thread_pool.queue_enabled", description="Use queue to enqueue incoming OOB messages")
protected boolean oob_thread_pool_queue_enabled=true;
- @Property(name="oob_thread_pool.queue_max_size",description="Maximum queue size for incoming OOB messages. Default is 500")
+ @Property(name="oob_thread_pool.queue_max_size",description="Maximum queue size for incoming OOB messages")
protected int oob_thread_pool_queue_max_size=500;
@Property(name="oob_thread_pool.rejection_policy",
- description="Thread rejection policy. Possible values are Abort, Discard, DiscardOldest and Run. Default is Discard")
- String oob_thread_pool_rejection_policy="discard";
+ description="Thread rejection policy. Possible values are Abort, Discard, DiscardOldest and Run")
+ protected String oob_thread_pool_rejection_policy="discard";
protected int thread_pool_min_threads=2;
protected int thread_pool_max_threads=10;
protected long thread_pool_keep_alive_time=30000;
- @Property(name="thread_pool.enabled",description="Switch for enabling thread pool for regular messages. Default true")
+ @Property(name="thread_pool.enabled",description="Switch for enabling thread pool for regular messages")
protected boolean thread_pool_enabled=true;
- @Property(name="thread_pool.queue_enabled", description="Use queue to enqueue incoming regular messages. Default is true")
+ @Property(name="thread_pool.queue_enabled", description="Queue to enqueue incoming regular messages")
protected boolean thread_pool_queue_enabled=true;
- @Property(name="thread_pool.queue_max_size", description="Maximum queue size for incoming OOB messages. Default is 500")
+ @Property(name="thread_pool.queue_max_size", description="Maximum queue size for incoming OOB messages")
protected int thread_pool_queue_max_size=500;
@Property(name="thread_pool.rejection_policy",
description="Thread rejection policy. Possible values are Abort, Discard, DiscardOldest and Run")
protected String thread_pool_rejection_policy="Discard";
+
+ @Property(name="internal_thread_pool.enabled",description="Switch for enabling thread pool for internal messages",
+ writable=false)
+ protected boolean internal_thread_pool_enabled=true;
+
+ @Property(name="internal_thread_pool.min_threads",description="Minimum thread pool size for the internal thread pool")
+ protected int internal_thread_pool_min_threads=2;
+
+ @Property(name="internal_thread_pool.max_threads",description="Maximum thread pool size for the internal thread pool")
+ protected int internal_thread_pool_max_threads=4;
+
+ @Property(name="internal_thread_pool.keep_alive_time", description="Timeout in ms to remove idle threads from the internal pool")
+ protected long internal_thread_pool_keep_alive_time=30000;
+
+ @Property(name="internal_thread_pool.queue_enabled", description="Queue to enqueue incoming internal messages")
+ protected boolean internal_thread_pool_queue_enabled=true;
+
+ @Property(name="internal_thread_pool.queue_max_size",description="Maximum queue size for incoming internal messages")
+ protected int internal_thread_pool_queue_max_size=500;
+
+ @Property(name="internal_thread_pool.rejection_policy",
+ description="Thread rejection policy. Possible values are Abort, Discard, DiscardOldest and Run")
+ protected String internal_thread_pool_rejection_policy="discard";
+
+
+
@Property(description="Type of timer to be used. Valid values are \"old\" (DefaultTimeScheduler, used up to 2.10), " +
"\"new\" or \"new2\" (TimeScheduler2), \"new3\" (TimeScheduler3) and \"wheel\". Note that this property " +
"might disappear in future releases, if one of the 3 timers is chosen as default timer")
@@ -443,10 +468,13 @@ public int getTimerQueueSize() {
protected String channel_name=null;
@ManagedAttribute(description="Number of OOB messages received")
- protected long num_oob_msgs_received=0;
+ protected long num_oob_msgs_received;
@ManagedAttribute(description="Number of regular messages received")
- protected long num_incoming_msgs_received=0;
+ protected long num_incoming_msgs_received;
+
+ @ManagedAttribute(description="Number of internal messages received")
+ protected long num_internal_msgs_received;
@ManagedAttribute(description="Class of the timer implementation")
public String getTimerClass() {
@@ -498,10 +526,10 @@ public void clearDifferentVersionCache() {
protected Executor oob_thread_pool;
/** Factory which is used by oob_thread_pool */
- protected ThreadFactory oob_thread_factory=null;
+ protected ThreadFactory oob_thread_factory;
/** Used if oob_thread_pool is a ThreadPoolExecutor and oob_thread_pool_queue_enabled is true */
- protected BlockingQueue<Runnable> oob_thread_pool_queue=null;
+ protected BlockingQueue<Runnable> oob_thread_pool_queue;
// ================================== Regular thread pool ======================
@@ -510,10 +538,18 @@ public void clearDifferentVersionCache() {
protected Executor thread_pool;
/** Factory which is used by oob_thread_pool */
- protected ThreadFactory default_thread_factory=null;
+ protected ThreadFactory default_thread_factory;
+
+ /** Used if thread_pool is a ThreadPoolExecutor and thread_pool_queue_enabled is true */
+ protected BlockingQueue<Runnable> thread_pool_queue;
+
+ // ================================== Internal thread pool ======================
+
+ /** The thread pool which handles JGroups internal messages (Flag.INTERNAL)*/
+ protected Executor internal_thread_pool;
/** Used if thread_pool is a ThreadPoolExecutor and thread_pool_queue_enabled is true */
- protected BlockingQueue<Runnable> thread_pool_queue=null;
+ protected BlockingQueue<Runnable> internal_thread_pool_queue;
// ================================== Timer thread pool =========================
protected TimeScheduler timer;
@@ -608,7 +644,7 @@ public String toString() {
public void resetStats() {
num_msgs_sent=num_msgs_received=num_bytes_sent=num_bytes_received=0;
- num_oob_msgs_received=num_incoming_msgs_received=0;
+ num_oob_msgs_received=num_incoming_msgs_received=num_internal_msgs_received=0;
}
public void registerProbeHandler(DiagnosticsHandler.ProbeHandler handler) {
@@ -800,6 +836,26 @@ public int getRegularMaxQueueSize() {
return thread_pool_queue_max_size;
}
+
+ @ManagedAttribute(description="Current number of threads in the internal thread pool")
+ public int getInternalPoolSize() {
+ return internal_thread_pool instanceof ThreadPoolExecutor? ((ThreadPoolExecutor)internal_thread_pool).getPoolSize() : 0;
+ }
+
+ public long getInternalMessages() {
+ return num_internal_msgs_received;
+ }
+
+ @ManagedAttribute(description="Number of messages in the internal thread pool's queue")
+ public int getInternalQueueSize() {
+ return internal_thread_pool_queue != null? internal_thread_pool_queue.size() : 0;
+ }
+
+ public int getInternalMaxQueueSize() {
+ return internal_thread_pool_queue_max_size;
+ }
+
+
@ManagedAttribute(name="timer_tasks",description="Number of timer tasks queued up for execution")
public int getNumTimerTasks() {
return timer != null? timer.size() : -1;
@@ -939,6 +995,7 @@ else if(timer_type.equalsIgnoreCase("wheel")) {
Util.verifyRejectionPolicy(oob_thread_pool_rejection_policy);
Util.verifyRejectionPolicy(thread_pool_rejection_policy);
+ Util.verifyRejectionPolicy(internal_thread_pool_rejection_policy);
// ========================================== OOB thread pool ==============================
@@ -974,6 +1031,23 @@ else if(timer_type.equalsIgnoreCase("wheel")) {
}
}
+
+ // ========================================== Internal thread pool ==============================
+
+ if(internal_thread_pool == null
+ || (internal_thread_pool instanceof ThreadPoolExecutor && ((ThreadPoolExecutor)internal_thread_pool).isShutdown())) {
+ if(internal_thread_pool_enabled) {
+ if(internal_thread_pool_queue_enabled)
+ internal_thread_pool_queue=new LinkedBlockingQueue<Runnable>(internal_thread_pool_queue_max_size);
+ else
+ internal_thread_pool_queue=new SynchronousQueue<Runnable>();
+ internal_thread_pool=createThreadPool(internal_thread_pool_min_threads, internal_thread_pool_max_threads, internal_thread_pool_keep_alive_time,
+ internal_thread_pool_rejection_policy, internal_thread_pool_queue, oob_thread_factory);
+ }
+ // if the internal thread pool is disabled, we won't create it (not even a DirectExecutor)
+ }
+
+
Map<String, Object> m=new HashMap<String, Object>(2);
if(bind_addr != null)
m.put("bind_addr", bind_addr);
@@ -1019,6 +1093,9 @@ public void destroy() {
if(thread_pool instanceof ThreadPoolExecutor)
shutdownThreadPool(thread_pool);
+
+ if(internal_thread_pool instanceof ThreadPoolExecutor)
+ shutdownThreadPool(internal_thread_pool);
}
/**
@@ -1187,7 +1264,9 @@ public Object down(Event evt) {
final String cluster_name=hdr.channel_name;
// changed to fix http://jira.jboss.com/jira/browse/JGRP-506
- Executor pool=msg.isFlagSet(Message.OOB)? oob_thread_pool : thread_pool;
+ boolean internal=msg.isFlagSet(Message.Flag.INTERNAL);
+ Executor pool=internal && internal_thread_pool != null? internal_thread_pool
+ : internal || msg.isFlagSet(Message.Flag.OOB)? oob_thread_pool : thread_pool;
pool.execute(new Runnable() {
public void run() {
passMessageUp(copy, cluster_name, false, multicast, false);
@@ -1336,24 +1415,34 @@ protected void receive(Address sender, byte[] data, int offset, int length) {
if(is_message_list) { // used if message bundling is enabled
final MessageBatch[] batches=readMessageBatch(dis, multicast);
- final MessageBatch batch=batches[0], oob_batch=batches[1];
+ final MessageBatch batch=batches[0], oob_batch=batches[1], internal_batch=batches[2];
if(oob_batch != null) {
num_oob_msgs_received+=oob_batch.size();
oob_thread_pool.execute(new BatchHandler(oob_batch));
}
-
if(batch != null) {
num_incoming_msgs_received+=batch.size();
thread_pool.execute(new BatchHandler(batch));
}
+ if(internal_batch != null) {
+ num_internal_msgs_received+=internal_batch.size();
+ Executor pool=internal_thread_pool != null? internal_thread_pool : oob_thread_pool;
+ pool.execute(new BatchHandler(internal_batch));
+ }
}
else {
Message msg=readMessage(dis);
- boolean is_oob=msg.isFlagSet(Message.Flag.OOB);
- if(is_oob) num_oob_msgs_received++;
- else num_incoming_msgs_received++;
- Executor pool=is_oob? oob_thread_pool : thread_pool;
+ if(msg.isFlagSet(Message.Flag.INTERNAL))
+ num_internal_msgs_received++;
+ else if(msg.isFlagSet(Message.Flag.OOB))
+ num_oob_msgs_received++;
+ else
+ num_incoming_msgs_received++;
+
+ boolean internal=msg.isFlagSet(Message.Flag.INTERNAL); // use internal pool or OOB (if intrenal pool is null)
+ Executor pool=internal && internal_thread_pool != null? internal_thread_pool
+ : internal || msg.isFlagSet(Message.Flag.OOB)? oob_thread_pool : thread_pool;
TpHeader hdr=(TpHeader)msg.getHeader(id);
String cluster_name=hdr.channel_name;
pool.execute(new MyHandler(msg, cluster_name, multicast));
@@ -1442,7 +1531,7 @@ public void run() {
/** Serializes and sends a message. This method is not reentrant */
protected void send(Message msg, Address dest, boolean multicast) throws Exception {
// bundle all messages except when tagged with DONT_BUNDLE
- if(!msg.isFlagSet(Message.DONT_BUNDLE)) {
+ if(!msg.isFlagSet(Message.Flag.DONT_BUNDLE)) {
bundler.send(msg);
return;
}
@@ -1535,8 +1624,6 @@ protected static void writeMessage(Message msg, DataOutputStream dos, boolean mu
dos.writeShort(Version.version); // write the version
if(multicast)
flags+=MULTICAST;
- if(msg.isFlagSet(Message.OOB))
- flags+=OOB;
dos.writeByte(flags);
msg.writeTo(dos);
}
@@ -1617,13 +1704,14 @@ public static void writeMessageList(Address dest, Address src, String cluster_na
}
/**
- * Reads a list of messages into 2 MessageBatches: a regular one and an OOB one
+ * Reads a list of messages into 3 MessageBatches: a regular, an OOB and an internal one
* @param in
- * @return an array of 2 MessageBatches, the regular is at index 0 and the OOB at index 1 (either can be null)
+ * @return an array of 2 MessageBatches, the regular is at index 0 and the OOB at index 1
+ * and the internal at index 2 (either can be null)
* @throws Exception
*/
public static MessageBatch[] readMessageBatch(DataInputStream in, boolean multicast) throws Exception {
- MessageBatch[] mbs=new MessageBatch[2];
+ MessageBatch[] batches=new MessageBatch[3]; // [0]: reg, [1]: OOB, [2]: internal
Address dest=Util.readAddress(in);
Address src=Util.readAddress(in);
String cluster_name=Util.readString(in);
@@ -1635,18 +1723,23 @@ public static void writeMessageList(Address dest, Address src, String cluster_na
msg.setDest(dest);
if(msg.getSrc() == null)
msg.setSrc(src);
- if(msg.isFlagSet(Message.Flag.OOB)) {
- if(mbs[1] == null)
- mbs[1]=new MessageBatch(dest, src, cluster_name, multicast, MessageBatch.Mode.OOB, len);
- mbs[1].add(msg);
+ if(msg.isFlagSet(Message.Flag.INTERNAL)) {
+ if(batches[2] == null)
+ batches[2]=new MessageBatch(dest, src, cluster_name, multicast, MessageBatch.Mode.INTERNAL, len);
+ batches[2].add(msg);
+ }
+ else if(msg.isFlagSet(Message.Flag.OOB)) {
+ if(batches[1] == null)
+ batches[1]=new MessageBatch(dest, src, cluster_name, multicast, MessageBatch.Mode.OOB, len);
+ batches[1].add(msg);
}
else {
- if(mbs[0] == null)
- mbs[0]=new MessageBatch(dest, src, cluster_name, multicast, MessageBatch.Mode.REG, len);
- mbs[0].add(msg);
+ if(batches[0] == null)
+ batches[0]=new MessageBatch(dest, src, cluster_name, multicast, MessageBatch.Mode.REG, len);
+ batches[0].add(msg);
}
}
- return mbs;
+ return batches;
}
View
4 src/org/jgroups/protocols/UDP.java
@@ -603,12 +603,12 @@ protected void handleConfigEvent(Map<String,Object> map) {
if(map == null) return;
if(map.containsKey("send_buf_size")) {
- mcast_send_buf_size=((Integer)map.get("send_buf_size")).intValue();
+ mcast_send_buf_size=(Integer)map.get("send_buf_size");
ucast_send_buf_size=mcast_send_buf_size;
set_buffers=true;
}
if(map.containsKey("recv_buf_size")) {
- mcast_recv_buf_size=((Integer)map.get("recv_buf_size")).intValue();
+ mcast_recv_buf_size=(Integer)map.get("recv_buf_size");
ucast_recv_buf_size=mcast_recv_buf_size;
set_buffers=true;
}
View
13 src/org/jgroups/protocols/UNICAST.java
@@ -626,7 +626,7 @@ protected void handleDataReceived(Address sender, long seqno, short conn_id, bo
// An OOB message is passed up immediately. Later, when remove() is called, we discard it. This affects ordering !
// http://jira.jboss.com/jira/browse/JGRP-377
- if(msg.isFlagSet(Message.OOB) && added) {
+ if(msg.isFlagSet(Message.Flag.OOB) && added) {
try {
up_prot.up(evt);
}
@@ -678,7 +678,7 @@ protected void handleBatchReceived(Address sender, Map<Short,List<Message>> map)
// An OOB message is passed up immediately. Later, when remove() is called, we discard it. This affects ordering !
// http://jira.jboss.com/jira/browse/JGRP-377
- if(msg.isFlagSet(Message.OOB) && msg_added) {
+ if(msg.isFlagSet(Message.Flag.OOB) && msg_added) {
try {
up_prot.up(new Event(Event.MSG, msg));
}
@@ -725,7 +725,7 @@ protected int removeAndDeliver(final AtomicBoolean processing, Table<Message> wi
MessageBatch batch=new MessageBatch(local_addr, sender, null, false, list);
for(Message msg_to_deliver: batch) {
// discard OOB msg: it has already been delivered (http://jira.jboss.com/jira/browse/JGRP-377)
- if(msg_to_deliver.isFlagSet(Message.OOB))
+ if(msg_to_deliver.isFlagSet(Message.Flag.OOB))
batch.remove(msg_to_deliver);
}
@@ -887,8 +887,8 @@ protected void stopRetransmitTask() {
protected void sendAck(Address dst, long seqno, short conn_id) {
if(!running) // if we are disconnected, then don't send any acks which throw exceptions on shutdown
return;
- Message ack=new Message(dst);
- ack.putHeader(this.id, UnicastHeader.createAckHeader(seqno, conn_id));
+ Message ack=new Message(dst).setFlag(Message.Flag.INTERNAL)
+ .putHeader(this.id, UnicastHeader.createAckHeader(seqno, conn_id));
if(log.isTraceEnabled())
log.trace(new StringBuilder().append(local_addr).append(" --> ACK(").append(dst).
append(": #").append(seqno).append(')'));
@@ -922,8 +922,7 @@ protected synchronized short getNewConnectionId() {
protected void sendRequestForFirstSeqno(Address dest, long seqno_received) {
- Message msg=new Message(dest);
- msg.setFlag(Message.OOB);
+ Message msg=new Message(dest).setFlag(Message.Flag.OOB);
UnicastHeader hdr=UnicastHeader.createSendFirstSeqnoHeader(seqno_received);
msg.putHeader(this.id, hdr);
if(log.isTraceEnabled())
View
36 src/org/jgroups/protocols/UNICAST2.java
@@ -633,10 +633,9 @@ public void sendStableMessages() {
}
protected void sendStableMessage(Address dest, short conn_id, long hd, long hr) {
- Message stable_msg=new Message(dest, null, null);
- Unicast2Header hdr=Unicast2Header.createStableHeader(conn_id, hd, hr);
- stable_msg.putHeader(this.id, hdr);
- stable_msg.setFlag(Message.OOB);
+ Message stable_msg=new Message(dest).setFlag(Message.Flag.OOB, Message.Flag.INTERNAL)
+ .putHeader(this.id, Unicast2Header.createStableHeader(conn_id, hd, hr));
+
if(log.isTraceEnabled()) {
StringBuilder sb=new StringBuilder();
sb.append(local_addr).append(" --> STABLE(").append(dest).append(": ")
@@ -728,8 +727,8 @@ public void removeAllConnections() {
public void retransmit(SeqnoList missing, Address sender) {
Unicast2Header hdr=Unicast2Header.createXmitReqHeader();
- Message retransmit_msg=new Message(sender, null, missing);
- retransmit_msg.setFlag(Message.OOB);
+ Message retransmit_msg=new Message(sender, missing);
+ retransmit_msg.setFlag(Message.Flag.OOB);
if(log.isTraceEnabled())
log.trace(local_addr + ": sending XMIT_REQ (" + missing + ") to " + sender);
retransmit_msg.putHeader(this.id, hdr);
@@ -786,7 +785,7 @@ protected boolean handleDataReceived(Address sender, long seqno, short conn_id,
// An OOB message is passed up immediately. Later, when remove() is called, we discard it. This affects ordering !
// http://jira.jboss.com/jira/browse/JGRP-377
- if(msg.isFlagSet(Message.OOB) && added) {
+ if(msg.isFlagSet(Message.Flag.OOB) && added) {
try {
up_prot.up(evt);
}
@@ -830,7 +829,7 @@ protected void handleBatchReceived(Address sender, Map<Short,List<Message>> map)
// An OOB message is passed up immediately. Later, when remove() is called, we discard it. This affects ordering !
// http://jira.jboss.com/jira/browse/JGRP-377
- if(msg.isFlagSet(Message.OOB) && msg_added) {
+ if(msg.isFlagSet(Message.Flag.OOB) && msg_added) {
try {
up_prot.up(new Event(Event.MSG, msg));
}
@@ -899,7 +898,7 @@ protected void removeAndPassUp(Table<Message> win, Address sender) {
MessageBatch batch=new MessageBatch(local_addr, sender, null, false, msgs);
for(Message msg_to_deliver: batch) {
// discard OOB msg: it has already been delivered (http://jira.jboss.com/jira/browse/JGRP-377)
- if(msg_to_deliver.isFlagSet(Message.OOB))
+ if(msg_to_deliver.isFlagSet(Message.Flag.OOB))
batch.remove(msg_to_deliver);
}
@@ -919,20 +918,6 @@ protected void removeAndPassUp(Table<Message> win, Address sender) {
catch(Throwable t) {
log.error("failed to deliver batch " + batch, t);
}
-
-
-
- /*for(Message m: msgs) {
- // discard OOB msg: it has already been delivered (http://jira.jboss.com/jira/browse/JGRP-377)
- if(m.isFlagSet(Message.OOB))
- continue;
- try {
- up_prot.up(new Event(Event.MSG, m));
- }
- catch(Throwable t) {
- log.error("couldn't deliver message " + m, t);
- }
- }*/
}
}
finally {
@@ -1091,7 +1076,7 @@ protected synchronized short getNewConnectionId() {
protected void sendRequestForFirstSeqno(Address dest, long seqno_received) {
Message msg=new Message(dest);
- msg.setFlag(Message.OOB);
+ msg.setFlag(Message.Flag.OOB);
Unicast2Header hdr=Unicast2Header.createSendFirstSeqnoHeader(seqno_received);
msg.putHeader(this.id, hdr);
if(log.isTraceEnabled())
@@ -1100,7 +1085,8 @@ protected void sendRequestForFirstSeqno(Address dest, long seqno_received) {
}
protected void sendAck(Address dest, long seqno, short conn_id) {
- Message msg=new Message(dest).setFlag(Message.OOB).putHeader(this.id, Unicast2Header.createAckHeader(seqno, conn_id));
+ Message msg=new Message(dest).setFlag(Message.Flag.OOB, Message.Flag.INTERNAL)
+ .putHeader(this.id, Unicast2Header.createAckHeader(seqno, conn_id));
if(log.isTraceEnabled())
log.trace(local_addr + " --> ACK(" + dest + "," + seqno + " [conn_id=" + conn_id + "])");
down_prot.down(new Event(Event.MSG, msg));
View
17 src/org/jgroups/protocols/UNICAST3.java
@@ -565,7 +565,7 @@ public void removeAllConnections() {
/** Sends a retransmit request to the given sender */
protected void retransmit(SeqnoList missing, Address sender) {
- Message xmit_msg=new Message(sender, missing).setFlag(Message.OOB).putHeader(id, Header.createXmitReqHeader());
+ Message xmit_msg=new Message(sender, missing).setFlag(Message.Flag.OOB).putHeader(id, Header.createXmitReqHeader());
if(log.isTraceEnabled())
log.trace(local_addr + ": sending XMIT_REQ (" + missing + ") to " + sender);
down_prot.down(new Event(Event.MSG, xmit_msg));
@@ -623,7 +623,7 @@ protected void handleDataReceived(Address sender, long seqno, short conn_id, bo
// An OOB message is passed up immediately. Later, when remove() is called, we discard it. This affects ordering !
// http://jira.jboss.com/jira/browse/JGRP-377
- if(msg.isFlagSet(Message.OOB) && added) {
+ if(msg.isFlagSet(Message.Flag.OOB) && added) {
try {
up_prot.up(evt);
}
@@ -666,7 +666,7 @@ protected void handleBatchReceived(Address sender, Map<Short,List<Message>> map)
// An OOB message is passed up immediately. Later, when remove() is called, we discard it. This affects ordering !
// http://jira.jboss.com/jira/browse/JGRP-377
- if(msg.isFlagSet(Message.OOB) && msg_added) {
+ if(msg.isFlagSet(Message.Flag.OOB) && msg_added) {
try {
up_prot.up(new Event(Event.MSG, msg));
}
@@ -719,7 +719,7 @@ protected int removeAndDeliver(final AtomicBoolean processing, Table<Message> wi
MessageBatch batch=new MessageBatch(local_addr, sender, null, false, list);
for(Message msg_to_deliver: batch) {
// discard OOB msg: it has already been delivered (http://jira.jboss.com/jira/browse/JGRP-377)
- if(msg_to_deliver.isFlagSet(Message.OOB))
+ if(msg_to_deliver.isFlagSet(Message.Flag.OOB))
batch.remove(msg_to_deliver);
}
if(batch.isEmpty())
@@ -933,7 +933,7 @@ protected void stopRetransmitTask() {
protected void sendAck(Address dst, long seqno, short conn_id) {
if(!running) // if we are disconnected, then don't send any acks which throw exceptions on shutdown
return;
- Message ack=new Message(dst).putHeader(this.id, Header.createAckHeader(seqno, conn_id));
+ Message ack=new Message(dst).setFlag(Message.Flag.INTERNAL).putHeader(this.id, Header.createAckHeader(seqno, conn_id));
if(log.isTraceEnabled())
log.trace(new StringBuilder().append(local_addr).append(" --> ACK(").append(dst).
append(": #").append(seqno).append(')'));
@@ -967,10 +967,9 @@ protected synchronized short getNewConnectionId() {
protected void sendRequestForFirstSeqno(Address dest, long seqno_received) {
- Message msg=new Message(dest);
- msg.setFlag(Message.OOB);
- Header hdr=Header.createSendFirstSeqnoHeader(seqno_received);
- msg.putHeader(this.id, hdr);
+ Message msg=new Message(dest).setFlag(Message.Flag.OOB)
+ .putHeader(this.id, Header.createSendFirstSeqnoHeader(seqno_received));
+
if(log.isTraceEnabled())
log.trace(local_addr + " --> SEND_FIRST_SEQNO(" + dest + "," + seqno_received + ")");
down_prot.down(new Event(Event.MSG, msg));
View
7 src/org/jgroups/protocols/VERIFY_SUSPECT.java
@@ -121,7 +121,7 @@ public Object up(Event evt) {
Message rsp;
Address target=use_mcast_rsps? null : hdr.from;
for(int i=0; i < num_msgs; i++) {
- rsp=new Message(target).setFlag(Message.OOB)
+ rsp=new Message(target).setFlag(Message.Flag.INTERNAL)
.putHeader(this.id, new VerifyHeader(VerifyHeader.I_AM_NOT_DEAD, local_addr));
down_prot.down(new Event(Event.MSG, rsp));
}
@@ -201,9 +201,8 @@ void verifySuspect(Address mbr) {
if(log.isTraceEnabled()) log.trace("verifying that " + mbr + " is dead");
for(int i=0; i < num_msgs; i++) {
- msg=new Message(mbr, null, null);
- msg.setFlag(Message.OOB);
- msg.putHeader(this.id, new VerifyHeader(VerifyHeader.ARE_YOU_DEAD, local_addr));
+ msg=new Message(mbr).setFlag(Message.Flag.INTERNAL)
+ .putHeader(this.id, new VerifyHeader(VerifyHeader.ARE_YOU_DEAD, local_addr));
down_prot.down(new Event(Event.MSG, msg));
}
}
View
5 src/org/jgroups/protocols/pbcast/ClientGmsImpl.java
@@ -186,7 +186,7 @@ private void joinInternal(Address mbr, boolean joinWithStateTransfer,boolean use
}
// send VIEW_ACK to sender of view
- Message view_ack=new Message(coord).setFlag(Message.OOB)
+ Message view_ack=new Message(coord).setFlag(Message.Flag.OOB, Message.Flag.INTERNAL)
.putHeader(gms.getId(), new GMS.GmsHeader(GMS.GmsHeader.VIEW_ACK));
gms.getDownProtocol().down(new Event(Event.MSG, view_ack));
return;
@@ -250,8 +250,7 @@ void sendJoinMessage(Address coord, Address mbr,boolean joinWithTransfer, boolea
Message msg;
GMS.GmsHeader hdr;
- msg=new Message(coord, null, null);
- msg.setFlag(Message.OOB);
+ msg=new Message(coord).setFlag(Message.Flag.OOB, Message.Flag.INTERNAL);
if(joinWithTransfer)
hdr=new GMS.GmsHeader(GMS.GmsHeader.JOIN_REQ_WITH_STATE_TRANSFER, mbr,useFlushIfPresent);
else
View
6 src/org/jgroups/protocols/pbcast/CoordGmsImpl.java
@@ -261,10 +261,8 @@ public void stop() {
private void sendLeaveResponses(Collection<Address> leaving_members) {
for(Address address: leaving_members){
- Message msg=new Message(address, null, null); // send an ack to the leaving member
- msg.setFlag(Message.OOB);
- GMS.GmsHeader hdr=new GMS.GmsHeader(GMS.GmsHeader.LEAVE_RSP);
- msg.putHeader(gms.getId(), hdr);
+ Message msg=new Message(address).setFlag(Message.Flag.OOB, Message.Flag.INTERNAL)
+ .putHeader(gms.getId(), new GMS.GmsHeader(GMS.GmsHeader.LEAVE_RSP));
gms.getDownProtocol().down(new Event(Event.MSG, msg));
}
}
View
27 src/org/jgroups/protocols/pbcast/FLUSH.java
@@ -548,9 +548,8 @@ private void handleFlushReconcile(Message msg, FlushHeader fh) {
log.debug(localAddress + ": returned from FLUSH_RECONCILE, "
+ " sending RECONCILE_OK to " + requester);
- Message reconcileOk = new Message(requester);
- reconcileOk.setFlag(Message.OOB);
- reconcileOk.putHeader(this.id, new FlushHeader(FlushHeader.FLUSH_RECONCILE_OK));
+ Message reconcileOk = new Message(requester).setFlag(Message.Flag.OOB, Message.Flag.INTERNAL)
+ .putHeader(this.id,new FlushHeader(FlushHeader.FLUSH_RECONCILE_OK));
down_prot.down(new Event(Event.MSG, reconcileOk));
}
@@ -564,10 +563,8 @@ private void handleStartFlush(Message msg, FlushHeader fh) {
}
onStartFlush(flushRequester, fh);
} else {
- FlushHeader fhr = new FlushHeader(FlushHeader.FLUSH_NOT_COMPLETED, fh.viewID,
- fh.flushParticipants);
- Message response = new Message(flushRequester);
- response.putHeader(this.id, fhr);
+ FlushHeader fhr = new FlushHeader(FlushHeader.FLUSH_NOT_COMPLETED, fh.viewID, fh.flushParticipants);
+ Message response = new Message(flushRequester).putHeader(this.id, fhr);
down_prot.down(new Event(Event.MSG, response));
if (log.isDebugEnabled())
log.debug(localAddress + ": received START_FLUSH, responded with FLUSH_NOT_COMPLETED to " + flushRequester);
@@ -580,9 +577,8 @@ private void rejectFlush(Collection<? extends Address> participants, long viewId
for (Address flushMember : participants) {
if(flushMember == null)
continue;
- Message reject = new Message(flushMember, localAddress, null);
- reject.setFlag(Message.OOB);
- reject.putHeader(this.id, new FlushHeader(FlushHeader.ABORT_FLUSH, viewId,participants));
+ Message reject = new Message(flushMember, localAddress, null).setFlag(Message.Flag.OOB, Message.Flag.INTERNAL)
+ .putHeader(this.id, new FlushHeader(FlushHeader.ABORT_FLUSH, viewId,participants));
down_prot.down(new Event(Event.MSG, reject));
}
}
@@ -693,9 +689,8 @@ private void onSuspend(final List<Address> members) {
flushMembers.addAll(participantsInFlush);
flushMembers.removeAll(suspected);
- msg = new Message(null, localAddress, null);
- msg.putHeader(this.id, new FlushHeader(FlushHeader.START_FLUSH, currentViewId(),
- participantsInFlush));
+ msg = new Message(null, localAddress, null)
+ .putHeader(this.id, new FlushHeader(FlushHeader.START_FLUSH, currentViewId(), participantsInFlush));
}
if (participantsInFlush.isEmpty()) {
flush_promise.setResult(SUCCESS_START_FLUSH);
@@ -778,8 +773,7 @@ private void onStartFlush(Address flushStarter, FlushHeader fh) {
FlushHeader fhr = new FlushHeader(FlushHeader.FLUSH_COMPLETED, fh.viewID,fh.flushParticipants);
fhr.addDigest(digest);
- Message msg = new Message(flushStarter);
- msg.putHeader(this.id, fhr);
+ Message msg = new Message(flushStarter).putHeader(this.id, fhr);
down_prot.down(new Event(Event.MSG, msg));
if (log.isDebugEnabled())
log.debug(localAddress + ": received START_FLUSH, responded with FLUSH_COMPLETED to " + flushStarter);
@@ -807,8 +801,7 @@ private void onFlushCompleted(Address address, final FlushHeader header) {
needsReconciliationPhase = enable_reconciliation && flushCompleted && hasVirtualSynchronyGaps();
if (needsReconciliationPhase) {
Digest d = findHighestSequences();
- msg = new Message();
- msg.setFlag(Message.OOB);
+ msg = new Message().setFlag(Message.Flag.OOB);
FlushHeader fh = new FlushHeader(FlushHeader.FLUSH_RECONCILE, currentViewId(),flushMembers);
reconcileOks.clear();
fh.addDigest(d);
View
15 src/org/jgroups/protocols/pbcast/GMS.java
@@ -545,9 +545,7 @@ public void castViewChange(View new_view, Digest digest, JoinRsp jr, Collection<
}
public void sendJoinResponse(JoinRsp rsp, Address dest) {