Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc-js: Return LB policy configs from resolvers in JSON form #2538

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions packages/grpc-js-xds/src/duration.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { experimental } from '@grpc/grpc-js';
import { Duration__Output } from './generated/google/protobuf/Duration';
import Duration = experimental.Duration;

/**
* Convert a Duration protobuf message object to a Duration object as used in
* the ServiceConfig definition. The difference is that the protobuf message
* defines seconds as a long, which is represented as a string in JavaScript,
* and the one used in the service config defines it as a number.
* @param duration
*/
export function protoDurationToDuration(duration: Duration__Output): Duration {
return {
seconds: Number.parseInt(duration.seconds),
nanos: duration.nanos
};
}
83 changes: 34 additions & 49 deletions packages/grpc-js-xds/src/load-balancer-cds.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*
*/

import { connectivityState, status, Metadata, logVerbosity, experimental } from '@grpc/grpc-js';
import { connectivityState, status, Metadata, logVerbosity, experimental, LoadBalancingConfig } from '@grpc/grpc-js';
import { getSingletonXdsClient, Watcher, XdsClient } from './xds-client';
import { Cluster__Output } from './generated/envoy/config/cluster/v3/Cluster';
import SubchannelAddress = experimental.SubchannelAddress;
Expand All @@ -24,17 +24,18 @@ import ChildLoadBalancerHandler = experimental.ChildLoadBalancerHandler;
import LoadBalancer = experimental.LoadBalancer;
import ChannelControlHelper = experimental.ChannelControlHelper;
import registerLoadBalancerType = experimental.registerLoadBalancerType;
import LoadBalancingConfig = experimental.LoadBalancingConfig;
import OutlierDetectionLoadBalancingConfig = experimental.OutlierDetectionLoadBalancingConfig;
import TypedLoadBalancingConfig = experimental.TypedLoadBalancingConfig;
import SuccessRateEjectionConfig = experimental.SuccessRateEjectionConfig;
import FailurePercentageEjectionConfig = experimental.FailurePercentageEjectionConfig;
import QueuePicker = experimental.QueuePicker;
import OutlierDetectionRawConfig = experimental.OutlierDetectionRawConfig;
import parseLoadBalancingConfig = experimental.parseLoadBalancingConfig;
import { OutlierDetection__Output } from './generated/envoy/config/cluster/v3/OutlierDetection';
import { Duration__Output } from './generated/google/protobuf/Duration';
import { EXPERIMENTAL_OUTLIER_DETECTION } from './environment';
import { DiscoveryMechanism, XdsClusterResolverChildPolicyHandler, XdsClusterResolverLoadBalancingConfig } from './load-balancer-xds-cluster-resolver';
import { DiscoveryMechanism, XdsClusterResolverChildPolicyHandler } from './load-balancer-xds-cluster-resolver';
import { CLUSTER_CONFIG_TYPE_URL, decodeSingleResource } from './resources';
import { CdsUpdate, ClusterResourceType, OutlierDetectionUpdate } from './xds-resource-type/cluster-resource-type';
import { CdsUpdate, ClusterResourceType } from './xds-resource-type/cluster-resource-type';

const TRACER_NAME = 'cds_balancer';

Expand All @@ -44,7 +45,7 @@ function trace(text: string): void {

const TYPE_NAME = 'cds';

export class CdsLoadBalancingConfig implements LoadBalancingConfig {
class CdsLoadBalancingConfig implements TypedLoadBalancingConfig {
getLoadBalancerName(): string {
return TYPE_NAME;
}
Expand Down Expand Up @@ -72,29 +73,6 @@ export class CdsLoadBalancingConfig implements LoadBalancingConfig {
}
}

function durationToMs(duration: Duration__Output): number {
return (Number(duration.seconds) * 1_000 + duration.nanos / 1_000_000) | 0;
}

function translateOutlierDetectionConfig(outlierDetection: OutlierDetectionUpdate | undefined): OutlierDetectionLoadBalancingConfig | undefined {
if (!EXPERIMENTAL_OUTLIER_DETECTION) {
return undefined;
}
if (!outlierDetection) {
/* No-op outlier detection config, with all fields unset. */
return new OutlierDetectionLoadBalancingConfig(null, null, null, null, null, null, []);
}
return new OutlierDetectionLoadBalancingConfig(
outlierDetection.intervalMs,
outlierDetection.baseEjectionTimeMs,
outlierDetection.maxEjectionTimeMs,
outlierDetection.maxEjectionPercent,
outlierDetection.successRateConfig,
outlierDetection.failurePercentageConfig,
[]
);
}

interface ClusterEntry {
watcher: Watcher<CdsUpdate>;
latestUpdate?: CdsUpdate;
Expand Down Expand Up @@ -133,16 +111,16 @@ function generateDiscoverymechanismForCdsUpdate(config: CdsUpdate): DiscoveryMec
type: config.type,
eds_service_name: config.edsServiceName,
dns_hostname: config.dnsHostname,
outlier_detection: translateOutlierDetectionConfig(config.outlierDetectionUpdate)
outlier_detection: config.outlierDetectionUpdate
};
}

const RECURSION_DEPTH_LIMIT = 15;

/**
* Prerequisite: isClusterTreeFullyUpdated(tree, root)
* @param tree
* @param root
* @param tree
* @param root
*/
function getDiscoveryMechanismList(tree: ClusterTree, root: string): DiscoveryMechanism[] {
const visited = new Set<string>();
Expand Down Expand Up @@ -189,6 +167,11 @@ export class CdsLoadBalancer implements LoadBalancer {
this.childBalancer = new XdsClusterResolverChildPolicyHandler(channelControlHelper);
}

private reportError(errorMessage: string) {
trace('CDS cluster reporting error ' + errorMessage);
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: errorMessage, metadata: new Metadata()}));
}

private addCluster(cluster: string) {
if (cluster in this.clusterTree) {
return;
Expand All @@ -208,19 +191,28 @@ export class CdsLoadBalancer implements LoadBalancer {
try {
discoveryMechanismList = getDiscoveryMechanismList(this.clusterTree, this.latestConfig!.getCluster());
} catch (e) {
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: e.message, metadata: new Metadata()}));
this.reportError((e as Error).message);
return;
}
const clusterResolverConfig: LoadBalancingConfig = {
xds_cluster_resolver: {
discovery_mechanisms: discoveryMechanismList,
locality_picking_policy: [],
endpoint_picking_policy: []
}
};
let parsedClusterResolverConfig: TypedLoadBalancingConfig;
try {
parsedClusterResolverConfig = parseLoadBalancingConfig(clusterResolverConfig);
} catch (e) {
this.reportError(`CDS cluster ${this.latestConfig?.getCluster()} child config parsing failed with error ${(e as Error).message}`);
return;
}
const clusterResolverConfig = new XdsClusterResolverLoadBalancingConfig(
discoveryMechanismList,
[],
[]
);
trace('Child update config: ' + JSON.stringify(clusterResolverConfig));
this.updatedChild = true;
this.childBalancer.updateAddressList(
[],
clusterResolverConfig,
parsedClusterResolverConfig,
this.latestAttributes
);
}
Expand All @@ -231,20 +223,13 @@ export class CdsLoadBalancer implements LoadBalancer {
this.clusterTree[cluster].latestUpdate = undefined;
this.clusterTree[cluster].children = [];
}
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: `CDS resource ${cluster} does not exist`, metadata: new Metadata()}));
this.reportError(`CDS resource ${cluster} does not exist`);
this.childBalancer.destroy();
},
onError: (statusObj) => {
if (!this.updatedChild) {
trace('Transitioning to transient failure due to onError update for cluster' + cluster);
this.channelControlHelper.updateState(
connectivityState.TRANSIENT_FAILURE,
new UnavailablePicker({
code: status.UNAVAILABLE,
details: `xDS request failed with error ${statusObj.details}`,
metadata: new Metadata(),
})
);
this.reportError(`xDS request failed with error ${statusObj.details}`);
}
}
});
Expand Down Expand Up @@ -275,7 +260,7 @@ export class CdsLoadBalancer implements LoadBalancer {

updateAddressList(
addressList: SubchannelAddress[],
lbConfig: LoadBalancingConfig,
lbConfig: TypedLoadBalancingConfig,
attributes: { [key: string]: unknown }
): void {
if (!(lbConfig instanceof CdsLoadBalancingConfig)) {
Expand Down
25 changes: 12 additions & 13 deletions packages/grpc-js-xds/src/load-balancer-lrs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ import { XdsClusterLocalityStats, XdsClient, getSingletonXdsClient } from './xds
import LoadBalancer = experimental.LoadBalancer;
import ChannelControlHelper = experimental.ChannelControlHelper;
import registerLoadBalancerType = experimental.registerLoadBalancerType;
import getFirstUsableConfig = experimental.getFirstUsableConfig;
import SubchannelAddress = experimental.SubchannelAddress;
import LoadBalancingConfig = experimental.LoadBalancingConfig;
import ChildLoadBalancerHandler = experimental.ChildLoadBalancerHandler;
import Picker = experimental.Picker;
import PickArgs = experimental.PickArgs;
Expand All @@ -34,11 +32,12 @@ import Filter = experimental.Filter;
import BaseFilter = experimental.BaseFilter;
import FilterFactory = experimental.FilterFactory;
import Call = experimental.CallStream;
import validateLoadBalancingConfig = experimental.validateLoadBalancingConfig
import TypedLoadBalancingConfig = experimental.TypedLoadBalancingConfig;
import selectLbConfigFromList = experimental.selectLbConfigFromList;

const TYPE_NAME = 'lrs';

export class LrsLoadBalancingConfig implements LoadBalancingConfig {
class LrsLoadBalancingConfig implements TypedLoadBalancingConfig {
getLoadBalancerName(): string {
return TYPE_NAME;
}
Expand All @@ -49,12 +48,12 @@ export class LrsLoadBalancingConfig implements LoadBalancingConfig {
eds_service_name: this.edsServiceName,
lrs_load_reporting_server_name: this.lrsLoadReportingServer,
locality: this.locality,
child_policy: this.childPolicy.map(policy => policy.toJsonObject())
child_policy: [this.childPolicy.toJsonObject()]
}
}
}

constructor(private clusterName: string, private edsServiceName: string, private lrsLoadReportingServer: XdsServerConfig, private locality: Locality__Output, private childPolicy: LoadBalancingConfig[]) {}
constructor(private clusterName: string, private edsServiceName: string, private lrsLoadReportingServer: XdsServerConfig, private locality: Locality__Output, private childPolicy: TypedLoadBalancingConfig) {}

getClusterName() {
return this.clusterName;
Expand Down Expand Up @@ -98,11 +97,15 @@ export class LrsLoadBalancingConfig implements LoadBalancingConfig {
if (!('child_policy' in obj && Array.isArray(obj.child_policy))) {
throw new Error('lrs config must have a child_policy array');
}
const childConfig = selectLbConfigFromList(obj.config);
if (!childConfig) {
throw new Error('lrs config child_policy parsing failed');
}
return new LrsLoadBalancingConfig(obj.cluster_name, obj.eds_service_name, validateXdsServerConfig(obj.lrs_load_reporting_server), {
region: obj.locality.region ?? '',
zone: obj.locality.zone ?? '',
sub_zone: obj.locality.sub_zone ?? ''
}, obj.child_policy.map(validateLoadBalancingConfig));
}, childConfig);
}
}

Expand Down Expand Up @@ -161,7 +164,7 @@ export class LrsLoadBalancer implements LoadBalancer {

updateAddressList(
addressList: SubchannelAddress[],
lbConfig: LoadBalancingConfig,
lbConfig: TypedLoadBalancingConfig,
attributes: { [key: string]: unknown }
): void {
if (!(lbConfig instanceof LrsLoadBalancingConfig)) {
Expand All @@ -173,11 +176,7 @@ export class LrsLoadBalancer implements LoadBalancer {
lbConfig.getEdsServiceName(),
lbConfig.getLocality()
);
const childPolicy: LoadBalancingConfig = getFirstUsableConfig(
lbConfig.getChildPolicy(),
true
);
this.childBalancer.updateAddressList(addressList, childPolicy, attributes);
this.childBalancer.updateAddressList(addressList, lbConfig.getChildPolicy(), attributes);
}
exitIdle(): void {
this.childBalancer.exitIdle();
Expand Down
Loading
Loading