Skip to content

Commit

Permalink
Merge branch 'master' into 97-remove-docs-from-etcd
Browse files Browse the repository at this point in the history
  • Loading branch information
nate-double-u committed Feb 9, 2021
2 parents 2511790 + 44c889a commit b73441d
Show file tree
Hide file tree
Showing 28 changed files with 844 additions and 230 deletions.
4 changes: 0 additions & 4 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,10 @@ env:
matrix:
fast_finish: true
allow_failures:
- go: 1.15.7
env: TARGET=linux-amd64-grpcproxy
- go: 1.15.7
env: TARGET=linux-amd64-coverage
- go: tip
env: TARGET=linux-amd64-fmt-unit-go-tip-2-cpu
- go: 1.15.7
env: TARGET=linux-386-unit-1-cpu
exclude:
- go: tip
env: TARGET=linux-amd64-fmt
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG-3.5.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ Note that any `etcd_debugging_*` metrics are experimental and subject to change.
- Fix [quorum protection logic when promoting a learner](https://github.com/etcd-io/etcd/pull/11640).
- Improve [peer corruption checker](https://github.com/etcd-io/etcd/pull/11621) to work when peer mTLS is enabled.
- Log [`[CLIENT-PORT]/health` check in server side](https://github.com/etcd-io/etcd/pull/11704).
- Log [successful etcd server-side health check in debug level](https://github.com/etcd-io/etcd/pull/12677).
- Improve [compaction performance when latest index is greater than 1-million](https://github.com/etcd-io/etcd/pull/11734).
- [Refactor consistentindex](https://github.com/etcd-io/etcd/pull/11699).
- [Add log when etcdserver failed to apply command](https://github.com/etcd-io/etcd/pull/11670).
Expand Down
2 changes: 1 addition & 1 deletion client/v3/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ For full compatibility, it is recommended to install released versions of client
etcd client returns 2 types of errors:

1. context error: canceled or deadline exceeded.
2. gRPC error: see [api/v3rpc/rpctypes](https://godoc.org/go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes).
2. gRPC error: see [api/v3rpc/rpctypes](https://godoc.org/go.etcd.io/etcd/api/v3rpc/rpctypes).

Here is the example code to handle client errors:

Expand Down
37 changes: 36 additions & 1 deletion client/v3/balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"go.etcd.io/etcd/client/v3/balancer/connectivity"
"go.etcd.io/etcd/client/v3/balancer/picker"

"go.uber.org/multierr"
"go.uber.org/zap"
"google.golang.org/grpc/balancer"
grpcconnectivity "google.golang.org/grpc/connectivity"
Expand All @@ -31,6 +32,12 @@ import (
_ "google.golang.org/grpc/resolver/passthrough" // register passthrough resolver
)

// NOTE: Ensure
// - `baseBalancer` satisfies `balancer.V2Balancer`.
var (
_ balancer.V2Balancer = (*baseBalancer)(nil)
)

// Config defines balancer configurations.
type Config struct {
// Policy configures balancer policy.
Expand Down Expand Up @@ -138,12 +145,29 @@ type baseBalancer struct {
picker picker.Picker
}

// UpdateClientConnState implements "grpc/balancer.V2Balancer" interface.
func (bb *baseBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
return bb.handleResolvedWithError(ccs.ResolverState.Addresses, nil)
}

// ResolverError implements "grpc/balancer.V2Balancer" interface.
func (bb *baseBalancer) ResolverError(err error) {
bb.HandleResolvedAddrs(nil, err)
}

// HandleResolvedAddrs implements "grpc/balancer.Balancer" interface.
// gRPC sends initial or updated resolved addresses from "Build".
func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
_ = bb.handleResolvedWithError(addrs, err)
}

// handleResolvedWithError is an implementation shared both by `HandleResolvedAddrs()`,
// which is part of the `Balancer` interface as well as `UpdateClientConnState()`,
// which is part of the `V2Balancer` interface.
func (bb *baseBalancer) handleResolvedWithError(addrs []resolver.Address, err error) error {
if err != nil {
bb.lg.Warn("HandleResolvedAddrs called with error", zap.String("balancer-id", bb.id), zap.Error(err))
return
return err
}
bb.lg.Info("resolved",
zap.String("picker", bb.picker.String()),
Expand All @@ -155,12 +179,14 @@ func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error)
defer bb.mu.Unlock()

resolved := make(map[resolver.Address]struct{})
warnedErrors := []error{}
for _, addr := range addrs {
resolved[addr] = struct{}{}
if _, ok := bb.addrToSc[addr]; !ok {
sc, err := bb.currentConn.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{})
if err != nil {
bb.lg.Warn("NewSubConn failed", zap.String("picker", bb.picker.String()), zap.String("balancer-id", bb.id), zap.Error(err), zap.String("address", addr.Addr))
warnedErrors = append(warnedErrors, err)
continue
}
bb.lg.Info("created subconn", zap.String("address", addr.Addr))
Expand Down Expand Up @@ -191,6 +217,15 @@ func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error)
// (DO NOT) delete(bb.scToSt, sc)
}
}

// TODO: Consider just returning `ErrBadResolverState` if `warnedErrors` is
// not empty.
return multierr.Combine(warnedErrors...)
}

// UpdateSubConnState implements "grpc/balancer.V2Balancer" interface.
func (bb *baseBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {
bb.HandleSubConnStateChange(sc, s.ConnectivityState)
}

// HandleSubConnStateChange implements "grpc/balancer.Balancer" interface.
Expand Down
14 changes: 14 additions & 0 deletions client/v3/balancer/picker/err.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ import (
"google.golang.org/grpc/balancer"
)

// NOTE: Ensure
// - `errPickerV2` satisfies `balancer.V2Picker`.
var (
_ balancer.V2Picker = (*errPickerV2)(nil)
)

// NewErr returns a picker that always returns err on "Pick".
func NewErr(err error) Picker {
return &errPicker{p: Error, err: err}
Expand All @@ -37,3 +43,11 @@ func (ep *errPicker) String() string {
func (ep *errPicker) Pick(context.Context, balancer.PickInfo) (balancer.SubConn, func(balancer.DoneInfo), error) {
return nil, nil, ep.err
}

type errPickerV2 struct {
errPicker
}

func (ep2 *errPickerV2) Pick(opts balancer.PickInfo) (balancer.PickResult, error) {
return balancer.PickResult{}, ep2.errPicker.err
}
18 changes: 17 additions & 1 deletion client/v3/balancer/picker/roundrobin_balanced.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ import (
"google.golang.org/grpc/resolver"
)

// NOTE: Ensure
// - `rrBalancedV2` satisfies `balancer.V2Picker`.
var (
_ balancer.V2Picker = (*rrBalancedV2)(nil)
)

// newRoundrobinBalanced returns a new roundrobin balanced picker.
func newRoundrobinBalanced(cfg Config) Picker {
scs := make([]balancer.SubConn, 0, len(cfg.SubConnToResolverAddress))
Expand Down Expand Up @@ -52,7 +58,7 @@ type rrBalanced struct {
func (rb *rrBalanced) String() string { return rb.p.String() }

// Pick is called for every client request.
func (rb *rrBalanced) Pick(ctx context.Context, opts balancer.PickInfo) (balancer.SubConn, func(balancer.DoneInfo), error) {
func (rb *rrBalanced) Pick(_ context.Context, opts balancer.PickInfo) (balancer.SubConn, func(balancer.DoneInfo), error) {
rb.mu.RLock()
n := len(rb.scs)
rb.mu.RUnlock()
Expand Down Expand Up @@ -93,3 +99,13 @@ func (rb *rrBalanced) Pick(ctx context.Context, opts balancer.PickInfo) (balance
}
return sc, doneFunc, nil
}

type rrBalancedV2 struct {
rrBalanced
}

func (rb2 *rrBalancedV2) Pick(opts balancer.PickInfo) (balancer.PickResult, error) {
sc, doneFunc, err := rb2.rrBalanced.Pick(context.TODO(), opts)
pr := balancer.PickResult{SubConn: sc, Done: doneFunc}
return pr, err
}
17 changes: 15 additions & 2 deletions client/v3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ type Client struct {

callOpts []grpc.CallOption

lg *zap.Logger
lgMu *sync.RWMutex
lg *zap.Logger
}

// New creates a new etcdv3 client from a given configuration.
Expand All @@ -112,7 +113,7 @@ func New(cfg Config) (*Client, error) {
// service interface implementations and do not need connection management.
func NewCtxClient(ctx context.Context) *Client {
cctx, cancel := context.WithCancel(ctx)
return &Client{ctx: cctx, cancel: cancel, lg: zap.NewNop()}
return &Client{ctx: cctx, cancel: cancel, lgMu: new(sync.RWMutex), lg: zap.NewNop()}
}

// NewFromURL creates a new etcdv3 client from a URL.
Expand All @@ -127,10 +128,21 @@ func NewFromURLs(urls []string) (*Client, error) {

// WithLogger sets a logger
func (c *Client) WithLogger(lg *zap.Logger) *Client {
c.lgMu.Lock()
c.lg = lg
c.lgMu.Unlock()
return c
}

// GetLogger gets the logger.
// NOTE: This method is for internal use of etcd-client library and should not be used as general-purpose logger.
func (c *Client) GetLogger() *zap.Logger {
c.lgMu.RLock()
l := c.lg
c.lgMu.RUnlock()
return l
}

// Close shuts down the client's etcd connections.
func (c *Client) Close() error {
c.cancel()
Expand Down Expand Up @@ -382,6 +394,7 @@ func newClient(cfg *Config) (*Client, error) {
cancel: cancel,
mu: new(sync.RWMutex),
callOpts: defaultCallOpts,
lgMu: new(sync.RWMutex),
}

lcfg := logutil.DefaultZapLoggerConfig
Expand Down
6 changes: 3 additions & 3 deletions client/v3/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@
// The Client has internal state (watchers and leases), so Clients should be reused instead of created as needed.
// Clients are safe for concurrent use by multiple goroutines.
//
// etcd client returns 3 types of errors:
// etcd client returns 2 types of errors:
//
// 1. context error: canceled or deadline exceeded.
// 2. gRPC status error: e.g. when clock drifts in server-side before client's context deadline exceeded.
// 3. gRPC error: see https://github.com/etcd-io/etcd/blob/master/etcdserver/api/v3rpc/rpctypes/error.go
// 2. gRPC error: e.g. when clock drifts in server-side before client's context deadline exceeded.
// See https://github.com/etcd-io/etcd/blob/master/api/v3rpc/rpctypes/error.go
//
// Here is the example code to handle client errors:
//
Expand Down
1 change: 1 addition & 0 deletions client/v3/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/prometheus/client_golang v1.5.1
go.etcd.io/etcd/api/v3 v3.5.0-pre
go.etcd.io/etcd/pkg/v3 v3.5.0-pre
go.uber.org/multierr v1.5.0
go.uber.org/zap v1.16.0
google.golang.org/grpc v1.29.1
sigs.k8s.io/yaml v1.2.0
Expand Down
2 changes: 1 addition & 1 deletion client/v3/naming/endpoints/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type Update struct {
}

// WatchChannel is used to deliver notifications about endpoints updates.
type WatchChannel chan []*Update
type WatchChannel <-chan []*Update

// Key2EndpointMap maps etcd key into struct describing the endpoint.
type Key2EndpointMap map[string]Endpoint
Expand Down
Loading

0 comments on commit b73441d

Please sign in to comment.