Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -563,9 +563,6 @@ private static class ResultConjunctFuture<T> extends ConjunctFuture<Collection<T
/** The total number of futures in the conjunction. */
private final int numTotal;

/** The next free index in the results arrays. */
private final AtomicInteger nextIndex = new AtomicInteger(0);

/** The number of futures in the conjunction that are already complete. */
private final AtomicInteger numCompleted = new AtomicInteger(0);

Expand All @@ -575,12 +572,10 @@ private static class ResultConjunctFuture<T> extends ConjunctFuture<Collection<T
/** The function that is attached to all futures in the conjunction. Once a future
* is complete, this function tracks the completion or fails the conjunct.
*/
private void handleCompletedFuture(T value, Throwable throwable) {
private void handleCompletedFuture(int index, T value, Throwable throwable) {
if (throwable != null) {
completeExceptionally(throwable);
} else {
int index = nextIndex.getAndIncrement();

results[index] = value;

if (numCompleted.incrementAndGet() == numTotal) {
Expand All @@ -598,8 +593,11 @@ private void handleCompletedFuture(T value, Throwable throwable) {
complete(Collections.emptyList());
}
else {
int counter = 0;
for (CompletableFuture<? extends T> future : resultFutures) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't it common to only accept a List if the order of elements is relevant?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would not restrict the user on the type of collection he wants to use. The contract is that the result collection will have the same order as the input collection.

future.whenComplete(this::handleCompletedFuture);
final int index = counter;
counter++;
future.whenComplete((value, throwable) -> handleCompletedFuture(index, value, throwable));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@ public void shutdown() {
}
}

@VisibleForTesting
public boolean isShutdown() {
synchronized (lock) {
return shutdown;
}
}

private void fatalError(Throwable error) {
LOG.error("Embedded leader election service encountered a fatal error. Shutting down service.", error);

Expand Down Expand Up @@ -171,7 +178,11 @@ private void addContender(EmbeddedLeaderElectionService service, LeaderContender
service.contender = contender;
service.running = true;

updateLeader();
updateLeader().whenComplete((aVoid, throwable) -> {
if (throwable != null) {
fatalError(throwable);
}
});
}
catch (Throwable t) {
fatalError(t);
Expand Down Expand Up @@ -210,7 +221,11 @@ private void removeContender(EmbeddedLeaderElectionService service) {
currentLeaderSessionId = null;
}

updateLeader();
updateLeader().whenComplete((aVoid, throwable) -> {
if (throwable != null) {
fatalError(throwable);
}
});
}
catch (Throwable t) {
fatalError(t);
Expand Down Expand Up @@ -280,11 +295,12 @@ private CompletableFuture<Void> updateLeader() {

currentLeaderSessionId = leaderSessionId;
currentLeaderProposed = leaderService;
currentLeaderProposed.isLeader = true;

LOG.info("Proposing leadership to contender {} @ {}",
leaderService.contender, leaderService.contender.getAddress());

return execute(new GrantLeadershipCall(leaderService, leaderSessionId, LOG));
return execute(new GrantLeadershipCall(leaderService.contender, leaderSessionId, LOG));
}
} else {
return CompletableFuture.completedFuture(null);
Expand Down Expand Up @@ -373,7 +389,8 @@ CompletableFuture<Void> revokeLeadership() {
}

LOG.info("Revoking leadership of {}.", leaderService.contender);
CompletableFuture<Void> revokeLeadershipCallFuture = execute(new RevokeLeadershipCall(leaderService));
leaderService.isLeader = false;
CompletableFuture<Void> revokeLeadershipCallFuture = execute(new RevokeLeadershipCall(leaderService.contender));

CompletableFuture<Void> notifyAllListenersFuture = notifyAllListeners(null, null);

Expand Down Expand Up @@ -506,51 +523,44 @@ public void run() {

private static class GrantLeadershipCall implements Runnable {

private final EmbeddedLeaderElectionService leaderElectionService;
private final LeaderContender contender;
private final UUID leaderSessionId;
private final Logger logger;

GrantLeadershipCall(
EmbeddedLeaderElectionService leaderElectionService,
LeaderContender contender,
UUID leaderSessionId,
Logger logger) {

this.leaderElectionService = checkNotNull(leaderElectionService);
this.contender = checkNotNull(contender);
this.leaderSessionId = checkNotNull(leaderSessionId);
this.logger = checkNotNull(logger);
}

@Override
public void run() {
leaderElectionService.isLeader = true;

final LeaderContender contender = leaderElectionService.contender;

try {
contender.grantLeadership(leaderSessionId);
}
catch (Throwable t) {
logger.warn("Error granting leadership to contender", t);
contender.handleError(t instanceof Exception ? (Exception) t : new Exception(t));
leaderElectionService.isLeader = false;
}
}
}

private static class RevokeLeadershipCall implements Runnable {

@Nonnull
private final EmbeddedLeaderElectionService leaderElectionService;
private final LeaderContender contender;

RevokeLeadershipCall(@Nonnull EmbeddedLeaderElectionService leaderElectionService) {
this.leaderElectionService = leaderElectionService;
RevokeLeadershipCall(@Nonnull LeaderContender contender) {
this.contender = contender;
}

@Override
public void run() {
leaderElectionService.isLeader = false;

leaderElectionService.contender.revokeLeadership();
contender.revokeLeadership();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,35 @@

package org.apache.flink.runtime.concurrent;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture;
import org.apache.flink.util.TestLogger;

import org.hamcrest.collection.IsIterableContainingInAnyOrder;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
* Tests for the {@link ConjunctFuture} and {@link FutureUtils.WaitingConjunctFuture}.
* Tests for the {@link ConjunctFuture} and its sub classes.
*/
@RunWith(Parameterized.class)
public class ConjunctFutureTest extends TestLogger {
Expand Down Expand Up @@ -193,23 +199,33 @@ public void testConjunctFutureFailureOnSuccessive() throws Exception {
}

/**
* Tests that the conjunct future returns upon completion the collection of all future values.
* Tests that the conjunct future returns upon completion the collection of all future values
* in the same order in which the futures were inserted.
*/
@Test
public void testConjunctFutureValue() throws ExecutionException, InterruptedException {
java.util.concurrent.CompletableFuture<Integer> future1 = java.util.concurrent.CompletableFuture.completedFuture(1);
java.util.concurrent.CompletableFuture<Long> future2 = java.util.concurrent.CompletableFuture.completedFuture(2L);
java.util.concurrent.CompletableFuture<Double> future3 = new java.util.concurrent.CompletableFuture<>();
public void testConjunctFutureValue() throws Exception {
final int numberFutures = 10;

ConjunctFuture<Collection<Number>> result = FutureUtils.combineAll(Arrays.asList(future1, future2, future3));
final List<CompletableFuture<Integer>> futures = new ArrayList<>(numberFutures);
for (int i = 0; i < numberFutures; i++) {
futures.add(new CompletableFuture<>());
}

assertFalse(result.isDone());
ConjunctFuture<Collection<Number>> result = FutureUtils.combineAll(futures);

future3.complete(.1);
final List<Tuple2<Integer, CompletableFuture<Integer>>> shuffledFutures = IntStream.range(0, futures.size())
.mapToObj(index -> Tuple2.of(index, futures.get(index)))
.collect(Collectors.toList());
Collections.shuffle(shuffledFutures);

assertTrue(result.isDone());
for (Tuple2<Integer, CompletableFuture<Integer>> shuffledFuture : shuffledFutures) {
assertThat(result.isDone(), is(false));
shuffledFuture.f1.complete(shuffledFuture.f0);
}

assertThat(result.isDone(), is(true));

assertThat(result.get(), IsIterableContainingInAnyOrder.<Number>containsInAnyOrder(1, 2L, .1));
assertThat(result.get(), is(equalTo(IntStream.range(0, numberFutures).boxed().collect(Collectors.toList()))));
}

@Test
Expand Down
Loading