Skip to content

Commit

Permalink
Remove usage of thenComposeAsync due to risk of deadlock.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Jun 11, 2015
1 parent 736e9d5 commit 4ba7e3d
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 32 deletions.
4 changes: 2 additions & 2 deletions raft/src/main/java/net/kuujo/copycat/raft/log/Compactor.java
Expand Up @@ -126,7 +126,7 @@ synchronized CompletableFuture<Void> compact() {
return compaction;
}
return null;
}, context).thenComposeAsync(c -> {
}, context).thenCompose(c -> {
if (compaction != null) {
return compaction.run(log.segments).thenRun(() -> {
synchronized (this) {
Expand All @@ -136,7 +136,7 @@ synchronized CompletableFuture<Void> compact() {
});
}
return CompletableFuture.completedFuture(null);
}, context);
});
return compactFuture;
}

Expand Down
Expand Up @@ -49,9 +49,15 @@ public Type type() {

@Override
CompletableFuture<Void> run(SegmentManager segments) {
LOGGER.info("compacting the log");
setRunning(true);
return compactSegments(getActiveSegments(segments).iterator(), segments, new CompletableFuture<>()).thenRun(() -> setRunning(false));
CompletableFuture<Void> future = new CompletableFuture<>();
context.execute(() -> {
LOGGER.info("Compacting the log");
setRunning(true);
compactSegments(getActiveSegments(segments).iterator(), segments, future).whenComplete((result, error) -> {
setRunning(false);
});
});
return future;
}

/**
Expand Down
Expand Up @@ -48,9 +48,15 @@ public Type type() {

@Override
CompletableFuture<Void> run(SegmentManager segments) {
LOGGER.info("compacting the log");
setRunning(true);
return compactLevels(getCompactSegments(segments).iterator(), segments, new CompletableFuture<>()).thenRun(() -> setRunning(false));
CompletableFuture<Void> future = new CompletableFuture<>();
context.execute(() -> {
LOGGER.info("Compacting the log");
setRunning(true);
compactLevels(getCompactSegments(segments).iterator(), segments, future).whenComplete((result, error) -> {
setRunning(false);
});
});
return future;
}

/**
Expand Down
Expand Up @@ -362,14 +362,15 @@ private CompletableFuture<Void> register() {
* Registers the client.
*/
private CompletableFuture<Void> register(long interval, CompletableFuture<Void> future) {
register(new ArrayList<>(members.members())).whenComplete((result, error) -> {
register(new ArrayList<>(members.members())).whenCompleteAsync((result, error) -> {
threadChecker.checkThread();
if (error == null) {
future.complete(null);
} else {
long nextInterval = Math.min(interval * 2, 5000);
registerTimer = context.schedule(() -> register(nextInterval, future), nextInterval, TimeUnit.MILLISECONDS);
}
});
}, context);
return future;
}

Expand Down Expand Up @@ -517,9 +518,8 @@ private void cancelKeepAliveTimer() {
public CompletableFuture<Void> open() {
openFuture = new CompletableFuture<>();
context.execute(() -> {
register().thenRun(this::startKeepAliveTimer).thenApply(v -> {
register().thenRun(this::startKeepAliveTimer).thenRun(() -> {
open = true;
return this;
});
});
return openFuture;
Expand Down
Expand Up @@ -426,7 +426,11 @@ CompletableFuture<RaftState> transition(Class<? extends AbstractState> state) {
* Joins the cluster.
*/
private CompletableFuture<Void> join() {
return join(100, new CompletableFuture<>());
CompletableFuture<Void> future = new CompletableFuture<>();
context.execute(() -> {
join(100, future);
});
return future;
}

/**
Expand Down Expand Up @@ -611,18 +615,19 @@ public synchronized CompletableFuture<Void> open() {
log.open(context);
transition(PassiveState.class);
}, context)
.thenComposeAsync(v -> join(), context)
.thenCompose(v -> join())
.thenCompose(v -> super.open())
.thenRun(() -> {
.thenRunAsync(() -> {
startHeartbeatTimer();
open = true;
});
}, context);
} else {
return cluster.open().thenRunAsync(() -> {
log.open(context);
transition(FollowerState.class);
open = true;
}, context)
.thenCompose(v -> join())
.thenCompose(v -> super.open());
}
}
Expand All @@ -642,24 +647,30 @@ public synchronized CompletableFuture<Void> close() {
cancelJoinTimer();
cancelHeartbeatTimer();
open = false;
transition(StartState.class)
.thenComposeAsync(v -> super.close(), context)
.thenComposeAsync(v -> leave(), context)
.thenComposeAsync(v -> cluster.close(), context)
.whenCompleteAsync((result, error) -> {
try {
if (log != null) {
log.close();
transition(StartState.class);

super.close().whenCompleteAsync((r1, e1) -> {
leave().whenComplete((r2, e2) -> {
cluster.close().whenCompleteAsync((r3, e3) -> {
try {
if (log != null) {
log.close();
}
} catch (Exception e) {
}
} catch (Exception e) {
}

if (error == null) {
future.complete(null);
} else {
future.completeExceptionally(error);
}
}, context);
if (e1 != null) {
future.completeExceptionally(e1);
} else if (e2 != null) {
future.completeExceptionally(e2);
} else if (e3 != null) {
future.completeExceptionally(e3);
} else {
future.complete(null);
}
}, context);
});
}, context);
});
return future;
}
Expand Down

0 comments on commit 4ba7e3d

Please sign in to comment.