diff --git a/concurrent_test.go b/concurrent_test.go new file mode 100644 index 000000000..3995c0466 --- /dev/null +++ b/concurrent_test.go @@ -0,0 +1,950 @@ +package bbolt_test + +import ( + "bytes" + crand "crypto/rand" + "encoding/hex" + "encoding/json" + "fmt" + "io" + mrand "math/rand" + "os" + "path/filepath" + "sort" + "strings" + "sync" + "testing" + "time" + "unicode/utf8" + + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" + + bolt "go.etcd.io/bbolt" +) + +const ( + bucketPrefix = "bucket" + keyPrefix = "key" + noopTxKey = "%magic-no-op-key%" + + // TestConcurrentCaseDuration is used as a env variable to specify the + // concurrent test duration. + testConcurrentCaseDuration = "TEST_CONCURRENT_CASE_DURATION" + defaultConcurrentTestDuration = 30 * time.Second +) + +type duration struct { + min time.Duration + max time.Duration +} + +type bytesRange struct { + min int + max int +} + +type operationChance struct { + operation OperationType + chance int +} + +type concurrentConfig struct { + bucketCount int + keyCount int + workInterval duration + operationRatio []operationChance + readInterval duration // only used by readOperation + noopWriteRatio int // only used by writeOperation + writeBytes bytesRange // only used by writeOperation +} + +/* +TestConcurrentGenericReadAndWrite verifies: + 1. Repeatable read: a read transaction should always see the same data + view during its lifecycle. + 2. Any data written by a writing transaction should be visible to any + following reading transactions (with txid >= previous writing txid). + 3. The txid should never decrease. +*/ +func TestConcurrentGenericReadAndWrite(t *testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode.") + } + + testDuration := concurrentTestDuration(t) + conf := concurrentConfig{ + bucketCount: 5, + keyCount: 10000, + workInterval: duration{}, + operationRatio: []operationChance{ + {operation: Read, chance: 60}, + {operation: Write, chance: 20}, + {operation: Delete, chance: 20}, + }, + readInterval: duration{ + min: 50 * time.Millisecond, + max: 100 * time.Millisecond, + }, + noopWriteRatio: 20, + writeBytes: bytesRange{ + min: 200, + max: 16000, + }, + } + + testCases := []struct { + name string + workerCount int + conf concurrentConfig + testDuration time.Duration + }{ + { + name: "1 worker", + workerCount: 1, + conf: conf, + testDuration: testDuration, + }, + { + name: "10 workers", + workerCount: 10, + conf: conf, + testDuration: testDuration, + }, + { + name: "50 workers", + workerCount: 50, + conf: conf, + testDuration: testDuration, + }, + { + name: "100 workers", + workerCount: 100, + conf: conf, + testDuration: testDuration, + }, + { + name: "200 workers", + workerCount: 200, + conf: conf, + testDuration: testDuration, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + concurrentReadAndWrite(t, + tc.workerCount, + tc.conf, + tc.testDuration) + }) + } +} + +func concurrentTestDuration(t *testing.T) time.Duration { + durationInEnv := strings.ToLower(os.Getenv(testConcurrentCaseDuration)) + if durationInEnv == "" { + t.Logf("%q not set, defaults to %s", testConcurrentCaseDuration, defaultConcurrentTestDuration) + return defaultConcurrentTestDuration + } + + d, err := time.ParseDuration(durationInEnv) + if err != nil { + t.Logf("Failed to parse %s=%s, error: %v, defaults to %s", testConcurrentCaseDuration, durationInEnv, err, defaultConcurrentTestDuration) + return defaultConcurrentTestDuration + } + + t.Logf("Concurrent test duration set by %s=%s", testConcurrentCaseDuration, d) + return d +} + +func concurrentReadAndWrite(t *testing.T, + workerCount int, + conf concurrentConfig, + testDuration time.Duration) { + + t.Log("Preparing db.") + db := mustCreateDB(t, nil) + defer db.Close() + err := db.Update(func(tx *bolt.Tx) error { + for i := 0; i < conf.bucketCount; i++ { + if _, err := tx.CreateBucketIfNotExists(bucketName(i)); err != nil { + return err + } + } + return nil + }) + require.NoError(t, err) + + var records historyRecords + // t.Failed() returns false during panicking. We need to forcibly + // save data on panicking. + // Refer to: https://github.com/golang/go/issues/49929 + panicked := true + defer func() { + t.Log("Save data if failed.") + saveDataIfFailed(t, db, records, panicked) + }() + + t.Log("Starting workers.") + records = runWorkers(t, + db, + workerCount, + conf, + testDuration) + + t.Log("Analyzing the history records.") + if err := validateSequential(records); err != nil { + t.Errorf("The history records are not sequential:\n %v", err) + } + + t.Log("Checking database consistency.") + if err := checkConsistency(t, db); err != nil { + t.Errorf("The data isn't consistency: %v", err) + } + + panicked = false + // TODO (ahrtr): + // 1. intentionally inject a random failpoint. +} + +// mustCreateDB is created in place of `btesting.MustCreateDB`, and it's +// only supposed to be used by the concurrent test case. The purpose is +// to ensure the test case can be executed on old branches or versions, +// e.g. `release-1.3` or `1.3.[5-7]`. +func mustCreateDB(t *testing.T, o *bolt.Options) *bolt.DB { + f := filepath.Join(t.TempDir(), "db") + + return mustOpenDB(t, f, o) +} + +func mustReOpenDB(t *testing.T, db *bolt.DB, o *bolt.Options) *bolt.DB { + f := db.Path() + + t.Logf("Closing bbolt DB at: %s", f) + err := db.Close() + require.NoError(t, err) + + return mustOpenDB(t, f, o) +} + +func mustOpenDB(t *testing.T, dbPath string, o *bolt.Options) *bolt.DB { + t.Logf("Opening bbolt DB at: %s", dbPath) + if o == nil { + o = bolt.DefaultOptions + } + + freelistType := bolt.FreelistArrayType + if env := os.Getenv("TEST_FREELIST_TYPE"); env == string(bolt.FreelistMapType) { + freelistType = bolt.FreelistMapType + } + + o.FreelistType = freelistType + + db, err := bolt.Open(dbPath, 0600, o) + require.NoError(t, err) + + return db +} + +func checkConsistency(t *testing.T, db *bolt.DB) error { + return db.View(func(tx *bolt.Tx) error { + cnt := 0 + for err := range tx.Check() { + t.Errorf("Consistency error: %v", err) + cnt++ + } + if cnt > 0 { + return fmt.Errorf("%d consistency errors found", cnt) + } + return nil + }) +} + +/* +********************************************************* +Data structures and functions/methods for running concurrent +workers, which execute different operations, including `Read`, +`Write` and `Delete`. +********************************************************* +*/ +func runWorkers(t *testing.T, + db *bolt.DB, + workerCount int, + conf concurrentConfig, + testDuration time.Duration) historyRecords { + stopCh := make(chan struct{}, 1) + errCh := make(chan error, workerCount) + + var mu sync.Mutex + var rs historyRecords + + g := new(errgroup.Group) + for i := 0; i < workerCount; i++ { + w := &worker{ + id: i, + db: db, + + conf: conf, + + errCh: errCh, + stopCh: stopCh, + t: t, + } + g.Go(func() error { + wrs, err := runWorker(t, w, errCh) + mu.Lock() + rs = append(rs, wrs...) + mu.Unlock() + return err + }) + } + + t.Logf("Keep all workers running for about %s.", testDuration) + select { + case <-time.After(testDuration): + case <-errCh: + } + + close(stopCh) + t.Log("Waiting for all workers to finish.") + if err := g.Wait(); err != nil { + t.Errorf("Received error: %v", err) + } + + return rs +} + +func runWorker(t *testing.T, w *worker, errCh chan error) (historyRecords, error) { + rs, err := w.run() + if len(rs) > 0 && err == nil { + if terr := validateIncrementalTxid(rs); terr != nil { + txidErr := fmt.Errorf("[%s]: %w", w.name(), terr) + t.Error(txidErr) + errCh <- txidErr + return rs, txidErr + } + } + return rs, err +} + +type worker struct { + id int + db *bolt.DB + + conf concurrentConfig + + errCh chan error + stopCh chan struct{} + + t *testing.T +} + +func (w *worker) name() string { + return fmt.Sprintf("worker-%d", w.id) +} + +func (w *worker) run() (historyRecords, error) { + var rs historyRecords + for { + select { + case <-w.stopCh: + w.t.Logf("%q finished.", w.name()) + return rs, nil + default: + } + + op := w.pickOperation() + bucket, key := w.pickBucket(), w.pickKey() + rec, err := executeOperation(op, w.db, bucket, key, w.conf) + if err != nil { + readErr := fmt.Errorf("[%s: %s]: %w", w.name(), op, err) + w.t.Error(readErr) + w.errCh <- readErr + return rs, readErr + } + + rs = append(rs, rec) + if w.conf.workInterval != (duration{}) { + time.Sleep(randomDurationInRange(w.conf.workInterval.min, w.conf.workInterval.max)) + } + } +} + +func (w *worker) pickBucket() []byte { + return bucketName(mrand.Intn(w.conf.bucketCount)) +} + +func bucketName(index int) []byte { + bucket := fmt.Sprintf("%s_%d", bucketPrefix, index) + return []byte(bucket) +} + +func (w *worker) pickKey() []byte { + key := fmt.Sprintf("%s_%d", keyPrefix, mrand.Intn(w.conf.keyCount)) + return []byte(key) +} + +func (w *worker) pickOperation() OperationType { + sum := 0 + for _, op := range w.conf.operationRatio { + sum += op.chance + } + roll := mrand.Int() % sum + for _, op := range w.conf.operationRatio { + if roll < op.chance { + return op.operation + } + roll -= op.chance + } + panic("unexpected") +} + +func executeOperation(op OperationType, db *bolt.DB, bucket []byte, key []byte, conf concurrentConfig) (historyRecord, error) { + switch op { + case Read: + return executeRead(db, bucket, key, conf.readInterval) + case Write: + return executeWrite(db, bucket, key, conf.writeBytes, conf.noopWriteRatio) + case Delete: + return executeDelete(db, bucket, key) + default: + panic(fmt.Sprintf("unexpected operation type: %s", op)) + } +} + +func executeRead(db *bolt.DB, bucket []byte, key []byte, readInterval duration) (historyRecord, error) { + var rec historyRecord + err := db.View(func(tx *bolt.Tx) error { + b := tx.Bucket(bucket) + + initialVal := b.Get(key) + time.Sleep(randomDurationInRange(readInterval.min, readInterval.max)) + val := b.Get(key) + + if !bytes.Equal(initialVal, val) { + return fmt.Errorf("read different values for the same key (%q), value1: %q, value2: %q", + string(key), formatBytes(initialVal), formatBytes(val)) + } + + clonedVal := make([]byte, len(val)) + copy(clonedVal, val) + + rec = historyRecord{ + OperationType: Read, + Bucket: string(bucket), + Key: string(key), + Value: clonedVal, + Txid: tx.ID(), + } + + return nil + }) + + return rec, err +} + +func executeWrite(db *bolt.DB, bucket []byte, key []byte, writeBytes bytesRange, noopWriteRatio int) (historyRecord, error) { + var rec historyRecord + + err := db.Update(func(tx *bolt.Tx) error { + if mrand.Intn(100) < noopWriteRatio { + // A no-op write transaction has two consequences: + // 1. The txid increases by 1; + // 2. Two meta pages point to the same root page. + rec = historyRecord{ + OperationType: Write, + Bucket: string(bucket), + Key: noopTxKey, + Value: nil, + Txid: tx.ID(), + } + return nil + } + + b := tx.Bucket(bucket) + + valueBytes := randomIntInRange(writeBytes.min, writeBytes.max) + v := make([]byte, valueBytes) + if _, cErr := crand.Read(v); cErr != nil { + return cErr + } + + putErr := b.Put(key, v) + if putErr == nil { + rec = historyRecord{ + OperationType: Write, + Bucket: string(bucket), + Key: string(key), + Value: v, + Txid: tx.ID(), + } + } + + return putErr + }) + + return rec, err +} + +func executeDelete(db *bolt.DB, bucket []byte, key []byte) (historyRecord, error) { + var rec historyRecord + + err := db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket(bucket) + + deleteErr := b.Delete(key) + if deleteErr == nil { + rec = historyRecord{ + OperationType: Delete, + Bucket: string(bucket), + Key: string(key), + Txid: tx.ID(), + } + } + + return deleteErr + }) + + return rec, err +} + +func randomDurationInRange(min, max time.Duration) time.Duration { + d := int64(max) - int64(min) + d = int64(mrand.Intn(int(d))) + int64(min) + return time.Duration(d) +} + +func randomIntInRange(min, max int) int { + return mrand.Intn(max-min) + min +} + +func formatBytes(val []byte) string { + if utf8.ValidString(string(val)) { + return string(val) + } + + return hex.EncodeToString(val) +} + +/* +********************************************************* +Functions for persisting test data, including db file +and operation history +********************************************************* +*/ +func saveDataIfFailed(t *testing.T, db *bolt.DB, rs historyRecords, force bool) { + if t.Failed() || force { + t.Log("Saving data...") + dbPath := db.Path() + if err := db.Close(); err != nil { + t.Errorf("Failed to close db: %v", err) + } + backupPath := testResultsDirectory(t) + backupDB(t, dbPath, backupPath) + persistHistoryRecords(t, rs, backupPath) + } +} + +func backupDB(t *testing.T, srcPath string, dstPath string) { + targetFile := filepath.Join(dstPath, "db.bak") + t.Logf("Saving the DB file to %s", targetFile) + err := copyFile(srcPath, targetFile) + require.NoError(t, err) + t.Logf("DB file saved to %s", targetFile) +} + +func copyFile(srcPath, dstPath string) error { + // Ensure source file exists. + _, err := os.Stat(srcPath) + if os.IsNotExist(err) { + return fmt.Errorf("source file %q not found", srcPath) + } else if err != nil { + return err + } + + // Ensure output file not exist. + _, err = os.Stat(dstPath) + if err == nil { + return fmt.Errorf("output file %q already exists", dstPath) + } else if !os.IsNotExist(err) { + return err + } + + srcDB, err := os.Open(srcPath) + if err != nil { + return fmt.Errorf("failed to open source file %q: %w", srcPath, err) + } + defer srcDB.Close() + dstDB, err := os.Create(dstPath) + if err != nil { + return fmt.Errorf("failed to create output file %q: %w", dstPath, err) + } + defer dstDB.Close() + written, err := io.Copy(dstDB, srcDB) + if err != nil { + return fmt.Errorf("failed to copy database file from %q to %q: %w", srcPath, dstPath, err) + } + + srcFi, err := srcDB.Stat() + if err != nil { + return fmt.Errorf("failed to get source file info %q: %w", srcPath, err) + } + initialSize := srcFi.Size() + if initialSize != written { + return fmt.Errorf("the byte copied (%q: %d) isn't equal to the initial db size (%q: %d)", dstPath, written, srcPath, initialSize) + } + + return nil +} + +func persistHistoryRecords(t *testing.T, rs historyRecords, path string) { + recordFilePath := filepath.Join(path, "history_records.json") + t.Logf("Saving history records to %s", recordFilePath) + recordFile, err := os.OpenFile(recordFilePath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755) + require.NoError(t, err) + defer recordFile.Close() + encoder := json.NewEncoder(recordFile) + for _, rec := range rs { + err := encoder.Encode(rec) + require.NoError(t, err) + } +} + +func testResultsDirectory(t *testing.T) string { + resultsDirectory, ok := os.LookupEnv("RESULTS_DIR") + var err error + if !ok { + resultsDirectory, err = os.MkdirTemp("", "*.db") + require.NoError(t, err) + } + resultsDirectory, err = filepath.Abs(resultsDirectory) + require.NoError(t, err) + + path, err := filepath.Abs(filepath.Join(resultsDirectory, strings.ReplaceAll(t.Name(), "/", "_"))) + require.NoError(t, err) + + err = os.RemoveAll(path) + require.NoError(t, err) + + err = os.MkdirAll(path, 0700) + require.NoError(t, err) + + return path +} + +/* +********************************************************* +Data structures and functions for analyzing history records +********************************************************* +*/ +type OperationType string + +const ( + Read OperationType = "read" + Write OperationType = "write" + Delete OperationType = "delete" +) + +type historyRecord struct { + OperationType OperationType `json:"operationType,omitempty"` + Txid int `json:"txid,omitempty"` + Bucket string `json:"bucket,omitempty"` + Key string `json:"key,omitempty"` + Value []byte `json:"value,omitempty"` +} + +type historyRecords []historyRecord + +func (rs historyRecords) Len() int { + return len(rs) +} + +func (rs historyRecords) Less(i, j int) bool { + // Sorted by (bucket, key) firstly: all records in the same + // (bucket, key) are grouped together. + bucketCmp := strings.Compare(rs[i].Bucket, rs[j].Bucket) + if bucketCmp != 0 { + return bucketCmp < 0 + } + keyCmp := strings.Compare(rs[i].Key, rs[j].Key) + if keyCmp != 0 { + return keyCmp < 0 + } + + // Sorted by txid + if rs[i].Txid != rs[j].Txid { + return rs[i].Txid < rs[j].Txid + } + + // Sorted by operation type: put `Read` after other operation types + // if they operate on the same (bucket, key) and have the same txid. + if rs[i].OperationType == Read { + return false + } + + return true +} + +func (rs historyRecords) Swap(i, j int) { + rs[i], rs[j] = rs[j], rs[i] +} + +func validateIncrementalTxid(rs historyRecords) error { + lastTxid := rs[0].Txid + + for i := 1; i < len(rs); i++ { + if (rs[i].OperationType == Read && rs[i].Txid < lastTxid) || (rs[i].OperationType != Read && rs[i].Txid <= lastTxid) { + return fmt.Errorf("detected non-incremental txid(%d, %d) in %s mode", lastTxid, rs[i].Txid, rs[i].OperationType) + } + lastTxid = rs[i].Txid + } + + return nil +} + +func validateSequential(rs historyRecords) error { + sort.Sort(rs) + + type bucketAndKey struct { + bucket string + key string + } + lastWriteKeyValueMap := make(map[bucketAndKey]*historyRecord) + + for _, rec := range rs { + bk := bucketAndKey{ + bucket: rec.Bucket, + key: rec.Key, + } + if v, ok := lastWriteKeyValueMap[bk]; ok { + if rec.OperationType == Write { + v.Txid = rec.Txid + if rec.Key != noopTxKey { + v.Value = rec.Value + } + } else if rec.OperationType == Delete { + delete(lastWriteKeyValueMap, bk) + } else { + if !bytes.Equal(v.Value, rec.Value) { + return fmt.Errorf("readOperation[txid: %d, bucket: %s, key: %s] read %x, \nbut writer[txid: %d] wrote %x", + rec.Txid, rec.Bucket, rec.Key, rec.Value, v.Txid, v.Value) + } + } + } else { + if rec.OperationType == Write && rec.Key != noopTxKey { + lastWriteKeyValueMap[bk] = &historyRecord{ + OperationType: Write, + Bucket: rec.Bucket, + Key: rec.Key, + Value: rec.Value, + Txid: rec.Txid, + } + } else if rec.OperationType == Read { + if len(rec.Value) != 0 { + return fmt.Errorf("expected the first readOperation[txid: %d, bucket: %s, key: %s] read nil, \nbut got %x", + rec.Txid, rec.Bucket, rec.Key, rec.Value) + } + } + } + } + + return nil +} + +/* +TestConcurrentRepeatableRead verifies repeatable read. The case +intentionally creates a scenario that read and write transactions +are interleaved. It performs several writing operations after starting +each long-running read transaction to ensure it has a larger txid +than previous read transaction. It verifies that bbolt correctly +releases free pages, and will not pollute (e.g. prematurely release) +any pages which are still being used by any read transaction. +*/ +func TestConcurrentRepeatableRead(t *testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode.") + } + + testCases := []struct { + name string + noFreelistSync bool + freelistType bolt.FreelistType + }{ + // [array] freelist + { + name: "sync array freelist", + noFreelistSync: false, + freelistType: bolt.FreelistArrayType, + }, + { + name: "not sync array freelist", + noFreelistSync: true, + freelistType: bolt.FreelistArrayType, + }, + // [map] freelist + { + name: "sync map freelist", + noFreelistSync: false, + freelistType: bolt.FreelistMapType, + }, + { + name: "not sync map freelist", + noFreelistSync: true, + freelistType: bolt.FreelistMapType, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + + t.Log("Preparing db.") + var ( + bucket = []byte("data") + key = []byte("mykey") + + option = &bolt.Options{ + PageSize: 4096, + NoFreelistSync: tc.noFreelistSync, + FreelistType: tc.freelistType, + } + ) + + db := mustCreateDB(t, option) + defer func() { + // The db will be reopened later, so put `db.Close()` in a function + // to avoid premature evaluation of `db`. Note that the execution + // of a deferred function is deferred to the moment the surrounding + // function returns, but the function value and parameters to the + // call are evaluated as usual and saved anew. + db.Close() + }() + + // Create lots of K/V to allocate some pages + err := db.Update(func(tx *bolt.Tx) error { + b, err := tx.CreateBucketIfNotExists(bucket) + if err != nil { + return err + } + for i := 0; i < 1000; i++ { + k := fmt.Sprintf("key_%d", i) + if err := b.Put([]byte(k), make([]byte, 1024)); err != nil { + return err + } + } + return nil + }) + require.NoError(t, err) + + // Remove all K/V to create some free pages + err = db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket(bucket) + for i := 0; i < 1000; i++ { + k := fmt.Sprintf("key_%d", i) + if err := b.Delete([]byte(k)); err != nil { + return err + } + } + return b.Put(key, []byte("randomValue")) + }) + require.NoError(t, err) + + // bbolt will not release free pages directly after committing + // a writing transaction; instead all pages freed are putting + // into a pending list. Accordingly, the free pages might not + // be able to be reused by following writing transactions. So + // we reopen the db to completely release all free pages. + db = mustReOpenDB(t, db, option) + + var ( + wg sync.WaitGroup + longRunningReaderCount = 10 + stopCh = make(chan struct{}) + errCh = make(chan error, longRunningReaderCount) + readInterval = duration{5 * time.Millisecond, 10 * time.Millisecond} + + writeOperationCountInBetween = 5 + writeBytes = bytesRange{10, 20} + + testDuration = 10 * time.Second + ) + + for i := 0; i < longRunningReaderCount; i++ { + readWorkerName := fmt.Sprintf("reader_%d", i) + t.Logf("Starting long running read operation: %s", readWorkerName) + wg.Add(1) + go func() { + defer wg.Done() + rErr := executeLongRunningRead(t, readWorkerName, db, bucket, key, readInterval, stopCh) + if rErr != nil { + errCh <- rErr + } + }() + time.Sleep(500 * time.Millisecond) + + t.Logf("Perform %d write operations after starting a long running read operation", writeOperationCountInBetween) + for j := 0; j < writeOperationCountInBetween; j++ { + _, err := executeWrite(db, bucket, key, writeBytes, 0) + require.NoError(t, err) + } + } + + t.Log("Perform lots of write operations to check whether the long running read operations will read dirty data") + wg.Add(1) + go func() { + defer wg.Done() + cnt := longRunningReaderCount * writeOperationCountInBetween + for i := 0; i < cnt; i++ { + select { + case <-stopCh: + return + default: + } + _, err := executeWrite(db, bucket, key, writeBytes, 0) + require.NoError(t, err) + } + }() + + t.Log("Waiting for result") + select { + case err := <-errCh: + close(stopCh) + t.Errorf("Detected dirty read: %v", err) + case <-time.After(testDuration): + close(stopCh) + } + + wg.Wait() + }) + } +} + +func executeLongRunningRead(t *testing.T, name string, db *bolt.DB, bucket []byte, key []byte, readInterval duration, stopCh chan struct{}) error { + err := db.View(func(tx *bolt.Tx) error { + b := tx.Bucket(bucket) + + initialVal := b.Get(key) + + for { + select { + case <-stopCh: + t.Logf("%q finished.", name) + return nil + default: + } + + time.Sleep(randomDurationInRange(readInterval.min, readInterval.max)) + val := b.Get(key) + + if !bytes.Equal(initialVal, val) { + dirtyReadErr := fmt.Errorf("read different values for the same key (%q), value1: %q, value2: %q", + string(key), formatBytes(initialVal), formatBytes(val)) + return dirtyReadErr + } + } + }) + + return err +} diff --git a/go.mod b/go.mod index 511a392a0..a5cc44074 100644 --- a/go.mod +++ b/go.mod @@ -11,5 +11,6 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + golang.org/x/sync v0.5.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index f0f96bf31..a2a5388d6 100644 --- a/go.sum +++ b/go.sum @@ -12,6 +12,8 @@ github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKs github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= go.etcd.io/gofail v0.1.0 h1:XItAMIhOojXFQMgrxjnd2EIIHun/d5qL0Pf7FzVTkFg= go.etcd.io/gofail v0.1.0/go.mod h1:VZBCXYGZhHAinaBiiqYvuDynvahNsAyLFwB3kEHKz1M= +golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE= +golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18= golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=