Skip to content

Commit

Permalink
xds: add HTTP connection manager max_stream_duration support (#4122)
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley committed Jan 8, 2021
1 parent 0bd76be commit 6a318bb
Show file tree
Hide file tree
Showing 8 changed files with 166 additions and 47 deletions.
13 changes: 11 additions & 2 deletions xds/internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ type ListenerUpdate struct {
RouteConfigName string
// SecurityCfg contains security configuration sent by the control plane.
SecurityCfg *SecurityConfig
// MaxStreamDuration contains the HTTP connection manager's
// common_http_protocol_options.max_stream_duration field, or zero if
// unset.
MaxStreamDuration time.Duration
}

func (lu *ListenerUpdate) String() string {
Expand Down Expand Up @@ -181,8 +185,13 @@ type Route struct {
Fraction *uint32

// If the matchers above indicate a match, the below configuration is used.
Action map[string]uint32 // action is weighted clusters.
MaxStreamDuration time.Duration
Action map[string]uint32 // action is weighted clusters.
// If MaxStreamDuration is nil, it indicates neither of the route action's
// max_stream_duration fields (grpc_timeout_header_max nor
// max_stream_duration) were set. In this case, the ListenerUpdate's
// MaxStreamDuration field should be used. If MaxStreamDuration is set to
// an explicit zero duration, the application's deadline should be used.
MaxStreamDuration *time.Duration
}

// HeaderMatcher represents header matchers.
Expand Down
9 changes: 7 additions & 2 deletions xds/internal/client/client_lds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package client
import (
"strings"
"testing"
"time"

v2xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
v2corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
Expand All @@ -37,6 +38,7 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/grpc/xds/internal/version"
"google.golang.org/protobuf/types/known/durationpb"
)

func (s) TestUnmarshalListener_ClientSide(t *testing.T) {
Expand Down Expand Up @@ -87,6 +89,9 @@ func (s) TestUnmarshalListener_ClientSide(t *testing.T) {
RouteConfigName: v3RouteConfigName,
},
},
CommonHttpProtocolOptions: &v3corepb.HttpProtocolOptions{
MaxStreamDuration: durationpb.New(time.Second),
},
}
mcm, _ := ptypes.MarshalAny(cm)
lis := &v3listenerpb.Listener{
Expand Down Expand Up @@ -278,15 +283,15 @@ func (s) TestUnmarshalListener_ClientSide(t *testing.T) {
name: "v3 listener resource",
resources: []*anypb.Any{v3Lis},
wantUpdate: map[string]ListenerUpdate{
v3LDSTarget: {RouteConfigName: v3RouteConfigName},
v3LDSTarget: {RouteConfigName: v3RouteConfigName, MaxStreamDuration: time.Second},
},
},
{
name: "multiple listener resources",
resources: []*anypb.Any{v2Lis, v3Lis},
wantUpdate: map[string]ListenerUpdate{
v2LDSTarget: {RouteConfigName: v2RouteConfigName},
v3LDSTarget: {RouteConfigName: v3RouteConfigName},
v3LDSTarget: {RouteConfigName: v3RouteConfigName, MaxStreamDuration: time.Second},
},
},
}
Expand Down
10 changes: 7 additions & 3 deletions xds/internal/client/client_rds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func (s) TestRDSGenerateRDSUpdateFromRouteConfiguration(t *testing.T) {
VirtualHosts: []*VirtualHost{
{
Domains: []string{ldsTarget},
Routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{clusterName: 1}, MaxStreamDuration: time.Second}},
Routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{clusterName: 1}, MaxStreamDuration: newDurationP(time.Second)}},
},
},
},
Expand Down Expand Up @@ -347,7 +347,7 @@ func (s) TestRDSGenerateRDSUpdateFromRouteConfiguration(t *testing.T) {
VirtualHosts: []*VirtualHost{
{
Domains: []string{ldsTarget},
Routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{clusterName: 1}, MaxStreamDuration: time.Second}},
Routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{clusterName: 1}, MaxStreamDuration: newDurationP(time.Second)}},
},
},
},
Expand Down Expand Up @@ -377,7 +377,7 @@ func (s) TestRDSGenerateRDSUpdateFromRouteConfiguration(t *testing.T) {
VirtualHosts: []*VirtualHost{
{
Domains: []string{ldsTarget},
Routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{clusterName: 1}, MaxStreamDuration: 0}},
Routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{clusterName: 1}, MaxStreamDuration: newDurationP(0)}},
},
},
},
Expand Down Expand Up @@ -803,3 +803,7 @@ func newUInt32P(i uint32) *uint32 {
func newBoolP(b bool) *bool {
return &b
}

func newDurationP(d time.Duration) *time.Duration {
return &d
}
20 changes: 15 additions & 5 deletions xds/internal/client/client_xds.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ func processListener(lis *v3listenerpb.Listener) (*ListenerUpdate, error) {
// processClientSideListener checks if the provided Listener proto meets
// the expected criteria. If so, it returns a non-empty routeConfigName.
func processClientSideListener(lis *v3listenerpb.Listener) (*ListenerUpdate, error) {
update := &ListenerUpdate{}

apiLisAny := lis.GetApiListener().GetApiListener()
if !IsHTTPConnManagerResource(apiLisAny.GetTypeUrl()) {
return nil, fmt.Errorf("xds: unexpected resource type: %q in LDS response", apiLisAny.GetTypeUrl())
Expand All @@ -98,7 +100,7 @@ func processClientSideListener(lis *v3listenerpb.Listener) (*ListenerUpdate, err
if name == "" {
return nil, fmt.Errorf("xds: empty route_config_name in LDS response: %+v", lis)
}
return &ListenerUpdate{RouteConfigName: name}, nil
update.RouteConfigName = name
case *v3httppb.HttpConnectionManager_RouteConfig:
// TODO: Add support for specifying the RouteConfiguration inline
// in the LDS response.
Expand All @@ -108,6 +110,10 @@ func processClientSideListener(lis *v3listenerpb.Listener) (*ListenerUpdate, err
default:
return nil, fmt.Errorf("xds: unsupported type %T for RouteSpecifier in received LDS response", apiLis.RouteSpecifier)
}

update.MaxStreamDuration = apiLis.GetCommonHttpProtocolOptions().GetMaxStreamDuration().AsDuration()

return update, nil
}

func processServerSideListener(lis *v3listenerpb.Listener) (*ListenerUpdate, error) {
Expand Down Expand Up @@ -346,12 +352,16 @@ func routesProtoToSlice(routes []*v3routepb.Route, logger *grpclog.PrefixLogger)
}

route.Action = clusters

msd := action.GetMaxStreamDuration()
// Prefer grpc_timeout_header_max, if set.
if dur := msd.GetGrpcTimeoutHeaderMax(); dur != nil {
route.MaxStreamDuration = dur.AsDuration()
} else {
route.MaxStreamDuration = msd.GetMaxStreamDuration().AsDuration()
dur := msd.GetGrpcTimeoutHeaderMax()
if dur == nil {
dur = msd.GetMaxStreamDuration()
}
if dur != nil {
d := dur.AsDuration()
route.MaxStreamDuration = &d
}
routesRet = append(routesRet, &route)
}
Expand Down
10 changes: 7 additions & 3 deletions xds/internal/resolver/serviceconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,11 +201,11 @@ var newWRR = wrr.NewRandom
func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, error) {
cs := &configSelector{
r: r,
routes: make([]route, len(su.Routes)),
routes: make([]route, len(su.routes)),
clusters: make(map[string]*clusterInfo),
}

for i, rt := range su.Routes {
for i, rt := range su.routes {
clusters := newWRR()
for cluster, weight := range rt.Action {
clusters.Add(cluster, int64(weight))
Expand All @@ -227,7 +227,11 @@ func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, erro
if err != nil {
return nil, err
}
cs.routes[i].maxStreamDuration = rt.MaxStreamDuration
if rt.MaxStreamDuration == nil {
cs.routes[i].maxStreamDuration = su.ldsConfig.maxStreamDuration
} else {
cs.routes[i].maxStreamDuration = *rt.MaxStreamDuration
}
}

return cs, nil
Expand Down
35 changes: 29 additions & 6 deletions xds/internal/resolver/watch_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,28 @@ import (
"fmt"
"strings"
"sync"
"time"

"google.golang.org/grpc/internal/grpclog"
xdsclient "google.golang.org/grpc/xds/internal/client"
)

// serviceUpdate contains information received from the RDS responses which is
// of interested to the xds resolver. The RDS request is built by first making a
// LDS to get the RouteConfig name.
// serviceUpdate contains information received from the LDS/RDS responses which
// are of interest to the xds resolver. The RDS request is built by first
// making a LDS to get the RouteConfig name.
type serviceUpdate struct {
// Routes contain matchers+actions to route RPCs.
Routes []*xdsclient.Route
// routes contain matchers+actions to route RPCs.
routes []*xdsclient.Route
// ldsConfig contains configuration that applies to all routes.
ldsConfig ldsConfig
}

// ldsConfig contains information received from the LDS responses which are of
// interest to the xds resolver.
type ldsConfig struct {
// maxStreamDuration is from the HTTP connection manager's
// common_http_protocol_options field.
maxStreamDuration time.Duration
}

// watchService uses LDS and RDS to discover information about the provided
Expand Down Expand Up @@ -61,6 +72,7 @@ type serviceUpdateWatcher struct {
serviceName string
ldsCancel func()
serviceCb func(serviceUpdate, error)
lastUpdate serviceUpdate

mu sync.Mutex
closed bool
Expand All @@ -84,16 +96,26 @@ func (w *serviceUpdateWatcher) handleLDSResp(update xdsclient.ListenerUpdate, er
w.rdsCancel()
w.rdsName = ""
w.rdsCancel = nil
w.lastUpdate = serviceUpdate{}
}
// The other error cases still return early without canceling the
// existing RDS watch.
w.serviceCb(serviceUpdate{}, err)
return
}

oldLDSConfig := w.lastUpdate.ldsConfig
w.lastUpdate.ldsConfig = ldsConfig{maxStreamDuration: update.MaxStreamDuration}

if w.rdsName == update.RouteConfigName {
// If the new RouteConfigName is same as the previous, don't cancel and
// restart the RDS watch.
if w.lastUpdate.ldsConfig != oldLDSConfig {
// The route name didn't change but the LDS data did; send it now.
// If the route name did change, then we will wait until the first
// RDS update before reporting this LDS config.
w.serviceCb(w.lastUpdate, nil)
}
return
}
w.rdsName = update.RouteConfigName
Expand Down Expand Up @@ -127,7 +149,8 @@ func (w *serviceUpdateWatcher) handleRDSResp(update xdsclient.RouteConfigUpdate,
return
}

w.serviceCb(serviceUpdate{Routes: matchVh.Routes}, nil)
w.lastUpdate.routes = matchVh.Routes
w.serviceCb(w.lastUpdate, nil)
}

func (w *serviceUpdateWatcher) close() {
Expand Down
65 changes: 59 additions & 6 deletions xds/internal/resolver/watch_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"context"
"fmt"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
Expand Down Expand Up @@ -138,7 +139,7 @@ func verifyServiceUpdate(ctx context.Context, updateCh *testutils.Channel, wantU
return fmt.Errorf("timeout when waiting for service update: %v", err)
}
gotUpdate := u.(serviceUpdateErr)
if gotUpdate.err != nil || !cmp.Equal(gotUpdate.u, wantUpdate, cmpopts.EquateEmpty()) {
if gotUpdate.err != nil || !cmp.Equal(gotUpdate.u, wantUpdate, cmpopts.EquateEmpty(), cmp.AllowUnexported(serviceUpdate{}, ldsConfig{})) {
return fmt.Errorf("unexpected service update: (%v, %v), want: (%v, nil), diff (-want +got):\n%s", gotUpdate.u, gotUpdate.err, wantUpdate, cmp.Diff(gotUpdate.u, wantUpdate, cmpopts.EquateEmpty()))
}
return nil
Expand All @@ -165,7 +166,7 @@ func (s) TestServiceWatch(t *testing.T) {
xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr}, nil)
waitForWatchRouteConfig(ctx, t, xdsC, routeStr)

wantUpdate := serviceUpdate{Routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}}}
wantUpdate := serviceUpdate{routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}}}
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{
{
Expand All @@ -179,7 +180,7 @@ func (s) TestServiceWatch(t *testing.T) {
}

wantUpdate2 := serviceUpdate{
Routes: []*xdsclient.Route{{
routes: []*xdsclient.Route{{
Path: newStringP(""),
Action: map[string]uint32{cluster: 1},
}},
Expand Down Expand Up @@ -219,7 +220,7 @@ func (s) TestServiceWatchLDSUpdate(t *testing.T) {
xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr}, nil)
waitForWatchRouteConfig(ctx, t, xdsC, routeStr)

wantUpdate := serviceUpdate{Routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}}}
wantUpdate := serviceUpdate{routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}}}
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{
{
Expand All @@ -240,7 +241,7 @@ func (s) TestServiceWatchLDSUpdate(t *testing.T) {
waitForWatchRouteConfig(ctx, t, xdsC, routeStr+"2")

// RDS update for the new name.
wantUpdate2 := serviceUpdate{Routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster + "2": 1}}}}
wantUpdate2 := serviceUpdate{routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster + "2": 1}}}}
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{
{
Expand All @@ -254,6 +255,58 @@ func (s) TestServiceWatchLDSUpdate(t *testing.T) {
}
}

// TestServiceWatchLDSUpdate covers the case that after first LDS and first RDS
// response, the second LDS response includes a new MaxStreamDuration. It also
// verifies this is reported in subsequent RDS updates.
func (s) TestServiceWatchLDSUpdateMaxStreamDuration(t *testing.T) {
serviceUpdateCh := testutils.NewChannel()
xdsC := fakeclient.NewClient()
cancelWatch := watchService(xdsC, targetStr, func(update serviceUpdate, err error) {
serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
}, nil)
defer cancelWatch()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
waitForWatchListener(ctx, t, xdsC, targetStr)
xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr, MaxStreamDuration: time.Second}, nil)
waitForWatchRouteConfig(ctx, t, xdsC, routeStr)

wantUpdate := serviceUpdate{routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}}, ldsConfig: ldsConfig{maxStreamDuration: time.Second}}
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{
{
Domains: []string{targetStr},
Routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}},
},
},
}, nil)
if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate); err != nil {
t.Fatal(err)
}

// Another LDS update with the same RDS_name but different MaxStreamDuration (zero in this case).
wantUpdate2 := serviceUpdate{routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}}}
xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr}, nil)
if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate2); err != nil {
t.Fatal(err)
}

// RDS update.
wantUpdate3 := serviceUpdate{routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster + "2": 1}}}}
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{
{
Domains: []string{targetStr},
Routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster + "2": 1}}},
},
},
}, nil)
if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate3); err != nil {
t.Fatal(err)
}
}

// TestServiceNotCancelRDSOnSameLDSUpdate covers the case that if the second LDS
// update contains the same RDS name as the previous, the RDS watch isn't
// canceled and restarted.
Expand All @@ -271,7 +324,7 @@ func (s) TestServiceNotCancelRDSOnSameLDSUpdate(t *testing.T) {
xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr}, nil)
waitForWatchRouteConfig(ctx, t, xdsC, routeStr)

wantUpdate := serviceUpdate{Routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}}}
wantUpdate := serviceUpdate{routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}}}
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{
{
Expand Down
Loading

0 comments on commit 6a318bb

Please sign in to comment.