Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RATIS-2096. Add a conf to enable/disable zero copy. #1099

Merged
merged 3 commits into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,15 @@ static void setLogMessageBatchDuration(RaftProperties properties,
setTimeDuration(properties::setTimeDuration,
LOG_MESSAGE_BATCH_DURATION_KEY, logMessageBatchDuration);
}

String ZERO_COPY_ENABLED_KEY = PREFIX + ".zerocopy.enabled";
boolean ZERO_COPY_ENABLED_DEFAULT = false;
static boolean zeroCopyEnabled(RaftProperties properties) {
return getBoolean(properties::getBoolean, ZERO_COPY_ENABLED_KEY, ZERO_COPY_ENABLED_DEFAULT, getDefaultLog());
}
static void setZeroCopyEnabled(RaftProperties properties, boolean enabled) {
setBoolean(properties::setBoolean, ZERO_COPY_ENABLED_KEY, enabled);
}
}

String MESSAGE_SIZE_MAX_KEY = PREFIX + ".message.size.max";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,13 +152,15 @@ void closeAllExisting(RaftGroupId groupId) {
private final ExecutorService executor;

private final OrderedStreamObservers orderedStreamObservers = new OrderedStreamObservers();
private final boolean zeroCopyEnabled;
private final ZeroCopyMessageMarshaller<RaftClientRequestProto> zeroCopyRequestMarshaller;

GrpcClientProtocolService(Supplier<RaftPeerId> idSupplier, RaftClientAsynchronousProtocol protocol,
ExecutorService executor, ZeroCopyMetrics zeroCopyMetrics) {
ExecutorService executor, boolean zeroCopyEnabled, ZeroCopyMetrics zeroCopyMetrics) {
this.idSupplier = idSupplier;
this.protocol = protocol;
this.executor = executor;
this.zeroCopyEnabled = zeroCopyEnabled;
this.zeroCopyRequestMarshaller = new ZeroCopyMessageMarshaller<>(RaftClientRequestProto.getDefaultInstance(),
zeroCopyMetrics::onZeroCopyMessage, zeroCopyMetrics::onNonZeroCopyMessage, zeroCopyMetrics::onReleasedMessage);
zeroCopyMetrics.addUnreleased("client_protocol", zeroCopyRequestMarshaller::getUnclosedCount);
Expand All @@ -170,6 +172,10 @@ RaftPeerId getId() {

ServerServiceDefinition bindServiceWithZeroCopy() {
ServerServiceDefinition orig = super.bindService();
if (!zeroCopyEnabled) {
LOG.info("{}: Zero copy is disabled.", getId());
return orig;
}
ServerServiceDefinition.Builder builder = ServerServiceDefinition.builder(orig.getServiceDescriptor().getName());

addMethodWithCustomMarshaller(orig, builder, getOrderedMethod(), zeroCopyRequestMarshaller);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,14 @@ private void releaseLast() {

private final Supplier<RaftPeerId> idSupplier;
private final RaftServer server;
private final boolean zeroCopyEnabled;
private final ZeroCopyMessageMarshaller<AppendEntriesRequestProto> zeroCopyRequestMarshaller;

GrpcServerProtocolService(Supplier<RaftPeerId> idSupplier, RaftServer server, ZeroCopyMetrics zeroCopyMetrics) {
GrpcServerProtocolService(Supplier<RaftPeerId> idSupplier, RaftServer server, boolean zeroCopyEnabled,
ZeroCopyMetrics zeroCopyMetrics) {
this.idSupplier = idSupplier;
this.server = server;
this.zeroCopyEnabled = zeroCopyEnabled;
this.zeroCopyRequestMarshaller = new ZeroCopyMessageMarshaller<>(AppendEntriesRequestProto.getDefaultInstance(),
zeroCopyMetrics::onZeroCopyMessage, zeroCopyMetrics::onNonZeroCopyMessage, zeroCopyMetrics::onReleasedMessage);
zeroCopyMetrics.addUnreleased("server_protocol", zeroCopyRequestMarshaller::getUnclosedCount);
Expand All @@ -241,6 +244,10 @@ RaftPeerId getId() {

ServerServiceDefinition bindServiceWithZeroCopy() {
ServerServiceDefinition orig = super.bindService();
if (!zeroCopyEnabled) {
LOG.info("{}: Zero copy is disabled.", getId());
return orig;
}
ServerServiceDefinition.Builder builder = ServerServiceDefinition.builder(orig.getServiceDescriptor().getName());

// Add appendEntries with zero copy marshaller.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,8 @@ private GrpcService(RaftServer server,
RaftServerConfigKeys.Log.Appender.bufferByteLimit(server.getProperties()),
GrpcConfigKeys.flowControlWindow(server.getProperties(), LOG::info),
RaftServerConfigKeys.Rpc.requestTimeout(server.getProperties()),
GrpcConfigKeys.Server.heartbeatChannel(server.getProperties()));
GrpcConfigKeys.Server.heartbeatChannel(server.getProperties()),
GrpcConfigKeys.Server.zeroCopyEnabled(server.getProperties()));
}

@SuppressWarnings("checkstyle:ParameterNumber") // private constructor
Expand All @@ -187,7 +188,7 @@ private GrpcService(RaftServer raftServer, Supplier<RaftPeerId> idSupplier,
String serverHost, int serverPort, GrpcTlsConfig serverTlsConfig,
SizeInBytes grpcMessageSizeMax, SizeInBytes appenderBufferSize,
SizeInBytes flowControlWindow,TimeDuration requestTimeoutDuration,
boolean useSeparateHBChannel) {
boolean useSeparateHBChannel, boolean zeroCopyEnabled) {
super(idSupplier, id -> new PeerProxyMap<>(id.toString(),
p -> new GrpcServerProtocolClient(p, flowControlWindow.getSizeInt(),
requestTimeoutDuration, serverTlsConfig, useSeparateHBChannel)));
Expand All @@ -203,7 +204,8 @@ private GrpcService(RaftServer raftServer, Supplier<RaftPeerId> idSupplier,
GrpcConfigKeys.Server.asyncRequestThreadPoolSize(properties),
getId() + "-request-");
this.zeroCopyMetrics = new ZeroCopyMetrics();
this.clientProtocolService = new GrpcClientProtocolService(idSupplier, raftServer, executor, zeroCopyMetrics);
this.clientProtocolService = new GrpcClientProtocolService(idSupplier, raftServer, executor,
zeroCopyEnabled, zeroCopyMetrics);

this.serverInterceptor = new MetricServerInterceptor(
idSupplier,
Expand All @@ -216,7 +218,7 @@ private GrpcService(RaftServer raftServer, Supplier<RaftPeerId> idSupplier,
final NettyServerBuilder serverBuilder =
startBuildingNettyServer(serverHost, serverPort, serverTlsConfig, grpcMessageSizeMax, flowControlWindow);
GrpcServerProtocolService serverProtocolService = new GrpcServerProtocolService(idSupplier, raftServer,
zeroCopyMetrics);
zeroCopyEnabled, zeroCopyMetrics);
serverBuilder.addService(ServerInterceptors.intercept(
serverProtocolService.bindServiceWithZeroCopy(), serverInterceptor));
if (!separateAdminServer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ protected Parameters setPropertiesAndInitParameters(RaftPeerId id, RaftGroup gro
GrpcConfigKeys.Client.setPort(properties, NetUtils.createSocketAddr(address).getPort()));
Optional.ofNullable(getAddress(id, group, RaftPeer::getAdminAddress)).ifPresent(address ->
GrpcConfigKeys.Admin.setPort(properties, NetUtils.createSocketAddr(address).getPort()));
// Always run grpc integration tests with zero-copy enabled because the path of nonzero-copy is not risky.
GrpcConfigKeys.Server.setZeroCopyEnabled(properties, true);
return parameters;
}

Expand Down
Loading