Skip to content

Commit

Permalink
Merge pull request #17923 from siyuanfoundation/robust
Browse files Browse the repository at this point in the history
Add randomness in robustness cluster process version to test mixed version scenarios.
  • Loading branch information
serathius committed May 22, 2024
2 parents f8d8f6a + 0f94c2c commit 1d367fb
Show file tree
Hide file tree
Showing 11 changed files with 197 additions and 53 deletions.
13 changes: 7 additions & 6 deletions tests/e2e/etcd_mix_versions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,13 @@ func mixVersionsSnapshotTestByMockPartition(t *testing.T, cfg *e2e.EtcdProcessCl
t.Skipf("%q does not exist", e2e.BinPath.EtcdLastRelease)
}

clusterOptions := []e2e.EPClusterOption{e2e.WithConfig(cfg), e2e.WithSnapshotCount(10)}
// TODO: remove version check after 3.5.14 release.
if cfg.Version == e2e.CurrentVersion {
clusterOptions = append(clusterOptions, e2e.WithSnapshotCatchUpEntries(10))
}
t.Logf("Create an etcd cluster with %d member", cfg.ClusterSize)
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t,
e2e.WithConfig(cfg),
e2e.WithSnapshotCount(10),
e2e.WithSnapshotCatchUpEntries(10),
)
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, clusterOptions...)
require.NoError(t, err, "failed to start etcd cluster: %v", err)
defer func() {
derr := epc.Close()
Expand All @@ -161,7 +162,7 @@ func mixVersionsSnapshotTestByMockPartition(t *testing.T, cfg *e2e.EtcdProcessCl
assertKVHash(t, epc)

leaderEPC = epc.Procs[epc.WaitLeader(t)]
if leaderEPC.Config().ExecPath == e2e.BinPath.Etcd {
if cfg.Version == e2e.CurrentVersion {
t.Log("Verify logs to check snapshot be sent from leader to follower")
e2e.AssertProcessLogs(t, leaderEPC, "sent database snapshot")
}
Expand Down
105 changes: 77 additions & 28 deletions tests/framework/e2e/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,11 @@ type EtcdProcessClusterConfig struct {

// Cluster setup config

ClusterSize int
RollingStart bool
ClusterSize int
// InitialLeaderIndex makes sure the leader is the ith proc
// when the cluster starts if it is specified (>=0).
InitialLeaderIndex int
RollingStart bool
// BaseDataDirPath specifies the data-dir for the members. If test cases
// do not specify `BaseDataDirPath`, then e2e framework creates a
// temporary directory for each member; otherwise, it creates a
Expand Down Expand Up @@ -180,10 +183,10 @@ type EtcdProcessClusterConfig struct {

func DefaultConfig() *EtcdProcessClusterConfig {
cfg := &EtcdProcessClusterConfig{
ClusterSize: 3,
CN: true,

ServerConfig: *embed.NewConfig(),
ClusterSize: 3,
CN: true,
InitialLeaderIndex: -1,
ServerConfig: *embed.NewConfig(),
}
cfg.ServerConfig.InitialClusterToken = "new"
return cfg
Expand All @@ -207,6 +210,10 @@ func WithVersion(version ClusterVersion) EPClusterOption {
return func(c *EtcdProcessClusterConfig) { c.Version = version }
}

// WithInitialLeaderIndex returns a cluster option that stores i in the
// config's InitialLeaderIndex field.
func WithInitialLeaderIndex(i int) EPClusterOption {
	return func(cfg *EtcdProcessClusterConfig) {
		cfg.InitialLeaderIndex = i
	}
}

// WithDataDirPath returns a cluster option that stores path in the config's
// BaseDataDirPath field.
func WithDataDirPath(path string) EPClusterOption {
	return func(cfg *EtcdProcessClusterConfig) {
		cfg.BaseDataDirPath = path
	}
}
Expand Down Expand Up @@ -398,6 +405,16 @@ func InitEtcdProcessCluster(t testing.TB, cfg *EtcdProcessClusterConfig) (*EtcdP
cfg.ServerConfig.SnapshotCount = etcdserver.DefaultSnapshotCount
}

// validate SnapshotCatchUpEntries could be set for at least one member
if cfg.ServerConfig.SnapshotCatchUpEntries != etcdserver.DefaultSnapshotCatchUpEntries {
if !CouldSetSnapshotCatchupEntries(BinPath.Etcd) {
return nil, fmt.Errorf("cannot set SnapshotCatchUpEntries for current etcd version: %s", BinPath.Etcd)
}
if cfg.Version == LastVersion && !CouldSetSnapshotCatchupEntries(BinPath.EtcdLastRelease) {
return nil, fmt.Errorf("cannot set SnapshotCatchUpEntries for last etcd version: %s", BinPath.EtcdLastRelease)
}
}

etcdCfgs := cfg.EtcdAllServerProcessConfigs(t)
epc := &EtcdProcessCluster{
Cfg: cfg,
Expand Down Expand Up @@ -437,7 +454,11 @@ func StartEtcdProcessCluster(ctx context.Context, t testing.TB, epc *EtcdProcess
t.Skip("please run 'make gofail-enable && make build' before running the test")
}
}

if cfg.InitialLeaderIndex >= 0 {
if err := epc.MoveLeader(ctx, t, cfg.InitialLeaderIndex); err != nil {
return nil, fmt.Errorf("failed to move leader: %v", err)
}
}
return epc, nil
}

Expand Down Expand Up @@ -570,27 +591,6 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
args = append(args, "--discovery="+cfg.Discovery)
}

defaultValues := values(*embed.NewConfig())
overrideValues := values(cfg.ServerConfig)
for flag, value := range overrideValues {
if defaultValue := defaultValues[flag]; value == "" || value == defaultValue {
continue
}
if flag == "experimental-snapshot-catchup-entries" && !(cfg.Version == CurrentVersion || (cfg.Version == MinorityLastVersion && i <= cfg.ClusterSize/2) || (cfg.Version == QuorumLastVersion && i > cfg.ClusterSize/2)) {
continue
}
args = append(args, fmt.Sprintf("--%s=%s", flag, value))
}
envVars := map[string]string{}
for key, value := range cfg.EnvVars {
envVars[key] = value
}
var gofailPort int
if cfg.GoFailEnabled {
gofailPort = (i+1)*10000 + 2381
envVars["GOFAIL_HTTP"] = fmt.Sprintf("127.0.0.1:%d", gofailPort)
}

var execPath string
switch cfg.Version {
case CurrentVersion:
Expand All @@ -613,6 +613,27 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
panic(fmt.Sprintf("Unknown cluster version %v", cfg.Version))
}

defaultValues := values(*embed.NewConfig())
overrideValues := values(cfg.ServerConfig)
for flag, value := range overrideValues {
if defaultValue := defaultValues[flag]; value == "" || value == defaultValue {
continue
}
if flag == "experimental-snapshot-catchup-entries" && !CouldSetSnapshotCatchupEntries(execPath) {
continue
}
args = append(args, fmt.Sprintf("--%s=%s", flag, value))
}
envVars := map[string]string{}
for key, value := range cfg.EnvVars {
envVars[key] = value
}
var gofailPort int
if cfg.GoFailEnabled {
gofailPort = (i+1)*10000 + 2381
envVars["GOFAIL_HTTP"] = fmt.Sprintf("127.0.0.1:%d", gofailPort)
}

return &EtcdServerProcessConfig{
lg: cfg.Logger,
ExecPath: execPath,
Expand Down Expand Up @@ -1050,3 +1071,31 @@ func (epc *EtcdProcessCluster) WaitMembersForLeader(ctx context.Context, t testi
t.Fatal("impossible path of execution")
return -1
}

// MoveLeader moves the leader to the ith process in epc.Procs.
//
// It waits until the members agree on a leader and returns nil right away if
// Procs[i] already leads. Otherwise it looks up the member ID of Procs[i] via
// a status call and sends the move-leader request to the current leader.
//
// An error is returned when i is out of range, when the status call fails or
// yields no response, or when the move-leader request fails. If a member
// other than Procs[i] is the leader after the transfer, the test is failed
// through t.Fatalf.
func (epc *EtcdProcessCluster) MoveLeader(ctx context.Context, t testing.TB, i int) error {
	if i < 0 || i >= len(epc.Procs) {
		return fmt.Errorf("invalid index: %d, must between 0 and %d", i, len(epc.Procs)-1)
	}
	t.Logf("moving leader to Procs[%d]", i)
	oldLeader := epc.WaitMembersForLeader(ctx, t, epc.Procs)
	if oldLeader == i {
		t.Logf("Procs[%d] is already the leader", i)
		return nil
	}
	resp, err := epc.Procs[i].Etcdctl().Status(ctx)
	if err != nil {
		return err
	}
	// Guard the index below: an empty result must surface as an error rather
	// than an index-out-of-range panic.
	if len(resp) == 0 {
		return fmt.Errorf("empty status response from Procs[%d]", i)
	}
	// The header of the status response from Procs[i] supplies the ID used
	// as the transfer target.
	memberID := resp[0].Header.MemberId
	// The request is issued through the old leader's etcdctl.
	err = epc.Procs[oldLeader].Etcdctl().MoveLeader(ctx, memberID)
	if err != nil {
		return err
	}
	newLeader := epc.WaitMembersForLeader(ctx, t, epc.Procs)
	if newLeader != i {
		t.Fatalf("expect new leader to be Procs[%d] but got Procs[%d]", i, newLeader)
	}
	t.Logf("moved leader from Procs[%d] to Procs[%d]", oldLeader, i)
	return nil
}
27 changes: 26 additions & 1 deletion tests/framework/e2e/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,22 @@
package e2e

import (
"fmt"
"testing"

"github.com/coreos/go-semver/semver"
"github.com/stretchr/testify/assert"
)

func TestEtcdServerProcessConfig(t *testing.T) {
v3_5_12 := semver.Version{Major: 3, Minor: 5, Patch: 12}
v3_5_13 := semver.Version{Major: 3, Minor: 5, Patch: 13}
tcs := []struct {
name string
config *EtcdProcessClusterConfig
expectArgsNotContain []string
expectArgsContain []string
mockBinaryVersion *semver.Version
}{
{
name: "Default",
Expand Down Expand Up @@ -73,17 +78,37 @@ func TestEtcdServerProcessConfig(t *testing.T) {
expectArgsContain: []string{
"--experimental-snapshot-catchup-entries=100",
},
mockBinaryVersion: &v3_5_13,
},
{
name: "CatchUpEntriesLastVersion",
name: "CatchUpEntriesNoVersion",
config: NewConfig(WithSnapshotCatchUpEntries(100), WithVersion(LastVersion)),
expectArgsNotContain: []string{
"--experimental-snapshot-catchup-entries=100",
},
},
{
name: "CatchUpEntriesOldVersion",
config: NewConfig(WithSnapshotCatchUpEntries(100), WithVersion(LastVersion)),
expectArgsNotContain: []string{
"--experimental-snapshot-catchup-entries=100",
},
mockBinaryVersion: &v3_5_12,
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
var mockGetVersionFromBinary func(binaryPath string) (*semver.Version, error)
if tc.mockBinaryVersion == nil {
mockGetVersionFromBinary = func(binaryPath string) (*semver.Version, error) {
return nil, fmt.Errorf("could not get binary version")
}
} else {
mockGetVersionFromBinary = func(binaryPath string) (*semver.Version, error) {
return tc.mockBinaryVersion, nil
}
}
setGetVersionFromBinary(t, mockGetVersionFromBinary)
args := tc.config.EtcdServerProcessConfig(t, 0).Args
if len(tc.expectArgsContain) != 0 {
assert.Subset(t, args, tc.expectArgsContain)
Expand Down
24 changes: 23 additions & 1 deletion tests/framework/e2e/etcd_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,10 @@ func parseFailpointsBody(body io.Reader) (map[string]string, error) {
return failpoints, nil
}

func GetVersionFromBinary(binaryPath string) (*semver.Version, error) {
var GetVersionFromBinary = func(binaryPath string) (*semver.Version, error) {
if !fileutil.Exist(binaryPath) {
return nil, fmt.Errorf("binary path does not exist: %s", binaryPath)
}
lines, err := RunUtilCompletion([]string{binaryPath, "--version"}, nil)
if err != nil {
return nil, fmt.Errorf("could not find binary version from %s, err: %w", binaryPath, err)
Expand All @@ -508,3 +511,22 @@ func GetVersionFromBinary(binaryPath string) (*semver.Version, error) {

return nil, fmt.Errorf("could not find version in binary output of %s, lines outputted were %v", binaryPath, lines)
}

// setGetVersionFromBinary swaps the package-level GetVersionFromBinary for f
// and registers a cleanup on tb that puts the previous function back.
func setGetVersionFromBinary(tb testing.TB, f func(binaryPath string) (*semver.Version, error)) {
	previous := GetVersionFromBinary
	tb.Cleanup(func() { GetVersionFromBinary = previous })
	GetVersionFromBinary = f
}

func CouldSetSnapshotCatchupEntries(execPath string) bool {
v, err := GetVersionFromBinary(execPath)
if err != nil {
return false
}
// snapshot-catchup-entries flag was backported in https://github.com/etcd-io/etcd/pull/17808
v3_5_13 := semver.Version{Major: 3, Minor: 5, Patch: 13}
return v.Compare(v3_5_13) >= 0
}
7 changes: 7 additions & 0 deletions tests/framework/e2e/etcdctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,13 @@ func (ctl *EtcdctlV3) MemberPromote(ctx context.Context, id uint64) (*clientv3.M
return &resp, err
}

// MoveLeader asks the current leader to hand its leadership over to the
// member identified by transfereeID. The request must be made to the leader.
//
// It spawns the command built by cmdArgs with the "move-leader" subcommand
// and the ID formatted in hexadecimal, expecting "Leadership transferred".
func (ctl *EtcdctlV3) MoveLeader(ctx context.Context, transfereeID uint64) error {
	hexID := fmt.Sprintf("%x", transfereeID)
	args := ctl.cmdArgs("move-leader", hexID)
	want := expect.ExpectedResponse{Value: "Leadership transferred"}
	if _, err := SpawnWithExpectLines(ctx, args, nil, want); err != nil {
		return err
	}
	return nil
}

func (ctl *EtcdctlV3) cmdArgs(args ...string) []string {
cmdArgs := []string{BinPath.Etcdctl}
for k, v := range ctl.flags() {
Expand Down
5 changes: 5 additions & 0 deletions tests/framework/e2e/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ func InitFlags() {
certDirDef := FixturesDir

binDir := flag.String("bin-dir", binDirDef, "The directory for store etcd and etcdctl binaries.")
binLastRelease := flag.String("bin-last-release", "", "The path for the last release etcd binary.")

flag.StringVar(&CertDir, "cert-dir", certDirDef, "The directory for store certificate files.")
flag.Parse()

Expand All @@ -79,6 +81,9 @@ func InitFlags() {
Etcdutl: *binDir + "/etcdutl",
LazyFS: *binDir + "/lazyfs",
}
if *binLastRelease != "" {
BinPath.EtcdLastRelease = *binLastRelease
}
CertPath = CertDir + "/server.crt"
PrivateKeyPath = CertDir + "/server.key.insecure"
CaPath = CertDir + "/ca.crt"
Expand Down
5 changes: 3 additions & 2 deletions tests/robustness/failpoint/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,9 @@ func (f memberReplace) Name() string {
return "MemberReplace"
}

func (f memberReplace) Available(config e2e.EtcdProcessClusterConfig, _ e2e.EtcdProcess) bool {
return config.ClusterSize > 1
func (f memberReplace) Available(config e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess) bool {
// a lower etcd version may not be able to join a cluster with higher cluster version.
return config.ClusterSize > 1 && (config.Version == e2e.QuorumLastVersion || member.Config().ExecPath == e2e.BinPath.Etcd)
}

func getID(ctx context.Context, cc clientv3.Cluster, name string) (id uint64, found bool, err error) {
Expand Down
2 changes: 1 addition & 1 deletion tests/robustness/failpoint/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (tb triggerBlackhole) Trigger(ctx context.Context, t *testing.T, member e2e
func (tb triggerBlackhole) Available(config e2e.EtcdProcessClusterConfig, process e2e.EtcdProcess) bool {
// Avoid triggering failpoint if waiting for failpoint would take too long to fit into timeout.
// Number of required entries for snapshot depends on etcd configuration.
if tb.waitTillSnapshot && entriesToGuaranteeSnapshot(config) > 200 {
if tb.waitTillSnapshot && (entriesToGuaranteeSnapshot(config) > 200 || !e2e.CouldSetSnapshotCatchupEntries(process.Config().ExecPath)) {
return false
}
return config.ClusterSize > 1 && process.PeerProxy() != nil
Expand Down
16 changes: 14 additions & 2 deletions tests/robustness/makefile.mk
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,13 @@ test-robustness-reports:

# Test previous release branches

.PHONY: test-robustness-release-3.6
test-robustness-release-3.6: /tmp/etcd-release-3.6-failpoints/bin /tmp/etcd-release-3.5-failpoints/bin
GO_TEST_FLAGS="$${GO_TEST_FLAGS} --bin-dir=/tmp/etcd-release-3.6-failpoints/bin --bin-last-release=/tmp/etcd-release-3.5-failpoints/bin/etcd" make test-robustness

.PHONY: test-robustness-release-3.5
test-robustness-release-3.5: /tmp/etcd-release-3.5-failpoints/bin
GO_TEST_FLAGS="$${GO_TEST_FLAGS} --bin-dir=/tmp/etcd-release-3.5-failpoints/bin" make test-robustness
test-robustness-release-3.5: /tmp/etcd-release-3.5-failpoints/bin /tmp/etcd-release-3.4-failpoints/bin
GO_TEST_FLAGS="$${GO_TEST_FLAGS} --bin-dir=/tmp/etcd-release-3.5-failpoints/bin --bin-last-release=/tmp/etcd-release-3.4-failpoints/bin/etcd" make test-robustness

.PHONY: test-robustness-release-3.4
test-robustness-release-3.4: /tmp/etcd-release-3.4-failpoints/bin
Expand Down Expand Up @@ -72,6 +76,14 @@ $(GOPATH)/bin/gofail: tools/mod/go.mod tools/mod/go.sum
make gofail-enable; \
make build;

/tmp/etcd-release-3.6-failpoints/bin: $(GOPATH)/bin/gofail
rm -rf /tmp/etcd-release-3.6-failpoints/
mkdir -p /tmp/etcd-release-3.6-failpoints/
cd /tmp/etcd-release-3.6-failpoints/; \
git clone --depth 1 --branch main https://github.com/etcd-io/etcd.git .; \
make gofail-enable; \
make build;

/tmp/etcd-v3.5.2-failpoints/bin:
/tmp/etcd-v3.5.4-failpoints/bin:
/tmp/etcd-v3.5.5-failpoints/bin:
Expand Down
8 changes: 8 additions & 0 deletions tests/robustness/options/server_config_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,11 @@ func WithExperimentalWatchProgressNotifyInterval(input ...time.Duration) e2e.EPC
c.ServerConfig.ExperimentalWatchProgressNotifyInterval = input[internalRand.Intn(len(input))]
}
}

// WithVersion returns a cluster option that sets the config's Version to one
// of the given candidates, picked with internalRand each time the option is
// applied. input must be non-empty, since there is otherwise no element to
// pick.
func WithVersion(input ...e2e.ClusterVersion) e2e.EPClusterOption {
	return func(cfg *e2e.EtcdProcessClusterConfig) {
		pick := internalRand.Intn(len(input))
		cfg.Version = input[pick]
	}
}

// WithInitialLeaderIndex returns a cluster option that sets the config's
// InitialLeaderIndex to one of the given candidates, picked with internalRand
// each time the option is applied. input must be non-empty, since there is
// otherwise no element to pick.
func WithInitialLeaderIndex(input ...int) e2e.EPClusterOption {
	return func(cfg *e2e.EtcdProcessClusterConfig) {
		pick := internalRand.Intn(len(input))
		cfg.InitialLeaderIndex = input[pick]
	}
}

0 comments on commit 1d367fb

Please sign in to comment.