Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed race condition on bulkhead. #5747

Merged
merged 3 commits into from
Dec 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@

package io.helidon.nima.faulttolerance;

import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import io.helidon.common.LazyValue;
Expand All @@ -31,6 +30,8 @@
* Implementation of {@code Async}. Default executor accessed from {@link FaultTolerance#executor()}.
*/
class AsyncImpl implements Async {
private static final System.Logger LOGGER = System.getLogger(AsyncImpl.class.getName());

private final LazyValue<? extends ExecutorService> executor;
private final CompletableFuture<Async> onStart;

Expand All @@ -45,12 +46,19 @@ class AsyncImpl implements Async {

@Override
public <T> CompletableFuture<T> invoke(Supplier<T> supplier) {
AtomicBoolean mayInterrupt = new AtomicBoolean(false);
AtomicReference<Future<?>> ourFuture = new AtomicReference<>();
CompletableFuture<T> result = new CompletableFuture<>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
mayInterrupt.set(mayInterruptIfRunning);
return super.cancel(mayInterruptIfRunning);
Future<?> toCancel = ourFuture.get();
if (toCancel == null) {
// cancelled before the future was assigned - this should not happen, as we do
// not escape this method before that
LOGGER.log(System.Logger.Level.WARNING, "Failed to cancel future, it is not yet available.");
return false;
} else {
return toCancel.cancel(mayInterruptIfRunning);
}
}
};
Future<?> future = executor.get().submit(() -> {
Expand All @@ -67,12 +75,8 @@ public boolean cancel(boolean mayInterruptIfRunning) {
result.completeExceptionally(throwable);
}
});
result.exceptionally(t -> {
if (t instanceof CancellationException) {
future.cancel(mayInterrupt.get());
}
return null;
});
ourFuture.set(future);

return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

Expand All @@ -37,6 +38,7 @@
class BulkheadImpl implements Bulkhead {
private static final System.Logger LOGGER = System.getLogger(BulkheadImpl.class.getName());

private final Lock inProgressLock;
private final Semaphore inProgress;
private final String name;
private final BarrierQueue queue;
Expand All @@ -53,6 +55,7 @@ class BulkheadImpl implements Bulkhead {
this.queue = builder.queueLength() > 0
? new BlockingQueue(builder.queueLength())
: new ZeroCapacityQueue();
this.inProgressLock = new ReentrantLock(true);
}

@Override
Expand All @@ -62,22 +65,53 @@ public String name() {

@Override
public <T> T invoke(Supplier<? extends T> supplier) {
// we need to hold the lock until we decide what to do with this request
// cannot release it in between attempts, as that would give window for another thread to change the state
inProgressLock.lock();

// execute immediately if semaphore can be acquired
if (inProgress.tryAcquire()) {
boolean acquired;
try {
acquired = inProgress.tryAcquire();
} catch (Throwable t) {
inProgressLock.unlock();
throw t;
}
if (acquired) {
inProgressLock.unlock(); // we managed to get a semaphore permit, in progress lock can be released for now
if (LOGGER.isLoggable(Level.DEBUG)) {
LOGGER.log(Level.DEBUG, name + " invoke immediate " + supplier);
}
return execute(supplier);
}

if (queue.isFull()) {
boolean full;
try {
full = queue.isFull();
} catch (Throwable t) {
inProgressLock.unlock();
throw t;
}
if (full) {
inProgressLock.unlock(); // this request will fail, release lock
callsRejected.incrementAndGet();
throw new BulkheadException("Bulkhead queue \"" + name + "\" is full");
}

try {
// block current thread until barrier is retracted
listeners.forEach(l -> l.enqueueing(supplier));
queue.enqueueAndWaitOn(supplier);
Barrier barrier;
try {
listeners.forEach(l -> l.enqueueing(supplier));
barrier = queue.enqueue(supplier);
} finally {
inProgressLock.unlock(); // we have enqueued, now we can wait
}

if (barrier == null) {
throw new BulkheadException("Bulkhead queue \"" + name + "\" is full");
}
barrier.waitOn();

// unblocked so we can proceed with execution
listeners.forEach(l -> l.dequeued(supplier));
Expand Down Expand Up @@ -143,9 +177,14 @@ private <T> T execute(Supplier<? extends T> supplier) {
throw toRuntimeException(throwable);
} finally {
concurrentExecutions.decrementAndGet();
boolean dequeued = queue.dequeueAndRetract();
if (!dequeued) {
inProgress.release(); // nothing dequeued, one more permit
inProgressLock.lock();
try {
boolean dequeued = queue.dequeueAndRetract();
if (!dequeued) {
inProgress.release(); // nothing dequeued, one more permit
}
} finally {
inProgressLock.unlock();
}
}
}
Expand Down Expand Up @@ -182,11 +221,9 @@ private interface BarrierQueue {
* Enqueue supplier and block thread on barrier.
*
* @param supplier the supplier
* @return {@code true} if supplier was enqueued or {@code false} otherwise
* @throws ExecutionException if exception encountered while blocked
* @throws InterruptedException if blocking is interrupted
* @return barrier if supplier was enqueued or null otherwise
*/
boolean enqueueAndWaitOn(Supplier<?> supplier) throws ExecutionException, InterruptedException;
Barrier enqueue(Supplier<?> supplier);

/**
* Dequeue supplier and retract its barrier.
Expand Down Expand Up @@ -220,8 +257,9 @@ public boolean isFull() {
}

@Override
public boolean enqueueAndWaitOn(Supplier<?> supplier) {
return false;
public Barrier enqueue(Supplier<?> supplier) {
// never enqueue, should always fail execution if permits are not available
return null;
}

@Override
Expand Down Expand Up @@ -273,19 +311,13 @@ public boolean isFull() {
}

@Override
public boolean enqueueAndWaitOn(Supplier<?> supplier) throws ExecutionException, InterruptedException {
Barrier barrier;
public Barrier enqueue(Supplier<?> supplier) {
lock.lock();
try {
barrier = enqueue(supplier);
return doEnqueue(supplier);
} finally {
lock.unlock();
}
if (barrier != null) {
barrier.waitOn();
return true;
}
return false;
}

@Override
Expand Down Expand Up @@ -318,7 +350,7 @@ private Barrier dequeue() {
return supplier == null ? null : map.remove(supplier);
}

private Barrier enqueue(Supplier<?> supplier) {
private Barrier doEnqueue(Supplier<?> supplier) {
boolean added = queue.offer(supplier);
return added ? map.computeIfAbsent(supplier, s -> new Barrier()) : null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import org.junit.jupiter.api.Test;

import static org.hamcrest.CoreMatchers.endsWith;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.startsWith;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.CoreMatchers.is;

class AsyncTest {
private static final long WAIT_TIMEOUT_MILLIS = 2000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@
import io.helidon.logging.common.LogConfig;

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;

import static java.lang.System.Logger.Level.INFO;
import static java.lang.System.Logger.Level.TRACE;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
Expand Down Expand Up @@ -169,7 +170,7 @@ void testBulkheadQueue() throws InterruptedException, ExecutionException, java.u
}
}

@Test
@RepeatedTest(100)
void testBulkheadWithError() throws InterruptedException, ExecutionException, java.util.concurrent.TimeoutException {
// Create bulkhead of 1 with a queue of 1
Bulkhead bulkhead = Bulkhead.builder()
Expand All @@ -185,13 +186,16 @@ void testBulkheadWithError() throws InterruptedException, ExecutionException, ja
Task inProgress = new Task(0);
CompletableFuture<?> inProgressFuture = Async.invokeStatic(
() -> bulkhead.invoke(inProgress::run));
CompletableFuture<?> failedFuture = Async.invokeStatic(
() -> bulkhead.invoke(() -> { throw new IllegalStateException(); }));

// Verify completion of inProgress task
if (!inProgress.waitUntilStarted(WAIT_TIMEOUT_MILLIS)) {
fail("Task inProgress never started");
}

// as we use an async to submit to bulkhead, we should wait until the first task is submitted
CompletableFuture<?> failedFuture = Async.invokeStatic(
() -> bulkhead.invoke(() -> { throw new IllegalStateException(); }));

inProgress.unblock();
inProgressFuture.get(WAIT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);

Expand Down Expand Up @@ -219,7 +223,7 @@ private static class Task {
}

int run() {
LOGGER.log(INFO, "Task " + index + " running on thread " + Thread.currentThread().getName());
LOGGER.log(TRACE, "Task " + index + " running on thread " + Thread.currentThread().getName());

started.countDown();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.opentracing.mock.MockTracer;
import io.opentracing.tag.Tags;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import static org.hamcrest.CoreMatchers.is;
Expand All @@ -47,6 +48,7 @@ class TracingPropagationTest {
private static final Duration TIMEOUT = Duration.ofSeconds(10);

@Test
@Disabled // intermittently failing on pipeline, issue 5754
void testTracingSuccess() throws ExecutionException, InterruptedException {
MockTracer mockTracer = new MockTracer();

Expand Down