From 1831b6269d728a03a5908d9c062ad8adc2f3eaed Mon Sep 17 00:00:00 2001 From: Mark Taylor Date: Thu, 12 Sep 2019 16:55:38 +0100 Subject: [PATCH] util: add Splittable and concurrency utilities New interface util.Splittable forms the basis for concurrent processing based on the Java7 fork/join framework. The classes introduced here have some resemblance to the Java8 streams framework, but some differences too: implementing a Splittable and using a SplitProcessor and SplitCollector to perform (potentially) concurrent reduction operations does not require you to phrase your processing using internal iteration, and generally gives you more control over how the processing is done. It's less smart, but more predictable and controllable than streams, which I've found in practice to be too magical to work with. But both get their concurrency gains from ForkJoinPools. --- .../uk/ac/starlink/util/SplitCollector.java | 48 ++ .../main/uk/ac/starlink/util/SplitPolicy.java | 122 +++++ .../uk/ac/starlink/util/SplitProcessor.java | 508 ++++++++++++++++++ .../main/uk/ac/starlink/util/Splittable.java | 48 ++ .../ac/starlink/util/SplittableSequence.java | 21 + 5 files changed, 747 insertions(+) create mode 100644 util/src/main/uk/ac/starlink/util/SplitCollector.java create mode 100644 util/src/main/uk/ac/starlink/util/SplitPolicy.java create mode 100644 util/src/main/uk/ac/starlink/util/SplitProcessor.java create mode 100644 util/src/main/uk/ac/starlink/util/Splittable.java create mode 100644 util/src/main/uk/ac/starlink/util/SplittableSequence.java diff --git a/util/src/main/uk/ac/starlink/util/SplitCollector.java b/util/src/main/uk/ac/starlink/util/SplitCollector.java new file mode 100644 index 0000000000..170f21af79 --- /dev/null +++ b/util/src/main/uk/ac/starlink/util/SplitCollector.java @@ -0,0 +1,48 @@ +package uk.ac.starlink.util; + +/** + * Defines an object that can collect values from a Splittable into + * an accumulator. + * The order of the split items is not considered significant, + * so splittables may be presented to accumulator instances in any order. + * + * @param accumulator type + * @param splittable content type + * + * @author Mark Taylor + * @since 12 Sep 2019 + */ +public interface SplitCollector,A> { + + /** + * Returns a new accumulator into which results can be collected. + * Accumulator instances may only be used from one thread at + * any one time. + * + * @return new accumulator + */ + A createAccumulator(); + + /** + * Consumes the content of a splittable, collecting results + * into the supplied accumulator. + * This method may not be called concurrently on the same splittable. + * + * @param splittable splittable object + * @param accumulator accumulator + */ + void accumulate( S splittable, A accumulator ); + + /** + * Combines the content of two accumulators. + * The returned value may or may not be the same object as + * one of the input values. + * The input values should not be used following this call. + * The sequence of the input values is not significant. + * + * @param acc1 one input accumulator + * @param acc2 other input accumulator + * @return accumulator containing the combined result of the inputs + */ + A combine( A acc1, A acc2 ); +} diff --git a/util/src/main/uk/ac/starlink/util/SplitPolicy.java b/util/src/main/uk/ac/starlink/util/SplitPolicy.java new file mode 100644 index 0000000000..84269b1c4f --- /dev/null +++ b/util/src/main/uk/ac/starlink/util/SplitPolicy.java @@ -0,0 +1,122 @@ +package uk.ac.starlink.util; + +import java.util.concurrent.ForkJoinPool; + +/** + * Defines the details of a concurrency policy as used by SplitProcessor. + * + * @author Mark Taylor + * @since 5 Nov 2019 + */ +public class SplitPolicy { + + private final ForkJoinPool fjPool_; + private final int minTaskSize_; + private final short maxTasksPerCore_; + + /** + * Default value for minimum subtask size. + * The current value for this is conservative, in that it will not + * result in concurrency for relatively small jobs. + * The (JDK8) {@link java.util.concurrent.ForkJoinTask} javadocs recommend + * as a rough rule of thumb a value in the range of 1e2-1e4 steps, + * so this possibly could be reduced. + */ + public static final int DFLT_MIN_TASK_SIZE = 100_000; + + /** + * Default value for maximum average number of subtasks per core. + * The value of 8 is suggested by the example parallel computation + * framework sketch in the (JDK8) {@link java.util.Spliterator} javadocs. + */ + public static final short DFLT_MAX_TASKS_PER_CORE = 8; + + /** Default splitting policy. */ + public static final SplitPolicy DFLT_POLICY = new SplitPolicy(); + + /** + * Constructs a policy with default configuration. + */ + public SplitPolicy() { + this( null, 0, (short) -1 ); + } + + /** + * Constructs a policy with supplied configuration options. + * + * @param fjPool fork/join pool for execution, + * or null to use the common pol + * @param minTaskSize smallest acceptable size of sub-task + * to split tasks into, or non-positive value + * for default ({@link #DFLT_MIN_TASK_SIZE}) + * @param maxTasksPerCore maximum number of tasks (on average) + * to be executed on each core as a result + * of decomposition, or zero for no limit, + * or negative value for default limit + * ({@link #DFLT_MAX_TASKS_PER_CORE}) + */ + public SplitPolicy( ForkJoinPool fjPool, int minTaskSize, + short maxTasksPerCore ) { + fjPool_ = fjPool == null ? ForkJoinPool.commonPool() : fjPool; + minTaskSize_ = minTaskSize > 0 ? minTaskSize + : DFLT_MIN_TASK_SIZE; + maxTasksPerCore_ = maxTasksPerCore >= 0 ? maxTasksPerCore + : DFLT_MAX_TASKS_PER_CORE; + } + + /** + * Returns the ForkJoinPool used by this policy. + * + * @return forkjoinpool + */ + public ForkJoinPool getForkJoinPool() { + return fjPool_; + } + + /** + * Returns the smallest task size used by this policy. + * + * @return smallest acceptable size of sub-task to split tasks into + */ + public int getMinTaskSize() { + return minTaskSize_; + } + + /** + * Returns the maximum number of tasks (on average) to be executed + * on each core as a result of decomposition, or zero for no limit. + * + * @return maximum tasks per core, or zero + */ + public short getMaxTasksPerCore() { + return maxTasksPerCore_; + } + + /** + * Indicates whether an attempt should be made to split a splittable + * in order to process it. + * If it's too small for instance, false will be returned. + * + * @param content splittable + * @return true iff processing will try to split content + */ + public boolean willAttemptSplit( Splittable content ) { + long size = content.splittableSize(); + return size >= 0 && size >= 2 * getMinTaskSize(); + } + + @Override + public String toString() { + return new StringBuffer() + .append( "SplitPolicy(" ) + .append( "parallelism=" ) + .append( fjPool_.getParallelism() ) + .append( ", " ) + .append( "minTaskSize=" ) + .append( getMinTaskSize() ) + .append( ", " ) + .append( "maxTasksPerCore=" ) + .append( getMaxTasksPerCore() ) + .toString(); + } +} diff --git a/util/src/main/uk/ac/starlink/util/SplitProcessor.java b/util/src/main/uk/ac/starlink/util/SplitProcessor.java new file mode 100644 index 0000000000..3615b90c8b --- /dev/null +++ b/util/src/main/uk/ac/starlink/util/SplitProcessor.java @@ -0,0 +1,508 @@ +package uk.ac.starlink.util; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.concurrent.CountedCompleter; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinTask; +import java.util.concurrent.RecursiveTask; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Logger; + +/** + * Invokes processing on Splittables. + * + *

Instances of this class perform a similar function to that + * implemented in the Stream class from the Java 8 Streams framework. + * There is much less functionality and cleverness here, + * but the behaviour is predictable and permits external iteration + * rather than imposing internal iteration. + * + * @author Mark Taylor + * @since 9 Sep 2019 + */ +public abstract class SplitProcessor> { + + private final String name_; + + private static final Logger logger_ = + Logger.getLogger( "uk.ac.starlink.util" ); + + /** + * Constructor. + * + * @param name name of this instance + */ + protected SplitProcessor( String name ) { + name_ = name; + } + + /** + * Indicates whether this processor will attempt to split the given + * splittable object when processing it. + * If for instance its size is known to be too small for this processor's + * policy, false will be returned. + * + * @param content splittable object + * @return true iff splitting will be attempted during processing + */ + public abstract boolean willAttemptSplit( S content ); + + /** + * Collects content from a splittable object into an accumulator, + * as defined by a provided collector. + * + * @param collector defines collection semantics + * @param content input data + * @return accumulator into which content has been collected + */ + public abstract A collect( SplitCollector collector, S content ); + + @Override + public String toString() { + return name_; + } + + /** + * Returns a SplitProcessor instance that performs sequential processing. + * + * @return new processor + */ + public static > SplitProcessor + createSequentialProcessor() { + return new SequentialProcessor( "Sequential" ); + } + + /** + * Returns a SplitProcessor instance that works in parallel, + * with one accumulator created for every forked subtask. + * + * @param policy parallel execution policy, or null for default + * @return new processor + */ + public static > SplitProcessor + createBasicParallelProcessor( SplitPolicy policy ) { + if ( policy == null ) { + policy = new SplitPolicy(); + } + return new BasicParallelProcessor( "BasicParallel", policy ); + } + + /** + * Returns a SplitProcessor instance that works in parallel, + * with a pool of reusable accumulators shared by forked subtasks. + * This is expected to be beneficial if accumulator construction or + * combination is computationally expensive. However, it's probably + * not necessary when working with accumulators that simply have + * a large memory footprint, since multiple accumulators ought not + * to be in simultaneous use either way. + * + * @param policy parallel execution policy, or null for default + * @return new processor + */ + public static > SplitProcessor + createPoolParallelProcessor( SplitPolicy policy ) { + if ( policy == null ) { + policy = new SplitPolicy(); + } + return new PoolParallelProcessor( "PoolParallel", policy ); + } + + /** + * Attempts to split a splittable if its size is not already too small. + * + * @param content splittable + * @param minSize minimum acceptable size of a split task + * @return if split has taken place, the other half of the + * supplied content; otherwise null + */ + private static > S maybeSplit( S content, + int minSize ) { + long size = content.splittableSize(); + return size >= 0 && size >= 2 * minSize + ? content.split() + : null; + } + + /** + * Returns the minimum size into which a given splittable's subtasks + * should be divided for execution. + * + * @param content top-level (so far unsplit) splittable object + * @param policy concurrency policy + * @return maximum advised subtask size + */ + private static int getMinSplitSize( Splittable content, + SplitPolicy policy ) { + int minSize = policy.getMinTaskSize(); + short maxPerCore = policy.getMaxTasksPerCore(); + if ( maxPerCore > 0 ) { + long size = content.splittableSize(); + if ( size >= 0 ) { + int ncore = policy.getForkJoinPool().getParallelism(); + long limit = size / ( maxPerCore * ncore ); + return limit > Integer.MAX_VALUE + ? Integer.MAX_VALUE + : Math.max( (int) limit, minSize ); + } + } + return minSize; + } + + /** + * SplitProcessor that processes content sequentially. + */ + private static class SequentialProcessor> + extends SplitProcessor { + + /** + * Constructor. + * + * @param name instance name + */ + SequentialProcessor( String name ) { + super( name ); + } + + public boolean willAttemptSplit( S content ) { + return false; + } + + public A collect( SplitCollector collector, S content ) { + A acc = collector.createAccumulator(); + collector.accumulate( content, acc ); + return acc; + } + } + + /** + * SplitProcessor that works in parallel, using a new accumulator + * instance for every forked task. + */ + private static class BasicParallelProcessor> + extends SplitProcessor { + + private final SplitPolicy policy_; + + /** + * Constructor. + * + * @param name instance name + * @param policy defines details of parallelisation policy + */ + BasicParallelProcessor( String name, SplitPolicy policy ) { + super( name ); + policy_ = policy; + } + + public boolean willAttemptSplit( S content ) { + return policy_.willAttemptSplit( content ); + } + + public A collect( SplitCollector collector, S content ) { + AtomicInteger nfork = new AtomicInteger(); + int minSize = getMinSplitSize( content, policy_ ); + ForkJoinTask task = + new BasicSplitTask( null, collector, content, minSize, + nfork ); + long start = System.nanoTime(); + A acc = policy_.getForkJoinPool().invoke( task ); + String msg = new StringBuffer() + .append( this ) + .append( " - " ) + .append( "tasks: " ) + .append( nfork ) + .append( ", time: " ) + .append( ( System.nanoTime() - start ) / 1000000 ) + .toString(); + logger_.info( msg ); + return acc; + } + + /** + * ForkJoinTask used for content collection in which + * a new accumulator is created every time one is used. + */ + private static class BasicSplitTask,A> + extends CountedCompleter { + + private final SplitCollector collector_; + private final S content_; + private final int minSize_; + private final AtomicInteger nfork_; + private BasicSplitTask sibling_; + private A result_; + + /** + * Constructor. + * + * @param parent owner task + * @param collector collector + * @param content input data + * @param minSize minimum size of split tasks + * @param nfork task fork counter + */ + BasicSplitTask( BasicSplitTask parent, + SplitCollector collector, S content, + int minSize, AtomicInteger nfork ) { + super( parent ); + collector_ = collector; + content_ = content; + minSize_ = minSize; + nfork_ = nfork; + } + + public void compute() { + S content1 = maybeSplit( content_, minSize_ ); + if ( content1 != null ) { + BasicSplitTask t0 = + new BasicSplitTask( this, collector_, content_, + minSize_, nfork_ ); + BasicSplitTask t1 = + new BasicSplitTask( this, collector_, content1, + minSize_, nfork_ ); + t0.sibling_ = t1; + t1.sibling_ = t0; + setPendingCount( 1 ); + t1.fork(); + t0.compute(); + } + else { + A accumulator = collector_.createAccumulator(); + collector_.accumulate( content_, accumulator ); + result_ = accumulator; + if ( nfork_ != null ) { + nfork_.incrementAndGet(); + } + tryComplete(); + } + } + + public void onCompletion( CountedCompleter caller ) { + if ( caller != this ) { + @SuppressWarnings("unchecked") + BasicSplitTask child1 = (BasicSplitTask) caller; + BasicSplitTask child2 = child1.sibling_; + if ( child2 == null || child2.result_ == null ) { + result_ = child1.result_; + } + else { + result_ = collector_.combine( child1.result_, + child2.result_ ); + } + } + } + + public A getRawResult() { + return result_; + } + } + } + + /** + * SplitProcessor that works in parallel, with a pool of reusable + * accumulators shared by forked tasks. + */ + private static class PoolParallelProcessor> + extends SplitProcessor { + + private final SplitPolicy policy_; + + /** + * Constructor. + * + * @param name instance name + * @param policy defines details of parallelisation policy + */ + PoolParallelProcessor( String name, SplitPolicy policy ) { + super( name ); + policy_ = policy; + } + + public boolean willAttemptSplit( S content ) { + return policy_.willAttemptSplit( content ); + } + + public A collect( SplitCollector collector, S content ) { + Collection accPool = new ArrayList(); + AtomicInteger nfork = new AtomicInteger(); + int minSize = getMinSplitSize( content, policy_ ); + long t0 = System.nanoTime(); + ForkJoinTask accTask = + new PoolSplitTask( null, collector, content, minSize, + accPool, nfork ); + ForkJoinPool fjPool = policy_.getForkJoinPool(); + fjPool.invoke( accTask ); + int npool = accPool.size(); + long t1 = System.nanoTime(); + ForkJoinTask joinTask = + new AccumulatorJoinRecursiveTask( collector, accPool ); + accPool = null; + A result = fjPool.invoke( joinTask ); + long t2 = System.nanoTime(); + String msg = new StringBuffer() + .append( this ) + .append( " - " ) + .append( "tasks: " ) + .append( nfork ) + .append( ", pool: " ) + .append( npool ) + .append( ", time: " ) + .append( ( t1 - t0 ) / 1000000 ) + .append( " + " ) + .append( ( t2 - t1 ) / 1000000 ) + .toString(); + logger_.info( msg ); + return result; + } + + /** + * ForkJoinTask used for content collection in which a pool of + * reusable accumulators is used. + */ + private static class PoolSplitTask,A> + extends CountedCompleter { + + private final SplitCollector collector_; + private final S content_; + private final int minSize_; + private final Collection accPool_; + private final AtomicInteger nfork_; + + /** + * Constructor. + * + * @param parent owner task + * @param collector collector + * @param content input data + * @param minSize minimum size of split tasks + * @param accPool pool of accumulators, + * will be expanded as required + * @param nfork task fork counter + */ + PoolSplitTask( PoolSplitTask parent, + SplitCollector collector, S content, + int minSize, Collection accPool, + AtomicInteger nfork ) { + super( parent ); + collector_ = collector; + content_ = content; + minSize_ = minSize; + accPool_ = accPool; + nfork_ = nfork; + } + + public void compute() { + S content1 = maybeSplit( content_, minSize_ ); + if ( content1 != null ) { + PoolSplitTask t0 = + new PoolSplitTask( this, collector_, content_, + minSize_, accPool_, nfork_ ); + PoolSplitTask t1 = + new PoolSplitTask( this, collector_, content1, + minSize_, accPool_, nfork_ ); + setPendingCount( 1 ); + t1.fork(); + t0.compute(); + } + else { + A accumulator = getAccumulator(); + collector_.accumulate( content_, accumulator ); + releaseAccumulator( accumulator ); + if ( nfork_ != null ) { + nfork_.incrementAndGet(); + } + tryComplete(); + } + } + + /** + * Returns an accumulator ready for use. + * Objects obtained using this method should be released when + * no longer required. + * May be called from any thread. + * + * @return accumulator not in use by any other thread + */ + private A getAccumulator() { + synchronized ( accPool_ ) { + if ( accPool_.size() == 0 ) { + accPool_.add( collector_.createAccumulator() ); + } + A item = accPool_.iterator().next(); + boolean removed = accPool_.remove( item ); + assert removed; + return item; + } + } + + /** + * Releases an accumulator previously acquired from the pool. + * It must not be used for further accumulation following this call. + * May be called from any thread. + * + * @param accumulator + */ + private void releaseAccumulator( A accumulator ) { + synchronized ( accPool_ ) { + + /* I used to have an assertion here + * assert !accPool_.contains(accumulator). + * But don't do that - the pool may contain distinct + * entries that are equal by Object.equals(). */ + accPool_.add( accumulator ); + } + } + } + + /** + * ForkJoinTask that can combine multiple accumulators together. + * This implementation uses a RecursiveTask doing divide and conquer. + */ + private static class AccumulatorJoinRecursiveTask + extends RecursiveTask { + + private final SplitCollector collector_; + private final Collection accList_; + + /** + * Constructor. + * + * @param collector collector + * @param accPool list of populated accumulators to be combined + */ + AccumulatorJoinRecursiveTask( SplitCollector collector, + Collection accList ) { + collector_ = collector; + accList_ = accList; + } + + public A compute() { + int n = accList_.size(); + if ( n > 2 ) { + Pair> pair = Pair.splitCollection( accList_ ); + Collection sub0 = pair.getItem1(); + Collection sub1 = pair.getItem2(); + AccumulatorJoinRecursiveTask t0 = + new AccumulatorJoinRecursiveTask( collector_, sub0 ); + AccumulatorJoinRecursiveTask t1 = + new AccumulatorJoinRecursiveTask( collector_, sub1 ); + t1.fork(); + return collector_.combine( t0.compute(), t1.join() ); + } + else if ( n == 2 ) { + Iterator it = accList_.iterator(); + return collector_.combine( it.next(), it.next() ); + } + else if ( n == 1 ) { + Iterator it = accList_.iterator(); + return it.next(); + } + else { + throw new AssertionError(); + } + } + } + } +} diff --git a/util/src/main/uk/ac/starlink/util/Splittable.java b/util/src/main/uk/ac/starlink/util/Splittable.java new file mode 100644 index 0000000000..dad9dad5e1 --- /dev/null +++ b/util/src/main/uk/ac/starlink/util/Splittable.java @@ -0,0 +1,48 @@ +package uk.ac.starlink.util; + +/** + * Defines an object which can be split into two for subdivided processing. + * + *

This does a similar job to {@link java.util.Spliterator}, + * but it imposes no assumptions about the form of the split objects, + * for instance that they form a sequence that can be iterated over + * internally, which idiom forms the basis of the Java 8 streams framework. + * Collections or sequences based on Splittable can use external + * iteration, which allows better control in some cases. + * + * @author Mark Taylor + * @since 12 Sep 2019 + */ +public interface Splittable> { + + /** + * Attempts to partition this object into two halves, + * ideally of similar size. + * If a non-null value is returned, then the content previously + * contained by this object is now split between this object and + * the returned object. If for any reason a split is not carried out, + * null is returned. + * + *

Following a successful call, the two parts may be processed + * in different threads. + * + * @return other half of this splittable, or null + * @see java.util.Spliterator#trySplit + */ + S split(); + + /** + * Provides an estimate of the number of processable items in this object. + * A processable item is not a well-defined quantity, + * but it should generally be something that can be processed fast. + * For instance, if this object represents a collection of collections, + * the value that should be returned is the total number of elements + * rather than the number of collections. + * + *

If no estimate for the size is available, + * a negative value should be returned. + * + * @return approximate size, or negative value if not known + */ + long splittableSize(); +} diff --git a/util/src/main/uk/ac/starlink/util/SplittableSequence.java b/util/src/main/uk/ac/starlink/util/SplittableSequence.java new file mode 100644 index 0000000000..a99ebc60fe --- /dev/null +++ b/util/src/main/uk/ac/starlink/util/SplittableSequence.java @@ -0,0 +1,21 @@ +package uk.ac.starlink.util; + +/** + * Utility sub-interface of Splittable suitable for use with + * splittable data that can be iterated over. + * + * @author Mark Taylor + * @since 13 Sep 2019 + */ +public interface SplittableSequence> + extends Splittable { + + /** + * Move to the next item in the sequence. + * Must be called before accessing each item, including the first one. + * Returns false when there are no items left. + * + * @return true iff the current sequence element has data + */ + boolean next(); +}