Skip to content

Commit

Permalink
IGNITE-10285 Fixed U.doInParallel may lead to deadlock - Fixes #5404.
Browse files Browse the repository at this point in the history
Signed-off-by: Dmitriy Govorukhin <dmitriy.govorukhin@gmail.com>
  • Loading branch information
dgovorukhin committed Nov 19, 2018
1 parent a63a81a commit acfdcda
Show file tree
Hide file tree
Showing 2 changed files with 257 additions and 20 deletions.
Expand Up @@ -10782,40 +10782,92 @@ public static <T, R> Collection<R> doInParallel(
) throws IgniteCheckedException, IgniteInterruptedCheckedException {
if(srcDatas.isEmpty())
return Collections.emptyList();

int[] batchSizes = calculateOptimalBatchSizes(parallelismLvl, srcDatas.size());

List<List<T>> batches = new ArrayList<>(batchSizes.length);
List<Batch<T, R>> batches = new ArrayList<>(batchSizes.length);

// Set for sharing batches between executor and current thread.
// If executor cannot perform immediately, we will execute task in the current thread.
Set<Batch<T, R>> sharedBatchesSet = new GridConcurrentHashSet<>(batchSizes.length);

Iterator<T> iterator = srcDatas.iterator();

for (int batchSize : batchSizes) {
List<T> batch = new ArrayList<>(batchSize);
for (int idx = 0; idx < batchSizes.length; idx++) {
int batchSize = batchSizes[idx];

Batch<T, R> batch = new Batch<>(batchSize);

for (int i = 0; i < batchSize; i++)
batch.add(iterator.next());
batch.addTask(iterator.next());

batches.add(batch);
}

List<Future<Collection<R>>> consumerFutures = batches.stream()
.filter(batch -> !batch.isEmpty())
.map(batch -> executorSvc.submit(() -> {
Collection<R> results = new ArrayList<>(batch.size());
batches = batches.stream()
.filter(batch -> !batch.tasks.isEmpty())
// Add to set only after check that batch is not empty.
.peek(sharedBatchesSet::add)
// Setup future in batch for waiting result.
.peek(batch -> batch.future = executorSvc.submit(() -> {
// Batch was stolen by the main thread.
if (!sharedBatchesSet.remove(batch)) {
return null;
}

Collection<R> results = new ArrayList<>(batch.tasks.size());

for (T item : batch)
for (T item : batch.tasks)
results.add(operation.accept(item));

return results;
}))
.collect(Collectors.toList());

Throwable error =null;
Throwable error = null;

// Stealing jobs if executor is busy and cannot process task immediately.
// Perform batches in a current thread.
for (Batch<T, R> batch : sharedBatchesSet) {
// The executor has already taken this batch.
if (!sharedBatchesSet.remove(batch))
continue;

Collection<R> res = new ArrayList<>(batch.tasks.size());

try {
for (T item : batch.tasks)
res.add(operation.accept(item));

batch.result(res);
}
catch (IgniteCheckedException e) {
batch.result(e);
}
}

// Final result collection.
Collection<R> results = new ArrayList<>(srcDatas.size());

for (Future<Collection<R>> future : consumerFutures) {
for (Batch<T, R> batch: batches) {
try {
results.addAll(future.get());
Throwable err = batch.error;

if (err != null) {
if (error == null)
error = err;
else
error.addSuppressed(err);

continue;
}

Collection<R> res = batch.result();

if (res != null)
results.addAll(res);
else
assert error != null;
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand Down Expand Up @@ -10852,6 +10904,60 @@ public static <T, R> Collection<R> doInParallel(
return results;
}

/**
 * Holder for one portion of the source data together with the outcome of processing it.
 *
 * @param <T> Type of an input item (task).
 * @param <R> Type of a single task result.
 */
private static class Batch<T, R> {
    /** Input items assigned to this batch. */
    private final List<T> tasks;

    /** Future that is consulted by {@link #result()} when no result was stored directly. */
    private Future<Collection<R>> future;

    /** Results stored through {@link #result(Collection)}; {@code null} until stored. */
    private Collection<R> result;

    /** Failure stored through {@link #result(Throwable)}; {@code null} until stored. */
    private Throwable error;

    /**
     * @param batchSize Number of tasks the batch is expected to hold (initial capacity of the task list).
     */
    private Batch(int batchSize) {
        tasks = new ArrayList<>(batchSize);
    }

    /**
     * Appends one more task to the batch.
     *
     * @param task Task to append.
     */
    public void addTask(T task) {
        tasks.add(task);
    }

    /**
     * Stores the results computed for the tasks of this batch.
     *
     * @param res Results of the tasks.
     */
    public void result(Collection<R> res) {
        result = res;
    }

    /**
     * Stores the failure the batch was completed with.
     *
     * @param e Throwable the tasks failed with.
     */
    public void result(Throwable e) {
        error = e;
    }

    /**
     * Returns the directly stored results if there are any, otherwise waits on the batch future.
     *
     * @return Results of the tasks.
     * @throws ExecutionException If thrown by the future.
     * @throws InterruptedException If the wait on the future was interrupted.
     */
    public Collection<R> result() throws ExecutionException, InterruptedException {
        assert future != null;

        // Nothing was stored directly, so the outcome has to come from the future.
        if (result == null)
            return future.get();

        return result;
    }
}

/**
* Split number of tasks into optimized batches.
* @param parallelismLvl Level of parallelism.
Expand Down
Expand Up @@ -44,20 +44,27 @@
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobAdapter;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.igfs.IgfsUtils;
import org.apache.ignite.internal.util.lang.GridPeerDeployAware;
import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
Expand All @@ -70,7 +77,9 @@
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.testframework.junits.common.GridCommonTest;
import org.jetbrains.annotations.Nullable;
import org.junit.Assert;

import static java.util.Arrays.asList;
import static org.junit.Assert.assertArrayEquals;

/**
Expand Down Expand Up @@ -334,7 +343,7 @@ private SelfReferencedJob(Ignite ignite) throws IgniteCheckedException {

arr = new SelfReferencedJob[] {this, this};

col = Arrays.asList(this, this, this);
col = asList(this, this, this);

newContext();

Expand Down Expand Up @@ -875,10 +884,10 @@ public void testIsOldestNodeVersionAtLeast() {
TcpDiscoveryNode node250ts = new TcpDiscoveryNode();
node250ts.version(v250ts);

assertTrue(U.isOldestNodeVersionAtLeast(v240, Arrays.asList(node240, node241, node250, node250ts)));
assertFalse(U.isOldestNodeVersionAtLeast(v241, Arrays.asList(node240, node241, node250, node250ts)));
assertTrue(U.isOldestNodeVersionAtLeast(v250, Arrays.asList(node250, node250ts)));
assertTrue(U.isOldestNodeVersionAtLeast(v250ts, Arrays.asList(node250, node250ts)));
assertTrue(U.isOldestNodeVersionAtLeast(v240, asList(node240, node241, node250, node250ts)));
assertFalse(U.isOldestNodeVersionAtLeast(v241, asList(node240, node241, node250, node250ts)));
assertTrue(U.isOldestNodeVersionAtLeast(v250, asList(node250, node250ts)));
assertTrue(U.isOldestNodeVersionAtLeast(v250ts, asList(node250, node250ts)));
}

/**
Expand All @@ -892,7 +901,7 @@ public void testDoInParallel() throws Throwable {
try {
IgniteUtils.doInParallel(3,
executorService,
Arrays.asList(1, 2, 3),
asList(1, 2, 3),
i -> {
try {
barrier.await(1, TimeUnit.SECONDS);
Expand Down Expand Up @@ -920,7 +929,7 @@ public void testDoInParallelBatch() {
try {
IgniteUtils.doInParallel(2,
executorService,
Arrays.asList(1, 2, 3),
asList(1, 2, 3),
i -> {
try {
barrier.await(400, TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -988,6 +997,128 @@ public void testDoInParallelResultsOrder() throws IgniteCheckedException {
}
}

/**
 * Checks that {@code U.doInParallel} processes batches in the calling thread while the only pool
 * thread is busy, and that the pool thread picks up the remaining batches once it is released.
 * <p>
 * The test expects half of the items to be processed by the calling thread, the other half by the
 * pool thread, and the results to be returned in the order of the input data.
 *
 * @throws IgniteCheckedException If the parallel call failed.
 */
public void testDoInParallelWithStealingJob() throws IgniteCheckedException {
    // Pool size should be less than the size of the input data collection.
    ExecutorService executorService = Executors.newFixedThreadPool(1);

    try {
        CountDownLatch mainThreadLatch = new CountDownLatch(1);
        CountDownLatch poolThreadLatch = new CountDownLatch(1);

        // Keep the single pool thread busy until the latch is released.
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    poolThreadLatch.await();
                }
                catch (InterruptedException e) {
                    throw new IgniteInterruptedException(e);
                }
            }
        });

        List<Integer> data = asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);

        AtomicInteger taskProcessed = new AtomicInteger();

        long threadId = Thread.currentThread().getId();

        AtomicInteger curThreadCnt = new AtomicInteger();
        AtomicInteger poolThreadCnt = new AtomicInteger();

        Collection<Integer> res = U.doInParallel(10,
            executorService,
            data,
            new IgniteThrowableConsumer<Integer, Integer>() {
                @Override public Integer accept(Integer cnt) throws IgniteInterruptedCheckedException {
                    // Release the pool thread in the middle of the range.
                    if (taskProcessed.getAndIncrement() == (data.size() / 2) - 1) {
                        poolThreadLatch.countDown();

                        try {
                            // Wait until the pool thread has processed the rest of the items.
                            mainThreadLatch.await();
                        }
                        catch (InterruptedException e) {
                            Thread.currentThread().interrupt();

                            throw new IgniteInterruptedCheckedException(e);
                        }
                    }

                    // Count the item for the thread that actually executed it.
                    if (Thread.currentThread().getId() == threadId)
                        curThreadCnt.incrementAndGet();
                    else {
                        poolThreadCnt.incrementAndGet();

                        // All items are processed, let the calling thread continue.
                        if (taskProcessed.get() == data.size())
                            mainThreadLatch.countDown();
                    }

                    return -cnt;
                }
            });

        Assert.assertEquals(data.size(), curThreadCnt.get() + poolThreadCnt.get());
        Assert.assertEquals(5, curThreadCnt.get());
        Assert.assertEquals(5, poolThreadCnt.get());
        Assert.assertEquals(asList(0, -1, -2, -3, -4, -5, -6, -7, -8, -9), res);
    }
    finally {
        // Do not leak the pool thread, which may still be parked on the latch if the call failed.
        executorService.shutdownNow();
    }
}

/**
 * Runs three concurrent {@link #runTask(ExecutorService)} calls inside a two-thread pool, where each
 * call passes the very same pool to {@code U.doInParallel} as its executor.
 * <p>
 * The test passes only if all three submitted calls complete without an error.
 *
 * @throws Exception If any of the submitted calls failed.
 */
public void testDoInParallelWithStealingJobRunTaskInExecutor() throws Exception {
    // Pool size should be less than the size of the input data collection.
    ExecutorService executorService = Executors.newFixedThreadPool(2);

    try {
        Future<?> f1 = executorService.submit(() -> runTask(executorService));
        Future<?> f2 = executorService.submit(() -> runTask(executorService));
        Future<?> f3 = executorService.submit(() -> runTask(executorService));

        f1.get();
        f2.get();
        f3.get();
    }
    finally {
        // Release pool threads regardless of the test outcome.
        executorService.shutdownNow();
    }
}

/**
 * Negates ten integers through {@code U.doInParallel} on the given executor and verifies that
 * at least one of them was processed by the calling thread and that the results keep input order.
 *
 * @param executorService Executor service.
 */
private void runTask(ExecutorService executorService) {
    final long callerThreadId = Thread.currentThread().getId();

    final AtomicInteger ranInCaller = new AtomicInteger();

    final List<Integer> input = asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);

    final Collection<Integer> negated;

    try {
        negated = U.doInParallel(10, executorService, input, val -> {
            // Count only the items executed by the thread that invoked doInParallel.
            if (Thread.currentThread().getId() == callerThreadId)
                ranInCaller.incrementAndGet();

            return -val;
        });
    }
    catch (IgniteCheckedException e) {
        throw new IgniteException(e);
    }

    Assert.assertTrue(ranInCaller.get() > 0);
    Assert.assertEquals(asList(0, -1, -2, -3, -4, -5, -6, -7, -8, -9), negated);
}

/**
* Template method to test parallel execution
* @param executorService ExecutorService.
Expand Down Expand Up @@ -1030,7 +1161,7 @@ public void testDoInParallelException() {
IgniteUtils.doInParallel(
1,
executorService,
Arrays.asList(1, 2, 3),
asList(1, 2, 3),
i -> {
if (Integer.valueOf(1).equals(i))
throw new IgniteCheckedException(expectedException);
Expand Down

0 comments on commit acfdcda

Please sign in to comment.