diff --git a/grpc/src/main/java/com/navercorp/pinpoint/grpc/client/ChannelFactoryBuilder.java b/grpc/src/main/java/com/navercorp/pinpoint/grpc/client/ChannelFactoryBuilder.java index a5d97ff3aff8..ce22a9815212 100644 --- a/grpc/src/main/java/com/navercorp/pinpoint/grpc/client/ChannelFactoryBuilder.java +++ b/grpc/src/main/java/com/navercorp/pinpoint/grpc/client/ChannelFactoryBuilder.java @@ -17,6 +17,7 @@ package com.navercorp.pinpoint.grpc.client; import com.navercorp.pinpoint.grpc.client.config.ClientOption; +import com.navercorp.pinpoint.grpc.client.config.ClientRetryOption; import io.grpc.ClientInterceptor; import io.grpc.NameResolverProvider; import io.netty.handler.ssl.SslContext; @@ -40,6 +41,8 @@ public interface ChannelFactoryBuilder { void setSslContext(SslContext sslContext); + void setClientRetryOption(ClientRetryOption clientRetryOption); + void setNameResolverProvider(NameResolverProvider nameResolverProvider); ChannelFactory build(); diff --git a/grpc/src/main/java/com/navercorp/pinpoint/grpc/client/DefaultChannelFactory.java b/grpc/src/main/java/com/navercorp/pinpoint/grpc/client/DefaultChannelFactory.java index 721abf4a01f4..a5c176e86f52 100644 --- a/grpc/src/main/java/com/navercorp/pinpoint/grpc/client/DefaultChannelFactory.java +++ b/grpc/src/main/java/com/navercorp/pinpoint/grpc/client/DefaultChannelFactory.java @@ -20,6 +20,7 @@ import com.navercorp.pinpoint.grpc.ChannelTypeEnum; import com.navercorp.pinpoint.grpc.ExecutorUtils; import com.navercorp.pinpoint.grpc.client.config.ClientOption; +import com.navercorp.pinpoint.grpc.client.config.ClientRetryOption; import io.grpc.ClientInterceptor; import io.grpc.ManagedChannel; import io.grpc.Metadata; @@ -63,6 +64,9 @@ public class DefaultChannelFactory implements ChannelFactory { // nullable private final SslContext sslContext; + // nullable + private final ClientRetryOption clientRetryOption; + private final List clientInterceptorList; private final NameResolverProvider nameResolverProvider; @@ -78,7 +82,8 @@ public class DefaultChannelFactory implements ChannelFactory { NameResolverProvider nameResolverProvider, ClientOption clientOption, List clientInterceptorList, - SslContext sslContext) { + SslContext sslContext, + ClientRetryOption clientRetryOption) { this.factoryName = Objects.requireNonNull(factoryName, "factoryName"); this.executorQueueSize = executorQueueSize; this.headerFactory = Objects.requireNonNull(headerFactory, "headerFactory"); @@ -90,6 +95,8 @@ public class DefaultChannelFactory implements ChannelFactory { this.clientInterceptorList = new ArrayList<>(clientInterceptorList); // nullable this.sslContext = sslContext; + // nullable + this.clientRetryOption = clientRetryOption; ChannelType channelType = getChannelType(); @@ -157,6 +164,8 @@ public ManagedChannel build(String channelName, String host, int port) { channelBuilder.negotiationType(NegotiationType.TLS); } + //test + channelBuilder.maxTraceEvents(clientOption.getMaxTraceEvent()); return channelBuilder.build(); @@ -199,6 +208,11 @@ private void setupClientOption(final NettyChannelBuilder channelBuilder) { channelBuilder.idleTimeout(clientOption.getIdleTimeoutMillis(), TimeUnit.MILLISECONDS); channelBuilder.defaultLoadBalancingPolicy(clientOption.getDefaultLoadBalancer()); + // RetryOption + if (clientRetryOption != null && clientRetryOption.isRetryEnabled()) { + setupRetryOption(channelBuilder); + } + // ChannelOption channelBuilder.withOption(ChannelOption.TCP_NODELAY, true); channelBuilder.withOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientOption.getConnectTimeout()); @@ -210,6 +224,15 @@ private void setupClientOption(final NettyChannelBuilder channelBuilder) { } } + private void setupRetryOption(final NettyChannelBuilder channelBuilder) { + channelBuilder.enableRetry(); + channelBuilder.retryBufferSize(clientRetryOption.getRetryBufferSize()); + channelBuilder.perRpcBufferLimit(clientRetryOption.getPerRpcBufferLimit()); + + //channelBuilder.disableServiceConfigLookUp(); + channelBuilder.defaultServiceConfig(clientRetryOption.getRetryServiceConfig()); + } + @Override public void close() { final Future future = eventLoopGroup.shutdownGracefully(); diff --git a/grpc/src/main/java/com/navercorp/pinpoint/grpc/client/DefaultChannelFactoryBuilder.java b/grpc/src/main/java/com/navercorp/pinpoint/grpc/client/DefaultChannelFactoryBuilder.java index d7d527eb3b9a..c876d5d47a8a 100644 --- a/grpc/src/main/java/com/navercorp/pinpoint/grpc/client/DefaultChannelFactoryBuilder.java +++ b/grpc/src/main/java/com/navercorp/pinpoint/grpc/client/DefaultChannelFactoryBuilder.java @@ -18,6 +18,7 @@ import com.navercorp.pinpoint.common.util.Assert; import com.navercorp.pinpoint.grpc.client.config.ClientOption; +import com.navercorp.pinpoint.grpc.client.config.ClientRetryOption; import io.grpc.ClientInterceptor; import io.grpc.NameResolverProvider; import io.netty.handler.ssl.SslContext; @@ -41,6 +42,7 @@ public class DefaultChannelFactoryBuilder implements ChannelFactoryBuilder { private ClientOption clientOption; private SslContext sslContext; + private ClientRetryOption clientRetryOption; private final LinkedList clientInterceptorList = new LinkedList<>(); private NameResolverProvider nameResolverProvider; @@ -82,6 +84,11 @@ public void setSslContext(SslContext sslContext) { this.sslContext = sslContext; } + @Override + public void setClientRetryOption(ClientRetryOption clientRetryOption) { + this.clientRetryOption = clientRetryOption; + } + @Override public void setNameResolverProvider(NameResolverProvider nameResolverProvider) { this.nameResolverProvider = Objects.requireNonNull(nameResolverProvider, "nameResolverProvider"); @@ -95,6 +102,7 @@ public ChannelFactory build() { return new DefaultChannelFactory(factoryName, executorQueueSize, headerFactory, nameResolverProvider, - clientOption, clientInterceptorList, sslContext); + clientOption, clientInterceptorList, + sslContext, clientRetryOption); } } diff --git a/grpc/src/main/java/com/navercorp/pinpoint/grpc/client/config/ClientRetryOption.java b/grpc/src/main/java/com/navercorp/pinpoint/grpc/client/config/ClientRetryOption.java new file mode 100644 index 000000000000..bcaaac44b833 --- /dev/null +++ b/grpc/src/main/java/com/navercorp/pinpoint/grpc/client/config/ClientRetryOption.java @@ -0,0 +1,34 @@ +package com.navercorp.pinpoint.grpc.client.config; + +import java.util.Map; + +public class ClientRetryOption { + + private final boolean retryEnabled; + private final long retryBufferSize; + private final long perRpcBufferLimit; + private final Map retryServiceConfig; + + public ClientRetryOption(boolean retryEnabled, long retryBufferSize, long perRpcBufferLimit, Map retryServiceConfig) { + this.retryEnabled = retryEnabled; + this.retryBufferSize = retryBufferSize; + this.perRpcBufferLimit = perRpcBufferLimit; + this.retryServiceConfig = retryServiceConfig; + } + + public boolean isRetryEnabled() { + return retryEnabled; + } + + public long getRetryBufferSize() { + return retryBufferSize; + } + + public long getPerRpcBufferLimit() { + return perRpcBufferLimit; + } + + public Map getRetryServiceConfig() { + return retryServiceConfig; + } +} diff --git a/grpc/src/main/java/com/navercorp/pinpoint/grpc/client/retry/HedgingServiceConfigBuilder.java b/grpc/src/main/java/com/navercorp/pinpoint/grpc/client/retry/HedgingServiceConfigBuilder.java new file mode 100644 index 000000000000..3a64fdf26b6b --- /dev/null +++ b/grpc/src/main/java/com/navercorp/pinpoint/grpc/client/retry/HedgingServiceConfigBuilder.java @@ -0,0 +1,59 @@ +package com.navercorp.pinpoint.grpc.client.retry; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +public class HedgingServiceConfigBuilder implements ServiceConfigBuilder { + + public static final int DEFAULT_MAX_ATTEMPTS = 3; + public static final long DEFAULT_HEDGING_DELAY_MILLIS = 1000L; + + private double maxAttempts = DEFAULT_MAX_ATTEMPTS; //Required. Must be two or greater + private String hedgingDelay = millisToString(DEFAULT_HEDGING_DELAY_MILLIS); //Required. Long decimal with "s" appended + private List nonFatalStatusCodes; //Optional (eg. [14], ["UNAVAILABLE"] or ["unavailable"]) + + @Override + public Map buildMetadataConfig() { + Map methodConfig = new LinkedHashMap<>(); + addMetadataService(methodConfig); + addHedgingPolicy(methodConfig); + return Collections.singletonMap("methodConfig", Collections.singletonList(methodConfig)); + } + + private void addMetadataService(Map methodConfig) { + Map service = Collections.singletonMap("service", METADATA_SERVICE); + methodConfig.put("name", Collections.singletonList(service)); + } + + private void addHedgingPolicy(Map methodConfig) { + Map retryPolicy = new LinkedHashMap<>(); + retryPolicy.put("maxAttempts", maxAttempts); + retryPolicy.put("hedgingDelay", hedgingDelay); + if (nonFatalStatusCodes != null && !nonFatalStatusCodes.isEmpty()) { + retryPolicy.put("nonFatalStatusCodes", nonFatalStatusCodes); + } + + methodConfig.put("hedgingPolicy", retryPolicy); + } + + + public void setMaxAttempts(int maxAttempts) { + if (maxAttempts >= 2) { + this.maxAttempts = maxAttempts; + } + } + + public void setHedgingDelayMillis(long hedgingDelay) { + this.hedgingDelay = millisToString(hedgingDelay); + } + + public void setNonFatalStatusCodes(List nonFatalStatusCodes) { + this.nonFatalStatusCodes = nonFatalStatusCodes; + } + + public String millisToString(long value) { + return value / 1000.0 + "s"; + } +} diff --git a/grpc/src/main/java/com/navercorp/pinpoint/grpc/client/retry/RetryServiceConfigBuilder.java b/grpc/src/main/java/com/navercorp/pinpoint/grpc/client/retry/RetryServiceConfigBuilder.java new file mode 100644 index 000000000000..0c693a881cce --- /dev/null +++ b/grpc/src/main/java/com/navercorp/pinpoint/grpc/client/retry/RetryServiceConfigBuilder.java @@ -0,0 +1,76 @@ +package com.navercorp.pinpoint.grpc.client.retry; + +import io.grpc.Status; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +public class RetryServiceConfigBuilder implements ServiceConfigBuilder { + + public static final double DEFAULT_MAX_ATTEMPTS = 3.0; + public static final long DEFAULT_INITIAL_BACKOFF_MILLIS = 1000L; + public static final long DEFAULT_MAX_BACKOFF_MILLIS = 4000L; + public static final double DEFAULT_BACKOFF_MULTIPLIER = 2.0; + public static final List DEFAULT_RETRYABLE_STATUS_CODES = Collections.singletonList(Status.Code.UNAVAILABLE.name()); + + private Double maxAttempts = DEFAULT_MAX_ATTEMPTS; //Required. Must be two or greater + private String initialBackoff = millisToString(DEFAULT_INITIAL_BACKOFF_MILLIS); //Required. Long decimal with "s" appended + private String maxBackoff = millisToString(DEFAULT_MAX_BACKOFF_MILLIS); //Required. Long decimal with "s" appended + private Double backoffMultiplier = DEFAULT_BACKOFF_MULTIPLIER; //Required. Must be greater than zero. + private List retryableStatusCodes; //Required and must be non-empty (eg. [14], ["UNAVAILABLE"] or ["unavailable"]) + + @Override + public Map buildMetadataConfig() { + Map methodConfig = new LinkedHashMap<>(); + addMetadataService(methodConfig); + addRetryPolicy(methodConfig); + return Collections.singletonMap("methodConfig", Collections.singletonList(methodConfig)); + } + + private void addMetadataService(Map methodConfig) { + Map service = Collections.singletonMap("service", METADATA_SERVICE); + methodConfig.put("name", Collections.singletonList(service)); + } + + private void addRetryPolicy(Map methodConfig) { + Map retryPolicy = new LinkedHashMap<>(); + retryPolicy.put("maxAttempts", maxAttempts); + retryPolicy.put("initialBackoff", initialBackoff); + retryPolicy.put("maxBackoff", maxBackoff); + retryPolicy.put("backoffMultiplier", backoffMultiplier); + if (retryableStatusCodes == null || retryableStatusCodes.isEmpty()) { + retryableStatusCodes = DEFAULT_RETRYABLE_STATUS_CODES; + } + retryPolicy.put("retryableStatusCodes", retryableStatusCodes); + + methodConfig.put("retryPolicy", retryPolicy); + } + + public void setMaxAttempts(double maxAttempts) { + if (maxAttempts >= 2) { + this.maxAttempts = maxAttempts; + } + } + + public void setInitialBackOff(long initialBackoff) { + this.initialBackoff = millisToString(initialBackoff); + } + + public void setMaxBackoff(long maxBackoff) { + this.maxBackoff = millisToString(maxBackoff); + } + + public void setBackoffMultiplier(double backoffMultiplier) { + this.backoffMultiplier = backoffMultiplier; + } + + public void setRetryableStatusCodes(List retryableStatusCodes) { + this.retryableStatusCodes = retryableStatusCodes; + } + + public String millisToString(long value) { + return value / 1000.0 + "s"; + } +} diff --git a/grpc/src/main/java/com/navercorp/pinpoint/grpc/client/retry/ServiceConfigBuilder.java b/grpc/src/main/java/com/navercorp/pinpoint/grpc/client/retry/ServiceConfigBuilder.java new file mode 100644 index 000000000000..4873da2f6d05 --- /dev/null +++ b/grpc/src/main/java/com/navercorp/pinpoint/grpc/client/retry/ServiceConfigBuilder.java @@ -0,0 +1,11 @@ +package com.navercorp.pinpoint.grpc.client.retry; + +import java.util.Map; + +public interface ServiceConfigBuilder { + + String METADATA_SERVICE = "v1.metadata"; + + Map buildMetadataConfig(); + +} diff --git a/grpc/src/test/java/com/navercorp/pinpoint/grpc/client/config/RetryConfigTest.java b/grpc/src/test/java/com/navercorp/pinpoint/grpc/client/config/RetryConfigTest.java new file mode 100644 index 000000000000..6935bb68152c --- /dev/null +++ b/grpc/src/test/java/com/navercorp/pinpoint/grpc/client/config/RetryConfigTest.java @@ -0,0 +1,71 @@ +package com.navercorp.pinpoint.grpc.client.config; + +import com.google.gson.Gson; +import com.google.gson.stream.JsonReader; +import com.navercorp.pinpoint.grpc.client.retry.HedgingServiceConfigBuilder; +import com.navercorp.pinpoint.grpc.client.retry.RetryServiceConfigBuilder; +import io.grpc.internal.JsonUtil; +import org.junit.jupiter.api.Test; + +import java.io.InputStreamReader; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.assertj.core.api.Assertions.assertThat; + + +public class RetryConfigTest { + + @Test + public void retryServiceConfigBuilderTest() { + Map retryServiceConfig = new RetryServiceConfigBuilder().buildMetadataConfig(); + Map exampleServiceConfig = + new Gson() + .fromJson( + new JsonReader( + new InputStreamReader( + Objects.requireNonNull(this.getClass().getClassLoader().getResourceAsStream( + "client/example/retry_service_config.json")), + UTF_8)), + Map.class); + System.out.println(retryServiceConfig); + System.out.println(exampleServiceConfig); + + Map retryPolicy = getPolicy(retryServiceConfig, "retryPolicy"); + Map examplePolicy = getPolicy(exampleServiceConfig, "retryPolicy"); + for (String key : examplePolicy.keySet()) { + assertThat(retryPolicy.containsKey(key)).isTrue(); + } + } + + @Test + public void hedgeServiceConfigBuilderTest() { + Map hedgeServiceConfig = new HedgingServiceConfigBuilder().buildMetadataConfig(); + Map exampleServiceConfig = + new Gson() + .fromJson( + new JsonReader( + new InputStreamReader( + Objects.requireNonNull(this.getClass().getClassLoader().getResourceAsStream( + "client/example/hedging_service_config.json")), + UTF_8)), + Map.class); + + System.out.println(hedgeServiceConfig); + System.out.println(exampleServiceConfig); + + Map retryPolicy = getPolicy(hedgeServiceConfig, "hedgingPolicy"); + Map examplePolicy = getPolicy(exampleServiceConfig, "hedgingPolicy"); + for (String key : examplePolicy.keySet()) { + assertThat(retryPolicy.containsKey(key)).isTrue(); + } + } + + private Map getPolicy(Map serviceConfig, String policy) { + List methodConfig = JsonUtil.getList(serviceConfig, "methodConfig"); + assertThat(methodConfig).isNotEmpty(); + return JsonUtil.getObject((Map) methodConfig.get(0), policy); + } +} diff --git a/grpc/src/test/resources/client/example/hedging_service_config.json b/grpc/src/test/resources/client/example/hedging_service_config.json new file mode 100644 index 000000000000..6a2a30ab07a9 --- /dev/null +++ b/grpc/src/test/resources/client/example/hedging_service_config.json @@ -0,0 +1,22 @@ +{ + "methodConfig": [ + { + "name": [ + { + "service": "helloworld.Greeter", + "method": "SayHello" + } + ], + + "hedgingPolicy": { + "maxAttempts": 3, + "hedgingDelay": "1s" + } + } + ], + + "retryThrottling": { + "maxTokens": 10, + "tokenRatio": 0.1 + } +} \ No newline at end of file diff --git a/grpc/src/test/resources/client/example/retry_service_config.json b/grpc/src/test/resources/client/example/retry_service_config.json new file mode 100644 index 000000000000..88f4139be6b5 --- /dev/null +++ b/grpc/src/test/resources/client/example/retry_service_config.json @@ -0,0 +1,22 @@ +{ + "methodConfig": [ + { + "name": [ + { + "service": "helloworld.Greeter", + "method": "SayHello" + } + ], + + "retryPolicy": { + "maxAttempts": 5, + "initialBackoff": "0.5s", + "maxBackoff": "30s", + "backoffMultiplier": 2, + "retryableStatusCodes": [ + "UNAVAILABLE" + ] + } + } + ] +} \ No newline at end of file diff --git a/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/grpc/config/GrpcTransportConfig.java b/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/grpc/config/GrpcTransportConfig.java index 9e92e5fc1b13..abf23c5f9563 100644 --- a/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/grpc/config/GrpcTransportConfig.java +++ b/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/grpc/config/GrpcTransportConfig.java @@ -28,9 +28,10 @@ /** * NOTE module accessibility - * @see com.navercorp.pinpoint.bootstrap.java9.module.ModuleSupport#addPermissionToValueAnnotation(JavaModule) + * * @author Woonduk Kang(emeroad) * @author jaehong.kim + * @see com.navercorp.pinpoint.bootstrap.java9.module.ModuleSupport#addPermissionToValueAnnotation(JavaModule) */ public class GrpcTransportConfig { @@ -67,6 +68,13 @@ public class GrpcTransportConfig { private static final int DEFAULT_METADATA_RETRY_MAX_COUNT = 3; private static final int DEFAULT_METADATA_RETRY_DELAY_MILLIS = 1000; + + private static final boolean DEFAULT_METADATA_RETRY_ENABLE = false; + private static final long DEFAULT_METADATA_RETRY_BUFFER_SIZE = 1L << 24; // 16M + private static final long DEFAULT_METADATA_PER_RPC_BUFFER_LIMIT = 1L << 20; // 1M + private static final int DEFAULT_METADATA_MAX_ATTEMPTS = 3; + private static final long DEFAULT_METADATA_HEDGING_DELAY_MILLIS = 1000; + public static final boolean DEFAULT_NETTY_SYSTEM_PROPERTY_TRY_REFLECTIVE_SET_ACCESSIBLE = true; private static final boolean DEFAULT_ENABLE_SPAN_STATS_LOGGING = false; @@ -109,7 +117,17 @@ public class GrpcTransportConfig { private int metadataRetryMaxCount = DEFAULT_METADATA_RETRY_MAX_COUNT; @Value("${profiler.transport.grpc.metadata.sender.retry.delay.millis}") private int metadataRetryDelayMillis = DEFAULT_METADATA_RETRY_DELAY_MILLIS; - + //grpc client retry + @Value("${profiler.transport.grpc.metadata.sender.retry.enable}") + private boolean metadataRetryEnable = DEFAULT_METADATA_RETRY_ENABLE; + @Value("${profiler.transport.grpc.metadata.sender.retry.buffer.size}") + private long metadataRetryBufferSize = DEFAULT_METADATA_RETRY_BUFFER_SIZE; + @Value("${profiler.transport.grpc.metadata.sender.retry.per.rpc.buffer.limit}") + private long metadataPerRpcBufferLimit = DEFAULT_METADATA_PER_RPC_BUFFER_LIMIT; + @Value("${profiler.transport.grpc.metadata.sender.max.attempts}") + private int metadataMaxAttempts = DEFAULT_METADATA_MAX_ATTEMPTS; + @Value("${profiler.transport.grpc.metadata.sender.hedging.delay.millis}") + private long metadataHedgingDelayMillis = DEFAULT_METADATA_HEDGING_DELAY_MILLIS; @Value("${profiler.transport.grpc.stat.collector.ip}") private String statCollectorIp = DEFAULT_IP; @@ -304,9 +322,11 @@ public long getSpanDiscardCountForReconnect() { public long getSpanNotReadyTimeoutMillis() { return spanNotReadyTimeoutMillis; } + public long getSpanRpcMaxAgeMillis() { return spanRpcMaxAgeMillis; } + public long getRenewTransportPeriodMillis() { return renewTransportPeriodMillis; } @@ -371,6 +391,26 @@ public int getMetadataRetryDelayMillis() { return metadataRetryDelayMillis; } + public boolean isMetadataRetryEnable() { + return metadataRetryEnable; + } + + public long getMetadataRetryBufferSize() { + return metadataRetryBufferSize; + } + + public long getMetadataPerRpcBufferLimit() { + return metadataPerRpcBufferLimit; + } + + public int getMetadataMaxAttempts() { + return metadataMaxAttempts; + } + + public long getMetadataHedgingDelayMillis() { + return metadataHedgingDelayMillis; + } + public boolean isSpanEnableStatLogging() { return spanEnableStatLogging; } diff --git a/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/grpc/MetadataGrpcDataSenderProvider.java b/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/grpc/MetadataGrpcDataSenderProvider.java index 11df22b3dc6b..2b60e348f72b 100644 --- a/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/grpc/MetadataGrpcDataSenderProvider.java +++ b/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/grpc/MetadataGrpcDataSenderProvider.java @@ -27,6 +27,8 @@ import com.navercorp.pinpoint.grpc.client.HeaderFactory; import com.navercorp.pinpoint.grpc.client.UnaryCallDeadlineInterceptor; import com.navercorp.pinpoint.grpc.client.config.ClientOption; +import com.navercorp.pinpoint.grpc.client.config.ClientRetryOption; +import com.navercorp.pinpoint.grpc.client.retry.HedgingServiceConfigBuilder; import com.navercorp.pinpoint.io.ResponseMessage; import com.navercorp.pinpoint.profiler.context.grpc.config.GrpcTransportConfig; import com.navercorp.pinpoint.profiler.context.module.MetadataDataSender; @@ -39,6 +41,7 @@ import org.apache.logging.log4j.Logger; import java.util.List; +import java.util.Map; import java.util.Objects; /** @@ -82,12 +85,28 @@ public EnhancedDataSender get() { final int senderExecutorQueueSize = grpcTransportConfig.getMetadataSenderExecutorQueueSize(); final ChannelFactoryBuilder channelFactoryBuilder = newChannelFactoryBuilder(sslEnable); + final boolean clientRetryEnable = grpcTransportConfig.isMetadataRetryEnable(); + + if (clientRetryEnable) { + HedgingServiceConfigBuilder hedgingServiceConfigBuilder = new HedgingServiceConfigBuilder(); + hedgingServiceConfigBuilder.setMaxAttempts(grpcTransportConfig.getMetadataMaxAttempts()); + hedgingServiceConfigBuilder.setHedgingDelayMillis(grpcTransportConfig.getMetadataHedgingDelayMillis()); + Map metadataServiceConfig = hedgingServiceConfigBuilder.buildMetadataConfig(); + + channelFactoryBuilder.setClientRetryOption(new ClientRetryOption( + grpcTransportConfig.isMetadataRetryEnable(), + grpcTransportConfig.getMetadataRetryBufferSize(), + grpcTransportConfig.getMetadataPerRpcBufferLimit(), + metadataServiceConfig) + ); + } + final ChannelFactory channelFactory = channelFactoryBuilder.build(); final int retryMaxCount = grpcTransportConfig.getMetadataRetryMaxCount(); final int retryDelayMillis = grpcTransportConfig.getMetadataRetryDelayMillis(); - return new MetadataGrpcDataSender<>(collectorIp, collectorPort, senderExecutorQueueSize, messageConverter, channelFactory, retryMaxCount, retryDelayMillis); + return new MetadataGrpcDataSender<>(collectorIp, collectorPort, senderExecutorQueueSize, messageConverter, channelFactory, retryMaxCount, retryDelayMillis, clientRetryEnable); } protected ChannelFactoryBuilder newChannelFactoryBuilder(boolean sslEnable) { diff --git a/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/LogResponseStreamObserver.java b/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/LogResponseStreamObserver.java new file mode 100644 index 000000000000..a851094161c8 --- /dev/null +++ b/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/LogResponseStreamObserver.java @@ -0,0 +1,78 @@ +/* + * Copyright 2024 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.navercorp.pinpoint.profiler.sender.grpc; + +import com.google.protobuf.GeneratedMessageV3; +import com.google.protobuf.TextFormat; +import com.navercorp.pinpoint.grpc.StatusError; +import com.navercorp.pinpoint.grpc.StatusErrors; +import io.grpc.stub.StreamObserver; +import org.apache.logging.log4j.Logger; + +import java.util.Objects; + +public class LogResponseStreamObserver implements StreamObserver { + private final Logger logger; + + public LogResponseStreamObserver(Logger logger) { + this.logger = Objects.requireNonNull(logger, "logger"); + } + + @Override + public void onNext(ResT response) { + if (logger.isDebugEnabled()) { + logger.debug("Request success. result={}", logString(response)); + } + } + + + @Override + public void onError(Throwable throwable) { + final StatusError statusError = StatusErrors.throwable(throwable); + if (statusError.isSimpleError()) { + logger.info("Error. cause={}", statusError.getMessage()); + } else { + logger.info("Error. cause={}", statusError.getMessage(), statusError.getThrowable()); + } + } + + @Override + public void onCompleted() { + if (logger.isDebugEnabled()) { + logger.debug("onCompleted"); + } + } + + + private String logString(Object message) { + if (message == null) { + return "NULL"; + } + if (message instanceof GeneratedMessageV3) { + GeneratedMessageV3 messageV3 = (GeneratedMessageV3) message; + return TextFormat.shortDebugString(messageV3); + } + return message.toString(); + } + + @Override + public String toString() { + return "LogResponseStreamObserver{" + + "logger=" + logger + + '}'; + } +} diff --git a/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/MetadataGrpcDataSender.java b/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/MetadataGrpcDataSender.java index fb44bf45ccac..52c5ddc21319 100644 --- a/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/MetadataGrpcDataSender.java +++ b/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/MetadataGrpcDataSender.java @@ -45,9 +45,11 @@ * @author jaehong.kim */ public class MetadataGrpcDataSender extends GrpcDataSender implements EnhancedDataSender { + // private final MetadataGrpc.MetadataStub metadataStub; private final int maxAttempts; private final int retryDelayMillis; + private final boolean clientRetryEnable; private final Timer retryTimer; private static final long MAX_PENDING_TIMEOUTS = 1024 * 4; @@ -56,7 +58,7 @@ public class MetadataGrpcDataSender extends GrpcDataSender implements Enha public MetadataGrpcDataSender(String host, int port, int executorQueueSize, MessageConverter messageConverter, - ChannelFactory channelFactory, int retryMaxCount, int retryDelayMillis) { + ChannelFactory channelFactory, int retryMaxCount, int retryDelayMillis, boolean clientRetryEnable) { super(host, port, executorQueueSize, messageConverter, channelFactory); this.maxAttempts = getMaxAttempts(retryMaxCount); @@ -76,6 +78,7 @@ public void scheduleNextRetry(GeneratedMessageV3 request, int remainingRetryCoun MetadataGrpcDataSender.this.scheduleNextRetry(request, remainingRetryCount); } }; + this.clientRetryEnable = clientRetryEnable; } private int getMaxAttempts(int retryMaxCount) { @@ -101,13 +104,47 @@ public boolean request(T data, BiConsumer listener) throw new UnsupportedOperationException("unsupported operation request(data, listener)"); } + //send with retry @Override public boolean send(T data) { - throw new UnsupportedOperationException("unsupported operation send(data)"); + try { + final GeneratedMessageV3 message = messageConverter.toMessage(data); + + if (message instanceof PSqlMetaData) { + final PSqlMetaData sqlMetaData = (PSqlMetaData) message; + this.metadataStub.requestSqlMetaData(sqlMetaData, newLogStreamObserver()); + } else if (message instanceof PSqlUidMetaData) { + final PSqlUidMetaData sqlUidMetaData = (PSqlUidMetaData) message; + this.metadataStub.requestSqlUidMetaData(sqlUidMetaData, newLogStreamObserver()); + } else if (message instanceof PApiMetaData) { + final PApiMetaData apiMetaData = (PApiMetaData) message; + this.metadataStub.requestApiMetaData(apiMetaData, newLogStreamObserver()); + } else if (message instanceof PStringMetaData) { + final PStringMetaData stringMetaData = (PStringMetaData) message; + this.metadataStub.requestStringMetaData(stringMetaData, newLogStreamObserver()); + } else if (message instanceof PExceptionMetaData) { + final PExceptionMetaData exceptionMetaData = (PExceptionMetaData) message; + this.metadataStub.requestExceptionMetaData(exceptionMetaData, newLogStreamObserver()); + } else { + logger.warn("Unsupported message {}", MessageFormatUtils.debugLog(message)); + } + } catch (Exception e) { + logger.info("Failed to send metadata={}", data, e); + return false; + } + return true; + } + + private StreamObserver newLogStreamObserver() { + return new LogResponseStreamObserver<>(logger); } @Override public boolean request(final T data) { + if (clientRetryEnable) { + return this.send(data); + } + final Runnable convertAndRun = new Runnable() { @Override public void run() { @@ -202,7 +239,6 @@ public void run(Timeout timeout) throws Exception { } - @Override public void stop() { if (shutdown) {