Skip to content

Commit

Permalink
[pinpoint-apm#10693] Add enable option for grpc client reties
Browse files Browse the repository at this point in the history
  • Loading branch information
donghun-cho committed Mar 5, 2024
1 parent 72a7c02 commit d0444b4
Show file tree
Hide file tree
Showing 14 changed files with 510 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.navercorp.pinpoint.grpc.client;

import com.navercorp.pinpoint.grpc.client.config.ClientOption;
import com.navercorp.pinpoint.grpc.client.config.ClientRetryOption;
import io.grpc.ClientInterceptor;
import io.grpc.NameResolverProvider;
import io.netty.handler.ssl.SslContext;
Expand All @@ -40,6 +41,8 @@ public interface ChannelFactoryBuilder {

void setSslContext(SslContext sslContext);

void setClientRetryOption(ClientRetryOption clientRetryOption);

void setNameResolverProvider(NameResolverProvider nameResolverProvider);

ChannelFactory build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.navercorp.pinpoint.grpc.ChannelTypeEnum;
import com.navercorp.pinpoint.grpc.ExecutorUtils;
import com.navercorp.pinpoint.grpc.client.config.ClientOption;
import com.navercorp.pinpoint.grpc.client.config.ClientRetryOption;
import io.grpc.ClientInterceptor;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
Expand Down Expand Up @@ -63,6 +64,9 @@ public class DefaultChannelFactory implements ChannelFactory {
// nullable
private final SslContext sslContext;

// nullable
private final ClientRetryOption clientRetryOption;

private final List<ClientInterceptor> clientInterceptorList;
private final NameResolverProvider nameResolverProvider;

Expand All @@ -78,7 +82,8 @@ public class DefaultChannelFactory implements ChannelFactory {
NameResolverProvider nameResolverProvider,
ClientOption clientOption,
List<ClientInterceptor> clientInterceptorList,
SslContext sslContext) {
SslContext sslContext,
ClientRetryOption clientRetryOption) {
this.factoryName = Objects.requireNonNull(factoryName, "factoryName");
this.executorQueueSize = executorQueueSize;
this.headerFactory = Objects.requireNonNull(headerFactory, "headerFactory");
Expand All @@ -90,6 +95,8 @@ public class DefaultChannelFactory implements ChannelFactory {
this.clientInterceptorList = new ArrayList<>(clientInterceptorList);
// nullable
this.sslContext = sslContext;
// nullable
this.clientRetryOption = clientRetryOption;


ChannelType channelType = getChannelType();
Expand Down Expand Up @@ -157,6 +164,8 @@ public ManagedChannel build(String channelName, String host, int port) {
channelBuilder.negotiationType(NegotiationType.TLS);
}

//test

channelBuilder.maxTraceEvents(clientOption.getMaxTraceEvent());

return channelBuilder.build();
Expand Down Expand Up @@ -199,6 +208,11 @@ private void setupClientOption(final NettyChannelBuilder channelBuilder) {
channelBuilder.idleTimeout(clientOption.getIdleTimeoutMillis(), TimeUnit.MILLISECONDS);
channelBuilder.defaultLoadBalancingPolicy(clientOption.getDefaultLoadBalancer());

// RetryOption
if (clientRetryOption != null && clientRetryOption.isRetryEnabled()) {
setupRetryOption(channelBuilder);
}

// ChannelOption
channelBuilder.withOption(ChannelOption.TCP_NODELAY, true);
channelBuilder.withOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientOption.getConnectTimeout());
Expand All @@ -210,6 +224,15 @@ private void setupClientOption(final NettyChannelBuilder channelBuilder) {
}
}

private void setupRetryOption(final NettyChannelBuilder channelBuilder) {
channelBuilder.enableRetry();
channelBuilder.retryBufferSize(clientRetryOption.getRetryBufferSize());
channelBuilder.perRpcBufferLimit(clientRetryOption.getPerRpcBufferLimit());

//channelBuilder.disableServiceConfigLookUp();
channelBuilder.defaultServiceConfig(clientRetryOption.getRetryServiceConfig());
}

@Override
public void close() {
final Future<?> future = eventLoopGroup.shutdownGracefully();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.navercorp.pinpoint.common.util.Assert;
import com.navercorp.pinpoint.grpc.client.config.ClientOption;
import com.navercorp.pinpoint.grpc.client.config.ClientRetryOption;
import io.grpc.ClientInterceptor;
import io.grpc.NameResolverProvider;
import io.netty.handler.ssl.SslContext;
Expand All @@ -41,6 +42,7 @@ public class DefaultChannelFactoryBuilder implements ChannelFactoryBuilder {

private ClientOption clientOption;
private SslContext sslContext;
private ClientRetryOption clientRetryOption;

private final LinkedList<ClientInterceptor> clientInterceptorList = new LinkedList<>();
private NameResolverProvider nameResolverProvider;
Expand Down Expand Up @@ -82,6 +84,11 @@ public void setSslContext(SslContext sslContext) {
this.sslContext = sslContext;
}

@Override
public void setClientRetryOption(ClientRetryOption clientRetryOption) {
this.clientRetryOption = clientRetryOption;
}

@Override
public void setNameResolverProvider(NameResolverProvider nameResolverProvider) {
this.nameResolverProvider = Objects.requireNonNull(nameResolverProvider, "nameResolverProvider");
Expand All @@ -95,6 +102,7 @@ public ChannelFactory build() {

return new DefaultChannelFactory(factoryName, executorQueueSize,
headerFactory, nameResolverProvider,
clientOption, clientInterceptorList, sslContext);
clientOption, clientInterceptorList,
sslContext, clientRetryOption);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.navercorp.pinpoint.grpc.client.config;

import java.util.Map;

public class ClientRetryOption {

private final boolean retryEnabled;
private final long retryBufferSize;
private final long perRpcBufferLimit;
private final Map<String, ?> retryServiceConfig;

public ClientRetryOption(boolean retryEnabled, long retryBufferSize, long perRpcBufferLimit, Map<String, ?> retryServiceConfig) {
this.retryEnabled = retryEnabled;
this.retryBufferSize = retryBufferSize;
this.perRpcBufferLimit = perRpcBufferLimit;
this.retryServiceConfig = retryServiceConfig;
}

public boolean isRetryEnabled() {
return retryEnabled;
}

public long getRetryBufferSize() {
return retryBufferSize;
}

public long getPerRpcBufferLimit() {
return perRpcBufferLimit;
}

public Map<String, ?> getRetryServiceConfig() {
return retryServiceConfig;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package com.navercorp.pinpoint.grpc.client.retry;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class HedgingServiceConfigBuilder implements ServiceConfigBuilder {

public static final int DEFAULT_MAX_ATTEMPTS = 3;
public static final long DEFAULT_HEDGING_DELAY_MILLIS = 1000L;

private double maxAttempts = DEFAULT_MAX_ATTEMPTS; //Required. Must be two or greater
private String hedgingDelay = millisToString(DEFAULT_HEDGING_DELAY_MILLIS); //Required. Long decimal with "s" appended
private List<String> nonFatalStatusCodes; //Optional (eg. [14], ["UNAVAILABLE"] or ["unavailable"])

@Override
public Map<String, ?> buildMetadataConfig() {
Map<String, Object> methodConfig = new LinkedHashMap<>();
addMetadataService(methodConfig);
addHedgingPolicy(methodConfig);
return Collections.singletonMap("methodConfig", Collections.singletonList(methodConfig));
}

private void addMetadataService(Map<String, Object> methodConfig) {
Map<String, Object> service = Collections.singletonMap("service", METADATA_SERVICE);
methodConfig.put("name", Collections.singletonList(service));
}

private void addHedgingPolicy(Map<String, Object> methodConfig) {
Map<String, Object> retryPolicy = new LinkedHashMap<>();
retryPolicy.put("maxAttempts", maxAttempts);
retryPolicy.put("hedgingDelay", hedgingDelay);
if (nonFatalStatusCodes != null && !nonFatalStatusCodes.isEmpty()) {
retryPolicy.put("nonFatalStatusCodes", nonFatalStatusCodes);
}

methodConfig.put("hedgingPolicy", retryPolicy);
}


public void setMaxAttempts(int maxAttempts) {
if (maxAttempts >= 2) {
this.maxAttempts = maxAttempts;
}
}

public void setHedgingDelayMillis(long hedgingDelay) {
this.hedgingDelay = millisToString(hedgingDelay);
}

public void setNonFatalStatusCodes(List<String> nonFatalStatusCodes) {
this.nonFatalStatusCodes = nonFatalStatusCodes;
}

public String millisToString(long value) {
return value / 1000.0 + "s";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package com.navercorp.pinpoint.grpc.client.retry;

import io.grpc.Status;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RetryServiceConfigBuilder implements ServiceConfigBuilder {

public static final double DEFAULT_MAX_ATTEMPTS = 3.0;
public static final long DEFAULT_INITIAL_BACKOFF_MILLIS = 1000L;
public static final long DEFAULT_MAX_BACKOFF_MILLIS = 4000L;
public static final double DEFAULT_BACKOFF_MULTIPLIER = 2.0;
public static final List<String> DEFAULT_RETRYABLE_STATUS_CODES = Collections.singletonList(Status.Code.UNAVAILABLE.name());

private Double maxAttempts = DEFAULT_MAX_ATTEMPTS; //Required. Must be two or greater
private String initialBackoff = millisToString(DEFAULT_INITIAL_BACKOFF_MILLIS); //Required. Long decimal with "s" appended
private String maxBackoff = millisToString(DEFAULT_MAX_BACKOFF_MILLIS); //Required. Long decimal with "s" appended
private Double backoffMultiplier = DEFAULT_BACKOFF_MULTIPLIER; //Required. Must be greater than zero.
private List<String> retryableStatusCodes; //Required and must be non-empty (eg. [14], ["UNAVAILABLE"] or ["unavailable"])

@Override
public Map<String, ?> buildMetadataConfig() {
Map<String, Object> methodConfig = new LinkedHashMap<>();
addMetadataService(methodConfig);
addRetryPolicy(methodConfig);
return Collections.singletonMap("methodConfig", Collections.singletonList(methodConfig));
}

private void addMetadataService(Map<String, Object> methodConfig) {
Map<String, Object> service = Collections.singletonMap("service", METADATA_SERVICE);
methodConfig.put("name", Collections.singletonList(service));
}

private void addRetryPolicy(Map<String, Object> methodConfig) {
Map<String, Object> retryPolicy = new LinkedHashMap<>();
retryPolicy.put("maxAttempts", maxAttempts);
retryPolicy.put("initialBackoff", initialBackoff);
retryPolicy.put("maxBackoff", maxBackoff);
retryPolicy.put("backoffMultiplier", backoffMultiplier);
if (retryableStatusCodes == null || retryableStatusCodes.isEmpty()) {
retryableStatusCodes = DEFAULT_RETRYABLE_STATUS_CODES;
}
retryPolicy.put("retryableStatusCodes", retryableStatusCodes);

methodConfig.put("retryPolicy", retryPolicy);
}

public void setMaxAttempts(double maxAttempts) {
if (maxAttempts >= 2) {
this.maxAttempts = maxAttempts;
}
}

public void setInitialBackOff(long initialBackoff) {
this.initialBackoff = millisToString(initialBackoff);
}

public void setMaxBackoff(long maxBackoff) {
this.maxBackoff = millisToString(maxBackoff);
}

public void setBackoffMultiplier(double backoffMultiplier) {
this.backoffMultiplier = backoffMultiplier;
}

public void setRetryableStatusCodes(List<String> retryableStatusCodes) {
this.retryableStatusCodes = retryableStatusCodes;
}

public String millisToString(long value) {
return value / 1000.0 + "s";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.navercorp.pinpoint.grpc.client.retry;

import java.util.Map;

public interface ServiceConfigBuilder {

String METADATA_SERVICE = "v1.metadata";

Map<String, ?> buildMetadataConfig();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package com.navercorp.pinpoint.grpc.client.config;

import com.google.gson.Gson;
import com.google.gson.stream.JsonReader;
import com.navercorp.pinpoint.grpc.client.retry.HedgingServiceConfigBuilder;
import com.navercorp.pinpoint.grpc.client.retry.RetryServiceConfigBuilder;
import io.grpc.internal.JsonUtil;
import org.junit.jupiter.api.Test;

import java.io.InputStreamReader;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.assertj.core.api.Assertions.assertThat;


public class RetryConfigTest {

@Test
public void retryServiceConfigBuilderTest() {
Map<String, ?> retryServiceConfig = new RetryServiceConfigBuilder().buildMetadataConfig();
Map<String, ?> exampleServiceConfig =
new Gson()
.fromJson(
new JsonReader(
new InputStreamReader(
Objects.requireNonNull(this.getClass().getClassLoader().getResourceAsStream(
"client/example/retry_service_config.json")),
UTF_8)),
Map.class);
System.out.println(retryServiceConfig);
System.out.println(exampleServiceConfig);

Map<String, ?> retryPolicy = getPolicy(retryServiceConfig, "retryPolicy");
Map<String, ?> examplePolicy = getPolicy(exampleServiceConfig, "retryPolicy");
for (String key : examplePolicy.keySet()) {
assertThat(retryPolicy.containsKey(key)).isTrue();
}
}

@Test
public void hedgeServiceConfigBuilderTest() {
Map<String, ?> hedgeServiceConfig = new HedgingServiceConfigBuilder().buildMetadataConfig();
Map<String, ?> exampleServiceConfig =
new Gson()
.fromJson(
new JsonReader(
new InputStreamReader(
Objects.requireNonNull(this.getClass().getClassLoader().getResourceAsStream(
"client/example/hedging_service_config.json")),
UTF_8)),
Map.class);

System.out.println(hedgeServiceConfig);
System.out.println(exampleServiceConfig);

Map<String, ?> retryPolicy = getPolicy(hedgeServiceConfig, "hedgingPolicy");
Map<String, ?> examplePolicy = getPolicy(exampleServiceConfig, "hedgingPolicy");
for (String key : examplePolicy.keySet()) {
assertThat(retryPolicy.containsKey(key)).isTrue();
}
}

private Map<String, ?> getPolicy(Map<String, ?> serviceConfig, String policy) {
List<?> methodConfig = JsonUtil.getList(serviceConfig, "methodConfig");
assertThat(methodConfig).isNotEmpty();
return JsonUtil.getObject((Map<String, ?>) methodConfig.get(0), policy);
}
}

0 comments on commit d0444b4

Please sign in to comment.