Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.doris.cloud.proto.MetaServiceGrpc;
import org.apache.doris.common.Config;

import com.google.common.base.Preconditions;
import com.google.gson.Gson;
import com.google.gson.stream.JsonReader;
import io.grpc.ConnectivityState;
Expand All @@ -39,6 +40,7 @@

public class MetaServiceClient {
public static final Logger LOG = LogManager.getLogger(MetaServiceClient.class);
private static final Map<String, ?> serviceConfig;

private final String address;
private final MetaServiceGrpc.MetaServiceFutureStub stub;
Expand All @@ -50,6 +52,12 @@ public class MetaServiceClient {

static {
NameResolverRegistry.getDefaultRegistry().register(new MetaServiceListResolverProvider());

// https://github.com/grpc/proposal/blob/master/A6-client-retries.md#retry-policy-capabilities
serviceConfig = new Gson().fromJson(new JsonReader(new InputStreamReader(
MetaServiceClient.class.getResourceAsStream("/retrying_service_config.json"),
StandardCharsets.UTF_8)), Map.class);
LOG.info("serviceConfig:{}", serviceConfig);
}

public MetaServiceClient(String address) {
Expand All @@ -61,10 +69,12 @@ public MetaServiceClient(String address) {
if (isMetaServiceEndpointList) {
target = MetaServiceListResolverProvider.MS_LIST_SCHEME_PREFIX + address;
}

Preconditions.checkNotNull(serviceConfig, "serviceConfig is null");
channel = NettyChannelBuilder.forTarget(target)
.flowControlWindow(Config.grpc_max_message_size_bytes)
.maxInboundMessageSize(Config.grpc_max_message_size_bytes)
.defaultServiceConfig(getRetryingServiceConfig())
.defaultServiceConfig(serviceConfig)
.defaultLoadBalancingPolicy("round_robin")
.enableRetry()
.usePlaintext().build();
Expand All @@ -85,15 +95,6 @@ private long connectionAgeExpiredAt() {
return Long.MAX_VALUE;
}

protected Map<String, ?> getRetryingServiceConfig() {
// https://github.com/grpc/proposal/blob/master/A6-client-retries.md#retry-policy-capabilities
Map<String, ?> serviceConfig = new Gson().fromJson(new JsonReader(new InputStreamReader(
MetaServiceClient.class.getResourceAsStream("/retrying_service_config.json"),
StandardCharsets.UTF_8)), Map.class);
LOG.info("serviceConfig:{}", serviceConfig);
return serviceConfig;
}

// Is the connection age has expired?
public boolean isConnectionAgeExpired() {
return Config.meta_service_connection_age_base_minutes > 0
Expand Down
Loading