diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index fc6be6d1c4b27..f1f4253d793df 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -10782,40 +10782,92 @@ public static Collection doInParallel( ) throws IgniteCheckedException, IgniteInterruptedCheckedException { if(srcDatas.isEmpty()) return Collections.emptyList(); + int[] batchSizes = calculateOptimalBatchSizes(parallelismLvl, srcDatas.size()); - List> batches = new ArrayList<>(batchSizes.length); + List> batches = new ArrayList<>(batchSizes.length); + + // Set for sharing batches between executor and current thread. + // If executor cannot perform immediately, we will execute task in the current thread. + Set> sharedBatchesSet = new GridConcurrentHashSet<>(batchSizes.length); Iterator iterator = srcDatas.iterator(); - for (int batchSize : batchSizes) { - List batch = new ArrayList<>(batchSize); + for (int idx = 0; idx < batchSizes.length; idx++) { + int batchSize = batchSizes[idx]; + + Batch batch = new Batch<>(batchSize); for (int i = 0; i < batchSize; i++) - batch.add(iterator.next()); + batch.addTask(iterator.next()); batches.add(batch); } - List>> consumerFutures = batches.stream() - .filter(batch -> !batch.isEmpty()) - .map(batch -> executorSvc.submit(() -> { - Collection results = new ArrayList<>(batch.size()); + batches = batches.stream() + .filter(batch -> !batch.tasks.isEmpty()) + // Add to set only after check that batch is not empty. + .peek(sharedBatchesSet::add) + // Setup future in batch for waiting result. + .peek(batch -> batch.future = executorSvc.submit(() -> { + // Batch was stolen by the main stream. + if (!sharedBatchesSet.remove(batch)) { + return null; + } + + Collection results = new ArrayList<>(batch.tasks.size()); - for (T item : batch) + for (T item : batch.tasks) results.add(operation.accept(item)); return results; })) .collect(Collectors.toList()); - Throwable error =null; + Throwable error = null; + + // Stealing jobs if executor is busy and cannot process task immediately. + // Perform batches in a current thread. + for (Batch batch : sharedBatchesSet) { + // Executor steal task. + if (!sharedBatchesSet.remove(batch)) + continue; + + Collection res = new ArrayList<>(batch.tasks.size()); + + try { + for (T item : batch.tasks) + res.add(operation.accept(item)); + + batch.result(res); + } + catch (IgniteCheckedException e) { + batch.result(e); + } + } + // Final result collection. Collection results = new ArrayList<>(srcDatas.size()); - for (Future> future : consumerFutures) { + for (Batch batch: batches) { try { - results.addAll(future.get()); + Throwable err = batch.error; + + if (err != null) { + if (error == null) + error = err; + else + error.addSuppressed(err); + + continue; + } + + Collection res = batch.result(); + + if (res != null) + results.addAll(res); + else + assert error != null; } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -10852,6 +10904,60 @@ public static Collection doInParallel( return results; } + /** + * The batch of tasks with a batch index in global array. + */ + private static class Batch { + /** List tasks. */ + private final List tasks; + + /** */ + private Collection result; + + /** */ + private Throwable error; + + /** */ + private Future> future; + + /** + * @param batchSize Batch size. + */ + private Batch(int batchSize) { + this.tasks = new ArrayList<>(batchSize); + } + + /** + * @param task Add task. + */ + public void addTask(T task){ + tasks.add(task); + } + + /** + * @param res Setup results for tasks. + */ + public void result(Collection res) { + this.result = res; + } + + /** + * @param e Throwable if task was completed with error. + */ + public void result(Throwable e) { + this.error = e; + } + + /** + * Get tasks results. + */ + public Collection result() throws ExecutionException, InterruptedException { + assert future != null; + + return result != null ? result : future.get(); + } + } + /** * Split number of tasks into optimized batches. * @param parallelismLvl Level of parallelism. diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java index 2f52b2183ef36..39bb21eae72e7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java @@ -44,20 +44,27 @@ import java.util.List; import java.util.Random; import java.util.UUID; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.compute.ComputeJob; import org.apache.ignite.compute.ComputeJobAdapter; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.processors.igfs.IgfsUtils; import org.apache.ignite.internal.util.lang.GridPeerDeployAware; +import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; @@ -70,7 +77,9 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.testframework.junits.common.GridCommonTest; import org.jetbrains.annotations.Nullable; +import org.junit.Assert; +import static java.util.Arrays.asList; import static org.junit.Assert.assertArrayEquals; /** @@ -334,7 +343,7 @@ private SelfReferencedJob(Ignite ignite) throws IgniteCheckedException { arr = new SelfReferencedJob[] {this, this}; - col = Arrays.asList(this, this, this); + col = asList(this, this, this); newContext(); @@ -875,10 +884,10 @@ public void testIsOldestNodeVersionAtLeast() { TcpDiscoveryNode node250ts = new TcpDiscoveryNode(); node250ts.version(v250ts); - assertTrue(U.isOldestNodeVersionAtLeast(v240, Arrays.asList(node240, node241, node250, node250ts))); - assertFalse(U.isOldestNodeVersionAtLeast(v241, Arrays.asList(node240, node241, node250, node250ts))); - assertTrue(U.isOldestNodeVersionAtLeast(v250, Arrays.asList(node250, node250ts))); - assertTrue(U.isOldestNodeVersionAtLeast(v250ts, Arrays.asList(node250, node250ts))); + assertTrue(U.isOldestNodeVersionAtLeast(v240, asList(node240, node241, node250, node250ts))); + assertFalse(U.isOldestNodeVersionAtLeast(v241, asList(node240, node241, node250, node250ts))); + assertTrue(U.isOldestNodeVersionAtLeast(v250, asList(node250, node250ts))); + assertTrue(U.isOldestNodeVersionAtLeast(v250ts, asList(node250, node250ts))); } /** @@ -892,7 +901,7 @@ public void testDoInParallel() throws Throwable { try { IgniteUtils.doInParallel(3, executorService, - Arrays.asList(1, 2, 3), + asList(1, 2, 3), i -> { try { barrier.await(1, TimeUnit.SECONDS); @@ -920,7 +929,7 @@ public void testDoInParallelBatch() { try { IgniteUtils.doInParallel(2, executorService, - Arrays.asList(1, 2, 3), + asList(1, 2, 3), i -> { try { barrier.await(400, TimeUnit.MILLISECONDS); @@ -988,6 +997,128 @@ public void testDoInParallelResultsOrder() throws IgniteCheckedException { } } + /** + * Test parallel execution steal job. + */ + public void testDoInParallelWithStealingJob() throws IgniteCheckedException { + // Pool size should be less that input data collection. + ExecutorService executorService = Executors.newFixedThreadPool(1); + + CountDownLatch mainThreadLatch = new CountDownLatch(1); + CountDownLatch poolThreadLatch = new CountDownLatch(1); + + // Busy one thread from the pool. + executorService.submit(new Runnable() { + @Override + public void run() { + try { + poolThreadLatch.await(); + } + catch (InterruptedException e) { + throw new IgniteInterruptedException(e); + } + } + }); + + List data = asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + + AtomicInteger taskProcessed = new AtomicInteger(); + + long threadId = Thread.currentThread().getId(); + + AtomicInteger curThreadCnt = new AtomicInteger(); + AtomicInteger poolThreadCnt = new AtomicInteger(); + + Collection res = U.doInParallel(10, + executorService, + data, + new IgniteThrowableConsumer() { + @Override public Integer accept(Integer cnt) throws IgniteInterruptedCheckedException { + // Release thread in pool in the middle of range. + if (taskProcessed.getAndIncrement() == (data.size() / 2) - 1) { + poolThreadLatch.countDown(); + + try { + // Await thread in thread pool complete task. + mainThreadLatch.await(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new IgniteInterruptedCheckedException(e); + } + } + + // Increment if executed in current thread. + if (Thread.currentThread().getId() == threadId) + curThreadCnt.incrementAndGet(); + else { + poolThreadCnt.incrementAndGet(); + + if (taskProcessed.get() == data.size()) + mainThreadLatch.countDown(); + } + + return -cnt; + } + }); + + Assert.assertEquals(curThreadCnt.get() + poolThreadCnt.get(), data.size()); + Assert.assertEquals(5, curThreadCnt.get()); + Assert.assertEquals(5, poolThreadCnt.get()); + Assert.assertEquals(asList(0, -1, -2, -3, -4, -5, -6, -7, -8, -9), res); + } + + /** + * Test parallel execution steal job. + */ + public void testDoInParallelWithStealingJobRunTaskInExecutor() throws Exception { + // Pool size should be less that input data collection. + ExecutorService executorService = Executors.newFixedThreadPool(2); + + Future f1 = executorService.submit(()-> runTask(executorService)); + Future f2 = executorService.submit(()-> runTask(executorService)); + Future f3 = executorService.submit(()-> runTask(executorService)); + + f1.get(); + f2.get(); + f3.get(); + } + + /** + * + * @param executorService Executor service. + */ + private void runTask(ExecutorService executorService) { + List data = asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + + long threadId = Thread.currentThread().getId(); + + AtomicInteger curThreadCnt = new AtomicInteger(); + + Collection res; + + try { + res = U.doInParallel(10, + executorService, + data, + new IgniteThrowableConsumer() { + @Override public Integer accept(Integer cnt) { + if (Thread.currentThread().getId() == threadId) + curThreadCnt.incrementAndGet(); + + return -cnt; + } + }); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + + Assert.assertTrue(curThreadCnt.get() > 0); + Assert.assertEquals(asList(0, -1, -2, -3, -4, -5, -6, -7, -8, -9), res); + } + /** * Template method to test parallel execution * @param executorService ExecutorService. @@ -1030,7 +1161,7 @@ public void testDoInParallelException() { IgniteUtils.doInParallel( 1, executorService, - Arrays.asList(1, 2, 3), + asList(1, 2, 3), i -> { if (Integer.valueOf(1).equals(i)) throw new IgniteCheckedException(expectedException);