Skip to content
Browse files

- Back-ported JGRP-1542 and JGRP-1543

- Changed version to 3.2.3
  • Loading branch information...
1 parent 4aeff52 commit 96318863ddb455ba2dd55a843167c792e17de8e9 @belaban committed Nov 27, 2012
View
2 build.xml
@@ -6,7 +6,7 @@
build.xml file for JGroups. Needs Ant (jakarta.apache.org) to run
</description>
- <property name="version" value="3.2.2.Final"/>
+ <property name="version" value="3.2.3.Final"/>
<property name="build.properties.file" value="build.properties"/>
<property file="${build.properties.file}"/>
<property name="root.dir" value="${basedir}"/>
View
2 pom.xml
@@ -5,7 +5,7 @@
<artifactId>jgroups</artifactId>
<packaging>bundle</packaging>
<name>JGroups</name>
- <version>3.2.2.Final</version>
+ <version>3.2.3.Final</version>
<url>http://www.jgroups.org</url>
<description>
Reliable cluster communication toolkit
View
81 src/org/jgroups/Message.java
@@ -26,24 +26,24 @@
* @author Bela Ban
*/
public class Message implements Streamable {
- protected Address dest_addr;
- protected Address src_addr;
+ protected Address dest_addr;
+ protected Address src_addr;
/** The payload */
- private byte[] buf;
+ protected byte[] buf;
/** The index into the payload (usually 0) */
- protected int offset;
+ protected int offset;
/** The number of bytes in the buffer (usually buf.length is buf not equal to null). */
- protected int length;
+ protected int length;
/** All headers are placed here */
- protected Headers headers;
+ protected Headers headers;
- private volatile short flags;
+ protected volatile short flags;
- private volatile byte transient_flags; // transient_flags is neither marshalled nor copied
+ protected volatile byte transient_flags; // transient_flags is neither marshalled nor copied
protected static final Log log=LogFactory.getLog(Message.class);
@@ -208,21 +208,14 @@ public Message(boolean create_headers) {
headers=createHeaders(3);
}
- public Address getDest() {
- return dest_addr;
- }
-
- public void setDest(Address new_dest) {
- dest_addr=new_dest;
- }
-
- public Address getSrc() {
- return src_addr;
- }
-
- public void setSrc(Address new_src) {
- src_addr=new_src;
- }
+ public Address getDest() {return dest_addr;}
+ public Address dest() {return dest_addr;}
+ public void setDest(Address new_dest) {dest_addr=new_dest;}
+ public Message dest(Address new_dest) {dest_addr=new_dest; return this;}
+ public Address getSrc() {return src_addr;}
+ public Address src() {return src_addr;}
+ public void setSrc(Address new_src) {src_addr=new_src;}
+ public Message src(Address new_src) {src_addr=new_src; return this;}
/**
* Returns a <em>reference</em> to the payload (byte buffer). Note that this buffer should not be
@@ -576,15 +569,23 @@ public Message copy(boolean copy_buffer, boolean copy_headers) {
* @return A message with headers whose ID are >= starting_id
*/
public Message copy(boolean copy_buffer, short starting_id) {
+ return copy(copy_buffer, starting_id, (short[])null);
+ }
+
+ /**
+ * Copies a message. Copies only headers with IDs >= starting_id or IDs which are in the copy_only_ids list
+ * @param copy_buffer
+ * @param starting_id
+ * @param copy_only_ids
+ * @return
+ */
+ public Message copy(boolean copy_buffer, short starting_id, short ... copy_only_ids) {
Message retval=copy(copy_buffer, false);
- if(starting_id > 0) {
- for(Map.Entry<Short,Header> entry: getHeaders().entrySet()) {
- short id=entry.getKey();
- if(id >= starting_id)
- retval.putHeader(id, entry.getValue());
- }
+ for(Map.Entry<Short,Header> entry: getHeaders().entrySet()) {
+ short id=entry.getKey();
+ if(id >= starting_id || containsId(id, copy_only_ids))
+ retval.putHeader(id, entry.getValue());
}
-
return retval;
}
@@ -676,7 +677,7 @@ public void writeTo(DataOutput out) throws Exception {
// 1. write the leading byte first
out.write(leading);
- // 2. the flags (e.g. OOB, LOW_PRIO)
+ // 2. the flags (e.g. OOB, LOW_PRIO), skip the transient flags
out.writeShort(flags);
// 3. dest_addr
@@ -891,14 +892,24 @@ public String transientFlagsToString() {
return sb.toString();
}
- private static void writeHeader(Header hdr, DataOutput out) throws Exception {
+ protected static void writeHeader(Header hdr, DataOutput out) throws Exception {
short magic_number=ClassConfigurator.getMagicNumber(hdr.getClass());
out.writeShort(magic_number);
hdr.writeTo(out);
}
- private static Header readHeader(DataInput in) throws Exception {
+ protected static boolean containsId(short id, short[] ids) {
+ if(ids == null)
+ return false;
+ for(short tmp: ids)
+ if(tmp == id)
+ return true;
+ return false;
+ }
+
+
+ protected static Header readHeader(DataInput in) throws Exception {
short magic_number=in.readShort();
Class clazz=ClassConfigurator.get(magic_number);
if(clazz == null)
@@ -909,12 +920,12 @@ private static Header readHeader(DataInput in) throws Exception {
return hdr;
}
- private static Headers createHeaders(int size) {
+ protected static Headers createHeaders(int size) {
return size > 0? new Headers(size) : new Headers(3);
}
- private static Headers createHeaders(Headers m) {
+ protected static Headers createHeaders(Headers m) {
return new Headers(m);
}
View
4 src/org/jgroups/Version.java
@@ -21,8 +21,8 @@
public class Version {
public static final short major = 3;
public static final short minor = 2;
- public static final short micro = 2;
- public static final String description="3.2.2.Final";
+ public static final short micro = 3;
+ public static final String description="3.2.3.Final";
public static final short version=encode(major, minor, micro);
public static final String string_version=print(version);
View
146 src/org/jgroups/blocks/MessageDispatcher.java
@@ -478,6 +478,76 @@ public void channelClosed(Channel channel) {
/* ----------------------------------------------------------------------- */
+ protected Object handleUpEvent(Event evt) throws Exception {
+ switch(evt.getType()) {
+ case Event.MSG:
+ if(msg_listener != null)
+ msg_listener.receive((Message) evt.getArg());
+ break;
+
+ case Event.GET_APPLSTATE: // reply with GET_APPLSTATE_OK
+ byte[] tmp_state=null;
+ if(msg_listener != null) {
+ ByteArrayOutputStream output=new ByteArrayOutputStream(1024);
+ msg_listener.getState(output);
+ tmp_state=output.toByteArray();
+ }
+ return new StateTransferInfo(null, 0L, tmp_state);
+
+ case Event.GET_STATE_OK:
+ if(msg_listener != null) {
+ StateTransferResult result=(StateTransferResult)evt.getArg();
+ ByteArrayInputStream input=new ByteArrayInputStream(result.getBuffer());
+ msg_listener.setState(input);
+ }
+ break;
+
+ case Event.STATE_TRANSFER_OUTPUTSTREAM:
+ OutputStream os=(OutputStream)evt.getArg();
+ if(msg_listener != null && os != null) {
+ msg_listener.getState(os);
+ }
+ break;
+
+ case Event.STATE_TRANSFER_INPUTSTREAM:
+ InputStream is=(InputStream)evt.getArg();
+ if(msg_listener != null && is!=null)
+ msg_listener.setState(is);
+ break;
+
+ case Event.VIEW_CHANGE:
+ View v=(View) evt.getArg();
+ List<Address> new_mbrs=v.getMembers();
+ setMembers(new_mbrs);
+ if(membership_listener != null)
+ membership_listener.viewAccepted(v);
+ break;
+
+ case Event.SET_LOCAL_ADDRESS:
+ if(log.isTraceEnabled())
+ log.trace("setting local_addr (" + local_addr + ") to " + evt.getArg());
+ local_addr=(Address)evt.getArg();
+ break;
+
+ case Event.SUSPECT:
+ if(membership_listener != null)
+ membership_listener.suspect((Address) evt.getArg());
+ break;
+
+ case Event.BLOCK:
+ if(membership_listener != null)
+ membership_listener.block();
+ break;
+ case Event.UNBLOCK:
+ if(membership_listener != null)
+ membership_listener.unblock();
+ break;
+ }
+
+ return null;
+ }
+
+
class MyProbeHandler implements DiagnosticsHandler.ProbeHandler {
public Map<String,String> handleProbe(String... keys) {
@@ -520,82 +590,6 @@ public String getName() {
}
-
- protected Object handleUpEvent(Event evt) throws Exception {
- switch(evt.getType()) {
- case Event.MSG:
- if(msg_listener != null) {
- msg_listener.receive((Message) evt.getArg());
- }
- break;
-
- case Event.GET_APPLSTATE: // reply with GET_APPLSTATE_OK
- byte[] tmp_state=null;
- if(msg_listener != null) {
- ByteArrayOutputStream output=new ByteArrayOutputStream(1024);
- msg_listener.getState(output);
- tmp_state=output.toByteArray();
- }
- return new StateTransferInfo(null, 0L, tmp_state);
-
- case Event.GET_STATE_OK:
- if(msg_listener != null) {
- StateTransferResult result=(StateTransferResult)evt.getArg();
- ByteArrayInputStream input=new ByteArrayInputStream(result.getBuffer());
- msg_listener.setState(input);
- }
- break;
-
- case Event.STATE_TRANSFER_OUTPUTSTREAM:
- OutputStream os=(OutputStream)evt.getArg();
- if(msg_listener != null && os != null) {
- msg_listener.getState(os);
- }
- break;
-
- case Event.STATE_TRANSFER_INPUTSTREAM:
- InputStream is=(InputStream)evt.getArg();
- if(msg_listener != null && is!=null)
- msg_listener.setState(is);
- break;
-
- case Event.VIEW_CHANGE:
- View v=(View) evt.getArg();
- List<Address> new_mbrs=v.getMembers();
- setMembers(new_mbrs);
- if(membership_listener != null)
- membership_listener.viewAccepted(v);
- break;
-
- case Event.SET_LOCAL_ADDRESS:
- if(log.isTraceEnabled())
- log.trace("setting local_addr (" + local_addr + ") to " + evt.getArg());
- local_addr=(Address)evt.getArg();
- break;
-
- case Event.SUSPECT:
- if(membership_listener != null)
- membership_listener.suspect((Address) evt.getArg());
- break;
-
- case Event.BLOCK:
- if(membership_listener != null)
- membership_listener.block();
- break;
- case Event.UNBLOCK:
- if(membership_listener != null)
- membership_listener.unblock();
- break;
- }
-
- return null;
- }
-
-
-
-
-
-
/**
* Called by channel (we registered before) when event is received. This is the UpHandler interface.
*/
View
2 src/org/jgroups/blocks/RequestCorrelator.java
@@ -252,7 +252,7 @@ public boolean receive(Event evt) {
SiteMaster site_master=(SiteMaster)evt.getArg();
short site=site_master.getSite();
setSiteUnreachable(site);
- return true;
+ break; // let others have a stab at this event, too
}
return false;
}
View
11 src/org/jgroups/protocols/FORWARD_TO_COORD.java
@@ -12,6 +12,7 @@
import java.io.DataOutput;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
/**
* Forwards a message to the current coordinator. When the coordinator changes, forwards all pending messages to
@@ -43,7 +44,7 @@
protected volatile Address local_addr;
/** ID to be used to identify forwarded messages. Wrap-around shouldn't be an issue. */
- protected long current_id=0;
+ protected final AtomicLong current_id=new AtomicLong(0);
protected final ForwardQueue fwd_queue=new ForwardQueue(log);
@@ -58,8 +59,10 @@ public FORWARD_TO_COORD() {
@ManagedAttribute(description="Number of messages for which no ack has been received yet")
- public int getPendingMessages() {return fwd_queue.size();}
- public List<Integer> providedUpServices() {return Arrays.asList(Event.FORWARD_TO_COORD);}
+ public int getForwardTableSize() {return fwd_queue.size();}
+ @ManagedAttribute(description="Total number of all seqnos maintained for all receivers")
+ public int getDeliveryTableSize() {return fwd_queue.deliveryTableSize();}
+ public List<Integer> providedUpServices() {return Arrays.asList(Event.FORWARD_TO_COORD);}
public void start() throws Exception {
super.start();
@@ -152,7 +155,7 @@ public Object up(Event evt) {
- protected synchronized long getNextId() {return current_id++;}
+ protected long getNextId() {return current_id.incrementAndGet();}
protected void handleViewChange(View view) {
View
2 src/org/jgroups/protocols/TP.java
@@ -220,7 +220,7 @@
/** Enable bundling for unicast messages. Ignored if enable_bundling is off */
@Property(description="Enable bundling of smaller messages into bigger ones for unicast messages. Default is false")
- protected boolean enable_unicast_bundling=false;
+ protected boolean enable_unicast_bundling=true;
@Property(description="Switch to enable diagnostic probing. Default is true")
protected boolean enable_diagnostics=true;
View
99 src/org/jgroups/protocols/relay/RELAY2.java
@@ -95,6 +95,8 @@
@ManagedAttribute(description="FORWARD_TO_COORD protocol is present below the current protocol")
protected boolean forwarding_protocol_present;
+ // protocol IDs above RELAY2
+ protected short[] prots_above;
// Fluent configuration
@@ -201,8 +203,11 @@ else if(addr instanceof UUID)
}
});
}
+
+ prots_above=getIdsAbove();
}
+
public void stop() {
super.stop();
is_coord=false;
@@ -263,33 +268,28 @@ public Object down(Event evt) {
Address dest=msg.getDest();
if(dest == null || !(dest instanceof SiteAddress))
break;
- SiteAddress target=(SiteAddress)dest;
-
- byte[] buf=marshal(msg);
- if(buf == null)
- return null; // don't pass down
+ SiteAddress target=(SiteAddress)dest;
Address src=msg.getSrc();
- SiteAddress sender;
- if(src instanceof SiteMaster)
- sender=new SiteMaster(((SiteMaster)src).getSite());
- else
- sender=new SiteUUID((UUID)local_addr, UUID.get(local_addr), site_id);
+ SiteAddress sender=src instanceof SiteMaster? new SiteMaster(((SiteMaster)src).getSite())
+ : new SiteUUID((UUID)local_addr, UUID.get(local_addr), site_id);
- // target is in the same site; we can deliver the message locally
+ // target is in the same site; we can deliver the message in our local cluster
if(target.getSite() == site_id ) {
- if(local_addr.equals(target) || ((target instanceof SiteMaster) && is_coord))
- deliver(target, sender, buf);
+ if(local_addr.equals(target) || (target instanceof SiteMaster && is_coord)) {
+ // we cannot simply pass msg down, as the transport doesn't know how to send a message to a (e.g.) SiteMaster
+ forwardTo(local_addr, target, sender, msg, false);
+ }
else
- deliverLocally(target, sender, buf);
+ deliverLocally(target, sender, msg);
return null;
}
// forward to the coordinator unless we're the coord (then route the message directly)
if(!is_coord)
- forwardTo(coord, target, sender, buf, true);
+ forwardTo(coord, target, sender, msg, true);
else
- route(target, sender, buf);
+ route(target, sender, msg);
return null;
case Event.SET_LOCAL_ADDRESS:
@@ -313,17 +313,16 @@ public Object up(Event evt) {
if(hdr == null) {
// forward a multicast message to all bridges except myself, then pass up
if(dest == null && is_coord && relay_multicasts && !msg.isFlagSet(Message.Flag.NO_RELAY)) {
- byte[] buf=marshal(msg);
Address sender=new SiteUUID((UUID)msg.getSrc(), UUID.get(msg.getSrc()), site_id);
- sendToBridges(sender, buf, site_id);
+ sendToBridges(sender, msg, site_id);
}
break; // pass up
}
else { // header is not null
if(dest != null)
handleMessage(hdr, msg);
else
- deliver(null, hdr.original_sender, msg.getBuffer());
+ deliver(null, hdr.original_sender, msg);
}
return null;
@@ -359,7 +358,7 @@ protected void handleRelayMessage(Relay2Header hdr, Message msg) {
protected void handleMessage(Relay2Header hdr, Message msg) {
switch(hdr.type) {
case Relay2Header.DATA:
- route((SiteAddress)hdr.final_dest, (SiteAddress)hdr.original_sender, msg.getBuffer());
+ route((SiteAddress)hdr.final_dest, (SiteAddress)hdr.original_sender, msg);
break;
case Relay2Header.SITE_UNREACHABLE:
up_prot.up(new Event(Event.SITE_UNREACHABLE, hdr.final_dest));
@@ -379,14 +378,14 @@ protected void handleMessage(Relay2Header hdr, Message msg) {
* @param dest
* @param buf
*/
- protected void route(SiteAddress dest, SiteAddress sender, byte[] buf) {
+ protected void route(SiteAddress dest, SiteAddress sender, Message msg) {
short target_site=dest.getSite();
if(target_site == site_id) {
if(local_addr.equals(dest) || ((dest instanceof SiteMaster) && is_coord)) {
- deliver(dest, sender, buf);
+ deliver(dest, sender, msg);
}
else
- deliverLocally(dest, sender, buf); // send to member in same local site
+ deliverLocally(dest, sender, msg); // send to member in same local site
return;
}
Relayer tmp=relayer;
@@ -399,11 +398,12 @@ protected void route(SiteAddress dest, SiteAddress sender, byte[] buf) {
if(route == null)
log.error(local_addr + ": no route to " + SiteUUID.getSiteName(target_site) + ": dropping message");
else
- route.send(target_site, dest, sender, buf);
+ route.send(target_site, dest, sender, msg);
}
+
/** Sends the message via all bridges excluding the excluded_sites bridges */
- protected void sendToBridges(Address sender, byte[] buf, short ... excluded_sites) {
+ protected void sendToBridges(Address sender, final Message msg, short ... excluded_sites) {
Relayer tmp=relayer;
List<Relayer.Route> routes=tmp != null? tmp.getRoutes(excluded_sites) : null;
if(routes == null)
@@ -412,7 +412,7 @@ protected void sendToBridges(Address sender, byte[] buf, short ... excluded_site
if(log.isTraceEnabled())
log.trace(local_addr + ": relaying multicast message from " + sender + " via route " + route);
try {
- route.send(((SiteAddress)route.siteMaster()).getSite(), null, sender, buf);
+ route.send(((SiteAddress)route.siteMaster()).getSite(), null, sender, msg);
}
catch(Exception ex) {
log.error(local_addr + ": failed relaying message from " + sender + " via route " + route, ex);
@@ -421,29 +421,29 @@ protected void sendToBridges(Address sender, byte[] buf, short ... excluded_site
}
protected void sendSiteUnreachableTo(Address dest, short target_site) {
- Message msg=new Message(dest);
+ Message msg=new Message(dest).setFlag(Message.Flag.OOB);
msg.setSrc(new SiteUUID((UUID)local_addr, UUID.get(local_addr), site_id));
Relay2Header hdr=new Relay2Header(Relay2Header.SITE_UNREACHABLE, new SiteMaster(target_site), null);
msg.putHeader(id, hdr);
down_prot.down(new Event(Event.MSG, msg));
}
- protected void forwardTo(Address next_dest, SiteAddress final_dest, Address original_sender, byte[] buf,
+ protected void forwardTo(Address next_dest, SiteAddress final_dest, Address original_sender, final Message msg,
boolean forward_to_current_coord) {
if(log.isTraceEnabled())
log.trace(local_addr + ": forwarding message to final destination " + final_dest + " to " +
(forward_to_current_coord? " the current coordinator" : next_dest));
- Message msg=new Message(next_dest, buf);
+ Message copy=copy(msg).dest(next_dest).src(null);
Relay2Header hdr=new Relay2Header(Relay2Header.DATA, final_dest, original_sender);
- msg.putHeader(id, hdr);
+ copy.putHeader(id, hdr);
if(forward_to_current_coord && forwarding_protocol_present)
- down_prot.down(new Event(Event.FORWARD_TO_COORD, msg));
+ down_prot.down(new Event(Event.FORWARD_TO_COORD, copy));
else
- down_prot.down(new Event(Event.MSG, msg));
+ down_prot.down(new Event(Event.MSG, copy));
}
-
- protected void deliverLocally(SiteAddress dest, SiteAddress sender, byte[] buf) {
+
+ protected void deliverLocally(SiteAddress dest, SiteAddress sender, Message msg) {
Address local_dest;
boolean send_to_coord=false;
if(dest instanceof SiteUUID) {
@@ -461,38 +461,25 @@ protected void deliverLocally(SiteAddress dest, SiteAddress sender, byte[] buf)
if(log.isTraceEnabled())
log.trace(local_addr + ": delivering message to " + dest + " in local cluster");
- forwardTo(local_dest, dest, sender, buf, send_to_coord);
+ forwardTo(local_dest, dest, sender, msg, send_to_coord);
}
- protected void deliver(Address dest, Address sender, byte[] buf) {
+
+ protected void deliver(Address dest, Address sender, final Message msg) {
try {
- Message original_msg=(Message)Util.streamableFromByteBuffer(Message.class, buf);
- original_msg.setSrc(sender);
- original_msg.setDest(dest);
+ Message copy=copy(msg).dest(dest).src(sender);
if(log.isTraceEnabled())
log.trace(local_addr + ": delivering message from " + sender);
- up_prot.up(new Event(Event.MSG, original_msg));
+ up_prot.up(new Event(Event.MSG, copy));
}
catch(Exception e) {
log.error("failed unmarshalling message", e);
}
}
-
-
- protected byte[] marshal(Message msg) {
- Message tmp=msg.copy(true, Global.BLOCKS_START_ID); // // we only copy headers from building blocks
- // setting dest and src to null reduces the serialized size of the message; we'll set dest/src from the header later
- tmp.setDest(null);
- tmp.setSrc(null);
-
- try {
- return Util.streamableToByteBuffer(tmp);
- }
- catch(Exception e) {
- log.error("marshalling failure", e);
- return null;
- }
+ /** Copies the message, but only the headers above the current protocol (RELAY) (or RpcDispatcher related headers) */
+ protected Message copy(Message msg) {
+ return msg.copy(true, Global.BLOCKS_START_ID, this.prots_above);
}
@@ -571,7 +558,7 @@ protected static Address determineSiteMaster(View view) {
public static class Relay2Header extends Header {
public static final byte DATA = 1;
public static final byte SITE_UNREACHABLE = 2; // final_dest is a SiteMaster
- public static final byte HOST_UNREACHABLE = 3; // final_dest is a SiteUUID
+ public static final byte HOST_UNREACHABLE = 3; // final_dest is a SiteUUID (not currently used)
protected byte type;
protected Address final_dest;
View
38 src/org/jgroups/protocols/relay/Relayer.java
@@ -217,7 +217,7 @@ public Route(Address site_master, JChannel bridge, RELAY2.RouteStatus status) {
public Route status(RELAY2.RouteStatus new_status) {status=new_status; return this;}
public Route reset() {return bridge(null).siteMaster(null).status(RELAY2.RouteStatus.DOWN);}
- public void send(short target_site, Address final_destination, Address original_sender, byte[] buf) {
+ public void send(short target_site, Address final_destination, Address original_sender, final Message msg) {
switch(status) {
case DOWN: // send SITE-UNREACHABLE message back to sender
relay.sendSiteUnreachableTo(original_sender, target_site);
@@ -231,7 +231,7 @@ public void send(short target_site, Address final_destination, Address original_
queue=existing;
}
try {
- queue.put(createMessage(new SiteMaster(target_site), final_destination, original_sender, buf));
+ queue.put(createMessage(new SiteMaster(target_site), final_destination, original_sender, msg));
}
catch(InterruptedException e) {
}
@@ -242,8 +242,8 @@ public void send(short target_site, Address final_destination, Address original_
if(log.isTraceEnabled())
log.trace("routing message to " + final_destination + " via " + site_master);
try {
- Message msg=createMessage(site_master, final_destination, original_sender, buf);
- bridge.send(msg);
+ Message copy=createMessage(site_master, final_destination, original_sender, msg);
+ bridge.send(copy);
}
catch(Exception e) {
log.error("failure relaying message", e);
@@ -255,11 +255,11 @@ public String toString() {
return (site_master != null? site_master + " " : "") + "[" + status + "]";
}
- protected Message createMessage(Address target, Address final_destination, Address original_sender, byte[] buf) {
- Message msg=new Message(target, buf);
+ protected Message createMessage(Address target, Address final_destination, Address original_sender, final Message msg) {
+ Message copy=relay.copy(msg).dest(target).src(null);
RELAY2.Relay2Header hdr=new RELAY2.Relay2Header(RELAY2.Relay2Header.DATA, final_destination, original_sender);
- msg.putHeader(relay.getId(), hdr);
- return msg;
+ copy.putHeader(relay.getId(), hdr);
+ return copy;
}
}
@@ -340,10 +340,10 @@ public void run() {
protected void changeStatusToDown(short id) {
Route route=routes[id];
- if(route.status() == RELAY2.RouteStatus.UP)
+ if(route.status() == RELAY2.RouteStatus.UNKNOWN)
route.status(RELAY2.RouteStatus.DOWN); // SITE-UNREACHABLE responses are sent in this state
else {
- log.warn(relay.getLocalAddress() + ": didn't change status of " + id + " to DOWN as it is UP");
+ log.warn(relay.getLocalAddress() + ": didn't change status of " + SiteUUID.getSiteName(id) + " to DOWN as it is UP");
return;
}
BlockingQueue<Message> msgs=fwd_queue.remove(id);
@@ -374,11 +374,13 @@ protected void changeStatusToUp(final short id, JChannel bridge, Address site_ma
case UNKNOWN:
case DOWN: // queue should be empty, but anyway...
cancelTask(id);
- relay.getTimer().execute(new Runnable() {
- public void run() {
- flushQueue(id, route);
- }
- });
+ if(old_status == RELAY2.RouteStatus.UNKNOWN) {
+ relay.getTimer().execute(new Runnable() {
+ public void run() {
+ flushQueue(id, route);
+ }
+ });
+ }
break;
}
}
@@ -398,12 +400,10 @@ protected void flushQueue(short id, Route route) {
JChannel bridge=route.bridge();
if(log.isTraceEnabled())
log.trace(relay.getLocalAddress() + ": forwarding " + msgs.size() + " queued messages");
- System.out.println(relay.getLocalAddress() + ": forwarding " + msgs.size() + " queued messages");
while((msg=msgs.poll()) != null && route.status() == RELAY2.RouteStatus.UP) {
try {
- Message copy=msg.copy(); // need the copy to change the destination to the site master
- copy.setDest(route.siteMaster());
- bridge.send(copy);
+ msg.setDest(route.siteMaster()); // the message in the queue is already a copy !
+ bridge.send(msg);
}
catch(Throwable ex) {
log.error("failed forwarding queued message to " + SiteUUID.getSiteName(id), ex);
View
93 src/org/jgroups/stack/Protocol.java
@@ -70,21 +70,21 @@
* (capitalization not relevant)
*/
@Property(name="level", description="Sets the logger level (see javadocs)")
- public void setLevel(String level) {
- log.setLevel(level);
- }
-
- public String getLevel() {
- return log.getLevel();
- }
-
- public boolean isErgonomics() {
- return ergonomics;
- }
-
- public void setErgonomics(boolean ergonomics) {
- this.ergonomics=ergonomics;
- }
+ public void setLevel(String level) {log.setLevel(level);}
+ public String getLevel() {return log.getLevel();}
+ public boolean isErgonomics() {return ergonomics;}
+ public void setErgonomics(boolean ergonomics) {this.ergonomics=ergonomics;}
+ public ProtocolStack getProtocolStack() {return stack;}
+ public boolean statsEnabled() {return stats;}
+ public void enableStats(boolean flag) {stats=flag;}
+ public String getName() {return name;}
+ public short getId() {return id;}
+ public void setId(short id) {this.id=id;}
+ public Protocol getUpProtocol() {return up_prot;}
+ public Protocol getDownProtocol() {return down_prot;}
+ public void setUpProtocol(Protocol prot) {this.up_prot=prot;}
+ public void setDownProtocol(Protocol prot) {this.down_prot=prot;}
+ public void setProtocolStack(ProtocolStack s) {this.stack=s;}
public Object getValue(String name) {
@@ -122,21 +122,31 @@ public Protocol setValue(String name, Object value) {
- public ProtocolStack getProtocolStack() {
- return stack;
- }
-
/**
* After configuring the protocol itself from the properties defined in the XML config, a protocol might have
* additional objects which need to be configured. This callback allows a protocol developer to configure those
* other objects. This call is guaranteed to be invoked <em>after</em> the protocol itself has
* been configured. See AUTH for an example.
* @return
*/
- protected List<Object> getConfigurableObjects() {
- return null;
+ protected List<Object> getConfigurableObjects() {return null;}
+
+ /** Returns the protocol IDs of all protocols above this one (excluding the current protocol) */
+ public short[] getIdsAbove() {
+ short[] retval;
+ List<Short> ids=new ArrayList<Short>();
+ Protocol current=up_prot;
+ while(current != null) {
+ ids.add(current.getId());
+ current=current.up_prot;
+ }
+ retval=new short[ids.size()];
+ for(int i=0; i < ids.size(); i++)
+ retval[i]=ids.get(i);
+ return retval;
}
+
protected TP getTransport() {
Protocol retval=this;
while(retval != null && retval.down_prot != null) {
@@ -176,14 +186,6 @@ public void setSocketFactory(SocketFactory factory) {
}
- public boolean statsEnabled() {
- return stats;
- }
-
- public void enableStats(boolean flag) {
- stats=flag;
- }
-
@ManagedOperation(description="Resets all stats")
public void resetStatistics() {resetStats();}
@@ -340,39 +342,6 @@ public void destroy() {
}
- /** All protocol names have to be unique ! */
- public String getName() {
- return name;
- }
-
- public short getId() {
- return id;
- }
-
- public void setId(short id) {
- this.id=id;
- }
-
- public Protocol getUpProtocol() {
- return up_prot;
- }
-
- public Protocol getDownProtocol() {
- return down_prot;
- }
-
- public void setUpProtocol(Protocol up_prot) {
- this.up_prot=up_prot;
- }
-
- public void setDownProtocol(Protocol down_prot) {
- this.down_prot=down_prot;
- }
-
- public void setProtocolStack(ProtocolStack stack) {
- this.stack=stack;
- }
-
/**
* An event was received from the layer below. Usually the current layer will want to examine
View
23 src/org/jgroups/util/ForwardQueue.java
@@ -6,10 +6,7 @@
import org.jgroups.logging.Log;
import org.jgroups.stack.Protocol;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.NavigableSet;
+import java.util.*;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
@@ -20,7 +17,7 @@
/**
* Forwards messages in FIFO order to a destination. Uses IDs to prevent duplicates. Used by
- * {@link org.jgroups.protocols.SEQUENCER} and {@link org.jgroups.protocols.FORWARD_TO_COORD}.
+ * {@link org.jgroups.protocols.FORWARD_TO_COORD}.
* @author Bela Ban
* @since 3.3
*/
@@ -33,8 +30,7 @@
// protected volatile Address target;
/** Maintains messages forwarded to the target which which no ack has been received yet.
- * Needs to be sorted so we can resend them in the right order
- */
+ * Needs to be sorted so we can resend them in the right order */
protected final NavigableMap<Long,Message> forward_table=new ConcurrentSkipListMap<Long,Message>();
protected final Lock send_lock=new ReentrantLock();
@@ -60,7 +56,7 @@
protected final Log log;
/** Size of the set to store received seqnos (for duplicate checking) */
- protected int delivery_table_max_size=2000;
+ protected int delivery_table_max_size=500;
@@ -83,6 +79,15 @@ public ForwardQueue(Log log) {
public void setDeliveryTableMaxSize(int max_size) {this.delivery_table_max_size=max_size;}
+ /** Total size of all queues of the delivery table */
+ public int deliveryTableSize() {
+ int retval=0;
+ for(Set<Long> val: delivery_table.values())
+ retval+=val.size();
+ return retval;
+ }
+
+
public void start() {
running=true;
}
@@ -243,6 +248,8 @@ protected boolean canDeliver(Address sender, long seqno) {
int size=seqno_set.size();
if(size > delivery_table_max_size) {
// trim the seqno_set to delivery_table_max_size elements by removing the first N seqnos
+
+ // iteration: very bad !!!
for(int i=0; i < size - delivery_table_max_size; i++) {
if(seqno_set.pollFirst() == null)
break;
View
8 src/org/jgroups/util/Util.java
@@ -2291,6 +2291,14 @@ public static boolean all(Collection c, Object obj) {
}
+ public static <T> boolean contains(T key, T[] list) {
+ if(list == null) return false;
+ for(T tmp: list)
+ if(tmp == key || tmp.equals(key))
+ return true;
+ return false;
+ }
+
public static boolean containsViewId(Collection<View> views, ViewId vid) {
for(View view: views) {
View
4 tests/other/org/jgroups/tests/UnicastTestRpc.java
@@ -200,8 +200,8 @@ void invokeRpcs() throws Throwable {
double time_per_req=time / (double)num_msgs;
double reqs_sec=num_msgs / (time / 1000.0);
double throughput=num_msgs * msg_size / (time / 1000.0);
- System.out.println(Util.bold("\ninvoked " + num_msgs + " requests in " + time + " ms: " + time_per_req + " ms / req, " +
- String.format("%.2f", reqs_sec) + " reqs/sec, " + Util.printBytes(throughput) + " / sec\n"));
+ System.out.println(Util.bold("\ninvoked " + num_msgs + " requests in " + time + " ms: " + time_per_req + " ms/req, " +
+ String.format("%.2f", reqs_sec) + " reqs/sec, " + Util.printBytes(throughput) + "/sec\n"));
}
void setSenderThreads() throws Exception {

0 comments on commit 9631886

Please sign in to comment.
Something went wrong with that request. Please try again.