From 329f08bbff00f0f8a71123c68a3200804effb045 Mon Sep 17 00:00:00 2001 From: Bela Ban Date: Thu, 28 Feb 2013 13:40:33 +0100 Subject: [PATCH] - Added INTERNAL flag (https://issues.jboss.org/browse/JGRP-1599) - Added internal thread pool to TP (https://issues.jboss.org/browse/JGRP-1599) - Replaced Message.OOB with Message.Flag.OOB - Replaced Message.DONT_BUNDLE with Message.Flag.DONT_BUNDLE - Failure detection protocols now use INTERNAL flag - Flow control protocols now use INTERNAL flag - Membership protocols now use INTERNAL flag - Stability protocol now uses INTERNAL flag - MERGE3 protocol now uses INTERNAL flag - Reliable retransmission protocols now use INTERNAL flag - Sending discovery messages with DONT_BUNDLE (required) --- src/org/jgroups/Message.java | 100 ++++------- src/org/jgroups/auth/DemoToken.java | 2 +- src/org/jgroups/blocks/RequestOptions.java | 2 +- src/org/jgroups/protocols/AUTH.java | 8 +- src/org/jgroups/protocols/COUNTER.java | 15 +- src/org/jgroups/protocols/DAISYCHAIN.java | 2 +- src/org/jgroups/protocols/Discovery.java | 13 +- src/org/jgroups/protocols/ENCRYPT.java | 10 +- src/org/jgroups/protocols/Executing.java | 59 ++----- src/org/jgroups/protocols/FC.java | 6 +- src/org/jgroups/protocols/FD.java | 6 +- src/org/jgroups/protocols/FD_ALL.java | 2 +- src/org/jgroups/protocols/FD_SOCK.java | 25 +-- .../jgroups/protocols/FORWARD_TO_COORD.java | 5 +- src/org/jgroups/protocols/FlowControl.java | 12 +- src/org/jgroups/protocols/Locking.java | 55 ++---- src/org/jgroups/protocols/MERGE3.java | 14 +- src/org/jgroups/protocols/PRIO.java | 4 +- src/org/jgroups/protocols/RELAY.java | 13 +- src/org/jgroups/protocols/RSVP.java | 8 +- src/org/jgroups/protocols/SCOPE.java | 4 +- src/org/jgroups/protocols/SEQUENCER.java | 19 +- src/org/jgroups/protocols/STOMP.java | 5 +- src/org/jgroups/protocols/TP.java | 163 ++++++++++++++---- src/org/jgroups/protocols/UDP.java | 4 +- src/org/jgroups/protocols/UNICAST.java | 13 +- src/org/jgroups/protocols/UNICAST2.java | 36 ++-- src/org/jgroups/protocols/UNICAST3.java | 17 +- src/org/jgroups/protocols/VERIFY_SUSPECT.java | 7 +- .../protocols/pbcast/ClientGmsImpl.java | 5 +- .../protocols/pbcast/CoordGmsImpl.java | 6 +- src/org/jgroups/protocols/pbcast/FLUSH.java | 27 ++- src/org/jgroups/protocols/pbcast/GMS.java | 15 +- src/org/jgroups/protocols/pbcast/GmsImpl.java | 2 +- src/org/jgroups/protocols/pbcast/Merger.java | 22 +-- src/org/jgroups/protocols/pbcast/NAKACK.java | 15 +- src/org/jgroups/protocols/pbcast/NAKACK2.java | 10 +- .../protocols/pbcast/ParticipantGmsImpl.java | 7 +- src/org/jgroups/protocols/pbcast/STABLE.java | 9 +- src/org/jgroups/protocols/pbcast/STATE.java | 3 +- .../protocols/pbcast/STATE_TRANSFER.java | 9 +- .../pbcast/StreamingStateTransfer.java | 9 +- src/org/jgroups/protocols/relay/RELAY2.java | 7 +- src/org/jgroups/protocols/tom/TOA.java | 38 ++-- src/org/jgroups/util/MessageBatch.java | 15 +- .../org/jgroups/blocks/RpcDispatcherTest.java | 7 +- .../protocols/NAKACK_Delivery_Test.java | 2 +- .../jgroups/protocols/NAKACK_StressTest.java | 2 +- .../jgroups/protocols/UNICAST_OOB_Test.java | 2 +- .../org/jgroups/tests/DynamicDiscardTest.java | 2 +- .../org/jgroups/tests/MessageTest.java | 80 ++++----- .../jgroups/tests/NakReceiverWindowTest.java | 2 +- .../blocks/executor/ExecutingServiceTest.java | 4 +- .../org/jgroups/tests/ConnectStressTest.java | 4 +- .../org/jgroups/tests/Deadlock2Test.java | 2 +- .../org/jgroups/tests/DuplicateTest.java | 4 +- .../junit/org/jgroups/tests/NAKACK_Test.java | 2 +- tests/junit/org/jgroups/tests/OOBTest.java | 20 +-- .../org/jgroups/tests/UUIDCacheClearTest.java | 13 +- .../tests/MessageDispatcherSpeedTest.java | 2 +- tests/other/org/jgroups/tests/PingPong.java | 4 +- .../jgroups/tests/RpcDispatcherSpeedTest.java | 4 +- .../other/org/jgroups/tests/UnicastTest.java | 2 +- .../org/jgroups/tests/UnicastTestRpc.java | 4 +- tests/perf/org/jgroups/tests/perf/UPerf.java | 14 +- tests/perf/org/jgroups/tests/perf/UUPerf.java | 8 +- 66 files changed, 467 insertions(+), 545 deletions(-) diff --git a/src/org/jgroups/Message.java b/src/org/jgroups/Message.java index 296ad6a6ef1..b9e2b1970f7 100644 --- a/src/org/jgroups/Message.java +++ b/src/org/jgroups/Message.java @@ -62,8 +62,9 @@ public static enum Flag { SCOPED( (short)(1 << 3)), // when a message has a scope NO_RELIABILITY((short)(1 << 4)), // bypass UNICAST(2) and NAKACK NO_TOTAL_ORDER((short)(1 << 5)), // bypass total order (e.g. SEQUENCER) - NO_RELAY((short) (1 << 6)), // bypass relaying (RELAY) - RSVP((short) (1 << 7)); // ack of a multicast (https://issues.jboss.org/browse/JGRP-1389) + NO_RELAY( (short)(1 << 6)), // bypass relaying (RELAY) + RSVP( (short)(1 << 7)), // ack of a multicast (https://issues.jboss.org/browse/JGRP-1389) + INTERNAL( (short)(1 << 8)); // for internal use by JGroups only, don't use ! final short value; Flag(short value) {this.value=value;} @@ -71,14 +72,14 @@ public static enum Flag { public short value() {return value;} } - public static final Flag OOB=Flag.OOB; - public static final Flag DONT_BUNDLE=Flag.DONT_BUNDLE; - public static final Flag NO_FC=Flag.NO_FC; - public static final Flag SCOPED=Flag.SCOPED; - public static final Flag NO_RELIABILITY=Flag.NO_RELIABILITY; - public static final Flag NO_TOTAL_ORDER=Flag.NO_TOTAL_ORDER; - public static final Flag NO_RELAY=Flag.NO_RELAY; - public static final Flag RSVP=Flag.RSVP; + @Deprecated public static final Flag OOB=Flag.OOB; + @Deprecated public static final Flag DONT_BUNDLE=Flag.DONT_BUNDLE; + @Deprecated public static final Flag NO_FC=Flag.NO_FC; + @Deprecated public static final Flag SCOPED=Flag.SCOPED; + @Deprecated public static final Flag NO_RELIABILITY=Flag.NO_RELIABILITY; + @Deprecated public static final Flag NO_TOTAL_ORDER=Flag.NO_TOTAL_ORDER; + @Deprecated public static final Flag NO_RELAY=Flag.NO_RELAY; + @Deprecated public static final Flag RSVP=Flag.RSVP; @@ -91,7 +92,8 @@ public static enum TransientFlag { public short value() {return value;} } - + + @Deprecated public static final TransientFlag OOB_DELIVERED=TransientFlag.OOB_DELIVERED; // OOB which has already been delivered up the stack @@ -837,58 +839,16 @@ public long size() { public static String flagsToString(short flags) { StringBuilder sb=new StringBuilder(); boolean first=true; - if(isFlagSet(flags, Flag.OOB)) { - first=false; - sb.append("OOB"); - } - if(isFlagSet(flags, Flag.DONT_BUNDLE)) { - if(!first) - sb.append("|"); - else - first=false; - sb.append("DONT_BUNDLE"); - } - if(isFlagSet(flags, Flag.NO_FC)) { - if(!first) - sb.append("|"); - else - first=false; - sb.append("NO_FC"); - } - if(isFlagSet(flags, Flag.SCOPED)) { - if(!first) - sb.append("|"); - else - first=false; - sb.append("SCOPED"); - } - if(isFlagSet(flags, Flag.NO_RELIABILITY)) { - if(!first) - sb.append("|"); - else - first=false; - sb.append("NO_RELIABILITY"); - } - if(isFlagSet(flags, Flag.NO_TOTAL_ORDER)) { - if(!first) - sb.append("|"); - else - first=false; - sb.append("NO_TOTAL_ORDER"); - } - if(isFlagSet(flags, Flag.NO_RELAY)) { - if(!first) - sb.append("|"); - else - first=false; - sb.append("NO_RELAY"); - } - if(isFlagSet(flags, Flag.RSVP)) { - if(!first) - sb.append("|"); - else - first=false; - sb.append("RSVP"); + + Flag[] all_flags=Flag.values(); + for(Flag flag: all_flags) { + if(isFlagSet(flags, flag)) { + if(first) + first=false; + else + sb.append("|"); + sb.append(flag); + } } return sb.toString(); } @@ -896,8 +856,18 @@ public static String flagsToString(short flags) { public String transientFlagsToString() { StringBuilder sb=new StringBuilder(); - if(isTransientFlagSet(TransientFlag.OOB_DELIVERED)) - sb.append("OOB_DELIVERED"); + boolean first=true; + + TransientFlag[] all_flags=TransientFlag.values(); + for(TransientFlag flag: all_flags) { + if(isTransientFlagSet(flag)) { + if(first) + first=false; + else + sb.append("|"); + sb.append(flag); + } + } return sb.toString(); } diff --git a/src/org/jgroups/auth/DemoToken.java b/src/org/jgroups/auth/DemoToken.java index 02503981a63..94b99d15edb 100644 --- a/src/org/jgroups/auth/DemoToken.java +++ b/src/org/jgroups/auth/DemoToken.java @@ -42,7 +42,7 @@ public boolean authenticate(AuthToken token, Message msg) { Address sender=msg.getSrc(); // 1. send a challenge to the sender - Message challenge=new Message(sender).setFlag(Message.OOB); + Message challenge=new Message(sender).setFlag(Message.Flag.OOB); byte[] buf=generateRandomBytes(); DemoHeader hdr=new DemoHeader(buf); challenge.putHeader(ID, hdr); diff --git a/src/org/jgroups/blocks/RequestOptions.java b/src/org/jgroups/blocks/RequestOptions.java index 53a7cee5d0c..585637abe49 100644 --- a/src/org/jgroups/blocks/RequestOptions.java +++ b/src/org/jgroups/blocks/RequestOptions.java @@ -31,7 +31,7 @@ public class RequestOptions { protected short scope; /** The flags set in the message in which a request is sent */ - protected short flags; // Message.OOB, Message.DONT_BUNDLE etc + protected short flags; // Message.Flag.OOB, Message.Flag.DONT_BUNDLE etc /** A list of members which should be excluded from a call */ protected Set
exclusion_list; diff --git a/src/org/jgroups/protocols/AUTH.java b/src/org/jgroups/protocols/AUTH.java index 7b2682d2f16..7b08c204281 100644 --- a/src/org/jgroups/protocols/AUTH.java +++ b/src/org/jgroups/protocols/AUTH.java @@ -185,17 +185,13 @@ protected void sendJoinRejectionMessage(Address dest, String error_msg) { if(dest == null) return; - Message msg = new Message(dest, null, null); JoinRsp joinRes=new JoinRsp(error_msg); // specify the error message on the JoinRsp - - GMS.GmsHeader gmsHeader=new GMS.GmsHeader(GMS.GmsHeader.JOIN_RSP, joinRes); - msg.putHeader(gms_id, gmsHeader); + Message msg = new Message(dest).putHeader(gms_id, new GMS.GmsHeader(GMS.GmsHeader.JOIN_RSP, joinRes)); down_prot.down(new Event(Event.MSG, msg)); } protected void sendMergeRejectionMessage(Address dest) { - Message msg=new Message(dest, null, null); - msg.setFlag(Message.OOB); + Message msg=new Message(dest).setFlag(Message.Flag.OOB); GMS.GmsHeader hdr=new GMS.GmsHeader(GMS.GmsHeader.MERGE_RSP); hdr.setMergeRejected(true); msg.putHeader(gms_id, hdr); diff --git a/src/org/jgroups/protocols/COUNTER.java b/src/org/jgroups/protocols/COUNTER.java index d96fe882aba..a679ea02601 100644 --- a/src/org/jgroups/protocols/COUNTER.java +++ b/src/org/jgroups/protocols/COUNTER.java @@ -431,10 +431,9 @@ protected Owner getOwner() { protected void sendRequest(Address dest, Request req) { try { Buffer buffer=requestToBuffer(req); - Message msg=new Message(dest, null, buffer); - msg.putHeader(id, new CounterHeader()); + Message msg=new Message(dest, buffer).putHeader(id, new CounterHeader()); if(bypass_bundling) - msg.setFlag(Message.DONT_BUNDLE); + msg.setFlag(Message.Flag.DONT_BUNDLE); if(log.isTraceEnabled()) log.trace("[" + local_addr + "] --> [" + (dest == null? "ALL" : dest) + "] " + req); @@ -449,10 +448,9 @@ protected void sendRequest(Address dest, Request req) { protected void sendResponse(Address dest, Response rsp) { try { Buffer buffer=responseToBuffer(rsp); - Message rsp_msg=new Message(dest, null, buffer); - rsp_msg.putHeader(id, new CounterHeader()); + Message rsp_msg=new Message(dest, buffer).putHeader(id, new CounterHeader()); if(bypass_bundling) - rsp_msg.setFlag(Message.DONT_BUNDLE); + rsp_msg.setFlag(Message.Flag.DONT_BUNDLE); if(log.isTraceEnabled()) log.trace("[" + local_addr + "] --> [" + dest + "] " + rsp); @@ -480,10 +478,9 @@ protected void updateBackups(String name, long value, long version) { protected void send(Address dest, Buffer buffer) { try { - Message rsp_msg=new Message(dest, null, buffer); - rsp_msg.putHeader(id, new CounterHeader()); + Message rsp_msg=new Message(dest, buffer).putHeader(id, new CounterHeader()); if(bypass_bundling) - rsp_msg.setFlag(Message.DONT_BUNDLE); + rsp_msg.setFlag(Message.Flag.DONT_BUNDLE); down_prot.down(new Event(Event.MSG, rsp_msg)); } catch(Exception ex) { diff --git a/src/org/jgroups/protocols/DAISYCHAIN.java b/src/org/jgroups/protocols/DAISYCHAIN.java index 42a04d1e94c..e087bb5eec8 100644 --- a/src/org/jgroups/protocols/DAISYCHAIN.java +++ b/src/org/jgroups/protocols/DAISYCHAIN.java @@ -101,7 +101,7 @@ public Object down(final Event evt) { if(msg.getSrc() == null) msg.setSrc(local_addr); - Executor pool=msg.isFlagSet(Message.OOB)? oob_pool : default_pool; + Executor pool=msg.isFlagSet(Message.Flag.OOB)? oob_pool : default_pool; pool.execute(new Runnable() { public void run() { up_prot.up(evt); diff --git a/src/org/jgroups/protocols/Discovery.java b/src/org/jgroups/protocols/Discovery.java index 20ab754bfe5..65426fcc420 100644 --- a/src/org/jgroups/protocols/Discovery.java +++ b/src/org/jgroups/protocols/Discovery.java @@ -259,8 +259,10 @@ public void sendDiscoveryRequest(String cluster_name, Promise promise, ViewId vi Collection cluster_members=fetchClusterMembers(cluster_name); if(cluster_members == null) { - // multicast msg - Message msg=new Message(null).setFlag(Message.OOB, Message.Flag.DONT_BUNDLE).putHeader(getId(), hdr); + // message needs to have DONT_BUNDLE flag: if A sends message M to B, and we need to fetch B's physical + // address, then the bundler thread blocks until the discovery request has returned. However, we cannot send + // the discovery *request* until the bundler thread has returned from sending M + Message msg=new Message(null).setFlag(Message.Flag.INTERNAL, Message.Flag.DONT_BUNDLE).putHeader(getId(), hdr); sendMcastDiscoveryRequest(msg); } else { @@ -281,7 +283,9 @@ public void sendDiscoveryRequest(String cluster_name, Promise promise, ViewId vi for(final Address addr: cluster_members) { if(addr.equals(physical_addr)) // no need to send the request to myself continue; - final Message msg=new Message(addr).setFlag(Message.OOB, Message.Flag.DONT_BUNDLE).putHeader(this.id, hdr); + // the message needs to be DONT_BUNDLE, see explanation above + final Message msg=new Message(addr).setFlag(Message.Flag.INTERNAL, Message.Flag.DONT_BUNDLE) + .putHeader(this.id, hdr); if(log.isTraceEnabled()) log.trace(local_addr + ": sending discovery request to " + msg.getDest()); if(!sendDiscoveryRequestsInParallel()) { @@ -634,9 +638,8 @@ protected void sendDiscoveryResponse(Address logical_addr, List data=new PingData(logical_addr, null, view_id, is_server, logical_name, physical_addrs); } - final Message rsp_msg=new Message(sender).setFlag(Message.OOB); final PingHeader rsp_hdr=new PingHeader(PingHeader.GET_MBRS_RSP, data); - rsp_msg.putHeader(this.id, rsp_hdr); + final Message rsp_msg=new Message(sender).setFlag(Message.Flag.INTERNAL).putHeader(this.id, rsp_hdr); if(stagger_timeout > 0) { int view_size=view != null? view.size() : 10; diff --git a/src/org/jgroups/protocols/ENCRYPT.java b/src/org/jgroups/protocols/ENCRYPT.java index d3c212cfc9e..0d389e37036 100644 --- a/src/org/jgroups/protocols/ENCRYPT.java +++ b/src/org/jgroups/protocols/ENCRYPT.java @@ -842,9 +842,8 @@ private void sendSecretKey(SecretKey secret, PublicKey pubKey, Address source) t //encrypt current secret key byte[] encryptedKey=tmp.doFinal(secret.getEncoded()); - newMsg=new Message(source, local_addr, encryptedKey); - - newMsg.putHeader(this.id, new EncryptHeader(EncryptHeader.SECRETKEY, getSymVersion())); + newMsg=new Message(source, local_addr, encryptedKey) + .putHeader(this.id, new EncryptHeader(EncryptHeader.SECRETKEY, getSymVersion())); if(log.isDebugEnabled()) log.debug(" Sending version " + getSymVersion() + " encoded key to client"); @@ -860,9 +859,8 @@ private Message sendKeyRequest() { // send client's public key to server and request // server's public key - Message newMsg=new Message(keyServerAddr, local_addr, Kpair.getPublic().getEncoded()); - - newMsg.putHeader(this.id,new EncryptHeader(EncryptHeader.KEY_REQUEST,getSymVersion())); + Message newMsg=new Message(keyServerAddr, local_addr, Kpair.getPublic().getEncoded()) + .putHeader(this.id,new EncryptHeader(EncryptHeader.KEY_REQUEST,getSymVersion())); passItDown(new Event(Event.MSG,newMsg)); return newMsg; } diff --git a/src/org/jgroups/protocols/Executing.java b/src/org/jgroups/protocols/Executing.java index 92874bbfedb..68ad844ef04 100644 --- a/src/org/jgroups/protocols/Executing.java +++ b/src/org/jgroups/protocols/Executing.java @@ -1,43 +1,6 @@ package org.jgroups.protocols; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.Externalizable; -import java.io.IOException; -import java.io.NotSerializableException; -import java.io.Serializable; -import java.nio.ByteBuffer; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.BrokenBarrierException; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.FutureTask; -import java.util.concurrent.RunnableFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import org.jgroups.Address; -import org.jgroups.Event; -import org.jgroups.Header; -import org.jgroups.Message; -import org.jgroups.View; +import org.jgroups.*; import org.jgroups.annotations.MBean; import org.jgroups.annotations.ManagedAttribute; import org.jgroups.annotations.Property; @@ -48,6 +11,16 @@ import org.jgroups.util.Streamable; import org.jgroups.util.Util; +import java.io.*; +import java.nio.ByteBuffer; +import java.util.*; +import java.util.Map.Entry; +import java.util.concurrent.*; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + /** * This is the base protocol used for executions. * @author wburns @@ -901,10 +874,9 @@ protected void handleRemoveConsumer(Owner sender) { protected void sendRequest(Address dest, Type type, long requestId, Object object) { Request req=new Request(type, object, requestId); - Message msg=new Message(dest, null, req); - msg.putHeader(id, new ExecutorHeader()); + Message msg=new Message(dest, req).putHeader(id, new ExecutorHeader()); if(bypass_bundling) - msg.setFlag(Message.DONT_BUNDLE); + msg.setFlag(Message.Flag.DONT_BUNDLE); if(log.isTraceEnabled()) log.trace("[" + local_addr + "] --> [" + (dest == null? "ALL" : dest) + "] " + req); try { @@ -918,10 +890,9 @@ protected void sendRequest(Address dest, Type type, long requestId, Object objec protected void sendThreadRequest(Address dest, long threadId, Type type, long requestId, Object object) { RequestWithThread req=new RequestWithThread(type, object, requestId, threadId); - Message msg=new Message(dest, null, req); - msg.putHeader(id, new ExecutorHeader()); + Message msg=new Message(dest, req).putHeader(id, new ExecutorHeader()); if(bypass_bundling) - msg.setFlag(Message.DONT_BUNDLE); + msg.setFlag(Message.Flag.DONT_BUNDLE); if(log.isTraceEnabled()) log.trace("[" + local_addr + "] --> [" + (dest == null? "ALL" : dest) + "] " + req); try { diff --git a/src/org/jgroups/protocols/FC.java b/src/org/jgroups/protocols/FC.java index c6c5d00412c..49f8d2346bc 100644 --- a/src/org/jgroups/protocols/FC.java +++ b/src/org/jgroups/protocols/FC.java @@ -781,7 +781,8 @@ private void sendCredit(Address dest, long credit) { number=(int)credit; else number=credit; - Message msg=new Message(dest, number).setFlag(Message.OOB, Message.Flag.DONT_BUNDLE).putHeader(this.id,REPLENISH_HDR); + Message msg=new Message(dest, number).setFlag(Message.Flag.OOB, Message.Flag.INTERNAL, Message.Flag.DONT_BUNDLE) + .putHeader(this.id,REPLENISH_HDR); down_prot.down(new Event(Event.MSG, msg)); num_credit_responses_sent++; } @@ -795,7 +796,8 @@ private void sendCredit(Address dest, long credit) { private void sendCreditRequest(final Address dest, Long credits_left) { if(log.isTraceEnabled()) log.trace("sending credit request to " + dest); - Message msg=new Message(dest, credits_left).setFlag(Message.Flag.DONT_BUNDLE).putHeader(this.id,CREDIT_REQUEST_HDR); + Message msg=new Message(dest, credits_left).setFlag(Message.Flag.DONT_BUNDLE, Message.Flag.INTERNAL) + .putHeader(this.id,CREDIT_REQUEST_HDR); down_prot.down(new Event(Event.MSG, msg)); num_credit_requests_sent++; } diff --git a/src/org/jgroups/protocols/FD.java b/src/org/jgroups/protocols/FD.java index f88394cc304..19a723cef54 100644 --- a/src/org/jgroups/protocols/FD.java +++ b/src/org/jgroups/protocols/FD.java @@ -309,7 +309,7 @@ else if(!isMonitorRunning()) protected void sendHeartbeatResponse(Address dest) { - Message hb_ack=new Message(dest).setFlag(Message.OOB); + Message hb_ack=new Message(dest).setFlag(Message.Flag.INTERNAL); FdHeader tmp_hdr=new FdHeader(FdHeader.HEARTBEAT_ACK); tmp_hdr.from=local_addr; hb_ack.putHeader(this.id, tmp_hdr); @@ -437,7 +437,7 @@ public void run() { } // 1. send heartbeat request - Message hb_req=new Message(dest).setFlag(Message.OOB).putHeader(id, new FdHeader(FdHeader.HEARTBEAT)); + Message hb_req=new Message(dest).setFlag(Message.Flag.INTERNAL).putHeader(id, new FdHeader(FdHeader.HEARTBEAT)); if(log.isDebugEnabled()) log.debug(local_addr + ": sending are-you-alive msg to " + dest); down_prot.down(new Event(Event.MSG, hb_req)); @@ -599,7 +599,7 @@ public void run() { hdr.mbrs=new ArrayList
(suspected_members); hdr.from=local_addr; } - Message suspect_msg=new Message().setFlag(Message.OOB).putHeader(id, hdr); + Message suspect_msg=new Message().setFlag(Message.Flag.INTERNAL).putHeader(id, hdr); if(log.isDebugEnabled()) log.debug(local_addr + ": broadcasting SUSPECT message (suspects=" + suspected_members + ")"); down_prot.down(new Event(Event.MSG, suspect_msg)); diff --git a/src/org/jgroups/protocols/FD_ALL.java b/src/org/jgroups/protocols/FD_ALL.java index cdd8ca31536..1ebb17e8e13 100644 --- a/src/org/jgroups/protocols/FD_ALL.java +++ b/src/org/jgroups/protocols/FD_ALL.java @@ -386,7 +386,7 @@ public void readFrom(DataInput in) throws Exception {} */ class HeartbeatSender implements Runnable { public void run() { - Message heartbeat=new Message().setFlag(Message.OOB).putHeader(id, new HeartbeatHeader()); + Message heartbeat=new Message().setFlag(Message.Flag.INTERNAL).putHeader(id, new HeartbeatHeader()); down_prot.down(new Event(Event.MSG, heartbeat)); num_heartbeats_sent++; } diff --git a/src/org/jgroups/protocols/FD_SOCK.java b/src/org/jgroups/protocols/FD_SOCK.java index 1358946b956..56487dc536a 100644 --- a/src/org/jgroups/protocols/FD_SOCK.java +++ b/src/org/jgroups/protocols/FD_SOCK.java @@ -256,9 +256,7 @@ public Object up(Event evt) { case FdHeader.GET_CACHE: Address sender=msg.getSrc(); // guaranteed to be non-null hdr=new FdHeader(FdHeader.GET_CACHE_RSP,new HashMap(cache)); - msg=new Message(sender, null, null); - msg.setFlag(Message.OOB); - msg.putHeader(this.id, hdr); + msg=new Message(sender).setFlag(Message.Flag.INTERNAL).putHeader(this.id, hdr); down_prot.down(new Event(Event.MSG, msg)); break; @@ -656,9 +654,7 @@ void getCacheFromCoordinator() { return; } hdr=new FdHeader(FdHeader.GET_CACHE); - msg=new Message(coord, null, null); - msg.setFlag(Message.OOB); - msg.putHeader(this.id, hdr); + msg=new Message(coord).setFlag(Message.Flag.INTERNAL).putHeader(this.id, hdr); down_prot.down(new Event(Event.MSG, msg)); result=get_cache_promise.getResult(get_cache_timeout); if(result != null) { @@ -694,9 +690,7 @@ void broadcastSuspectMessage(Address suspected_mbr) { hdr=new FdHeader(FdHeader.SUSPECT); hdr.mbrs=new HashSet
(1); hdr.mbrs.add(suspected_mbr); - suspect_msg=new Message(); - suspect_msg.setFlag(Message.OOB); - suspect_msg.putHeader(this.id, hdr); + suspect_msg=new Message().setFlag(Message.Flag.INTERNAL).putHeader(this.id, hdr); down_prot.down(new Event(Event.MSG, suspect_msg)); // 2. Add to broadcast task and start latter (if not yet running). The task will end when @@ -716,8 +710,7 @@ void broadcastSuspectMessage(Address suspected_mbr) { it will be unicast back to the requester */ void sendIHaveSockMessage(Address dst, Address mbr, IpAddress addr) { - Message msg=new Message(dst, null, null); - msg.setFlag(Message.OOB); + Message msg=new Message(dst).setFlag(Message.Flag.INTERNAL); FdHeader hdr=new FdHeader(FdHeader.I_HAVE_SOCK); hdr.mbr=mbr; hdr.sock_addr=addr; @@ -746,8 +739,7 @@ private IpAddress fetchPingAddress(Address mbr) { // 2. Try to get the server socket address from mbr ping_addr_promise.reset(); - ping_addr_req=new Message(mbr, null, null); // unicast - ping_addr_req.setFlag(Message.OOB); + ping_addr_req=new Message(mbr).setFlag(Message.Flag.INTERNAL); hdr=new FdHeader(FdHeader.WHO_HAS_SOCK); hdr.mbr=mbr; ping_addr_req.putHeader(this.id, hdr); @@ -760,8 +752,7 @@ private IpAddress fetchPingAddress(Address mbr) { if(!isPingerThreadRunning()) return null; // 3. Try to get the server socket address from all members - ping_addr_req=new Message(null); // multicast - ping_addr_req.setFlag(Message.OOB); + ping_addr_req=new Message(null).setFlag(Message.Flag.INTERNAL); hdr=new FdHeader(FdHeader.WHO_HAS_SOCK); hdr.mbr=mbr; ping_addr_req.putHeader(this.id, hdr); @@ -1224,9 +1215,7 @@ public void run() { hdr=new FdHeader(FdHeader.SUSPECT); hdr.mbrs=new HashSet
(suspects); } - suspect_msg=new Message(); // mcast SUSPECT to all members - suspect_msg.setFlag(Message.OOB); - suspect_msg.putHeader(id, hdr); + suspect_msg=new Message().setFlag(Message.Flag.INTERNAL).putHeader(id, hdr); // mcast SUSPECT to all members down_prot.down(new Event(Event.MSG, suspect_msg)); if(log.isTraceEnabled()) log.trace("task done"); } diff --git a/src/org/jgroups/protocols/FORWARD_TO_COORD.java b/src/org/jgroups/protocols/FORWARD_TO_COORD.java index e797fc6db14..41b704ab2df 100644 --- a/src/org/jgroups/protocols/FORWARD_TO_COORD.java +++ b/src/org/jgroups/protocols/FORWARD_TO_COORD.java @@ -180,10 +180,7 @@ protected void sendNotCoord(Address target, long ack_id) { } protected void send(Address target, long ack_id, byte type) { - Message msg=new Message(target); - ForwardHeader hdr=new ForwardHeader(type, ack_id); - msg.putHeader(id, hdr); - down_prot.down(new Event(Event.MSG, msg)); + down_prot.down(new Event(Event.MSG, new Message(target).putHeader(id, new ForwardHeader(type, ack_id)))); } diff --git a/src/org/jgroups/protocols/FlowControl.java b/src/org/jgroups/protocols/FlowControl.java index 98b67744a38..a36ad2b1c48 100644 --- a/src/org/jgroups/protocols/FlowControl.java +++ b/src/org/jgroups/protocols/FlowControl.java @@ -528,15 +528,15 @@ protected void handleCreditRequest(Map map, Address sender, long protected void sendCredit(Address dest, long credits) { if(log.isTraceEnabled()) if(log.isTraceEnabled()) log.trace("sending " + credits + " credits to " + dest); - Message msg=new Message(dest, null,credits); - msg.setFlag(Message.OOB, Message.Flag.DONT_BUNDLE); - msg.putHeader(this.id, REPLENISH_HDR); + Message msg=new Message(dest, credits) + .setFlag(Message.Flag.OOB, Message.Flag.INTERNAL) + .putHeader(this.id,REPLENISH_HDR); down_prot.down(new Event(Event.MSG, msg)); num_credit_responses_sent++; } /** - * We cannot send this request as OOB messages, as the credit request needs to queue up behind the regular messages; + * We cannot send this request as OOB message, as the credit request needs to queue up behind the regular messages; * if a receiver cannot process the regular messages, that is a sign that the sender should be throttled ! * @param dest The member to which we send the credit request * @param credits_needed The number of bytes (of credits) left for dest @@ -544,8 +544,8 @@ protected void sendCredit(Address dest, long credits) { protected void sendCreditRequest(final Address dest, Long credits_needed) { if(log.isTraceEnabled()) log.trace("sending request for " + credits_needed + " credits to " + dest); - Message msg=new Message(dest, null, credits_needed).setFlag(Message.Flag.DONT_BUNDLE); - msg.putHeader(this.id, CREDIT_REQUEST_HDR); + Message msg=new Message(dest, credits_needed).setFlag(Message.Flag.INTERNAL) + .putHeader(this.id, CREDIT_REQUEST_HDR); down_prot.down(new Event(Event.MSG, msg)); num_credit_requests_sent++; } diff --git a/src/org/jgroups/protocols/Locking.java b/src/org/jgroups/protocols/Locking.java index 325f164d358..38439167156 100644 --- a/src/org/jgroups/protocols/Locking.java +++ b/src/org/jgroups/protocols/Locking.java @@ -1,31 +1,6 @@ package org.jgroups.protocols; -import java.io.DataInput; -import java.io.DataOutput; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.LockSupport; - -import org.jgroups.Address; -import org.jgroups.Event; -import org.jgroups.Header; -import org.jgroups.Message; -import org.jgroups.View; +import org.jgroups.*; import org.jgroups.annotations.MBean; import org.jgroups.annotations.ManagedAttribute; import org.jgroups.annotations.ManagedOperation; @@ -33,11 +8,22 @@ import org.jgroups.blocks.locking.AwaitInfo; import org.jgroups.blocks.locking.LockInfo; import org.jgroups.blocks.locking.LockNotification; -import org.jgroups.util.Owner; import org.jgroups.stack.Protocol; +import org.jgroups.util.Owner; import org.jgroups.util.Streamable; import org.jgroups.util.Util; +import java.io.DataInput; +import java.io.DataOutput; +import java.util.*; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.LockSupport; + /** @@ -360,10 +346,9 @@ protected Owner getOwner() { protected void sendRequest(Address dest, Type type, String lock_name, Owner owner, long timeout, boolean is_trylock) { Request req=new Request(type, lock_name, owner, timeout, is_trylock); - Message msg=new Message(dest, null, req); - msg.putHeader(id, new LockingHeader()); + Message msg=new Message(dest, req).putHeader(id, new LockingHeader()); if(bypass_bundling) - msg.setFlag(Message.DONT_BUNDLE); + msg.setFlag(Message.Flag.DONT_BUNDLE); if(log.isTraceEnabled()) log.trace("[" + local_addr + "] --> [" + (dest == null? "ALL" : dest) + "] " + req); try { @@ -377,10 +362,9 @@ protected void sendRequest(Address dest, Type type, String lock_name, Owner owne protected void sendLockResponse(Type type, Owner dest, String lock_name) { Request rsp=new Request(type, lock_name, dest, 0); - Message lock_granted_rsp=new Message(dest.getAddress(), null, rsp); - lock_granted_rsp.putHeader(id, new LockingHeader()); + Message lock_granted_rsp=new Message(dest.getAddress(), rsp).putHeader(id, new LockingHeader()); if(bypass_bundling) - lock_granted_rsp.setFlag(Message.DONT_BUNDLE); + lock_granted_rsp.setFlag(Message.Flag.DONT_BUNDLE); if(log.isTraceEnabled()) log.trace("[" + local_addr + "] --> [" + dest.getAddress() + "] " + rsp); @@ -396,10 +380,9 @@ protected void sendLockResponse(Type type, Owner dest, String lock_name) { protected void sendSignalResponse(Owner dest, String lock_name) { Request rsp=new Request(Type.SIG_RET, lock_name, dest, 0); - Message lock_granted_rsp=new Message(dest.getAddress(), null, rsp); - lock_granted_rsp.putHeader(id, new LockingHeader()); + Message lock_granted_rsp=new Message(dest.getAddress(), rsp).putHeader(id, new LockingHeader()); if(bypass_bundling) - lock_granted_rsp.setFlag(Message.DONT_BUNDLE); + lock_granted_rsp.setFlag(Message.Flag.DONT_BUNDLE); if(log.isTraceEnabled()) log.trace("[" + local_addr + "] --> [" + dest.getAddress() + "] " + rsp); diff --git a/src/org/jgroups/protocols/MERGE3.java b/src/org/jgroups/protocols/MERGE3.java index 2d35621c7bb..48c19b2a60a 100644 --- a/src/org/jgroups/protocols/MERGE3.java +++ b/src/org/jgroups/protocols/MERGE3.java @@ -274,8 +274,7 @@ public Object up(Event evt) { case VIEW_REQ: View tmp_view=view != null? view.copy() : null; Header tmphdr=MergeHeader.createViewResponse(tmp_view); - Message view_rsp=new Message(sender); - view_rsp.putHeader(getId(), tmphdr); + Message view_rsp=new Message(sender).setFlag(Message.Flag.INTERNAL).putHeader(getId(),tmphdr); down_prot.down(new Event(Event.MSG, view_rsp)); break; case VIEW_RSP: @@ -318,8 +317,7 @@ public void run() { MergeHeader hdr=MergeHeader.createInfo(view_id, logical_name, Arrays.asList(physical_addr)); if(transport_supports_multicasting) { - Message msg=new Message(); - msg.putHeader(getId(), hdr); + Message msg=new Message().setFlag(Message.Flag.INTERNAL).putHeader(getId(), hdr); down_prot.down(new Event(Event.MSG, msg)); return; } @@ -339,8 +337,7 @@ public void run() { log.trace("discovery protocol " + discovery_protocol.getName() + " returned " + physical_addrs.size() + " physical addresses: " + Util.printListWithDelimiter(physical_addrs, ", ", 10)); for(Address addr: physical_addrs) { - Message info=new Message(addr); - info.putHeader(getId(), hdr); + Message info=new Message(addr).setFlag(Message.Flag.INTERNAL).putHeader(getId(), hdr); down_prot.down(new Event(Event.MSG, info)); } } @@ -428,9 +425,8 @@ protected void _run() { view_rsps.add(local_addr, view.copy()); continue; } - Message view_req=new Message(target); - Header hdr=MergeHeader.createViewRequest(); - view_req.putHeader(getId(), hdr); + Message view_req=new Message(target).setFlag(Message.Flag.INTERNAL) + .putHeader(getId(), MergeHeader.createViewRequest()); down_prot.down(new Event(Event.MSG, view_req)); } view_rsps.waitForAllResponses(check_interval / 10); diff --git a/src/org/jgroups/protocols/PRIO.java b/src/org/jgroups/protocols/PRIO.java index c8486b61aa3..ce03985ad85 100644 --- a/src/org/jgroups/protocols/PRIO.java +++ b/src/org/jgroups/protocols/PRIO.java @@ -98,7 +98,7 @@ public Object up(Event evt) { switch(evt.getType()) { case Event.MSG: Message message = (Message)evt.getArg(); - if ( message.isFlagSet( Message.OOB ) ) { + if ( message.isFlagSet( Message.Flag.OOB ) ) { return up_prot.up(evt); } else { @@ -146,7 +146,7 @@ public Object down(Event evt) { switch(evt.getType()) { case Event.MSG: Message message = (Message)evt.getArg(); - if ( message.isFlagSet( Message.OOB ) ) { + if ( message.isFlagSet( Message.Flag.OOB ) ) { return down_prot.down(evt); } else { diff --git a/src/org/jgroups/protocols/RELAY.java b/src/org/jgroups/protocols/RELAY.java index 80c0c4a2e9e..873e269d030 100644 --- a/src/org/jgroups/protocols/RELAY.java +++ b/src/org/jgroups/protocols/RELAY.java @@ -323,8 +323,7 @@ protected Object installView(byte[] buf, int offset, int length) { /** Forwards the message across the TCP link to the other local cluster */ protected void forward(byte[] buffer, int offset, int length) { - Message msg=new Message(null, null, buffer, offset, length); - msg.putHeader(id, new RelayHeader(RelayHeader.Type.FORWARD)); + Message msg=new Message(null, null, buffer, offset, length).putHeader(id, new RelayHeader(RelayHeader.Type.FORWARD)); if(bridge != null) { try { bridge.send(msg); @@ -366,8 +365,7 @@ protected void sendViewToRemote(ViewData view_data, boolean use_seperate_thread) try { if(bridge != null && bridge.isConnected()) { byte[] buf=Util.streamableToByteBuffer(view_data); - final Message msg=new Message(null, null, buf); - msg.putHeader(id, RelayHeader.create(RelayHeader.Type.VIEW)); + final Message msg=new Message(null, buf).putHeader(id, RelayHeader.create(RelayHeader.Type.VIEW)); if(use_seperate_thread) { timer.execute(new Runnable() { public void run() { @@ -503,9 +501,7 @@ public void run() { protected void sendViewOnLocalCluster(final List
destinations, final byte[] buffer) { for(Address dest: destinations) { - Message view_msg=new Message(dest, null, buffer); - RelayHeader hdr=RelayHeader.create(RelayHeader.Type.VIEW); - view_msg.putHeader(id, hdr); + Message view_msg=new Message(dest, buffer).putHeader(id, RelayHeader.create(RelayHeader.Type.VIEW)); down_prot.down(new Event(Event.MSG, view_msg)); } } @@ -614,8 +610,7 @@ protected class RemoteViewFetcher implements Runnable { public void run() { if(bridge == null || !bridge.isConnected() || remote_view != null) return; - Message msg=new Message(); - msg.putHeader(id, RelayHeader.create(RELAY.RelayHeader.Type.BROADCAST_VIEW)); + Message msg=new Message().putHeader(id, RelayHeader.create(RELAY.RelayHeader.Type.BROADCAST_VIEW)); try { bridge.send(msg); } diff --git a/src/org/jgroups/protocols/RSVP.java b/src/org/jgroups/protocols/RSVP.java index 7803aae3fea..314b701d624 100644 --- a/src/org/jgroups/protocols/RSVP.java +++ b/src/org/jgroups/protocols/RSVP.java @@ -241,10 +241,9 @@ protected void handleResponse(Address member, short id) { protected void sendResponse(Address dest, short id) { try { - Message msg=new Message(dest); - msg.setFlag(Message.Flag.RSVP, Message.Flag.DONT_BUNDLE); RsvpHeader hdr=new RsvpHeader(RsvpHeader.RSP,id); - msg.putHeader(this.id, hdr); + Message msg=new Message(dest).setFlag(Message.Flag.RSVP, Message.Flag.INTERNAL, Message.Flag.DONT_BUNDLE) + .putHeader(this.id, hdr); if(log.isTraceEnabled()) log.trace(local_addr + ": " + hdr.typeToString() + " --> " + dest); down_prot.down(new Event(Event.MSG, msg)); @@ -286,9 +285,8 @@ public void run() { cancelTask(); return; } - Message msg=new Message(target).setFlag(Message.Flag.RSVP); RsvpHeader hdr=new RsvpHeader(RsvpHeader.REQ_ONLY, rsvp_id); - msg.putHeader(id, hdr); + Message msg=new Message(target).setFlag(Message.Flag.RSVP).putHeader(id,hdr); if(log.isTraceEnabled()) log.trace(local_addr + ": " + hdr.typeToString() + " --> " + target); down_prot.down(new Event(Event.MSG, msg)); diff --git a/src/org/jgroups/protocols/SCOPE.java b/src/org/jgroups/protocols/SCOPE.java index db8463ddcca..44e733bc4c1 100644 --- a/src/org/jgroups/protocols/SCOPE.java +++ b/src/org/jgroups/protocols/SCOPE.java @@ -217,7 +217,7 @@ public Object up(Event evt) { Message msg=(Message)evt.getArg(); // we don't handle unscoped or OOB messages - if(!msg.isFlagSet(Message.SCOPED) || msg.isFlagSet(Message.OOB)) + if(!msg.isFlagSet(Message.SCOPED) || msg.isFlagSet(Message.Flag.OOB)) break; ScopeHeader hdr=(ScopeHeader)msg.getHeader(id); @@ -248,7 +248,7 @@ public Object up(Event evt) { public void up(MessageBatch batch) { for(Message msg: batch) { - if(!msg.isFlagSet(Message.SCOPED) || msg.isFlagSet(Message.OOB)) // we don't handle unscoped or OOB messages + if(!msg.isFlagSet(Message.SCOPED) || msg.isFlagSet(Message.Flag.OOB)) // we don't handle unscoped or OOB messages continue; ScopeHeader hdr=(ScopeHeader)msg.getHeader(id); diff --git a/src/org/jgroups/protocols/SEQUENCER.java b/src/org/jgroups/protocols/SEQUENCER.java index 02e902d82ad..44f08214260 100644 --- a/src/org/jgroups/protocols/SEQUENCER.java +++ b/src/org/jgroups/protocols/SEQUENCER.java @@ -144,7 +144,7 @@ public Object down(Event evt) { switch(evt.getType()) { case Event.MSG: Message msg=(Message)evt.getArg(); - if(msg.getDest() != null || msg.isFlagSet(Message.NO_TOTAL_ORDER) || msg.isFlagSet(Message.OOB)) + if(msg.getDest() != null || msg.isFlagSet(Message.NO_TOTAL_ORDER) || msg.isFlagSet(Message.Flag.OOB)) break; if(msg.getSrc() == null) @@ -202,7 +202,7 @@ public Object up(Event evt) { switch(evt.getType()) { case Event.MSG: msg=(Message)evt.getArg(); - if(msg.isFlagSet(Message.NO_TOTAL_ORDER) || msg.isFlagSet(Message.OOB)) + if(msg.isFlagSet(Message.NO_TOTAL_ORDER) || msg.isFlagSet(Message.Flag.OOB)) break; hdr=(SequencerHeader)msg.getHeader(this.id); if(hdr == null) @@ -255,7 +255,7 @@ public Object up(Event evt) { public void up(MessageBatch batch) { for(Message msg: batch) { - if(msg.isFlagSet(Message.NO_TOTAL_ORDER) || msg.isFlagSet(Message.OOB) || msg.getHeader(id) == null) + if(msg.isFlagSet(Message.NO_TOTAL_ORDER) || msg.isFlagSet(Message.Flag.OOB) || msg.getHeader(id) == null) continue; batch.remove(msg); @@ -349,9 +349,8 @@ protected void flushMessagesInForwardTable() { Long key=entry.getKey(); byte[] val=entry.getValue(); - Message forward_msg=new Message(null, val); SequencerHeader hdr=new SequencerHeader(SequencerHeader.WRAPPED_BCAST, key); - forward_msg.putHeader(this.id,hdr); + Message forward_msg=new Message(null, val).putHeader(this.id, hdr); if(log.isTraceEnabled()) log.trace(local_addr + ": flushing (broadcasting) " + local_addr + "::" + key); down_prot.down(new Event(Event.MSG, forward_msg)); @@ -376,10 +375,8 @@ protected void flushMessagesInForwardTable() { byte[] val=entry.getValue(); while(flushing && running && !forward_table.isEmpty()) { - Message forward_msg=new Message(coord, val); SequencerHeader hdr=new SequencerHeader(SequencerHeader.FLUSH, key); - forward_msg.putHeader(this.id,hdr); - forward_msg.setFlag(Message.Flag.DONT_BUNDLE); + Message forward_msg=new Message(coord, val).putHeader(this.id,hdr).setFlag(Message.Flag.DONT_BUNDLE); if(log.isTraceEnabled()) log.trace(local_addr + ": flushing (forwarding) " + local_addr + "::" + key + " to coord " + coord); ack_promise.reset(); @@ -426,10 +423,9 @@ protected void forward(final byte[] marshalled_msg, long seqno, boolean flush) { Address target=coord; if(target == null) return; - Message forward_msg=new Message(target, marshalled_msg); byte type=flush? SequencerHeader.FLUSH : SequencerHeader.FORWARD; SequencerHeader hdr=new SequencerHeader(type, seqno); - forward_msg.putHeader(this.id,hdr); + Message forward_msg=new Message(target, marshalled_msg).putHeader(this.id,hdr); down_prot.down(new Event(Event.MSG, forward_msg)); forwarded_msgs++; } @@ -441,9 +437,8 @@ protected void broadcast(final Message msg, boolean copy, Address original_sende bcast_msg=msg; // no need to add a header, message already has one } else { - bcast_msg=new Message(null, msg.getRawBuffer(), msg.getOffset(), msg.getLength()); SequencerHeader new_hdr=new SequencerHeader(SequencerHeader.WRAPPED_BCAST, seqno); - bcast_msg.putHeader(this.id, new_hdr); + bcast_msg=new Message(null, msg.getRawBuffer(), msg.getOffset(), msg.getLength()).putHeader(this.id, new_hdr); if(resend) { new_hdr.flush_ack=true; bcast_msg.setFlag(Message.Flag.DONT_BUNDLE); diff --git a/src/org/jgroups/protocols/STOMP.java b/src/org/jgroups/protocols/STOMP.java index 293b8a28d9a..acebac964de 100644 --- a/src/org/jgroups/protocols/STOMP.java +++ b/src/org/jgroups/protocols/STOMP.java @@ -342,8 +342,7 @@ protected String getAllEndpoints() { protected void broadcastEndpoint() { if(endpoint != null) { - Message msg=new Message(); - msg.putHeader(id, StompHeader.createHeader(StompHeader.Type.ENDPOINT, "endpoint", endpoint)); + Message msg=new Message().putHeader(id, StompHeader.createHeader(StompHeader.Type.ENDPOINT, "endpoint", endpoint)); down_prot.down(new Event(Event.MSG, msg)); } } @@ -479,7 +478,7 @@ protected void handleFrame(Frame frame) { headers.put("sender", session_id.toString()); } - Message msg=new Message(null, null, frame.getBody()); + Message msg=new Message(null, frame.getBody()); Header hdr=StompHeader.createHeader(StompHeader.Type.MESSAGE, headers); msg.putHeader(id, hdr); down_prot.down(new Event(Event.MSG, msg)); diff --git a/src/org/jgroups/protocols/TP.java b/src/org/jgroups/protocols/TP.java index d1f2c75deb7..46ca3a7314b 100644 --- a/src/org/jgroups/protocols/TP.java +++ b/src/org/jgroups/protocols/TP.java @@ -53,7 +53,6 @@ public abstract class TP extends Protocol { protected static final byte LIST=1; // we have a list of messages rather than a single message when set protected static final byte MULTICAST=2; // message is a multicast (versus a unicast) message when set - protected static final byte OOB=4; // message has OOB flag set (Message.OOB) protected static final boolean can_bind_to_mcast_addr; // are we running on Linux ? @@ -158,12 +157,12 @@ public abstract class TP extends Protocol { @Property(name="oob_thread_pool.queue_enabled", description="Use queue to enqueue incoming OOB messages") protected boolean oob_thread_pool_queue_enabled=true; - @Property(name="oob_thread_pool.queue_max_size",description="Maximum queue size for incoming OOB messages. Default is 500") + @Property(name="oob_thread_pool.queue_max_size",description="Maximum queue size for incoming OOB messages") protected int oob_thread_pool_queue_max_size=500; @Property(name="oob_thread_pool.rejection_policy", - description="Thread rejection policy. Possible values are Abort, Discard, DiscardOldest and Run. Default is Discard") - String oob_thread_pool_rejection_policy="discard"; + description="Thread rejection policy. Possible values are Abort, Discard, DiscardOldest and Run") + protected String oob_thread_pool_rejection_policy="discard"; protected int thread_pool_min_threads=2; @@ -171,20 +170,46 @@ public abstract class TP extends Protocol { protected long thread_pool_keep_alive_time=30000; - @Property(name="thread_pool.enabled",description="Switch for enabling thread pool for regular messages. Default true") + @Property(name="thread_pool.enabled",description="Switch for enabling thread pool for regular messages") protected boolean thread_pool_enabled=true; - @Property(name="thread_pool.queue_enabled", description="Use queue to enqueue incoming regular messages. Default is true") + @Property(name="thread_pool.queue_enabled", description="Queue to enqueue incoming regular messages") protected boolean thread_pool_queue_enabled=true; - @Property(name="thread_pool.queue_max_size", description="Maximum queue size for incoming OOB messages. Default is 500") + @Property(name="thread_pool.queue_max_size", description="Maximum queue size for incoming OOB messages") protected int thread_pool_queue_max_size=500; @Property(name="thread_pool.rejection_policy", description="Thread rejection policy. Possible values are Abort, Discard, DiscardOldest and Run") protected String thread_pool_rejection_policy="Discard"; + + @Property(name="internal_thread_pool.enabled",description="Switch for enabling thread pool for internal messages", + writable=false) + protected boolean internal_thread_pool_enabled=true; + + @Property(name="internal_thread_pool.min_threads",description="Minimum thread pool size for the internal thread pool") + protected int internal_thread_pool_min_threads=2; + + @Property(name="internal_thread_pool.max_threads",description="Maximum thread pool size for the internal thread pool") + protected int internal_thread_pool_max_threads=4; + + @Property(name="internal_thread_pool.keep_alive_time", description="Timeout in ms to remove idle threads from the internal pool") + protected long internal_thread_pool_keep_alive_time=30000; + + @Property(name="internal_thread_pool.queue_enabled", description="Queue to enqueue incoming internal messages") + protected boolean internal_thread_pool_queue_enabled=true; + + @Property(name="internal_thread_pool.queue_max_size",description="Maximum queue size for incoming internal messages") + protected int internal_thread_pool_queue_max_size=500; + + @Property(name="internal_thread_pool.rejection_policy", + description="Thread rejection policy. Possible values are Abort, Discard, DiscardOldest and Run") + protected String internal_thread_pool_rejection_policy="discard"; + + + @Property(description="Type of timer to be used. Valid values are \"old\" (DefaultTimeScheduler, used up to 2.10), " + "\"new\" or \"new2\" (TimeScheduler2), \"new3\" (TimeScheduler3) and \"wheel\". Note that this property " + "might disappear in future releases, if one of the 3 timers is chosen as default timer") @@ -443,10 +468,13 @@ public int getTimerQueueSize() { protected String channel_name=null; @ManagedAttribute(description="Number of OOB messages received") - protected long num_oob_msgs_received=0; + protected long num_oob_msgs_received; @ManagedAttribute(description="Number of regular messages received") - protected long num_incoming_msgs_received=0; + protected long num_incoming_msgs_received; + + @ManagedAttribute(description="Number of internal messages received") + protected long num_internal_msgs_received; @ManagedAttribute(description="Class of the timer implementation") public String getTimerClass() { @@ -498,10 +526,10 @@ public void clearDifferentVersionCache() { protected Executor oob_thread_pool; /** Factory which is used by oob_thread_pool */ - protected ThreadFactory oob_thread_factory=null; + protected ThreadFactory oob_thread_factory; /** Used if oob_thread_pool is a ThreadPoolExecutor and oob_thread_pool_queue_enabled is true */ - protected BlockingQueue oob_thread_pool_queue=null; + protected BlockingQueue oob_thread_pool_queue; // ================================== Regular thread pool ====================== @@ -510,10 +538,18 @@ public void clearDifferentVersionCache() { protected Executor thread_pool; /** Factory which is used by oob_thread_pool */ - protected ThreadFactory default_thread_factory=null; + protected ThreadFactory default_thread_factory; + + /** Used if thread_pool is a ThreadPoolExecutor and thread_pool_queue_enabled is true */ + protected BlockingQueue thread_pool_queue; + + // ================================== Internal thread pool ====================== + + /** The thread pool which handles JGroups internal messages (Flag.INTERNAL)*/ + protected Executor internal_thread_pool; /** Used if thread_pool is a ThreadPoolExecutor and thread_pool_queue_enabled is true */ - protected BlockingQueue thread_pool_queue=null; + protected BlockingQueue internal_thread_pool_queue; // ================================== Timer thread pool ========================= protected TimeScheduler timer; @@ -608,7 +644,7 @@ public String toString() { public void resetStats() { num_msgs_sent=num_msgs_received=num_bytes_sent=num_bytes_received=0; - num_oob_msgs_received=num_incoming_msgs_received=0; + num_oob_msgs_received=num_incoming_msgs_received=num_internal_msgs_received=0; } public void registerProbeHandler(DiagnosticsHandler.ProbeHandler handler) { @@ -800,6 +836,26 @@ public int getRegularMaxQueueSize() { return thread_pool_queue_max_size; } + + @ManagedAttribute(description="Current number of threads in the internal thread pool") + public int getInternalPoolSize() { + return internal_thread_pool instanceof ThreadPoolExecutor? ((ThreadPoolExecutor)internal_thread_pool).getPoolSize() : 0; + } + + public long getInternalMessages() { + return num_internal_msgs_received; + } + + @ManagedAttribute(description="Number of messages in the internal thread pool's queue") + public int getInternalQueueSize() { + return internal_thread_pool_queue != null? internal_thread_pool_queue.size() : 0; + } + + public int getInternalMaxQueueSize() { + return internal_thread_pool_queue_max_size; + } + + @ManagedAttribute(name="timer_tasks",description="Number of timer tasks queued up for execution") public int getNumTimerTasks() { return timer != null? timer.size() : -1; @@ -939,6 +995,7 @@ else if(timer_type.equalsIgnoreCase("wheel")) { Util.verifyRejectionPolicy(oob_thread_pool_rejection_policy); Util.verifyRejectionPolicy(thread_pool_rejection_policy); + Util.verifyRejectionPolicy(internal_thread_pool_rejection_policy); // ========================================== OOB thread pool ============================== @@ -974,6 +1031,23 @@ else if(timer_type.equalsIgnoreCase("wheel")) { } } + + // ========================================== Internal thread pool ============================== + + if(internal_thread_pool == null + || (internal_thread_pool instanceof ThreadPoolExecutor && ((ThreadPoolExecutor)internal_thread_pool).isShutdown())) { + if(internal_thread_pool_enabled) { + if(internal_thread_pool_queue_enabled) + internal_thread_pool_queue=new LinkedBlockingQueue(internal_thread_pool_queue_max_size); + else + internal_thread_pool_queue=new SynchronousQueue(); + internal_thread_pool=createThreadPool(internal_thread_pool_min_threads, internal_thread_pool_max_threads, internal_thread_pool_keep_alive_time, + internal_thread_pool_rejection_policy, internal_thread_pool_queue, oob_thread_factory); + } + // if the internal thread pool is disabled, we won't create it (not even a DirectExecutor) + } + + Map m=new HashMap(2); if(bind_addr != null) m.put("bind_addr", bind_addr); @@ -1019,6 +1093,9 @@ public void destroy() { if(thread_pool instanceof ThreadPoolExecutor) shutdownThreadPool(thread_pool); + + if(internal_thread_pool instanceof ThreadPoolExecutor) + shutdownThreadPool(internal_thread_pool); } /** @@ -1187,7 +1264,9 @@ public Object down(Event evt) { final String cluster_name=hdr.channel_name; // changed to fix http://jira.jboss.com/jira/browse/JGRP-506 - Executor pool=msg.isFlagSet(Message.OOB)? oob_thread_pool : thread_pool; + boolean internal=msg.isFlagSet(Message.Flag.INTERNAL); + Executor pool=internal && internal_thread_pool != null? internal_thread_pool + : internal || msg.isFlagSet(Message.Flag.OOB)? oob_thread_pool : thread_pool; pool.execute(new Runnable() { public void run() { passMessageUp(copy, cluster_name, false, multicast, false); @@ -1336,24 +1415,34 @@ protected void receive(Address sender, byte[] data, int offset, int length) { if(is_message_list) { // used if message bundling is enabled final MessageBatch[] batches=readMessageBatch(dis, multicast); - final MessageBatch batch=batches[0], oob_batch=batches[1]; + final MessageBatch batch=batches[0], oob_batch=batches[1], internal_batch=batches[2]; if(oob_batch != null) { num_oob_msgs_received+=oob_batch.size(); oob_thread_pool.execute(new BatchHandler(oob_batch)); } - if(batch != null) { num_incoming_msgs_received+=batch.size(); thread_pool.execute(new BatchHandler(batch)); } + if(internal_batch != null) { + num_internal_msgs_received+=internal_batch.size(); + Executor pool=internal_thread_pool != null? internal_thread_pool : oob_thread_pool; + pool.execute(new BatchHandler(internal_batch)); + } } else { Message msg=readMessage(dis); - boolean is_oob=msg.isFlagSet(Message.Flag.OOB); - if(is_oob) num_oob_msgs_received++; - else num_incoming_msgs_received++; - Executor pool=is_oob? oob_thread_pool : thread_pool; + if(msg.isFlagSet(Message.Flag.INTERNAL)) + num_internal_msgs_received++; + else if(msg.isFlagSet(Message.Flag.OOB)) + num_oob_msgs_received++; + else + num_incoming_msgs_received++; + + boolean internal=msg.isFlagSet(Message.Flag.INTERNAL); // use internal pool or OOB (if intrenal pool is null) + Executor pool=internal && internal_thread_pool != null? internal_thread_pool + : internal || msg.isFlagSet(Message.Flag.OOB)? oob_thread_pool : thread_pool; TpHeader hdr=(TpHeader)msg.getHeader(id); String cluster_name=hdr.channel_name; pool.execute(new MyHandler(msg, cluster_name, multicast)); @@ -1442,7 +1531,7 @@ public void run() { /** Serializes and sends a message. This method is not reentrant */ protected void send(Message msg, Address dest, boolean multicast) throws Exception { // bundle all messages except when tagged with DONT_BUNDLE - if(!msg.isFlagSet(Message.DONT_BUNDLE)) { + if(!msg.isFlagSet(Message.Flag.DONT_BUNDLE)) { bundler.send(msg); return; } @@ -1535,8 +1624,6 @@ protected static void writeMessage(Message msg, DataOutputStream dos, boolean mu dos.writeShort(Version.version); // write the version if(multicast) flags+=MULTICAST; - if(msg.isFlagSet(Message.OOB)) - flags+=OOB; dos.writeByte(flags); msg.writeTo(dos); } @@ -1617,13 +1704,14 @@ public static List readMessageList(DataInputStream in, short transport_ } /** - * Reads a list of messages into 2 MessageBatches: a regular one and an OOB one + * Reads a list of messages into 3 MessageBatches: a regular, an OOB and an internal one * @param in - * @return an array of 2 MessageBatches, the regular is at index 0 and the OOB at index 1 (either can be null) + * @return an array of 2 MessageBatches, the regular is at index 0 and the OOB at index 1 + * and the internal at index 2 (either can be null) * @throws Exception */ public static MessageBatch[] readMessageBatch(DataInputStream in, boolean multicast) throws Exception { - MessageBatch[] mbs=new MessageBatch[2]; + MessageBatch[] batches=new MessageBatch[3]; // [0]: reg, [1]: OOB, [2]: internal Address dest=Util.readAddress(in); Address src=Util.readAddress(in); String cluster_name=Util.readString(in); @@ -1635,18 +1723,23 @@ public static MessageBatch[] readMessageBatch(DataInputStream in, boolean multic msg.setDest(dest); if(msg.getSrc() == null) msg.setSrc(src); - if(msg.isFlagSet(Message.Flag.OOB)) { - if(mbs[1] == null) - mbs[1]=new MessageBatch(dest, src, cluster_name, multicast, MessageBatch.Mode.OOB, len); - mbs[1].add(msg); + if(msg.isFlagSet(Message.Flag.INTERNAL)) { + if(batches[2] == null) + batches[2]=new MessageBatch(dest, src, cluster_name, multicast, MessageBatch.Mode.INTERNAL, len); + batches[2].add(msg); + } + else if(msg.isFlagSet(Message.Flag.OOB)) { + if(batches[1] == null) + batches[1]=new MessageBatch(dest, src, cluster_name, multicast, MessageBatch.Mode.OOB, len); + batches[1].add(msg); } else { - if(mbs[0] == null) - mbs[0]=new MessageBatch(dest, src, cluster_name, multicast, MessageBatch.Mode.REG, len); - mbs[0].add(msg); + if(batches[0] == null) + batches[0]=new MessageBatch(dest, src, cluster_name, multicast, MessageBatch.Mode.REG, len); + batches[0].add(msg); } } - return mbs; + return batches; } diff --git a/src/org/jgroups/protocols/UDP.java b/src/org/jgroups/protocols/UDP.java index e774f95227d..3acaf17c217 100644 --- a/src/org/jgroups/protocols/UDP.java +++ b/src/org/jgroups/protocols/UDP.java @@ -603,12 +603,12 @@ protected void handleConfigEvent(Map map) { if(map == null) return; if(map.containsKey("send_buf_size")) { - mcast_send_buf_size=((Integer)map.get("send_buf_size")).intValue(); + mcast_send_buf_size=(Integer)map.get("send_buf_size"); ucast_send_buf_size=mcast_send_buf_size; set_buffers=true; } if(map.containsKey("recv_buf_size")) { - mcast_recv_buf_size=((Integer)map.get("recv_buf_size")).intValue(); + mcast_recv_buf_size=(Integer)map.get("recv_buf_size"); ucast_recv_buf_size=mcast_recv_buf_size; set_buffers=true; } diff --git a/src/org/jgroups/protocols/UNICAST.java b/src/org/jgroups/protocols/UNICAST.java index 7832be5931d..f7632604e76 100644 --- a/src/org/jgroups/protocols/UNICAST.java +++ b/src/org/jgroups/protocols/UNICAST.java @@ -626,7 +626,7 @@ protected void handleDataReceived(Address sender, long seqno, short conn_id, bo // An OOB message is passed up immediately. Later, when remove() is called, we discard it. This affects ordering ! // http://jira.jboss.com/jira/browse/JGRP-377 - if(msg.isFlagSet(Message.OOB) && added) { + if(msg.isFlagSet(Message.Flag.OOB) && added) { try { up_prot.up(evt); } @@ -678,7 +678,7 @@ protected void handleBatchReceived(Address sender, Map> map) // An OOB message is passed up immediately. Later, when remove() is called, we discard it. This affects ordering ! // http://jira.jboss.com/jira/browse/JGRP-377 - if(msg.isFlagSet(Message.OOB) && msg_added) { + if(msg.isFlagSet(Message.Flag.OOB) && msg_added) { try { up_prot.up(new Event(Event.MSG, msg)); } @@ -725,7 +725,7 @@ protected int removeAndDeliver(final AtomicBoolean processing, Table wi MessageBatch batch=new MessageBatch(local_addr, sender, null, false, list); for(Message msg_to_deliver: batch) { // discard OOB msg: it has already been delivered (http://jira.jboss.com/jira/browse/JGRP-377) - if(msg_to_deliver.isFlagSet(Message.OOB)) + if(msg_to_deliver.isFlagSet(Message.Flag.OOB)) batch.remove(msg_to_deliver); } @@ -887,8 +887,8 @@ protected void stopRetransmitTask() { protected void sendAck(Address dst, long seqno, short conn_id) { if(!running) // if we are disconnected, then don't send any acks which throw exceptions on shutdown return; - Message ack=new Message(dst); - ack.putHeader(this.id, UnicastHeader.createAckHeader(seqno, conn_id)); + Message ack=new Message(dst).setFlag(Message.Flag.INTERNAL) + .putHeader(this.id, UnicastHeader.createAckHeader(seqno, conn_id)); if(log.isTraceEnabled()) log.trace(new StringBuilder().append(local_addr).append(" --> ACK(").append(dst). append(": #").append(seqno).append(')')); @@ -922,8 +922,7 @@ protected synchronized short getNewConnectionId() { protected void sendRequestForFirstSeqno(Address dest, long seqno_received) { - Message msg=new Message(dest); - msg.setFlag(Message.OOB); + Message msg=new Message(dest).setFlag(Message.Flag.OOB); UnicastHeader hdr=UnicastHeader.createSendFirstSeqnoHeader(seqno_received); msg.putHeader(this.id, hdr); if(log.isTraceEnabled()) diff --git a/src/org/jgroups/protocols/UNICAST2.java b/src/org/jgroups/protocols/UNICAST2.java index f4811605251..4397446e030 100644 --- a/src/org/jgroups/protocols/UNICAST2.java +++ b/src/org/jgroups/protocols/UNICAST2.java @@ -633,10 +633,9 @@ public void sendStableMessages() { } protected void sendStableMessage(Address dest, short conn_id, long hd, long hr) { - Message stable_msg=new Message(dest, null, null); - Unicast2Header hdr=Unicast2Header.createStableHeader(conn_id, hd, hr); - stable_msg.putHeader(this.id, hdr); - stable_msg.setFlag(Message.OOB); + Message stable_msg=new Message(dest).setFlag(Message.Flag.OOB, Message.Flag.INTERNAL) + .putHeader(this.id, Unicast2Header.createStableHeader(conn_id, hd, hr)); + if(log.isTraceEnabled()) { StringBuilder sb=new StringBuilder(); sb.append(local_addr).append(" --> STABLE(").append(dest).append(": ") @@ -728,8 +727,8 @@ public void removeAllConnections() { public void retransmit(SeqnoList missing, Address sender) { Unicast2Header hdr=Unicast2Header.createXmitReqHeader(); - Message retransmit_msg=new Message(sender, null, missing); - retransmit_msg.setFlag(Message.OOB); + Message retransmit_msg=new Message(sender, missing); + retransmit_msg.setFlag(Message.Flag.OOB); if(log.isTraceEnabled()) log.trace(local_addr + ": sending XMIT_REQ (" + missing + ") to " + sender); retransmit_msg.putHeader(this.id, hdr); @@ -786,7 +785,7 @@ protected boolean handleDataReceived(Address sender, long seqno, short conn_id, // An OOB message is passed up immediately. Later, when remove() is called, we discard it. This affects ordering ! // http://jira.jboss.com/jira/browse/JGRP-377 - if(msg.isFlagSet(Message.OOB) && added) { + if(msg.isFlagSet(Message.Flag.OOB) && added) { try { up_prot.up(evt); } @@ -830,7 +829,7 @@ protected void handleBatchReceived(Address sender, Map> map) // An OOB message is passed up immediately. Later, when remove() is called, we discard it. This affects ordering ! // http://jira.jboss.com/jira/browse/JGRP-377 - if(msg.isFlagSet(Message.OOB) && msg_added) { + if(msg.isFlagSet(Message.Flag.OOB) && msg_added) { try { up_prot.up(new Event(Event.MSG, msg)); } @@ -899,7 +898,7 @@ protected void removeAndPassUp(Table win, Address sender) { MessageBatch batch=new MessageBatch(local_addr, sender, null, false, msgs); for(Message msg_to_deliver: batch) { // discard OOB msg: it has already been delivered (http://jira.jboss.com/jira/browse/JGRP-377) - if(msg_to_deliver.isFlagSet(Message.OOB)) + if(msg_to_deliver.isFlagSet(Message.Flag.OOB)) batch.remove(msg_to_deliver); } @@ -919,20 +918,6 @@ protected void removeAndPassUp(Table win, Address sender) { catch(Throwable t) { log.error("failed to deliver batch " + batch, t); } - - - - /*for(Message m: msgs) { - // discard OOB msg: it has already been delivered (http://jira.jboss.com/jira/browse/JGRP-377) - if(m.isFlagSet(Message.OOB)) - continue; - try { - up_prot.up(new Event(Event.MSG, m)); - } - catch(Throwable t) { - log.error("couldn't deliver message " + m, t); - } - }*/ } } finally { @@ -1091,7 +1076,7 @@ protected synchronized short getNewConnectionId() { protected void sendRequestForFirstSeqno(Address dest, long seqno_received) { Message msg=new Message(dest); - msg.setFlag(Message.OOB); + msg.setFlag(Message.Flag.OOB); Unicast2Header hdr=Unicast2Header.createSendFirstSeqnoHeader(seqno_received); msg.putHeader(this.id, hdr); if(log.isTraceEnabled()) @@ -1100,7 +1085,8 @@ protected void sendRequestForFirstSeqno(Address dest, long seqno_received) { } protected void sendAck(Address dest, long seqno, short conn_id) { - Message msg=new Message(dest).setFlag(Message.OOB).putHeader(this.id, Unicast2Header.createAckHeader(seqno, conn_id)); + Message msg=new Message(dest).setFlag(Message.Flag.OOB, Message.Flag.INTERNAL) + .putHeader(this.id, Unicast2Header.createAckHeader(seqno, conn_id)); if(log.isTraceEnabled()) log.trace(local_addr + " --> ACK(" + dest + "," + seqno + " [conn_id=" + conn_id + "])"); down_prot.down(new Event(Event.MSG, msg)); diff --git a/src/org/jgroups/protocols/UNICAST3.java b/src/org/jgroups/protocols/UNICAST3.java index 6e62406c37c..30583a904b5 100644 --- a/src/org/jgroups/protocols/UNICAST3.java +++ b/src/org/jgroups/protocols/UNICAST3.java @@ -565,7 +565,7 @@ public void removeAllConnections() { /** Sends a retransmit request to the given sender */ protected void retransmit(SeqnoList missing, Address sender) { - Message xmit_msg=new Message(sender, missing).setFlag(Message.OOB).putHeader(id, Header.createXmitReqHeader()); + Message xmit_msg=new Message(sender, missing).setFlag(Message.Flag.OOB).putHeader(id, Header.createXmitReqHeader()); if(log.isTraceEnabled()) log.trace(local_addr + ": sending XMIT_REQ (" + missing + ") to " + sender); down_prot.down(new Event(Event.MSG, xmit_msg)); @@ -623,7 +623,7 @@ protected void handleDataReceived(Address sender, long seqno, short conn_id, bo // An OOB message is passed up immediately. Later, when remove() is called, we discard it. This affects ordering ! // http://jira.jboss.com/jira/browse/JGRP-377 - if(msg.isFlagSet(Message.OOB) && added) { + if(msg.isFlagSet(Message.Flag.OOB) && added) { try { up_prot.up(evt); } @@ -666,7 +666,7 @@ protected void handleBatchReceived(Address sender, Map> map) // An OOB message is passed up immediately. Later, when remove() is called, we discard it. This affects ordering ! // http://jira.jboss.com/jira/browse/JGRP-377 - if(msg.isFlagSet(Message.OOB) && msg_added) { + if(msg.isFlagSet(Message.Flag.OOB) && msg_added) { try { up_prot.up(new Event(Event.MSG, msg)); } @@ -719,7 +719,7 @@ protected int removeAndDeliver(final AtomicBoolean processing, Table wi MessageBatch batch=new MessageBatch(local_addr, sender, null, false, list); for(Message msg_to_deliver: batch) { // discard OOB msg: it has already been delivered (http://jira.jboss.com/jira/browse/JGRP-377) - if(msg_to_deliver.isFlagSet(Message.OOB)) + if(msg_to_deliver.isFlagSet(Message.Flag.OOB)) batch.remove(msg_to_deliver); } if(batch.isEmpty()) @@ -933,7 +933,7 @@ protected void stopRetransmitTask() { protected void sendAck(Address dst, long seqno, short conn_id) { if(!running) // if we are disconnected, then don't send any acks which throw exceptions on shutdown return; - Message ack=new Message(dst).putHeader(this.id, Header.createAckHeader(seqno, conn_id)); + Message ack=new Message(dst).setFlag(Message.Flag.INTERNAL).putHeader(this.id, Header.createAckHeader(seqno, conn_id)); if(log.isTraceEnabled()) log.trace(new StringBuilder().append(local_addr).append(" --> ACK(").append(dst). append(": #").append(seqno).append(')')); @@ -967,10 +967,9 @@ protected synchronized short getNewConnectionId() { protected void sendRequestForFirstSeqno(Address dest, long seqno_received) { - Message msg=new Message(dest); - msg.setFlag(Message.OOB); - Header hdr=Header.createSendFirstSeqnoHeader(seqno_received); - msg.putHeader(this.id, hdr); + Message msg=new Message(dest).setFlag(Message.Flag.OOB) + .putHeader(this.id, Header.createSendFirstSeqnoHeader(seqno_received)); + if(log.isTraceEnabled()) log.trace(local_addr + " --> SEND_FIRST_SEQNO(" + dest + "," + seqno_received + ")"); down_prot.down(new Event(Event.MSG, msg)); diff --git a/src/org/jgroups/protocols/VERIFY_SUSPECT.java b/src/org/jgroups/protocols/VERIFY_SUSPECT.java index aa46f9c2009..d7653b515b8 100644 --- a/src/org/jgroups/protocols/VERIFY_SUSPECT.java +++ b/src/org/jgroups/protocols/VERIFY_SUSPECT.java @@ -121,7 +121,7 @@ public Object up(Event evt) { Message rsp; Address target=use_mcast_rsps? null : hdr.from; for(int i=0; i < num_msgs; i++) { - rsp=new Message(target).setFlag(Message.OOB) + rsp=new Message(target).setFlag(Message.Flag.INTERNAL) .putHeader(this.id, new VerifyHeader(VerifyHeader.I_AM_NOT_DEAD, local_addr)); down_prot.down(new Event(Event.MSG, rsp)); } @@ -201,9 +201,8 @@ void verifySuspect(Address mbr) { if(log.isTraceEnabled()) log.trace("verifying that " + mbr + " is dead"); for(int i=0; i < num_msgs; i++) { - msg=new Message(mbr, null, null); - msg.setFlag(Message.OOB); - msg.putHeader(this.id, new VerifyHeader(VerifyHeader.ARE_YOU_DEAD, local_addr)); + msg=new Message(mbr).setFlag(Message.Flag.INTERNAL) + .putHeader(this.id, new VerifyHeader(VerifyHeader.ARE_YOU_DEAD, local_addr)); down_prot.down(new Event(Event.MSG, msg)); } } diff --git a/src/org/jgroups/protocols/pbcast/ClientGmsImpl.java b/src/org/jgroups/protocols/pbcast/ClientGmsImpl.java index 7215cef9fed..630cac7f338 100644 --- a/src/org/jgroups/protocols/pbcast/ClientGmsImpl.java +++ b/src/org/jgroups/protocols/pbcast/ClientGmsImpl.java @@ -186,7 +186,7 @@ private void joinInternal(Address mbr, boolean joinWithStateTransfer,boolean use } // send VIEW_ACK to sender of view - Message view_ack=new Message(coord).setFlag(Message.OOB) + Message view_ack=new Message(coord).setFlag(Message.Flag.OOB, Message.Flag.INTERNAL) .putHeader(gms.getId(), new GMS.GmsHeader(GMS.GmsHeader.VIEW_ACK)); gms.getDownProtocol().down(new Event(Event.MSG, view_ack)); return; @@ -250,8 +250,7 @@ void sendJoinMessage(Address coord, Address mbr,boolean joinWithTransfer, boolea Message msg; GMS.GmsHeader hdr; - msg=new Message(coord, null, null); - msg.setFlag(Message.OOB); + msg=new Message(coord).setFlag(Message.Flag.OOB, Message.Flag.INTERNAL); if(joinWithTransfer) hdr=new GMS.GmsHeader(GMS.GmsHeader.JOIN_REQ_WITH_STATE_TRANSFER, mbr,useFlushIfPresent); else diff --git a/src/org/jgroups/protocols/pbcast/CoordGmsImpl.java b/src/org/jgroups/protocols/pbcast/CoordGmsImpl.java index f97cf2e5807..4f89da12898 100644 --- a/src/org/jgroups/protocols/pbcast/CoordGmsImpl.java +++ b/src/org/jgroups/protocols/pbcast/CoordGmsImpl.java @@ -261,10 +261,8 @@ public void stop() { private void sendLeaveResponses(Collection
leaving_members) { for(Address address: leaving_members){ - Message msg=new Message(address, null, null); // send an ack to the leaving member - msg.setFlag(Message.OOB); - GMS.GmsHeader hdr=new GMS.GmsHeader(GMS.GmsHeader.LEAVE_RSP); - msg.putHeader(gms.getId(), hdr); + Message msg=new Message(address).setFlag(Message.Flag.OOB, Message.Flag.INTERNAL) + .putHeader(gms.getId(), new GMS.GmsHeader(GMS.GmsHeader.LEAVE_RSP)); gms.getDownProtocol().down(new Event(Event.MSG, msg)); } } diff --git a/src/org/jgroups/protocols/pbcast/FLUSH.java b/src/org/jgroups/protocols/pbcast/FLUSH.java index 53de1c97d6f..b6711e7b9da 100644 --- a/src/org/jgroups/protocols/pbcast/FLUSH.java +++ b/src/org/jgroups/protocols/pbcast/FLUSH.java @@ -548,9 +548,8 @@ private void handleFlushReconcile(Message msg, FlushHeader fh) { log.debug(localAddress + ": returned from FLUSH_RECONCILE, " + " sending RECONCILE_OK to " + requester); - Message reconcileOk = new Message(requester); - reconcileOk.setFlag(Message.OOB); - reconcileOk.putHeader(this.id, new FlushHeader(FlushHeader.FLUSH_RECONCILE_OK)); + Message reconcileOk = new Message(requester).setFlag(Message.Flag.OOB, Message.Flag.INTERNAL) + .putHeader(this.id,new FlushHeader(FlushHeader.FLUSH_RECONCILE_OK)); down_prot.down(new Event(Event.MSG, reconcileOk)); } @@ -564,10 +563,8 @@ private void handleStartFlush(Message msg, FlushHeader fh) { } onStartFlush(flushRequester, fh); } else { - FlushHeader fhr = new FlushHeader(FlushHeader.FLUSH_NOT_COMPLETED, fh.viewID, - fh.flushParticipants); - Message response = new Message(flushRequester); - response.putHeader(this.id, fhr); + FlushHeader fhr = new FlushHeader(FlushHeader.FLUSH_NOT_COMPLETED, fh.viewID, fh.flushParticipants); + Message response = new Message(flushRequester).putHeader(this.id, fhr); down_prot.down(new Event(Event.MSG, response)); if (log.isDebugEnabled()) log.debug(localAddress + ": received START_FLUSH, responded with FLUSH_NOT_COMPLETED to " + flushRequester); @@ -580,9 +577,8 @@ private void rejectFlush(Collection participants, long viewId for (Address flushMember : participants) { if(flushMember == null) continue; - Message reject = new Message(flushMember, localAddress, null); - reject.setFlag(Message.OOB); - reject.putHeader(this.id, new FlushHeader(FlushHeader.ABORT_FLUSH, viewId,participants)); + Message reject = new Message(flushMember, localAddress, null).setFlag(Message.Flag.OOB, Message.Flag.INTERNAL) + .putHeader(this.id, new FlushHeader(FlushHeader.ABORT_FLUSH, viewId,participants)); down_prot.down(new Event(Event.MSG, reject)); } } @@ -693,9 +689,8 @@ private void onSuspend(final List
members) { flushMembers.addAll(participantsInFlush); flushMembers.removeAll(suspected); - msg = new Message(null, localAddress, null); - msg.putHeader(this.id, new FlushHeader(FlushHeader.START_FLUSH, currentViewId(), - participantsInFlush)); + msg = new Message(null, localAddress, null) + .putHeader(this.id, new FlushHeader(FlushHeader.START_FLUSH, currentViewId(), participantsInFlush)); } if (participantsInFlush.isEmpty()) { flush_promise.setResult(SUCCESS_START_FLUSH); @@ -778,8 +773,7 @@ private void onStartFlush(Address flushStarter, FlushHeader fh) { FlushHeader fhr = new FlushHeader(FlushHeader.FLUSH_COMPLETED, fh.viewID,fh.flushParticipants); fhr.addDigest(digest); - Message msg = new Message(flushStarter); - msg.putHeader(this.id, fhr); + Message msg = new Message(flushStarter).putHeader(this.id, fhr); down_prot.down(new Event(Event.MSG, msg)); if (log.isDebugEnabled()) log.debug(localAddress + ": received START_FLUSH, responded with FLUSH_COMPLETED to " + flushStarter); @@ -807,8 +801,7 @@ private void onFlushCompleted(Address address, final FlushHeader header) { needsReconciliationPhase = enable_reconciliation && flushCompleted && hasVirtualSynchronyGaps(); if (needsReconciliationPhase) { Digest d = findHighestSequences(); - msg = new Message(); - msg.setFlag(Message.OOB); + msg = new Message().setFlag(Message.Flag.OOB); FlushHeader fh = new FlushHeader(FlushHeader.FLUSH_RECONCILE, currentViewId(),flushMembers); reconcileOks.clear(); fh.addDigest(d); diff --git a/src/org/jgroups/protocols/pbcast/GMS.java b/src/org/jgroups/protocols/pbcast/GMS.java index 729dce91927..e800a2aec49 100644 --- a/src/org/jgroups/protocols/pbcast/GMS.java +++ b/src/org/jgroups/protocols/pbcast/GMS.java @@ -545,9 +545,7 @@ public void castViewChange(View new_view, Digest digest, JoinRsp jr, Collection< } public void sendJoinResponse(JoinRsp rsp, Address dest) { - Message m=new Message(dest, null, null); - GMS.GmsHeader hdr=new GMS.GmsHeader(GMS.GmsHeader.JOIN_RSP, rsp); - m.putHeader(this.id, hdr); + Message m=new Message(dest).putHeader(this.id, new GMS.GmsHeader(GMS.GmsHeader.JOIN_RSP, rsp)); getDownProtocol().down(new Event(Event.MSG, m)); } @@ -857,9 +855,8 @@ public Object up(Event evt) { if(digest != null) { GmsHeader rsp_hdr=new GmsHeader(GmsHeader.GET_DIGEST_RSP); rsp_hdr.my_digest=digest; - Message get_digest_rsp=new Message(msg.getSrc(), null, null); - get_digest_rsp.setFlag(Message.OOB); - get_digest_rsp.putHeader(this.id, rsp_hdr); + Message get_digest_rsp=new Message(msg.getSrc()).setFlag(Message.Flag.OOB, Message.Flag.INTERNAL) + .putHeader(this.id,rsp_hdr); down_prot.down(new Event(Event.MSG, get_digest_rsp)); } break; @@ -984,10 +981,8 @@ final void initState() { private void sendViewAck(Address dest) { - Message view_ack=new Message(dest, null, null); - view_ack.setFlag(Message.OOB); - GmsHeader tmphdr=new GmsHeader(GmsHeader.VIEW_ACK); - view_ack.putHeader(this.id, tmphdr); + Message view_ack=new Message(dest).setFlag(Message.Flag.OOB, Message.Flag.INTERNAL) + .putHeader(this.id, new GmsHeader(GmsHeader.VIEW_ACK)); down_prot.down(new Event(Event.MSG,view_ack)); } diff --git a/src/org/jgroups/protocols/pbcast/GmsImpl.java b/src/org/jgroups/protocols/pbcast/GmsImpl.java index 1fdc6498004..3a78b493e4b 100644 --- a/src/org/jgroups/protocols/pbcast/GmsImpl.java +++ b/src/org/jgroups/protocols/pbcast/GmsImpl.java @@ -55,7 +55,7 @@ public void handleViewChange(View new_view, Digest digest) protected void sendMergeRejectedResponse(Address sender, MergeId merge_id) { Message msg=new Message(sender, null, null); - msg.setFlag(Message.OOB); + msg.setFlag(Message.Flag.OOB); GMS.GmsHeader hdr=new GMS.GmsHeader(GMS.GmsHeader.MERGE_RSP); hdr.merge_rejected=true; hdr.merge_id=merge_id; diff --git a/src/org/jgroups/protocols/pbcast/Merger.java b/src/org/jgroups/protocols/pbcast/Merger.java index 5ea4f2e6e80..571b08c935f 100644 --- a/src/org/jgroups/protocols/pbcast/Merger.java +++ b/src/org/jgroups/protocols/pbcast/Merger.java @@ -180,8 +180,7 @@ public void handleMergeView(final MergeData data, final MergeId merge_id) { gms.castViewChange(data.view,data.digest,null,newViewMembers); // if we have flush in stack send ack back to merge coordinator if(gms.flushProtocolInStack) { //[JGRP-700] - FLUSH: flushing should span merge - Message ack=new Message(data.getSender(), null, null); - ack.setFlag(Message.OOB); + Message ack=new Message(data.getSender()).setFlag(Message.Flag.OOB, Message.Flag.INTERNAL); GMS.GmsHeader ack_hdr=new GMS.GmsHeader(GMS.GmsHeader.INSTALL_MERGE_VIEW_OK); ack.putHeader(gms.getId(), ack_hdr); gms.getDownProtocol().down(new Event(Event.MSG, ack)); @@ -254,8 +253,7 @@ public static void sanitizeViews(Map map) { /** Send back a response containing view and digest to sender */ private void sendMergeResponse(Address sender, View view, Digest digest, MergeId merge_id) { - Message msg=new Message(sender, null, null); - msg.setFlag(Message.OOB); + Message msg=new Message(sender).setFlag(Message.Flag.OOB, Message.Flag.INTERNAL); GMS.GmsHeader hdr=new GMS.GmsHeader(GMS.GmsHeader.MERGE_RSP); hdr.merge_id=merge_id; hdr.view=view; @@ -291,7 +289,7 @@ private void sendMergeView(Collection
coords, MergeData combined_merge_ long start=System.currentTimeMillis(); for(Address coord: coords) { - Message msg=new Message(coord, null, null); + Message msg=new Message(coord); GMS.GmsHeader hdr=new GMS.GmsHeader(GMS.GmsHeader.INSTALL_MERGE_VIEW); hdr.view=view; hdr.my_digest=digest; @@ -318,8 +316,7 @@ private void sendMergeView(Collection
coords, MergeData combined_merge_ } protected void sendMergeRejectedResponse(Address sender, MergeId merge_id) { - Message msg=new Message(sender, null, null); - msg.setFlag(Message.OOB); + Message msg=new Message(sender).setFlag(Message.Flag.OOB, Message.Flag.INTERNAL); GMS.GmsHeader hdr=new GMS.GmsHeader(GMS.GmsHeader.MERGE_RSP); hdr.merge_rejected=true; hdr.merge_id=merge_id; @@ -332,8 +329,8 @@ private void sendMergeCancelledMessage(Collection
coords, MergeId merge return; for(Address coord: coords) { - Message msg=new Message(coord, null, null); - // msg.setFlag(Message.OOB); + Message msg=new Message(coord); + // msg.setFlag(Message.Flag.OOB); GMS.GmsHeader hdr=new GMS.GmsHeader(GMS.GmsHeader.CANCEL_MERGE); hdr.merge_id=merge_id; msg.putHeader(gms.getId(), hdr); @@ -356,9 +353,7 @@ private Digest fetchDigestsFromAllMembersInSubPartition(List
current_mb GMS.GmsHeader hdr=new GMS.GmsHeader(GMS.GmsHeader.GET_DIGEST_REQ); hdr.merge_id=merge_id; - Message get_digest_req=new Message(); - get_digest_req.setFlag(Message.OOB); - get_digest_req.putHeader(gms.getId(), hdr); + Message get_digest_req=new Message().setFlag(Message.Flag.OOB, Message.Flag.INTERNAL).putHeader(gms.getId(), hdr); long max_wait_time=gms.merge_timeout / 2; // gms.merge_timeout is guaranteed to be > 0, verified in init() digest_collector.reset(current_mbrs); @@ -681,8 +676,7 @@ protected boolean getMergeDataFromSubgroupCoordinators(Map> entry: coords.entrySet()) { Address coord=entry.getKey(); Collection
mbrs=entry.getValue(); - Message msg=new Message(coord, null, null); - msg.setFlag(Message.OOB); + Message msg=new Message(coord).setFlag(Message.Flag.OOB, Message.Flag.INTERNAL); GMS.GmsHeader hdr=new GMS.GmsHeader(GMS.GmsHeader.MERGE_REQ, mbrs); hdr.mbr=gms.local_addr; hdr.merge_id=new_merge_id; diff --git a/src/org/jgroups/protocols/pbcast/NAKACK.java b/src/org/jgroups/protocols/pbcast/NAKACK.java index f7983924e05..c07f47d644c 100644 --- a/src/org/jgroups/protocols/pbcast/NAKACK.java +++ b/src/org/jgroups/protocols/pbcast/NAKACK.java @@ -581,7 +581,7 @@ public Object up(Event evt) { case Event.MSG: Message msg=(Message)evt.getArg(); - if(msg.isFlagSet(Message.NO_RELIABILITY)) + if(msg.isFlagSet(Message.Flag.NO_RELIABILITY)) break; NakAckHeader hdr=(NakAckHeader)msg.getHeader(this.id); if(hdr == null) @@ -740,10 +740,10 @@ private void handleMessage(Message msg, NakAckHeader hdr) { // OOB msg is passed up. When removed, we discard it. Affects ordering: http://jira.jboss.com/jira/browse/JGRP-379 - if(added && msg.isFlagSet(Message.OOB)) { + if(added && msg.isFlagSet(Message.Flag.OOB)) { if(loopback) msg=win.get(hdr.seqno); // we *have* to get a message, because loopback means we didn't add it to win ! - if(msg != null && msg.isFlagSet(Message.OOB)) { + if(msg != null && msg.isFlagSet(Message.Flag.OOB)) { if(msg.setTransientFlagIfAbsent(Message.OOB_DELIVERED)) { if(log.isTraceEnabled()) log.trace(new StringBuilder().append(local_addr).append(": delivering ").append(sender).append('#').append(hdr.seqno)); @@ -781,7 +781,7 @@ private void handleMessage(Message msg, NakAckHeader hdr) { for(final Message msg_to_deliver: msgs) { // discard OOB msg if it has already been delivered (http://jira.jboss.com/jira/browse/JGRP-379) - if(msg_to_deliver.isFlagSet(Message.OOB) && !msg_to_deliver.setTransientFlagIfAbsent(Message.OOB_DELIVERED)) + if(msg_to_deliver.isFlagSet(Message.Flag.OOB) && !msg_to_deliver.setTransientFlagIfAbsent(Message.OOB_DELIVERED)) continue; //msg_to_deliver.removeHeader(getName()); // Changed by bela Jan 29 2003: not needed (see above) @@ -1313,8 +1313,6 @@ public void retransmit(long first_seqno, long last_seqno, Address sender) { protected void retransmit(long first_seqno, long last_seqno, final Address sender, boolean multicast_xmit_request) { - NakAckHeader hdr; - Message retransmit_msg; Address dest=sender; // to whom do we send the XMIT request ? if(multicast_xmit_request || this.use_mcast_xmit_req) { @@ -1331,9 +1329,8 @@ protected void retransmit(long first_seqno, long last_seqno, final Address sende } } - hdr=NakAckHeader.createXmitRequestHeader(first_seqno, last_seqno, sender); - retransmit_msg=new Message(dest, null, null); - retransmit_msg.setFlag(Message.OOB); + NakAckHeader hdr=NakAckHeader.createXmitRequestHeader(first_seqno, last_seqno, sender); + Message retransmit_msg=new Message(dest).setFlag(Message.Flag.OOB); if(log.isTraceEnabled()) log.trace(local_addr + ": sending XMIT_REQ ([" + first_seqno + ", " + last_seqno + "]) to " + dest); retransmit_msg.putHeader(this.id, hdr); diff --git a/src/org/jgroups/protocols/pbcast/NAKACK2.java b/src/org/jgroups/protocols/pbcast/NAKACK2.java index caa53b94248..ff415e03402 100644 --- a/src/org/jgroups/protocols/pbcast/NAKACK2.java +++ b/src/org/jgroups/protocols/pbcast/NAKACK2.java @@ -777,10 +777,10 @@ protected void handleMessage(Message msg, NakAckHeader2 hdr) { // OOB msg is passed up. When removed, we discard it. Affects ordering: http://jira.jboss.com/jira/browse/JGRP-379 - if(added && msg.isFlagSet(Message.OOB)) { + if(added && msg.isFlagSet(Message.Flag.OOB)) { if(loopback) msg=buf.get(hdr.seqno); // we *have* to get a message, because loopback means we didn't add it to win ! - if(msg != null && msg.isFlagSet(Message.OOB)) { + if(msg != null && msg.isFlagSet(Message.Flag.OOB)) { if(msg.setTransientFlagIfAbsent(Message.OOB_DELIVERED)) { if(log.isTraceEnabled()) log.trace(new StringBuilder().append(local_addr).append(": delivering ").append(sender).append('#').append(hdr.seqno)); @@ -828,7 +828,7 @@ protected void handleMessages(Address sender, List> msgs, bo for(Tuple tuple: msgs) { long seq=tuple.getVal1(); Message msg=loopback? buf.get(seq) : tuple.getVal2(); // we *have* to get the message, because loopback means we didn't add it to win ! - if(msg != null && msg.isFlagSet(Message.OOB)) { + if(msg != null && msg.isFlagSet(Message.Flag.OOB)) { if(msg.setTransientFlagIfAbsent(Message.OOB_DELIVERED)) { if(log.isTraceEnabled()) log.trace(new StringBuilder().append(local_addr).append(": delivering ") @@ -872,7 +872,7 @@ protected void removeAndPassUp(Table buf, Address sender, boolean loopb MessageBatch batch=new MessageBatch(null, sender, cluster_name, true, msgs); for(Message msg_to_deliver: batch) { // discard OOB msg if it has already been delivered (http://jira.jboss.com/jira/browse/JGRP-379) - if(msg_to_deliver.isFlagSet(Message.OOB) && !msg_to_deliver.setTransientFlagIfAbsent(Message.OOB_DELIVERED)) + if(msg_to_deliver.isFlagSet(Message.Flag.OOB) && !msg_to_deliver.setTransientFlagIfAbsent(Message.TransientFlag.OOB_DELIVERED)) batch.remove(msg_to_deliver); } if(batch.isEmpty()) @@ -1406,7 +1406,7 @@ protected void retransmit(SeqnoList missing_msgs, final Address sender, boolean NakAckHeader2 hdr=NakAckHeader2.createXmitRequestHeader(sender); Message retransmit_msg=new Message(dest, null, missing_msgs); - retransmit_msg.setFlag(Message.OOB); + retransmit_msg.setFlag(Message.Flag.OOB); if(log.isTraceEnabled()) log.trace(local_addr + ": sending XMIT_REQ (" + missing_msgs + ") to " + dest); retransmit_msg.putHeader(this.id, hdr); diff --git a/src/org/jgroups/protocols/pbcast/ParticipantGmsImpl.java b/src/org/jgroups/protocols/pbcast/ParticipantGmsImpl.java index c33177554e4..eac2d14cced 100644 --- a/src/org/jgroups/protocols/pbcast/ParticipantGmsImpl.java +++ b/src/org/jgroups/protocols/pbcast/ParticipantGmsImpl.java @@ -172,11 +172,8 @@ boolean wouldIBeCoordinator() { void sendLeaveMessage(Address coord, Address mbr) { - Message msg=new Message(coord, null, null); - msg.setFlag(Message.OOB); - GMS.GmsHeader hdr=new GMS.GmsHeader(GMS.GmsHeader.LEAVE_REQ, mbr); - - msg.putHeader(gms.getId(), hdr); + Message msg=new Message(coord).setFlag(Message.Flag.OOB) + .putHeader(gms.getId(), new GMS.GmsHeader(GMS.GmsHeader.LEAVE_REQ, mbr)); gms.getDownProtocol().down(new Event(Event.MSG, msg)); } diff --git a/src/org/jgroups/protocols/pbcast/STABLE.java b/src/org/jgroups/protocols/pbcast/STABLE.java index ad15f6160d6..ff21d42e26e 100644 --- a/src/org/jgroups/protocols/pbcast/STABLE.java +++ b/src/org/jgroups/protocols/pbcast/STABLE.java @@ -690,10 +690,8 @@ protected void sendStableMessage(Digest d) { if(log.isTraceEnabled()) log.trace(local_addr + ": sending stable msg " + d.printHighestDeliveredSeqnos()); num_stable_msgs_sent++; - final Message msg=new Message(); // mcast message - msg.setFlag(Message.OOB, Message.Flag.NO_RELIABILITY); - StableHeader hdr=new StableHeader(StableHeader.STABLE_GOSSIP, d); - msg.putHeader(this.id, hdr); + final Message msg=new Message().setFlag(Message.Flag.OOB, Message.Flag.INTERNAL, Message.Flag.NO_RELIABILITY) + .putHeader(this.id,new StableHeader(StableHeader.STABLE_GOSSIP,d)); Runnable r=new Runnable() { public void run() { @@ -877,8 +875,7 @@ public void run() { } if(stability_digest != null) { - Message msg=new Message(); - msg.setFlag(Message.OOB, Message.Flag.NO_RELIABILITY); + Message msg=new Message().setFlag(Message.Flag.OOB, Message.Flag.INTERNAL, Message.Flag.NO_RELIABILITY); StableHeader hdr=new StableHeader(StableHeader.STABILITY, stability_digest); msg.putHeader(id, hdr); if(log.isTraceEnabled()) log.trace(local_addr + ": sending stability msg " + stability_digest.printHighestDeliveredSeqnos()); diff --git a/src/org/jgroups/protocols/pbcast/STATE.java b/src/org/jgroups/protocols/pbcast/STATE.java index e048dd9ef97..4a895fd2426 100644 --- a/src/org/jgroups/protocols/pbcast/STATE.java +++ b/src/org/jgroups/protocols/pbcast/STATE.java @@ -147,8 +147,7 @@ public void write(int b) throws IOException { protected void sendMessage(byte[] b, int off, int len) throws IOException { - Message m=new Message(stateRequester); - m.putHeader(id, new StateHeader(StateHeader.STATE_PART)); + Message m=new Message(stateRequester).putHeader(id, new StateHeader(StateHeader.STATE_PART)); // we're copying the buffer passed from the state provider here: if a BufferedOutputStream is used, the // buffer (b) will always be the same and can be modified after it has been set in the message ! diff --git a/src/org/jgroups/protocols/pbcast/STATE_TRANSFER.java b/src/org/jgroups/protocols/pbcast/STATE_TRANSFER.java index dd25d1958e7..88042a83af8 100644 --- a/src/org/jgroups/protocols/pbcast/STATE_TRANSFER.java +++ b/src/org/jgroups/protocols/pbcast/STATE_TRANSFER.java @@ -207,8 +207,7 @@ public Object down(Event evt) { up_prot.up(new Event(Event.GET_STATE_OK, new StateTransferInfo())); } else { - Message state_req=new Message(target, null, null); - state_req.putHeader(this.id, new StateHeader(StateHeader.STATE_REQ)); + Message state_req=new Message(target).putHeader(this.id, new StateHeader(StateHeader.STATE_REQ)); if(log.isDebugEnabled()) log.debug(local_addr + ": asking " + target + " for state"); @@ -349,8 +348,7 @@ protected void getStateFromApplication(Address requester, Digest digest) { avg_state_size=num_bytes_sent.doubleValue() / num_state_reqs.doubleValue(); } - Message state_rsp=new Message(requester, null, state); - state_rsp.putHeader(this.id, new StateHeader(StateHeader.STATE_RSP, digest)); + Message state_rsp=new Message(requester, state).putHeader(this.id, new StateHeader(StateHeader.STATE_RSP, digest)); if(log.isTraceEnabled()) { int length=state != null? state.length : 0; if(log.isTraceEnabled()) @@ -362,8 +360,7 @@ protected void getStateFromApplication(Address requester, Digest digest) { protected void sendException(Address requester, Throwable exception) { try { - Message ex_msg=new Message(requester, null, exception); - ex_msg.putHeader(getId(), new StateHeader(StateHeader.STATE_EX)); + Message ex_msg=new Message(requester, exception).putHeader(getId(), new StateHeader(StateHeader.STATE_EX)); down(new Event(Event.MSG, ex_msg)); } catch(Throwable t) { diff --git a/src/org/jgroups/protocols/pbcast/StreamingStateTransfer.java b/src/org/jgroups/protocols/pbcast/StreamingStateTransfer.java index d164dfbd384..c3b24a1a286 100644 --- a/src/org/jgroups/protocols/pbcast/StreamingStateTransfer.java +++ b/src/org/jgroups/protocols/pbcast/StreamingStateTransfer.java @@ -191,8 +191,7 @@ public Object down(Event evt) { } else { state_provider=target; - Message state_req=new Message(target, null, null); - state_req.putHeader(this.id, new StateHeader(StateHeader.STATE_REQ)); + Message state_req=new Message(target).putHeader(this.id, new StateHeader(StateHeader.STATE_REQ)); if(log.isDebugEnabled()) log.debug(local_addr + ": asking " + target + " for state"); down_prot.down(new Event(Event.MSG, state_req)); @@ -376,8 +375,7 @@ public void openBarrierAndResumeStable() { protected void sendEof(Address requester) { try { - Message eof_msg=new Message(requester); - eof_msg.putHeader(getId(), new StateHeader(StateHeader.STATE_EOF)); + Message eof_msg=new Message(requester).putHeader(getId(), new StateHeader(StateHeader.STATE_EOF)); if(log.isTraceEnabled()) log.trace(local_addr + " --> EOF --> " + requester); down(new Event(Event.MSG, eof_msg)); @@ -389,8 +387,7 @@ protected void sendEof(Address requester) { protected void sendException(Address requester, Throwable exception) { try { - Message ex_msg=new Message(requester, null, exception); - ex_msg.putHeader(getId(), new StateHeader(StateHeader.STATE_EX)); + Message ex_msg=new Message(requester, null, exception).putHeader(getId(), new StateHeader(StateHeader.STATE_EX)); down(new Event(Event.MSG, ex_msg)); } catch(Throwable t) { diff --git a/src/org/jgroups/protocols/relay/RELAY2.java b/src/org/jgroups/protocols/relay/RELAY2.java index 3b8efea5f7c..07b6b185f4b 100644 --- a/src/org/jgroups/protocols/relay/RELAY2.java +++ b/src/org/jgroups/protocols/relay/RELAY2.java @@ -551,10 +551,9 @@ protected void sendToBridges(Address sender, final Message msg, short ... exclud * @param target_site */ protected void sendSiteUnreachableTo(Address dest, short target_site) { - Message msg=new Message(dest).setFlag(Message.Flag.OOB); - msg.setSrc(new SiteUUID((UUID)local_addr, UUID.get(local_addr), site_id)); - Relay2Header hdr=new Relay2Header(Relay2Header.SITE_UNREACHABLE, new SiteMaster(target_site), null); - msg.putHeader(id, hdr); + Message msg=new Message(dest).setFlag(Message.Flag.OOB, Message.Flag.INTERNAL) + .src(new SiteUUID((UUID)local_addr, UUID.get(local_addr), site_id)) + .putHeader(id, new Relay2Header(Relay2Header.SITE_UNREACHABLE, new SiteMaster(target_site), null)); down_prot.down(new Event(Event.MSG, msg)); } diff --git a/src/org/jgroups/protocols/tom/TOA.java b/src/org/jgroups/protocols/tom/TOA.java index 50c76917633..35f39ac7f0c 100644 --- a/src/org/jgroups/protocols/tom/TOA.java +++ b/src/org/jgroups/protocols/tom/TOA.java @@ -152,16 +152,10 @@ private void handleViewChange(View view) { for (MessageID messageID : pendingSentMessages) { long finalSequenceNumber = senderManager.removeLeavers(messageID, leavers); if (finalSequenceNumber != SenderManager.NOT_READY) { - Message finalMessage = new Message(); - finalMessage.setSrc(localAddress); - - ToaHeader finalHeader = ToaHeader.createNewHeader( - ToaHeader.FINAL_MESSAGE,messageID); - + ToaHeader finalHeader = ToaHeader.createNewHeader(ToaHeader.FINAL_MESSAGE,messageID); finalHeader.setSequencerNumber(finalSequenceNumber); - finalMessage.putHeader(this.id, finalHeader); - finalMessage.setFlag(Message.Flag.OOB); - finalMessage.setFlag(Message.Flag.DONT_BUNDLE); + Message finalMessage = new Message().src(localAddress).putHeader(this.id,finalHeader) + .setFlag(Message.Flag.OOB, Message.Flag.INTERNAL, Message.Flag.DONT_BUNDLE); Set
destinations = senderManager.getDestination(messageID); if (destinations.contains(localAddress)) { @@ -293,17 +287,11 @@ private void handleDataMessage(Message message, ToaHeader header) { } //create a new message and send it back - Message proposeMessage = new Message(); - proposeMessage.setSrc(localAddress); - proposeMessage.setDest(messageID.getAddress()); - - ToaHeader newHeader = ToaHeader.createNewHeader( - ToaHeader.PROPOSE_MESSAGE,messageID); - + ToaHeader newHeader = ToaHeader.createNewHeader(ToaHeader.PROPOSE_MESSAGE,messageID); newHeader.setSequencerNumber(myProposeSequenceNumber); - proposeMessage.putHeader(this.id, newHeader); - proposeMessage.setFlag(Message.Flag.OOB); - proposeMessage.setFlag(Message.Flag.DONT_BUNDLE); + + Message proposeMessage = new Message().src(localAddress).dest(messageID.getAddress()) + .putHeader(this.id, newHeader).setFlag(Message.Flag.OOB, Message.Flag.INTERNAL, Message.Flag.DONT_BUNDLE); //multicastSenderThread.addUnicastMessage(proposeMessage); down_prot.down(new Event(Event.MSG, proposeMessage)); @@ -334,16 +322,12 @@ private void handleSequenceNumberPropose(Address from, ToaHeader header) { if (finalSequenceNumber != SenderManager.NOT_READY) { lastProposeReceived = true; - Message finalMessage = new Message(); - finalMessage.setSrc(localAddress); - - ToaHeader finalHeader = ToaHeader.createNewHeader( - ToaHeader.FINAL_MESSAGE,messageID); + ToaHeader finalHeader = ToaHeader.createNewHeader(ToaHeader.FINAL_MESSAGE,messageID); finalHeader.setSequencerNumber(finalSequenceNumber); - finalMessage.putHeader(this.id, finalHeader); - finalMessage.setFlag(Message.Flag.OOB); - finalMessage.setFlag(Message.Flag.DONT_BUNDLE); + + Message finalMessage = new Message().src(localAddress).putHeader(this.id, finalHeader) + .setFlag(Message.Flag.OOB, Message.Flag.INTERNAL, Message.Flag.DONT_BUNDLE); Set
destinations = senderManager.getDestination(messageID); if (destinations.contains(localAddress)) { diff --git a/src/org/jgroups/util/MessageBatch.java b/src/org/jgroups/util/MessageBatch.java index 87fce18af02..89f878c2bff 100644 --- a/src/org/jgroups/util/MessageBatch.java +++ b/src/org/jgroups/util/MessageBatch.java @@ -45,15 +45,24 @@ public MessageBatch(int capacity) { public MessageBatch(Collection msgs) { messages=new Message[msgs.size()]; - int num_reg=0, num_oob=0; + int num_reg=0, num_oob=0, num_internal=0; for(Message msg: msgs) { messages[index++]=msg; if(msg.isFlagSet(Message.Flag.OOB)) num_oob++; + else if(msg.isFlagSet(Message.Flag.INTERNAL)) + num_internal++; else num_reg++; } - mode=num_oob == 0? Mode.REG : num_reg == 0? Mode.OOB : Mode.MIXED; + if(num_internal > 0 && num_oob == 0 && num_reg == 0) + mode=Mode.INTERNAL; + else if(num_oob > 0 && num_internal == 0 && num_reg == 0) + mode=Mode.OOB; + else if(num_reg > 0 && num_oob == 0 && num_internal == 0) + mode=Mode.REG; + else + mode=Mode.MIXED; } public MessageBatch(Address dest, Address sender, String cluster_name, boolean multicast, Collection msgs) { @@ -259,7 +268,7 @@ public interface Visitor { T visit(final Message msg, final MessageBatch batch); } - public enum Mode {OOB, REG, MIXED} + public enum Mode {OOB, REG, INTERNAL, MIXED} /** Iterates over non-null elements of a batch, skipping null elements */ diff --git a/tests/junit-functional/org/jgroups/blocks/RpcDispatcherTest.java b/tests/junit-functional/org/jgroups/blocks/RpcDispatcherTest.java index a255cc09bf6..c53449ac844 100644 --- a/tests/junit-functional/org/jgroups/blocks/RpcDispatcherTest.java +++ b/tests/junit-functional/org/jgroups/blocks/RpcDispatcherTest.java @@ -299,7 +299,12 @@ public void testNotifyingFuture() throws Exception { assert !future.isDone(); assert !future.isCancelled(); assert !listener.isDone(); - Util.sleep(2000); + for(int i=0; i < 10; i++) { + if(listener.isDone()) + break; + else + Util.sleep(1000); + } assert listener.isDone(); RspList result=future.get(1L, TimeUnit.MILLISECONDS); System.out.println("result:\n" + result); diff --git a/tests/junit-functional/org/jgroups/protocols/NAKACK_Delivery_Test.java b/tests/junit-functional/org/jgroups/protocols/NAKACK_Delivery_Test.java index 9bed658d036..d06f1c3e508 100644 --- a/tests/junit-functional/org/jgroups/protocols/NAKACK_Delivery_Test.java +++ b/tests/junit-functional/org/jgroups/protocols/NAKACK_Delivery_Test.java @@ -155,7 +155,7 @@ private void send(Address sender, long seqno, int number, boolean oob) { private static Message msg(Address sender, long seqno, int number, boolean oob) { Message msg=new Message(null, sender, number); if(oob) - msg.setFlag(Message.OOB); + msg.setFlag(Message.Flag.OOB); if(seqno != -1) msg.putHeader(NAKACK_ID, NakAckHeader2.createMessageHeader(seqno)); return msg; diff --git a/tests/junit-functional/org/jgroups/protocols/NAKACK_StressTest.java b/tests/junit-functional/org/jgroups/protocols/NAKACK_StressTest.java index f825fc45497..3fe3662763c 100644 --- a/tests/junit-functional/org/jgroups/protocols/NAKACK_StressTest.java +++ b/tests/junit-functional/org/jgroups/protocols/NAKACK_StressTest.java @@ -169,7 +169,7 @@ private static Message createMessage(Address dest, Address src, long seqno, bool NakAckHeader2 hdr=NakAckHeader2.createMessageHeader(seqno) ; msg.putHeader(NAKACK_ID, hdr); if(oob) - msg.setFlag(Message.OOB); + msg.setFlag(Message.Flag.OOB); return msg; } diff --git a/tests/junit-functional/org/jgroups/protocols/UNICAST_OOB_Test.java b/tests/junit-functional/org/jgroups/protocols/UNICAST_OOB_Test.java index 2bb0709b5da..d3a61e3839f 100644 --- a/tests/junit-functional/org/jgroups/protocols/UNICAST_OOB_Test.java +++ b/tests/junit-functional/org/jgroups/protocols/UNICAST_OOB_Test.java @@ -78,7 +78,7 @@ private void sendMessages(boolean oob) throws Exception { for(int i=1; i <=5; i++) { Message msg=new Message(dest, null,(long)i); if(i == 4 && oob) - msg.setFlag(Message.OOB); + msg.setFlag(Message.Flag.OOB); System.out.println("-- sending message #" + i); a.send(msg); Util.sleep(100); diff --git a/tests/junit-functional/org/jgroups/tests/DynamicDiscardTest.java b/tests/junit-functional/org/jgroups/tests/DynamicDiscardTest.java index 4094e3b94ba..d1c159f5e56 100644 --- a/tests/junit-functional/org/jgroups/tests/DynamicDiscardTest.java +++ b/tests/junit-functional/org/jgroups/tests/DynamicDiscardTest.java @@ -78,7 +78,7 @@ public void testLeaveDuringSend() throws Exception { // send a RSVP message Message msg = new Message(null, "message2"); - msg.setFlag(Message.RSVP, Message.OOB); + msg.setFlag(Message.RSVP, Message.Flag.OOB); RspList rsps = dispatchers[0].castMessage(null, msg, RequestOptions.SYNC().setTimeout(5000)); Rsp objectRsp = rsps.get(channels[1].getAddress()); diff --git a/tests/junit-functional/org/jgroups/tests/MessageTest.java b/tests/junit-functional/org/jgroups/tests/MessageTest.java index e51f809535b..8a65e70b43a 100644 --- a/tests/junit-functional/org/jgroups/tests/MessageTest.java +++ b/tests/junit-functional/org/jgroups/tests/MessageTest.java @@ -30,12 +30,12 @@ public class MessageTest { public static void testFlags() { Message m1=new Message(); - assert !(m1.isFlagSet(Message.OOB)); + assert !(m1.isFlagSet(Message.Flag.OOB)); assert m1.getFlags() == 0; m1.setFlag((Message.Flag[])null); - assert !m1.isFlagSet(Message.OOB); + assert !m1.isFlagSet(Message.Flag.OOB); assert !m1.isFlagSet(null); } @@ -45,72 +45,72 @@ public static void testSettingMultipleFlags() { msg.setFlag((Message.Flag[])null); assert msg.getFlags() == 0; - msg.setFlag(Message.OOB, Message.NO_FC, null, Message.DONT_BUNDLE); - assert msg.isFlagSet(Message.OOB); + msg.setFlag(Message.Flag.OOB, Message.NO_FC, null, Message.Flag.DONT_BUNDLE); + assert msg.isFlagSet(Message.Flag.OOB); assert msg.isFlagSet(Message.NO_FC); - assert msg.isFlagSet(Message.DONT_BUNDLE); + assert msg.isFlagSet(Message.Flag.DONT_BUNDLE); } public static void testFlags2() { Message m1=new Message(); - m1.setFlag(Message.OOB); - assert m1.isFlagSet(Message.OOB); - assert Message.isFlagSet(m1.getFlags(), Message.OOB); - assert !(m1.isFlagSet(Message.DONT_BUNDLE)); - assert !Message.isFlagSet(m1.getFlags(), Message.DONT_BUNDLE); + m1.setFlag(Message.Flag.OOB); + assert m1.isFlagSet(Message.Flag.OOB); + assert Message.isFlagSet(m1.getFlags(), Message.Flag.OOB); + assert !(m1.isFlagSet(Message.Flag.DONT_BUNDLE)); + assert !Message.isFlagSet(m1.getFlags(), Message.Flag.DONT_BUNDLE); } public static void testFlags3() { Message msg=new Message(); - assert msg.isFlagSet(Message.OOB) == false; - msg.setFlag(Message.OOB); - assert msg.isFlagSet(Message.OOB); - msg.setFlag(Message.OOB); - assert msg.isFlagSet(Message.OOB); + assert msg.isFlagSet(Message.Flag.OOB) == false; + msg.setFlag(Message.Flag.OOB); + assert msg.isFlagSet(Message.Flag.OOB); + msg.setFlag(Message.Flag.OOB); + assert msg.isFlagSet(Message.Flag.OOB); } public static void testClearFlags() { Message msg=new Message(); - msg.setFlag(Message.OOB); - assert msg.isFlagSet(Message.OOB); - msg.clearFlag(Message.OOB); - assert msg.isFlagSet(Message.OOB) == false; - msg.clearFlag(Message.OOB); - assert msg.isFlagSet(Message.OOB) == false; - msg.setFlag(Message.OOB); - assert msg.isFlagSet(Message.OOB); + msg.setFlag(Message.Flag.OOB); + assert msg.isFlagSet(Message.Flag.OOB); + msg.clearFlag(Message.Flag.OOB); + assert msg.isFlagSet(Message.Flag.OOB) == false; + msg.clearFlag(Message.Flag.OOB); + assert msg.isFlagSet(Message.Flag.OOB) == false; + msg.setFlag(Message.Flag.OOB); + assert msg.isFlagSet(Message.Flag.OOB); } public static void testClearFlags2() { Message msg=new Message(); - msg.setFlag(Message.OOB); + msg.setFlag(Message.Flag.OOB); msg.setFlag(Message.NO_FC); - assert msg.isFlagSet(Message.DONT_BUNDLE) == false; - assert msg.isFlagSet(Message.OOB); + assert msg.isFlagSet(Message.Flag.DONT_BUNDLE) == false; + assert msg.isFlagSet(Message.Flag.OOB); assert msg.isFlagSet(Message.NO_FC); - msg.clearFlag(Message.OOB); - assert msg.isFlagSet(Message.OOB) == false; - msg.setFlag(Message.DONT_BUNDLE); - assert msg.isFlagSet(Message.DONT_BUNDLE); + msg.clearFlag(Message.Flag.OOB); + assert msg.isFlagSet(Message.Flag.OOB) == false; + msg.setFlag(Message.Flag.DONT_BUNDLE); + assert msg.isFlagSet(Message.Flag.DONT_BUNDLE); assert msg.isFlagSet(Message.NO_FC); msg.clearFlag(Message.NO_FC); assert msg.isFlagSet(Message.NO_FC) == false; msg.clearFlag(Message.NO_FC); assert msg.isFlagSet(Message.NO_FC) == false; - msg.clearFlag(Message.DONT_BUNDLE); - msg.clearFlag(Message.OOB); + msg.clearFlag(Message.Flag.DONT_BUNDLE); + msg.clearFlag(Message.Flag.OOB); assert msg.getFlags() == 0; - assert msg.isFlagSet(Message.OOB) == false; - assert msg.isFlagSet(Message.DONT_BUNDLE) == false; + assert msg.isFlagSet(Message.Flag.OOB) == false; + assert msg.isFlagSet(Message.Flag.DONT_BUNDLE) == false; assert msg.isFlagSet(Message.NO_FC) == false; - msg.setFlag(Message.DONT_BUNDLE); - assert msg.isFlagSet(Message.DONT_BUNDLE); - msg.setFlag(Message.DONT_BUNDLE); - assert msg.isFlagSet(Message.DONT_BUNDLE); + msg.setFlag(Message.Flag.DONT_BUNDLE); + assert msg.isFlagSet(Message.Flag.DONT_BUNDLE); + msg.setFlag(Message.Flag.DONT_BUNDLE); + assert msg.isFlagSet(Message.Flag.DONT_BUNDLE); } @@ -359,8 +359,8 @@ public static void testSizeMessageWithDestAndSrc() throws Exception { public static void testSizeMessageWithDestAndSrcAndFlags() throws Exception { Message msg=new Message(UUID.randomUUID(), UUID.randomUUID(), null); - msg.setFlag(Message.OOB); - msg.setFlag(Message.DONT_BUNDLE); + msg.setFlag(Message.Flag.OOB); + msg.setFlag(Message.Flag.DONT_BUNDLE); _testSize(msg); } diff --git a/tests/junit-functional/org/jgroups/tests/NakReceiverWindowTest.java b/tests/junit-functional/org/jgroups/tests/NakReceiverWindowTest.java index e756760c509..5edb41f2415 100644 --- a/tests/junit-functional/org/jgroups/tests/NakReceiverWindowTest.java +++ b/tests/junit-functional/org/jgroups/tests/NakReceiverWindowTest.java @@ -756,7 +756,7 @@ private static void add(int num_msgs, TimeScheduler timer) { private static Message oob() { Message retval=new Message(); - retval.setFlag(Message.OOB); + retval.setFlag(Message.Flag.OOB); return retval; } diff --git a/tests/junit/org/jgroups/blocks/executor/ExecutingServiceTest.java b/tests/junit/org/jgroups/blocks/executor/ExecutingServiceTest.java index abe78d66d95..ff55aef0e79 100644 --- a/tests/junit/org/jgroups/blocks/executor/ExecutingServiceTest.java +++ b/tests/junit/org/jgroups/blocks/executor/ExecutingServiceTest.java @@ -290,7 +290,7 @@ public void readFrom(DataInput in) throws Exception { @Test public void testSimpleSerializableCallableSubmit() throws InterruptedException, ExecutionException, TimeoutException { - Long value = Long.valueOf(100); + long value =100; Callable callable = new SimpleStreamableCallable(value); Thread consumer = new Thread(er2); consumer.start(); @@ -483,7 +483,7 @@ public void testNonSerializableCallable() throws SecurityException, Thread consumer = new Thread(er2); consumer.start(); - Long value = Long.valueOf(100); + long value =100; @SuppressWarnings("rawtypes") Constructor constructor = diff --git a/tests/junit/org/jgroups/tests/ConnectStressTest.java b/tests/junit/org/jgroups/tests/ConnectStressTest.java index 54acb078c0d..d279e584239 100755 --- a/tests/junit/org/jgroups/tests/ConnectStressTest.java +++ b/tests/junit/org/jgroups/tests/ConnectStressTest.java @@ -86,8 +86,8 @@ public void testConcurrentJoining() throws Exception { for(MyThread thread: threads) { View view=thread.getChannel().getView(); - int size=view.size(); - assert size == NUM : "view doesn't have size of " + NUM + " (has " + size + "): " + view; + int size=view != null? view.size() : 0; + assert view != null && size == NUM : "view doesn't have size of " + NUM + " (has " + size + "): " + view; } } diff --git a/tests/junit/org/jgroups/tests/Deadlock2Test.java b/tests/junit/org/jgroups/tests/Deadlock2Test.java index 23179f96e74..c048a78750f 100644 --- a/tests/junit/org/jgroups/tests/Deadlock2Test.java +++ b/tests/junit/org/jgroups/tests/Deadlock2Test.java @@ -165,7 +165,7 @@ public String outerMethod() throws Exception { MethodCall call = new MethodCall("innerMethod", new Object[0], new Class[0]); // RspList rspList = disp.callRemoteMethods(null, call, GroupResponseMode.GET_ALL, 5000); RequestOptions opts=new RequestOptions(ResponseMode.GET_ALL, 0, false, null, (Message.Flag[])null); - opts.setFlags(Message.OOB); + opts.setFlags(Message.Flag.OOB); RspList rspList = disp.callRemoteMethods(null, call, opts); List results = rspList.getResults(); log("results of calling innerMethod():\n" + rspList); diff --git a/tests/junit/org/jgroups/tests/DuplicateTest.java b/tests/junit/org/jgroups/tests/DuplicateTest.java index aa27828c2d3..b446e88202e 100644 --- a/tests/junit/org/jgroups/tests/DuplicateTest.java +++ b/tests/junit/org/jgroups/tests/DuplicateTest.java @@ -146,10 +146,10 @@ private static void send(Channel sender_channel, Address dest, boolean oob, bool Message msg=new Message(dest, null, seqno++); if(mixed) { if(i % 2 == 0) - msg.setFlag(Message.OOB); + msg.setFlag(Message.Flag.OOB); } else if(oob) { - msg.setFlag(Message.OOB); + msg.setFlag(Message.Flag.OOB); } sender_channel.send(msg); diff --git a/tests/junit/org/jgroups/tests/NAKACK_Test.java b/tests/junit/org/jgroups/tests/NAKACK_Test.java index ff54800c640..792d7077d3e 100644 --- a/tests/junit/org/jgroups/tests/NAKACK_Test.java +++ b/tests/junit/org/jgroups/tests/NAKACK_Test.java @@ -66,7 +66,7 @@ public void testOutOfBandMessages() throws Exception { for(int i=1; i <=5; i++) { Message msg=new Message(null, null,(long)i); if(i == 4) - msg.setFlag(Message.OOB); + msg.setFlag(Message.Flag.OOB); System.out.println("-- sending message #" + i); c1.send(msg); Util.sleep(100); diff --git a/tests/junit/org/jgroups/tests/OOBTest.java b/tests/junit/org/jgroups/tests/OOBTest.java index 57133b8923a..7a6fa5a3d73 100644 --- a/tests/junit/org/jgroups/tests/OOBTest.java +++ b/tests/junit/org/jgroups/tests/OOBTest.java @@ -67,7 +67,7 @@ public void testRegularAndOOBUnicasts() throws Exception { Address dest=b.getAddress(); Message m1=new Message(dest, 1); - Message m2=new Message(dest, 2).setFlag(Message.OOB); + Message m2=new Message(dest, 2).setFlag(Message.Flag.OOB); Message m3=new Message(dest, 3); MyReceiver receiver=new MyReceiver("B"); @@ -95,8 +95,8 @@ public void testRegularAndOOBUnicasts2() throws Exception { Address dest=b.getAddress(); Message m1=new Message(dest, 1); - Message m2=new Message(dest, 2).setFlag(Message.OOB); - Message m3=new Message(dest, 3).setFlag(Message.OOB); + Message m2=new Message(dest, 2).setFlag(Message.Flag.OOB); + Message m3=new Message(dest, 3).setFlag(Message.Flag.OOB); Message m4=new Message(dest, 4); MyReceiver receiver=new MyReceiver("B"); @@ -127,7 +127,7 @@ public void testRegularAndOOBMulticasts() throws Exception { Address dest=null; // send to all Message m1=new Message(dest, 1); - Message m2=new Message(dest, 2).setFlag(Message.OOB); + Message m2=new Message(dest, 2).setFlag(Message.Flag.OOB); Message m3=new Message(dest, 3); MyReceiver receiver=new MyReceiver("B"); @@ -198,7 +198,7 @@ public void testOOBMessageLoss() throws Exception { final int NUM=10; for(int i=1; i <= NUM; i++) - a.send(new Message(null, i).setFlag(Message.OOB)); + a.send(new Message(null, i).setFlag(Message.Flag.OOB)); STABLE stable=(STABLE)a.getProtocolStack().findProtocol(STABLE.class); if(stable != null) @@ -230,7 +230,7 @@ public void testOOBUnicastMessageLoss() throws Exception { final int NUM=10; final Address dest=b.getAddress(); for(int i=1; i <= NUM; i++) - a.send(new Message(dest, i).setFlag(Message.OOB)); + a.send(new Message(dest, i).setFlag(Message.Flag.OOB)); Collection msgs=receiver.getMsgs(); for(int i=0; i < 20; i++) { @@ -267,7 +267,7 @@ public void run() { int num=counter.incrementAndGet(); Message msg=new Message(dest, num); if(oob) - msg.setFlag(Message.OOB); + msg.setFlag(Message.Flag.OOB); try { sender.send(msg); } @@ -291,7 +291,7 @@ public void run() { boolean oob=Util.tossWeightedCoin(oob_prob); Message msg=new Message(dest, null, i); if(oob) - msg.setFlag(Message.OOB); + msg.setFlag(Message.Flag.OOB); sender.send(msg); } } @@ -305,7 +305,7 @@ private void send(Address dest) throws Exception { a.send(dest, 1); // the only regular message for(int i=2; i <= NUM; i++) - a.send(new Message(dest, i).setFlag(Message.OOB)); + a.send(new Message(dest, i).setFlag(Message.Flag.OOB)); sendStableMessages(a,b); List list=receiver.getMsgs(); @@ -393,7 +393,7 @@ private static class BlockingReceiver extends ReceiverAdapter { public List getMsgs() {return msgs;} public void receive(Message msg) { - if(!msg.isFlagSet(Message.OOB)) { + if(!msg.isFlagSet(Message.Flag.OOB)) { try { System.out.println(Thread.currentThread() + ": waiting on latch"); latch.await(25000,TimeUnit.MILLISECONDS); diff --git a/tests/junit/org/jgroups/tests/UUIDCacheClearTest.java b/tests/junit/org/jgroups/tests/UUIDCacheClearTest.java index 737b52e242a..579a0f5b3b4 100644 --- a/tests/junit/org/jgroups/tests/UUIDCacheClearTest.java +++ b/tests/junit/org/jgroups/tests/UUIDCacheClearTest.java @@ -24,12 +24,12 @@ public void testCacheClear() throws Exception { try { a=createChannel(true, 2, "A"); a.setReceiver(r1); - a.connect("testCacheClear"); + a.connect("UUIDCacheClearTest"); b=createChannel(a, "B"); b.setReceiver(r2); - b.connect("testCacheClear"); + b.connect("UUIDCacheClearTest"); - Util.waitUntilAllChannelsHaveSameSize(10000, 500, a, b); + Util.waitUntilAllChannelsHaveSameSize(10000, 1000, a, b); // send one unicast message from a to b and vice versa @@ -53,7 +53,6 @@ public void testCacheClear() throws Exception { clearCache(a,b); printCaches(a,b); - r1.clear(); r2.clear(); @@ -103,10 +102,8 @@ protected static void stable(JChannel... channels) { private static void printCaches(JChannel ... channels) { System.out.println("caches:\n"); - for(JChannel ch: channels) { - System.out.println(ch.getAddress() + ":\n" + - ch.getProtocolStack().getTransport().printLogicalAddressCache()); - } + for(JChannel ch: channels) + System.out.println(ch.getAddress() + ":\n" + ch.getProtocolStack().getTransport().printLogicalAddressCache()); } private static class MyReceiver extends ReceiverAdapter { diff --git a/tests/other/org/jgroups/tests/MessageDispatcherSpeedTest.java b/tests/other/org/jgroups/tests/MessageDispatcherSpeedTest.java index cb41fb595e2..d869ededa14 100644 --- a/tests/other/org/jgroups/tests/MessageDispatcherSpeedTest.java +++ b/tests/other/org/jgroups/tests/MessageDispatcherSpeedTest.java @@ -67,7 +67,7 @@ void sendMessages(int num) throws Exception { if(show <=0) show=1; start=System.currentTimeMillis(); - RequestOptions opts=new RequestOptions(ResponseMode.GET_ALL, TIMEOUT).setFlags(Message.DONT_BUNDLE, Message.NO_FC); + RequestOptions opts=new RequestOptions(ResponseMode.GET_ALL, TIMEOUT).setFlags(Message.Flag.DONT_BUNDLE, Message.NO_FC); System.out.println("-- sending " + num + " messages"); for(int i=1; i <= num; i++) { diff --git a/tests/other/org/jgroups/tests/PingPong.java b/tests/other/org/jgroups/tests/PingPong.java index 7948f6af2a0..4d6e2275799 100644 --- a/tests/other/org/jgroups/tests/PingPong.java +++ b/tests/other/org/jgroups/tests/PingPong.java @@ -41,7 +41,7 @@ public void start(String props, String name, boolean unicast) throws Exception { dest=(Address)Util.pickRandomElement(members); Message msg=new Message(dest, null, PING_REQ); - msg.setFlag(Message.DONT_BUNDLE, Message.NO_FC); + msg.setFlag(Message.Flag.DONT_BUNDLE, Message.NO_FC); start=System.nanoTime(); ch.send(msg); } @@ -58,7 +58,7 @@ public void receive(Message msg) { switch(type) { case PING: final Message rsp=new Message(msg.getSrc(), null, PONG_RSP); - rsp.setFlag(Message.DONT_BUNDLE, Message.NO_FC); + rsp.setFlag(Message.Flag.DONT_BUNDLE, Message.NO_FC); try { ch.send(rsp); } diff --git a/tests/other/org/jgroups/tests/RpcDispatcherSpeedTest.java b/tests/other/org/jgroups/tests/RpcDispatcherSpeedTest.java index c16349670da..08609dbc2b2 100644 --- a/tests/other/org/jgroups/tests/RpcDispatcherSpeedTest.java +++ b/tests/other/org/jgroups/tests/RpcDispatcherSpeedTest.java @@ -114,9 +114,9 @@ void invokeRpcs(int num, int num_threads, boolean async, boolean oob) throws Exc measure_method_call=new MethodCall((short)0); RequestOptions opts=new RequestOptions(request_type, TIMEOUT, false, null, - Message.DONT_BUNDLE, Message.NO_FC); + Message.Flag.DONT_BUNDLE, Message.NO_FC); if(oob) - opts.setFlags(Message.OOB); + opts.setFlags(Message.Flag.OOB); final AtomicInteger sent=new AtomicInteger(0); final CountDownLatch latch=new CountDownLatch(1); diff --git a/tests/other/org/jgroups/tests/UnicastTest.java b/tests/other/org/jgroups/tests/UnicastTest.java index 9f9cf1c77be..daf1a79f9af 100644 --- a/tests/other/org/jgroups/tests/UnicastTest.java +++ b/tests/other/org/jgroups/tests/UnicastTest.java @@ -346,7 +346,7 @@ public void run() { buf=Util.objectToByteBuffer(val); Message msg=new Message(destination, null, buf); if(oob) - msg.setFlag(Message.OOB); + msg.setFlag(Message.Flag.OOB); if(i > 0 && print > 0 && i % print == 0) System.out.println("-- sent " + i); channel.send(msg); diff --git a/tests/other/org/jgroups/tests/UnicastTestRpc.java b/tests/other/org/jgroups/tests/UnicastTestRpc.java index 9b7ab32acf8..60607eb29d7 100644 --- a/tests/other/org/jgroups/tests/UnicastTestRpc.java +++ b/tests/other/org/jgroups/tests/UnicastTestRpc.java @@ -176,8 +176,8 @@ void invokeRpcs() throws Throwable { // The first call needs to be synchronous with OOB ! RequestOptions options=new RequestOptions(ResponseMode.GET_ALL, 15000, anycasting, null); - if(sync) options.setFlags(Message.DONT_BUNDLE); - if(oob) options.setFlags(Message.OOB); + if(sync) options.setFlags(Message.Flag.DONT_BUNDLE); + if(oob) options.setFlags(Message.Flag.OOB); options.setMode(sync? ResponseMode.GET_ALL : ResponseMode.GET_NONE); diff --git a/tests/perf/org/jgroups/tests/perf/UPerf.java b/tests/perf/org/jgroups/tests/perf/UPerf.java index e3bb501db47..79b27f433af 100644 --- a/tests/perf/org/jgroups/tests/perf/UPerf.java +++ b/tests/perf/org/jgroups/tests/perf/UPerf.java @@ -364,7 +364,7 @@ else if(prot instanceof UNICAST2) /** Kicks off the benchmark on all cluster nodes */ void startBenchmark() throws Throwable { RequestOptions options=new RequestOptions(ResponseMode.GET_ALL, 0); - options.setFlags(Message.OOB, Message.DONT_BUNDLE, Message.NO_FC); + options.setFlags(Message.Flag.OOB, Message.Flag.DONT_BUNDLE, Message.NO_FC); RspList responses=disp.callRemoteMethods(null, new MethodCall(START), options); long total_reqs=0; @@ -481,16 +481,16 @@ public void run() { RequestOptions put_options=new RequestOptions(sync ? ResponseMode.GET_ALL : ResponseMode.GET_NONE, 40000, true, null); // Don't use bundling as we have sync requests (e.g. GETs) regardless of whether we set sync=true or false - get_options.setFlags(Message.DONT_BUNDLE); - put_options.setFlags(Message.DONT_BUNDLE); + get_options.setFlags(Message.Flag.DONT_BUNDLE); + put_options.setFlags(Message.Flag.DONT_BUNDLE); if(oob) { - get_options.setFlags(Message.OOB); - put_options.setFlags(Message.OOB); + get_options.setFlags(Message.Flag.OOB); + put_options.setFlags(Message.Flag.OOB); } if(sync) { - get_options.setFlags(Message.DONT_BUNDLE, Message.NO_FC); - put_options.setFlags(Message.DONT_BUNDLE, Message.NO_FC); + get_options.setFlags(Message.Flag.DONT_BUNDLE, Message.NO_FC); + put_options.setFlags(Message.Flag.DONT_BUNDLE, Message.NO_FC); } if(use_anycast_addrs) { put_options.useAnycastAddresses(true); diff --git a/tests/perf/org/jgroups/tests/perf/UUPerf.java b/tests/perf/org/jgroups/tests/perf/UUPerf.java index dadbb662642..e866c9844be 100644 --- a/tests/perf/org/jgroups/tests/perf/UUPerf.java +++ b/tests/perf/org/jgroups/tests/perf/UUPerf.java @@ -292,7 +292,7 @@ else if(prot instanceof UNICAST2) */ void startBenchmark() throws Throwable { RequestOptions options=new RequestOptions(ResponseMode.GET_ALL,0); - options.setFlags(Message.OOB,Message.DONT_BUNDLE); + options.setFlags(Message.Flag.OOB,Message.Flag.DONT_BUNDLE); RspList responses=disp.callRemoteMethods(null,new MethodCall(START),options); long total_reqs=0; @@ -382,11 +382,11 @@ public void run() { RequestOptions apply_state_options=new RequestOptions(sync? ResponseMode.GET_ALL : ResponseMode.GET_NONE,400000,true,null); if(oob) { - apply_state_options.setFlags(Message.OOB); + apply_state_options.setFlags(Message.Flag.OOB); } if(sync) { - // apply_state_options.setFlags(Message.DONT_BUNDLE,Message.NO_FC); - apply_state_options.setFlags(Message.DONT_BUNDLE); + // apply_state_options.setFlags(Message.Flag.DONT_BUNDLE,Message.NO_FC); + apply_state_options.setFlags(Message.Flag.DONT_BUNDLE); } apply_state_options.setFlags(Message.Flag.RSVP);