Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ringhash: allow setting request hash key explicitly #7170

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
30 changes: 19 additions & 11 deletions internal/metadata/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,30 @@ func hasNotPrintable(msg string) bool {
return false
}

// ValidatePair validate a key-value pair with the following rules (the pseudo-header will be skipped) :
// ValidatePair validates a key-value pair with the following rules (the pseudo-header will be skipped) :
//
// - key must contain one or more characters.
// - the characters in the key must be contained in [0-9 a-z _ - .].
// - if the key ends with a "-bin" suffix, no validation of the corresponding value is performed.
// - the characters in the every value must be printable (in [%x20-%x7E]).
// - the characters in every value must be printable (in [%x20-%x7E]).
func ValidatePair(key string, vals ...string) error {
	// Key rules are delegated to ValidateKey; its error is returned untouched.
	keyErr := ValidateKey(key)
	if keyErr != nil {
		return keyErr
	}
	// Values stored under a "-bin" key are exempt from the check below.
	if binary := strings.HasSuffix(key, "-bin"); binary {
		return nil
	}
	// Reject the pair as soon as one value fails the printable check.
	for i := range vals {
		if !hasNotPrintable(vals[i]) {
			continue
		}
		return fmt.Errorf("header key %q contains value with non-printable ASCII characters", key)
	}
	return nil
}

// ValidateKey validates a header key: it must not be empty and may only
// contain characters in [0-9 a-z _ - .].
func ValidateKey(key string) error {
// key should not be empty
if key == "" {
return fmt.Errorf("there is an empty key in the header")
Expand All @@ -119,14 +136,5 @@ func ValidatePair(key string, vals ...string) error {
return fmt.Errorf("header key %q contains illegal characters not in [0-9a-z-_.]", key)
}
}
if strings.HasSuffix(key, "-bin") {
return nil
}
// check value
for _, val := range vals {
if hasNotPrintable(val) {
return fmt.Errorf("header key %q contains value with non-printable ASCII characters", key)
}
}
return nil
}
13 changes: 13 additions & 0 deletions xds/internal/balancer/ringhash/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ package ringhash
import (
"encoding/json"
"fmt"
"strings"

"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/metadata"
"google.golang.org/grpc/serviceconfig"
)

Expand All @@ -32,6 +34,8 @@ type LBConfig struct {

MinRingSize uint64 `json:"minRingSize,omitempty"`
MaxRingSize uint64 `json:"maxRingSize,omitempty"`

RequestMetadataKey string `json:"request_metadata_key,omitempty"`
}

const (
Expand Down Expand Up @@ -66,5 +70,14 @@ func parseConfig(c json.RawMessage) (*LBConfig, error) {
if cfg.MaxRingSize > envconfig.RingHashCap {
cfg.MaxRingSize = envconfig.RingHashCap
}
if cfg.RequestMetadataKey != "" {
// See rules in https://github.com/grpc/proposal/blob/54074388ca49e7c8eb1060af238ce98a63ad9daa/A76-ring-hash-improvements.md#explicitly-setting-the-request-hash-key
if err := metadata.ValidateKey(cfg.RequestMetadataKey); err != nil {
return nil, fmt.Errorf("invalid request_metadata_key %q: %s", cfg.RequestMetadataKey, err)
}
if strings.HasSuffix(cfg.RequestMetadataKey, "-bin") {
return nil, fmt.Errorf("invalid request_metadata_key %q: key must not end with \"-bin\"", cfg.RequestMetadataKey)
}
}
return &cfg, nil
}
21 changes: 21 additions & 0 deletions xds/internal/balancer/ringhash/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,27 @@ func (s) TestParseConfig(t *testing.T) {
want: nil,
wantErr: true,
},
{
name: "request metadata key set",
js: `{"request_metadata_key": "x-foo"}`,
want: &LBConfig{
MinRingSize: defaultMinSize,
MaxRingSize: defaultMaxSize,
RequestMetadataKey: "x-foo",
},
},
{
name: "invalid request metadata keys",
js: `{"request_metadata_key": "!invalid"}`,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add a case that rejects a key referencing a binary header, please.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

want: nil,
wantErr: true,
},
{
name: "binary request metadata keys",
js: `{"request_metadata_key": "header-with-bin"}`,
want: nil,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
99 changes: 81 additions & 18 deletions xds/internal/balancer/ringhash/picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,43 +19,57 @@
package ringhash

import (
"fmt"
"context"
"errors"
"strings"

"github.com/cespare/xxhash/v2"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)

// errNoSubConnReady is the pick error returned at the end of
// handleTransientFailure when no connection is Ready.
var errNoSubConnReady = errors.New("no connection is Ready")

// errSingleSubConnInTransientFailure is the pick error returned by
// handleTransientFailure when the ring has no other entry to fall back to.
var errSingleSubConnInTransientFailure = errors.New("the only SubConn is in Transient Failure")

// picker is the ring hash picker. It holds the ring to pick from, a logger,
// the per-SubConn connectivity states recorded by newPicker, the request
// metadata key whose values are hashed in getRequestHash (an empty key means
// the hash comes from GetXDSRequestHash instead), and the source of random
// hashes used when the metadata key has no usable value.
type picker struct {
ring *ring
logger *grpclog.PrefixLogger
subConnStates map[*subConn]connectivity.State
ring *ring
logger *grpclog.PrefixLogger
subConnStates map[*subConn]connectivity.State
requestHashKey string
randuint64 func() uint64 // overridable for testing
}

// newPicker builds a picker over ring. It records the effective state of every
// ring entry's SubConn at construction time in subConnStates. requestHashKey
// is stored as the metadata key to hash for each pick, and the random hash
// source defaults to grpcrand.Uint64.
func newPicker(ring *ring, logger *grpclog.PrefixLogger) *picker {
func newPicker(ring *ring, requestHashKey string, logger *grpclog.PrefixLogger) *picker {
states := make(map[*subConn]connectivity.State)
// Snapshot each entry's effective state once, when the picker is created.
for _, e := range ring.items {
states[e.sc] = e.sc.effectiveState()
}
return &picker{ring: ring, logger: logger, subConnStates: states}
return &picker{ring: ring, logger: logger, subConnStates: states, requestHashKey: requestHashKey, randuint64: grpcrand.Uint64}
}

// handleRICSResult is the return type of handleRICS. It bundles the pick
// result, whether the pick triggered a connection attempt, and the error to
// return from Pick() into a single struct. Without it, handleRICS would have
// to return `balancer.PickResult, bool, error, bool`, and the linter would
// complain because error is not the last return value.
type handleRICSResult struct {
pr balancer.PickResult
err error
pr balancer.PickResult
triggeredConnect bool
err error
}

// handleRICS generates pick result if the entry is in Ready, Idle, Connecting
// or Shutdown. TransientFailure will be handled specifically after this
// function returns.
//
// The first return value indicates if the state is in Ready, Idle, Connecting
// The second return value indicates if the state is in Ready, Idle, Connecting
// or Shutdown. If it's true, the PickResult and error should be returned from
// Pick() as is.
func (p *picker) handleRICS(e *ringEntry) (handleRICSResult, bool) {
Expand All @@ -65,7 +79,7 @@ func (p *picker) handleRICS(e *ringEntry) (handleRICSResult, bool) {
case connectivity.Idle:
// Trigger Connect() and queue the pick.
e.sc.queueConnect()
return handleRICSResult{err: balancer.ErrNoSubConnAvailable}, true
return handleRICSResult{triggeredConnect: true, err: balancer.ErrNoSubConnAvailable}, true
case connectivity.Connecting:
return handleRICSResult{err: balancer.ErrNoSubConnAvailable}, true
case connectivity.TransientFailure:
Expand All @@ -84,23 +98,61 @@ func (p *picker) handleRICS(e *ringEntry) (handleRICSResult, bool) {
}

// Pick chooses the ring entry for the request hash and converts that entry's
// state into a pick result. When handleRICS reports the entry as handled, its
// result and error are returned as is, with one exception: if a random hash
// was used and the pick triggered a connection attempt, a READY SubConn found
// by scanning forward is returned instead, when one exists. Otherwise the
// entry is in transient failure and handleTransientFailure decides the result.
func (p *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
e := p.ring.pick(getRequestHash(info.Ctx))
h, usesRandomHash := p.getRequestHash(info.Ctx)
e := p.ring.pick(h)
if hr, ok := p.handleRICS(e); ok {
if usesRandomHash && hr.triggeredConnect {
// "If the use of this random hash triggers a connection attempt
// (...), then before queuing the pick, the picker will scan forward
// searching for a subchannel in `READY` state. If it finds a
// subchannel in `READY` state, the picker returns it." - A76
// NOTE(review): this `p` shadows the receiver inside the if statement;
// here it is the READY balancer.SubConn, not the picker.
if p := p.nextReadySubConn(e); p != nil {
return balancer.PickResult{SubConn: p}, nil
}
}
return hr.pr, hr.err
}
// ok was false, the entry is in transient failure.
return p.handleTransientFailure(e)
return p.handleTransientFailure(e, usesRandomHash)
}

// getRequestHash computes the hash used to select a ring entry for this pick.
// The boolean result reports whether that hash was randomly generated.
func (p *picker) getRequestHash(ctx context.Context) (uint64, bool) {
	key := p.requestHashKey
	if key == "" {
		// Without an explicit request metadata key, the hash set by the xDS
		// resolver is used. Note that for xDS, the random hash is never
		// generated in the picker.
		return GetXDSRequestHash(ctx), false
	}

	// The ok result is deliberately dropped: the header lookup below decides
	// whether there is anything to hash.
	md, _ := metadata.FromOutgoingContext(ctx)
	vals := md.Get(key)

	// A header with no values, or with exactly one empty value, counts as
	// absent and falls back to a random hash.
	absent := len(vals) == 0 || (len(vals) == 1 && vals[0] == "")
	if !absent {
		// Multiple values are hashed as one comma-joined string.
		return xxhash.Sum64String(strings.Join(vals, ",")), false
	}
	return p.randuint64(), true
}

func (p *picker) handleTransientFailure(e *ringEntry) (balancer.PickResult, error) {
func (p *picker) handleTransientFailure(e *ringEntry, usesRandomHash bool) (balancer.PickResult, error) {
// Queue a connect on the first picked SubConn.
e.sc.queueConnect()
if usesRandomHash {
// "If the use of this random hash triggers a connection attempt
// (...), then before queuing the pick, the picker will scan forward
// searching for a subchannel in `READY` state. If it finds a
// subchannel in `READY` state, the picker returns it." - A76
if p := p.nextReadySubConn(e); p != nil {
return balancer.PickResult{SubConn: p}, nil
}
}

// Find next entry in the ring, skipping duplicate SubConns.
e2 := nextSkippingDuplicates(p.ring, e)
if e2 == nil {
// There's no next entry available, fail the pick.
return balancer.PickResult{}, fmt.Errorf("the only SubConn is in Transient Failure")
return balancer.PickResult{}, errSingleSubConnInTransientFailure
}

// For the second SubConn, also check Ready/Idle/Connecting as if it's the
Expand Down Expand Up @@ -145,7 +197,18 @@ func (p *picker) handleTransientFailure(e *ringEntry) (balancer.PickResult, erro
ee.sc.queueConnect()
}
}
return balancer.PickResult{}, fmt.Errorf("no connection is Ready")
return balancer.PickResult{}, errNoSubConnReady
}

// nextReadySubConn walks the ring starting at the entry after e, following
// ring.next until it is back at e, and returns the SubConn of the first entry
// found in READY state. It returns nil when no such entry exists; e itself is
// never considered.
//
// Readiness is read from p.subConnStates, the per-SubConn snapshot that
// newPicker records via effectiveState(), rather than from the live sc.state
// field, so the scan uses the states captured when the picker was built.
func (p *picker) nextReadySubConn(e *ringEntry) balancer.SubConn {
	for next := p.ring.next(e); next != e; next = p.ring.next(next) {
		if p.subConnStates[next.sc] == connectivity.Ready {
			return next.sc.sc
		}
	}
	return nil
}

// nextSkippingDuplicates finds the next entry in the ring, with a different
Expand Down