Skip to content

Commit

Permalink
Replace failure.get().addSuppressed with failure.accumulateAndGet() (#…
Browse files Browse the repository at this point in the history
…37649)

Also add a test for concurrent incoming failures
  • Loading branch information
javanna authored and jasontedor committed Mar 8, 2020
1 parent 68b6f4d commit 1fa12bd
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,10 @@ public void onResponse(T element) {
@Override
public void onFailure(Exception e) {
if (failure.compareAndSet(null, e) == false) {
failure.get().addSuppressed(e);
failure.accumulateAndGet(e, (previous, current) -> {
previous.addSuppressed(current);
return previous;
});
}
if (countDown.countDown()) {
delegate.onFailure(failure.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,14 @@
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.CoreMatchers.instanceOf;

public class GroupedActionListenerTests extends ESTestCase {

public void testNotifications() throws InterruptedException {
Expand All @@ -55,20 +59,17 @@ public void onFailure(Exception e) {
Thread[] threads = new Thread[numThreads];
CyclicBarrier barrier = new CyclicBarrier(numThreads);
for (int i = 0; i < numThreads; i++) {
threads[i] = new Thread() {
@Override
public void run() {
try {
barrier.await(10, TimeUnit.SECONDS);
} catch (Exception e) {
throw new AssertionError(e);
}
int c = 0;
while((c = count.incrementAndGet()) <= groupSize) {
listener.onResponse(c-1);
}
threads[i] = new Thread(() -> {
try {
barrier.await(10, TimeUnit.SECONDS);
} catch (Exception e) {
throw new AssertionError(e);
}
int c = 0;
while((c = count.incrementAndGet()) <= groupSize) {
listener.onResponse(c-1);
}
};
});
threads[i].start();
}
for (Thread t : threads) {
Expand Down Expand Up @@ -100,11 +101,9 @@ public void onFailure(Exception e) {
excRef.set(e);
}
};
Collection<Integer> defaults = randomBoolean() ? Collections.singletonList(-1) :
Collections.emptyList();
Collection<Integer> defaults = randomBoolean() ? Collections.singletonList(-1) : Collections.emptyList();
int size = randomIntBetween(3, 4);
GroupedActionListener<Integer> listener = new GroupedActionListener<>(result, size,
defaults);
GroupedActionListener<Integer> listener = new GroupedActionListener<>(result, size, defaults);
listener.onResponse(0);
IOException ioException = new IOException();
RuntimeException rtException = new RuntimeException();
Expand All @@ -121,4 +120,23 @@ public void onFailure(Exception e) {
listener.onResponse(1);
assertNull(resRef.get());
}

public void testConcurrentFailures() throws InterruptedException {
AtomicReference<Exception> finalException = new AtomicReference<>();
int numGroups = randomIntBetween(10, 100);
GroupedActionListener<Void> listener = new GroupedActionListener<>(
ActionListener.wrap(r -> {}, finalException::set), numGroups, Collections.emptyList());
ExecutorService executorService = Executors.newFixedThreadPool(numGroups);
for (int i = 0; i < numGroups; i++) {
executorService.submit(() -> listener.onFailure(new IOException()));
}

executorService.shutdown();
executorService.awaitTermination(10, TimeUnit.SECONDS);

Exception exception = finalException.get();
assertNotNull(exception);
assertThat(exception, instanceOf(IOException.class));
assertEquals(numGroups - 1, exception.getSuppressed().length);
}
}

0 comments on commit 1fa12bd

Please sign in to comment.