Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc-js-xds: Implement federation support #2463

Merged
merged 12 commits into from
Jun 26, 2023
Merged
1 change: 1 addition & 0 deletions packages/grpc-js-xds/gulpfile.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ const cleanAll = gulp.parallel(clean);
const compile = checkTask(() => execNpmCommand('compile'));

const runTests = checkTask(() => {
process.env.GRPC_EXPERIMENTAL_XDS_FEDERATION = 'true';
return gulp.src(`${outDir}/test/**/*.js`)
.pipe(mocha({reporter: 'mocha-jenkins-reporter',
require: ['ts-node/register']}));
Expand Down
3 changes: 2 additions & 1 deletion packages/grpc-js-xds/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@
"dependencies": {
"@grpc/proto-loader": "^0.6.0",
"google-auth-library": "^7.0.2",
"re2-wasm": "^1.0.1"
"re2-wasm": "^1.0.1",
"vscode-uri": "^3.0.7"
},
"peerDependencies": {
"@grpc/grpc-js": "~1.8.0"
Expand Down
147 changes: 39 additions & 108 deletions packages/grpc-js-xds/src/csds.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,18 @@
*
*/

import { Node } from "./generated/envoy/config/core/v3/Node";
import { ClientConfig, _envoy_service_status_v3_ClientConfig_GenericXdsConfig as GenericXdsConfig } from "./generated/envoy/service/status/v3/ClientConfig";
import { ClientStatusDiscoveryServiceHandlers } from "./generated/envoy/service/status/v3/ClientStatusDiscoveryService";
import { ClientStatusRequest__Output } from "./generated/envoy/service/status/v3/ClientStatusRequest";
import { ClientStatusResponse } from "./generated/envoy/service/status/v3/ClientStatusResponse";
import { Timestamp } from "./generated/google/protobuf/Timestamp";
import { AdsTypeUrl, CDS_TYPE_URL, EDS_TYPE_URL, LDS_TYPE_URL, RDS_TYPE_URL } from "./resources";
import { HandleResponseResult } from "./xds-stream-state/xds-stream-state";
import { xdsResourceNameToString } from "./resources";
import { sendUnaryData, ServerDuplexStream, ServerUnaryCall, status, experimental, loadPackageDefinition, logVerbosity } from '@grpc/grpc-js';
import { loadSync } from "@grpc/proto-loader";
import { ProtoGrpcType as CsdsProtoGrpcType } from "./generated/csds";

import registerAdminService = experimental.registerAdminService;
import { XdsClient } from "./xds-client";

const TRACER_NAME = 'csds';

Expand All @@ -47,115 +46,47 @@ function dateToProtoTimestamp(date?: Date | null): Timestamp | null {
}
}

let clientNode: Node | null = null;
const registeredClients: XdsClient[] = [];

const configStatus = {
[EDS_TYPE_URL]: new Map<string, GenericXdsConfig>(),
[CDS_TYPE_URL]: new Map<string, GenericXdsConfig>(),
[RDS_TYPE_URL]: new Map<string, GenericXdsConfig>(),
[LDS_TYPE_URL]: new Map<string, GenericXdsConfig>()
};

/**
* This function only accepts a v3 Node message, because we are only supporting
* v3 CSDS and it only handles v3 Nodes. If the client is actually using v2 xDS
* APIs, it should just provide the equivalent v3 Node message.
* @param node The Node message for the client that is requesting resources
*/
export function setCsdsClientNode(node: Node) {
clientNode = node;
}

/**
* Update the config status maps from the list of names of requested resources
* for a specific type URL. These lists are the source of truth for determining
* what resources will be listed in the CSDS response. Any resource that is not
* in this list will never actually be applied anywhere.
* @param typeUrl The resource type URL
* @param names The list of resource names that are being requested
*/
export function updateCsdsRequestedNameList(typeUrl: AdsTypeUrl, names: string[]) {
trace('Update type URL ' + typeUrl + ' with names [' + names + ']');
const currentTime = dateToProtoTimestamp(new Date());
const configMap = configStatus[typeUrl];
for (const name of names) {
if (!configMap.has(name)) {
configMap.set(name, {
type_url: typeUrl,
name: name,
last_updated: currentTime,
client_status: 'REQUESTED'
});
}
}
for (const name of configMap.keys()) {
if (!names.includes(name)) {
configMap.delete(name);
}
}
export function registerXdsClientWithCsds(client: XdsClient) {
registeredClients.push(client);
}

/**
* Update the config status maps from the result of parsing a single ADS
* response. All resources that validated are considered "ACKED", and all
* resources that failed validation are considered "NACKED".
* @param typeUrl The type URL of resources in this response
* @param versionInfo The version info field from this response
* @param updates The lists of resources that passed and failed validation
*/
export function updateCsdsResourceResponse(typeUrl: AdsTypeUrl, versionInfo: string, updates: HandleResponseResult) {
const currentTime = dateToProtoTimestamp(new Date());
const configMap = configStatus[typeUrl];
for (const {name, raw} of updates.accepted) {
const mapEntry = configMap.get(name);
if (mapEntry) {
trace('Updated ' + typeUrl + ' resource ' + name + ' to state ACKED');
mapEntry.client_status = 'ACKED';
mapEntry.version_info = versionInfo;
mapEntry.xds_config = raw;
mapEntry.error_state = null;
mapEntry.last_updated = currentTime;
}
}
for (const {name, error, raw} of updates.rejected) {
const mapEntry = configMap.get(name);
if (mapEntry) {
trace('Updated ' + typeUrl + ' resource ' + name + ' to state NACKED');
mapEntry.client_status = 'NACKED';
mapEntry.error_state = {
failed_configuration: raw,
last_update_attempt: currentTime,
details: error,
version_info: versionInfo
};
}
}
for (const name of updates.missing) {
const mapEntry = configMap.get(name);
if (mapEntry) {
trace('Updated ' + typeUrl + ' resource ' + name + ' to state DOES_NOT_EXIST');
mapEntry.client_status = 'DOES_NOT_EXIST';
mapEntry.version_info = versionInfo;
mapEntry.xds_config = null;
mapEntry.error_state = null;
mapEntry.last_updated = currentTime;
function getCurrentConfigList(): ClientConfig[] {
const result: ClientConfig[] = [];
for (const client of registeredClients) {
if (!client.adsNode) {
continue;
}
}
}

function getCurrentConfig(): ClientConfig {
const genericConfigList: GenericXdsConfig[] = [];
for (const configMap of Object.values(configStatus)) {
for (const configValue of configMap.values()) {
genericConfigList.push(configValue);
const genericConfigList: GenericXdsConfig[] = [];
for (const [authority, authorityState] of client.authorityStateMap) {
for (const [type, typeMap] of authorityState.resourceMap) {
for (const [key, resourceState] of typeMap) {
const typeUrl = type.getTypeUrl();
const meta = resourceState.meta;
genericConfigList.push({
name: xdsResourceNameToString({authority, key}, typeUrl),
type_url: typeUrl,
client_status: meta.clientStatus,
version_info: meta.version,
xds_config: meta.clientStatus === 'ACKED' ? meta.rawResource : undefined,
last_updated: meta.updateTime ? dateToProtoTimestamp(meta.updateTime) : undefined,
error_state: meta.clientStatus === 'NACKED' ? {
details: meta.failedDetails,
failed_configuration: meta.rawResource,
last_update_attempt: meta.failedUpdateTime ? dateToProtoTimestamp(meta.failedUpdateTime) : undefined,
version_info: meta.failedVersion
} : undefined
});
}
}
}
result.push({
node: client.adsNode,
generic_xds_configs: genericConfigList
});
}
const config = {
node: clientNode,
generic_xds_configs: genericConfigList
};
trace('Sending current config ' + JSON.stringify(config, undefined, 2));
return config;
return result;
}

const csdsImplementation: ClientStatusDiscoveryServiceHandlers = {
Expand All @@ -169,7 +100,7 @@ const csdsImplementation: ClientStatusDiscoveryServiceHandlers = {
return;
}
callback(null, {
config: [getCurrentConfig()]
config: getCurrentConfigList()
});
},
StreamClientStatus(call: ServerDuplexStream<ClientStatusRequest__Output, ClientStatusResponse>) {
Expand All @@ -182,7 +113,7 @@ const csdsImplementation: ClientStatusDiscoveryServiceHandlers = {
return;
}
call.write({
config: [getCurrentConfig()]
config: getCurrentConfigList()
});
});
call.on('end', () => {
Expand Down
3 changes: 2 additions & 1 deletion packages/grpc-js-xds/src/environment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@

export const EXPERIMENTAL_FAULT_INJECTION = (process.env.GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION ?? 'true') === 'true';
export const EXPERIMENTAL_OUTLIER_DETECTION = (process.env.GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION ?? 'true') === 'true';
export const EXPERIMENTAL_RETRY = (process.env.GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY ?? 'true') === 'true';
export const EXPERIMENTAL_RETRY = (process.env.GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY ?? 'true') === 'true';
murgatroid99 marked this conversation as resolved.
Show resolved Hide resolved
export const EXPERIMENTAL_FEDERATION = (process.env.GRPC_EXPERIMENTAL_XDS_FEDERATION ?? 'false') === 'true';