Skip to content

Commit

Permalink
increase robustness for kubeadm etcd operations
Browse files Browse the repository at this point in the history
Signed-off-by: SataQiu <1527062125@qq.com>
  • Loading branch information
SataQiu committed Jun 15, 2020
1 parent 15e95e4 commit 800dd19
Showing 1 changed file with 67 additions and 74 deletions.
141 changes: 67 additions & 74 deletions cmd/kubeadm/app/util/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ import (

const etcdTimeout = 2 * time.Second

// Exponential backoff for etcd operations
// Exponential backoff for etcd operations (up to ~200 seconds)
var etcdBackoff = wait.Backoff{
Steps: 11,
Duration: 50 * time.Millisecond,
Factor: 2.0,
Steps: 18,
Duration: 100 * time.Millisecond,
Factor: 1.5,
Jitter: 0.1,
}

Expand Down Expand Up @@ -210,29 +210,27 @@ func getRawEtcdEndpointsFromClusterStatus(client clientset.Interface) ([]string,
return etcdEndpoints, nil
}

// dialTimeout is the timeout for failing to establish a connection.
// It is set to >20 seconds as times shorter than that will cause TLS connections to fail
// on heavily loaded arm64 CPUs (issue #64649)
const dialTimeout = 40 * time.Second

// Sync synchronizes client's endpoints with the known endpoints from the etcd membership.
func (c *Client) Sync() error {
cli, err := clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
DialTimeout: dialTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: c.TLS,
})
if err != nil {
return err
}
defer cli.Close()

// Syncs the list of endpoints
var cli *clientv3.Client
var lastError error
err = wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
err := wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
var err error
cli, err = clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
DialTimeout: etcdTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: c.TLS,
})
if err != nil {
lastError = err
return false, nil
}
defer cli.Close()

ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
err = cli.Sync(ctx)
cancel()
Expand Down Expand Up @@ -260,23 +258,24 @@ type Member struct {
}

func (c *Client) listMembers() (*clientv3.MemberListResponse, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
DialTimeout: dialTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: c.TLS,
})
if err != nil {
return nil, err
}
defer cli.Close()

// Gets the member list
var lastError error
var resp *clientv3.MemberListResponse
err = wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
err := wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
DialTimeout: etcdTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: c.TLS,
})
if err != nil {
lastError = err
return false, nil
}
defer cli.Close()

ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
resp, err = cli.MemberList(ctx)
cancel()
Expand Down Expand Up @@ -324,23 +323,24 @@ func (c *Client) ListMembers() ([]Member, error) {

// RemoveMember notifies an etcd cluster to remove an existing member
func (c *Client) RemoveMember(id uint64) ([]Member, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
DialTimeout: dialTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: c.TLS,
})
if err != nil {
return nil, err
}
defer cli.Close()

// Remove an existing member from the cluster
var lastError error
var resp *clientv3.MemberRemoveResponse
err = wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
err := wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
DialTimeout: etcdTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: c.TLS,
})
if err != nil {
lastError = err
return false, nil
}
defer cli.Close()

ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
resp, err = cli.MemberRemove(ctx, id)
cancel()
Expand Down Expand Up @@ -374,18 +374,10 @@ func (c *Client) AddMember(name string, peerAddrs string) ([]Member, error) {
return nil, errors.Wrapf(err, "error parsing peer address %s", peerAddrs)
}

// Exponential backoff for the MemberAdd operation (up to ~200 seconds)
etcdBackoffAdd := wait.Backoff{
Steps: 18,
Duration: 100 * time.Millisecond,
Factor: 1.5,
Jitter: 0.1,
}

// Adds a new member to the cluster
var lastError error
var resp *clientv3.MemberAddResponse
err = wait.ExponentialBackoff(etcdBackoffAdd, func() (bool, error) {
err = wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
DialTimeout: etcdTimeout,
Expand Down Expand Up @@ -447,25 +439,26 @@ func (c *Client) CheckClusterHealth() error {

// getClusterStatus returns nil for status Up (along with endpoint status response map) or error for status Down
func (c *Client) getClusterStatus() (map[string]*clientv3.StatusResponse, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
DialTimeout: dialTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: c.TLS,
})
if err != nil {
return nil, err
}
defer cli.Close()

clusterStatus := make(map[string]*clientv3.StatusResponse)
for _, ep := range c.Endpoints {
// Gets the member status
var lastError error
var resp *clientv3.StatusResponse
err = wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
err := wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
DialTimeout: etcdTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: c.TLS,
})
if err != nil {
lastError = err
return false, nil
}
defer cli.Close()

ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
resp, err = cli.Status(ctx, ep)
cancel()
Expand Down

0 comments on commit 800dd19

Please sign in to comment.