From cdeb1652b6981c25e4aba5ceb5e5b236d87e1df6 Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Tue, 8 Dec 2020 11:00:14 -0800 Subject: [PATCH] xds: timeout support --- xds/internal/client/client.go | 9 +- xds/internal/client/client_rds_test.go | 92 +++++++++++++++++ xds/internal/client/client_xds.go | 10 +- xds/internal/env/env.go | 5 + xds/internal/resolver/serviceconfig.go | 40 +++++--- xds/internal/resolver/xds_resolver_test.go | 109 +++++++++++++++++++++ 6 files changed, 246 insertions(+), 19 deletions(-) diff --git a/xds/internal/client/client.go b/xds/internal/client/client.go index d78122ca5b9..73cfdc02207 100644 --- a/xds/internal/client/client.go +++ b/xds/internal/client/client.go @@ -176,10 +176,11 @@ type Route struct { Path, Prefix, Regex *string // Indicates if prefix/path matching should be case insensitive. The default // is false (case sensitive). - CaseInsensitive bool - Headers []*HeaderMatcher - Fraction *uint32 - Action map[string]uint32 // action is weighted clusters. + CaseInsensitive bool + Headers []*HeaderMatcher + Fraction *uint32 + Action map[string]uint32 // action is weighted clusters. + MaxStreamDuration time.Duration } // HeaderMatcher represents header matchers. diff --git a/xds/internal/client/client_rds_test.go b/xds/internal/client/client_rds_test.go index 5e9ee775865..ab4737376e5 100644 --- a/xds/internal/client/client_rds_test.go +++ b/xds/internal/client/client_rds_test.go @@ -20,6 +20,7 @@ package client import ( "testing" + "time" v2xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" v2routepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/route" @@ -33,6 +34,7 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" "google.golang.org/grpc/xds/internal/version" + "google.golang.org/protobuf/types/known/durationpb" ) func (s) TestRDSGenerateRDSUpdateFromRouteConfiguration(t *testing.T) { @@ -290,6 +292,96 @@ func (s) TestRDSGenerateRDSUpdateFromRouteConfiguration(t *testing.T) { }, }, }, + { + name: "good-route-config-with-max-stream-duration", + rc: &v3routepb.RouteConfiguration{ + Name: routeName, + VirtualHosts: []*v3routepb.VirtualHost{ + { + Domains: []string{ldsTarget}, + Routes: []*v3routepb.Route{ + { + Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}}, + Action: &v3routepb.Route_Route{ + Route: &v3routepb.RouteAction{ + ClusterSpecifier: &v3routepb.RouteAction_Cluster{Cluster: clusterName}, + MaxStreamDuration: &v3routepb.RouteAction_MaxStreamDuration{MaxStreamDuration: durationpb.New(time.Second)}, + }, + }, + }, + }, + }, + }, + }, + wantUpdate: RouteConfigUpdate{ + VirtualHosts: []*VirtualHost{ + { + Domains: []string{ldsTarget}, + Routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{clusterName: 1}, MaxStreamDuration: time.Second}}, + }, + }, + }, + }, + { + name: "good-route-config-with-grpc-timeout-header-max", + rc: &v3routepb.RouteConfiguration{ + Name: routeName, + VirtualHosts: []*v3routepb.VirtualHost{ + { + Domains: []string{ldsTarget}, + Routes: []*v3routepb.Route{ + { + Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}}, + Action: &v3routepb.Route_Route{ + Route: &v3routepb.RouteAction{ + ClusterSpecifier: &v3routepb.RouteAction_Cluster{Cluster: clusterName}, + MaxStreamDuration: &v3routepb.RouteAction_MaxStreamDuration{GrpcTimeoutHeaderMax: durationpb.New(time.Second)}, + }, + }, + }, + }, + }, + }, + }, + wantUpdate: RouteConfigUpdate{ + VirtualHosts: []*VirtualHost{ + { + Domains: []string{ldsTarget}, + Routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{clusterName: 1}, MaxStreamDuration: time.Second}}, + }, + }, + }, + }, + { + name: "good-route-config-with-both-timeouts", + rc: &v3routepb.RouteConfiguration{ + Name: routeName, + VirtualHosts: []*v3routepb.VirtualHost{ + { + Domains: []string{ldsTarget}, + Routes: []*v3routepb.Route{ + { + Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}}, + Action: &v3routepb.Route_Route{ + Route: &v3routepb.RouteAction{ + ClusterSpecifier: &v3routepb.RouteAction_Cluster{Cluster: clusterName}, + MaxStreamDuration: &v3routepb.RouteAction_MaxStreamDuration{MaxStreamDuration: durationpb.New(2 * time.Second), GrpcTimeoutHeaderMax: durationpb.New(0)}, + }, + }, + }, + }, + }, + }, + }, + wantUpdate: RouteConfigUpdate{ + VirtualHosts: []*VirtualHost{ + { + Domains: []string{ldsTarget}, + Routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{clusterName: 1}, MaxStreamDuration: 0}}, + }, + }, + }, + }, } for _, test := range tests { diff --git a/xds/internal/client/client_xds.go b/xds/internal/client/client_xds.go index 66376571546..f31b6009b6e 100644 --- a/xds/internal/client/client_xds.go +++ b/xds/internal/client/client_xds.go @@ -322,7 +322,8 @@ func routesProtoToSlice(routes []*v3routepb.Route, logger *grpclog.PrefixLogger) } clusters := make(map[string]uint32) - switch a := r.GetRoute().GetClusterSpecifier().(type) { + action := r.GetRoute() + switch a := action.GetClusterSpecifier().(type) { case *v3routepb.RouteAction_Cluster: clusters[a.Cluster] = 1 case *v3routepb.RouteAction_WeightedClusters: @@ -341,6 +342,13 @@ func routesProtoToSlice(routes []*v3routepb.Route, logger *grpclog.PrefixLogger) } route.Action = clusters + msd := action.GetMaxStreamDuration() + // Prefer grpc_timeout_header_max, if set. + if dur := msd.GetGrpcTimeoutHeaderMax(); dur != nil { + route.MaxStreamDuration = dur.AsDuration() + } else { + route.MaxStreamDuration = msd.GetMaxStreamDuration().AsDuration() + } routesRet = append(routesRet, &route) } return routesRet, nil diff --git a/xds/internal/env/env.go b/xds/internal/env/env.go index c0fa0e65b7a..e350e65ed55 100644 --- a/xds/internal/env/env.go +++ b/xds/internal/env/env.go @@ -29,6 +29,7 @@ const ( bootstrapFileNameEnv = "GRPC_XDS_BOOTSTRAP" xdsV3SupportEnv = "GRPC_XDS_EXPERIMENTAL_V3_SUPPORT" circuitBreakingSupportEnv = "GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING" + timeoutSupportEnv = "GRPC_XDS_EXPERIMENTAL_TIMEOUT" ) var ( @@ -44,4 +45,8 @@ var ( // enabled, which can be done by setting the environment variable // "GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING" to "true". CircuitBreakingSupport = strings.EqualFold(os.Getenv(circuitBreakingSupportEnv), "true") + // TimeoutSupport indicates whether support for max_stream_duration in + // route actions is enabled. This can be enabled by setting the + // environment variable "GRPC_XDS_EXPERIMENTAL_TIMEOUT" to "true". + TimeoutSupport = strings.EqualFold(os.Getenv(timeoutSupportEnv), "true") ) diff --git a/xds/internal/resolver/serviceconfig.go b/xds/internal/resolver/serviceconfig.go index 50514303e89..ea6e22725a7 100644 --- a/xds/internal/resolver/serviceconfig.go +++ b/xds/internal/resolver/serviceconfig.go @@ -22,12 +22,14 @@ import ( "encoding/json" "fmt" "sync/atomic" + "time" "google.golang.org/grpc/codes" iresolver "google.golang.org/grpc/internal/resolver" "google.golang.org/grpc/internal/wrr" "google.golang.org/grpc/status" "google.golang.org/grpc/xds/internal/balancer/clustermanager" + "google.golang.org/grpc/xds/internal/env" ) const ( @@ -93,12 +95,13 @@ func serviceConfigJSON(activeClusters map[string]*clusterInfo) (string, error) { } type route struct { - action wrr.WRR - m *compositeMatcher // converted from route matchers + m *compositeMatcher // converted from route matchers + clusters wrr.WRR + maxStreamDuration time.Duration } func (r route) String() string { - return r.m.String() + "->" + fmt.Sprint(r.action) + return r.m.String() + "->" + fmt.Sprint(r.clusters) } type configSelector struct { @@ -110,18 +113,18 @@ type configSelector struct { var errNoMatchedRouteFound = status.Errorf(codes.Unavailable, "no matched route was found") func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RPCConfig, error) { - var action wrr.WRR + var rt *route // Loop through routes in order and select first match. - for _, rt := range cs.routes { - if rt.m.match(rpcInfo) { - action = rt.action + for _, r := range cs.routes { + if r.m.match(rpcInfo) { + rt = &r break } } - if action == nil { + if rt == nil || rt.clusters == nil { return nil, errNoMatchedRouteFound } - cluster, ok := action.Next().(string) + cluster, ok := rt.clusters.Next().(string) if !ok { return nil, status.Errorf(codes.Internal, "error retrieving cluster for match: %v (%T)", cluster, cluster) } @@ -129,7 +132,8 @@ func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RP // it is committed. ref := &cs.clusters[cluster].refCount atomic.AddInt32(ref, 1) - return &iresolver.RPCConfig{ + + config := &iresolver.RPCConfig{ // Communicate to the LB policy the chosen cluster. Context: clustermanager.SetPickedCluster(rpcInfo.Context, cluster), OnCommitted: func() { @@ -144,7 +148,13 @@ func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RP } } }, - }, nil + } + + if env.TimeoutSupport && rt.maxStreamDuration != 0 { + config.MethodConfig.Timeout = &rt.maxStreamDuration + } + + return config, nil } // incRefs increments refs of all clusters referenced by this config selector. @@ -196,9 +206,9 @@ func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, erro } for i, rt := range su.Routes { - action := newWRR() + clusters := newWRR() for cluster, weight := range rt.Action { - action.Add(cluster, int64(weight)) + clusters.Add(cluster, int64(weight)) // Initialize entries in cs.clusters map, creating entries in // r.activeClusters as necessary. Set to zero as they will be @@ -210,14 +220,16 @@ func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, erro } cs.clusters[cluster] = ci } - cs.routes[i].action = action + cs.routes[i].clusters = clusters var err error cs.routes[i].m, err = routeToMatcher(rt) if err != nil { return nil, err } + cs.routes[i].maxStreamDuration = rt.MaxStreamDuration } + return cs, nil } diff --git a/xds/internal/resolver/xds_resolver_test.go b/xds/internal/resolver/xds_resolver_test.go index f3bdc57c0ba..ba0e8cc3788 100644 --- a/xds/internal/resolver/xds_resolver_test.go +++ b/xds/internal/resolver/xds_resolver_test.go @@ -21,6 +21,7 @@ package resolver import ( "context" "errors" + "fmt" "reflect" "testing" "time" @@ -41,6 +42,7 @@ import ( "google.golang.org/grpc/xds/internal/client" xdsclient "google.golang.org/grpc/xds/internal/client" "google.golang.org/grpc/xds/internal/client/bootstrap" + "google.golang.org/grpc/xds/internal/env" xdstestutils "google.golang.org/grpc/xds/internal/testutils" "google.golang.org/grpc/xds/internal/testutils/fakeclient" ) @@ -496,6 +498,113 @@ func (s) TestXDSResolverWRR(t *testing.T) { } } +func (s) TestXDSResolverMaxStreamDuration(t *testing.T) { + defer func(old bool) { env.TimeoutSupport = old }(env.TimeoutSupport) + xdsC := fakeclient.NewClient() + xdsR, tcc, cancel := testSetup(t, setupOpts{ + xdsClientFunc: func() (xdsClientInterface, error) { return xdsC, nil }, + }) + defer func() { + cancel() + xdsR.Close() + }() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + waitForWatchListener(ctx, t, xdsC, targetStr) + xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr}, nil) + waitForWatchRouteConfig(ctx, t, xdsC, routeStr) + + defer func(oldNewWRR func() wrr.WRR) { newWRR = oldNewWRR }(newWRR) + newWRR = xdstestutils.NewTestWRR + + // Invoke the watchAPI callback with a good service update and wait for the + // UpdateState method to be called on the ClientConn. + xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{ + VirtualHosts: []*xdsclient.VirtualHost{ + { + Domains: []string{targetStr}, + Routes: []*client.Route{{ + Prefix: newStringP("/foo"), + Action: map[string]uint32{"A": 1}, + MaxStreamDuration: 5 * time.Second, + }, { + Prefix: newStringP("/bar"), + Action: map[string]uint32{"B": 1}, + MaxStreamDuration: time.Duration(0), + }, { + Prefix: newStringP(""), + Action: map[string]uint32{"C": 1}, + }}, + }, + }, + }, nil) + + gotState, err := tcc.stateCh.Receive(ctx) + if err != nil { + t.Fatalf("ClientConn.UpdateState returned error: %v", err) + } + rState := gotState.(resolver.State) + if err := rState.ServiceConfig.Err; err != nil { + t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err) + } + + cs := iresolver.GetConfigSelector(rState) + if cs == nil { + t.Fatal("received nil config selector") + } + + testCases := []struct { + method string + timeoutSupport bool + want time.Duration + }{{ + method: "/foo/method", + timeoutSupport: true, + want: 5 * time.Second, + }, { + method: "/foo/method", + timeoutSupport: false, + // zero (nil) + }, { + method: "/bar/method", + timeoutSupport: true, + // zero (nil) + }, { + method: "/baz/method", + timeoutSupport: true, + // zero (nil) + }} + + for _, tc := range testCases { + env.TimeoutSupport = tc.timeoutSupport + req := iresolver.RPCInfo{ + Method: tc.method, + Context: context.Background(), + } + res, err := cs.SelectConfig(req) + if err != nil { + t.Errorf("Unexpected error from cs.SelectConfig(%v): %v", req, err) + continue + } + res.OnCommitted() + got := res.MethodConfig.Timeout + if got == nil { + if tc.want != 0 { + t.Errorf("For method %q: res.MethodConfig.Timeout = ; want %v", tc.method, tc.want) + } + continue + } + if tc.want == 0 || *got != tc.want { + want := "" // we never want a duration of zero to be returned. + if tc.want != 0 { + want = fmt.Sprint(tc.want) + } + t.Errorf("For method %q: res.MethodConfig.Timeout = %v; want %v", tc.method, *got, want) + } + } +} + // TestXDSResolverDelayedOnCommitted tests that clusters remain in service // config if RPCs are in flight. func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) {