Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lrs: handle multiple clusters in LRS stream #3935

Merged
merged 4 commits into from Oct 26, 2020

Conversation

@menghanl
Copy link
Contributor

@menghanl menghanl commented Oct 7, 2020

  • xdsclient.ReportLoad
    • create one LRS stream and one load.Store for each server address
    • to take just the server address, and return the load.Store
    • update EDS and LRS balancing policy to recreate LRS stream only when server address changes
  • LRS stream (v2 and v3)
    • set feature send_all_clusters in node
    • handle resp.Clusters and only send load for those clusters
    • handle resp.SendAllClusters and send load for all clusters
@menghanl menghanl requested a review from easwars Oct 7, 2020
@menghanl menghanl force-pushed the lrs_stream_report branch 2 times, most recently from d5de61e to 985f6e5 Oct 7, 2020
Copy link
Contributor

@easwars easwars left a comment

I haven't added much comments. But based on our discussion offline to have the lrsClient contain all LRS functionality, I will wait for that change to be made before adding other comments. Thanks.

xds/internal/client/client.go Outdated Show resolved Hide resolved
xds/internal/client/client_loadreport.go Outdated Show resolved Hide resolved
xds/internal/client/transport_helper.go Outdated Show resolved Hide resolved
@easwars easwars assigned menghanl and unassigned easwars Oct 7, 2020
@menghanl menghanl added this to the 1.33 Release milestone Oct 8, 2020
@menghanl menghanl force-pushed the lrs_stream_report branch from a7dd78c to 232759f Oct 8, 2020
@menghanl
Copy link
Contributor Author

@menghanl menghanl commented Oct 8, 2020

lrsClient done. PTAL.

@menghanl menghanl assigned easwars and unassigned menghanl Oct 8, 2020
xds/internal/client/client_loadreport.go Show resolved Hide resolved
xds/internal/client/client_loadreport.go Outdated Show resolved Hide resolved
xds/internal/client/client_loadreport.go Show resolved Hide resolved
xds/internal/client/client_loadreport.go Outdated Show resolved Hide resolved
if err != nil {
// An error from a non-blocking dial indicates something serious.
c.logger.Infof("xds: failed to dial load report server {%s}: %v", server, err)
return func() {}
lrsC.parent.logger.Infof("xds: failed to dial load report server {%s}: %v", lrsC.server, err)
Copy link
Contributor

@easwars easwars Oct 9, 2020

Can we do something better than just throwing a log when the Dial fails. This seems to be irrecoverable at this point, because even if there is another call to ReportLoad we wont retry the Dial since the refCount will be non-zero.

Copy link
Contributor Author

@menghanl menghanl Oct 9, 2020

This should just panic... If this non-blocking dial fails, there's no way the LRS will keep working...

Copy link
Contributor

@easwars easwars Oct 15, 2020

Hmm ... but we are only throwing an Info log here. Am I missing something?

Copy link
Contributor Author

@menghanl menghanl Oct 20, 2020

I didn't want to add panic...

What I meant was, this should never happen (for real????). But we still Info just in case..

xds/internal/client/client.go Show resolved Hide resolved
xds/internal/client/transport_helper.go Outdated Show resolved Hide resolved
xds/internal/client/client_loadreport_test.go Show resolved Hide resolved
xds/internal/client/client_loadreport_test.go Outdated Show resolved Hide resolved
xds/internal/client/client_loadreport_test.go Outdated Show resolved Hide resolved
@easwars easwars assigned menghanl and unassigned easwars Oct 9, 2020
@GarrettGutierrez1 GarrettGutierrez1 removed this from the 1.33 Release milestone Oct 9, 2020
@GarrettGutierrez1 GarrettGutierrez1 added this to the 1.34 Release milestone Oct 9, 2020
Copy link
Contributor Author

@menghanl menghanl left a comment

Thanks for the review. All fixed. PTAL.

xds/internal/client/client_loadreport.go Show resolved Hide resolved
xds/internal/client/client_loadreport.go Outdated Show resolved Hide resolved
loadWrapper *loadStoreWrapper
// loadOriginal is the load.Store for reporting loads to lrsServerName. It's
// returned by the client.
loadOriginal *load.Store
Copy link
Contributor Author

@menghanl menghanl Oct 9, 2020

startLoadReport is only called when the LRS server changes.

We keep this around for cases where only the EDS service name changes (so we don't restart LRS, but we get a new PerClusterReport with the new service name).

xds/internal/balancer/lrs/balancer.go Outdated Show resolved Hide resolved
xds/internal/client/transport_helper.go Outdated Show resolved Hide resolved
xds/internal/balancer/edsbalancer/xds_client_wrapper.go Outdated Show resolved Hide resolved
xds/internal/balancer/lrs/balancer.go Outdated Show resolved Hide resolved
if err != nil {
// An error from a non-blocking dial indicates something serious.
c.logger.Infof("xds: failed to dial load report server {%s}: %v", server, err)
return func() {}
lrsC.parent.logger.Infof("xds: failed to dial load report server {%s}: %v", lrsC.server, err)
Copy link
Contributor Author

@menghanl menghanl Oct 9, 2020

This should just panic... If this non-blocking dial fails, there's no way the LRS will keep working...

xds/internal/client/client_loadreport_test.go Outdated Show resolved Hide resolved
@menghanl menghanl assigned easwars and unassigned menghanl Oct 9, 2020
@menghanl

This comment has been hidden.

loadWrapper *loadStoreWrapper
// loadOriginal is the load.Store for reporting loads to lrsServerName. It's
// returned by the client.
loadOriginal *load.Store
Copy link
Contributor

@easwars easwars Oct 15, 2020

We could instead cache the load.Store returned by the call to xdsclient.ReportLoad() in the loadWrapper itself. And split the update() method into two: setStore() and setServiceName() which will called appropriately when handling the update. What do you think about this approach?

}
}

if updateLoadStore {
Copy link
Contributor

@easwars easwars Oct 15, 2020

Probably a copy-paste gotcha. You have the same conditional statement twice.

Copy link
Contributor Author

@menghanl menghanl Oct 20, 2020

This whole if statement is deleted.

}

if attr == nil {
return fmt.Errorf("failed to get xdsClient from attributes: attributes is nil")
Copy link
Contributor

@easwars easwars Oct 15, 2020

Should these errors be prefixed with xds: or lrs:. I'm a little confused about the policy for these error message. I guess we dont have to do that for log statements since the prefix logger takes care of it.

Copy link
Contributor Author

@menghanl menghanl Oct 20, 2020

Added lrs:.

The caller (which is the implementation of the balancer interface) could wrap. But that will be only to add the prefix, and seems to be not worth it. I added it here.

if err != nil {
// An error from a non-blocking dial indicates something serious.
c.logger.Infof("xds: failed to dial load report server {%s}: %v", server, err)
return func() {}
lrsC.parent.logger.Infof("xds: failed to dial load report server {%s}: %v", lrsC.server, err)
Copy link
Contributor

@easwars easwars Oct 15, 2020

Hmm ... but we are only throwing an Info log here. Am I missing something?

// Report to the same address should not create new ClientConn.
store1, lrsCancel1 := xdsC.ReportLoad(fs.Address)
defer lrsCancel1()
ctx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
Copy link
Contributor

@easwars easwars Oct 15, 2020

Could you please create a different context here, so that we can use the context with the defaultTestTimeout deadline for other things which need it.

	sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
	defer sCancel()
	if u, err := fs.NewConnChan.Receive(sCtx); err != context.DeadlineExceeded {

Copy link
Contributor Author

@menghanl menghanl Oct 20, 2020

Done

// Report to a different address should create new ClientConn.
store2, lrsCancel2 := xdsC.ReportLoad(fs2.Address)
defer lrsCancel2()
ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
Copy link
Contributor

@easwars easwars Oct 15, 2020

If we do the above, we wont need to reinitialize the context with the defaultTestTimeout here.

Copy link
Contributor Author

@menghanl menghanl Oct 20, 2020

Done

ClusterServiceName: "eds",
TotalDroppedRequests: 1,
DroppedRequests: []*endpointpb.ClusterStats_DroppedRequests{{Category: "test", DroppedCount: 1}},
}); !proto.Equal(want, receivedLoad[0]) {
Copy link
Contributor

@easwars easwars Oct 15, 2020

Would it be possible to use protocmp.Diff or whatever is the equivalent in the protobuf package and pass it to cmp.Diff so that we can print a useful error message.

Copy link
Contributor Author

@menghanl menghanl Oct 20, 2020

Done.

@@ -115,7 +115,7 @@ func (c *testAPIClient) RemoveWatch(resourceType ResourceType, resourceName stri
c.removeWatches[resourceType].Send(resourceName)
}

func (c *testAPIClient) ReportLoad(ctx context.Context, cc *grpc.ClientConn, opts LoadReportingOptions) {
func (c *testAPIClient) reportLoad(_ context.Context, _ *grpc.ClientConn, _ loadReportingOptions) {
Copy link
Contributor

@easwars easwars Oct 15, 2020

The underscores could be removed I think.

Copy link
Contributor Author

@menghanl menghanl Oct 20, 2020

Done

@easwars easwars assigned menghanl and unassigned easwars Oct 15, 2020
@menghanl menghanl force-pushed the lrs_stream_report branch 2 times, most recently from dec9231 to 13f8352 Oct 20, 2020
@menghanl menghanl assigned easwars and unassigned menghanl Oct 20, 2020
@@ -177,7 +177,9 @@ func (x *edsBalancer) handleGRPCUpdate(update interface{}) {
return
}

x.client.handleUpdate(cfg, u.ResolverState.Attributes)
if err := x.client.handleUpdate(cfg, u.ResolverState.Attributes); err != nil {
x.logger.Infof("failed to update xds clients: %v", err)
Copy link
Contributor

@easwars easwars Oct 21, 2020

Errorf or Warningf instead?

Copy link
Contributor Author

@menghanl menghanl Oct 22, 2020

Changed to warning

func (lsw *loadStoreWrapper) CallStarted(locality string) {
lsw.mu.RLock()
defer lsw.mu.RUnlock()
if lsw.perCluster == nil {
Copy link
Contributor

@easwars easwars Oct 21, 2020

The perClusterStore type in package load already checks for nil. Why is this required?

Copy link
Contributor Author

@menghanl menghanl Oct 22, 2020

Right, I forgot.
Deleted.

}

var dopts []grpc.DialOption
if dialer := c.bbo.Dialer; dialer != nil {
dopts = []grpc.DialOption{grpc.WithContextDialer(dialer)}
}

// TODO: there's no long a need to read bootstrap file and create a new xds
Copy link
Contributor

@easwars easwars Oct 21, 2020

s/long/longer/

Copy link
Contributor Author

@menghanl menghanl Oct 22, 2020

Done

func (lsw *loadStoreWrapper) CallStarted(locality string) {
lsw.mu.RLock()
defer lsw.mu.RUnlock()
if lsw.perCluster == nil {
Copy link
Contributor

@easwars easwars Oct 21, 2020

Same comment about nil checks.

Copy link
Contributor Author

@menghanl menghanl Oct 22, 2020

Removed

@easwars easwars assigned menghanl and unassigned easwars Oct 21, 2020
menghanl added 3 commits Oct 22, 2020
- xdsclient.ReportLoad
  - create one LRS stream and one load.Store for each server address
  - to take just the server address, and return the load.Store
  - update EDS and LRS balancing policy to recreate LRS stream only when server address changes
- LRS stream (v2 and v3)
  - set feature `send_all_clusters` in `node`
  - handle `resp.Clusters` and only send load for those clusters
  - handle `resp.SendAllClusters` and send load for all clusters
@menghanl menghanl force-pushed the lrs_stream_report branch from 13f8352 to e199a5a Oct 22, 2020
@menghanl menghanl assigned easwars and unassigned menghanl Oct 22, 2020
@menghanl menghanl assigned menghanl and unassigned easwars Oct 22, 2020
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Projects
None yet
Linked issues

Successfully merging this pull request may close these issues.

None yet

3 participants