Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc-js-xds: Implement custom LB policies #2555

Merged
merged 8 commits into from
Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/grpc-js-xds/deps/envoy-api
Submodule envoy-api updated 703 files
1 change: 1 addition & 0 deletions packages/grpc-js-xds/gulpfile.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ const compile = checkTask(() => execNpmCommand('compile'));

const runTests = checkTask(() => {
process.env.GRPC_EXPERIMENTAL_XDS_FEDERATION = 'true';
process.env.GRPC_EXPERIMENTAL_XDS_CUSTOM_LB_CONFIG = 'true';
return gulp.src(`${outDir}/test/**/*.js`)
.pipe(mocha({reporter: 'mocha-jenkins-reporter',
require: ['ts-node/register']}));
Expand Down
100 changes: 95 additions & 5 deletions packages/grpc-js-xds/interop/xds-interop-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,98 @@ import { XdsUpdateClientConfigureServiceHandlers } from './generated/grpc/testin
import { Empty__Output } from './generated/grpc/testing/Empty';
import { LoadBalancerAccumulatedStatsResponse } from './generated/grpc/testing/LoadBalancerAccumulatedStatsResponse';

import TypedLoadBalancingConfig = grpc.experimental.TypedLoadBalancingConfig;
import LoadBalancer = grpc.experimental.LoadBalancer;
import ChannelControlHelper = grpc.experimental.ChannelControlHelper;
import ChildLoadBalancerHandler = grpc.experimental.ChildLoadBalancerHandler;
import SubchannelAddress = grpc.experimental.SubchannelAddress;
import Picker = grpc.experimental.Picker;
import PickArgs = grpc.experimental.PickArgs;
import PickResult = grpc.experimental.PickResult;
import PickResultType = grpc.experimental.PickResultType;
import createChildChannelControlHelper = grpc.experimental.createChildChannelControlHelper;
import parseLoadBalancingConfig = grpc.experimental.parseLoadBalancingConfig;

grpc_xds.register();

const LB_POLICY_NAME = 'test.RpcBehaviorLoadBalancer';

class RpcBehaviorLoadBalancingConfig implements TypedLoadBalancingConfig {
constructor(private rpcBehavior: string) {}
getLoadBalancerName(): string {
return LB_POLICY_NAME;
}
toJsonObject(): object {
return {
[LB_POLICY_NAME]: {
'rpcBehavior': this.rpcBehavior
}
};
}
getRpcBehavior() {
return this.rpcBehavior;
}
static createFromJson(obj: any): RpcBehaviorLoadBalancingConfig {
if (!('rpcBehavior' in obj && typeof obj.rpcBehavior === 'string')) {
throw new Error(`${LB_POLICY_NAME} parsing error: expected string field rpcBehavior`);
}
return new RpcBehaviorLoadBalancingConfig(obj.rpcBehavior);
}
}

class RpcBehaviorPicker implements Picker {
constructor(private wrappedPicker: Picker, private rpcBehavior: string) {}
pick(pickArgs: PickArgs): PickResult {
const wrappedPick = this.wrappedPicker.pick(pickArgs);
if (wrappedPick.pickResultType === PickResultType.COMPLETE) {
pickArgs.metadata.add('rpc-behavior', this.rpcBehavior);
}
return wrappedPick;
}
}

const RPC_BEHAVIOR_CHILD_CONFIG = parseLoadBalancingConfig({round_robin: {}});

/**
* Load balancer implementation for Custom LB policy test
*/
class RpcBehaviorLoadBalancer implements LoadBalancer {
private child: ChildLoadBalancerHandler;
private latestConfig: RpcBehaviorLoadBalancingConfig | null = null;
constructor(channelControlHelper: ChannelControlHelper) {
const childChannelControlHelper = createChildChannelControlHelper(channelControlHelper, {
updateState: (connectivityState, picker) => {
if (connectivityState === grpc.connectivityState.READY && this.latestConfig) {
picker = new RpcBehaviorPicker(picker, this.latestConfig.getLoadBalancerName());
}
channelControlHelper.updateState(connectivityState, picker);
}
});
this.child = new ChildLoadBalancerHandler(childChannelControlHelper);
}
updateAddressList(addressList: SubchannelAddress[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
if (!(lbConfig instanceof RpcBehaviorLoadBalancingConfig)) {
return;
}
this.latestConfig = lbConfig;
this.child.updateAddressList(addressList, RPC_BEHAVIOR_CHILD_CONFIG, attributes);
}
exitIdle(): void {
this.child.exitIdle();
}
resetBackoff(): void {
this.child.resetBackoff();
}
destroy(): void {
this.child.destroy();
}
getTypeName(): string {
return LB_POLICY_NAME;
}
}

grpc.experimental.registerLoadBalancerType(LB_POLICY_NAME, RpcBehaviorLoadBalancer, RpcBehaviorLoadBalancingConfig);

const packageDefinition = protoLoader.loadSync('grpc/testing/test.proto', {
keepCase: true,
defaults: true,
Expand Down Expand Up @@ -91,7 +181,7 @@ class CallSubscriber {
}
if (peerName in this.callsSucceededByPeer) {
this.callsSucceededByPeer[peerName] += 1;
} else {
} else {
this.callsSucceededByPeer[peerName] = 1;
}
this.callsSucceeded += 1;
Expand Down Expand Up @@ -426,9 +516,9 @@ function main() {
* channels do not share any subchannels. It does not have any
* inherent function. */
console.log(`Interop client channel ${i} starting sending ${argv.qps} QPS to ${argv.server}`);
sendConstantQps(new loadedProto.grpc.testing.TestService(argv.server, grpc.credentials.createInsecure(), {'unique': i}),
argv.qps,
argv.fail_on_failed_rpcs === 'true',
sendConstantQps(new loadedProto.grpc.testing.TestService(argv.server, grpc.credentials.createInsecure(), {'unique': i}),
argv.qps,
argv.fail_on_failed_rpcs === 'true',
callStatsTracker);
}

Expand Down Expand Up @@ -486,4 +576,4 @@ function main() {

if (require.main === module) {
main();
}
}
4 changes: 3 additions & 1 deletion packages/grpc-js-xds/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"prepare": "npm run compile",
"pretest": "npm run compile",
"posttest": "npm run check",
"generate-types": "proto-loader-gen-types --keepCase --longs String --enums String --defaults --oneofs --includeComments --includeDirs deps/envoy-api/ deps/xds/ deps/googleapis/ deps/protoc-gen-validate/ -O src/generated/ --grpcLib @grpc/grpc-js envoy/service/discovery/v3/ads.proto envoy/service/load_stats/v3/lrs.proto envoy/config/listener/v3/listener.proto envoy/config/route/v3/route.proto envoy/config/cluster/v3/cluster.proto envoy/config/endpoint/v3/endpoint.proto envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.proto udpa/type/v1/typed_struct.proto xds/type/v3/typed_struct.proto envoy/extensions/filters/http/fault/v3/fault.proto envoy/service/status/v3/csds.proto",
"generate-types": "proto-loader-gen-types --keepCase --longs String --enums String --defaults --oneofs --includeComments --includeDirs deps/envoy-api/ deps/xds/ deps/googleapis/ deps/protoc-gen-validate/ -O src/generated/ --grpcLib @grpc/grpc-js envoy/service/discovery/v3/ads.proto envoy/service/load_stats/v3/lrs.proto envoy/config/listener/v3/listener.proto envoy/config/route/v3/route.proto envoy/config/cluster/v3/cluster.proto envoy/config/endpoint/v3/endpoint.proto envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.proto udpa/type/v1/typed_struct.proto xds/type/v3/typed_struct.proto envoy/extensions/filters/http/fault/v3/fault.proto envoy/service/status/v3/csds.proto envoy/extensions/load_balancing_policies/wrr_locality/v3/wrr_locality.proto",
"generate-interop-types": "proto-loader-gen-types --keep-case --longs String --enums String --defaults --oneofs --json --includeComments --includeDirs proto/ -O interop/generated --grpcLib @grpc/grpc-js grpc/testing/test.proto",
"generate-test-types": "proto-loader-gen-types --keep-case --longs String --enums String --defaults --oneofs --json --includeComments --includeDirs proto/ -O test/generated --grpcLib @grpc/grpc-js grpc/testing/echo.proto"
},
Expand Down Expand Up @@ -59,13 +59,15 @@
"build/src/**/*.{js,d.ts,js.map}",
"deps/envoy-api/envoy/admin/v3/**/*.proto",
"deps/envoy-api/envoy/config/**/*.proto",
"deps/envoy-api/envoy/data/**/*.proto",
"deps/envoy-api/envoy/service/**/*.proto",
"deps/envoy-api/envoy/type/**/*.proto",
"deps/envoy-api/envoy/annotations/**/*.proto",
"deps/envoy-api/envoy/extensions/**/*.proto",
"deps/googleapis/google/api/**/*.proto",
"deps/googleapis/google/protobuf/**/*.proto",
"deps/googleapis/google/rpc/**/*.proto",
"deps/protoc-gen-validate/**/*.proto",
"deps/xds/udpa/annotations/**/*.proto",
"deps/xds/udpa/type/**/*.proto",
"deps/xds/xds/annotations/**/*.proto",
Expand Down
1 change: 1 addition & 0 deletions packages/grpc-js-xds/src/environment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ export const EXPERIMENTAL_FAULT_INJECTION = (process.env.GRPC_XDS_EXPERIMENTAL_F
export const EXPERIMENTAL_OUTLIER_DETECTION = (process.env.GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION ?? 'true') === 'true';
export const EXPERIMENTAL_RETRY = (process.env.GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY ?? 'true') === 'true';
export const EXPERIMENTAL_FEDERATION = (process.env.GRPC_EXPERIMENTAL_XDS_FEDERATION ?? 'false') === 'true';
export const EXPERIMENTAL_CUSTOM_LB_CONFIG = (process.env.GRPC_EXPERIMENTAL_XDS_CUSTOM_LB_CONFIG ?? 'false') === 'true';
7 changes: 6 additions & 1 deletion packages/grpc-js-xds/src/generated/ads.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 5 additions & 10 deletions packages/grpc-js-xds/src/generated/cluster.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.