Skip to content

Commit

Permalink
Merge branch 'master' into fix-old-leader-revoker
Browse files Browse the repository at this point in the history
  • Loading branch information
tangcong committed May 7, 2021
2 parents 61a170d + 344c9f3 commit 147050f
Show file tree
Hide file tree
Showing 119 changed files with 1,699 additions and 830 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG-3.5.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ The minimum recommended etcd versions to run in **production** are 3.2.28+, 3.3.
<hr>


## v3.5.0 (2021 TBD)
## v3.5.0 (2021-06)

See [code changes](https://github.com/etcd-io/etcd/compare/v3.4.0...v3.5.0) and [v3.5 upgrade guide](https://etcd.io/docs/latest/upgrades/upgrade_3_5/) for any breaking changes.

Expand Down Expand Up @@ -70,6 +70,7 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.4.0...v3.5.0) and
- [ETCD_CLIENT_DEBUG env](https://github.com/etcd-io/etcd/pull/12786): Now supports log levels (debug, info, warn, error, dpanic, panic, fatal). Only when set, overrides application-wide grpc logging settings.
- [Embed Etcd.Close()](https://github.com/etcd-io/etcd/pull/12828) needs to called exactly once and closes Etcd.Err() stream.
- [Embed Etcd does not override global/grpc logger](https://github.com/etcd-io/etcd/pull/12861) be default any longer. If desired, please call `embed.Config::SetupGlobalLoggers()` explicitly.
- Errors: `context cancelled` or `context deadline exceeded` are exposed as codes.Canceled, codes.DeadlineExceeded instead of 'codes.Unknown'.
###

- Make sure [save snapshot downloads checksum for integrity checks](https://github.com/etcd-io/etcd/pull/11896).
Expand Down Expand Up @@ -160,6 +161,7 @@ Note that any `etcd_debugging_*` metrics are experimental and subject to change.
- Setting this flag enables `SO_REUSEADDR` which allows binding to an address in `TIME_WAIT` state, improving etcd restart time.
- Reduce [around 30% memory allocation by logging range response size without marshal](https://github.com/etcd-io/etcd/pull/12871).
- Fix [old leader still revokes lease after it steped to follower if cpu or disk io latency is high](https://github.com/etcd-io/etcd/pull/12531).
- `ETCD_VERIFY="all"` enviroment triggers [additional verification of consistency](https://github.com/etcd-io/etcd/pull/) of etcd data-dir files.

### Package `runtime`

Expand Down Expand Up @@ -239,6 +241,7 @@ Note that any `etcd_debugging_*` metrics are experimental and subject to change.

- Add [`/v3/auth/status`](https://github.com/etcd-io/etcd/pull/11536) endpoint to check if authentication is enabled
- [Add `Linearizable` field to `etcdserverpb.MemberListRequest`](https://github.com/etcd-io/etcd/pull/11639).
- [Learner support Snapshot RPC](https://github.com/etcd-io/etcd/pull/12890/).

### Package `netutil`

Expand Down
1 change: 1 addition & 0 deletions Dockerfile-release.arm64
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ ADD etcd /usr/local/bin/
ADD etcdctl /usr/local/bin/
ADD var/etcd /var/etcd
ADD var/lib/etcd /var/lib/etcd
ENV ETCD_UNSUPPORTED_ARCH=arm64

EXPOSE 2379 2380

Expand Down
3 changes: 3 additions & 0 deletions api/v3rpc/rpctypes/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ var (
ErrGRPCDowngradeInProcess = status.New(codes.FailedPrecondition, "etcdserver: cluster has a downgrade job in progress").Err()
ErrGRPCNoInflightDowngrade = status.New(codes.FailedPrecondition, "etcdserver: no inflight downgrade job").Err()

ErrGRPCCanceled = status.New(codes.Canceled, "etcdserver: request canceled").Err()
ErrGRPCDeadlineExceeded = status.New(codes.DeadlineExceeded, "etcdserver: context deadline exceeded").Err()

errStringToError = map[string]error{
ErrorDesc(ErrGRPCEmptyKey): ErrGRPCEmptyKey,
ErrorDesc(ErrGRPCKeyNotFound): ErrGRPCKeyNotFound,
Expand Down
8 changes: 6 additions & 2 deletions client/pkg/testutil/leak.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,12 @@ func BeforeTest(t TB) {
// It will detect common goroutine leaks, retrying in case there are goroutines
// not synchronously torn down, and fail the test if any goroutines are stuck.
func AfterTest(t TB) {
if err := CheckAfterTest(1 * time.Second); err != nil {
t.Errorf("Test %v", err)
// If test-failed the leaked goroutines list is hidding the real
// source of problem.
if !t.Failed() {
if err := CheckAfterTest(1 * time.Second); err != nil {
t.Errorf("Test %v", err)
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion client/pkg/testutil/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@ func MustNewURL(t *testing.T, s string) *url.URL {
func FatalStack(t *testing.T, s string) {
stackTrace := make([]byte, 1024*1024)
n := runtime.Stack(stackTrace, true)
t.Errorf("---> Test failed: %s", s)
t.Error(string(stackTrace[:n]))
t.Fatalf(s)
t.Fatal(s)
}

// ConditionFunc returns true when a condition is met.
Expand Down
3 changes: 2 additions & 1 deletion client/pkg/transport/sockopt_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
package transport

import (
"golang.org/x/sys/unix"
"syscall"

"golang.org/x/sys/unix"
)

func setReusePort(network, address string, conn syscall.RawConn) error {
Expand Down
3 changes: 2 additions & 1 deletion client/pkg/types/urlsmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
package types

import (
"go.etcd.io/etcd/client/pkg/v3/testutil"
"reflect"
"testing"

"go.etcd.io/etcd/client/pkg/v3/testutil"
)

func TestParseInitialCluster(t *testing.T) {
Expand Down
5 changes: 3 additions & 2 deletions client/v2/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
package client

import (
"github.com/json-iterator/go"
"github.com/modern-go/reflect2"
"strconv"
"unsafe"

"github.com/json-iterator/go"
"github.com/modern-go/reflect2"
)

type customNumberExtension struct {
Expand Down
3 changes: 2 additions & 1 deletion client/v2/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ import (
"encoding/json"
"errors"
"fmt"
"go.etcd.io/etcd/client/pkg/v3/pathutil"
"net/http"
"net/url"
"strconv"
"strings"
"time"

"go.etcd.io/etcd/client/pkg/v3/pathutil"
)

const (
Expand Down
4 changes: 2 additions & 2 deletions client/v3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,8 @@ func (c *Client) dialSetupOpts(creds grpccredentials.TransportCredentials, dopts
opts = append(opts,
// Disable stream retry by default since go-grpc-middleware/retry does not support client streams.
// Streams that are safe to retry are enabled individually.
grpc.WithStreamInterceptor(c.streamClientInterceptor(c.lg, withMax(0), rrBackoff)),
grpc.WithUnaryInterceptor(c.unaryClientInterceptor(c.lg, withMax(defaultUnaryMaxRetries), rrBackoff)),
grpc.WithStreamInterceptor(c.streamClientInterceptor(withMax(0), rrBackoff)),
grpc.WithUnaryInterceptor(c.unaryClientInterceptor(withMax(defaultUnaryMaxRetries), rrBackoff)),
)

return opts, nil
Expand Down
15 changes: 12 additions & 3 deletions client/v3/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,19 @@ import (

"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/client/pkg/v3/testutil"
"go.uber.org/zap/zaptest"

"google.golang.org/grpc"
)

func NewClient(t *testing.T, cfg Config) (*Client, error) {
client, err := New(cfg)
if err != nil {
return nil, err
}
return client.WithLogger(zaptest.NewLogger(t)), nil
}

func TestDialCancel(t *testing.T) {
testutil.BeforeTest(t)

Expand All @@ -41,7 +50,7 @@ func TestDialCancel(t *testing.T) {
cfg := Config{
Endpoints: []string{ep},
DialTimeout: 30 * time.Second}
c, err := New(cfg)
c, err := NewClient(t, cfg)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -104,7 +113,7 @@ func TestDialTimeout(t *testing.T) {
donec := make(chan error, 1)
go func(cfg Config) {
// without timeout, dial continues forever on ipv4 black hole
c, err := New(cfg)
c, err := NewClient(t, cfg)
if c != nil || err == nil {
t.Errorf("#%d: new client should fail", i)
}
Expand Down Expand Up @@ -132,7 +141,7 @@ func TestDialTimeout(t *testing.T) {

func TestDialNoTimeout(t *testing.T) {
cfg := Config{Endpoints: []string{"127.0.0.1:12345"}}
c, err := New(cfg)
c, err := NewClient(t, cfg)
if c == nil || err != nil {
t.Fatalf("new client with DialNoWait should succeed, got %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion client/v3/naming/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
// func etcdDial(c *clientv3.Client, service string) (*grpc.ClientConn, error) {
// etcdResolver, err := resolver.NewBuilder(c);
// if err { return nil, err }
// return grpc.Dial("etcd:///foo", grpc.WithResolvers(etcdResolver))
// return grpc.Dial("etcd:///" + service, grpc.WithResolvers(etcdResolver))
// }
//
// Optionally, force delete an endpoint:
Expand Down
14 changes: 7 additions & 7 deletions client/v3/retry_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
//
// The default configuration of the interceptor is to not retry *at all*. This behaviour can be
// changed through options (e.g. WithMax) on creation of the interceptor or on call (through grpc.CallOptions).
func (c *Client) unaryClientInterceptor(logger *zap.Logger, optFuncs ...retryOption) grpc.UnaryClientInterceptor {
func (c *Client) unaryClientInterceptor(optFuncs ...retryOption) grpc.UnaryClientInterceptor {
intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs)
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
ctx = withVersion(ctx)
Expand All @@ -50,7 +50,7 @@ func (c *Client) unaryClientInterceptor(logger *zap.Logger, optFuncs ...retryOpt
if err := waitRetryBackoff(ctx, attempt, callOpts); err != nil {
return err
}
logger.Debug(
c.GetLogger().Debug(
"retrying of unary invoker",
zap.String("target", cc.Target()),
zap.Uint("attempt", attempt),
Expand All @@ -59,7 +59,7 @@ func (c *Client) unaryClientInterceptor(logger *zap.Logger, optFuncs ...retryOpt
if lastErr == nil {
return nil
}
logger.Warn(
c.GetLogger().Warn(
"retrying of unary invoker failed",
zap.String("target", cc.Target()),
zap.Uint("attempt", attempt),
Expand All @@ -82,7 +82,7 @@ func (c *Client) unaryClientInterceptor(logger *zap.Logger, optFuncs ...retryOpt

gterr := c.getToken(ctx)
if gterr != nil {
logger.Warn(
c.GetLogger().Warn(
"retrying of unary invoker failed to fetch new auth token",
zap.String("target", cc.Target()),
zap.Error(gterr),
Expand All @@ -107,7 +107,7 @@ func (c *Client) unaryClientInterceptor(logger *zap.Logger, optFuncs ...retryOpt
// Retry logic is available *only for ServerStreams*, i.e. 1:n streams, as the internal logic needs
// to buffer the messages sent by the client. If retry is enabled on any other streams (ClientStreams,
// BidiStreams), the retry interceptor will fail the call.
func (c *Client) streamClientInterceptor(logger *zap.Logger, optFuncs ...retryOption) grpc.StreamClientInterceptor {
func (c *Client) streamClientInterceptor(optFuncs ...retryOption) grpc.StreamClientInterceptor {
intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs)
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
ctx = withVersion(ctx)
Expand All @@ -117,7 +117,7 @@ func (c *Client) streamClientInterceptor(logger *zap.Logger, optFuncs ...retryOp
// equal to c.Username != "" && c.Password != ""
err := c.getToken(ctx)
if err != nil && rpctypes.Error(err) != rpctypes.ErrAuthNotEnabled {
logger.Error("clientv3/retry_interceptor: getToken failed", zap.Error(err))
c.GetLogger().Error("clientv3/retry_interceptor: getToken failed", zap.Error(err))
return nil, err
}
}
Expand All @@ -132,7 +132,7 @@ func (c *Client) streamClientInterceptor(logger *zap.Logger, optFuncs ...retryOp
}
newStreamer, err := streamer(ctx, desc, cc, method, grpcOpts...)
if err != nil {
logger.Error("streamer failed to create ClientStream", zap.Error(err))
c.GetLogger().Error("streamer failed to create ClientStream", zap.Error(err))
return nil, err // TODO(mwitkow): Maybe dial and transport errors should be retriable?
}
retryingStreamer := &serverStreamingRetryingStream{
Expand Down
2 changes: 2 additions & 0 deletions client/v3/snapshot/v3_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ func Save(ctx context.Context, lg *zap.Logger, cfg clientv3.Config, dbPath strin
}
defer cli.Close()

cli = cli.WithLogger(lg.Named("client"))

partpath := dbPath + ".part"
defer os.RemoveAll(partpath)

Expand Down
5 changes: 3 additions & 2 deletions contrib/lock/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@ import (
"bytes"
"encoding/json"
"fmt"
"go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
"io/ioutil"
"net/http"
"os"
"runtime"
"strconv"
"time"

"go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
)

type node struct {
Expand Down
Loading

0 comments on commit 147050f

Please sign in to comment.