Skip to content

Commit

Permalink
- Added Table.add() which removes the last added element if it is nex…
Browse files Browse the repository at this point in the history
…t-in-line to be removed

- Adding and at the same time removing a message from table in NAKACK2 if DONT_LOOPBACK is set (https://issues.jboss.org/browse/JGRP-1835)
- If an RSVP message is DONT_LOOPBACK, when sending we remove ourself from the target list and don't block on self
- New Table.add() with filter which removes consecutive messages matching the filter
- NAKACK2 uses Table.add() with filter removing messages (when sending) with DONT_LOOPBACK
- FlowControl: when sending messages with DONT_LOOPBACK, return credits if needed (as the message won't be delivered to self)
-UNICAST2 / UNICAST3 now add DONT_LOOPBACK messages (when sending) with filter, which removes all consecutive DONT_LOOPBACK messages and purges them
- UNICAST3: when removing INTERNAL msg, don't check if seqno < HR as this would not remove msg if seqn == HR !
- MPerf: prevent results less than num_msgs
- Added project.properties
  • Loading branch information
belaban committed May 12, 2014
1 parent 07f34b3 commit 4164438
Show file tree
Hide file tree
Showing 15 changed files with 343 additions and 76 deletions.
6 changes: 6 additions & 0 deletions doc/manual/en/modules/advanced.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2414,6 +2414,12 @@ msg.setFlag(Message.OOB, Message.NO_FC);
Note that this is a <emphasis>transient flag</emphasis>, so Message.isTransientFlagSet(..) has
to be used instead of Message.isFlagSet(..
</para>
<note>
<para>
Note that DONT_LOOPBACK does not make any sense for <emphasis>unicast</emphasis> messages,
as the sender of a message sent to itself will never receive it.
</para>
</note>
</listitem>
</varlistentry>
</variablelist>
Expand Down
66 changes: 66 additions & 0 deletions project.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# Modify this file, publish it on your web site and contact
# JBoss middleware team to set project matrix to parse this file
# for your project information

# Required fields
#######################################################
# should be something like /project or /project/subproject. Only alphanumeric
# characters are allowed (no spaces)!
nodePath=/jgroups
# Human readable name of the project. Everything after " - " is cut off.
projectName=JGroups
# Main URL to your project
homePage=http://www.jgroups.org

# Miscellaneous fields
#######################################################
description=An application server for Clojure, built on JBoss AS
# ASL, LGPL, LGPL2, LGPL3, EPL, other
license=ASL2
# if license is other, fill the URL of the license here
otherLicenseLink=
specialIcon=
archived=
excludeInProjectMatrix=
subProjects=


# Downloads
#######################################################
downloadsLink=https://sourceforge.net/projects/javagroups/files/

# Documents
#######################################################
docsLink=http://www.jgroups.org/manual/html/index.html

# Community links
#######################################################
communityLink=http://www.jgroups.org
knowledgeBaseLink=
userForumLink=https://sourceforge.net/p/javagroups/mailman/javagroups-users/
devForumLink=https://sourceforge.net/p/javagroups/mailman/javagroups-development/
mailingListLink=https://sourceforge.net/p/javagroups/mailman/javagroups-users/
chatLink=irc://freenode.net/jgroups
blogLink=http://belaban.blogspot.ch/
twitterLink=

# Source code links
#######################################################
srcLink=https://github.com/belaban/JGroups
anonymousLink=https://github.com/belaban/JGroups.git
committerLink=git@github.com:belaban/JGroups.git
fisheyeLink=
viewvcLink=
githubLink=https://github.com/belaban/JGroups
anonymousGitLink=https://github.com/belaban/JGroups.git
committerGitLink=git@github.com:belaban/JGroups.git

# Build links
#######################################################
buildLink=
hudsonLink=

# Issue tracker links
#######################################################
issueTrackerLink=https://issues.jboss.org/browse/JGRP
jiraLink=
10 changes: 3 additions & 7 deletions src/org/jgroups/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@


import org.jgroups.conf.ClassConfigurator;
import org.jgroups.logging.Log;
import org.jgroups.logging.LogFactory;
import org.jgroups.util.*;

import java.io.*;
import java.io.DataInput;
import java.io.DataOutput;
import java.util.Map;

/**
Expand Down Expand Up @@ -42,8 +41,6 @@ public class Message implements Streamable {

protected volatile byte transient_flags; // transient_flags is neither marshalled nor copied

protected static final Log log=LogFactory.getLog(Message.class);



static final byte DEST_SET = 1;
Expand Down Expand Up @@ -400,7 +397,7 @@ public Message setTransientFlag(TransientFlag ... flags) {
if(flags != null)
for(TransientFlag flag: flags)
if(flag != null)
transient_flags |= flag.value();
transient_flags|=flag.value();
return this;
}

Expand Down Expand Up @@ -948,7 +945,6 @@ protected static Headers createHeaders(Headers m) {
return new Headers(m);
}


/* ------------------------------- End of Private methods ---------------------------- */


Expand Down
2 changes: 2 additions & 0 deletions src/org/jgroups/blocks/RequestCorrelator.java
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ public void sendRequest(long id, Collection<Address> dest_mbrs, Message msg, Rsp
for(Address mbr: dest_mbrs) {
Message copy=msg.copy(true);
copy.setDest(mbr);
if(!mbr.equals(local_addr) && copy.isTransientFlagSet(Message.TransientFlag.DONT_LOOPBACK))
copy.clearTransientFlag(Message.TransientFlag.DONT_LOOPBACK);
transport.down(new Event(Event.MSG, copy));
}
}
Expand Down
17 changes: 16 additions & 1 deletion src/org/jgroups/protocols/FlowControl.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ public abstract class FlowControl extends Protocol {
*/
protected final Map<Address,Credit> received=Util.createConcurrentMap();

protected Address local_addr;


/** Whether FlowControl is still running, this is set to false when the protocol terminates (on stop()) */
protected volatile boolean running=true;
Expand Down Expand Up @@ -321,7 +323,16 @@ public Object down(Event evt) {
if(length == 0)
break;

return handleDownMessage(evt, msg, dest, length);
Object retval=handleDownMessage(evt, msg, dest, length);

// if the message is DONT_LOOPBACK, we will not receive it, therefore the credit
// check needs to be done now
if(msg.isTransientFlagSet(Message.TransientFlag.DONT_LOOPBACK)) {
long new_credits=adjustCredit(received, local_addr, length);
if(new_credits > 0)
sendCredit(local_addr, new_credits);
}
return retval;

case Event.CONFIG:
handleConfigEvent((Map<String,Object>)evt.getArg());
Expand All @@ -330,6 +341,10 @@ public Object down(Event evt) {
case Event.VIEW_CHANGE:
handleViewChange(((View)evt.getArg()).getMembers());
break;

case Event.SET_LOCAL_ADDRESS:
local_addr=(Address)evt.getArg();
break;
}
return down_prot.down(evt); // this could potentially use the lower protocol's thread which may block
}
Expand Down
3 changes: 3 additions & 0 deletions src/org/jgroups/protocols/RSVP.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ public Object down(Event evt) {
log.trace(local_addr + ": " + hdr.typeToString() + " --> " + target);
retval=down_prot.down(evt);

if(msg.isTransientFlagSet(Message.TransientFlag.DONT_LOOPBACK))
entry.ack(local_addr);

// 4. Block on AckCollector
entry.block(timeout);
}
Expand Down
1 change: 0 additions & 1 deletion src/org/jgroups/protocols/TP.java
Original file line number Diff line number Diff line change
Expand Up @@ -1424,7 +1424,6 @@ public Object down(Event evt) {
boolean multicast=dest == null, do_send=multicast || !dest.equals(sender),
loop_back=(multicast || dest.equals(sender)) && !msg.isTransientFlagSet(Message.TransientFlag.DONT_LOOPBACK);


if(loopback_separate_thread) {
if(loop_back)
loopback(msg, multicast);
Expand Down
11 changes: 10 additions & 1 deletion src/org/jgroups/protocols/UNICAST2.java
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ public class UNICAST2 extends Protocol implements AgeOutCache.Handler<Address> {

protected Future<?> connection_reaper; // closes idle connections

protected static final Filter<Message> dont_loopback_filter=new Filter<Message>() {
public boolean accept(Message msg) {
return msg != null && msg.isTransientFlagSet(Message.TransientFlag.DONT_LOOPBACK);
}
};

@Deprecated
public int[] getTimeout() {return timeout;}
Expand Down Expand Up @@ -515,15 +520,19 @@ public Object down(Event evt) {
}
}

boolean dont_loopback_set=msg.isTransientFlagSet(Message.TransientFlag.DONT_LOOPBACK)
&& dst.equals(local_addr);
short send_conn_id=entry.send_conn_id;
long seqno=entry.sent_msgs_seqno.getAndIncrement();
long sleep=10;
do {
try {
msg.putHeader(this.id, Unicast2Header.createDataHeader(seqno, send_conn_id, seqno == DEFAULT_FIRST_SEQNO));
entry.sent_msgs.add(seqno,msg); // add *including* UnicastHeader, adds to retransmitter
entry.sent_msgs.add(seqno,msg, dont_loopback_set? dont_loopback_filter : null); // add *including* UnicastHeader, adds to retransmitter
if(conn_expiry_timeout > 0)
entry.update();
if(dont_loopback_set)
entry.sent_msgs.purge(entry.sent_msgs.getHighestDeliverable());
break;
}
catch(Throwable t) {
Expand Down
14 changes: 12 additions & 2 deletions src/org/jgroups/protocols/UNICAST3.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ public class UNICAST3 extends Protocol implements AgeOutCache.Handler<Address> {
public boolean accept(Message msg) {return msg != null && msg.hashCode() != DUMMY_OOB_MSG.hashCode();}
};

protected static final Filter<Message> dont_loopback_filter=new Filter<Message>() {
public boolean accept(Message msg) {
return msg != null && msg.isTransientFlagSet(Message.TransientFlag.DONT_LOOPBACK);
}
};


public void setMaxMessageBatchSize(int size) {
if(size >= 1)
Expand Down Expand Up @@ -521,15 +527,19 @@ public Object down(Event evt) {
if(entry.state() == State.CLOSING)
entry.state(State.OPEN);

boolean dont_loopback_set=msg.isTransientFlagSet(Message.TransientFlag.DONT_LOOPBACK)
&& dst.equals(local_addr);
short send_conn_id=entry.connId();
long seqno=entry.sent_msgs_seqno.getAndIncrement();
long sleep=10;
do {
try {
msg.putHeader(this.id,Header.createDataHeader(seqno,send_conn_id,seqno == DEFAULT_FIRST_SEQNO));
entry.sent_msgs.add(seqno,msg); // add *including* UnicastHeader, adds to retransmitter
entry.sent_msgs.add(seqno, msg, dont_loopback_set? dont_loopback_filter : null); // add *including* UnicastHeader, adds to retransmitter
if(conn_expiry_timeout > 0)
entry.update();
if(dont_loopback_set)
entry.sent_msgs.purge(entry.sent_msgs.getHighestDeliverable());
break;
}
catch(Throwable t) {
Expand Down Expand Up @@ -706,7 +716,7 @@ protected void handleDataReceived(final Address sender, long seqno, short conn_i
if(oob && msg.isFlagSet(Message.Flag.INTERNAL)) {
// If there are other msgs, tell the regular thread pool to handle them (https://issues.jboss.org/browse/JGRP-1732)
final AtomicBoolean processing=win.getProcessing();
if(!win.isEmpty() && !processing.get() && seqno < win.getHighestReceived()) {
if(!win.isEmpty() && !processing.get() /* && seqno < win.getHighestReceived() */) { // commented to handle hd == hr !
Executor pool=getTransport().getDefaultThreadPool();
pool.execute(new Runnable() {
public void run() {
Expand Down
13 changes: 10 additions & 3 deletions src/org/jgroups/protocols/pbcast/NAKACK2.java
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,12 @@ public boolean accept(Message msg) {
}
};

protected static final Filter<Message> dont_loopback_filter=new Filter<Message>() {
public boolean accept(Message msg) {
return msg != null && msg.isTransientFlagSet(Message.TransientFlag.DONT_LOOPBACK);
}
};


@ManagedAttribute(description="Number of retransmit requests received")
protected final AtomicLong xmit_reqs_received=new AtomicLong(0);
Expand Down Expand Up @@ -726,15 +732,16 @@ protected void send(Event evt, Message msg) {
if(buf == null) // discard message if there is no entry for local_addr
return;

if(msg.getSrc() == null)
msg.setSrc(local_addr); // this needs to be done so we can check whether the message sender is the local_addr
if(msg.src() == null)
msg.src(local_addr); // this needs to be done so we can check whether the message sender is the local_addr

boolean dont_loopback_set=msg.isTransientFlagSet(Message.TransientFlag.DONT_LOOPBACK);
msg_id=seqno.incrementAndGet();
long sleep=10;
do {
try {
msg.putHeader(this.id, NakAckHeader2.createMessageHeader(msg_id));
buf.add(msg_id, msg);
buf.add(msg_id, msg, dont_loopback_set? dont_loopback_filter : null);
break;
}
catch(Throwable t) {
Expand Down
46 changes: 43 additions & 3 deletions src/org/jgroups/util/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,25 @@ public void setHighestDelivered(long seqno) {
public boolean add(long seqno, T element) {
lock.lock();
try {
return _add(seqno, element, true);
return _add(seqno, element, true, null);
}
finally {
lock.unlock();
}
}

/**
* Adds an element if the element at the given index is null. Returns true if no element existed at the given index,
* else returns false and doesn't set the element.
* @param seqno
* @param element
* @param remove_filter If not null, a filter used to remove all consecutive messages passing the filter
* @return True if the element at the computed index was null, else false
*/
public boolean add(long seqno, T element, Filter<T> remove_filter) {
lock.lock();
try {
return _add(seqno, element, true, remove_filter);
}
finally {
lock.unlock();
Expand Down Expand Up @@ -240,7 +258,7 @@ public boolean add(final List<Tuple<Long,T>> list, boolean remove_added_elements
Tuple<Long,T> tuple=it.next();
long seqno=tuple.getVal1();
T element=const_value != null? const_value : tuple.getVal2();
if(_add(seqno, element, false))
if(_add(seqno, element, false, null))
added=true;
else if(remove_added_elements)
it.remove();
Expand Down Expand Up @@ -488,7 +506,7 @@ public void forEach(long from, long to, Visitor<T> visitor) {
}
}

protected boolean _add(long seqno, T element, boolean check_if_resize_needed) {
protected boolean _add(long seqno, T element, boolean check_if_resize_needed, Filter<T> remove_filter) {
if(seqno <= hd)
return false;

Expand All @@ -505,6 +523,8 @@ protected boolean _add(long seqno, T element, boolean check_if_resize_needed) {
size++;
if(seqno > hr)
hr=seqno;
if(remove_filter != null && hd +1 == seqno)
forEach(hd+1, hr, new RemoverOnAdd(remove_filter));
return true;
}
return false;
Expand Down Expand Up @@ -765,6 +785,26 @@ public boolean visit(long seqno, T element, int row, int column) {
}
}

protected class RemoverOnAdd implements Visitor<T> {
protected final Filter<T> filter;

public RemoverOnAdd(Filter<T> remover) {
filter=remover;
}

@GuardedBy("lock")
public boolean visit(long seqno, T element, int row, int column) {
if(element == null || !filter.accept(element))
return false;
if(seqno > hd)
hd=seqno;
size=Math.max(size-1, 0); // cannot be < 0 (well that would be a bug, but let's have this 2nd line of defense !)
return true;
}
}



protected class Dump implements Visitor<T> {
protected final StringBuilder sb=new StringBuilder();
protected boolean first=true;
Expand Down
Loading

0 comments on commit 4164438

Please sign in to comment.