diff --git a/internal/metamorphic/meta_test.go b/internal/metamorphic/meta_test.go
index 3b4fd3bf57..02e9beb8d6 100644
--- a/internal/metamorphic/meta_test.go
+++ b/internal/metamorphic/meta_test.go
@@ -70,3 +70,24 @@ func TestMeta(t *testing.T) {
 		metamorphic.RunAndCompare(t, runFlags.Dir, opts...)
 	}
 }
+
+func TestMetaTwoInstance(t *testing.T) {
+	switch {
+	case runOnceFlags.Compare != "":
+		runDirs := strings.Split(runOnceFlags.Compare, ",")
+		onceOpts := runOnceFlags.MakeRunOnceOptions()
+		metamorphic.Compare(t, runOnceFlags.Dir, runOnceFlags.Seed, runDirs, onceOpts...)
+
+	case runOnceFlags.RunDir != "":
+		// The --run-dir flag is specified either in the child process (see
+		// runOptions() below) or the user specified it manually in order to re-run
+		// a test.
+		onceOpts := runOnceFlags.MakeRunOnceOptions()
+		metamorphic.RunOnce(t, runOnceFlags.RunDir, runOnceFlags.Seed, filepath.Join(runOnceFlags.RunDir, "history"), onceOpts...)
+
+	default:
+		opts := runFlags.MakeRunOptions()
+		opts = append(opts, metamorphic.MultiInstance(2))
+		metamorphic.RunAndCompare(t, runFlags.Dir, opts...)
+	}
+}
diff --git a/internal/metamorphic/metaflags/meta_flags.go b/internal/metamorphic/metaflags/meta_flags.go
index 331d371b9d..28bdf1b0a9 100644
--- a/internal/metamorphic/metaflags/meta_flags.go
+++ b/internal/metamorphic/metaflags/meta_flags.go
@@ -35,6 +35,9 @@ type CommonFlags struct {
 	Keep bool
 	// MaxThreads used by a single run. See "max-threads" flag below.
 	MaxThreads int
+	// NumInstances is the number of Pebble instances to create in one run. See
+	// "num-instances" flag below.
+	NumInstances int
 }
 
 func initCommonFlags() *CommonFlags {
@@ -65,6 +68,8 @@ func initCommonFlags() *CommonFlags {
 	flag.IntVar(&c.MaxThreads, "max-threads", math.MaxInt,
 		"limit execution of a single run to the provided number of threads; must be ≥ 1")
 
+	flag.IntVar(&c.NumInstances, "num-instances", 1, "number of pebble instances to create (default: 1)")
+
 	return c
}
 
@@ -179,6 +184,9 @@ func (ro *RunOnceFlags) MakeRunOnceOptions() []metamorphic.RunOnceOption {
 	if ro.ErrorRate > 0 {
 		onceOpts = append(onceOpts, metamorphic.InjectErrorsRate(ro.ErrorRate))
 	}
+	if ro.NumInstances > 1 {
+		onceOpts = append(onceOpts, metamorphic.MultiInstance(ro.NumInstances))
+	}
 	return onceOpts
 }
 
@@ -204,6 +212,9 @@ func (r *RunFlags) MakeRunOptions() []metamorphic.RunOption {
 	if r.PreviousOps != "" {
 		opts = append(opts, metamorphic.ExtendPreviousRun(r.PreviousOps, r.InitialStatePath, r.InitialStateDesc))
 	}
+	if r.NumInstances > 1 {
+		opts = append(opts, metamorphic.MultiInstance(r.NumInstances))
+	}
 
 	// If the filesystem type was forced, all tests will use that value.
 	switch r.FS {
diff --git a/metamorphic/config.go b/metamorphic/config.go
index 594d85b16d..9d06843726 100644
--- a/metamorphic/config.go
+++ b/metamorphic/config.go
@@ -38,6 +38,7 @@ const (
 	newIterUsingClone
 	newSnapshot
 	readerGet
+	replicate
 	snapshotClose
 	writerApply
 	writerDelete
@@ -65,6 +66,10 @@ type config struct {
 	// skewing towards most recent timestamps.
 	writeSuffixDist randvar.Dynamic
 
+	// numInstances defines the number of pebble instances created for this
+	// metamorphic test run.
+	numInstances int
+
 	// TODO(peter): unimplemented
 	// keyDist randvar.Dynamic
 	// keySizeDist randvar.Static
@@ -107,6 +112,8 @@ var presetConfigs = []config{
 	withOpWeight(writerMerge, 0),
 }
 
+var multiInstancePresetConfig = multiInstanceConfig()
+
 func defaultConfig() config {
 	return config{
 		// dbClose is not in this list since it is deterministically generated
 		// once, at the end of the test.
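The `ops` table in config.go maps each opType to an integer weight, and `multiInstanceConfig` (next hunk) zeroes out the weights of ops that do not work across instances yet. A minimal sketch of how such weights plausibly drive op selection; this is assumed mechanics for illustration, the real generator draws through its own randvar helpers:

```go
package main

import (
	"fmt"
	"math/rand"
)

// pickOp returns an index with probability proportional to its weight. A
// weight of zero (e.g. newSnapshot in the multi-instance config) makes the
// corresponding operation unreachable.
func pickOp(rng *rand.Rand, weights []int) int {
	total := 0
	for _, w := range weights {
		total += w
	}
	n := rng.Intn(total)
	for i, w := range weights {
		if n < w {
			return i
		}
		n -= w
	}
	panic("unreachable")
}

func main() {
	rng := rand.New(rand.NewSource(1))
	weights := []int{100, 5, 0} // readerGet, replicate, newSnapshot
	counts := make([]int, len(weights))
	for i := 0; i < 1000; i++ {
		counts[pickOp(rng, weights)]++
	}
	fmt.Println(counts) // newSnapshot's count stays 0
}
```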
@@ -139,6 +146,7 @@ func defaultConfig() config {
 			newIterUsingClone: 5,
 			newSnapshot:       10,
 			readerGet:         100,
+			replicate:         0,
 			snapshotClose:     10,
 			writerApply:       10,
 			writerDelete:      100,
@@ -161,6 +169,19 @@ func defaultConfig() config {
 	}
 }
 
+func multiInstanceConfig() config {
+	cfg := defaultConfig()
+	cfg.ops[replicate] = 5
+	cfg.ops[writerSingleDelete] = 0
+	cfg.ops[writerMerge] = 0
+	// TODO(bilal): The disabled operations below should also be supported
+	// in the two-instance test, once they're updated to work in multi-instance
+	// mode.
+	cfg.ops[newSnapshot] = 0
+	cfg.ops[snapshotClose] = 0
+	return cfg
+}
+
 func mustDynamic(dyn randvar.Dynamic, err error) randvar.Dynamic {
 	if err != nil {
 		panic(err)
diff --git a/metamorphic/generator.go b/metamorphic/generator.go
index 2237ae5a57..5428c779d2 100644
--- a/metamorphic/generator.go
+++ b/metamorphic/generator.go
@@ -56,6 +56,7 @@ type generator struct {
 	// keyManager tracks the state of keys at operation generation time.
 	keyManager *keyManager
+	dbs objIDSlice
 	// Unordered sets of object IDs for live objects. Used to randomly select an
 	// object when generating an operation. There are 4 concrete objects: the DB
 	// (of which there is exactly 1), batches, iterators, and snapshots.
@@ -83,6 +84,9 @@ type generator struct {
 	// iterators. The iter set value will also be indexed by either the batches
 	// or snapshots maps.
 	iters map[objID]objIDSet
+	// objectID -> db: used to keep track of the DB a batch, iter, or snapshot
+	// was created on.
+	objDB map[objID]objID
 	// readerID -> reader iters: used to keep track of the open iterators on a
 	// reader. The iter set value will also be indexed by either the batches or
 	// snapshots maps. This map is the union of batches and snapshots maps.
@@ -103,10 +107,12 @@ func newGenerator(rng *rand.Rand, cfg config, km *keyManager) *generator {
 	g := &generator{
 		cfg:         cfg,
 		rng:         rng,
-		init:        &initOp{},
+		init:        &initOp{dbSlots: uint32(cfg.numInstances)},
 		keyManager:  km,
-		liveReaders: objIDSlice{makeObjID(dbTag, 0)},
-		liveWriters: objIDSlice{makeObjID(dbTag, 0)},
+		liveReaders: objIDSlice{makeObjID(dbTag, 1)},
+		liveWriters: objIDSlice{makeObjID(dbTag, 1)},
+		dbs:         objIDSlice{makeObjID(dbTag, 1)},
+		objDB:       make(map[objID]objID),
 		batches:     make(map[objID]objIDSet),
 		iters:       make(map[objID]objIDSet),
 		readers:     make(map[objID]objIDSet),
@@ -116,6 +122,11 @@ func newGenerator(rng *rand.Rand, cfg config, km *keyManager) *generator {
 		iterCreationTimestamp: make(map[objID]int),
 		iterReaderID:          make(map[objID]objID),
 	}
+	for i := 1; i < cfg.numInstances; i++ {
+		g.liveReaders = append(g.liveReaders, makeObjID(dbTag, uint32(i+1)))
+		g.liveWriters = append(g.liveWriters, makeObjID(dbTag, uint32(i+1)))
+		g.dbs = append(g.dbs, makeObjID(dbTag, uint32(i+1)))
+	}
 	// Note that the initOp fields are populated during generation.
 	g.ops = append(g.ops, g.init)
 	return g
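With this change, DB slots become 1-based: `newGenerator` seeds `liveReaders`/`liveWriters`/`dbs` with `makeObjID(dbTag, 1)` through `makeObjID(dbTag, n)`. A self-contained sketch of the tag/slot packing that `makeObjID`, `tag()`, and `slot()` imply; the bit widths here are an assumption, not necessarily Pebble's actual layout:

```go
package main

import "fmt"

// Assumed packing: the high 4 bits hold the tag, the low 28 bits the slot.
// This is only meant to show why db2 maps to dbs[slot()-1] once DB slots
// are 1-based.
type objTag uint32
type objID uint32

const dbTag objTag = 1

func makeObjID(t objTag, slot uint32) objID { return objID(uint32(t)<<28 | slot) }
func (i objID) tag() objTag                 { return objTag(i >> 28) }
func (i objID) slot() uint32                { return uint32(i) & (1<<28 - 1) }

func main() {
	dbs := []string{"db1", "db2"}
	id := makeObjID(dbTag, 2)
	fmt.Println(id.tag() == dbTag, dbs[id.slot()-1]) // true db2
}
```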
@@ -153,6 +164,7 @@ func generate(rng *rand.Rand, count uint64, cfg config, km *keyManager) []op {
 		newIterUsingClone: g.newIterUsingClone,
 		newSnapshot:       g.newSnapshot,
 		readerGet:         g.readerGet,
+		replicate:         g.replicate,
 		snapshotClose:     g.snapshotClose,
 		writerApply:       g.writerApply,
 		writerDelete:      g.writerDelete,
@@ -431,8 +443,11 @@ func (g *generator) newBatch() {
 	g.init.batchSlots++
 	g.liveBatches = append(g.liveBatches, batchID)
 	g.liveWriters = append(g.liveWriters, batchID)
+	dbID := g.dbs.rand(g.rng)
+	g.objDB[batchID] = dbID
 
 	g.add(&newBatchOp{
+		dbID:    dbID,
 		batchID: batchID,
 	})
 }
@@ -447,8 +462,11 @@ func (g *generator) newIndexedBatch() {
 	iters := make(objIDSet)
 	g.batches[batchID] = iters
 	g.readers[batchID] = iters
+	dbID := g.dbs.rand(g.rng)
+	g.objDB[batchID] = dbID
 
 	g.add(&newIndexedBatchOp{
+		dbID:    dbID,
 		batchID: batchID,
 	})
 }
@@ -470,7 +488,7 @@ func (g *generator) removeBatchFromGenerator(batchID objID) {
 	for _, id := range iters.sorted() {
 		g.liveIters.remove(id)
 		delete(g.iters, id)
-		g.add(&closeOp{objID: id})
+		g.add(&closeOp{objID: id, derivedDBID: g.objDB[batchID]})
 	}
 }
 
@@ -482,7 +500,7 @@ func (g *generator) batchAbort() {
 	batchID := g.liveBatches.rand(g.rng)
 	g.removeBatchFromGenerator(batchID)
 
-	g.add(&closeOp{objID: batchID})
+	g.add(&closeOp{objID: batchID, derivedDBID: g.objDB[batchID]})
 }
 
 func (g *generator) batchCommit() {
@@ -491,11 +509,13 @@ func (g *generator) batchCommit() {
 	}
 
 	batchID := g.liveBatches.rand(g.rng)
+	dbID := g.objDB[batchID]
 	g.removeBatchFromGenerator(batchID)
 	g.add(&batchCommitOp{
+		dbID:    dbID,
 		batchID: batchID,
 	})
-	g.add(&closeOp{objID: batchID})
+	g.add(&closeOp{objID: batchID, derivedDBID: dbID})
 }
 
@@ -510,10 +530,15 @@ func (g *generator) dbClose() {
 	}
 	for len(g.liveBatches) > 0 {
 		batchID := g.liveBatches[0]
+		dbID := g.objDB[batchID]
 		g.removeBatchFromGenerator(batchID)
-		g.add(&closeOp{objID: batchID})
+		g.add(&closeOp{objID: batchID, derivedDBID: dbID})
+	}
+	for len(g.dbs) > 0 {
+		db := g.dbs[0]
+		g.dbs = g.dbs[1:]
+		g.add(&closeOp{objID: db})
 	}
-	g.add(&closeOp{objID: makeObjID(dbTag, 0)})
 }
 
 func (g *generator) dbCheckpoint() {
@@ -549,7 +574,9 @@ func (g *generator) dbCompact() {
 	if g.cmp(start, end) > 0 {
 		start, end = end, start
 	}
+	dbID := g.dbs.rand(g.rng)
 	g.add(&compactOp{
+		dbID:        dbID,
 		start:       start,
 		end:         end,
 		parallelize: g.rng.Float64() < 0.5,
@@ -557,7 +584,7 @@ func (g *generator) dbCompact() {
 	})
 }
 
 func (g *generator) dbFlush() {
-	g.add(&flushOp{})
+	g.add(&flushOp{dbObjID})
 }
 
@@ -566,14 +593,16 @@ func (g *generator) dbRatchetFormatMajorVersion() {
 	// version may be behind the database's format major version, in which case
 	// RatchetFormatMajorVersion should deterministically error.
 
+	dbID := g.dbs.rand(g.rng)
 	n := int(newestFormatMajorVersionToTest - minimumFormatMajorVersion)
 	vers := pebble.FormatMajorVersion(g.rng.Intn(n+1)) + minimumFormatMajorVersion
-	g.add(&dbRatchetFormatMajorVersionOp{vers: vers})
+	g.add(&dbRatchetFormatMajorVersionOp{dbID: dbID, vers: vers})
 }
 
 func (g *generator) dbRestart() {
 	// Close any live iterators and snapshots, so that we can close the DB
 	// cleanly.
+	dbID := g.dbs.rand(g.rng)
 	for len(g.liveIters) > 0 {
 		g.randIter(g.iterClose)()
 	}
@@ -583,14 +612,15 @@ func (g *generator) dbRestart() {
 	// Close the batches.
 	for len(g.liveBatches) > 0 {
 		batchID := g.liveBatches[0]
+		dbID := g.objDB[batchID]
 		g.removeBatchFromGenerator(batchID)
-		g.add(&closeOp{objID: batchID})
+		g.add(&closeOp{objID: batchID, derivedDBID: dbID})
 	}
-	if len(g.liveReaders) != 1 || len(g.liveWriters) != 1 {
+	if len(g.liveReaders) != len(g.dbs) || len(g.liveWriters) != len(g.dbs) {
 		panic(fmt.Sprintf("unexpected counts: liveReaders %d, liveWriters: %d",
 			len(g.liveReaders), len(g.liveWriters)))
 	}
-	g.add(&dbRestartOp{})
+	g.add(&dbRestartOp{dbID: dbID})
 }
 
 // maybeSetSnapshotIterBounds must be called whenever creating a new iterator or
@@ -650,6 +680,7 @@ func (g *generator) newIter() {
 		// closes.
 	}
 	g.iterReaderID[iterID] = readerID
+	g.deriveDB(iterID, readerID)
 
 	var opts iterOpts
 	if !g.maybeSetSnapshotIterBounds(readerID, &opts) {
@@ -690,10 +721,16 @@ func (g *generator) newIter() {
 	g.itersLastOpts[iterID] = opts
 	g.iterCreationTimestamp[iterID] = g.keyManager.nextMetaTimestamp()
 	g.iterReaderID[iterID] = readerID
+	var derivedDBID objID
+	if readerID.tag() == batchTag {
+		g.deriveDB(iterID, readerID)
+		derivedDBID = g.objDB[iterID]
+	}
 	g.add(&newIterOp{
-		readerID: readerID,
-		iterID:   iterID,
-		iterOpts: opts,
+		readerID:    readerID,
+		iterID:      iterID,
+		iterOpts:    opts,
+		derivedDBID: derivedDBID,
 	})
 }
 
@@ -715,6 +752,14 @@ func (g *generator) randKeyTypesAndMask() (keyTypes uint32, maskSuffix []byte) {
 	return keyTypes, maskSuffix
 }
 
+func (g *generator) deriveDB(readerID, parentID objID) {
+	dbParentID := parentID
+	if dbParentID.tag() != dbTag {
+		dbParentID = g.objDB[dbParentID]
+	}
+	g.objDB[readerID] = dbParentID
+}
+
 func (g *generator) newIterUsingClone() {
 	if len(g.liveIters) == 0 {
 		return
@@ -733,6 +778,7 @@ func (g *generator) newIterUsingClone() {
 	}
 	readerID := g.iterReaderID[existingIterID]
 	g.iterReaderID[iterID] = readerID
+	g.deriveDB(iterID, readerID)
 
 	var refreshBatch bool
 	if readerID.tag() == batchTag {
@@ -769,7 +815,7 @@ func (g *generator) iterClose(iterID objID) {
 		// closes.
 	}
 
-	g.add(&closeOp{objID: iterID})
+	g.add(&closeOp{objID: iterID, derivedDBID: g.objDB[iterID]})
 }
 
 func (g *generator) iterSetBounds(iterID objID) {
@@ -1097,7 +1143,44 @@ func (g *generator) readerGet() {
 	} else {
 		key = g.randKeyToRead(0.001) // 0.1% new keys
 	}
-	g.add(&getOp{readerID: readerID, key: key})
+	derivedDBID := objID(0)
+	if dbID, ok := g.objDB[readerID]; ok && readerID.tag() == batchTag {
+		derivedDBID = dbID
+	} else if readerID.tag() == snapTag {
+		// TODO(bilal): This is legacy behaviour as snapshots aren't supported in
+		// two-instance mode yet. Track snapshots in g.objDB and objToDB and remove this
+		// conditional.
+		derivedDBID = dbObjID
+	}
+	g.add(&getOp{readerID: readerID, key: key, derivedDBID: derivedDBID})
+}
+
+func (g *generator) replicate() {
+	if len(g.dbs) < 2 {
+		return
+	}
+
+	source := g.dbs.rand(g.rng)
+	dest := source
+	for dest == source {
+		dest = g.dbs.rand(g.rng)
+	}
+
+	var startKey, endKey []byte
+	startKey = g.randKeyToRead(0.001) // 0.1% new keys
+	endKey = g.randKeyToRead(0.001)   // 0.1% new keys
+	for g.cmp(startKey, endKey) == 0 {
+		endKey = g.randKeyToRead(0.01) // 1% new keys
+	}
+	if g.cmp(startKey, endKey) > 0 {
+		startKey, endKey = endKey, startKey
+	}
+	g.add(&replicateOp{
+		source: source,
+		dest:   dest,
+		start:  startKey,
+		end:    endKey,
+	})
 }
 
 // generateDisjointKeyRanges generates n disjoint key ranges.
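The generator threads an owning-DB ID through every derived object via `deriveDB` and the `objDB` map. A runnable toy of that bookkeeping, with string IDs standing in for objIDs:

```go
package main

import (
	"fmt"
	"strings"
)

// Mirrors the deriveDB logic above: each batch/iterator records the DB it
// transitively came from, so later ops (closeOp, getOp) can synchronize on
// the right instance.
func main() {
	objDB := map[string]string{} // object -> owning DB
	deriveDB := func(child, parent string) {
		dbParent := parent
		if !strings.HasPrefix(dbParent, "db") {
			dbParent = objDB[dbParent] // parent is a batch/iter; follow its link
		}
		objDB[child] = dbParent
	}

	objDB["batch1"] = "db2"     // batch1 was created on db2
	deriveDB("iter1", "batch1") // an iterator opened on batch1
	deriveDB("iter2", "db1")    // an iterator opened directly on db1

	fmt.Println(objDB["iter1"], objDB["iter2"]) // db2 db1
}
```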
@@ -1165,7 +1248,7 @@ func (g *generator) snapshotClose() {
 	for _, id := range iters.sorted() {
 		g.liveIters.remove(id)
 		delete(g.iters, id)
-		g.add(&closeOp{objID: id})
+		g.add(&closeOp{objID: id, derivedDBID: g.objDB[id]})
 	}
 
 	g.add(&closeOp{objID: snapID})
@@ -1180,11 +1263,19 @@ func (g *generator) writerApply() {
 	}
 	batchID := g.liveBatches.rand(g.rng)
+	dbID := g.objDB[batchID]
 
 	var writerID objID
 	for {
+		// NB: The writer we're applying to, as well as the batch we're applying,
+		// must be from the same DB. The writer could be the db itself. Applying
+		// a batch from one DB on another DB results in a panic, so avoid that.
 		writerID = g.liveWriters.rand(g.rng)
-		if writerID != batchID {
+		writerDBID := writerID
+		if writerID.tag() != dbTag {
+			writerDBID = g.objDB[writerID]
+		}
+		if writerID != batchID && writerDBID == dbID {
 			break
 		}
 	}
@@ -1196,7 +1287,8 @@ func (g *generator) writerApply() {
 		batchID:  batchID,
 	})
 	g.add(&closeOp{
-		batchID,
+		objID:       batchID,
+		derivedDBID: dbID,
 	})
 }
 
@@ -1206,9 +1298,14 @@ func (g *generator) writerDelete() {
 	}
 
 	writerID := g.liveWriters.rand(g.rng)
+	derivedDBID := writerID
+	if derivedDBID.tag() != dbTag {
+		derivedDBID = g.objDB[writerID]
+	}
 	g.add(&deleteOp{
-		writerID: writerID,
-		key:      g.randKeyToWrite(0.001), // 0.1% new keys
+		writerID:    writerID,
+		key:         g.randKeyToWrite(0.001), // 0.1% new keys
+		derivedDBID: derivedDBID,
 	})
 }
 
@@ -1303,6 +1400,7 @@ func (g *generator) writerIngest() {
 	// we can tolerate failure or not, and if the ingestOp encounters a
 	// failure, it would retry after splitting into single batch ingests.
 
+	dbID := g.dbs.rand(g.rng)
 	// Ingest between 1 and 3 batches.
 	batchIDs := make([]objID, 0, 1+g.rng.Intn(3))
 	canFail := cap(batchIDs) > 1
@@ -1326,8 +1424,14 @@ func (g *generator) writerIngest() {
 		g.removeBatchFromGenerator(batchID)
 		batchIDs = append(batchIDs, batchID)
 	}
+	derivedDBIDs := make([]objID, len(batchIDs))
+	for i := range batchIDs {
+		derivedDBIDs[i] = g.objDB[batchIDs[i]]
+	}
 	g.add(&ingestOp{
-		batchIDs: batchIDs,
+		dbID:         dbID,
+		batchIDs:     batchIDs,
+		derivedDBIDs: derivedDBIDs,
 	})
 }
diff --git a/metamorphic/generator_test.go b/metamorphic/generator_test.go
index d68111c160..495a944dfa 100644
--- a/metamorphic/generator_test.go
+++ b/metamorphic/generator_test.go
@@ -5,6 +5,7 @@
 package metamorphic
 
 import (
+	"fmt"
 	"testing"
 	"time"
 
@@ -128,29 +129,42 @@ func TestGenerator(t *testing.T) {
 func TestGeneratorRandom(t *testing.T) {
 	seed := uint64(time.Now().UnixNano())
 	ops := randvar.NewUniform(1000, 10000)
-	generateFromSeed := func() string {
+	cfgs := []string{"default", "multiInstance"}
+	generateFromSeed := func(cfg config) string {
 		rng := rand.New(rand.NewSource(seed))
 		count := ops.Uint64(rng)
-		return formatOps(generate(rng, count, defaultConfig(), newKeyManager()))
+		return formatOps(generate(rng, count, cfg, newKeyManager()))
 	}
 
-	// Ensure that generate doesn't use any other source of randomness other
-	// than rng.
-	referenceOps := generateFromSeed()
-	for i := 0; i < 10; i++ {
-		regeneratedOps := generateFromSeed()
-		diff, err := difflib.GetUnifiedDiffString(difflib.UnifiedDiff{
-			A:       difflib.SplitLines(referenceOps),
-			B:       difflib.SplitLines(regeneratedOps),
-			Context: 1,
+	for i := range cfgs {
+		t.Run(fmt.Sprintf("config=%s", cfgs[i]), func(t *testing.T) {
+			cfg := defaultConfig
+			if cfgs[i] == "multiInstance" {
+				cfg = func() config {
+					cfg := multiInstanceConfig()
+					cfg.numInstances = 2
+					return cfg
+				}
+			}
+			// Ensure that generate doesn't use any source of randomness other
+			// than rng.
+			referenceOps := generateFromSeed(cfg())
+			for i := 0; i < 10; i++ {
+				regeneratedOps := generateFromSeed(cfg())
+				diff, err := difflib.GetUnifiedDiffString(difflib.UnifiedDiff{
+					A:       difflib.SplitLines(referenceOps),
+					B:       difflib.SplitLines(regeneratedOps),
+					Context: 1,
+				})
+				require.NoError(t, err)
+				if len(diff) > 0 {
+					t.Fatalf("Diff:\n%s", diff)
+				}
+			}
+			if testing.Verbose() {
+				t.Logf("\nOps:\n%s", referenceOps)
+			}
 		})
-		require.NoError(t, err)
-		if len(diff) > 0 {
-			t.Fatalf("Diff:\n%s", diff)
-		}
-	}
-	if testing.Verbose() {
-		t.Logf("\nOps:\n%s", referenceOps)
-	}
 	}
 }
diff --git a/metamorphic/key_manager.go b/metamorphic/key_manager.go
index 4728d47326..89a6393fab 100644
--- a/metamorphic/key_manager.go
+++ b/metamorphic/key_manager.go
@@ -207,7 +207,7 @@ func (k *keyManager) nextMetaTimestamp() int {
 	return ret
 }
 
-var dbObjID objID = makeObjID(dbTag, 0)
+var dbObjID objID = makeObjID(dbTag, 1)
 
 // newKeyManager returns a pointer to a new keyManager. Callers should
 // interact with this using addNewKey, eligible*Keys, update,
@@ -542,6 +542,8 @@ func opWrittenKeys(untypedOp op) [][]byte {
 		return [][]byte{t.key}
 	case *singleDeleteOp:
 		return [][]byte{t.key}
+	case *replicateOp:
+		return [][]byte{t.start, t.end}
 	}
 	return nil
 }
diff --git a/metamorphic/key_manager_test.go b/metamorphic/key_manager_test.go
index 51e6006ce1..059ffe75c1 100644
--- a/metamorphic/key_manager_test.go
+++ b/metamorphic/key_manager_test.go
@@ -14,8 +14,8 @@ func TestObjKey(t *testing.T) {
 		want string
 	}{
 		{
-			key:  makeObjKey(makeObjID(dbTag, 0), []byte("foo")),
-			want: "db:foo",
+			key:  makeObjKey(makeObjID(dbTag, 1), []byte("foo")),
+			want: "db1:foo",
 		},
 		{
 			key:  makeObjKey(makeObjID(batchTag, 1), []byte("bar")),
@@ -31,7 +31,7 @@ func TestObjKey(t *testing.T) {
 }
 
 func TestGlobalStateIndicatesEligibleForSingleDelete(t *testing.T) {
-	key := makeObjKey(makeObjID(dbTag, 0), []byte("foo"))
+	key := makeObjKey(makeObjID(dbTag, 1), []byte("foo"))
 	testCases := []struct {
 		meta keyMeta
 		want bool
@@ -221,7 +221,7 @@ func TestKeyManager_GetOrInit(t *testing.T) {
 	m := newKeyManager()
 	require.NotContains(t, m.byObjKey, o.String())
 	require.NotContains(t, m.byObj, id)
-	require.Contains(t, m.byObj, makeObjID(dbTag, 0)) // Always contains the DB key.
+	require.Contains(t, m.byObj, makeObjID(dbTag, 1)) // Always contains the DB key.
 
 	meta1 := m.getOrInit(id, key)
 	require.Contains(t, m.byObjKey, o.String())
@@ -233,7 +233,7 @@ func TestKeyManager_GetOrInit(t *testing.T) {
 }
 
 func TestKeyManager_Contains(t *testing.T) {
-	id := makeObjID(dbTag, 0)
+	id := makeObjID(dbTag, 1)
 	key := []byte("foo")
 
 	m := newKeyManager()
@@ -245,7 +245,7 @@ func TestKeyManager_Contains(t *testing.T) {
 
 func TestKeyManager_MergeInto(t *testing.T) {
 	fromID := makeObjID(batchTag, 1)
-	toID := makeObjID(dbTag, 0)
+	toID := makeObjID(dbTag, 1)
 
 	m := newKeyManager()
diff --git a/metamorphic/meta.go b/metamorphic/meta.go
index 71eb766cda..480d55ccdb 100644
--- a/metamorphic/meta.go
+++ b/metamorphic/meta.go
@@ -18,6 +18,7 @@ import (
 	"path/filepath"
 	"regexp"
 	"sort"
+	"strconv"
 	"testing"
 	"time"
 
@@ -42,6 +43,7 @@ type runAndCompareOptions struct {
 	innerBinary       string
 	mutateTestOptions []func(*TestOptions)
 	customRuns        map[string]string
+	numInstances      int
 
 	runOnceOptions
 }
@@ -177,6 +179,13 @@ func RunAndCompare(t *testing.T, rootDir string, rOpts ...RunOption) {
 		require.NoError(t, err)
 		loadPrecedingKeys(t, ops, &cfg, km)
 	}
+	if runOpts.numInstances > 1 {
+		// The multi-instance variant does not support all operations yet.
+		//
+		// TODO(bilal): Address this and use the default configs.
+		cfg = multiInstancePresetConfig
+		cfg.numInstances = runOpts.numInstances
+	}
 	ops := generate(rng, opCount, cfg, km)
 	opsPath := filepath.Join(metaDir, "ops")
 	formattedOps := formatOps(ops)
@@ -204,6 +213,9 @@ func RunAndCompare(t *testing.T, rootDir string, rOpts ...RunOption) {
 			"-run-dir=" + runDir,
 			"-test.run=" + t.Name() + "$",
 		}
+		if runOpts.numInstances > 1 {
+			args = append(args, "--num-instances="+strconv.Itoa(runOpts.numInstances))
+		}
 		if runOpts.traceFile != "" {
 			args = append(args, "-test.trace="+filepath.Join(runDir, runOpts.traceFile))
 		}
@@ -343,6 +355,7 @@ type runOnceOptions struct {
 	maxThreads          int
 	errorRate           float64
 	failRegexp          *regexp.Regexp
+	numInstances        int
 	customOptionParsers map[string]func(string) (CustomOption, bool)
 }
 
@@ -383,6 +396,12 @@ type FailOnMatch struct {
 func (f FailOnMatch) apply(ro *runAndCompareOptions) { ro.failRegexp = f.Regexp }
 func (f FailOnMatch) applyOnce(ro *runOnceOptions)   { ro.failRegexp = f.Regexp }
 
+// MultiInstance configures the number of pebble instances to create.
+type MultiInstance int
+
+func (m MultiInstance) apply(ro *runAndCompareOptions) { ro.numInstances = int(m) }
+func (m MultiInstance) applyOnce(ro *runOnceOptions)   { ro.numInstances = int(m) }
+
 // RunOnce performs one run of the metamorphic tests. RunOnce expects the
 // directory named by `runDir` to already exist and contain an `OPTIONS` file
 // containing the test run's configuration. The history of the run is persisted
@@ -458,7 +477,13 @@ func RunOnce(t TestingT, runDir string, seed uint64, historyPath string, rOpts ...RunOnceOption) {
 	))
 
 	if opts.WALDir != "" {
-		opts.WALDir = opts.FS.PathJoin(runDir, opts.WALDir)
+		if runOpts.numInstances > 1 {
+			// TODO(bilal): Allow opts to diverge on a per-instance basis, and use
+			// that to set unique WAL dirs for all instances in multi-instance mode.
+			opts.WALDir = ""
+		} else {
+			opts.WALDir = opts.FS.PathJoin(runDir, opts.WALDir)
+		}
 	}
 
 	historyFile, err := os.Create(historyPath)
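`MultiInstance` implements both option interfaces, so a single value configures the parent comparison run and each forked single run. A simplified, self-contained sketch of that plumbing (the types here are stand-ins, not the real ones from meta.go):

```go
package main

import "fmt"

type runAndCompareOptions struct{ numInstances int }
type runOnceOptions struct{ numInstances int }

// MultiInstance configures the number of Pebble instances to create.
type MultiInstance int

func (m MultiInstance) apply(ro *runAndCompareOptions) { ro.numInstances = int(m) }
func (m MultiInstance) applyOnce(ro *runOnceOptions)   { ro.numInstances = int(m) }

func main() {
	var rc runAndCompareOptions
	var ro runOnceOptions
	opt := MultiInstance(2)
	opt.apply(&rc)      // parent run: RunAndCompare
	opt.applyOnce(&ro)  // child run: RunOnce
	fmt.Println(rc.numInstances, ro.numInstances) // 2 2
}
```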
@@ -472,7 +497,7 @@ func RunOnce(t TestingT, runDir string, seed uint64, historyPath string, rOpts ...RunOnceOption) {
 	h := newHistory(runOpts.failRegexp, writers...)
 
 	m := newTest(ops)
-	require.NoError(t, m.init(h, dir, testOpts))
+	require.NoError(t, m.init(h, dir, testOpts, runOpts.numInstances))
 
 	if threads <= 1 {
 		for m.step(h) {
diff --git a/metamorphic/ops.go b/metamorphic/ops.go
index 1ab5216cb7..c1509150c0 100644
--- a/metamorphic/ops.go
+++ b/metamorphic/ops.go
@@ -6,10 +6,12 @@ package metamorphic
 
 import (
 	"bytes"
+	"context"
 	"crypto/rand"
 	"encoding/binary"
 	"fmt"
 	"io"
+	"path"
 	"path/filepath"
 	"strings"
 
@@ -18,10 +20,10 @@ import (
 	"github.com/cockroachdb/pebble/internal/base"
 	"github.com/cockroachdb/pebble/internal/keyspan"
 	"github.com/cockroachdb/pebble/internal/private"
+	"github.com/cockroachdb/pebble/internal/rangekey"
 	"github.com/cockroachdb/pebble/internal/testkeys"
 	"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
 	"github.com/cockroachdb/pebble/sstable"
-	"github.com/cockroachdb/pebble/vfs"
 	"github.com/cockroachdb/pebble/vfs/errorfs"
 )
 
@@ -46,6 +48,7 @@ type op interface {
 
 // initOp performs test initialization
 type initOp struct {
+	dbSlots       uint32
 	batchSlots    uint32
 	iterSlots     uint32
 	snapshotSlots uint32
@@ -59,12 +62,19 @@ func (o *initOp) run(t *test, h historyRecorder) {
 }
 
 func (o *initOp) String() string {
-	return fmt.Sprintf("Init(%d /* batches */, %d /* iters */, %d /* snapshots */)",
-		o.batchSlots, o.iterSlots, o.snapshotSlots)
+	return fmt.Sprintf("Init(%d /* dbs */, %d /* batches */, %d /* iters */, %d /* snapshots */)",
+		o.dbSlots, o.batchSlots, o.iterSlots, o.snapshotSlots)
 }
 
-func (o *initOp) receiver() objID      { return dbObjID }
-func (o *initOp) syncObjs() objIDSlice { return nil }
+func (o *initOp) receiver() objID { return makeObjID(dbTag, 1) }
+func (o *initOp) syncObjs() objIDSlice {
+	syncObjs := make([]objID, 0)
+	// Add any additional DBs to syncObjs.
+	for i := uint32(2); i < o.dbSlots+1; i++ {
+		syncObjs = append(syncObjs, makeObjID(dbTag, i))
+	}
+	return syncObjs
+}
 
 // applyOp models a Writer.Apply operation.
 type applyOp struct {
@@ -135,7 +145,7 @@ func (o *checkpointOp) String() string {
 		}
 		fmt.Fprintf(&spanStr, "%q,%q", span.Start, span.End)
 	}
-	return fmt.Sprintf("db.Checkpoint(%s)", spanStr.String())
+	return fmt.Sprintf("db1.Checkpoint(%s)", spanStr.String())
 }
 
 func (o *checkpointOp) receiver() objID      { return dbObjID }
@@ -143,7 +153,8 @@ func (o *checkpointOp) syncObjs() objIDSlice { return nil }
 
 // closeOp models a {Batch,Iterator,Snapshot}.Close operation.
 type closeOp struct {
-	objID objID
+	objID       objID
+	derivedDBID objID
 }
 
 func (o *closeOp) run(t *test, h historyRecorder) {
@@ -152,7 +163,7 @@ func (o *closeOp) run(t *test, h historyRecorder) {
 		// Special case: If WAL is disabled, do a flush right before DB Close. This
 		// allows us to reuse this run's data directory as initial state for
 		// future runs without losing any mutations.
-		_ = t.db.Flush()
+		_ = t.getDB(o.objID).Flush()
 	}
 	t.clearObj(o.objID)
 	err := c.Close()
@@ -166,14 +177,20 @@ func (o *closeOp) syncObjs() objIDSlice {
 	// all its iterators, snapshots and batches are closed.
 	// TODO(jackson): It would be nice to relax this so that Close calls can
 	// execute in parallel.
-	if o.objID == dbObjID {
+	if o.objID.tag() == dbTag {
 		return nil
 	}
+	if o.derivedDBID != 0 {
+		return []objID{o.derivedDBID}
+	}
+	// TODO(bilal): Once readers on snapshots are tracked correctly, return nil
+	// in the case below.
 	return []objID{dbObjID}
 }
 
 // compactOp models a DB.Compact operation.
 type compactOp struct {
+	dbID        objID
 	start       []byte
 	end         []byte
 	parallelize bool
 }
@@ -181,28 +198,30 @@ type compactOp struct {
 
 func (o *compactOp) run(t *test, h historyRecorder) {
 	err := withRetries(func() error {
-		return t.db.Compact(o.start, o.end, o.parallelize)
+		return t.getDB(o.dbID).Compact(o.start, o.end, o.parallelize)
 	})
 	h.Recordf("%s // %v", o, err)
 }
 
 func (o *compactOp) String() string {
-	return fmt.Sprintf("db.Compact(%q, %q, %t /* parallelize */)", o.start, o.end, o.parallelize)
+	return fmt.Sprintf("%s.Compact(%q, %q, %t /* parallelize */)", o.dbID, o.start, o.end, o.parallelize)
 }
 
-func (o *compactOp) receiver() objID      { return dbObjID }
+func (o *compactOp) receiver() objID      { return o.dbID }
 func (o *compactOp) syncObjs() objIDSlice { return nil }
 
 // deleteOp models a Writer.Delete operation.
 type deleteOp struct {
 	writerID objID
 	key      []byte
+
+	derivedDBID objID
 }
 
 func (o *deleteOp) run(t *test, h historyRecorder) {
 	w := t.getWriter(o.writerID)
 	var err error
-	if t.testOpts.deleteSized && t.isFMV(pebble.FormatDeleteSizedAndObsolete) {
+	if t.testOpts.deleteSized && t.isFMV(o.derivedDBID, pebble.FormatDeleteSizedAndObsolete) {
 		// Call DeleteSized with a deterministic size derived from the index.
 		// The size does not need to be accurate for correctness.
 		err = w.DeleteSized(o.key, hashSize(t.idx), t.writeOpts)
@@ -275,15 +294,17 @@ func (o *deleteRangeOp) syncObjs() objIDSlice { return nil }
 
 // flushOp models a DB.Flush operation.
 type flushOp struct {
+	db objID
 }
 
 func (o *flushOp) run(t *test, h historyRecorder) {
-	err := t.db.Flush()
+	db := t.getDB(o.db)
+	err := db.Flush()
 	h.Recordf("%s // %v", o, err)
 }
 
-func (o *flushOp) String() string       { return "db.Flush()" }
-func (o *flushOp) receiver() objID      { return dbObjID }
+func (o *flushOp) String() string       { return fmt.Sprintf("%s.Flush()", o.db) }
+func (o *flushOp) receiver() objID      { return o.db }
 func (o *flushOp) syncObjs() objIDSlice { return nil }
 
 // mergeOp models a Writer.Merge operation.
@@ -387,17 +408,18 @@ func (o *rangeKeyUnsetOp) syncObjs() objIDSlice { return nil }
 
 // newBatchOp models a Writer.NewBatch operation.
 type newBatchOp struct {
+	dbID    objID
 	batchID objID
 }
 
 func (o *newBatchOp) run(t *test, h historyRecorder) {
-	b := t.db.NewBatch()
+	b := t.getDB(o.dbID).NewBatch()
 	t.setBatch(o.batchID, b)
 	h.Recordf("%s", o)
 }
 
-func (o *newBatchOp) String() string  { return fmt.Sprintf("%s = db.NewBatch()", o.batchID) }
-func (o *newBatchOp) receiver() objID { return dbObjID }
+func (o *newBatchOp) String() string  { return fmt.Sprintf("%s = %s.NewBatch()", o.batchID, o.dbID) }
+func (o *newBatchOp) receiver() objID { return o.dbID }
 func (o *newBatchOp) syncObjs() objIDSlice {
 	// NewBatch should not be concurrent with operations that interact with that
 	// same batch.
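`receiver()` decides which object's logical thread executes an op, and `syncObjs()` lists the other objects it must wait on, which is why the ops above now return `o.dbID` rather than the global `dbObjID`. A toy model of that contract (an illustration of the idea, not `computeSynchronizationPoints` itself):

```go
package main

import "fmt"

type op struct {
	receiver string
	syncObjs []string
}

func main() {
	ops := []op{
		{receiver: "db1"},                               // 0: db1.Flush()
		{receiver: "db2"},                               // 1: batch1 = db2.NewBatch()
		{receiver: "batch1", syncObjs: []string{"db2"}}, // 2: batch1.Commit() syncs on db2
	}
	lastRef := map[string]int{} // object -> index of the last op that touched it
	for i, o := range ops {
		var waitOn []int
		for _, obj := range append([]string{o.receiver}, o.syncObjs...) {
			if j, ok := lastRef[obj]; ok && j != i {
				waitOn = append(waitOn, j)
			}
			lastRef[obj] = i
		}
		fmt.Printf("op %d waits on %v\n", i, waitOn) // op 2 waits on [1], never on db1
	}
}
```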
@@ -406,19 +428,20 @@ func (o *newBatchOp) syncObjs() objIDSlice {
 
 // newIndexedBatchOp models a Writer.NewIndexedBatch operation.
 type newIndexedBatchOp struct {
+	dbID    objID
 	batchID objID
 }
 
 func (o *newIndexedBatchOp) run(t *test, h historyRecorder) {
-	b := t.db.NewIndexedBatch()
+	b := t.getDB(o.dbID).NewIndexedBatch()
 	t.setBatch(o.batchID, b)
 	h.Recordf("%s", o)
 }
 
 func (o *newIndexedBatchOp) String() string {
-	return fmt.Sprintf("%s = db.NewIndexedBatch()", o.batchID)
+	return fmt.Sprintf("%s = %s.NewIndexedBatch()", o.batchID, o.dbID)
 }
 
-func (o *newIndexedBatchOp) receiver() objID { return dbObjID }
+func (o *newIndexedBatchOp) receiver() objID { return o.dbID }
 func (o *newIndexedBatchOp) syncObjs() objIDSlice {
 	// NewIndexedBatch should not be concurrent with operations that interact
 	// with that same batch.
@@ -427,6 +450,7 @@ func (o *newIndexedBatchOp) syncObjs() objIDSlice {
 
 // batchCommitOp models a Batch.Commit operation.
 type batchCommitOp struct {
+	dbID    objID
 	batchID objID
 }
 
@@ -440,26 +464,29 @@ func (o *batchCommitOp) run(t *test, h historyRecorder) {
 func (o *batchCommitOp) String() string  { return fmt.Sprintf("%s.Commit()", o.batchID) }
 func (o *batchCommitOp) receiver() objID { return o.batchID }
 func (o *batchCommitOp) syncObjs() objIDSlice {
 	// Synchronize on the database so that NewIters wait for the commit.
-	return []objID{dbObjID}
+	return []objID{o.dbID}
 }
 
 // ingestOp models a DB.Ingest operation.
 type ingestOp struct {
+	dbID     objID
 	batchIDs []objID
+
+	derivedDBIDs []objID
 }
 
 func (o *ingestOp) run(t *test, h historyRecorder) {
 	// We can only use apply as an alternative for ingestion if we are ingesting
 	// a single batch. If we are ingesting multiple batches, the batches may
 	// overlap which would cause ingestion to fail but apply would succeed.
-	if t.testOpts.ingestUsingApply && len(o.batchIDs) == 1 {
+	if t.testOpts.ingestUsingApply && len(o.batchIDs) == 1 && o.derivedDBIDs[0] == o.dbID {
 		id := o.batchIDs[0]
 		b := t.getBatch(id)
 		iter, rangeDelIter, rangeKeyIter := private.BatchSort(b)
-		c, err := o.collapseBatch(t, iter, rangeDelIter, rangeKeyIter)
+		db := t.getDB(o.dbID)
+		c, err := o.collapseBatch(t, db, iter, rangeDelIter, rangeKeyIter)
 		if err == nil {
-			w := t.getWriter(makeObjID(dbTag, 0))
-			err = w.Apply(c, t.writeOpts)
+			err = db.Apply(c, t.writeOpts)
 		}
 		_ = b.Close()
 		_ = c.Close()
@@ -485,25 +512,25 @@ func (o *ingestOp) run(t *test, h historyRecorder) {
 	}
 
 	err = firstError(err, withRetries(func() error {
-		return t.db.Ingest(paths)
+		return t.getDB(o.dbID).Ingest(paths)
 	}))
 	h.Recordf("%s // %v", o, err)
 }
 
 func (o *ingestOp) build(t *test, h historyRecorder, b *pebble.Batch, i int) (string, error) {
-	rootFS := vfs.Root(t.opts.FS)
-	path := rootFS.PathJoin(t.tmpDir, fmt.Sprintf("ext%d", i))
-	f, err := rootFS.Create(path)
+	path := t.opts.FS.PathJoin(t.tmpDir, fmt.Sprintf("ext%d-%d", o.dbID.slot(), i))
+	f, err := t.opts.FS.Create(path)
 	if err != nil {
 		return "", err
 	}
 
+	db := t.getDB(o.dbID)
 	iter, rangeDelIter, rangeKeyIter := private.BatchSort(b)
 	defer closeIters(iter, rangeDelIter, rangeKeyIter)
 
 	equal := t.opts.Comparer.Equal
-	tableFormat := t.db.FormatMajorVersion().MaxTableFormat()
+	tableFormat := db.FormatMajorVersion().MaxTableFormat()
 	w := sstable.NewWriter(
 		objstorageprovider.NewFileWritable(f),
 		t.opts.MakeWriterOptions(0, tableFormat),
@@ -520,6 +547,13 @@ func (o *ingestOp) build(t *test, h historyRecorder, b *pebble.Batch, i int) (string, error) {
 		lastUserKey = key.UserKey
 
 		key.SetSeqNum(base.SeqNumZero)
+		// It's possible that we wrote the key on a batch from a db that supported
+		// DeleteSized, but are now ingesting into a db that does not. Detect
+		// this case and translate the key to an InternalKeyKindDelete.
+		if key.Kind() == pebble.InternalKeyKindDeleteSized && !t.isFMV(o.dbID, pebble.FormatDeleteSizedAndObsolete) {
+			value = pebble.LazyValue{}
+			key.SetKind(pebble.InternalKeyKindDelete)
+		}
 		if err := w.Add(*key, value.InPlaceValue()); err != nil {
 			return "", err
 		}
@@ -550,7 +584,7 @@ func (o *ingestOp) build(t *test, h historyRecorder, b *pebble.Batch, i int) (string, error) {
 	return path, nil
 }
 
-func (o *ingestOp) receiver() objID { return dbObjID }
+func (o *ingestOp) receiver() objID { return o.dbID }
 func (o *ingestOp) syncObjs() objIDSlice {
 	// Ingest should not be concurrent with mutating the batches that will be
 	// ingested as sstables.
@@ -579,11 +613,14 @@ func closeIters(
 // performed first in the batch to match the semantics of ingestion where a
 // range deletion does not delete a point record contained in the sstable.
 func (o *ingestOp) collapseBatch(
-	t *test, pointIter base.InternalIterator, rangeDelIter, rangeKeyIter keyspan.FragmentIterator,
+	t *test,
+	db *pebble.DB,
+	pointIter base.InternalIterator,
+	rangeDelIter, rangeKeyIter keyspan.FragmentIterator,
 ) (*pebble.Batch, error) {
 	defer closeIters(pointIter, rangeDelIter, rangeKeyIter)
 	equal := t.opts.Comparer.Equal
-	collapsed := t.db.NewBatch()
+	collapsed := db.NewBatch()
 
 	if rangeDelIter != nil {
 		// NB: The range tombstones have already been fragmented by the Batch.
@@ -651,7 +688,8 @@ func (o *ingestOp) collapseBatch(
 
 func (o *ingestOp) String() string {
 	var buf strings.Builder
-	buf.WriteString("db.Ingest(")
+	buf.WriteString(o.dbID.String())
+	buf.WriteString(".Ingest(")
 	for i, id := range o.batchIDs {
 		if i > 0 {
 			buf.WriteString(", ")
@@ -664,8 +702,9 @@ func (o *ingestOp) String() string {
 
 // getOp models a Reader.Get operation.
 type getOp struct {
-	readerID objID
-	key      []byte
+	readerID    objID
+	key         []byte
+	derivedDBID objID
 }
 
 func (o *getOp) run(t *test, h historyRecorder) {
@@ -685,11 +724,14 @@ func (o *getOp) run(t *test, h historyRecorder) {
 func (o *getOp) String() string  { return fmt.Sprintf("%s.Get(%q)", o.readerID, o.key) }
 func (o *getOp) receiver() objID { return o.readerID }
 func (o *getOp) syncObjs() objIDSlice {
-	if o.readerID == dbObjID {
+	if o.readerID.tag() == dbTag {
 		return nil
 	}
 	// batch.Get reads through to the current database state.
-	return []objID{dbObjID}
+	if o.derivedDBID != 0 {
+		return []objID{o.derivedDBID}
+	}
+	return nil
 }
 
 // newIterOp models a Reader.NewIter operation.
@@ -697,6 +739,7 @@ type newIterOp struct {
 	readerID objID
 	iterID   objID
 	iterOpts
+	derivedDBID objID
 }
 
 func (o *newIterOp) run(t *test, h historyRecorder) {
@@ -736,7 +779,7 @@ func (o *newIterOp) syncObjs() objIDSlice {
 	// state, and we must synchronize on the database state for a consistent
 	// view.
 	if o.readerID.tag() == batchTag {
-		objs = append(objs, dbObjID)
+		objs = append(objs, o.derivedDBID)
 	}
 	return objs
 }
@@ -1220,6 +1263,7 @@ func (o *newSnapshotOp) receiver() objID      { return dbObjID }
 func (o *newSnapshotOp) syncObjs() objIDSlice { return []objID{o.snapID} }
 
 type dbRatchetFormatMajorVersionOp struct {
+	dbID objID
 	vers pebble.FormatMajorVersion
 }
 
@@ -1233,22 +1277,24 @@ func (o *dbRatchetFormatMajorVersionOp) run(t *test, h historyRecorder) {
 	// Regardless, subsequent operations should behave identically, which is
 	// what we're really aiming to test by including this format major version
 	// ratchet operation.
-	if t.db.FormatMajorVersion() < o.vers {
-		err = t.db.RatchetFormatMajorVersion(o.vers)
+	if t.getDB(o.dbID).FormatMajorVersion() < o.vers {
+		err = t.getDB(o.dbID).RatchetFormatMajorVersion(o.vers)
 	}
 	h.Recordf("%s // %v", o, err)
 }
 
 func (o *dbRatchetFormatMajorVersionOp) String() string {
-	return fmt.Sprintf("db.RatchetFormatMajorVersion(%s)", o.vers)
+	return fmt.Sprintf("%s.RatchetFormatMajorVersion(%s)", o.dbID, o.vers)
 }
 
-func (o *dbRatchetFormatMajorVersionOp) receiver() objID      { return dbObjID }
+func (o *dbRatchetFormatMajorVersionOp) receiver() objID      { return o.dbID }
 func (o *dbRatchetFormatMajorVersionOp) syncObjs() objIDSlice { return nil }
 
-type dbRestartOp struct{}
+type dbRestartOp struct {
+	dbID objID
+}
 
 func (o *dbRestartOp) run(t *test, h historyRecorder) {
-	if err := t.restartDB(); err != nil {
+	if err := t.restartDB(o.dbID); err != nil {
 		h.Recordf("%s // %v", o, err)
 		h.history.err.Store(errors.Wrap(err, "dbRestartOp"))
 	} else {
@@ -1256,8 +1302,8 @@ func (o *dbRestartOp) run(t *test, h historyRecorder) {
 	}
 }
 
-func (o *dbRestartOp) String() string  { return "db.Restart()" }
-func (o *dbRestartOp) receiver() objID { return dbObjID }
+func (o *dbRestartOp) String() string  { return fmt.Sprintf("%s.Restart()", o.dbID) }
+func (o *dbRestartOp) receiver() objID { return o.dbID }
 func (o *dbRestartOp) syncObjs() objIDSlice { return nil }
 
 func formatOps(ops []op) string {
@@ -1267,3 +1313,128 @@ func formatOps(ops []op) string {
 	}
 	return buf.String()
 }
+
+// replicateOp models an operation that could copy keys from one db to
+// another through either an IngestAndExcise, or an Ingest.
+type replicateOp struct {
+	source, dest objID
+	start, end   []byte
+}
+
+func (r *replicateOp) runSharedReplicate(
+	t *test, h historyRecorder, source, dest *pebble.DB, w *sstable.Writer, sstPath string,
+) {
+	var sharedSSTs []pebble.SharedSSTMeta
+	var err error
+	err = source.ScanInternal(context.TODO(), r.start, r.end,
+		func(key *pebble.InternalKey, value pebble.LazyValue, _ pebble.IteratorLevel) error {
+			val, _, err := value.Value(nil)
+			if err != nil {
+				panic(err)
+			}
+			return w.Add(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val)
+		},
+		func(start, end []byte, seqNum uint64) error {
+			return w.DeleteRange(start, end)
+		},
+		func(start, end []byte, keys []keyspan.Key) error {
+			s := keyspan.Span{
+				Start:     start,
+				End:       end,
+				Keys:      keys,
+				KeysOrder: 0,
+			}
+			return rangekey.Encode(&s, func(k base.InternalKey, v []byte) error {
+				return w.AddRangeKey(base.MakeInternalKey(k.UserKey, 0, k.Kind()), v)
+			})
+		},
+		func(sst *pebble.SharedSSTMeta) error {
+			sharedSSTs = append(sharedSSTs, *sst)
+			return nil
+		},
+	)
+	if err != nil {
+		h.Recordf("%s // %v", r, err)
+		return
+	}
+
+	_, err = dest.IngestAndExcise([]string{sstPath}, sharedSSTs, pebble.KeyRange{Start: r.start, End: r.end})
+	h.Recordf("%s // %v", r, err)
+}
+
+func (r *replicateOp) run(t *test, h historyRecorder) {
+	// Shared replication only works if shared storage is enabled.
+	useSharedIngest := t.testOpts.useSharedReplicate
+	if !t.testOpts.sharedStorageEnabled {
+		useSharedIngest = false
+	}
+
+	source := t.getDB(r.source)
+	dest := t.getDB(r.dest)
+	sstPath := path.Join(t.tmpDir, fmt.Sprintf("ext-replicate%d.sst", t.idx))
+	f, err := t.opts.FS.Create(sstPath)
+	if err != nil {
+		h.Recordf("%s // %v", r, err)
+		return
+	}
+	w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), t.opts.MakeWriterOptions(0, dest.FormatMajorVersion().MaxTableFormat()))
+
+	if useSharedIngest {
+		r.runSharedReplicate(t, h, source, dest, w, sstPath)
+		return
+	}
+
+	iter, err := source.NewIter(&pebble.IterOptions{
+		LowerBound: r.start,
+		UpperBound: r.end,
+		KeyTypes:   pebble.IterKeyTypePointsAndRanges,
+	})
+	if err != nil {
+		panic(err)
+	}
+	defer iter.Close()
+
+	// Write rangedels and rangekeydels for the range. This mimics the Excise
+	// that runSharedReplicate would do.
+	if err := w.DeleteRange(r.start, r.end); err != nil {
+		panic(err)
+	}
+	if err := w.RangeKeyDelete(r.start, r.end); err != nil {
+		panic(err)
+	}
+
+	for ok := iter.SeekGE(r.start); ok && iter.Error() == nil; ok = iter.Next() {
+		hasPoint, hasRange := iter.HasPointAndRange()
+		if hasPoint {
+			val, err := iter.ValueAndErr()
+			if err != nil {
+				panic(err)
+			}
+			if err := w.Set(iter.Key(), val); err != nil {
+				panic(err)
+			}
+		}
+		if hasRange && iter.RangeKeyChanged() {
+			rangeKeys := iter.RangeKeys()
+			rkStart, rkEnd := iter.RangeBounds()
+			for i := range rangeKeys {
+				if err := w.RangeKeySet(rkStart, rkEnd, rangeKeys[i].Suffix, rangeKeys[i].Value); err != nil {
+					panic(err)
+				}
+			}
+		}
+	}
+	if err := w.Close(); err != nil {
+		panic(err)
+	}
+
+	err = dest.Ingest([]string{sstPath})
+	h.Recordf("%s // %v", r, err)
+}
+
+func (r *replicateOp) String() string {
+	return fmt.Sprintf("%s.Replicate(%s, %q, %q)", r.source, r.dest, r.start, r.end)
+}
+
+func (r *replicateOp) receiver() objID      { return r.source }
+func (r *replicateOp) syncObjs() objIDSlice { return objIDSlice{r.dest} }
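Semantically, replicateOp makes dest's view of [start, end) identical to source's: the shared path excises via IngestAndExcise, while the fallback writes a range deletion and range-key deletion plus source's live keys into one SST and ingests it. A toy, map-based model of that end state (illustration only, ignoring range keys and snapshots):

```go
package main

import "fmt"

// replicate drops dest's keys in [start, end) and copies source's keys in
// that range, mirroring the DeleteRange/RangeKeyDelete plus copy performed
// by replicateOp above.
func replicate(source, dest map[string]string, start, end string) {
	for k := range dest {
		if k >= start && k < end {
			delete(dest, k)
		}
	}
	for k, v := range source {
		if k >= start && k < end {
			dest[k] = v
		}
	}
}

func main() {
	src := map[string]string{"b": "1", "c": "2", "x": "9"}
	dst := map[string]string{"b": "old", "d": "old", "x": "keep"}
	replicate(src, dst, "a", "m")
	fmt.Println(dst) // map[b:1 c:2 x:keep]
}
```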
diff --git a/metamorphic/options.go b/metamorphic/options.go
index 66c4a15313..b288c44068 100644
--- a/metamorphic/options.go
+++ b/metamorphic/options.go
@@ -254,6 +254,8 @@ type TestOptions struct {
 	asyncApplyToDB bool
 	// Enable the use of shared storage.
 	sharedStorageEnabled bool
+	// Enable the use of shared replication (the replicate operation).
+	useSharedReplicate bool
 	// Enable the secondary cache. Only effective if sharedStorageEnabled is
 	// also true.
 	secondaryCacheEnabled bool
@@ -547,8 +549,9 @@ func randomOptions(
 	// 20% of the time, enable shared storage.
 	if rng.Intn(5) == 0 {
 		testOpts.sharedStorageEnabled = true
+		inMemShared := remote.NewInMem()
 		testOpts.Opts.Experimental.RemoteStorage = remote.MakeSimpleFactory(map[remote.Locator]remote.Storage{
-			"": remote.NewInMem(),
+			"": inMemShared,
 		})
 		// If shared storage is enabled, pick between writing all files on shared
 		// vs. lower levels only, 50% of the time.
@@ -562,6 +565,8 @@ func randomOptions(
 			// TODO(josh): Randomize various secondary cache settings.
 			testOpts.Opts.Experimental.SecondaryCacheSizeBytes = 1024 * 1024 * 32 // 32 MBs
 		}
+		// 50% of the time, enable shared replication.
+		testOpts.useSharedReplicate = rng.Intn(2) == 0
 	}
 	testOpts.seedEFOS = rng.Uint64()
 	testOpts.ingestSplit = rng.Intn(2) == 0
diff --git a/metamorphic/parser.go b/metamorphic/parser.go
index f50dea47a7..cdbd60fdeb 100644
--- a/metamorphic/parser.go
+++ b/metamorphic/parser.go
@@ -50,13 +50,13 @@ func opArgs(op op) (receiverID *objID, targetID *objID, args []interface{}) {
 	case *closeOp:
 		return &t.objID, nil, nil
 	case *compactOp:
-		return nil, nil, []interface{}{&t.start, &t.end, &t.parallelize}
+		return &t.dbID, nil, []interface{}{&t.start, &t.end, &t.parallelize}
 	case *batchCommitOp:
 		return &t.batchID, nil, nil
 	case *dbRatchetFormatMajorVersionOp:
-		return nil, nil, []interface{}{&t.vers}
+		return &t.dbID, nil, []interface{}{&t.vers}
 	case *dbRestartOp:
-		return nil, nil, nil
+		return &t.dbID, nil, nil
 	case *deleteOp:
 		return &t.writerID, nil, []interface{}{&t.key}
 	case *deleteRangeOp:
@@ -64,21 +64,21 @@ func opArgs(op op) (receiverID *objID, targetID *objID, args []interface{}) {
 	case *iterFirstOp:
 		return &t.iterID, nil, nil
 	case *flushOp:
-		return nil, nil, nil
+		return &t.db, nil, nil
 	case *getOp:
 		return &t.readerID, nil, []interface{}{&t.key}
 	case *ingestOp:
-		return nil, nil, []interface{}{&t.batchIDs}
+		return &t.dbID, nil, []interface{}{&t.batchIDs}
 	case *initOp:
-		return nil, nil, []interface{}{&t.batchSlots, &t.iterSlots, &t.snapshotSlots}
+		return nil, nil, []interface{}{&t.dbSlots, &t.batchSlots, &t.iterSlots, &t.snapshotSlots}
 	case *iterLastOp:
 		return &t.iterID, nil, nil
 	case *mergeOp:
 		return &t.writerID, nil, []interface{}{&t.key, &t.value}
 	case *newBatchOp:
-		return nil, &t.batchID, nil
+		return &t.dbID, &t.batchID, nil
 	case *newIndexedBatchOp:
-		return nil, &t.batchID, nil
+		return &t.dbID, &t.batchID, nil
 	case *newIterOp:
 		return &t.readerID, &t.iterID, []interface{}{&t.lower, &t.upper, &t.keyTypes, &t.filterMin, &t.filterMax, &t.useL6Filters, &t.maskSuffix}
 	case *newIterUsingCloneOp:
@@ -111,6 +111,8 @@ func opArgs(op op) (receiverID *objID, targetID *objID, args []interface{}) {
 		return &t.writerID, nil, []interface{}{&t.start, &t.end, &t.suffix, &t.value}
 	case *rangeKeyUnsetOp:
 		return &t.writerID, nil, []interface{}{&t.start, &t.end, &t.suffix}
+	case *replicateOp:
+		return &t.source, nil, []interface{}{&t.dest, &t.start, &t.end}
 	}
 	panic(fmt.Sprintf("unsupported op type: %T", op))
 }
@@ -142,6 +144,7 @@ var methods = map[string]*methodInfo{
 	"RangeKeySet":               makeMethod(rangeKeySetOp{}, dbTag, batchTag),
 	"RangeKeyUnset":             makeMethod(rangeKeyUnsetOp{}, dbTag, batchTag),
 	"RatchetFormatMajorVersion": makeMethod(dbRatchetFormatMajorVersionOp{}, dbTag),
+	"Replicate":                 makeMethod(replicateOp{}, dbTag),
 	"Restart":                   makeMethod(dbRestartOp{}, dbTag),
 	"SeekGE":                    makeMethod(iterSeekGEOp{}, iterTag),
 	"SeekLT":                    makeMethod(iterSeekLTOp{}, iterTag),
@@ -164,7 +167,7 @@ func parse(src []byte) (_ []op, err error) {
 	// look like Go which allows us to use the Go scanner for parsing.
 	p := &parser{
 		fset: token.NewFileSet(),
-		objs: map[objID]bool{makeObjID(dbTag, 0): true},
+		objs: map[objID]bool{makeObjID(dbTag, 1): true, makeObjID(dbTag, 2): true},
 	}
 	file := p.fset.AddFile("", -1, len(src))
 	p.s.Init(file, src, nil /* no error handler */, 0)
@@ -203,7 +206,7 @@ func (p *parser) parseOp() op {
 	}
 	if destLit == "Init" {
 		// <op>()
-		return p.makeOp(destLit, makeObjID(dbTag, 0), 0, destPos)
+		return p.makeOp(destLit, makeObjID(dbTag, 1), 0, destPos)
 	}
 
 	destID := p.parseObjID(destPos, destLit)
@@ -236,8 +239,11 @@ func (p *parser) parseOp() op {
 func (p *parser) parseObjID(pos token.Pos, str string) objID {
 	var tag objTag
 	switch {
-	case str == "db":
-		return makeObjID(dbTag, 0)
+	case strings.HasPrefix(str, "db"):
+		tag, str = dbTag, str[2:]
+		if str == "" {
+			str = "1"
+		}
 	case strings.HasPrefix(str, "batch"):
 		tag, str = batchTag, str[5:]
 	case strings.HasPrefix(str, "iter"):
@@ -497,13 +503,23 @@ func (p *parser) errorf(pos token.Pos, format string, args ...interface{}) error {
 // execution depends on these fields.
 func computeDerivedFields(ops []op) {
 	iterToReader := make(map[objID]objID)
+	objToDB := make(map[objID]objID)
 	for i := range ops {
 		switch v := ops[i].(type) {
 		case *newIterOp:
 			iterToReader[v.iterID] = v.readerID
+			dbReaderID := v.readerID
+			if dbReaderID.tag() != dbTag {
+				dbReaderID = objToDB[dbReaderID]
+			}
+			objToDB[v.iterID] = dbReaderID
+			if v.readerID.tag() == batchTag {
+				v.derivedDBID = dbReaderID
+			}
 		case *newIterUsingCloneOp:
 			v.derivedReaderID = iterToReader[v.existingIterID]
 			iterToReader[v.iterID] = v.derivedReaderID
+			objToDB[v.iterID] = objToDB[v.existingIterID]
 		case *iterSetOptionsOp:
 			v.derivedReaderID = iterToReader[v.iterID]
 		case *iterFirstOp:
@@ -522,6 +538,40 @@ func computeDerivedFields(ops []op) {
 			v.derivedReaderID = iterToReader[v.iterID]
 		case *iterPrevOp:
 			v.derivedReaderID = iterToReader[v.iterID]
+		case *newBatchOp:
+			objToDB[v.batchID] = v.dbID
+		case *newIndexedBatchOp:
+			objToDB[v.batchID] = v.dbID
+		case *applyOp:
+			if derivedDBID, ok := objToDB[v.batchID]; ok && v.writerID.tag() != dbTag {
+				objToDB[v.writerID] = derivedDBID
+			}
+		case *getOp:
+			if derivedDBID, ok := objToDB[v.readerID]; ok {
+				v.derivedDBID = derivedDBID
+			} else if v.readerID.tag() == snapTag {
+				// TODO(bilal): This is legacy behaviour as snapshots aren't supported in
+				// two-instance mode yet. Track snapshots in g.objDB and objToDB and remove this
+				// conditional.
+				v.derivedDBID = dbObjID
+			}
+		case *batchCommitOp:
+			v.dbID = objToDB[v.batchID]
+		case *closeOp:
+			if derivedDBID, ok := objToDB[v.objID]; ok {
+				v.derivedDBID = derivedDBID
+			}
+		case *ingestOp:
+			v.derivedDBIDs = make([]objID, len(v.batchIDs))
+			for i := range v.batchIDs {
+				v.derivedDBIDs[i] = objToDB[v.batchIDs[i]]
+			}
+		case *deleteOp:
+			derivedDBID := v.writerID
+			if v.writerID.tag() != dbTag {
+				derivedDBID = objToDB[v.writerID]
+			}
+			v.derivedDBID = derivedDBID
 		}
 	}
 }
diff --git a/metamorphic/parser_test.go b/metamorphic/parser_test.go
index cdb20af05e..b52a314894 100644
--- a/metamorphic/parser_test.go
+++ b/metamorphic/parser_test.go
@@ -29,20 +29,30 @@ func TestParser(t *testing.T) {
 }
 
 func TestParserRandom(t *testing.T) {
-	ops := generate(randvar.NewRand(), 10000, defaultConfig(), newKeyManager())
-	src := formatOps(ops)
+	cfgs := []string{"default", "multiInstance"}
+	for i := range cfgs {
+		t.Run(fmt.Sprintf("config=%s", cfgs[i]), func(t *testing.T) {
+			cfg := defaultConfig()
+			if cfgs[i] == "multiInstance" {
+				cfg = multiInstanceConfig()
+				cfg.numInstances = 2
+			}
+			ops := generate(randvar.NewRand(), 10000, cfg, newKeyManager())
+			src := formatOps(ops)
 
-	parsedOps, err := parse([]byte(src))
-	if err != nil {
-		t.Fatalf("%s\n%s", err, src)
+			parsedOps, err := parse([]byte(src))
+			if err != nil {
+				t.Fatalf("%s\n%s", err, src)
+			}
+			require.Equal(t, ops, parsedOps)
+		})
 	}
-	require.Equal(t, ops, parsedOps)
 }
 
 func TestParserNilBounds(t *testing.T) {
 	formatted := formatOps([]op{
 		&newIterOp{
-			readerID: makeObjID(dbTag, 0),
+			readerID: makeObjID(dbTag, 1),
 			iterID:   makeObjID(iterTag, 1),
 			iterOpts: iterOpts{},
 		},
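The parser now accepts numbered DB identifiers while keeping old single-instance traces parseable. A standalone sketch of that naming convention, simplified from `parseObjID` (error handling elided):

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseDBSlot maps a trace identifier to a 1-based DB slot: a bare "db" in
// legacy traces is treated as db1, while "db2", "db3", ... address the
// additional instances.
func parseDBSlot(ident string) (uint32, bool) {
	if !strings.HasPrefix(ident, "db") {
		return 0, false
	}
	rest := ident[2:]
	if rest == "" {
		rest = "1" // legacy single-instance traces say just "db"
	}
	n, err := strconv.ParseUint(rest, 10, 32)
	if err != nil {
		return 0, false
	}
	return uint32(n), true
}

func main() {
	for _, s := range []string{"db", "db1", "db2", "batch3"} {
		slot, ok := parseDBSlot(s)
		fmt.Println(s, slot, ok)
	}
}
```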
diff --git a/metamorphic/test.go b/metamorphic/test.go
index fd3b55664e..588991d693 100644
--- a/metamorphic/test.go
+++ b/metamorphic/test.go
@@ -8,6 +8,7 @@ import (
 	"fmt"
 	"io"
 	"os"
+	"path"
 	"sort"
 	"strings"
 
@@ -24,8 +25,11 @@ type test struct {
 	opsWaitOn [][]int         // op index -> op indexes
 	opsDone   []chan struct{} // op index -> done channel
 	idx       int
-	// The DB the test is run on.
 	dir       string
+	// The DB the test is run on.
+	//
+	// TODO(bilal): The DB field is deprecated in favour of the dbs slice. Update
+	// all remaining uses of t.db to use t.dbs[] instead.
 	db        *pebble.DB
 	opts      *pebble.Options
 	testOpts  *TestOptions
@@ -33,6 +37,7 @@ type test struct {
 	tmpDir    string
 	// The slots for the batches, iterators, and snapshots. These are read and
 	// written by the ops to pass state from one op to another.
+	dbs       []*pebble.DB
 	batches   []*pebble.Batch
 	iters     []*retryableIter
 	snapshots []readerCloser
@@ -44,7 +49,7 @@ func newTest(ops []op) *test {
 	}
 }
 
-func (t *test) init(h *history, dir string, testOpts *TestOptions) error {
+func (t *test) init(h *history, dir string, testOpts *TestOptions, numInstances int) error {
 	t.dir = dir
 	t.testOpts = testOpts
 	t.writeOpts = pebble.NoSync
@@ -62,6 +67,9 @@ func (t *test) init(h *history, dir string, testOpts *TestOptions) error {
 			return pebble.DebugCheckLevels(db)
 		})
 	}
+	if numInstances < 1 {
+		numInstances = 1
+	}
 
 	t.opsWaitOn, t.opsDone = computeSynchronizationPoints(t.ops)
 
@@ -127,28 +135,36 @@ func (t *test) init(h *history, dir string, testOpts *TestOptions) error {
 		}
 	}
 
-	var db *pebble.DB
-	var err error
-	err = withRetries(func() error {
-		db, err = pebble.Open(dir, t.opts)
-		return err
-	})
-	if err != nil {
-		return err
-	}
-	h.log.Printf("// db.Open() %v", err)
-
-	if t.testOpts.sharedStorageEnabled {
+	t.dbs = make([]*pebble.DB, numInstances)
+	for i := range t.dbs {
+		var db *pebble.DB
+		var err error
+		if len(t.dbs) > 1 {
+			dir = path.Join(t.dir, fmt.Sprintf("db%d", i+1))
+		}
 		err = withRetries(func() error {
-			return db.SetCreatorID(1)
+			db, err = pebble.Open(dir, t.opts)
+			return err
 		})
 		if err != nil {
 			return err
 		}
-		h.log.Printf("// db.SetCreatorID() %v", err)
+		t.dbs[i] = db
+		h.log.Printf("// db%d.Open() %v", i+1, err)
+
+		if t.testOpts.sharedStorageEnabled {
+			err = withRetries(func() error {
+				return db.SetCreatorID(uint64(i + 1))
+			})
+			if err != nil {
+				return err
+			}
+			h.log.Printf("// db%d.SetCreatorID() %v", i+1, err)
+		}
 	}
 
-	t.tmpDir = t.opts.FS.PathJoin(dir, "tmp")
+	var err error
+	t.tmpDir = t.opts.FS.PathJoin(t.dir, "tmp")
 	if err = t.opts.FS.MkdirAll(t.tmpDir, 0755); err != nil {
 		return err
 	}
@@ -180,15 +196,17 @@ func (t *test) init(h *history, dir string, testOpts *TestOptions) error {
 		}
 	}
 
-	t.db = db
+	t.db = t.dbs[0]
 	return nil
 }
 
-func (t *test) isFMV(fmv pebble.FormatMajorVersion) bool {
-	return t.db.FormatMajorVersion() >= fmv
+func (t *test) isFMV(dbID objID, fmv pebble.FormatMajorVersion) bool {
+	db := t.getDB(dbID)
+	return db.FormatMajorVersion() >= fmv
 }
 
-func (t *test) restartDB() error {
+func (t *test) restartDB(dbID objID) error {
+	db := t.getDB(dbID)
 	if !t.testOpts.strictFS {
 		return nil
 	}
@@ -198,7 +216,7 @@ func (t *test) restartDB() error {
 	if ok {
 		fs.SetIgnoreSyncs(true)
 	}
-	if err := t.db.Close(); err != nil {
+	if err := db.Close(); err != nil {
 		return err
 	}
 	// Release any resources held by custom options. This may be used, for
@@ -225,7 +243,15 @@ func (t *test) restartDB() error {
 			return err
 		}
 	}
-	t.db, err = pebble.Open(t.dir, t.opts)
+	dir := t.dir
+	if len(t.dbs) > 1 {
+		dir = path.Join(dir, fmt.Sprintf("db%d", dbID.slot()))
+	}
+	t.dbs[dbID.slot()-1], err = pebble.Open(dir, t.opts)
+	if err != nil {
+		return err
+	}
+	t.db = t.dbs[0]
 	return err
 	})
 	t.opts.Cache.Unref()
@@ -285,7 +311,7 @@ func (t *test) setSnapshot(id objID, s readerCloser) {
 func (t *test) clearObj(id objID) {
 	switch id.tag() {
 	case dbTag:
-		t.db = nil
+		t.dbs[id.slot()-1] = nil
 	case batchTag:
 		t.batches[id.slot()] = nil
 	case iterTag:
@@ -305,7 +331,7 @@ func (t *test) getBatch(id objID) *pebble.Batch {
 func (t *test) getCloser(id objID) io.Closer {
 	switch id.tag() {
 	case dbTag:
-		return t.db
+		return t.dbs[id.slot()-1]
 	case batchTag:
 		return t.batches[id.slot()]
 	case iterTag:
@@ -326,7 +352,7 @@ func (t *test) getIter(id objID) *retryableIter {
 func (t *test) getReader(id objID) pebble.Reader {
 	switch id.tag() {
 	case dbTag:
-		return t.db
+		return t.dbs[id.slot()-1]
 	case batchTag:
 		return t.batches[id.slot()]
 	case snapTag:
@@ -338,13 +364,22 @@ func (t *test) getReader(id objID) pebble.Reader {
 func (t *test) getWriter(id objID) pebble.Writer {
 	switch id.tag() {
 	case dbTag:
-		return t.db
+		return t.dbs[id.slot()-1]
 	case batchTag:
 		return t.batches[id.slot()]
 	}
 	panic(fmt.Sprintf("invalid writer ID: %s", id))
 }
 
+func (t *test) getDB(id objID) *pebble.DB {
+	switch id.tag() {
+	case dbTag:
+		return t.dbs[id.slot()-1]
+	default:
+		panic(fmt.Sprintf("invalid DB tag: %v", id.tag()))
+	}
+}
+
 // Compute the synchronization points between operations. When operating
 // with more than 1 thread, operations must synchronize access to shared
 // objects. Compute two slices the same length as ops.
@@ -369,10 +404,14 @@ func computeSynchronizationPoints(ops []op) (opsWaitOn [][]int, opsDone []chan struct{}) {
 			// Only valid for i=0. For all other operations, the receiver should
 			// have been referenced by some other operation before it's used as
 			// a receiver.
+			if i != 0 && receiver.tag() != dbTag {
+				panic(fmt.Sprintf("op %s on receiver %s; first reference of %s", ops[i].String(), receiver, receiver))
+			}
+			// The initOp is a little special. We do want to store the objects it's
+			// syncing on, in `lastOpReference`.
 			if i != 0 {
-				panic(fmt.Sprintf("op %d on receiver %s; first reference of %s", i, receiver, receiver))
+				continue
 			}
-			continue
 		}
 
 		// The last operation that referenced `receiver` is the one at index
diff --git a/metamorphic/testdata/parser b/metamorphic/testdata/parser
index ac74aa12cc..0b29a8827f 100644
--- a/metamorphic/testdata/parser
+++ b/metamorphic/testdata/parser
@@ -11,7 +11,7 @@ parse
 parse
 db.bar()
 ----
-1:1: unknown op db.bar
+1:1: unknown op db1.bar
 
 parse
 db.Apply()
@@ -26,12 +26,12 @@ db.Apply(hello)
 parse
 db.NewBatch()
 ----
-1:1: assignment expected for db.NewBatch
+1:1: assignment expected for db1.NewBatch
 
 parse
 batch0 = db.Apply()
 ----
-1:10: cannot use db.Apply in assignment
+1:10: cannot use db1.Apply in assignment
 
 parse
 batch0 = db.NewBatch()
diff --git a/metamorphic/utils.go b/metamorphic/utils.go
index f81fddd6cb..f5c0a83fba 100644
--- a/metamorphic/utils.go
+++ b/metamorphic/utils.go
@@ -41,7 +41,7 @@ func (i objID) slot() uint32 {
 func (i objID) String() string {
 	switch i.tag() {
 	case dbTag:
-		return "db"
+		return fmt.Sprintf("db%d", i.slot())
 	case batchTag:
 		return fmt.Sprintf("batch%d", i.slot())
 	case iterTag:
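Put together, the new `String()` methods and parser changes define the two-instance trace syntax. A hand-written example of what such a trace could look like, embedded as a Go string (illustrative only, not generator output; exact slot counts are an assumption):

```go
// exampleTwoInstanceTrace shows the updated ops DSL: Init gains a leading
// dbs slot count, DB receivers are numbered, and Replicate copies a key
// range between instances.
const exampleTwoInstanceTrace = `
Init(2 /* dbs */, 1 /* batches */, 0 /* iters */, 0 /* snapshots */)
batch0 = db2.NewBatch()
batch0.Commit()
db1.Replicate(db2, "a", "m")
`
```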