Skip to content

Commit

Permalink
cells: drop ListenableFuture from CellNucleus startup logic
Browse files Browse the repository at this point in the history
Less dependency on ListenableFuture and guava.

Acked-by: Lea Morschel
Target: master
Require-book: no
Require-notes: no
  • Loading branch information
kofemann committed Apr 6, 2023
1 parent 7c14b06 commit 777aa7b
Showing 1 changed file with 13 additions and 14 deletions.
27 changes: 13 additions & 14 deletions modules/cells/src/main/java/dmg/cells/nucleus/CellNucleus.java
Expand Up @@ -5,17 +5,13 @@
import static com.google.common.collect.Iterables.consumingIterable;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static java.util.Comparator.comparingLong;
import static org.dcache.util.CompletableFutures.fromListenableFuture;
import static org.dcache.util.MathUtils.addWithInfinity;
import static org.dcache.util.MathUtils.subWithInfinity;

import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Monitor;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.util.concurrent.Uninterruptibles;
import dmg.cells.zookeeper.CellCuratorFramework;
Expand All @@ -38,6 +34,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
Expand All @@ -54,6 +51,7 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Function;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;
import org.apache.curator.framework.CuratorFramework;
Expand Down Expand Up @@ -154,7 +152,7 @@ private enum State {
/**
* Task starting the cell.
*/
private ListenableFuture<Void> _startup;
private CompletableFuture<Void> _startup;

private Pinboard _pinboard;

Expand Down Expand Up @@ -442,22 +440,22 @@ public void sendMessage(CellMessage msg, boolean locally, boolean remotely,
*/
public CellMessage sendAndWait(CellMessage envelope, long timeout)
throws SerializationException, NoRouteToCellException, InterruptedException, ExecutionException {
final SettableFuture<CellMessage> future = SettableFuture.create();
final CompletableFuture<CellMessage> future = new CompletableFuture<>();
sendMessage(envelope, true, true,
true, new CellMessageAnswerable() {
@Override
public void answerArrived(CellMessage request, CellMessage answer) {
future.set(answer);
future.complete(answer);
}

@Override
public void exceptionArrived(CellMessage request, Exception exception) {
future.setException(exception);
future.completeExceptionally(exception);
}

@Override
public void answerTimedOut(CellMessage request) {
future.set(null);
future.complete(null);
}
}, directExecutor(), timeout);
try {
Expand Down Expand Up @@ -880,14 +878,16 @@ public CompletableFuture<Void> start() {
try {
checkState(_state == State.NEW);
_state = State.PRE_STARTUP;
_startup = _messageExecutor.submit(wrapLoggingContext(this::doStart));
_startup = CompletableFuture.runAsync(wrapLoggingContext(this::doStart), _messageExecutor);
} finally {
_lifeCycleMonitor.leave();
}
return fromListenableFuture(Futures.nonCancellationPropagating(_startup));

// the `cancel` on newly returned future is not canceling the _startup one.
return _startup.thenApply(Function.identity());
}

private Void doStart() throws Exception {
private void doStart() {
try {
checkState(_state == State.PRE_STARTUP);
_timeoutTask = _timer.scheduleWithFixedDelay(
Expand All @@ -912,9 +912,8 @@ private Void doStart() throws Exception {
} catch (Throwable e) {
setState(State.FAILED);
__cellGlue.kill(CellNucleus.this);
throw e;
throw new CompletionException(e);
}
return null;
}

private void addCallbackTimeout(CellLock lock) {
Expand Down

0 comments on commit 777aa7b

Please sign in to comment.