Permalink
Browse files

- Implemented up(MessageBatch) in RSVP

  • Loading branch information...
1 parent 19fd77d commit 089f2c1d1e860cbda33ef97f6fbb7cdc4c1db3c2 Bela Ban committed Feb 26, 2013
Showing with 62 additions and 50 deletions.
  1. +1 −1 src/org/jgroups/demos/ProgrammaticChat.java
  2. +23 −2 src/org/jgroups/protocols/RSVP.java
  3. +2 −2 tests/byteman/org/jgroups/tests/byteman/BecomeServerTest.java
  4. +2 −2 tests/byteman/org/jgroups/tests/byteman/ForwardToCoordFailoverTest.java
  5. +2 −2 tests/byteman/org/jgroups/tests/byteman/MessageBeforeConnectedTest.java
  6. +3 −3 tests/byteman/org/jgroups/tests/helpers/MessageBeforeConnectedTestHelper.java
  7. +2 −4 tests/junit-functional/org/jgroups/protocols/FORWARD_TO_COORD_Test.java
  8. +1 −1 tests/junit-functional/org/jgroups/protocols/SUPERVISOR_Test.java
  9. +1 −1 tests/junit-functional/org/jgroups/tests/FCTest.java
  10. +2 −2 tests/junit-functional/org/jgroups/tests/FdMonitorTest.java
  11. +1 −1 tests/junit-functional/org/jgroups/tests/FragTest.java
  12. +2 −6 tests/junit-functional/org/jgroups/tests/LargeMergeTest.java
  13. +2 −2 tests/junit-functional/org/jgroups/tests/MergeTest2.java
  14. +1 −1 tests/junit-functional/org/jgroups/tests/MergeTest3.java
  15. +1 −1 tests/junit-functional/org/jgroups/tests/MessageDispatcherRSVPTest.java
  16. +1 −1 tests/junit-functional/org/jgroups/tests/NakackTest.java
  17. +1 −1 tests/junit-functional/org/jgroups/tests/OrderingTest.java
  18. +1 −1 tests/junit-functional/org/jgroups/tests/ProgrammaticApiTest.java
  19. +3 −6 tests/junit-functional/org/jgroups/tests/RSVPTest.java
  20. +3 −3 tests/junit-functional/org/jgroups/tests/Relay2Test.java
  21. +2 −2 tests/junit-functional/org/jgroups/tests/RpcDispatcherAsyncInvocationTest.java
  22. +1 −1 tests/junit/org/jgroups/tests/GossipRouterTest.java
  23. +1 −1 tests/junit/org/jgroups/tests/SequencerMergeTest.java
  24. +1 −1 tests/junit/org/jgroups/tests/TCPGOSSIP_Test.java
  25. +1 −1 tests/junit/org/jgroups/tests/TUNNELDeadLockTest.java
  26. +1 −1 tests/junit/org/jgroups/tests/TUNNEL_Test.java
@@ -30,7 +30,7 @@ public static void main(String[] args) throws Exception {
.addProtocol(new VERIFY_SUSPECT())
.addProtocol(new BARRIER())
.addProtocol(new NAKACK2())
- .addProtocol(new UNICAST2())
+ .addProtocol(new UNICAST3())
.addProtocol(new STABLE())
.addProtocol(new GMS())
.addProtocol(new UFC())
@@ -7,6 +7,7 @@
import org.jgroups.annotations.Property;
import org.jgroups.stack.Protocol;
import org.jgroups.util.AckCollector;
+import org.jgroups.util.MessageBatch;
import org.jgroups.util.TimeScheduler;
import java.io.DataInput;
@@ -188,6 +189,27 @@ public Object up(Event evt) {
return up_prot.up(evt);
}
+ public void up(MessageBatch batch) {
+ for(Message msg: batch) {
+ if(!msg.isFlagSet(Message.Flag.RSVP))
+ continue;
+ RsvpHeader hdr=(RsvpHeader)msg.getHeader(id);
+ if(hdr == null) {
+ log.error("message with RSVP flag needs to have an RsvpHeader");
+ continue;
+ }
+ try {
+ batch.remove(msg);
+ up(new Event(Event.MSG, msg));
+ }
+ catch(Throwable t) {
+ log.error("failed passing up message", t);
+ }
+ }
+
+ if(!batch.isEmpty())
+ up_prot.up(batch);
+ }
protected void handleView(View view) {
members=view.getMembers();
@@ -264,8 +286,7 @@ public void run() {
cancelTask();
return;
}
- Message msg=new Message(target);
- msg.setFlag(Message.Flag.RSVP);
+ Message msg=new Message(target).setFlag(Message.Flag.RSVP);
RsvpHeader hdr=new RsvpHeader(RsvpHeader.REQ_ONLY, rsvp_id);
msg.putHeader(id, hdr);
if(log.isTraceEnabled())
@@ -8,7 +8,7 @@
import org.jgroups.ReceiverAdapter;
import org.jgroups.protocols.PING;
import org.jgroups.protocols.SHARED_LOOPBACK;
-import org.jgroups.protocols.UNICAST2;
+import org.jgroups.protocols.UNICAST3;
import org.jgroups.protocols.pbcast.GMS;
import org.jgroups.protocols.pbcast.NAKACK2;
import org.jgroups.util.Util;
@@ -93,7 +93,7 @@ protected JChannel createChannel(String name) throws Exception {
.setValue("enable_unicast_bundling", false),
new PING().setValue("timeout", 500).setValue("num_initial_members", 2),
new NAKACK2().setValue("become_server_queue_size", 10),
- new UNICAST2(),
+ new UNICAST3(),
new GMS().setValue("print_local_addr", false));
ch.setName(name);
return ch;
@@ -7,7 +7,7 @@
import org.jgroups.protocols.FORWARD_TO_COORD;
import org.jgroups.protocols.PING;
import org.jgroups.protocols.SHARED_LOOPBACK;
-import org.jgroups.protocols.UNICAST2;
+import org.jgroups.protocols.UNICAST3;
import org.jgroups.protocols.pbcast.GMS;
import org.jgroups.protocols.pbcast.NAKACK2;
import org.jgroups.util.Util;
@@ -157,7 +157,7 @@ protected JChannel createChannel(final String name, final String cluster_name) t
// new DISCARD(),
new PING().setValue("timeout", 500),
new NAKACK2(),
- new UNICAST2(),
+ new UNICAST3(),
new GMS(),
new FORWARD_TO_COORD());
retval.setName(name);
@@ -8,7 +8,7 @@
import org.jgroups.ReceiverAdapter;
import org.jgroups.protocols.PING;
import org.jgroups.protocols.SHARED_LOOPBACK;
-import org.jgroups.protocols.UNICAST2;
+import org.jgroups.protocols.UNICAST3;
import org.jgroups.protocols.pbcast.GMS;
import org.jgroups.protocols.pbcast.NAKACK2;
import org.jgroups.util.Util;
@@ -86,7 +86,7 @@ protected JChannel createChannel(String name) throws Exception {
.setValue("enable_unicast_bundling", false),
new PING().setValue("timeout", 500).setValue("num_initial_members", 2),
new NAKACK2().setValue("become_server_queue_size", 10),
- new UNICAST2(),
+ new UNICAST3(),
new GMS().setValue("print_local_addr", false));
ch.setName(name);
return ch;
@@ -5,7 +5,7 @@
import org.jgroups.Event;
import org.jgroups.JChannel;
import org.jgroups.Message;
-import org.jgroups.protocols.UNICAST2;
+import org.jgroups.protocols.UNICAST3;
/**
* @author Bela Ban
@@ -24,8 +24,8 @@ public void sendUnicast(JChannel ch) throws Exception {
final Message msg=new Message(ch.getAddress(), ch.getAddress(), "hello-1");
// Add a UNICAST2 header
- final UNICAST2 unicast=(UNICAST2)ch.getProtocolStack().findProtocol(UNICAST2.class);
- UNICAST2.Unicast2Header hdr=UNICAST2.Unicast2Header.createDataHeader(1, (short)1, true);
+ final UNICAST3 unicast=(UNICAST3)ch.getProtocolStack().findProtocol(UNICAST3.class);
+ UNICAST3.Header hdr=UNICAST3.Header.createDataHeader(1, (short)1, true);
msg.putHeader(unicast.getId(), hdr);
new Thread() {
@@ -40,11 +40,9 @@ void setUp() throws Exception {
.setValue("log_discard_msgs",true).setValue("log_not_found_msgs",true)
.setValue("xmit_table_num_rows",5)
.setValue("xmit_table_msgs_per_row",10),
- new UNICAST2().setValue("xmit_table_num_rows",5).setValue("xmit_interval", 300)
+ new UNICAST3().setValue("xmit_table_num_rows",5).setValue("xmit_interval", 300)
.setValue("xmit_table_msgs_per_row",10)
- .setValue("conn_expiry_timeout", 10000)
- .setValue("stable_interval",30000)
- .setValue("max_bytes",50000),
+ .setValue("conn_expiry_timeout", 10000),
new GMS().setValue("print_local_addr",false)
.setValue("leave_timeout",2000)
.setValue("log_view_warnings",false)
@@ -66,7 +66,7 @@ protected JChannel createChannel(String name) throws Exception {
new PING().setValue("timeout", 500),
new FD(),
new NAKACK2().setValue("use_mcast_xmit", false),
- new UNICAST2(),
+ new UNICAST3(),
new STABLE().setValue("max_bytes", 50000),
new GMS().setValue("print_local_addr", false),
new SUPERVISOR());
@@ -43,7 +43,7 @@ protected void setUp(Class<? extends Protocol> flow_control_class) throws Except
ch=new JChannel(new SHARED_LOOPBACK().setValue("thread_pool_rejection_policy", "run").setValue("loopback", true),
new PING(),
new NAKACK2().setValue("use_mcast_xmit", false),
- new UNICAST2(),
+ new UNICAST3(),
new STABLE().setValue("max_bytes", 50000),
new GMS().setValue("print_local_addr", false),
flow_control_prot,
@@ -5,7 +5,7 @@
import org.jgroups.protocols.FD;
import org.jgroups.protocols.PING;
import org.jgroups.protocols.SHARED_LOOPBACK;
-import org.jgroups.protocols.UNICAST2;
+import org.jgroups.protocols.UNICAST3;
import org.jgroups.protocols.pbcast.GMS;
import org.jgroups.protocols.pbcast.NAKACK2;
import org.jgroups.util.Util;
@@ -68,7 +68,7 @@ protected JChannel createChannel(String name) throws Exception {
new PING().setValue("timeout",500).setValue("num_initial_members",2),
new FD().setValue("timeout", 1000).setValue("max_tries", 3),
new NAKACK2(),
- new UNICAST2(),
+ new UNICAST3(),
new GMS().setValue("print_local_addr",false));
ch.setName(name);
return ch;
@@ -100,7 +100,7 @@ protected static JChannel createChannel() throws Exception {
stack.addProtocol(new SHARED_LOOPBACK())
.addProtocol(new PING())
.addProtocol(new NAKACK2().setValue("use_mcast_xmit", false))
- .addProtocol(new UNICAST2())
+ .addProtocol(new UNICAST3())
.addProtocol(new STABLE().setValue("max_bytes", 50000))
.addProtocol(new GMS().setValue("print_local_addr", false))
.addProtocol(new UFC())
@@ -92,13 +92,9 @@ void setUp() throws Exception {
.setValue("log_discard_msgs",false).setValue("log_not_found_msgs",false)
.setValue("xmit_table_num_rows",5)
.setValue("xmit_table_msgs_per_row",10),
- //new UNICAST().setValue("segment_capacity", 100)
- //.setValue("conn_expiry_timeout", 10000),
- new UNICAST2().setValue("xmit_table_num_rows",5)
+ new UNICAST3().setValue("xmit_table_num_rows",5)
.setValue("xmit_table_msgs_per_row",10)
- .setValue("conn_expiry_timeout", 10000)
- .setValue("stable_interval", 30000)
- .setValue("max_bytes", 50000),
+ .setValue("conn_expiry_timeout", 10000),
new STABLE().setValue("max_bytes",500000),
new GMS().setValue("print_local_addr",false)
.setValue("leave_timeout",100)
@@ -7,7 +7,7 @@
import org.jgroups.protocols.DISCARD;
import org.jgroups.protocols.PING;
import org.jgroups.protocols.SHARED_LOOPBACK;
-import org.jgroups.protocols.UNICAST;
+import org.jgroups.protocols.UNICAST3;
import org.jgroups.protocols.pbcast.GMS;
import org.jgroups.protocols.pbcast.NAKACK2;
import org.jgroups.protocols.pbcast.STABLE;
@@ -76,7 +76,7 @@ protected JChannel createChannel(String name, TimeScheduler timer, Executor thre
new PING().setValue("timeout",100),
new NAKACK2().setValue("use_mcast_xmit",false)
.setValue("log_discard_msgs",false).setValue("log_not_found_msgs",false),
- new UNICAST(),
+ new UNICAST3(),
new STABLE().setValue("max_bytes",50000),
new GMS().setValue("print_local_addr",false)
.setValue("leave_timeout",100)
@@ -76,7 +76,7 @@ protected JChannel createChannel(String name, TimeScheduler timer, Executor thre
new PING().setValue("timeout",100),
new NAKACK2().setValue("use_mcast_xmit",false)
.setValue("log_discard_msgs",false).setValue("log_not_found_msgs",false),
- new UNICAST(),
+ new UNICAST3(),
new STABLE().setValue("max_bytes",50000),
new GMS().setValue("print_local_addr",false)
.setValue("leave_timeout",100)
@@ -80,7 +80,7 @@ void setUp() throws Exception {
new NAKACK2().setValue("use_mcast_xmit",false)
.setValue("discard_delivered_msgs",true)
.setValue("log_discard_msgs", false).setValue("log_not_found_msgs", false),
- new UNICAST2().setValue("xmit_table_num_rows",5).setValue("xmit_interval", 300),
+ new UNICAST3().setValue("xmit_table_num_rows",5).setValue("xmit_interval", 300),
new RSVP().setValue("timeout", 10000).setValue("throw_exception_on_timeout", true),
new GMS().setValue("print_local_addr",false)
.setValue("leave_timeout",100)
@@ -134,7 +134,7 @@ protected static JChannel createChannel() throws Exception {
.addProtocol(new PING().setValue("timeout", 2000).setValue("num_initial_members", 3))
.addProtocol(new MERGE2().setValue("min_interval", 1000).setValue("max_interval", 3000))
.addProtocol(new NAKACK2().setValue("use_mcast_xmit", false))
- .addProtocol(new UNICAST2())
+ .addProtocol(new UNICAST3())
.addProtocol(new STABLE().setValue("max_bytes", 50000))
.addProtocol(new GMS().setValue("print_local_addr", false))
.addProtocol(new UFC())
@@ -77,7 +77,7 @@ protected static JChannel createChannel() throws Exception {
.addProtocol(new VERIFY_SUSPECT())
.addProtocol(new BARRIER())
.addProtocol(new NAKACK2().setValue("use_mcast_xmit", false).setValue("discard_delivered_msgs", true))
- .addProtocol(new UNICAST2().setValue("stable_interval", 10000).setValue("max_bytes", 50000))
+ .addProtocol(new UNICAST3())
.addProtocol(new STABLE().setValue("max_bytes", 50000))
.addProtocol(new GMS().setValue("print_local_addr", false))
.addProtocol(new UFC().setValue("max_credits", 2000000))
@@ -100,7 +100,7 @@ public void testSharedTransport() throws Exception {
new VERIFY_SUSPECT(),
new BARRIER(),
new NAKACK2(),
- new UNICAST2(),
+ new UNICAST3(),
new STABLE(),
new GMS(),
new UFC(),
@@ -24,7 +24,7 @@
/**
- * Tests tthe {@link RSVP} protocol
+ * Tests the {@link RSVP} protocol
* @author Bela Ban
*/
@Test(groups=Global.FUNCTIONAL,sequential=true)
@@ -79,12 +79,9 @@ void setUp() throws Exception {
.setValue("log_discard_msgs",false).setValue("log_not_found_msgs",false)
.setValue("xmit_table_num_rows",5)
.setValue("xmit_table_msgs_per_row",10),
- // new UNICAST(),
- new UNICAST2().setValue("xmit_table_num_rows",5).setValue("xmit_interval", 300)
+ new UNICAST3().setValue("xmit_table_num_rows",5).setValue("xmit_interval", 300)
.setValue("xmit_table_msgs_per_row",10)
- .setValue("conn_expiry_timeout", 10000)
- .setValue("stable_interval", 30000)
- .setValue("max_bytes", 50000),
+ .setValue("conn_expiry_timeout", 10000),
new RSVP().setValue("timeout", 10000).setValue("throw_exception_on_timeout", false),
// new STABLE().setValue("max_bytes",500000).setValue("desired_avg_gossip", 60000),
new GMS().setValue("print_local_addr",false)
@@ -4,7 +4,7 @@
import org.jgroups.protocols.FORWARD_TO_COORD;
import org.jgroups.protocols.PING;
import org.jgroups.protocols.SHARED_LOOPBACK;
-import org.jgroups.protocols.UNICAST2;
+import org.jgroups.protocols.UNICAST3;
import org.jgroups.protocols.pbcast.GMS;
import org.jgroups.protocols.pbcast.NAKACK2;
import org.jgroups.protocols.relay.RELAY2;
@@ -259,7 +259,7 @@ protected JChannel createNode(String site_name, String node_name, String cluster
JChannel ch=new JChannel(new SHARED_LOOPBACK(),
new PING().setValue("timeout", 300).setValue("num_initial_members", 2),
new NAKACK2(),
- new UNICAST2(),
+ new UNICAST3(),
new GMS().setValue("print_local_addr", false),
new FORWARD_TO_COORD(),
createRELAY2(site_name));
@@ -290,7 +290,7 @@ protected RELAY2 createRELAY2(String site_name) {
new SHARED_LOOPBACK(),
new PING().setValue("timeout", 500).setValue("num_initial_members", 2),
new NAKACK2(),
- new UNICAST2(),
+ new UNICAST3(),
new GMS().setValue("print_local_addr", false)
};
}
@@ -7,7 +7,7 @@
import org.jgroups.protocols.PING;
import org.jgroups.protocols.SHARED_LOOPBACK;
import org.jgroups.protocols.TP;
-import org.jgroups.protocols.UNICAST;
+import org.jgroups.protocols.UNICAST3;
import org.jgroups.protocols.pbcast.GMS;
import org.jgroups.protocols.pbcast.NAKACK2;
import org.jgroups.stack.Protocol;
@@ -132,7 +132,7 @@ protected static JChannel createChannel(String name) throws Exception {
transport,
new PING().setValue("timeout", 500),
new NAKACK2(),
- new UNICAST(),
+ new UNICAST3(),
new GMS()
}).name(name);
}
@@ -118,7 +118,7 @@ protected JChannel createTunnelChannel(String name, boolean include_failure_dete
protocols.addAll(Arrays.asList(tunnel,new PING(),new MERGE2().setValue("min_interval",1000).setValue("max_interval",3000)));
if(include_failure_detection)
protocols.addAll(Arrays.asList(new FD().setValue("timeout", 2000).setValue("max_tries", 2), new VERIFY_SUSPECT()));
- protocols.addAll(Arrays.asList(new NAKACK2().setValue("use_mcast_xmit", false), new UNICAST(), new STABLE(), new GMS()));
+ protocols.addAll(Arrays.asList(new NAKACK2().setValue("use_mcast_xmit", false), new UNICAST3(), new STABLE(), new GMS()));
JChannel ch=new JChannel(protocols);
if(name != null)
ch.setName(name);
@@ -198,7 +198,7 @@ protected JChannel create(String name, boolean insert_discard) throws Exception
new PING().setValue("timeout",100),
new NAKACK2().setValue("use_mcast_xmit",false)
.setValue("log_discard_msgs",false).setValue("log_not_found_msgs",false),
- new UNICAST2(),
+ new UNICAST3(),
new STABLE().setValue("max_bytes",50000),
new SEQUENCER(), // below GMS, to establish total order between views and messages
new GMS().setValue("print_local_addr",false).setValue("leave_timeout",100)
Oops, something went wrong.

0 comments on commit 089f2c1

Please sign in to comment.