Skip to content

Commit

Permalink
Merge pull request #17381 from ivanvc/3.5-backport-wait-leader
Browse files Browse the repository at this point in the history
[3.5] backport e2e WaitLeader
  • Loading branch information
serathius committed Feb 8, 2024
2 parents c2b458c + 8b24932 commit b2aa3a1
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 26 deletions.
4 changes: 2 additions & 2 deletions tests/e2e/cmux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,12 @@ func testConnectionMultiplexing(ctx context.Context, t *testing.T, member e2e.Et
}
t.Run("etcdctl", func(t *testing.T) {
t.Run("v2", func(t *testing.T) {
etcdctl := NewEtcdctl([]string{httpEndpoint}, connType, false, true)
etcdctl := e2e.NewEtcdctl([]string{httpEndpoint}, connType, false, true)
err := etcdctl.Set("a", "1")
assert.NoError(t, err)
})
t.Run("v3", func(t *testing.T) {
etcdctl := NewEtcdctl([]string{grpcEndpoint}, connType, false, false)
etcdctl := e2e.NewEtcdctl([]string{grpcEndpoint}, connType, false, false)
err := etcdctl.Put("a", "1")
assert.NoError(t, err)
})
Expand Down
10 changes: 5 additions & 5 deletions tests/e2e/corrupt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestInPlaceRecovery(t *testing.T) {
//Put some data into the old cluster, so that after recovering from a blank db, the hash diverges.
t.Log("putting 10 keys...")

oldCc := NewEtcdctl(epcOld.EndpointsV3(), e2e.ClientNonTLS, false, false)
oldCc := e2e.NewEtcdctl(epcOld.EndpointsV3(), e2e.ClientNonTLS, false, false)
for i := 0; i < 10; i++ {
err := oldCc.Put(testutil.PickKey(int64(i)), fmt.Sprint(i))
assert.NoError(t, err, "error on put")
Expand All @@ -154,7 +154,7 @@ func TestInPlaceRecovery(t *testing.T) {
}
})

newCc := NewEtcdctl(epcNew.EndpointsV3(), e2e.ClientNonTLS, false, false)
newCc := e2e.NewEtcdctl(epcNew.EndpointsV3(), e2e.ClientNonTLS, false, false)
assert.NoError(t, err)

wg := sync.WaitGroup{}
Expand Down Expand Up @@ -211,7 +211,7 @@ func TestPeriodicCheckDetectsCorruption(t *testing.T) {
}
})

cc := NewEtcdctl(epc.EndpointsV3(), e2e.ClientNonTLS, false, false)
cc := e2e.NewEtcdctl(epc.EndpointsV3(), e2e.ClientNonTLS, false, false)

for i := 0; i < 10; i++ {
err := cc.Put(testutil.PickKey(int64(i)), fmt.Sprint(i))
Expand Down Expand Up @@ -252,7 +252,7 @@ func TestCompactHashCheckDetectCorruption(t *testing.T) {
}
})

cc := NewEtcdctl(epc.EndpointsV3(), e2e.ClientNonTLS, false, false)
cc := e2e.NewEtcdctl(epc.EndpointsV3(), e2e.ClientNonTLS, false, false)

for i := 0; i < 10; i++ {
err := cc.Put(testutil.PickKey(int64(i)), fmt.Sprint(i))
Expand Down Expand Up @@ -302,7 +302,7 @@ func TestCompactHashCheckDetectCorruptionInterrupt(t *testing.T) {
// Put 200 identical keys to the cluster, so that the compaction will drop some stale values.
// We need a relatively big number here so that the compaction takes a non-trivial amount of time and we can interrupt it.
t.Log("putting 200 values to the identical key...")
cc := NewEtcdctl(epc.EndpointsV3(), e2e.ClientNonTLS, false, false)
cc := e2e.NewEtcdctl(epc.EndpointsV3(), e2e.ClientNonTLS, false, false)

for i := 0; i < 200; i++ {
err = cc.Put("key", fmt.Sprint(i))
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e/ctl_v3_member_no_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestMemberReplace(t *testing.T) {
for i := 1; i < len(epc.Procs); i++ {
endpoints = append(endpoints, epc.Procs[(memberIdx+i)%len(epc.Procs)].EndpointsGRPC()...)
}
cc := NewEtcdctl(endpoints, e2e.ClientNonTLS, false, false)
cc := e2e.NewEtcdctl(endpoints, e2e.ClientNonTLS, false, false)

memberID, found, err := getMemberIdByName(ctx, cc, memberName)
require.NoError(t, err)
Expand Down
10 changes: 5 additions & 5 deletions tests/e2e/http_health_check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ func doHealthCheckAndVerify(t *testing.T, client *http.Client, url string, expec

func triggerNoSpaceAlarm(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, _ time.Duration) {
buf := strings.Repeat("b", os.Getpagesize())
etcdctl := NewEtcdctl(clus.Procs[0].EndpointsV3(), e2e.ClientNonTLS, false, false)
etcdctl := e2e.NewEtcdctl(clus.Procs[0].EndpointsV3(), e2e.ClientNonTLS, false, false)
for {
if err := etcdctl.Put("foo", buf); err != nil {
if !strings.Contains(err.Error(), "etcdserver: mvcc: database space exceeded") {
Expand All @@ -377,7 +377,7 @@ func triggerSlowApply(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCl
// the following proposal will be blocked at applying stage
// because when apply index < committed index, linearizable read would time out.
require.NoError(t, clus.Procs[0].Failpoints().SetupHTTP(ctx, "beforeApplyOneEntryNormal", fmt.Sprintf(`sleep("%s")`, duration)))
etcdctl := NewEtcdctl(clus.Procs[1].EndpointsV3(), e2e.ClientNonTLS, false, false)
etcdctl := e2e.NewEtcdctl(clus.Procs[1].EndpointsV3(), e2e.ClientNonTLS, false, false)
etcdctl.Put("foo", "bar")
}

Expand All @@ -391,12 +391,12 @@ func blackhole(_ context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, _

func triggerRaftLoopDeadLock(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, duration time.Duration) {
require.NoError(t, clus.Procs[0].Failpoints().SetupHTTP(ctx, "raftBeforeSaveWaitWalSync", fmt.Sprintf(`sleep("%s")`, duration)))
etcdctl := NewEtcdctl(clus.Procs[0].EndpointsV3(), e2e.ClientNonTLS, false, false)
etcdctl := e2e.NewEtcdctl(clus.Procs[0].EndpointsV3(), e2e.ClientNonTLS, false, false)
etcdctl.Put("foo", "bar")
}

func triggerSlowBufferWriteBackWithAuth(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, duration time.Duration) {
etcdctl := NewEtcdctl(clus.Procs[0].EndpointsV3(), e2e.ClientNonTLS, false, false)
etcdctl := e2e.NewEtcdctl(clus.Procs[0].EndpointsV3(), e2e.ClientNonTLS, false, false)

_, err := etcdctl.UserAdd("root", "root")
require.NoError(t, err)
Expand All @@ -409,7 +409,7 @@ func triggerSlowBufferWriteBackWithAuth(ctx context.Context, t *testing.T, clus
}

func triggerCorrupt(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, _ time.Duration) {
etcdctl := NewEtcdctl(clus.Procs[0].EndpointsV3(), e2e.ClientNonTLS, false, false)
etcdctl := e2e.NewEtcdctl(clus.Procs[0].EndpointsV3(), e2e.ClientNonTLS, false, false)
for i := 0; i < 10; i++ {
require.NoError(t, etcdctl.Put("foo", "bar"))
}
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func fillEtcdWithData(ctx context.Context, c *clientv3.Client, dbSize int) error
return g.Wait()
}

func getMemberIdByName(ctx context.Context, c *Etcdctl, name string) (id uint64, found bool, err error) {
func getMemberIdByName(ctx context.Context, c *e2e.Etcdctl, name string) (id uint64, found bool, err error) {
resp, err := c.MemberList()
if err != nil {
return 0, false, err
Expand Down
69 changes: 69 additions & 0 deletions tests/framework/e2e/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package e2e

import (
"context"
"fmt"
"net/url"
"os"
Expand Down Expand Up @@ -576,3 +577,71 @@ func (epc *EtcdProcessCluster) WithStopSignal(sig os.Signal) (ret os.Signal) {
}
return ret
}

// WaitLeader blocks until every process in epc.Procs reports the same leader
// and returns that leader's index within epc.Procs. The wait is bounded by a
// 30 second timeout; the test is failed through t if it elapses.
func (epc *EtcdProcessCluster) WaitLeader(t testing.TB) int {
	const leaderTimeout = 30 * time.Second

	ctx, cancel := context.WithTimeout(context.Background(), leaderTimeout)
	defer cancel()

	leaderIdx := epc.WaitMembersForLeader(ctx, t, epc.Procs)
	return leaderIdx
}

// WaitMembersForLeader waits until the given members agree on the same leader
// and returns that leader's index in the membs list.
//
// It first retries a Get against the cluster endpoints until it succeeds (or
// reports "Key not found"), which shows a leader is serving requests. It then
// polls each member's endpoint status about every 100ms until all reachable
// members report one identical, non-zero leader ID.
//
// The test is failed through t when:
//   - ctx is done before agreement is reached,
//   - a status call fails with anything other than "connection refused",
//   - a member returns an empty status payload, or
//   - the agreed leader is not one of membs.
func (epc *EtcdProcessCluster) WaitMembersForLeader(ctx context.Context, t testing.TB, membs []EtcdProcess) int {
	cc := NewEtcdctl(epc.EndpointsV3(), epc.Cfg.ClientTLS, epc.Cfg.IsClientAutoTLS, epc.Cfg.EnableV2)

	// ensure leader is up via linearizable get
	for {
		select {
		case <-ctx.Done():
			t.Fatal("WaitMembersForLeader timeout")
		default:
		}
		_, err := cc.Get("0")
		if err == nil || strings.Contains(err.Error(), "Key not found") {
			break
		}
		t.Logf("WaitMembersForLeader Get err: %v", err)
	}

	for {
		select {
		case <-ctx.Done():
			t.Fatal("WaitMembersForLeader timeout")
		default:
		}

		// Rebuilt on every round so stale answers never count as agreement.
		leaders := make(map[uint64]struct{})
		members := make(map[uint64]int)
		for i := range membs {
			resp, err := membs[i].Etcdctl(epc.Cfg.ClientTLS, epc.Cfg.IsClientAutoTLS, epc.Cfg.EnableV2).Status()
			if err != nil {
				if strings.Contains(err.Error(), "connection refused") {
					// if member[i] has stopped
					continue
				}
				t.Fatal(err)
			}
			// Fail with a readable message instead of panicking on resp[0].
			if len(resp) == 0 || resp[0] == nil || resp[0].Header == nil {
				t.Fatalf("WaitMembersForLeader: member %d returned an empty endpoint status", i)
				return -1
			}
			members[resp[0].Header.MemberId] = i
			leaders[resp[0].Leader] = struct{}{}
		}

		// members agree on the same leader
		if len(leaders) == 1 {
			var lead uint64
			for l := range leaders {
				lead = l
			}
			// A leader ID of 0 means the members do not know a leader yet;
			// that is not agreement, so keep polling until ctx expires.
			if lead != 0 {
				index, ok := members[lead]
				if !ok {
					t.Fatalf("members agree on a leader which is not one of members, members:%v , leader:%v", members, lead)
					return -1
				}
				t.Logf("members agree on a leader, members:%v , leader:%v", members, lead)
				return index
			}
		}

		// From main branch 10 * config.TickDuration (10 * time.Millisecond)
		time.Sleep(100 * time.Millisecond)
	}
}
6 changes: 6 additions & 0 deletions tests/framework/e2e/etcd_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ type EtcdProcess interface {
PeerProxy() proxy.Server
Failpoints() *BinaryFailpoints
IsRunning() bool

Etcdctl(connType ClientConnType, isAutoTLS bool, v2 bool) *Etcdctl
}

type LogsExpect interface {
Expand Down Expand Up @@ -249,6 +251,10 @@ func (ep *EtcdServerProcess) IsRunning() bool {
return false
}

// Etcdctl returns an etcdctl wrapper that targets this process's v3 endpoints
// using the given connection type, auto-TLS setting and v2 mode.
func (ep *EtcdServerProcess) Etcdctl(connType ClientConnType, isAutoTLS, v2 bool) *Etcdctl {
	endpoints := ep.EndpointsV3()
	return NewEtcdctl(endpoints, connType, isAutoTLS, v2)
}

type BinaryFailpoints struct {
member EtcdProcess
availableCache map[string]string
Expand Down
39 changes: 27 additions & 12 deletions tests/e2e/etcdctl.go → tests/framework/e2e/etcdctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,17 @@ import (
"strings"

clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/framework/e2e"
"go.etcd.io/etcd/tests/v3/integration"
)

type Etcdctl struct {
connType e2e.ClientConnType
connType ClientConnType
isAutoTLS bool
endpoints []string
v2 bool
}

func NewEtcdctl(endpoints []string, connType e2e.ClientConnType, isAutoTLS bool, v2 bool) *Etcdctl {
func NewEtcdctl(endpoints []string, connType ClientConnType, isAutoTLS bool, v2 bool) *Etcdctl {
return &Etcdctl{
endpoints: endpoints,
connType: connType,
Expand All @@ -52,7 +51,7 @@ func (ctl *Etcdctl) Put(key, value string) error {
}
args := ctl.cmdArgs()
args = append(args, "put", key, value)
return e2e.SpawnWithExpectWithEnv(args, ctl.env(), "OK")
return SpawnWithExpectWithEnv(args, ctl.env(), "OK")
}

func (ctl *Etcdctl) PutWithAuth(key, value, username, password string) error {
Expand All @@ -61,7 +60,7 @@ func (ctl *Etcdctl) PutWithAuth(key, value, username, password string) error {
}
args := ctl.cmdArgs()
args = append(args, "--user", fmt.Sprintf("%s:%s", username, password), "put", key, value)
return e2e.SpawnWithExpectWithEnv(args, ctl.env(), "OK")
return SpawnWithExpectWithEnv(args, ctl.env(), "OK")
}

func (ctl *Etcdctl) Set(key, value string) error {
Expand All @@ -70,7 +69,7 @@ func (ctl *Etcdctl) Set(key, value string) error {
}
args := ctl.cmdArgs()
args = append(args, "set", key, value)
lines, err := e2e.RunUtilCompletion(args, ctl.env())
lines, err := RunUtilCompletion(args, ctl.env())
if err != nil {
return err
}
Expand All @@ -83,7 +82,7 @@ func (ctl *Etcdctl) Set(key, value string) error {

func (ctl *Etcdctl) AuthEnable() error {
args := ctl.cmdArgs("auth", "enable")
return e2e.SpawnWithExpectWithEnv(args, ctl.env(), "Authentication Enabled")
return SpawnWithExpectWithEnv(args, ctl.env(), "Authentication Enabled")
}

func (ctl *Etcdctl) UserGrantRole(user string, role string) (*clientv3.AuthUserGrantRoleResponse, error) {
Expand Down Expand Up @@ -148,12 +147,28 @@ func (ctl *Etcdctl) Compact(rev int64) (*clientv3.CompactResponse, error) {
panic("Unsupported method for v2")
}
args := ctl.cmdArgs("compact", fmt.Sprint(rev))
return nil, e2e.SpawnWithExpectWithEnv(args, ctl.env(), fmt.Sprintf("compacted revision %v", rev))
return nil, SpawnWithExpectWithEnv(args, ctl.env(), fmt.Sprintf("compacted revision %v", rev))
}

// Status runs `etcdctl endpoint status` with JSON output and returns the
// status payload of every reported endpoint, in the order they were decoded.
// If the command or its decoding fails, the error is returned with a nil slice.
func (ctl *Etcdctl) Status() ([]*clientv3.StatusResponse, error) {
	// Shape of one element of the JSON array; only Status is handed back.
	type endpointStatus struct {
		Endpoint string
		Status   *clientv3.StatusResponse
	}

	var reported []*endpointStatus
	if err := ctl.spawnJsonCmd(&reported, "endpoint", "status"); err != nil {
		return nil, err
	}

	statuses := make([]*clientv3.StatusResponse, 0, len(reported))
	for _, ep := range reported {
		statuses = append(statuses, ep.Status)
	}
	return statuses, nil
}

func (ctl *Etcdctl) spawnJsonCmd(output interface{}, args ...string) error {
args = append(args, "-w", "json")
cmd, err := e2e.SpawnCmd(append(ctl.cmdArgs(), args...), ctl.env())
cmd, err := SpawnCmd(append(ctl.cmdArgs(), args...), ctl.env())
if err != nil {
return err
}
Expand All @@ -165,7 +180,7 @@ func (ctl *Etcdctl) spawnJsonCmd(output interface{}, args ...string) error {
}

func (ctl *Etcdctl) cmdArgs(args ...string) []string {
cmdArgs := []string{e2e.CtlBinPath}
cmdArgs := []string{CtlBinPath}
for k, v := range ctl.flags() {
cmdArgs = append(cmdArgs, fmt.Sprintf("--%s=%s", k, v))
}
Expand All @@ -176,13 +191,13 @@ func (ctl *Etcdctl) flags() map[string]string {
fmap := make(map[string]string)
if ctl.v2 {
fmap["no-sync"] = "true"
if ctl.connType == e2e.ClientTLS {
if ctl.connType == ClientTLS {
fmap["ca-file"] = integration.TestTLSInfo.TrustedCAFile
fmap["cert-file"] = integration.TestTLSInfo.CertFile
fmap["key-file"] = integration.TestTLSInfo.KeyFile
}
} else {
if ctl.connType == e2e.ClientTLS {
if ctl.connType == ClientTLS {
if ctl.isAutoTLS {
fmap["insecure-transport"] = "false"
fmap["insecure-skip-tls-verify"] = "true"
Expand Down

0 comments on commit b2aa3a1

Please sign in to comment.