Skip to content

Commit

Permalink
RATIS-2096. Add a conf to enable/disable zero copy. (#1099)
Browse files Browse the repository at this point in the history
  • Loading branch information
duongkame committed May 24, 2024
1 parent 1782cd9 commit 26385f3
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,15 @@ static void setLogMessageBatchDuration(RaftProperties properties,
setTimeDuration(properties::setTimeDuration,
LOG_MESSAGE_BATCH_DURATION_KEY, logMessageBatchDuration);
}

String ZERO_COPY_ENABLED_KEY = PREFIX + ".zerocopy.enabled";
boolean ZERO_COPY_ENABLED_DEFAULT = false;
static boolean zeroCopyEnabled(RaftProperties properties) {
return getBoolean(properties::getBoolean, ZERO_COPY_ENABLED_KEY, ZERO_COPY_ENABLED_DEFAULT, getDefaultLog());
}
static void setZeroCopyEnabled(RaftProperties properties, boolean enabled) {
setBoolean(properties::setBoolean, ZERO_COPY_ENABLED_KEY, enabled);
}
}

String MESSAGE_SIZE_MAX_KEY = PREFIX + ".message.size.max";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,13 +152,15 @@ void closeAllExisting(RaftGroupId groupId) {
private final ExecutorService executor;

private final OrderedStreamObservers orderedStreamObservers = new OrderedStreamObservers();
private final boolean zeroCopyEnabled;
private final ZeroCopyMessageMarshaller<RaftClientRequestProto> zeroCopyRequestMarshaller;

GrpcClientProtocolService(Supplier<RaftPeerId> idSupplier, RaftClientAsynchronousProtocol protocol,
ExecutorService executor, ZeroCopyMetrics zeroCopyMetrics) {
ExecutorService executor, boolean zeroCopyEnabled, ZeroCopyMetrics zeroCopyMetrics) {
this.idSupplier = idSupplier;
this.protocol = protocol;
this.executor = executor;
this.zeroCopyEnabled = zeroCopyEnabled;
this.zeroCopyRequestMarshaller = new ZeroCopyMessageMarshaller<>(RaftClientRequestProto.getDefaultInstance(),
zeroCopyMetrics::onZeroCopyMessage, zeroCopyMetrics::onNonZeroCopyMessage, zeroCopyMetrics::onReleasedMessage);
zeroCopyMetrics.addUnreleased("client_protocol", zeroCopyRequestMarshaller::getUnclosedCount);
Expand All @@ -170,6 +172,10 @@ RaftPeerId getId() {

ServerServiceDefinition bindServiceWithZeroCopy() {
ServerServiceDefinition orig = super.bindService();
if (!zeroCopyEnabled) {
LOG.info("{}: Zero copy is disabled.", getId());
return orig;
}
ServerServiceDefinition.Builder builder = ServerServiceDefinition.builder(orig.getServiceDescriptor().getName());

addMethodWithCustomMarshaller(orig, builder, getOrderedMethod(), zeroCopyRequestMarshaller);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,14 @@ private void releaseLast() {

private final Supplier<RaftPeerId> idSupplier;
private final RaftServer server;
private final boolean zeroCopyEnabled;
private final ZeroCopyMessageMarshaller<AppendEntriesRequestProto> zeroCopyRequestMarshaller;

GrpcServerProtocolService(Supplier<RaftPeerId> idSupplier, RaftServer server, ZeroCopyMetrics zeroCopyMetrics) {
GrpcServerProtocolService(Supplier<RaftPeerId> idSupplier, RaftServer server, boolean zeroCopyEnabled,
ZeroCopyMetrics zeroCopyMetrics) {
this.idSupplier = idSupplier;
this.server = server;
this.zeroCopyEnabled = zeroCopyEnabled;
this.zeroCopyRequestMarshaller = new ZeroCopyMessageMarshaller<>(AppendEntriesRequestProto.getDefaultInstance(),
zeroCopyMetrics::onZeroCopyMessage, zeroCopyMetrics::onNonZeroCopyMessage, zeroCopyMetrics::onReleasedMessage);
zeroCopyMetrics.addUnreleased("server_protocol", zeroCopyRequestMarshaller::getUnclosedCount);
Expand All @@ -241,6 +244,10 @@ RaftPeerId getId() {

ServerServiceDefinition bindServiceWithZeroCopy() {
ServerServiceDefinition orig = super.bindService();
if (!zeroCopyEnabled) {
LOG.info("{}: Zero copy is disabled.", getId());
return orig;
}
ServerServiceDefinition.Builder builder = ServerServiceDefinition.builder(orig.getServiceDescriptor().getName());

// Add appendEntries with zero copy marshaller.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,8 @@ private GrpcService(RaftServer server,
RaftServerConfigKeys.Log.Appender.bufferByteLimit(server.getProperties()),
GrpcConfigKeys.flowControlWindow(server.getProperties(), LOG::info),
RaftServerConfigKeys.Rpc.requestTimeout(server.getProperties()),
GrpcConfigKeys.Server.heartbeatChannel(server.getProperties()));
GrpcConfigKeys.Server.heartbeatChannel(server.getProperties()),
GrpcConfigKeys.Server.zeroCopyEnabled(server.getProperties()));
}

@SuppressWarnings("checkstyle:ParameterNumber") // private constructor
Expand All @@ -187,7 +188,7 @@ private GrpcService(RaftServer raftServer, Supplier<RaftPeerId> idSupplier,
String serverHost, int serverPort, GrpcTlsConfig serverTlsConfig,
SizeInBytes grpcMessageSizeMax, SizeInBytes appenderBufferSize,
SizeInBytes flowControlWindow,TimeDuration requestTimeoutDuration,
boolean useSeparateHBChannel) {
boolean useSeparateHBChannel, boolean zeroCopyEnabled) {
super(idSupplier, id -> new PeerProxyMap<>(id.toString(),
p -> new GrpcServerProtocolClient(p, flowControlWindow.getSizeInt(),
requestTimeoutDuration, serverTlsConfig, useSeparateHBChannel)));
Expand All @@ -203,7 +204,8 @@ private GrpcService(RaftServer raftServer, Supplier<RaftPeerId> idSupplier,
GrpcConfigKeys.Server.asyncRequestThreadPoolSize(properties),
getId() + "-request-");
this.zeroCopyMetrics = new ZeroCopyMetrics();
this.clientProtocolService = new GrpcClientProtocolService(idSupplier, raftServer, executor, zeroCopyMetrics);
this.clientProtocolService = new GrpcClientProtocolService(idSupplier, raftServer, executor,
zeroCopyEnabled, zeroCopyMetrics);

this.serverInterceptor = new MetricServerInterceptor(
idSupplier,
Expand All @@ -216,7 +218,7 @@ private GrpcService(RaftServer raftServer, Supplier<RaftPeerId> idSupplier,
final NettyServerBuilder serverBuilder =
startBuildingNettyServer(serverHost, serverPort, serverTlsConfig, grpcMessageSizeMax, flowControlWindow);
GrpcServerProtocolService serverProtocolService = new GrpcServerProtocolService(idSupplier, raftServer,
zeroCopyMetrics);
zeroCopyEnabled, zeroCopyMetrics);
serverBuilder.addService(ServerInterceptors.intercept(
serverProtocolService.bindServiceWithZeroCopy(), serverInterceptor));
if (!separateAdminServer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ protected Parameters setPropertiesAndInitParameters(RaftPeerId id, RaftGroup gro
GrpcConfigKeys.Client.setPort(properties, NetUtils.createSocketAddr(address).getPort()));
Optional.ofNullable(getAddress(id, group, RaftPeer::getAdminAddress)).ifPresent(address ->
GrpcConfigKeys.Admin.setPort(properties, NetUtils.createSocketAddr(address).getPort()));
// Always run grpc integration tests with zero-copy enabled because the path of nonzero-copy is not risky.
GrpcConfigKeys.Server.setZeroCopyEnabled(properties, true);
return parameters;
}

Expand Down

0 comments on commit 26385f3

Please sign in to comment.