Skip to content

Commit

Permalink
util: add Splittable and concurrency utilities
Browse files Browse the repository at this point in the history
New interface util.Splittable forms the basis for concurrent processing
based on the Java7 fork/join framework.

The classes introduced here have some resemblance to the Java8
streams framework, but some differences too: implementing a
Splittable and using a SplitProcessor and SplitCollector to
perform (potentially) concurrent reduction operations does not
require you to phrase your processing using internal iteration,
and generally gives you more control over how the processing is done.
It's less smart, but more predictable and controllable than streams,
which I've found in practice to be too magical to work with.
But both get their concurrency gains from ForkJoinPools.
  • Loading branch information
mbtaylor committed Nov 5, 2019
1 parent a1abe6a commit 1831b62
Show file tree
Hide file tree
Showing 5 changed files with 747 additions and 0 deletions.
48 changes: 48 additions & 0 deletions util/src/main/uk/ac/starlink/util/SplitCollector.java
@@ -0,0 +1,48 @@
package uk.ac.starlink.util;

/**
* Defines an object that can collect values from a Splittable into
* an accumulator.
* The order of the split items is not considered significant,
* so splittables may be presented to accumulator instances in any order.
*
* @param <A> accumulator type
* @param <S> splittable content type
*
* @author Mark Taylor
* @since 12 Sep 2019
*/
public interface SplitCollector<S extends Splittable<S>,A> {

/**
* Returns a new accumulator into which results can be collected.
* Accumulator instances may only be used from one thread at
* any one time.
*
* @return new accumulator
*/
A createAccumulator();

/**
* Consumes the content of a splittable, collecting results
* into the supplied accumulator.
* This method may not be called concurrently on the same splittable.
*
* @param splittable splittable object
* @param accumulator accumulator
*/
void accumulate( S splittable, A accumulator );

/**
* Combines the content of two accumulators.
* The returned value may or may not be the same object as
* one of the input values.
* The input values should not be used following this call.
* The sequence of the input values is not significant.
*
* @param acc1 one input accumulator
* @param acc2 other input accumulator
* @return accumulator containing the combined result of the inputs
*/
A combine( A acc1, A acc2 );
}
122 changes: 122 additions & 0 deletions util/src/main/uk/ac/starlink/util/SplitPolicy.java
@@ -0,0 +1,122 @@
package uk.ac.starlink.util;

import java.util.concurrent.ForkJoinPool;

/**
* Defines the details of a concurrency policy as used by SplitProcessor.
*
* @author Mark Taylor
* @since 5 Nov 2019
*/
public class SplitPolicy {

private final ForkJoinPool fjPool_;
private final int minTaskSize_;
private final short maxTasksPerCore_;

/**
* Default value for minimum subtask size.
* The current value for this is conservative, in that it will not
* result in concurrency for relatively small jobs.
* The (JDK8) {@link java.util.concurrent.ForkJoinTask} javadocs recommend
* as a rough rule of thumb a value in the range of 1e2-1e4 steps,
* so this possibly could be reduced.
*/
public static final int DFLT_MIN_TASK_SIZE = 100_000;

/**
* Default value for maximum average number of subtasks per core.
* The value of 8 is suggested by the example parallel computation
* framework sketch in the (JDK8) {@link java.util.Spliterator} javadocs.
*/
public static final short DFLT_MAX_TASKS_PER_CORE = 8;

/** Default splitting policy. */
public static final SplitPolicy DFLT_POLICY = new SplitPolicy();

/**
* Constructs a policy with default configuration.
*/
public SplitPolicy() {
this( null, 0, (short) -1 );
}

/**
* Constructs a policy with supplied configuration options.
*
* @param fjPool fork/join pool for execution,
* or null to use the common pol
* @param minTaskSize smallest acceptable size of sub-task
* to split tasks into, or non-positive value
* for default ({@link #DFLT_MIN_TASK_SIZE})
* @param maxTasksPerCore maximum number of tasks (on average)
* to be executed on each core as a result
* of decomposition, or zero for no limit,
* or negative value for default limit
* ({@link #DFLT_MAX_TASKS_PER_CORE})
*/
public SplitPolicy( ForkJoinPool fjPool, int minTaskSize,
short maxTasksPerCore ) {
fjPool_ = fjPool == null ? ForkJoinPool.commonPool() : fjPool;
minTaskSize_ = minTaskSize > 0 ? minTaskSize
: DFLT_MIN_TASK_SIZE;
maxTasksPerCore_ = maxTasksPerCore >= 0 ? maxTasksPerCore
: DFLT_MAX_TASKS_PER_CORE;
}

/**
* Returns the ForkJoinPool used by this policy.
*
* @return forkjoinpool
*/
public ForkJoinPool getForkJoinPool() {
return fjPool_;
}

/**
* Returns the smallest task size used by this policy.
*
* @return smallest acceptable size of sub-task to split tasks into
*/
public int getMinTaskSize() {
return minTaskSize_;
}

/**
* Returns the maximum number of tasks (on average) to be executed
* on each core as a result of decomposition, or zero for no limit.
*
* @return maximum tasks per core, or zero
*/
public short getMaxTasksPerCore() {
return maxTasksPerCore_;
}

/**
* Indicates whether an attempt should be made to split a splittable
* in order to process it.
* If it's too small for instance, false will be returned.
*
* @param content splittable
* @return true iff processing will try to split content
*/
public boolean willAttemptSplit( Splittable<?> content ) {
long size = content.splittableSize();
return size >= 0 && size >= 2 * getMinTaskSize();
}

@Override
public String toString() {
return new StringBuffer()
.append( "SplitPolicy(" )
.append( "parallelism=" )
.append( fjPool_.getParallelism() )
.append( ", " )
.append( "minTaskSize=" )
.append( getMinTaskSize() )
.append( ", " )
.append( "maxTasksPerCore=" )
.append( getMaxTasksPerCore() )
.toString();
}
}

0 comments on commit 1831b62

Please sign in to comment.