Skip to content

Commit

Permalink
Improve doc.
Browse files Browse the repository at this point in the history
  • Loading branch information
nitsanw committed Aug 27, 2015
1 parent bfba108 commit 7d12873
Showing 1 changed file with 111 additions and 79 deletions.
190 changes: 111 additions & 79 deletions jctools-core/src/main/java/org/jctools/queues/MessagePassingQueue.java
Expand Up @@ -16,34 +16,54 @@
import java.util.Queue;

/**
* This is a tagging interface for the queues in this library which implement a subset of the {@link Queue} interface
* sufficient for concurrent message passing.<br>
* Message passing queues provide happens before semantics to messages passed through, namely that writes made by the
* producer before offering the message are visible to the consuming thread after the message has been polled out of the
* queue.
* This is a tagging interface for the queues in this library which implement a subset of the {@link Queue}
* interface sufficient for concurrent message passing.<br>
* Message passing queues provide happens before semantics to messages passed through, namely that writes made
* by the producer before offering the message are visible to the consuming thread after the message has been
* polled out of the queue.
*
* @author nitsanw
*
* @param <T> the event/message type
* @param <T>
* the event/message type
*/
public interface MessagePassingQueue<T> {
int UNBOUNDED_CAPACITY = -1;

interface Supplier<T> {
/**
* This method will return the next value to be written to the queue. As such the queue
* implementations are commited to insert the value once the call is made.
* <p>
* Users should be aware that underlying queue implementations may upfront claim parts of the queue
* for batch operations and this will effect the view on the queue from the supplier method. In
* particular size and any offer methods may take the view that the full batch has already happened.
*
* @return new element, NEVER null
*/
T get();
}

interface Consumer<T> {
void accept(T m);
/**
* This method will process an element already removed from the queue. This method is expected to
* never throw an exception.
* <p>
* Users should be aware that underlying queue implementations may upfront claim parts of the queue
* for batch operations and this will effect the view on the queue from the accept method. In
* particular size and any poll/peek methods may take the view that the full batch has already
* happened.
*
* @param e not null
*/
void accept(T e);
}

interface WaitStrategy {
/**
* This method can implement static or dynamic backoff. Dynamic backoff will rely on the counter for estimating how
* long the caller has been idling. The expected usage is:
* This method can implement static or dynamic backoff. Dynamic backoff will rely on the counter for
* estimating how long the caller has been idling. The expected usage is:
*
* <pre>
* <code>
* int ic = 0;
Expand All @@ -57,128 +77,131 @@ interface WaitStrategy {
* }
* </code>
* </pre>
*
* @param idleCounter idle calls counter, managed by the idle method until reset
* @return new counter value to be used on subsequent idle cycle
*/
int idle(int idleCounter);
}

interface ExitCondition {

/**
* This method should be implemented such that the flag read or determination cannot be hoisted out of a loop
* which notmally means a volatile load, but with JDK9 VarHandles may mean getOpaque.
* This method should be implemented such that the flag read or determination cannot be hoisted out of
* a loop which notmally means a volatile load, but with JDK9 VarHandles may mean getOpaque.
*
* @return true as long as we should keep running
*/
boolean keepRunning();
}

/**
* Called from a producer thread subject to the restrictions appropriate to the implementation and according to the
* {@link Queue#offer(Object)} interface.
* Called from a producer thread subject to the restrictions appropriate to the implementation and
* according to the {@link Queue#offer(Object)} interface.
*
* @param message
* @param e not null, will throw NPE if it is
* @return true if element was inserted into the queue, false iff full
*/
boolean offer(T message);
boolean offer(T e);

/**
* Called from the consumer thread subject to the restrictions appropriate to the implementation and according to
* the {@link Queue#poll()} interface.
* Called from the consumer thread subject to the restrictions appropriate to the implementation and
* according to the {@link Queue#poll()} interface.
*
* @return a message from the queue if one is available, null iff empty
*/
T poll();

/**
* Called from the consumer thread subject to the restrictions appropriate to the implementation and according to
* the {@link Queue#peek()} interface.
* Called from the consumer thread subject to the restrictions appropriate to the implementation and
* according to the {@link Queue#peek()} interface.
*
* @return a message from the queue if one is available, null iff empty
*/
T peek();

/**
* This method's accuracy is subject to concurrent modifications happening as the size is estimated and as such is a
* best effort rather than absolute value. For some implementations this method may be O(n) rather than O(1).
* This method's accuracy is subject to concurrent modifications happening as the size is estimated and as
* such is a best effort rather than absolute value. For some implementations this method may be O(n)
* rather than O(1).
*
* @return number of messages in the queue, between 0 and {@link Integer#MAX_VALUE} but less or equals to capacity (if bounded).
* @return number of messages in the queue, between 0 and {@link Integer#MAX_VALUE} but less or equals to
* capacity (if bounded).
*/
int size();

/**
* Removes all items from the queue. Called from the consumer thread subject to the restrictions appropriate to the implementation and according to
* the {@link Queue#clear()} interface.
* Removes all items from the queue. Called from the consumer thread subject to the restrictions
* appropriate to the implementation and according to the {@link Queue#clear()} interface.
*/
void clear();



/**
* This method's accuracy is subject to concurrent modifications happening as the observation is carried out.
* This method's accuracy is subject to concurrent modifications happening as the observation is carried
* out.
*
* @return true if empty, false otherwise
*/
boolean isEmpty();

/**
* @return the capacity of this queue or UNBOUNDED_CAPACITY if not bounded
*/
int capacity();

/**
* Called from a producer thread subject to the restrictions appropriate to the implementation. As opposed to
* {@link Queue#offer(Object)} this method may return false without the queue being full.
* Called from a producer thread subject to the restrictions appropriate to the implementation. As opposed
* to {@link Queue#offer(Object)} this method may return false without the queue being full.
*
* @param message
* @param e not null, will throw NPE if it is
* @return true if element was inserted into the queue, false if unable to offer
*/
boolean relaxedOffer(T message);
boolean relaxedOffer(T e);

/**
* Called from the consumer thread subject to the restrictions appropriate to the implementation. As opposed to
* {@link Queue#poll()} this method may return null without the queue being empty.
* Called from the consumer thread subject to the restrictions appropriate to the implementation. As
* opposed to {@link Queue#poll()} this method may return null without the queue being empty.
*
* @return a message from the queue if one is available, null if unable to poll
*/
T relaxedPoll();

/**
* Called from the consumer thread subject to the restrictions appropriate to the implementation. As opposed to
* {@link Queue#peek()} this method may return null without the queue being empty.
* Called from the consumer thread subject to the restrictions appropriate to the implementation. As
* opposed to {@link Queue#peek()} this method may return null without the queue being empty.
*
* @return a message from the queue if one is available, null if unable to peek
*/
T relaxedPeek();

/**
* Remove all available item from the queue and hand to consume. This should be semantically similar to:
* <code><br/>
* M m;</br>
* while((m = relaxedPoll()) != null){</br>
* c.accept(m);</br>
* }</br>
* </code>
* There's no strong commitment to the queue being empty at the end of a drain.
* Called from a consumer thread subject to the restrictions appropriate to the implementation.
* </code> There's no strong commitment to the queue being empty at the end of a drain. Called from a
* consumer thread subject to the restrictions appropriate to the implementation.
*
* @return the number of polled elements
*/
int drain(Consumer<T> c);

/**
* Stuff the queue with elements from the supplier. Semantically similar to:
* <code><br/>
* Stuff the queue with elements from the supplier. Semantically similar to: <code><br/>
* while(relaxedOffer(s.get());</br>
* </code>
* There's no strong commitment to the queue being full at the end of a fill.
* Called from a producer thread subject to the restrictions appropriate to the implementation.
* </code> There's no strong commitment to the queue being full at the end of a fill. Called from a
* producer thread subject to the restrictions appropriate to the implementation.
*
* @return the number of offered elements
*/
int fill(Supplier<T> s);

/**
* Remove up to <i>limit</i> elements from the queue and hand to consume. This should be semantically similar to:
* Remove up to <i>limit</i> elements from the queue and hand to consume. This should be semantically
* similar to:
*
* <pre>
* <code>
* M m;
Expand All @@ -187,63 +210,72 @@ interface ExitCondition {
* }
* </code>
* </pre>
* There's no strong commitment to the queue being empty at the end of a drain.
* Called from a consumer thread subject to the restrictions appropriate to the implementation.
*
* There's no strong commitment to the queue being empty at the end of a drain. Called from a consumer
* thread subject to the restrictions appropriate to the implementation.
*
* @return the number of polled elements
*/
int drain(Consumer<T> c, int limit);

/**
* Stuff the queue with up to <i>limit</i> elements from the supplier. Semantically similar to:
*
* <pre>
* <code>
* for(int i=0; i < limit && relaxedOffer(s.get(); i++);</br>
* </code>
* </pre>
* There's no strong commitment to the queue being full at the end of a fill.
* Called from a producer thread subject to the restrictions appropriate to the implementation.
*
* There's no strong commitment to the queue being full at the end of a fill. Called from a producer
* thread subject to the restrictions appropriate to the implementation.
*
* @return the number of offered elements
*/
int fill(Supplier<T> s, int limit);

/**
* Remove elements from the queue and hand to consume forever. Semantically similar to:
*
* <pre>
* <code>
* int idleCounter = 0;
* while(e.keepRunning()){
* M m;
* if((m = relaxedPoll()) != null){
* c.accept(m);
* idleCounter = 0;
* }
* else
* idleCounter = w.idle(idleCounter);
* }
* int idleCounter = 0;
* while (exit.keepRunning()) {
* E e = relaxedPoll();
* if(e==null){
* idleCounter = wait.idle(idleCounter);
* continue;
* }
* idleCounter = 0;
* c.accept(e);
* }
* </code>
* </pre>
*
* Called from a consumer thread subject to the restrictions appropriate to the implementation.
*
*/
void drain(Consumer<T> c, WaitStrategy wait, ExitCondition exit);

/**
* Stuff the queue with elements from the supplier forever. Semantically similar to:
*
* <pre>
* <code>
* int idleCounter = 0;
* while(e.keepRunning()){
* if(!relaxedOffer(s.get()))
* idleCounter = w.idle(idleCounter);
* else
* idleCounter = 0;
* }
* int idleCounter = 0;
* while (exit.keepRunning()) {
* E e = s.get();
* while (!relaxedOffer(e)) {
* idleCounter = wait.idle(idleCounter);
* continue;
* }
* idleCounter = 0;
* }
* </code>
* </pre>
*
* Called from a producer thread subject to the restrictions appropriate to the implementation.
*
*/
void fill(Supplier<T> s, WaitStrategy wait, ExitCondition exit);
void fill(Supplier<T> s, WaitStrategy wait, ExitCondition exit);
}

0 comments on commit 7d12873

Please sign in to comment.