Skip to content

Commit

Permalink
RATIS-1602. Add a ProxiesPool inner class in NettyServerStreamRpc.
Browse files Browse the repository at this point in the history
  • Loading branch information
szetszwo committed Jun 30, 2022
1 parent 6f2c532 commit 9b2bad3
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ static void assertSame(long expected, long computed, String name) {
() -> name + ": expected == " + expected + " but computed == " + computed);
}

static void assertSame(Object expected, Object computed, String name) {
assertTrue(expected == computed,
() -> name + ": expected == " + expected + " but computed == " + computed);
}

static void assertNull(Object object, Supplier<String> message) {
assertTrue(object == null, message);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,8 +382,8 @@ void read(DataStreamRequestByteBuf request, ChannelHandlerContext ctx,
try {
readImpl(request, ctx, buf, getStreams);
} catch (Throwable t) {
replyDataStreamException(t, request, ctx);
buf.release();
throw t;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.ratis.netty.NettyUtils;
import org.apache.ratis.netty.metrics.NettyServerStreamRpcMetrics;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.DataStreamPacket;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.security.TlsConf;
Expand Down Expand Up @@ -57,15 +58,18 @@
import org.apache.ratis.util.PeerProxyMap;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.UncheckedAutoCloseable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -115,13 +119,39 @@ void close() {
}
}

static class ProxiesPool {
private final List<Proxies> list;

ProxiesPool(String name, RaftProperties properties) {
final int clientPoolSize = RaftServerConfigKeys.DataStream.clientPoolSize(properties);
final List<Proxies> proxies = new ArrayList<>(clientPoolSize);
for (int i = 0; i < clientPoolSize; i++) {
proxies.add(new Proxies(new PeerProxyMap<>(name, peer -> newClient(peer, properties))));
}
this.list = Collections.unmodifiableList(proxies);
}

void addRaftPeers(Collection<RaftPeer> newPeers) {
list.forEach(proxy -> proxy.addPeers(newPeers));
}

Proxies get(DataStreamPacket p) {
final long hash = Integer.toUnsignedLong(Objects.hash(p.getClientId(), p.getStreamId()));
return list.get(Math.toIntExact(hash % list.size()));
}

void close() {
list.forEach(Proxies::close);
}
}

private final String name;
private final EventLoopGroup bossGroup;
private final EventLoopGroup workerGroup;
private final ChannelFuture channelFuture;

private final DataStreamManagement requests;
private final List<Proxies> proxies = new ArrayList<>();
private final ProxiesPool proxies;

private final NettyServerStreamRpcMetrics metrics;

Expand All @@ -131,11 +161,7 @@ public NettyServerStreamRpc(RaftServer server, TlsConf tlsConf) {
this.requests = new DataStreamManagement(server, metrics);

final RaftProperties properties = server.getProperties();

int clientPoolSize = RaftServerConfigKeys.DataStream.clientPoolSize(properties);
for (int i = 0; i < clientPoolSize; i ++) {
this.proxies.add(new Proxies(new PeerProxyMap<>(name, peer -> newClient(peer, properties))));
}
this.proxies = new ProxiesPool(name, properties);

final boolean useEpoll = NettyConfigKeys.DataStream.Server.useEpoll(properties);
this.bossGroup = NettyUtils.newEventLoopGroup(name + "-bossGroup",
Expand Down Expand Up @@ -166,22 +192,17 @@ static DataStreamClient newClient(RaftPeer peer, RaftProperties properties) {

@Override
public void addRaftPeers(Collection<RaftPeer> newPeers) {
proxies.forEach(proxy -> proxy.addPeers(newPeers));
proxies.addRaftPeers(newPeers);
}

static class RequestRef {
private final AtomicReference<DataStreamRequestByteBuf> ref = new AtomicReference<>();

DataStreamRequestByteBuf set(DataStreamRequestByteBuf current) {
Optional.ofNullable(ref.getAndSet(current)).ifPresent(previous -> {
throw new IllegalStateException("previous = " + previous + " != null, current=" + current);
});
return current;
}
UncheckedAutoCloseable set(DataStreamRequestByteBuf current) {
final DataStreamRequestByteBuf previous = ref.getAndUpdate(p -> p == null ? current : p);
Preconditions.assertNull(previous, () -> "previous = " + previous + " != null, current = " + current);

void reset(DataStreamRequestByteBuf expected) {
final DataStreamRequestByteBuf stored = ref.getAndSet(null);
Preconditions.assertTrue(stored == expected, () -> "Expected=" + expected + " but stored=" + stored);
return () -> Preconditions.assertSame(current, getAndSetNull(), "RequestRef");
}

DataStreamRequestByteBuf getAndSetNull() {
Expand All @@ -201,13 +222,10 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
return;
}

final DataStreamRequestByteBuf request = requestRef.set((DataStreamRequestByteBuf)msg);

int index = Math.toIntExact(
((0xFFFFFFFFL & request.getClientId().hashCode()) + request.getStreamId()) % proxies.size());
requests.read(request, ctx, proxies.get(index)::getDataStreamOutput);

requestRef.reset(request);
final DataStreamRequestByteBuf request = (DataStreamRequestByteBuf)msg;
try(UncheckedAutoCloseable autoReset = requestRef.set(request)) {
requests.read(request, ctx, proxies.get(request)::getDataStreamOutput);
}
}

@Override
Expand Down Expand Up @@ -280,7 +298,7 @@ public void close() {
LOG.error(this + ": Interrupted close()", e);
}

proxies.forEach(Proxies::close);
proxies.close();
}

@Override
Expand Down

0 comments on commit 9b2bad3

Please sign in to comment.