Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.client;

import org.apache.ratis.client.api.AsyncApi;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;

import java.util.concurrent.CompletableFuture;

/** An RPC interface which extends the user interface {@link AsyncApi}. */
public interface AsyncRpcApi extends AsyncApi {
/**
* Send the given RaftClientRequest asynchronously to the raft service.
* The given RaftClientRequest will be wrapped as a Message in a new RaftClientRequest,
* and the leader will decode it from the Message.
* @param request The RaftClientRequest to be forwarded.
* @return a future of the reply.
*/
CompletableFuture<RaftClientReply> sendForward(RaftClientRequest request);
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,4 @@
public interface DataStreamOutputRpc extends DataStreamOutput {
/** Get the future of the header request. */
CompletableFuture<DataStreamReply> getHeaderFuture();

/** Create a transaction asynchronously once the stream data is replicated to all servers */
CompletableFuture<DataStreamReply> startTransactionAsync();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@

import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import org.apache.ratis.client.api.AsyncApi;

import org.apache.ratis.client.AsyncRpcApi;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeerId;

/** Async api implementations. */
class AsyncImpl implements AsyncApi {
class AsyncImpl implements AsyncRpcApi {
private final RaftClientImpl client;

AsyncImpl(RaftClientImpl client) {
Expand Down Expand Up @@ -58,4 +60,10 @@ public CompletableFuture<RaftClientReply> sendStaleRead(Message message, long mi
public CompletableFuture<RaftClientReply> watch(long index, ReplicationLevel replication) {
return UnorderedAsync.send(RaftClientRequest.watchRequestType(index, replication), client);
}

/**
 * Forward the given request: it is serialized to its protobuf form and carried as the
 * {@link Message} payload of a new request whose type is
 * {@link RaftClientRequest#forwardRequestType()}.
 *
 * @param request the request to be forwarded.
 * @return a future of the reply.
 */
@Override
public CompletableFuture<RaftClientReply> sendForward(RaftClientRequest request) {
  // The serialized bytes of the original request become the content of the wrapping message.
  final Message wrapped = Message.valueOf(
      ClientProtoUtils.toRaftClientRequestProto(request).toByteString());
  // NOTE(review): the third argument is passed as null; its meaning is defined by send(..), not shown here.
  return send(RaftClientRequest.forwardRequestType(), wrapped, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ static RaftClientRequest.Type toRaftClientRequestType(RaftClientRequestProto p)
return RaftClientRequest.Type.valueOf(p.getWrite());
case DATASTREAM:
return RaftClientRequest.Type.valueOf(p.getDataStream());
case FORWARD:
return RaftClientRequest.Type.valueOf(p.getForward());
case MESSAGESTREAM:
return RaftClientRequest.Type.valueOf(p.getMessageStream());
case READ:
Expand Down Expand Up @@ -140,6 +142,9 @@ static RaftClientRequestProto toRaftClientRequestProto(RaftClientRequest request
case DATASTREAM:
b.setDataStream(type.getDataStream());
break;
case FORWARD:
b.setForward(type.getForward());
break;
case MESSAGESTREAM:
b.setMessageStream(type.getMessageStream());
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,6 @@ boolean isClosed() {
return closeSupplier.isInitialized();
}

@Override
public CompletableFuture<DataStreamReply> startTransactionAsync() {
return send(Type.START_TRANSACTION);
}

public RaftClientRequest getHeader() {
return header;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
*/
public class RaftClientRequest extends RaftClientMessage {
private static final Type DATA_STREAM_DEFAULT = new Type(DataStreamRequestTypeProto.getDefaultInstance());
private static final Type FORWARD_DEFAULT = new Type(ForwardRequestTypeProto.getDefaultInstance());
private static final Type WRITE_DEFAULT = new Type(WriteRequestTypeProto.getDefaultInstance());
private static final Type WATCH_DEFAULT = new Type(
WatchRequestTypeProto.newBuilder().setIndex(0L).setReplication(ReplicationLevel.MAJORITY).build());
Expand All @@ -45,6 +46,10 @@ public static Type dataStreamRequestType() {
return DATA_STREAM_DEFAULT;
}

/** @return the shared {@link Type} instance used for forward requests. */
public static Type forwardRequestType() {
return FORWARD_DEFAULT;
}

public static Type messageStreamRequestType(long streamId, long messageId, boolean endOfRequest) {
return new Type(MessageStreamRequestTypeProto.newBuilder()
.setStreamId(streamId)
Expand Down Expand Up @@ -79,6 +84,10 @@ public static Type valueOf(DataStreamRequestTypeProto dataStream) {
return DATA_STREAM_DEFAULT;
}

/**
* Convert the given proto to a {@link Type}.
* The argument is not read: the shared forward type instance is always returned.
* @param forward the forward request type proto.
* @return the shared forward {@link Type} instance.
*/
public static Type valueOf(ForwardRequestTypeProto forward) {
return FORWARD_DEFAULT;
}

public static Type valueOf(ReadRequestTypeProto read) {
return READ_DEFAULT;
}
Expand Down Expand Up @@ -117,6 +126,10 @@ private Type(DataStreamRequestTypeProto dataStream) {
this(DATASTREAM, dataStream);
}

private Type(ForwardRequestTypeProto forward) {
this(FORWARD, forward);
}

private Type(MessageStreamRequestTypeProto messageStream) {
this(MESSAGESTREAM, messageStream);
}
Expand Down Expand Up @@ -151,6 +164,11 @@ public DataStreamRequestTypeProto getDataStream() {
return (DataStreamRequestTypeProto)proto;
}

/**
 * Get the underlying proto as a {@link ForwardRequestTypeProto}.
 * This type must be a FORWARD type; otherwise, the precondition check fails.
 *
 * @return the forward request type proto.
 */
public ForwardRequestTypeProto getForward() {
  // Include the actual proto in the failure message, consistent with getMessageStream().
  Preconditions.assertTrue(is(FORWARD), () -> "proto = " + proto);
  return (ForwardRequestTypeProto)proto;
}

public MessageStreamRequestTypeProto getMessageStream() {
Preconditions.assertTrue(is(MESSAGESTREAM), () -> "proto = " + proto);
return (MessageStreamRequestTypeProto)proto;
Expand Down Expand Up @@ -190,6 +208,8 @@ public String toString() {
return "RW";
case DATASTREAM:
return "DataStream";
case FORWARD:
return "Forward";
case MESSAGESTREAM:
return toString(getMessageStream());
case READ:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.ratis.netty.server;

import org.apache.ratis.client.AsyncRpcApi;
import org.apache.ratis.client.DataStreamOutputRpc;
import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.conf.RaftProperties;
Expand Down Expand Up @@ -61,7 +62,6 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class DataStreamManagement {
public static final Logger LOG = LoggerFactory.getLogger(DataStreamManagement.class);
Expand Down Expand Up @@ -97,16 +97,6 @@ CompletableFuture<DataStreamReply> write(DataStreamRequestByteBuf request) {
return out.writeAsync(request.slice().nioBuffer(), request.getType() == Type.STREAM_DATA_SYNC);
}

CompletableFuture<DataStreamReply> startTransaction(DataStreamRequestByteBuf request,
ChannelHandlerContext ctx, Executor executor) {
return out.startTransactionAsync().thenApplyAsync(reply -> {
if (reply.isSuccess()) {
ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply));
}
return reply;
}, executor);
}

CompletableFuture<DataStreamReply> close() {
return out.closeAsync();
}
Expand Down Expand Up @@ -279,18 +269,6 @@ static long close(DataStream stream) {
}
}

static DataStreamReplyByteBuffer newDataStreamReplyByteBuffer(
DataStreamRequestByteBuf request, DataStreamReply reply) {
final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
((DataStreamReplyByteBuffer)reply).slice(): null;
return DataStreamReplyByteBuffer.newBuilder()
.setDataStreamPacket(request)
.setBuffer(buffer)
.setSuccess(reply.isSuccess())
.setBytesWritten(reply.getBytesWritten())
.build();
}

static DataStreamReplyByteBuffer newDataStreamReplyByteBuffer(
DataStreamRequestByteBuf request, RaftClientReply reply) {
final ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
Expand All @@ -316,41 +294,17 @@ static void sendReply(List<CompletableFuture<DataStreamReply>> remoteWrites,
private CompletableFuture<Void> startTransaction(StreamInfo info, DataStreamRequestByteBuf request,
ChannelHandlerContext ctx) {
try {
return server.submitClientRequestAsync(info.getRequest()).thenAcceptAsync(reply -> {
if (reply.isSuccess()) {
ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply));
} else if (request.getType() == Type.STREAM_CLOSE) {
// if this server is not the leader, forward start transition to the other peers
// there maybe other unexpected reason cause failure except not leader, forwardStartTransaction anyway
forwardStartTransaction(info, request, reply, ctx, executor);
} else if (request.getType() == Type.START_TRANSACTION){
ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply));
} else {
throw new IllegalStateException(this + ": Unexpected type " + request.getType() + ", request=" + request);
}
}, executor);
AsyncRpcApi asyncRpcApi = (AsyncRpcApi) (server.getDivision(info.getRequest()
.getRaftGroupId())
.getRaftClient()
.async());
return asyncRpcApi.sendForward(info.request)
.thenAcceptAsync(reply -> ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply)), executor);
} catch (IOException e) {
throw new CompletionException(e);
}
}

static void sendLeaderFailedReply(final List<CompletableFuture<DataStreamReply>> results,
DataStreamRequestByteBuf request, RaftClientReply localReply, ChannelHandlerContext ctx) {
// get replies from the results, ignored exceptional replies
final Stream<RaftClientReply> remoteReplies = results.stream()
.filter(r -> !r.isCompletedExceptionally())
.map(CompletableFuture::join)
.map(ClientProtoUtils::getRaftClientReply);

// choose the leader's reply if there is any. Otherwise, use the local reply
final RaftClientReply chosen = Stream.concat(Stream.of(localReply), remoteReplies)
.filter(reply -> reply.getNotLeaderException() == null)
.findAny().orElse(localReply);

// send reply
ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, chosen));
}

static void replyDataStreamException(RaftServer server, Throwable cause, RaftClientRequest raftClientRequest,
DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
final RaftClientReply reply = RaftClientReply.newBuilder()
Expand Down Expand Up @@ -380,22 +334,6 @@ static void sendDataStreamException(Throwable throwable, DataStreamRequestByteBu
}
}

static void forwardStartTransaction(StreamInfo info, DataStreamRequestByteBuf request, RaftClientReply localReply,
ChannelHandlerContext ctx, Executor executor) {
final List<CompletableFuture<DataStreamReply>> results = info.applyToRemotes(
out -> out.startTransaction(request, ctx, executor));

JavaUtils.allOf(results).thenAccept(v -> {
for (CompletableFuture<DataStreamReply> result : results) {
if (result.join().isSuccess()) {
return;
}
}

sendLeaderFailedReply(results, request, localReply, ctx);
});
}

void read(DataStreamRequestByteBuf request, ChannelHandlerContext ctx,
CheckedFunction<RaftClientRequest, List<DataStreamOutputRpc>, IOException> getDataStreamOutput) {
LOG.debug("{}: read {}", this, request);
Expand All @@ -414,22 +352,6 @@ void read(DataStreamRequestByteBuf request, ChannelHandlerContext ctx,
() -> new IllegalStateException("Failed to get StreamInfo for " + request));
}


if (request.getType() == Type.START_TRANSACTION) {
// for peers to start transaction
composeAsync(info.getPrevious(), executor, v -> startTransaction(info, request, ctx))
.whenComplete((v, exception) -> {
try {
if (exception != null) {
replyDataStreamException(server, exception, info.getRequest(), request, ctx);
}
} finally {
buf.release();
}
});
return;
}

final CompletableFuture<Long> localWrite;
final List<CompletableFuture<DataStreamReply>> remoteWrites;
if (request.getType() == Type.STREAM_HEADER) {
Expand Down
5 changes: 4 additions & 1 deletion ratis-proto/src/main/proto/Raft.proto
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,9 @@ message MessageStreamRequestTypeProto {
message DataStreamRequestTypeProto {
}

message ForwardRequestTypeProto {
}

message ReadRequestTypeProto {
}

Expand All @@ -289,6 +292,7 @@ message RaftClientRequestProto {
WatchRequestTypeProto watch = 6;
MessageStreamRequestTypeProto messageStream = 7;
DataStreamRequestTypeProto dataStream = 8;
ForwardRequestTypeProto forward = 9;
}
}

Expand All @@ -298,7 +302,6 @@ message DataStreamPacketHeaderProto {
STREAM_DATA = 1;
STREAM_DATA_SYNC = 2;
STREAM_CLOSE = 3;
START_TRANSACTION = 4;
}

uint64 streamId = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.ratis.server;

import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.*;
Expand Down Expand Up @@ -71,6 +72,8 @@ default RaftPeer getPeer() {
StateMachine getStateMachine();

DataStreamMap getDataStreamMap();

RaftClient getRaftClient();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@szetszwo it seems that we no longer use this method to submit request. If it's the case, I'm willing to figure out whether we can drop this "dead code".

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right that we may remove it. But we need to keep it in branch-2 for compatibility.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. I submitted #886 already that is against master (3.0). I'll file a JIRA ticket later.

}

/** @return the server ID. */
Expand Down
Loading