Permalink
Browse files

documentation for COUNTER and CounterService

  • Loading branch information...
1 parent 33e0473 commit 2d4b6d318937de515bd3e3077f324aebb06db29b @belaban committed Oct 6, 2011
@@ -848,10 +848,10 @@ for (int i = 0; i < runnerCount; ++i) {
ExecutorCompletionService to be used, but due to it's implementation using a non serializable object
the ExecutionCompletionService was implemented to be used instead in conjunction with an ExecutorService.
Also utility class was designed to help users to submit tasks which use a non serializable class. The
- Executions class contains a method serializableCallable which allows for a user to pass a constructor of a class that implements Callable and
- it's arguments to then return to a user a Callable that will upon running will automatically create
- and object from the constructor passing the provided arguments to it and then will call the call
- method on the object and return it's result as a normal callable. All the arguments provided
+ Executions class contains a method serializableCallable which allows for a user to pass a constructor of a
+ class that implements Callable and it's arguments to then return to a user a Callable that will upon running
+ will automatically create and object from the constructor passing the provided arguments to it and then will
+ call the call method on the object and return it's result as a normal callable. All the arguments provided
must still be serializable and the return object as detailed previously.
</para>
@@ -867,6 +867,164 @@ for (int i = 0; i &lt; runnerCount; ++i) {
towards the top of the stack (close to the channel).
</para>
</section>
+
+ <section id="CounterService">
+ <title>Cluster wide atomic counters</title>
+ <para>
+ Cluster wide counters provide named counters (similar to AtomicLong) which can be changed atomically. 2
+ nodes incrementing the same counter with initial value 10 will see 11 and 12 as results, respectively.
+ </para>
+ <para>
+ To create a named counter, the following steps have to be taken:
+ <orderedlist>
+ <listitem>
+ Add protocol COUNTER to the top of the stack configuration
+ </listitem>
+ <listitem>
+ Create an instanceof CounterService
+ </listitem>
+ <listitem>
+ Create a new or get an existing named counter
+ </listitem>
+ <listitem>
+ Use the counter to increment, decrement, get, set, compare-and-set etc the counter
+ </listitem>
+ </orderedlist>
+ </para>
+ <para>
+ In the first step, we add COUNTER to the top of the protocol stack configuration:
+ </para>
+ <programlisting language="Java">
+&lt;config&gt;
+ ...
+ &lt;MFC max_credits="2M"
+ min_threshold="0.4"/&gt;
+ &lt;FRAG2 frag_size="60K" /&gt;
+ &lt;COUNTER bypass_bundling="true" timeout="5000"/&gt;
+&lt;/config&gt;
+ </programlisting>
+ <para>
+ Configuration of the COUNTER protocol is described in <xref linkend="COUNTER">COUNTER</xref>.
+ </para>
+ <para>
+ Next, we create a CounterService, which is used to create and delete named counters:
+ </para>
+ <programlisting language="Java">
+ch=new JChannel(props);
+CounterService counter_service=new CounterService(ch);
+ch.connect("counter-cluster");
+Counter counter=counter_service.getOrCreateCounter("mycounter", 1);
+ </programlisting>
+ <para>
+ In the sample code above, we create a channel first, then create the CounterService referencing the channel.
+ Then we connect the channel and finally create a new named counter "mycounter", with an initial value of 1.
+ If the counter already exists, the existing counter will be returned and the initial value will be ignored.
+ </para>
+ <para>
+ CounterService doesn't consume any messages from the channel over which it is created; instead it grabs
+ a reference to the COUNTER protocols and invokes methods on it directly. This has the advantage that
+ CounterService is non-intrusive: many instances can be created over the same channel. CounterService even
+ co-exists with other services which use the same mechanism, e.g. LockService or ExecutionService (see above).
+ </para>
+ <para>
+ The returned counter instance implements interface Counter:
+ </para>
+ <programlisting language="Java">
+package org.jgroups.blocks.atomic;
+
+public interface Counter {
+
+ public String getName();
+
+ /**
+ * Gets the current value of the counter
+ * @return The current value
+ */
+ public long get();
+
+ /**
+ * Sets the counter to a new value
+ * @param new_value The new value
+ */
+ public void set(long new_value);
+
+ /**
+ * Atomically updates the counter using a CAS operation
+ *
+ * @param expect The expected value of the counter
+ * @param update The new value of the counter
+ * @return True if the counter could be updated, false otherwise
+ */
+ public boolean compareAndSet(long expect, long update);
+
+ /**
+ * Atomically increments the counter and returns the new value
+ * @return The new value
+ */
+ public long incrementAndGet();
+
+ /**
+ * Atomically decrements the counter and returns the new value
+ * @return The new value
+ */
+ public long decrementAndGet();
+
+
+ /**
+ * Atomically adds the given value to the current value.
+ *
+ * @param delta the value to add
+ * @return the updated value
+ */
+ public long addAndGet(long delta);
+}
+ </programlisting>
+
+ <section id="CounterServiceDesign">
+ <title>Design</title>
+ <para>
+ The design of COUNTER is described in details in
+ <ulink url="https://github.com/belaban/JGroups/blob/master/doc/design/CounterService.txt">CounterService</ulink>.
+ </para>
+ <para>
+ In a nutshell, in a cluster the current coordinator maintains a hashmap of named counters. Members send
+ requests (increment, decrement etc) to it, and the coordinator atomically applies the requests and
+ sends back responses.
+ </para>
+ <para>
+ The advantage of this centralized approach is that - regardless of the size of a cluster - every
+ request has a constant execution cost, namely a network round trip.
+ </para>
+ <para>
+ A crash or leaving of the coordinator is handled as follows. The coordinator maintains a version for
+ every counter value. Whenever the counter value is changed, the version is incremented. For every
+ request that modifies a counter, both the counter value and the version are returned to the requester.
+ The requester caches all counter values and associated versions in its own local cache.
+ </para>
+ <para>
+ When the coordinator leaves or crashes, the next-in-line member becomes the new coordinator. It then
+ starts a reconciliation phase, and discards all requests until the reconciliation phase has completed.
+ The reconciliation phase solicits all members for their cached values and versions. To reduce traffic,
+ the request also carries all version numbers with it.
+ </para>
+ <para>
+ Clients return values whose versions are higher than the ones shipped by the new coordinator. The new
+ coordinator waits for responses from all members or timeout milliseconds. Then it updates its own
+ hashmap with values whose versions are higher than its own. Finally, it stops discarding requests and
+ sends a resend message to all clients in order to resend any requests that might be pending.
+ </para>
+ <para>
+ There's another edge case that also needs to be covered: if a client P updates a counter, and both P and
+ the coordinator crash, then the update is lost. To reduce the chances of this happening, COUNTER
+ can be enabled to replicate all counter changes to one or more backup coordinators. The num_backups
+ property defines the number of such backups. Whenever a counter was changed in the current coordinator,
+ it also updates the backups (asynchronously). 0 disables this.
+ </para>
+ </section>
+
+
+ </section>
+
</chapter>
@@ -1397,6 +1397,14 @@ keytool -genseckey -alias myKey -keypass changeit -storepass changeit -keyalg B
${CENTRAL_EXECUTOR}
</section>
+ <section id="COUNTER">
+ <title>COUNTER</title>
+ <para>
+ COUNTER is the implementation of cluster wide counters, used by the CounterService.
+ </para>
+ ${COUNTER}
+ </section>
+
</section>
@@ -9,7 +9,7 @@
public String getName();
/**
- * Get the current value of the counter
+ * Gets the current value of the counter
* @return The current value
*/
public long get();
@@ -1,10 +1,7 @@
package org.jgroups.protocols;
import org.jgroups.*;
-import org.jgroups.annotations.MBean;
-import org.jgroups.annotations.ManagedAttribute;
-import org.jgroups.annotations.ManagedOperation;
-import org.jgroups.annotations.Property;
+import org.jgroups.annotations.*;
import org.jgroups.blocks.atomic.Counter;
import org.jgroups.stack.Protocol;
import org.jgroups.util.*;
@@ -25,13 +22,14 @@
* @author Bela Ban
* @since 3.0.0
*/
+@Experimental
@MBean(description="Protocol to maintain distributed atomic counters")
public class COUNTER extends Protocol {
- @Property(description="bypasses message bundling if true")
+ @Property(description="Bypasses message bundling if true")
protected boolean bypass_bundling=true;
- @Property(description="Request timeouts (in ms)")
+ @Property(description="Request timeouts (in ms). If the timeout elapses, a Timeout (runtime) exception will be thrown")
protected long timeout=60000;
@Property(description="Number of milliseconds to wait for reconciliation responses from all current members")

0 comments on commit 2d4b6d3

Please sign in to comment.