Skip to content

Commit

Permalink
Improve snapshot status
Browse files Browse the repository at this point in the history
Signed-off-by: Cenk Alti <cenkalti@gmail.com>
  • Loading branch information
cenkalti committed Jan 19, 2024
1 parent e5665a7 commit 7f32b22
Show file tree
Hide file tree
Showing 7 changed files with 295 additions and 11 deletions.
17 changes: 17 additions & 0 deletions etcdutl/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ require (
github.com/dustin/go-humanize v1.0.1
github.com/olekukonko/tablewriter v0.0.5
github.com/spf13/cobra v1.8.0
github.com/stretchr/testify v1.8.4
go.etcd.io/bbolt v1.4.0-alpha.0
go.etcd.io/etcd/api/v3 v3.6.0-alpha.0
go.etcd.io/etcd/client/pkg/v3 v3.6.0-alpha.0
Expand All @@ -36,30 +37,43 @@ require (

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logr/logr v1.3.0 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt/v4 v4.5.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/btree v1.1.2 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jonboulle/clockwork v0.4.0 // indirect
github.com/mattn/go-runewidth v0.0.15 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.18.0 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.46.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/soheilhy/cmux v0.1.5 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
go.etcd.io/etcd/client/v2 v2.306.0-alpha.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1 // indirect
go.opentelemetry.io/otel v1.21.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0 // indirect
go.opentelemetry.io/otel/metric v1.21.0 // indirect
go.opentelemetry.io/otel/sdk v1.21.0 // indirect
go.opentelemetry.io/otel/trace v1.21.0 // indirect
go.opentelemetry.io/proto/otlp v1.0.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.18.0 // indirect
golang.org/x/net v0.20.0 // indirect
Expand All @@ -71,5 +85,8 @@ require (
google.golang.org/genproto/googleapis/rpc v0.0.0-20240102182953-50ed04b92917 // indirect
google.golang.org/grpc v1.60.1 // indirect
google.golang.org/protobuf v1.32.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
sigs.k8s.io/json v0.0.0-20211020170558-c049b76a60c6 // indirect
sigs.k8s.io/yaml v1.4.0 // indirect
)
101 changes: 99 additions & 2 deletions etcdutl/go.sum

Large diffs are not rendered by default.

12 changes: 11 additions & 1 deletion etcdutl/snapshot/v3_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

bolt "go.etcd.io/bbolt"
"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/client/pkg/v3/fileutil"
"go.etcd.io/etcd/client/pkg/v3/types"
clientv3 "go.etcd.io/etcd/client/v3"
Expand Down Expand Up @@ -138,6 +139,7 @@ func (s *v3Manager) Status(dbPath string) (ds Status, err error) {
if v != nil {
ds.Version = v.String()
}
var kv mvccpb.KeyValue
c := tx.Cursor()
for next, _ := c.First(); next != nil; next, _ = c.Next() {
b := tx.Bucket(next)
Expand All @@ -156,8 +158,16 @@ func (s *v3Manager) Status(dbPath string) (ds Status, err error) {
return fmt.Errorf("cannot write to bucket %s", herr.Error())
}
if iskeyb {
rev := mvcc.BytesToRev(k)
rev, err := mvcc.BytesToBucketKey(k)
if err != nil {
return fmt.Errorf("cannot parse revision key : %s", err.Error())
}
ds.Revision = rev.Main

err = kv.Unmarshal(v)
if err != nil {
return fmt.Errorf("cannot unmarshal value : %s", err.Error())
}
}
ds.TotalKey++
return nil
Expand Down
145 changes: 145 additions & 0 deletions etcdutl/snapshot/v3_snapshot_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package snapshot

import (
"context"
"errors"
"path/filepath"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"go.etcd.io/bbolt"
"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/server/v3/embed"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/storage/mvcc"
)

// TestSnapshotStatus is the happy case.
// It inserts pre-defined number of keys and asserts the output hash of status command.
// The expected hash value must not be changed.
// If it changes, there must be some backwards incompatible change introduced.
func TestSnapshotStatus(t *testing.T) {
dbpath := createDB(t, insertKeys(t, 10, 100))

status, err := NewV3(zap.NewNop()).Status(dbpath)
require.NoError(t, err)

assert.Equal(t, uint32(0x62132b4d), status.Hash)
assert.Equal(t, int64(11), status.Revision)
}

// TestSnapshotStatusCorruptRevision tests if snapshot status command fails when there is an unexpected revision in "key" bucket.
func TestSnapshotStatusCorruptRevision(t *testing.T) {
dbpath := createDB(t, insertKeys(t, 1, 0))

db, err := bbolt.Open(dbpath, 0666, nil)
require.NoError(t, err)
defer db.Close()

err = db.Update(func(tx *bbolt.Tx) error {
b := tx.Bucket([]byte("key"))
if b == nil {
return errors.New("key bucket not found")
}
return b.Put([]byte("0"), []byte{})
})
require.NoError(t, err)
db.Close()

_, err = NewV3(zap.NewNop()).Status(dbpath)
require.Error(t, err)
}

// TestSnapshotStatusNegativeRevisionMain tests if snapshot status command fails when main revision number is negative.
func TestSnapshotStatusNegativeRevisionMain(t *testing.T) {
dbpath := createDB(t, insertKeys(t, 1, 0))

db, err := bbolt.Open(dbpath, 0666, nil)
require.NoError(t, err)
defer db.Close()

err = db.Update(func(tx *bbolt.Tx) error {
b := tx.Bucket([]byte("key"))
if b == nil {
return errors.New("key bucket not found")
}
bytes := make([]byte, 17) // 2 int64 and a separator
mvcc.RevToBytes(mvcc.Revision{Main: -2}, bytes) // -1 is allowed for main rev
return b.Put(bytes, []byte{})
})
require.NoError(t, err)
db.Close()

_, err = NewV3(zap.NewNop()).Status(dbpath)
require.Error(t, err)
}

// TestSnapshotStatusNegativeRevisionSub tests if snapshot status command fails when sub revision number is negative.
func TestSnapshotStatusNegativeRevisionSub(t *testing.T) {
dbpath := createDB(t, insertKeys(t, 1, 0))

db, err := bbolt.Open(dbpath, 0666, nil)
require.NoError(t, err)
defer db.Close()

err = db.Update(func(tx *bbolt.Tx) error {
b := tx.Bucket([]byte("key"))
if b == nil {
return errors.New("key bucket not found")
}
bytes := make([]byte, 17) // 2 int64 and a separator
mvcc.RevToBytes(mvcc.Revision{Sub: -1}, bytes)
return b.Put(bytes, []byte{})
})
require.NoError(t, err)
db.Close()

_, err = NewV3(zap.NewNop()).Status(dbpath)
require.Error(t, err)
}

// insertKeys insert `numKeys` number of keys of `valueSize` size into a running etcd server.
func insertKeys(t *testing.T, numKeys, valueSize int) func(*etcdserver.EtcdServer) {
t.Helper()
return func(srv *etcdserver.EtcdServer) {
val := make([]byte, valueSize)
for i := 0; i < numKeys; i++ {
req := etcdserverpb.PutRequest{
Key: []byte(strconv.Itoa(i)),
Value: val,
}
_, err := srv.Put(context.TODO(), &req)
require.NoError(t, err)
}
}
}

// createDB creates a bbolt database file by running an embedded etcd server.
// While the server is running, `generateContent` function is called to insert values.
// It returns the path of bbolt database.
func createDB(t *testing.T, generateContent func(*etcdserver.EtcdServer)) string {
t.Helper()

cfg := embed.NewConfig()
cfg.LogLevel = "fatal"
cfg.Dir = t.TempDir()

etcd, err := embed.StartEtcd(cfg)
require.NoError(t, err)
defer etcd.Close()

select {
case <-etcd.Server.ReadyNotify():
case <-time.After(10 * time.Second):
t.FailNow()
}

generateContent(etcd.Server)

return filepath.Join(cfg.Dir, "member", "snap", "db")
}
3 changes: 2 additions & 1 deletion server/storage/mvcc/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,8 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
}
return
}
t.Errorf("key for rev %+v still exists, want deleted", BytesToBucketKey(revbytes))
key, _ := BytesToBucketKey(revbytes)
t.Errorf("key for rev %+v still exists, want deleted", key)
})
}
}
Expand Down
24 changes: 19 additions & 5 deletions server/storage/mvcc/revision.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package mvcc

import (
"encoding/binary"
"errors"
"fmt"
)

const (
Expand Down Expand Up @@ -55,7 +57,8 @@ func RevToBytes(rev Revision, bytes []byte) []byte {
}

func BytesToRev(bytes []byte) Revision {
return BytesToBucketKey(bytes).Revision
rev, _ := BytesToBucketKey(bytes)
return rev.Revision
}

// BucketKey indicates modification of the key-value space.
Expand Down Expand Up @@ -94,14 +97,25 @@ func BucketKeyToBytes(rev BucketKey, bytes []byte) []byte {
return bytes
}

func BytesToBucketKey(bytes []byte) BucketKey {
func BytesToBucketKey(bytes []byte) (BucketKey, error) {
if (len(bytes) != revBytesLen) && (len(bytes) != markedRevBytesLen) {
return BucketKey{}, fmt.Errorf("invalid revision length: %d", len(bytes))
}
if bytes[8] != '_' {
return BucketKey{}, fmt.Errorf("invalid separator in bucket key: %q", bytes[8])
}
main := int64(binary.BigEndian.Uint64(bytes[0:8]))
sub := int64(binary.BigEndian.Uint64(bytes[9:]))
if main < -1 || sub < 0 { // finishedCompactRev can be -1
return BucketKey{}, errors.New("negative revision")
}
return BucketKey{
Revision: Revision{
Main: int64(binary.BigEndian.Uint64(bytes[0:8])),
Sub: int64(binary.BigEndian.Uint64(bytes[9:])),
Main: main,
Sub: sub,
},
tombstone: isTombstone(bytes),
}
}, nil
}

// isTombstone checks whether the revision bytes is a tombstone.
Expand Down
4 changes: 2 additions & 2 deletions server/storage/mvcc/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestScheduledCompact(t *testing.T) {
value: math.MaxInt64,
},
{
value: math.MinInt64,
value: -1,
},
}
for _, tc := range tcs {
Expand Down Expand Up @@ -85,7 +85,7 @@ func TestFinishedCompact(t *testing.T) {
value: math.MaxInt64,
},
{
value: math.MinInt64,
value: -1,
},
}
for _, tc := range tcs {
Expand Down

0 comments on commit 7f32b22

Please sign in to comment.