Permalink
Browse files

- MessageBatch: added replace() and remove(), removed method with ind…

…ex (set(), get())

- Not bundling unicast messages whose dest is a physical address
- Removing messages not added to the receiver table from the message list (https://issues.jboss.org/browse/JGRP-1593)
- Messages whose dest is a physical address are marked as DONT_BUNDLE, so they don't get sent as multicast by the bundler
- Set rejection_policy to "discard" by default
- Implemented up(MessageBatch) in ProtocolStack and JChannel
  • Loading branch information...
1 parent 316313c commit 66fee6c9034086473bf2a9b30922d02655fdb1a9 Bela Ban committed Feb 15, 2013
Showing with 588 additions and 424 deletions.
  1. +1 −1 conf/testng/byteman.xml
  2. +1 −1 conf/testng/testng-tcp.xml
  3. +1 −1 conf/toa.xml
  4. +1 −1 conf/udp-largecluster.xml
  5. +1 −1 conf/udp.xml
  6. +36 −3 src/org/jgroups/JChannel.java
  7. +2 −5 src/org/jgroups/blocks/RpcDispatcher.java
  8. +14 −4 src/org/jgroups/blocks/TCPConnectionMap.java
  9. +1 −1 src/org/jgroups/demos/Draw.java
  10. +2 −4 src/org/jgroups/protocols/COMPRESS.java
  11. +1 −4 src/org/jgroups/protocols/DELAY.java
  12. +2 −4 src/org/jgroups/protocols/Discovery.java
  13. +1 −3 src/org/jgroups/protocols/EXAMPLE.java
  14. +2 −2 src/org/jgroups/protocols/FD_ALL.java
  15. +1 −2 src/org/jgroups/protocols/HDRS.java
  16. +2 −3 src/org/jgroups/protocols/SHUFFLE.java
  17. +1 −1 src/org/jgroups/protocols/TCP.java
  18. +27 −26 src/org/jgroups/protocols/TP.java
  19. +1 −1 src/org/jgroups/protocols/pbcast/NAKACK2.java
  20. +10 −1 src/org/jgroups/stack/ProtocolStack.java
  21. +62 −40 src/org/jgroups/util/MessageBatch.java
  22. +22 −4 src/org/jgroups/util/Table.java
  23. +4 −4 src/org/jgroups/util/Util.java
  24. +0 −16 tests/byteman/org/jgroups/tests/byteman/TCPConnectionMapTest.java
  25. +2 −2 tests/junit-functional/org/jgroups/blocks/RpcDispatcherTest.java
  26. +2 −1 tests/junit-functional/org/jgroups/protocols/UNICAST_ConnectionTests.java
  27. +7 −4 tests/junit-functional/org/jgroups/tests/ConfiguratorTest.java
  28. +150 −33 tests/junit-functional/org/jgroups/tests/MessageBatchTest.java
  29. +13 −8 tests/junit-functional/org/jgroups/tests/RpcDispatcherAsyncInvocationTest.java
  30. +24 −0 tests/junit-functional/org/jgroups/tests/TableTest.java
  31. +4 −2 tests/junit/org/jgroups/blocks/LockServiceTest.java
  32. +40 −53 tests/junit/org/jgroups/blocks/executor/ExecutingServiceTest.java
  33. +1 −0 tests/junit/org/jgroups/tests/ChannelTestBase.java
  34. +2 −2 tests/junit/org/jgroups/tests/ConnectStressTest.java
  35. +51 −55 tests/junit/org/jgroups/tests/DuplicateTest.java
  36. +0 −1 tests/junit/org/jgroups/tests/FlushCloseOpenTest.java
  37. +2 −2 tests/junit/org/jgroups/tests/JoinTest.java
  38. +2 −5 tests/junit/org/jgroups/tests/LargeStateTransferTest.java
  39. +67 −94 tests/junit/org/jgroups/tests/MessageBundlingTest.java
  40. +1 −1 tests/junit/org/jgroups/tests/NAKACK_Test.java
  41. +1 −1 tests/junit/org/jgroups/tests/OOBTest.java
  42. +23 −27 tests/junit/org/jgroups/tests/UUIDCacheClearTest.java
View
@@ -3,7 +3,7 @@
parallel="false"
thread-count="1" >
- <test name="byteman" junit="false" time-out="30000">
+ <test name="byteman" junit="false" time-out="1200000">
<groups>
<run>
<include name="byteman"/>
@@ -4,7 +4,7 @@
thread-count="5"
>
- <test name="tcp" junit="false" time-out="1200000">
+ <test name="tcp" junit="false" time-out="60000">
<parameter name="channel.conf" value="tcp.xml"/>
<groups>
<run>
View
@@ -42,7 +42,7 @@
oob_thread_pool.keep_alive_time="5000"
oob_thread_pool.queue_enabled="false"
oob_thread_pool.queue_max_size="100"
- oob_thread_pool.rejection_policy="Run"/>
+ oob_thread_pool.rejection_policy="discard"/>
<PING timeout="2000"
num_initial_members="20"/>
@@ -43,7 +43,7 @@
oob_thread_pool.min_threads="5"
oob_thread_pool.max_threads="100"
oob_thread_pool.keep_alive_time="5000"
- oob_thread_pool.rejection_policy="Run"/>
+ oob_thread_pool.rejection_policy="discard"/>
<PING timeout="5000"
num_initial_members="100"
View
@@ -42,7 +42,7 @@
oob_thread_pool.keep_alive_time="5000"
oob_thread_pool.queue_enabled="false"
oob_thread_pool.queue_max_size="100"
- oob_thread_pool.rejection_policy="Run"/>
+ oob_thread_pool.rejection_policy="discard"/>
<PING timeout="2000"
num_initial_members="20"/>
@@ -434,15 +434,15 @@ public void send(Message msg) throws Exception {
public void send(Address dst, Object obj) throws Exception {
- send(new Message(dst, null, obj));
+ send(new Message(dst, obj));
}
public void send(Address dst, byte[] buf) throws Exception {
- send(new Message(dst, null, buf));
+ send(new Message(dst, buf));
}
public void send(Address dst, byte[] buf, int offset, int length) throws Exception {
- send(new Message(dst, null, buf, offset, length));
+ send(new Message(dst, buf, offset, length));
}
@@ -724,6 +724,39 @@ public Object up(Event evt) {
}
+ /** Callback invoked by the protocol stack to deliver a message batch */
+ public void up(MessageBatch batch) {
+ if(stats) {
+ received_msgs+=batch.size();
+ received_bytes+=batch.length();
+ }
+
+ // discard local messages (sent by myself to me)
+ if(discard_own_messages && local_addr != null && batch.sender() != null && local_addr.equals(batch.sender()))
+ return;
+
+ for(Message msg: batch) {
+ if(up_handler != null) {
+ try {
+ up_handler.up(new Event(Event.MSG, msg));
+ }
+ catch(Throwable t) {
+ log.error("failed passing message to up-handler", t);
+ }
+ }
+ else if(receiver != null) {
+ try {
+ receiver.receive(msg);
+ }
+ catch(Throwable t) {
+ log.error("failed passing message to receiver", t);
+ }
+ }
+ }
+ }
+
+
+
/**
* Sends an event down the protocol stack. Note that - contrary to {@link #send(Message)}, if the event is a message,
* no checks are performed whether the channel is closed or disconnected.
@@ -331,9 +331,6 @@ protected void correlatorStarted() {
* Use MethodCall.invoke() to do this. Return result.
*/
public Object handle(Message req) throws Exception {
- Object body;
- MethodCall method_call;
-
if(server_obj == null) {
if(log.isErrorEnabled()) log.error("no method handler is registered. Discarding request.");
return null;
@@ -344,13 +341,13 @@ public Object handle(Message req) throws Exception {
return null;
}
- body=req_marshaller != null?
+ Object body=req_marshaller != null?
req_marshaller.objectFromBuffer(req.getBuffer(), req.getOffset(), req.getLength()) : req.getObject();
if(!(body instanceof MethodCall))
throw new IllegalArgumentException("message does not contain a MethodCall object") ;
- method_call=(MethodCall)body;
+ MethodCall method_call=(MethodCall)body;
if(log.isTraceEnabled())
log.trace("[sender=" + req.getSrc() + "], method_call: " + method_call);
@@ -35,7 +35,7 @@
protected final ServerSocket srv_sock;
protected Receiver recvr;
protected final long conn_expire_time;
- protected final Log log=LogFactory.getLog(getClass());
+ protected Log log=LogFactory.getLog(getClass());
protected int recv_buf_size=120000;
protected int send_buf_size=60000;
protected int send_queue_size=0;
@@ -136,6 +136,7 @@ else if(bind_addr != null)
public void retainAll(Collection<Address> members) {mapper.retainAll(members);}
public long getConnectionExpiryTimeout() {return conn_expire_time;}
public int getSenderQueueSize() {return send_queue_size;}
+ public TCPConnectionMap log(Log new_log) {this.log=new_log; return this;}
public void addConnectionMapListener(AbstractConnectionMap.ConnectionMapListener<TCPConnection> l) {
mapper.addConnectionMapListener(l);
@@ -178,12 +179,21 @@ public void send(Address dest, byte[] data, int offset, int length) throws Excep
}
// 1. Try to obtain correct Connection (or create one if not yet existent)
- TCPConnection conn=mapper.getConnection(dest);
+ TCPConnection conn=null;
+ try {
+ conn=mapper.getConnection(dest);
+ }
+ catch(Throwable t) {
+ }
// 2. Send the message using that connection
if(conn != null && !conn.isConnected()) { // perhaps not connected because of concurrent connections (JGRP-1549)
Util.sleepRandom(1, 50);
- conn=mapper.getConnection(dest); // try one more time
+ try {
+ conn=mapper.getConnection(dest); // try one more time
+ }
+ catch(Throwable t) {
+ }
}
if(conn != null) {
@@ -726,7 +736,7 @@ public Mapper(ThreadFactory factory,long reaper_interval) {
super(factory,reaper_interval);
}
- public TCPConnection getConnection(Address dest) throws Exception { // S. Simeonoff
+ public TCPConnection getConnection(Address dest) throws Exception {
TCPConnection conn;
getLock().lock();
try {
@@ -276,7 +276,7 @@ public void viewAccepted(View v) {
members.addAll(v.getMembers());
if(v instanceof MergeView) {
- System.out.println("** MergeView=" + v);
+ System.out.println("** " + v);
// This is an example of a simple merge function, which fetches the state from the coordinator
// on a merge and overwrites all of its own state
@@ -142,21 +142,19 @@ public Object up(Event evt) {
}
public void up(MessageBatch batch) {
- int index=0;
for(Message msg: batch) {
CompressHeader hdr=(CompressHeader)msg.getHeader(this.id);
if(hdr != null) {
Message uncompressed_msg=uncompress(msg, hdr.original_size);
if(uncompressed_msg != null) {
if(log.isTraceEnabled())
log.trace("up(): uncompressed " + msg.getLength() + " bytes to " + uncompressed_msg.getLength() + " bytes");
- batch.set(index, uncompressed_msg); // replace msg in batch with uncompressed_msg
+ batch.replace(msg, uncompressed_msg); // replace msg in batch with uncompressed_msg
}
}
- index++;
}
- if(batch != null)
+ if(!batch.isEmpty())
up_prot.up(batch);
}
@@ -2,7 +2,6 @@
package org.jgroups.protocols;
import org.jgroups.Event;
-import org.jgroups.Message;
import org.jgroups.annotations.Property;
import org.jgroups.stack.Protocol;
import org.jgroups.util.MessageBatch;
@@ -43,9 +42,7 @@ public Object up(Event evt) {
public void up(MessageBatch batch) {
if(in_delay > 0)
- for(Message msg: batch)
- if(msg != null)
- Util.sleep(computeDelay(in_delay));
+ Util.sleep(computeDelay(in_delay));
if(!batch.isEmpty())
up_prot.up(batch);
}
@@ -260,7 +260,7 @@ public void sendDiscoveryRequest(String cluster_name, Promise promise, ViewId vi
Collection<PhysicalAddress> cluster_members=fetchClusterMembers(cluster_name);
if(cluster_members == null) {
// multicast msg
- Message msg=new Message(null).setFlag(Message.OOB).putHeader(getId(), hdr);
+ Message msg=new Message(null).setFlag(Message.OOB, Message.Flag.DONT_BUNDLE).putHeader(getId(), hdr);
sendMcastDiscoveryRequest(msg);
}
else {
@@ -281,9 +281,7 @@ public void sendDiscoveryRequest(String cluster_name, Promise promise, ViewId vi
for(final Address addr: cluster_members) {
if(addr.equals(physical_addr)) // no need to send the request to myself
continue;
- final Message msg=new Message(addr, null, null);
- msg.setFlag(Message.OOB);
- msg.putHeader(this.id, hdr);
+ final Message msg=new Message(addr).setFlag(Message.OOB, Message.Flag.DONT_BUNDLE).putHeader(this.id, hdr);
if(log.isTraceEnabled())
log.trace(local_addr + ": sending discovery request to " + msg.getDest());
if(!sendDiscoveryRequestsInParallel()) {
@@ -47,9 +47,7 @@ public Object up(Event evt) {
public void up(MessageBatch batch) {
for(Message msg: batch) {
- if(msg != null) {
- // do something; perhaps check for the presence of a header
- }
+ // do something; perhaps check for the presence of a header
}
if(!batch.isEmpty())
@@ -28,10 +28,10 @@
/* ----------------------------------------- Properties -------------------------------------------------- */
@Property(description="Interval at which a HEARTBEAT is sent to the cluster")
- long interval=3000;
+ long interval=8000;
@Property(description="Timeout after which a node P is suspected if neither a heartbeat nor data were received from P")
- long timeout=10000;
+ long timeout=40000;
@Property(description="Interval at which the HEARTBEAT timeouts are checked")
long timeout_check_interval=2000;
@@ -55,8 +55,7 @@ public Object up(Event evt) {
public void up(MessageBatch batch) {
for(Message msg: batch)
- if(msg != null)
- printMessage(msg, "up");
+ printMessage(msg, "up");
if(!batch.isEmpty())
up_prot.up(batch);
}
@@ -101,10 +101,9 @@ public void up(MessageBatch batch) {
synchronized(up_msgs) {
for(Message msg: batch)
- if(msg != null)
- up_msgs.add(msg);
+ up_msgs.add(msg);
}
- batch.removeAll();
+ batch.clear();
if(up_msgs.size() >= max_size)
shuffleAndSendMessages();
@@ -67,7 +67,7 @@ public void start() throws Exception {
external_addr,
external_port,
bind_port,
- bind_port+port_range);
+ bind_port+port_range).log(log);
ct.setReceiveBufferSize(recv_buf_size);
ct.setSendQueueSize(send_queue_size);
ct.setUseSendQueues(use_send_queues);
Oops, something went wrong.

0 comments on commit 66fee6c

Please sign in to comment.