Skip to content

Commit

Permalink
[pinpoint-apm#9932] Replace with Netty4
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed May 31, 2023
1 parent 535d046 commit 26f8c8b
Show file tree
Hide file tree
Showing 10 changed files with 250 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.protobuf.GeneratedMessageV3;
import com.navercorp.pinpoint.collector.cluster.GrpcAgentConnection;
import com.navercorp.pinpoint.collector.cluster.ProfilerClusterManager;
import com.navercorp.pinpoint.collector.util.RequestManager;
import com.navercorp.pinpoint.common.server.cluster.ClusterKey;
import com.navercorp.pinpoint.grpc.MessageFormatUtils;
import com.navercorp.pinpoint.grpc.trace.PCmdActiveThreadCount;
Expand All @@ -30,11 +31,9 @@
import com.navercorp.pinpoint.grpc.trace.PCmdResponse;
import com.navercorp.pinpoint.io.ResponseMessage;
import com.navercorp.pinpoint.rpc.PinpointSocketException;
import com.navercorp.pinpoint.rpc.client.RequestManager;
import com.navercorp.pinpoint.rpc.common.SocketState;
import com.navercorp.pinpoint.rpc.common.SocketStateChangeResult;
import com.navercorp.pinpoint.rpc.common.SocketStateCode;
import com.navercorp.pinpoint.rpc.packet.ResponsePacket;
import com.navercorp.pinpoint.rpc.packet.stream.StreamCode;
import com.navercorp.pinpoint.rpc.packet.stream.StreamResponsePacket;
import com.navercorp.pinpoint.rpc.stream.ClientStreamChannel;
Expand Down Expand Up @@ -79,13 +78,13 @@ public class PinpointGrpcServer {

private final InetSocketAddress remoteAddress;
private final ClusterKey clusterKey;
private final RequestManager requestManager;
private final RequestManager<ResponseMessage> requestManager;
private final ProfilerClusterManager profilerClusterManager;
private final StreamObserver<PCmdRequest> requestObserver;

private Runnable onCloseHandler;
private volatile Runnable onCloseHandler;

public PinpointGrpcServer(InetSocketAddress remoteAddress, ClusterKey clusterKey, RequestManager requestManager, ProfilerClusterManager profilerClusterManager, StreamObserver<PCmdRequest> requestObserver) {
public PinpointGrpcServer(InetSocketAddress remoteAddress, ClusterKey clusterKey, RequestManager<ResponseMessage> requestManager, ProfilerClusterManager profilerClusterManager, StreamObserver<PCmdRequest> requestObserver) {
this.remoteAddress = Objects.requireNonNull(remoteAddress, "remoteAddress");
this.clusterKey = Objects.requireNonNull(clusterKey, "clusterKey");
this.requestManager = Objects.requireNonNull(requestManager, "requestManager");
Expand Down Expand Up @@ -207,14 +206,20 @@ public void handleMessage(int responseId, GeneratedMessageV3 message) {
if (isInfo) {
logger.info("{} handleMessage:{}", clusterKey, MessageFormatUtils.debugLog(message));
}
TBase<?, ?> tMessage = messageConverter.toMessage(message);

final CompletableFuture<ResponseMessage> responseFuture = requestManager.messageReceived(responseId, clusterKey::format);
if (responseFuture == null) {
logger.warn("Response future is timeout responseId:{}", responseId);
return;
}
try {
TBase<?, ?> tMessage = messageConverter.toMessage(message);
byte[] serialize = SerializationUtils.serialize(tMessage, commandHeaderTBaseSerializerFactory);
ResponsePacket responsePacket = new ResponsePacket(responseId, serialize);
requestManager.messageReceived(responsePacket, clusterKey.format());

ResponseMessage responseMessage = ResponseMessage.wrap(serialize);
responseFuture.complete(responseMessage);
} catch (TException e) {
setFailMessageToFuture(responseId, e.getMessage());
responseFuture.completeExceptionally(new PinpointSocketException(e.getMessage()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@

package com.navercorp.pinpoint.collector.receiver.grpc.service.command;

import com.google.protobuf.Empty;
import com.navercorp.pinpoint.collector.cluster.ClusterService;
import com.navercorp.pinpoint.collector.cluster.ProfilerClusterManager;
import com.navercorp.pinpoint.collector.receiver.grpc.PinpointGrpcServer;
import com.navercorp.pinpoint.collector.receiver.grpc.PinpointGrpcServerRepository;
import com.navercorp.pinpoint.collector.util.RequestManager;
import com.navercorp.pinpoint.common.profiler.concurrent.PinpointThreadFactory;
import com.navercorp.pinpoint.common.server.cluster.ClusterKey;
import com.navercorp.pinpoint.grpc.Header;
import com.navercorp.pinpoint.grpc.StatusError;
Expand All @@ -34,17 +37,15 @@
import com.navercorp.pinpoint.grpc.trace.PCmdRequest;
import com.navercorp.pinpoint.grpc.trace.PCmdResponse;
import com.navercorp.pinpoint.grpc.trace.ProfilerCommandServiceGrpc;
import com.navercorp.pinpoint.rpc.client.RequestManager;
import com.navercorp.pinpoint.rpc.util.TimerFactory;

import com.google.protobuf.Empty;
import com.navercorp.pinpoint.io.ResponseMessage;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import org.jboss.netty.util.Timer;
import org.apache.logging.log4j.Logger;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.Closeable;
import java.io.IOException;
Expand Down Expand Up @@ -73,7 +74,13 @@ public class GrpcCommandService extends ProfilerCommandServiceGrpc.ProfilerComma
public GrpcCommandService(ClusterService clusterService) {
Objects.requireNonNull(clusterService, "clusterService");
this.profilerClusterManager = Objects.requireNonNull(clusterService.getProfilerClusterManager(), "profilerClusterManager");
this.timer = TimerFactory.createHashedWheelTimer("GrpcCommandService-Timer", 100, TimeUnit.MILLISECONDS, 512);
this.timer = newTimer();

}

private Timer newTimer() {
final PinpointThreadFactory threadFactory = new PinpointThreadFactory("GrpcCommandService-Timer", true);
return new HashedWheelTimer(threadFactory, 100, TimeUnit.MILLISECONDS, 512);
}

@Override
Expand Down Expand Up @@ -213,7 +220,7 @@ private void unregisterPinpointGrpcServer(Long transportId) {
}

private PinpointGrpcServer createPinpointGrpcServer(StreamObserver<PCmdRequest> requestObserver, ClusterKey clusterKey) {
final RequestManager requestManager = new RequestManager(timer, 3000);
final RequestManager<ResponseMessage> requestManager = new RequestManager<>(timer, 3000);
return new PinpointGrpcServer(getRemoteAddress(), clusterKey, requestManager, profilerClusterManager, requestObserver);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.navercorp.pinpoint.collector.util;

import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.IntConsumer;

public class RequestFailListener<RES> implements BiConsumer<RES, Throwable> {
private final int requestId;
private final IntConsumer consumer;

public RequestFailListener(int requestId, IntConsumer consumer) {
this.requestId = requestId;
this.consumer = Objects.requireNonNull(consumer, "consumer");
}

@Override
public void accept(RES res, Throwable throwable) {
if (throwable != null) {
consumer.accept(requestId);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package com.navercorp.pinpoint.collector.util;


import com.navercorp.pinpoint.rpc.PinpointSocketException;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

/**
* @author emeroad
*/
public class RequestManager<RES> {

private final Logger logger = LogManager.getLogger(this.getClass());

private final AtomicInteger requestId = new AtomicInteger(1);

private final ConcurrentMap<Integer, CompletableFuture<RES>> requestMap = new ConcurrentHashMap<>();
// Have to move Timer into factory?
private final Timer timer;
private final long defaultTimeoutMillis;

public RequestManager(Timer timer, long defaultTimeoutMillis) {
this.timer = Objects.requireNonNull(timer, "timer");

if (defaultTimeoutMillis <= 0) {
throw new IllegalArgumentException("defaultTimeoutMillis must greater than zero.");
}
this.defaultTimeoutMillis = defaultTimeoutMillis;
}

private BiConsumer<RES, Throwable> createFailureEventHandler(final int requestId) {
return new RequestFailListener<>(requestId, this::removeMessageFuture);
}

private void addTimeoutTask(CompletableFuture<RES> future, long timeoutMillis) {
Objects.requireNonNull(future, "future");

try {
Timeout timeout = timer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
if (timeout.isCancelled()) {
return;
}
if (future.isDone()) {
return;
}
future.completeExceptionally(new TimeoutException("Timeout by RequestManager-TIMER"));
}
}, timeoutMillis, TimeUnit.MILLISECONDS);
future.thenAccept(t -> timeout.cancel());
} catch (IllegalStateException e) {
// this case is that timer has been shutdown. That maybe just means that socket has been closed.
future.completeExceptionally(new PinpointSocketException("socket closed")) ;
}
}

public int nextRequestId() {
return this.requestId.getAndIncrement();
}


public CompletableFuture<RES> messageReceived(int responseId, Supplier<Object> sourceName) {
final CompletableFuture<RES> future = removeMessageFuture(responseId);
if (future == null) {
logger.warn("ResponseFuture not found. responseId:{}, sourceName:{}", responseId, sourceName.get());
return null;
}
if (logger.isDebugEnabled()) {
logger.debug("ResponsePacket is arrived responseId:{}, sourceName:{}", responseId, sourceName.get());
}
return future;
}


public CompletableFuture<RES> removeMessageFuture(int requestId) {
return this.requestMap.remove(requestId);
}

public CompletableFuture<RES> register(int requestId) {
return register(requestId, defaultTimeoutMillis);
}

public CompletableFuture<RES> register(int requestId, long timeoutMillis) {
// shutdown check
final CompletableFuture<RES> responseFuture = new CompletableFuture<>();

final CompletableFuture<RES> old = this.requestMap.put(requestId, responseFuture);
if (old != null) {
throw new PinpointSocketException("unexpected error. old future exist:" + old + " id:" + requestId);
}
// when future fails, put a handle in order to remove a failed future in the requestMap.
BiConsumer<RES, Throwable> removeTable = createFailureEventHandler(requestId);
responseFuture.whenComplete(removeTable);

addTimeoutTask(responseFuture, timeoutMillis);
return responseFuture;
}

public void close() {
logger.debug("close()");
final PinpointSocketException closed = new PinpointSocketException("socket closed");

int requestFailCount = 0;
for (Map.Entry<Integer, CompletableFuture<RES>> entry : requestMap.entrySet()) {
if (entry.getValue().completeExceptionally(closed)) {
requestFailCount++;
}
}
this.requestMap.clear();
if (requestFailCount > 0) {
logger.info("Close fail count:{}", requestFailCount);
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,18 @@

import com.google.protobuf.StringValue;
import com.navercorp.pinpoint.collector.cluster.ProfilerClusterManager;
import com.navercorp.pinpoint.collector.util.RequestManager;
import com.navercorp.pinpoint.common.profiler.concurrent.PinpointThreadFactory;
import com.navercorp.pinpoint.common.server.cluster.ClusterKey;
import com.navercorp.pinpoint.grpc.trace.PCmdEcho;
import com.navercorp.pinpoint.grpc.trace.PCmdEchoResponse;
import com.navercorp.pinpoint.grpc.trace.PCmdRequest;
import com.navercorp.pinpoint.grpc.trace.PCmdResponse;
import com.navercorp.pinpoint.io.ResponseMessage;
import com.navercorp.pinpoint.rpc.client.RequestManager;
import com.navercorp.pinpoint.rpc.common.SocketStateCode;
import com.navercorp.pinpoint.rpc.util.TimerFactory;
import com.navercorp.pinpoint.thrift.io.TCommandType;
import org.jboss.netty.util.Timer;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
Expand All @@ -39,6 +40,7 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

Expand All @@ -55,7 +57,8 @@ public class PinpointGrpcServerTest {

@BeforeAll
public static void setUp() throws Exception {
testTimer = TimerFactory.createHashedWheelTimer(PinpointGrpcServerTest.class + "-Timer", 100, TimeUnit.MILLISECONDS, 512);
ThreadFactory threadFactory = new PinpointThreadFactory(PinpointGrpcServerTest.class + "-Timer", true);
testTimer = new HashedWheelTimer(threadFactory, 100, TimeUnit.MILLISECONDS, 512);
}

@AfterAll
Expand All @@ -69,7 +72,8 @@ public static void tearDown() throws Exception {
public void stateTest() {
RecordedStreamObserver recordedStreamObserver = new RecordedStreamObserver();

PinpointGrpcServer pinpointGrpcServer = new PinpointGrpcServer(Mockito.mock(InetSocketAddress.class), clusterKey, new RequestManager(testTimer, 3000), Mockito.mock(ProfilerClusterManager.class), recordedStreamObserver);
RequestManager<ResponseMessage> requestManager = new RequestManager<>(testTimer, 3000);
PinpointGrpcServer pinpointGrpcServer = new PinpointGrpcServer(Mockito.mock(InetSocketAddress.class), clusterKey, requestManager, Mockito.mock(ProfilerClusterManager.class), recordedStreamObserver);
assertCurrentState(SocketStateCode.NONE, pinpointGrpcServer);
CompletableFuture<ResponseMessage> future = pinpointGrpcServer.request(request);
requestOnInvalidState(future, recordedStreamObserver);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,11 @@ public DefaultResponseMessage(byte[] message) {
public byte[] getMessage() {
return message;
}

@Override
public String toString() {
return "DefaultResponseMessage{" +
"message.length=" + message.length +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public class DefaultPinpointClientHandler extends SimpleChannelHandler implement
private final MessageListener messageListener;
private final ServerStreamChannelMessageHandler serverStreamChannelMessageHandler;

private final RequestManager requestManager;
private final RequestManager<ResponseMessage> requestManager;

private final ChannelFutureListener pingWriteFailFutureListener = new WriteFailFutureListener(this.logger, "ping write fail.", "ping write success.");
private final ChannelFutureListener sendWriteFailFutureListener = new WriteFailFutureListener(this.logger, "send() write fail.", "send() write success.");
Expand Down Expand Up @@ -120,7 +120,7 @@ public DefaultPinpointClientHandler(ConnectionFactory connectionFactory, SocketA
this.socketAddressProvider = Objects.requireNonNull(socketAddressProvider, "socketAddressProvider");

this.channelTimer = Objects.requireNonNull(channelTimer, "channelTimer");
this.requestManager = new RequestManager(channelTimer, clientOption.getRequestTimeoutMillis());
this.requestManager = new RequestManager<>(channelTimer, clientOption.getRequestTimeoutMillis());
this.clientOption = Objects.requireNonNull(clientOption, "clientOption");


Expand Down Expand Up @@ -372,7 +372,8 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex
final short packetType = packet.getPacketType();
switch (packetType) {
case PacketType.APPLICATION_RESPONSE:
this.requestManager.messageReceived((ResponsePacket) message, objectUniqName);
ResponsePacket responsePacket = (ResponsePacket) message;
this.requestManager.messageReceived(responsePacket, () -> ResponseMessage.wrap(responsePacket.getPayload()), objectUniqName::toString);
return;
// have to handle a request message through connector
case PacketType.APPLICATION_REQUEST:
Expand Down
Loading

0 comments on commit 26f8c8b

Please sign in to comment.