Skip to content

Commit

Permalink
- Added Table.removeMany() which is passed a message batch that messages from the table are moved into (https://issues.jboss.org/browse/JGRP-2126)
Browse files Browse the repository at this point in the history

- Using MessageBatch$BatchIterator.remove() (O(1)) instead of MessageBatch.remove() (O(n))
- Replaced MessageBatch.getMatchingMessages() with replaceIf()
  • Loading branch information
belaban committed Jan 3, 2017
1 parent fea7759 commit f888cea
Show file tree
Hide file tree
Showing 13 changed files with 224 additions and 136 deletions.
7 changes: 4 additions & 3 deletions src/org/jgroups/protocols/FD_ALL.java
Expand Up @@ -14,6 +14,7 @@
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
import java.util.function.Supplier; import java.util.function.Supplier;


/** /**
Expand Down Expand Up @@ -86,7 +87,7 @@ public class FD_ALL extends Protocol {


protected final Lock lock=new ReentrantLock(); protected final Lock lock=new ReentrantLock();



protected final Predicate<Message> HAS_HEADER=msg -> msg.getHeader(this.id) != null;




public FD_ALL() {} public FD_ALL() {}
Expand Down Expand Up @@ -198,8 +199,8 @@ else if(msg_counts_as_heartbeat) {




public void up(MessageBatch batch) { public void up(MessageBatch batch) {
Collection<Message> msgs=batch.getMatchingMessages(id, true); int matching_msgs=batch.replaceIf(HAS_HEADER, null, true);
if((msgs != null && !msgs.isEmpty()) || msg_counts_as_heartbeat) { if(matching_msgs > 0 || msg_counts_as_heartbeat) {
update(batch.sender()); update(batch.sender());
num_heartbeats_received++; num_heartbeats_received++;
if(has_suspected_mbrs) if(has_suspected_mbrs)
Expand Down
7 changes: 5 additions & 2 deletions src/org/jgroups/protocols/FD_ALL2.java
Expand Up @@ -15,6 +15,7 @@
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
import java.util.function.Supplier; import java.util.function.Supplier;


/** /**
Expand Down Expand Up @@ -79,6 +80,8 @@ public class FD_ALL2 extends Protocol {


protected final Lock lock=new ReentrantLock(); protected final Lock lock=new ReentrantLock();


protected final Predicate<Message> HAS_HEADER=msg -> msg.getHeader(this.id) != null;

protected final BoundedList<Tuple<Address,Long>> suspect_history=new BoundedList<>(20); protected final BoundedList<Tuple<Address,Long>> suspect_history=new BoundedList<>(20);




Expand Down Expand Up @@ -180,8 +183,8 @@ else if(msg_counts_as_heartbeat) {




public void up(MessageBatch batch) { public void up(MessageBatch batch) {
Collection<Message> msgs=batch.getMatchingMessages(id, true); int matched_msgs=batch.replaceIf(HAS_HEADER, null, true);
if((msgs != null && !msgs.isEmpty()) || msg_counts_as_heartbeat) { if(matched_msgs > 0 || msg_counts_as_heartbeat) {
update(batch.sender()); update(batch.sender());
num_heartbeats_received++; num_heartbeats_received++;
if(has_suspected_mbrs) if(has_suspected_mbrs)
Expand Down
32 changes: 15 additions & 17 deletions src/org/jgroups/protocols/SEQUENCER2.java
Expand Up @@ -21,6 +21,8 @@
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import java.util.function.Supplier; import java.util.function.Supplier;




Expand Down Expand Up @@ -54,6 +56,9 @@ public class SEQUENCER2 extends Protocol {


protected volatile boolean running=true; protected volatile boolean running=true;


protected static final BiConsumer<MessageBatch,Message> BATCH_ACCUMULATOR=MessageBatch::add;
protected static final Predicate<MessageBatch> BATCH_VALIDATOR=mb -> mb != null && !mb.isEmpty();



@ManagedAttribute protected long request_msgs; @ManagedAttribute protected long request_msgs;
@ManagedAttribute protected long response_msgs; @ManagedAttribute protected long response_msgs;
Expand Down Expand Up @@ -367,23 +372,16 @@ protected void deliver(Message msg, SequencerHeader hdr) {




protected void removeAndDeliver(final AtomicBoolean processing, Table<Message> win, Address sender) { protected void removeAndDeliver(final AtomicBoolean processing, Table<Message> win, Address sender) {
boolean released_processing=false; final MessageBatch batch=new MessageBatch(max_msg_batch_size).dest(local_addr).sender(sender).multicast(false);
try { Supplier<MessageBatch> batch_creator=() -> batch;
while(true) { while(true) {
List<Message> list=win.removeMany(processing, true, max_msg_batch_size); batch.reset();
if(list != null) // list is guaranteed to NOT contain any OOB messages as the drop_oob_msgs_filter removed them win.removeMany(processing, true, max_msg_batch_size, null,
deliverBatch(new MessageBatch(local_addr, sender, null, false, list)); batch_creator, BATCH_ACCUMULATOR, BATCH_VALIDATOR);
else { if(batch.isEmpty())
released_processing=true; return;
return; // batch is guaranteed to NOT contain any OOB messages as the drop_oob_msgs_filter removed them
} deliverBatch(batch);
}
}
finally {
// processing is always set in win.remove(processing) above and never here ! This code is just a
// 2nd line of defense should there be an exception before win.removeMany(processing) sets processing
if(!released_processing)
processing.set(false);
} }
} }


Expand Down
50 changes: 25 additions & 25 deletions src/org/jgroups/protocols/UNICAST3.java
Expand Up @@ -17,7 +17,9 @@
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Predicate; import java.util.function.Predicate;
import java.util.function.Supplier;




/** /**
Expand Down Expand Up @@ -143,6 +145,8 @@ public class UNICAST3 extends Protocol implements AgeOutCache.Handler<Address> {
protected static final Predicate<Message> dont_loopback_filter= protected static final Predicate<Message> dont_loopback_filter=
msg -> msg != null && msg.isTransientFlagSet(Message.TransientFlag.DONT_LOOPBACK); msg -> msg != null && msg.isTransientFlagSet(Message.TransientFlag.DONT_LOOPBACK);


protected static final BiConsumer<MessageBatch,Message> BATCH_ACCUMULATOR=MessageBatch::add;
protected static final Predicate<MessageBatch> BATCH_VALIDATOR=mb -> mb != null && !mb.isEmpty();


public void setMaxMessageBatchSize(int size) { public void setMaxMessageBatchSize(int size) {
if(size >= 1) if(size >= 1)
Expand Down Expand Up @@ -488,11 +492,12 @@ public void up(MessageBatch batch) {
Map<Short,List<LongTuple<Message>>> msgs=new LinkedHashMap<>(); Map<Short,List<LongTuple<Message>>> msgs=new LinkedHashMap<>();
ReceiverEntry entry=recv_table.get(batch.sender()); ReceiverEntry entry=recv_table.get(batch.sender());


for(Message msg: batch) { for(Iterator<Message> it=batch.iterator(); it.hasNext();) {
Message msg=it.next();
UnicastHeader3 hdr; UnicastHeader3 hdr;
if(msg == null || msg.isFlagSet(Message.Flag.NO_RELIABILITY) || (hdr=msg.getHeader(id)) == null) if(msg == null || msg.isFlagSet(Message.Flag.NO_RELIABILITY) || (hdr=msg.getHeader(id)) == null)
continue; continue;
batch.remove(msg); // remove the message from the batch, so it won't be passed up the stack it.remove(); // remove the message from the batch, so it won't be passed up the stack


if(hdr.type != UnicastHeader3.DATA) { if(hdr.type != UnicastHeader3.DATA) {
handleUpEvent(msg.getSrc(), msg, hdr); handleUpEvent(msg.getSrc(), msg, hdr);
Expand Down Expand Up @@ -528,19 +533,20 @@ public void up(MessageBatch batch) {
protected void handleBatchFromSelf(MessageBatch batch, Entry entry) { protected void handleBatchFromSelf(MessageBatch batch, Entry entry) {
List<LongTuple<Message>> list=new ArrayList<>(batch.size()); List<LongTuple<Message>> list=new ArrayList<>(batch.size());


for(Message msg: batch) { for(Iterator<Message> it=batch.iterator(); it.hasNext();) {
Message msg=it.next();
UnicastHeader3 hdr; UnicastHeader3 hdr;
if(msg == null || msg.isFlagSet(Message.Flag.NO_RELIABILITY) || (hdr=msg.getHeader(id)) == null) if(msg == null || msg.isFlagSet(Message.Flag.NO_RELIABILITY) || (hdr=msg.getHeader(id)) == null)
continue; continue;
batch.remove(msg); // remove the message from the batch, so it won't be passed up the stack it.remove(); // remove the message from the batch, so it won't be passed up the stack


if(hdr.type != UnicastHeader3.DATA) { if(hdr.type != UnicastHeader3.DATA) {
handleUpEvent(msg.getSrc(), msg, hdr); handleUpEvent(msg.getSrc(), msg, hdr);
continue; continue;
} }


if(entry.conn_id != hdr.conn_id) { if(entry.conn_id != hdr.conn_id) {
batch.remove(msg); it.remove();
continue; continue;
} }
list.add(new LongTuple<>(hdr.seqno(), msg)); list.add(new LongTuple<>(hdr.seqno(), msg));
Expand Down Expand Up @@ -875,26 +881,20 @@ protected void handleBatchReceived(final ReceiverEntry entry, Address sender, Li
* order in which they were sent * order in which they were sent
*/ */
protected void removeAndDeliver(final AtomicBoolean processing, Table<Message> win, Address sender) { protected void removeAndDeliver(final AtomicBoolean processing, Table<Message> win, Address sender) {
boolean released_processing=false; final MessageBatch batch=new MessageBatch(max_msg_batch_size).dest(local_addr).sender(sender).multicast(false);
try { Supplier<MessageBatch> batch_creator=() -> batch;
while(true) {
List<Message> list=win.removeMany(processing, true, max_msg_batch_size, drop_oob_and_dont_loopback_msgs_filter); while(true) {
if(list != null) { // list is guaranteed to NOT contain any OOB messages as the drop_oob_msgs_filter removed them batch.reset(); // sets index to 0: important as batch delivery may not remove messages from batch!
if(stats) win.removeMany(processing, true, max_msg_batch_size, drop_oob_and_dont_loopback_msgs_filter,
avg_delivery_batch_size.add(list.size()); batch_creator, BATCH_ACCUMULATOR, BATCH_VALIDATOR);
deliverBatch(new MessageBatch(local_addr, sender, null, false, list)); if(batch.isEmpty())
} return;
else {
released_processing=true; // batch is guaranteed to NOT contain any OOB messages as the drop_oob_msgs_filter removed them
return; if(stats)
} avg_delivery_batch_size.add(batch.size());
} deliverBatch(batch);
}
finally {
// processing is always set in win.remove(processing) above and never here ! This code is just a
// 2nd line of defense should there be an exception before win.removeMany(processing) sets processing
if(!released_processing)
processing.set(false);
} }
} }


Expand Down
44 changes: 21 additions & 23 deletions src/org/jgroups/protocols/pbcast/NAKACK2.java
Expand Up @@ -19,7 +19,9 @@
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Predicate; import java.util.function.Predicate;
import java.util.function.Supplier;




/** /**
Expand Down Expand Up @@ -150,6 +152,9 @@ public class NAKACK2 extends Protocol implements DiagnosticsHandler.ProbeHandler


protected static final Predicate<Message> dont_loopback_filter=msg -> msg != null && msg.isTransientFlagSet(Message.TransientFlag.DONT_LOOPBACK); protected static final Predicate<Message> dont_loopback_filter=msg -> msg != null && msg.isTransientFlagSet(Message.TransientFlag.DONT_LOOPBACK);


protected static final BiConsumer<MessageBatch,Message> BATCH_ACCUMULATOR=MessageBatch::add;
protected static final Predicate<MessageBatch> BATCH_VALIDATOR=mb -> mb != null && !mb.isEmpty();



@ManagedAttribute(description="Number of retransmit requests received") @ManagedAttribute(description="Number of retransmit requests received")
protected final LongAdder xmit_reqs_received=new LongAdder(); protected final LongAdder xmit_reqs_received=new LongAdder();
Expand Down Expand Up @@ -870,37 +875,30 @@ protected void handleMessages(Address dest, Address sender, List<LongTuple<Messa


/** Efficient way of checking whether another thread is already processing messages from sender. If that's the case, /** Efficient way of checking whether another thread is already processing messages from sender. If that's the case,
* we return immediately and let the existing thread process our message (https://jira.jboss.org/jira/browse/JGRP-829). * we return immediately and let the existing thread process our message (https://jira.jboss.org/jira/browse/JGRP-829).
* Benefit: fewer threads blocked on the same lock, these threads an be returned to the thread pool */ * Benefit: fewer threads blocked on the same lock, these threads can be returned to the thread pool */
protected void removeAndPassUp(Table<Message> buf, Address sender, boolean loopback, AsciiString cluster_name) { protected void removeAndPassUp(Table<Message> buf, Address sender, boolean loopback, AsciiString cluster_name) {
final AtomicBoolean processing=buf.getProcessing(); final AtomicBoolean processing=buf.getProcessing();
if(!processing.compareAndSet(false, true)) if(!processing.compareAndSet(false, true))
return; return;


boolean remove_msgs=discard_delivered_msgs && !loopback; boolean remove_msgs=discard_delivered_msgs && !loopback;
boolean released_processing=false; MessageBatch batch=new MessageBatch(max_msg_batch_size).dest(null).sender(sender).clusterName(cluster_name).multicast(true);
try { Supplier<MessageBatch> batch_creator=() -> batch;
while(true) { while(true) {
// We're removing as many msgs as possible and set processing to false (if null) *atomically* (wrt to add()) batch.reset();
// Don't include DUMMY and OOB_DELIVERED messages in the removed set // We're removing as many msgs as possible and set processing to false (if null) *atomically* (wrt to add())
List<Message> msgs=buf.removeMany(processing, remove_msgs, max_msg_batch_size, // Don't include DUMMY and OOB_DELIVERED messages in the removed set
no_dummy_and_no_oob_delivered_msgs_and_no_dont_loopback_msgs); buf.removeMany(processing, remove_msgs, max_msg_batch_size,
if(msgs == null || msgs.isEmpty()) { no_dummy_and_no_oob_delivered_msgs_and_no_dont_loopback_msgs,
released_processing=true; batch_creator, BATCH_ACCUMULATOR, BATCH_VALIDATOR);
if(rebroadcasting) if(batch.isEmpty()) {
checkForRebroadcasts(); if(rebroadcasting)
return; checkForRebroadcasts();
} return;

MessageBatch batch=new MessageBatch(null, sender, cluster_name, true, msgs);
deliverBatch(batch);
} }
deliverBatch(batch);
} }
finally {
// processing is always set in win.remove(processing) above and never here ! This code is just a
// 2nd line of defense should there be an exception before win.remove(processing) sets processing
if(!released_processing)
processing.set(false);
}
} }




Expand Down
23 changes: 21 additions & 2 deletions src/org/jgroups/util/MessageBatch.java
Expand Up @@ -170,16 +170,30 @@ public MessageBatch replace(Message existing_msg, Message new_msg) {
* @return the MessageBatch * @return the MessageBatch
*/ */
public MessageBatch replace(Predicate<Message> filter, Message replacement, boolean match_all) { public MessageBatch replace(Predicate<Message> filter, Message replacement, boolean match_all) {
replaceIf(filter, replacement, match_all);
return this;
}

/**
* Replaces all messages that match a given filter with a replacement message
* @param filter the filter. If null, no changes take place. Note that filter needs to be able to handle null msgs
* @param replacement the replacement message. Can be null, which essentially removes all messages matching filter
* @param match_all whether to replace the first or all matches
* @return the number of matched messages
*/
public int replaceIf(Predicate<Message> filter, Message replacement, boolean match_all) {
if(filter == null) if(filter == null)
return this; return 0;
int matched=0;
for(int i=0; i < index; i++) { for(int i=0; i < index; i++) {
if(filter.test(messages[i])) { if(filter.test(messages[i])) {
messages[i]=replacement; messages[i]=replacement;
matched++;
if(!match_all) if(!match_all)
break; break;
} }
} }
return this; return matched;
} }


/** /**
Expand Down Expand Up @@ -231,6 +245,11 @@ public MessageBatch clear() {
return this; return this;
} }


public MessageBatch reset() {
index=0;
return this;
}

/** Removes and returns all messages which have a header with ID == id */ /** Removes and returns all messages which have a header with ID == id */
public Collection<Message> getMatchingMessages(final short id, boolean remove) { public Collection<Message> getMatchingMessages(final short id, boolean remove) {
return map((msg, batch) -> { return map((msg, batch) -> {
Expand Down
7 changes: 5 additions & 2 deletions src/org/jgroups/util/SubmitToThreadPool.java
Expand Up @@ -8,6 +8,8 @@
import org.jgroups.protocols.TpHeader; import org.jgroups.protocols.TpHeader;
import org.jgroups.stack.MessageProcessingPolicy; import org.jgroups.stack.MessageProcessingPolicy;


import java.util.Iterator;

/** /**
* Default message processing policy. Submits all received messages and batches to the thread pool * Default message processing policy. Submits all received messages and batches to the thread pool
* @author Bela Ban * @author Bela Ban
Expand Down Expand Up @@ -44,10 +46,11 @@ protected void removeAndDispatchNonBundledMessages(MessageBatch oob_batch) {
return; return;
AsciiString tmp=oob_batch.clusterName(); AsciiString tmp=oob_batch.clusterName();
byte[] cname=tmp != null? tmp.chars() : null; byte[] cname=tmp != null? tmp.chars() : null;
for(Message msg: oob_batch) { for(Iterator<Message> it=oob_batch.iterator(); it.hasNext();) {
Message msg=it.next();
if(msg.isFlagSet(Message.Flag.DONT_BUNDLE) && msg.isFlagSet(Message.Flag.OOB)) { if(msg.isFlagSet(Message.Flag.DONT_BUNDLE) && msg.isFlagSet(Message.Flag.OOB)) {
boolean internal=msg.isFlagSet(Message.Flag.INTERNAL); boolean internal=msg.isFlagSet(Message.Flag.INTERNAL);
oob_batch.remove(msg); it.remove();
if(tp.statsEnabled()) if(tp.statsEnabled())
tp.getMessageStats().incrNumOOBMsgsReceived(1); tp.getMessageStats().incrNumOOBMsgsReceived(1);
tp.submitToThreadPool(new SingleMessageHandlerWithClusterName(msg, cname), internal); tp.submitToThreadPool(new SingleMessageHandlerWithClusterName(msg, cname), internal);
Expand Down

0 comments on commit f888cea

Please sign in to comment.