From 7d12873e1bdb4d0c09c97ef3ac27e0095b2baf53 Mon Sep 17 00:00:00 2001 From: nitsanw Date: Thu, 27 Aug 2015 20:58:06 +0200 Subject: [PATCH] Improve doc. --- .../jctools/queues/MessagePassingQueue.java | 190 ++++++++++-------- 1 file changed, 111 insertions(+), 79 deletions(-) diff --git a/jctools-core/src/main/java/org/jctools/queues/MessagePassingQueue.java b/jctools-core/src/main/java/org/jctools/queues/MessagePassingQueue.java index 125c9686..c7760750 100644 --- a/jctools-core/src/main/java/org/jctools/queues/MessagePassingQueue.java +++ b/jctools-core/src/main/java/org/jctools/queues/MessagePassingQueue.java @@ -16,34 +16,54 @@ import java.util.Queue; /** - * This is a tagging interface for the queues in this library which implement a subset of the {@link Queue} interface - * sufficient for concurrent message passing.
- * Message passing queues provide happens before semantics to messages passed through, namely that writes made by the - * producer before offering the message are visible to the consuming thread after the message has been polled out of the - * queue. + * This is a tagging interface for the queues in this library which implement a subset of the {@link Queue} + * interface sufficient for concurrent message passing.
+ * Message passing queues provide happens before semantics to messages passed through, namely that writes made + * by the producer before offering the message are visible to the consuming thread after the message has been + * polled out of the queue. * * @author nitsanw * - * @param the event/message type + * @param + * the event/message type */ public interface MessagePassingQueue { int UNBOUNDED_CAPACITY = -1; - + interface Supplier { /** + * This method will return the next value to be written to the queue. As such the queue + * implementations are commited to insert the value once the call is made. + *

+ * Users should be aware that underlying queue implementations may upfront claim parts of the queue + * for batch operations and this will effect the view on the queue from the supplier method. In + * particular size and any offer methods may take the view that the full batch has already happened. + * * @return new element, NEVER null */ T get(); } - + interface Consumer { - void accept(T m); + /** + * This method will process an element already removed from the queue. This method is expected to + * never throw an exception. + *

+ * Users should be aware that underlying queue implementations may upfront claim parts of the queue + * for batch operations and this will effect the view on the queue from the accept method. In + * particular size and any poll/peek methods may take the view that the full batch has already + * happened. + * + * @param e not null + */ + void accept(T e); } - + interface WaitStrategy { /** - * This method can implement static or dynamic backoff. Dynamic backoff will rely on the counter for estimating how - * long the caller has been idling. The expected usage is: + * This method can implement static or dynamic backoff. Dynamic backoff will rely on the counter for + * estimating how long the caller has been idling. The expected usage is: + * *

          * 
          * int ic = 0;
@@ -57,99 +77,103 @@ interface WaitStrategy {
          * }
          * 
          * 
+ * * @param idleCounter idle calls counter, managed by the idle method until reset * @return new counter value to be used on subsequent idle cycle */ int idle(int idleCounter); } - + interface ExitCondition { - + /** - * This method should be implemented such that the flag read or determination cannot be hoisted out of a loop - * which notmally means a volatile load, but with JDK9 VarHandles may mean getOpaque. + * This method should be implemented such that the flag read or determination cannot be hoisted out of + * a loop which notmally means a volatile load, but with JDK9 VarHandles may mean getOpaque. * * @return true as long as we should keep running */ boolean keepRunning(); } + /** - * Called from a producer thread subject to the restrictions appropriate to the implementation and according to the - * {@link Queue#offer(Object)} interface. + * Called from a producer thread subject to the restrictions appropriate to the implementation and + * according to the {@link Queue#offer(Object)} interface. * - * @param message + * @param e not null, will throw NPE if it is * @return true if element was inserted into the queue, false iff full */ - boolean offer(T message); + boolean offer(T e); /** - * Called from the consumer thread subject to the restrictions appropriate to the implementation and according to - * the {@link Queue#poll()} interface. + * Called from the consumer thread subject to the restrictions appropriate to the implementation and + * according to the {@link Queue#poll()} interface. * * @return a message from the queue if one is available, null iff empty */ T poll(); /** - * Called from the consumer thread subject to the restrictions appropriate to the implementation and according to - * the {@link Queue#peek()} interface. + * Called from the consumer thread subject to the restrictions appropriate to the implementation and + * according to the {@link Queue#peek()} interface. * * @return a message from the queue if one is available, null iff empty */ T peek(); /** - * This method's accuracy is subject to concurrent modifications happening as the size is estimated and as such is a - * best effort rather than absolute value. For some implementations this method may be O(n) rather than O(1). + * This method's accuracy is subject to concurrent modifications happening as the size is estimated and as + * such is a best effort rather than absolute value. For some implementations this method may be O(n) + * rather than O(1). * - * @return number of messages in the queue, between 0 and {@link Integer#MAX_VALUE} but less or equals to capacity (if bounded). + * @return number of messages in the queue, between 0 and {@link Integer#MAX_VALUE} but less or equals to + * capacity (if bounded). */ int size(); /** - * Removes all items from the queue. Called from the consumer thread subject to the restrictions appropriate to the implementation and according to - * the {@link Queue#clear()} interface. + * Removes all items from the queue. Called from the consumer thread subject to the restrictions + * appropriate to the implementation and according to the {@link Queue#clear()} interface. */ void clear(); - - + /** - * This method's accuracy is subject to concurrent modifications happening as the observation is carried out. + * This method's accuracy is subject to concurrent modifications happening as the observation is carried + * out. * * @return true if empty, false otherwise */ boolean isEmpty(); - + /** * @return the capacity of this queue or UNBOUNDED_CAPACITY if not bounded */ int capacity(); - + /** - * Called from a producer thread subject to the restrictions appropriate to the implementation. As opposed to - * {@link Queue#offer(Object)} this method may return false without the queue being full. + * Called from a producer thread subject to the restrictions appropriate to the implementation. As opposed + * to {@link Queue#offer(Object)} this method may return false without the queue being full. * - * @param message + * @param e not null, will throw NPE if it is * @return true if element was inserted into the queue, false if unable to offer */ - boolean relaxedOffer(T message); + boolean relaxedOffer(T e); /** - * Called from the consumer thread subject to the restrictions appropriate to the implementation. As opposed to - * {@link Queue#poll()} this method may return null without the queue being empty. + * Called from the consumer thread subject to the restrictions appropriate to the implementation. As + * opposed to {@link Queue#poll()} this method may return null without the queue being empty. * * @return a message from the queue if one is available, null if unable to poll */ T relaxedPoll(); /** - * Called from the consumer thread subject to the restrictions appropriate to the implementation. As opposed to - * {@link Queue#peek()} this method may return null without the queue being empty. + * Called from the consumer thread subject to the restrictions appropriate to the implementation. As + * opposed to {@link Queue#peek()} this method may return null without the queue being empty. * * @return a message from the queue if one is available, null if unable to peek */ T relaxedPeek(); - + /** * Remove all available item from the queue and hand to consume. This should be semantically similar to: *
@@ -157,28 +181,27 @@ interface ExitCondition { * while((m = relaxedPoll()) != null){
* c.accept(m);
* }
- *
- * There's no strong commitment to the queue being empty at the end of a drain. - * Called from a consumer thread subject to the restrictions appropriate to the implementation. + * There's no strong commitment to the queue being empty at the end of a drain. Called from a + * consumer thread subject to the restrictions appropriate to the implementation. * * @return the number of polled elements */ int drain(Consumer c); - + /** - * Stuff the queue with elements from the supplier. Semantically similar to: - *
+ * Stuff the queue with elements from the supplier. Semantically similar to:
* while(relaxedOffer(s.get());
- *
- * There's no strong commitment to the queue being full at the end of a fill. - * Called from a producer thread subject to the restrictions appropriate to the implementation. + *
There's no strong commitment to the queue being full at the end of a fill. Called from a + * producer thread subject to the restrictions appropriate to the implementation. * * @return the number of offered elements */ int fill(Supplier s); - + /** - * Remove up to limit elements from the queue and hand to consume. This should be semantically similar to: + * Remove up to limit elements from the queue and hand to consume. This should be semantically + * similar to: + * *
      * 
      *   M m;
@@ -187,63 +210,72 @@ interface ExitCondition {
      *   }
      * 
      * 
- * There's no strong commitment to the queue being empty at the end of a drain. - * Called from a consumer thread subject to the restrictions appropriate to the implementation. + * + * There's no strong commitment to the queue being empty at the end of a drain. Called from a consumer + * thread subject to the restrictions appropriate to the implementation. * * @return the number of polled elements */ int drain(Consumer c, int limit); - + /** * Stuff the queue with up to limit elements from the supplier. Semantically similar to: + * *
      * 
      *   for(int i=0; i < limit && relaxedOffer(s.get(); i++);
*
*
- * There's no strong commitment to the queue being full at the end of a fill. - * Called from a producer thread subject to the restrictions appropriate to the implementation. + * + * There's no strong commitment to the queue being full at the end of a fill. Called from a producer + * thread subject to the restrictions appropriate to the implementation. * * @return the number of offered elements */ int fill(Supplier s, int limit); - + /** * Remove elements from the queue and hand to consume forever. Semantically similar to: + * *
      * 
-     *   int idleCounter = 0;
-     *   while(e.keepRunning()){
-     *     M m;
-     *     if((m = relaxedPoll()) != null){
-     *       c.accept(m);
-     *       idleCounter = 0;
-     *     }
-     *     else
-     *       idleCounter = w.idle(idleCounter);
-     *   }
+     *  int idleCounter = 0;
+     *  while (exit.keepRunning()) {
+     *      E e = relaxedPoll();
+     *      if(e==null){
+     *          idleCounter = wait.idle(idleCounter);
+     *          continue;
+     *      }
+     *      idleCounter = 0;
+     *      c.accept(e);
+     *  }
      * 
      * 
+ * * Called from a consumer thread subject to the restrictions appropriate to the implementation. * */ void drain(Consumer c, WaitStrategy wait, ExitCondition exit); - + /** * Stuff the queue with elements from the supplier forever. Semantically similar to: + * *
      * 
-     *   int idleCounter = 0;
-     *   while(e.keepRunning()){
-     *     if(!relaxedOffer(s.get()))
-     *       idleCounter = w.idle(idleCounter);
-     *     else
-     *       idleCounter = 0;
-     *   }
+     *  int idleCounter = 0;
+     *  while (exit.keepRunning()) {
+     *      E e = s.get();
+     *      while (!relaxedOffer(e)) {
+     *          idleCounter = wait.idle(idleCounter);
+     *          continue;
+     *      }
+     *      idleCounter = 0;    
+     *  }
      * 
      * 
+ * * Called from a producer thread subject to the restrictions appropriate to the implementation. * */ - void fill(Supplier s, WaitStrategy wait, ExitCondition exit); + void fill(Supplier s, WaitStrategy wait, ExitCondition exit); }