Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
- Moved FlowControl.Credit into separate class
- Added SizeBoundedQueue and unit test
- Changed Credit from synchronized to use of ReentrantLock
- If a message is queued, we check if we need to send a credit request (based on the time of the last credit request sent)
- Added section on non-blocking flow control to manual
  • Loading branch information
belaban committed Jun 26, 2017
1 parent 8435c37 commit f1bd631
Show file tree
Hide file tree
Showing 20 changed files with 1,557 additions and 189 deletions.
2 changes: 2 additions & 0 deletions conf/jg-protocol-ids.xml
Expand Up @@ -64,6 +64,8 @@
<class id="60" name="org.jgroups.protocols.NAMING"/> <class id="60" name="org.jgroups.protocols.NAMING"/>
<class id="61" name="org.jgroups.protocols.dns.DNS_PING"/> <class id="61" name="org.jgroups.protocols.dns.DNS_PING"/>
<class id="62" name="org.jgroups.protocols.FRAG3"/> <class id="62" name="org.jgroups.protocols.FRAG3"/>
<class id="63" name="org.jgroups.protocols.UFC_NB"/>
<class id="64" name="org.jgroups.protocols.MFC_NB"/>


<!-- IDs reserved for building blocks --> <!-- IDs reserved for building blocks -->
<class id="200" name="org.jgroups.blocks.RequestCorrelator"/> <!-- ID should be the same as Global.BLOCKS_START_ID --> <class id="200" name="org.jgroups.blocks.RequestCorrelator"/> <!-- ID should be the same as Global.BLOCKS_START_ID -->
Expand Down
38 changes: 38 additions & 0 deletions doc/manual/protocols.adoc
Expand Up @@ -1421,6 +1421,44 @@ UFC has currently no properties other than those inherited by FlowControl (see a
${UFC} ${UFC}




[[NonBlockingFlowControl]]
=== Non blocking flow control

Contrary to <<FlowControl,blocking flow control>>, which blocks senders from sending a message when credits are lacking,
non-blocking flow control avoids blocking the sender thread.

Instead, when a sender has insufficient credits to send a message, the message is queued and the control flow returns to
the calling thread. When more credits are received, the queued messages are sent.

This means that a `JChannel.send(Message)` never blocks and - if the transport is also non-blocking (e.g. <<TCP_NIO2>>) -
we have a completely non-blocking stack.

However, if the send rate is always faster than the receive (processing) rate, messages will end up in the queues and
the queues will grow, leading to memory exhaustion.

It is therefore possible to fall back to blocking the sender threads if the message queues grow beyond a certain limit.

The attribute to bound a queue is `max_queue_size`, and defines the max number of bytes the accumulated messages can
have. If that size is exceeded, the addition of a message to a queue will block until messages are removed from the queue.

The `max_queue_size` attribute is per queue, so for unicast messages we have 1 queue per destination and for multicast
messages we have a single queue for all destinations. For example, if `max_queue_size` is set to `5M` (5 million bytes),
and we have members `{A,B,C,D}`, then on A the queues for B, C and D will have a combined max size of 15MB.

[[UFC_NB]]
==== UFC_NB
This is the non-blocking alternative to <<UFC>>. It extends UFC, so all attributes from UFC are inherited.

${UFC_NB}


[[MFC_NB]]
==== MFC_NB
This is the non-blocking alternative to <<MFC>>. It inherits from MFC, so all attributes are inherited.

${MFC_NB}




=== Fragmentation === Fragmentation


Expand Down
126 changes: 19 additions & 107 deletions src/org/jgroups/protocols/FlowControl.java
Expand Up @@ -6,15 +6,14 @@
import org.jgroups.annotations.ManagedOperation; import org.jgroups.annotations.ManagedOperation;
import org.jgroups.annotations.Property; import org.jgroups.annotations.Property;
import org.jgroups.stack.Protocol; import org.jgroups.stack.Protocol;
import org.jgroups.util.Average;
import org.jgroups.util.Bits; import org.jgroups.util.Bits;
import org.jgroups.util.Credit;
import org.jgroups.util.MessageBatch; import org.jgroups.util.MessageBatch;
import org.jgroups.util.Util; import org.jgroups.util.Util;


import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.TimeUnit;




/** /**
Expand All @@ -34,14 +33,14 @@ public abstract class FlowControl extends Protocol {
* Max number of bytes to send per receiver until an ack must be received before continuing sending * Max number of bytes to send per receiver until an ack must be received before continuing sending
*/ */
@Property(description="Max number of bytes to send per receiver until an ack must be received to proceed") @Property(description="Max number of bytes to send per receiver until an ack must be received to proceed")
protected long max_credits=500000; protected long max_credits=500000;


/** /**
* Max time (in milliseconds) to block. If credit hasn't been received after max_block_time, we send * Max time (in milliseconds) to block. If credit hasn't been received after max_block_time, we send
* a REPLENISHMENT request to the members from which we expect credits. A value <= 0 means to wait forever. * a REPLENISHMENT request to the members from which we expect credits. A value <= 0 means to wait forever.
*/ */
@Property(description="Max time (in ms) to block") @Property(description="Max time (in ms) to block")
protected long max_block_time=500; protected long max_block_time=500;


/** /**
* Defines the max number of milliseconds for a message to block before being sent, based on the length of * Defines the max number of milliseconds for a message to block before being sent, based on the length of
Expand All @@ -52,7 +51,7 @@ public abstract class FlowControl extends Protocol {
* If a message's length (size of the payload in bytes) is for example 15'000 bytes, * If a message's length (size of the payload in bytes) is for example 15'000 bytes,
* FlowControl blocks it for a max of 100 ms. * FlowControl blocks it for a max of 100 ms.
*/ */
protected Map<Long,Long> max_block_times=null; protected Map<Long,Long> max_block_times;




/** /**
Expand All @@ -63,21 +62,21 @@ public abstract class FlowControl extends Protocol {
@Property(description="The threshold (as a percentage of max_credits) at which a receiver sends more credits to " + @Property(description="The threshold (as a percentage of max_credits) at which a receiver sends more credits to " +
"a sender. Example: if max_credits is 1'000'000, and min_threshold 0.25, then we send ca. 250'000 credits " + "a sender. Example: if max_credits is 1'000'000, and min_threshold 0.25, then we send ca. 250'000 credits " +
"to P once we've got only 250'000 credits left for P (we've received 750'000 bytes from P)") "to P once we've got only 250'000 credits left for P (we've received 750'000 bytes from P)")
protected double min_threshold=0.40; protected double min_threshold=0.40;


/** /**
* Computed as <tt>max_credits</tt> times <tt>min_theshold</tt>. If explicitly set, this will * Computed as <tt>max_credits</tt> times <tt>min_theshold</tt>. If explicitly set, this will
* override the above computation * override the above computation
*/ */
@Property(description="Computed as max_credits x min_theshold unless explicitly set") @Property(description="Computed as max_credits x min_theshold unless explicitly set")
protected long min_credits=0; protected long min_credits;








/* --------------------------------------------- JMX ------------------------------------------------------ */ /* --------------------------------------------- JMX -------------------------------------------------- */
protected int num_credit_requests_received=0, num_credit_requests_sent=0; protected int num_credit_requests_received, num_credit_requests_sent;
protected int num_credit_responses_sent=0, num_credit_responses_received=0; protected int num_credit_responses_sent, num_credit_responses_received;




/* --------------------------------------------- Fields ------------------------------------------------------ */ /* --------------------------------------------- Fields ------------------------------------------------------ */
Expand All @@ -90,16 +89,15 @@ public abstract class FlowControl extends Protocol {
*/ */
protected final Map<Address,Credit> received=Util.createConcurrentMap(); protected final Map<Address,Credit> received=Util.createConcurrentMap();


protected Address local_addr; protected Address local_addr;




/** Whether FlowControl is still running, this is set to false when the protocol terminates (on stop()) */ /** Whether FlowControl is still running, this is set to false when the protocol terminates (on stop()) */
protected volatile boolean running=true; protected volatile boolean running=true;

protected int frag_size; // remember frag_size from the fragmentation protocol



protected int frag_size;









Expand Down Expand Up @@ -318,7 +316,7 @@ public Object down(Message msg) {
if(length == 0) if(length == 0)
return down_prot.down(msg); return down_prot.down(msg);


Object retval=handleDownMessage(msg, dest, length); Object retval=handleDownMessage(msg);


// if the message is DONT_LOOPBACK, we will not receive it, therefore the credit // if the message is DONT_LOOPBACK, we will not receive it, therefore the credit
// check needs to be done now // check needs to be done now
Expand Down Expand Up @@ -438,7 +436,7 @@ protected void handleConfigEvent(Map<String,Object> info) {
} }




protected abstract Object handleDownMessage(final Message msg, Address dest, int length); protected abstract Object handleDownMessage(final Message msg);






Expand All @@ -456,7 +454,7 @@ protected long adjustCredit(Map<Address,Credit> map, Address sender, int length)
return 0; return 0;
if(log.isTraceEnabled()) if(log.isTraceEnabled())
log.trace("%s used %d credits, %d remaining", sender, length, cred.get() - length); log.trace("%s used %d credits, %d remaining", sender, length, cred.get() - length);
return cred.decrementAndGet(length); return cred.decrementAndGet(length, min_credits, max_credits);
} }


/** /**
Expand All @@ -471,7 +469,7 @@ protected void handleCreditRequest(Map<Address,Credit> map, Address sender, long
return; return;
if(log.isTraceEnabled()) if(log.isTraceEnabled())
log.trace("received credit request from %s: sending %d credits", sender, requested_credits); log.trace("received credit request from %s: sending %d credits", sender, requested_credits);
cred.increment(requested_credits); cred.increment(requested_credits, max_credits);
sendCredit(sender, requested_credits); sendCredit(sender, requested_credits);
} }
} }
Expand Down Expand Up @@ -509,15 +507,15 @@ protected void handleViewChange(List<Address> mbrs) {
if(log.isTraceEnabled()) log.trace("new membership: %s", mbrs); if(log.isTraceEnabled()) log.trace("new membership: %s", mbrs);


// add members not in membership to received and sent hashmap (with full credits) // add members not in membership to received and sent hashmap (with full credits)
mbrs.stream().filter(addr -> !received.containsKey(addr)).forEach(addr -> received.put(addr, new Credit(max_credits, null))); mbrs.stream().filter(addr -> !received.containsKey(addr)).forEach(addr -> received.put(addr, new Credit(max_credits)));


// remove members that left // remove members that left
received.keySet().retainAll(mbrs); received.keySet().retainAll(mbrs);
} }






protected static String printMap(Map<Address,Credit> m) { protected static String printMap(Map<Address,? extends Credit> m) {
return m.entrySet().stream().collect(StringBuilder::new, return m.entrySet().stream().collect(StringBuilder::new,
(sb,entry) -> sb.append(entry.getKey()).append(": ").append(entry.getValue()).append("\n"), (sb,entry) -> sb.append(entry.getKey()).append(": ").append(entry.getValue()).append("\n"),
(l,r) -> {}).toString(); (l,r) -> {}).toString();
Expand All @@ -535,92 +533,6 @@ protected static long bufferToLong(byte[] buf, int offset) {






protected class Credit {
protected long credits_left;
protected int num_blockings;
protected long last_credit_request; // ns
protected final Average avg_blockings;


protected Credit(long credits, Average avg_blockings) {
this.credits_left=credits;
this.avg_blockings=avg_blockings;
}

public void reset() {num_blockings=0; if(avg_blockings != null) avg_blockings.clear();}

protected synchronized boolean decrementIfEnoughCredits(long credits, long timeout) {
if(decrement(credits))
return true;

if(timeout <= 0)
return false;

long start=avg_blockings != null? System.nanoTime() : 0;
try {
this.wait(timeout);
}
catch(InterruptedException e) {
}
finally {
num_blockings++;
if(avg_blockings != null)
avg_blockings.add(System.nanoTime() - start);
}

return decrement(credits);
}


protected boolean decrement(long credits) {
if(credits <= credits_left) {
credits_left-=credits;
return true;
}
return false;
}


protected synchronized long decrementAndGet(long credits) {
credits_left=Math.max(0, credits_left - credits);
if(credits_left <= min_credits) {
long credit_response=Math.min(max_credits, max_credits - credits_left);
credits_left=max_credits;
return credit_response;
}
return 0;
}


protected synchronized void increment(long credits) {
credits_left=Math.min(max_credits, credits_left + credits);
notifyAll();
}

protected synchronized boolean needToSendCreditRequest() {
long current_time=System.nanoTime();
// will most likely send a request the first time (last_credit_request is 0), unless nanoTime() is negative
if(current_time - last_credit_request >= TimeUnit.NANOSECONDS.convert(max_block_time, TimeUnit.MILLISECONDS)) {
last_credit_request=current_time;
return true;
}
return false;
}

protected int getNumBlockings() {return num_blockings;}

protected synchronized long get() {return credits_left;}

protected synchronized void set(long new_credits) {
credits_left=Math.min(max_credits, new_credits);
notifyAll();
}

public String toString() {
return String.valueOf(credits_left);
}

}




} }
12 changes: 9 additions & 3 deletions src/org/jgroups/protocols/MFC.java
Expand Up @@ -85,7 +85,7 @@ public double getAverageTimeBlocked() {


public void init() throws Exception { public void init() throws Exception {
super.init(); super.init();
credits=new CreditMap(max_credits); credits=createCreditMap(max_credits);
} }


public void stop() { public void stop() {
Expand All @@ -98,14 +98,20 @@ public void resetStats() {
credits.reset(); credits.reset();
} }


protected CreditMap createCreditMap(long max_creds) {
return new CreditMap(max_creds);
}

@Override @Override
protected Object handleDownMessage(final Message msg, Address dest, int length) { protected Object handleDownMessage(final Message msg) {
Address dest=msg.dest();
if(dest != null) // 2nd line of defense, not really needed if(dest != null) // 2nd line of defense, not really needed
return down_prot.down(msg); return down_prot.down(msg);


int length=msg.length();
long block_time=max_block_times != null? getMaxBlockTime(length) : max_block_time; long block_time=max_block_times != null? getMaxBlockTime(length) : max_block_time;
while(running) { while(running) {
boolean rc=credits.decrement(length, block_time); boolean rc=credits.decrement(msg, length, block_time);
if(rc || max_block_times != null || !running) if(rc || max_block_times != null || !running)
break; break;


Expand Down

0 comments on commit f1bd631

Please sign in to comment.