Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ static boolean registerMBean(
* The peer belongs to the current configuration, should start as a follower or listener
*/
private void startAsPeer(RaftPeerRole newRole) {
Object reason = "";
final Object reason;
if (newRole == RaftPeerRole.FOLLOWER) {
reason = "startAsFollower";
setRole(RaftPeerRole.FOLLOWER, reason);
Expand Down Expand Up @@ -485,8 +485,8 @@ void groupRemove(boolean deleteDirectory, boolean renameDirectory) {
break;
} catch (NoSuchFileException e) {
LOG.warn("{}: Some file does not exist {}", getMemberId(), dir, e);
} catch (Exception ignored) {
LOG.error("{}: Failed to remove RaftStorageDirectory {}", getMemberId(), dir, ignored);
} catch (Exception e) {
LOG.error("{}: Failed to remove RaftStorageDirectory {}", getMemberId(), dir, e);
break;
}
}
Expand Down Expand Up @@ -514,45 +514,45 @@ public void close() {
LOG.info("{}: shutdown", getMemberId());
try {
jmxAdapter.unregister();
} catch (Exception ignored) {
LOG.warn("{}: Failed to un-register RaftServer JMX bean", getMemberId(), ignored);
} catch (Exception e) {
LOG.warn("{}: Failed to un-register RaftServer JMX bean", getMemberId(), e);
}
try {
role.shutdownFollowerState();
} catch (Exception ignored) {
LOG.warn("{}: Failed to shutdown FollowerState", getMemberId(), ignored);
} catch (Exception e) {
LOG.warn("{}: Failed to shutdown FollowerState", getMemberId(), e);
}
try{
role.shutdownLeaderElection();
} catch (Exception ignored) {
LOG.warn("{}: Failed to shutdown LeaderElection", getMemberId(), ignored);
} catch (Exception e) {
LOG.warn("{}: Failed to shutdown LeaderElection", getMemberId(), e);
}
try{
role.shutdownLeaderState(true);
} catch (Exception ignored) {
LOG.warn("{}: Failed to shutdown LeaderState monitor", getMemberId(), ignored);
} catch (Exception e) {
LOG.warn("{}: Failed to shutdown LeaderState monitor", getMemberId(), e);
}
try{
state.close();
} catch (Exception ignored) {
LOG.warn("{}: Failed to close state", getMemberId(), ignored);
} catch (Exception e) {
LOG.warn("{}: Failed to close state", getMemberId(), e);
}
try {
leaderElectionMetrics.unregister();
raftServerMetrics.unregister();
RaftServerMetricsImpl.removeRaftServerMetrics(getMemberId());
} catch (Exception ignored) {
LOG.warn("{}: Failed to unregister metric", getMemberId(), ignored);
} catch (Exception e) {
LOG.warn("{}: Failed to unregister metric", getMemberId(), e);
}
try {
ConcurrentUtils.shutdownAndWait(clientExecutor);
} catch (Exception ignored) {
LOG.warn(getMemberId() + ": Failed to shutdown clientExecutor", ignored);
} catch (Exception e) {
LOG.warn(getMemberId() + ": Failed to shutdown clientExecutor", e);
}
try {
ConcurrentUtils.shutdownAndWait(serverExecutor);
} catch (Exception ignored) {
LOG.warn(getMemberId() + ": Failed to shutdown serverExecutor", ignored);
} catch (Exception e) {
LOG.warn(getMemberId() + ": Failed to shutdown serverExecutor", e);
}
});
}
Expand Down Expand Up @@ -740,11 +740,14 @@ RaftClientReply newExceptionReply(RaftClientRequest request, RaftException excep
.build();
}

private CompletableFuture<RaftClientReply> checkLeaderState(RaftClientRequest request) {
return checkLeaderState(request, null);
}

/**
* @return null if the server is in leader state.
*/
private CompletableFuture<RaftClientReply> checkLeaderState(RaftClientRequest request, CacheEntry entry,
boolean isWrite) {
private CompletableFuture<RaftClientReply> checkLeaderState(RaftClientRequest request, CacheEntry entry) {
try {
assertGroup(request.getRequestorId(), request.getRaftGroupId());
} catch (GroupMismatchException e) {
Expand All @@ -766,7 +769,7 @@ private CompletableFuture<RaftClientReply> checkLeaderState(RaftClientRequest re
return RetryCacheImpl.failWithReply(reply, entry);
}

if (isWrite && isSteppingDown()) {
if (!request.isReadOnly() && isSteppingDown()) {
final LeaderSteppingDownException lsde = new LeaderSteppingDownException(getMemberId() + " is stepping down");
final RaftClientReply reply = newExceptionReply(request, lsde);
return RetryCacheImpl.failWithReply(reply, entry);
Expand All @@ -790,10 +793,9 @@ NotLeaderException generateNotLeaderException() {
return new NotLeaderException(getMemberId(), conf.getPeer(leaderId), peers);
}

LifeCycle.State assertLifeCycleState(Set<LifeCycle.State> expected) throws ServerNotReadyException {
return lifeCycle.assertCurrentState((n, c) -> new ServerNotReadyException(
getMemberId() + " is not in " + expected + ": current state is " + c),
expected);
void assertLifeCycleState(Set<LifeCycle.State> expected) throws ServerNotReadyException {
lifeCycle.assertCurrentState((n, c) -> new ServerNotReadyException(
getMemberId() + " is not in " + expected + ": current state is " + c), expected);
}

void assertGroup(Object requestorId, RaftGroupId requestorGroupId) throws GroupMismatchException {
Expand All @@ -814,11 +816,10 @@ private CompletableFuture<RaftClientReply> appendTransaction(
request.getClientId(), request, context, cacheEntry);

assertLifeCycleState(LifeCycle.States.RUNNING);
CompletableFuture<RaftClientReply> reply;

final PendingRequest pending;
synchronized (this) {
reply = checkLeaderState(request, cacheEntry, true);
final CompletableFuture<RaftClientReply> reply = checkLeaderState(request, cacheEntry);
if (reply != null) {
return reply;
}
Expand Down Expand Up @@ -890,75 +891,67 @@ public CompletableFuture<RaftClientReply> submitClientRequestAsync(
LOG.debug("{}: receive client request({})", getMemberId(), request);
final Timekeeper timer = raftServerMetrics.getClientRequestTimer(request.getType());
final Optional<Timekeeper.Context> timerContext = Optional.ofNullable(timer).map(Timekeeper::time);

final CompletableFuture<RaftClientReply> replyFuture;

if (request.is(TypeCase.STALEREAD)) {
replyFuture = staleReadAsync(request);
} else if (request.is(TypeCase.READ)) {
replyFuture = readAsync(request);
} else {
// first check the server's leader state
CompletableFuture<RaftClientReply> reply = checkLeaderState(request, null,
!request.is(TypeCase.READ) && !request.is(TypeCase.WATCH));
if (reply != null) {
return reply;
return replyFuture(request).whenComplete((clientReply, exception) -> {
timerContext.ifPresent(Timekeeper.Context::stop);
if (exception != null || clientReply.getException() != null) {
raftServerMetrics.incFailedRequestCount(request.getType());
}
});
}

// let the state machine handle read-only request from client
RaftClientRequest.Type type = request.getType();
if (type.is(TypeCase.MESSAGESTREAM)) {
if (type.getMessageStream().getEndOfRequest()) {
final CompletableFuture<RaftClientRequest> f = streamEndOfRequestAsync(request);
if (f.isCompletedExceptionally()) {
return f.thenApply(r -> null);
}
request = f.join();
type = request.getType();
}
}
private CompletableFuture<RaftClientReply> replyFuture(RaftClientRequest request) throws IOException {
final TypeCase type = request.getType().getTypeCase();
switch (type) {
case STALEREAD:
return staleReadAsync(request);
case READ:
return readAsync(request);
case WATCH:
return watchAsync(request);
case MESSAGESTREAM:
return messageStreamAsync(request);
case WRITE:
case FORWARD:
return writeAsync(request);
default:
throw new IllegalStateException("Unexpected request type: " + type + ", request=" + request);
}
}

if (type.is(TypeCase.WATCH)) {
replyFuture = watchAsync(request);
} else if (type.is(TypeCase.MESSAGESTREAM)) {
replyFuture = streamAsync(request);
} else {
// query the retry cache
final RetryCacheImpl.CacheQueryResult queryResult = retryCache.queryCache(ClientInvocationId.valueOf(request));
final CacheEntry cacheEntry = queryResult.getEntry();
if (queryResult.isRetry()) {
// if the previous attempt is still pending or it succeeded, return its
// future
replyFuture = cacheEntry.getReplyFuture();
} else {
// TODO: this client request will not be added to pending requests until
// later which means that any failure in between will leave partial state in
// the state machine. We should call cancelTransaction() for failed requests
final TransactionContextImpl context = (TransactionContextImpl) stateMachine.startTransaction(
filterDataStreamRaftClientRequest(request));
if (context.getException() != null) {
final StateMachineException e = new StateMachineException(getMemberId(), context.getException());
final RaftClientReply exceptionReply = newExceptionReply(request, e);
cacheEntry.failWithReply(exceptionReply);
replyFuture = CompletableFuture.completedFuture(exceptionReply);
} else {
replyFuture = appendTransaction(request, context, cacheEntry);
}
}
}
private CompletableFuture<RaftClientReply> writeAsync(RaftClientRequest request) throws IOException {
final CompletableFuture<RaftClientReply> reply = checkLeaderState(request);
if (reply != null) {
return reply;
}

final RaftClientRequest.Type type = request.getType();
replyFuture.whenComplete((clientReply, exception) -> {
timerContext.ifPresent(Timekeeper.Context::stop);
if (exception != null || clientReply.getException() != null) {
raftServerMetrics.incFailedRequestCount(type);
}
});
return replyFuture;
// query the retry cache
final RetryCacheImpl.CacheQueryResult queryResult = retryCache.queryCache(ClientInvocationId.valueOf(request));
final CacheEntry cacheEntry = queryResult.getEntry();
if (queryResult.isRetry()) {
// return the cached future.
return cacheEntry.getReplyFuture();
}
// TODO: this client request will not be added to pending requests until
// later which means that any failure in between will leave partial state in
// the state machine. We should call cancelTransaction() for failed requests
final TransactionContextImpl context = (TransactionContextImpl) stateMachine.startTransaction(
filterDataStreamRaftClientRequest(request));
if (context.getException() != null) {
final StateMachineException e = new StateMachineException(getMemberId(), context.getException());
final RaftClientReply exceptionReply = newExceptionReply(request, e);
cacheEntry.failWithReply(exceptionReply);
return CompletableFuture.completedFuture(exceptionReply);
}

return appendTransaction(request, context, cacheEntry);
}

private CompletableFuture<RaftClientReply> watchAsync(RaftClientRequest request) {
final CompletableFuture<RaftClientReply> reply = checkLeaderState(request);
if (reply != null) {
return reply;
}

return role.getLeaderState()
.map(ls -> ls.addWatchReqeust(request))
.orElseGet(() -> CompletableFuture.completedFuture(
Expand Down Expand Up @@ -1003,7 +996,7 @@ private CompletableFuture<Long> getReadIndex(RaftClientRequest request, LeaderSt
private CompletableFuture<RaftClientReply> readAsync(RaftClientRequest request) {
if (request.getType().getRead().getPreferNonLinearizable()
|| readOption == RaftServerConfigKeys.Read.Option.DEFAULT) {
final CompletableFuture<RaftClientReply> reply = checkLeaderState(request, null, false);
final CompletableFuture<RaftClientReply> reply = checkLeaderState(request);
if (reply != null) {
return reply;
}
Expand Down Expand Up @@ -1053,7 +1046,21 @@ private RaftClientReply readException2Reply(RaftClientRequest request, Throwable
}
}

private CompletableFuture<RaftClientReply> streamAsync(RaftClientRequest request) {
private CompletableFuture<RaftClientReply> messageStreamAsync(RaftClientRequest request) throws IOException {
final CompletableFuture<RaftClientReply> reply = checkLeaderState(request);
if (reply != null) {
return reply;
}

if (request.getType().getMessageStream().getEndOfRequest()) {
final CompletableFuture<RaftClientRequest> f = streamEndOfRequestAsync(request);
if (f.isCompletedExceptionally()) {
return f.thenApply(r -> null);
}
// the message stream has ended and the request become a WRITE request
return replyFuture(f.join());
}

return role.getLeaderState()
.map(ls -> ls.streamAsync(request))
.orElseGet(() -> CompletableFuture.completedFuture(
Expand Down Expand Up @@ -1146,7 +1153,7 @@ CompletableFuture<RaftClientReply> transferLeadershipAsync(TransferLeadershipReq
assertGroup(request.getRequestorId(), request.getRaftGroupId());

synchronized (this) {
CompletableFuture<RaftClientReply> reply = checkLeaderState(request, null, false);
CompletableFuture<RaftClientReply> reply = checkLeaderState(request);
if (reply != null) {
return reply;
}
Expand Down Expand Up @@ -1254,15 +1261,15 @@ public CompletableFuture<RaftClientReply> setConfigurationAsync(SetConfiguration
assertLifeCycleState(LifeCycle.States.RUNNING);
assertGroup(request.getRequestorId(), request.getRaftGroupId());

CompletableFuture<RaftClientReply> reply = checkLeaderState(request, null, true);
CompletableFuture<RaftClientReply> reply = checkLeaderState(request);
if (reply != null) {
return reply;
}

final SetConfigurationRequest.Arguments arguments = request.getArguments();
final PendingRequest pending;
synchronized (this) {
reply = checkLeaderState(request, null, false);
reply = checkLeaderState(request);
if (reply != null) {
return reply;
}
Expand Down