From e597fb721a61d074b8f7d128a40a6833409dc68b Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Mon, 30 Apr 2018 20:27:09 -0700 Subject: [PATCH] Discard key versions during compaction - In response to #464 . - If a key version has deletion marker or is expired, we can safely drop all the older versions of that key. - If there's no overlap from lower levels, we can even drop the first such version. - To avoid an edge case bug, we need to ensure that all versions of the same key are contained by the same SSTable. This is reflected by not closing a table as long as more versions are found. And by picking key ranges to include all versions of the keys. Both these measures ensure that all versions at the same level get compacted together. - To avoid another edge case bug, we need to ensure that we don't drop versions above the current `readTs`. Note that managed DB would therefore not result in any version being discarded. Handling that is an open problem. Badger Info: - Print out the key ranges for each SSTable via `badger info`. - Open an available port when running `badger` tool, so one can view the `/debug/request` and `/debug/events` links to understand what's going on behind the scenes. --- cmd/badger/cmd/info.go | 29 +++++++ cmd/badger/main.go | 17 +++- compaction.go | 8 +- db.go | 4 + levels.go | 183 ++++++++++++++++++++++++++--------------- 5 files changed, 173 insertions(+), 68 deletions(-) diff --git a/cmd/badger/cmd/info.go b/cmd/badger/cmd/info.go index 3ca0ec509..0d23d3aaf 100644 --- a/cmd/badger/cmd/info.go +++ b/cmd/badger/cmd/info.go @@ -27,6 +27,7 @@ import ( "github.com/dgraph-io/badger" "github.com/dgraph-io/badger/table" + "github.com/dgraph-io/badger/y" humanize "github.com/dustin/go-humanize" "github.com/spf13/cobra" ) @@ -46,6 +47,11 @@ to the Dgraph team. fmt.Println("Error:", err.Error()) os.Exit(1) } + err = tableInfo(sstDir, vlogDir) + if err != nil { + fmt.Println("Error:", err.Error()) + os.Exit(1) + } }, } @@ -61,6 +67,29 @@ func dur(src, dst time.Time) string { return humanize.RelTime(dst, src, "earlier", "later") } +func tableInfo(dir, valueDir string) error { + // Open DB + opts := badger.DefaultOptions + opts.Dir = sstDir + opts.ValueDir = vlogDir + opts.ReadOnly = true + + db, err := badger.Open(opts) + if err != nil { + return err + } + defer db.Close() + + tables := db.Tables() + for _, t := range tables { + lk, lv := y.ParseKey(t.Left), y.ParseTs(t.Left) + rk, rv := y.ParseKey(t.Right), y.ParseTs(t.Right) + fmt.Printf("SSTable [L%d, %03d] [%20X, v%-10d -> %20X, v%-10d]\n", + t.Level, t.ID, lk, lv, rk, rv) + } + return nil +} + func printInfo(dir, valueDir string) error { if dir == "" { return fmt.Errorf("--dir not supplied") diff --git a/cmd/badger/main.go b/cmd/badger/main.go index 518711790..b817bc327 100644 --- a/cmd/badger/main.go +++ b/cmd/badger/main.go @@ -16,8 +16,23 @@ package main -import "github.com/dgraph-io/badger/cmd/badger/cmd" +import ( + "fmt" + "net/http" + + "github.com/dgraph-io/badger/cmd/badger/cmd" +) func main() { + go func() { + for i := 8080; i < 9080; i++ { + fmt.Printf("Listening for /debug HTTP requests at port: %d\n", i) + if err := http.ListenAndServe(fmt.Sprintf("localhost:%d", i), nil); err != nil { + fmt.Println("Port busy. Trying another one...") + continue + + } + } + }() cmd.Execute() } diff --git a/compaction.go b/compaction.go index 23824ae17..00be8f6ef 100644 --- a/compaction.go +++ b/compaction.go @@ -20,6 +20,7 @@ import ( "bytes" "fmt" "log" + "math" "sync" "golang.org/x/net/trace" @@ -37,7 +38,7 @@ type keyRange struct { var infRange = keyRange{inf: true} func (r keyRange) String() string { - return fmt.Sprintf("[left=%q, right=%q, inf=%v]", r.left, r.right, r.inf) + return fmt.Sprintf("[left=%x, right=%x, inf=%v]", r.left, r.right, r.inf) } func (r keyRange) equals(dst keyRange) bool { @@ -75,7 +76,10 @@ func getKeyRange(tables []*table.Table) keyRange { biggest = tables[i].Biggest() } } - return keyRange{left: smallest, right: biggest} + return keyRange{ + left: y.KeyWithTs(y.ParseKey(smallest), math.MaxUint64), + right: y.KeyWithTs(y.ParseKey(biggest), 0), + } } type levelCompactStatus struct { diff --git a/db.go b/db.go index c88e98813..82476ebfe 100644 --- a/db.go +++ b/db.go @@ -1207,6 +1207,10 @@ func (db *DB) GetSequence(key []byte, bandwidth uint64) (*Sequence, error) { return seq, err } +func (db *DB) Tables() []TableInfo { + return db.lc.getTableInfo() +} + // MergeOperator represents a Badger merge operator. type MergeOperator struct { sync.RWMutex diff --git a/levels.go b/levels.go index 9d8232e5d..60df3df4b 100644 --- a/levels.go +++ b/levels.go @@ -18,6 +18,7 @@ package badger import ( "fmt" + "math" "math/rand" "os" "sort" @@ -262,6 +263,24 @@ func (s *levelsController) compactBuildTables( topTables := cd.top botTables := cd.bot + var hasOverlap bool + { + kr := getKeyRange(cd.top) + for i, lh := range s.levels { + if i <= l { // Skip upper levels. + continue + } + lh.RLock() + left, right := lh.overlappingTables(levelHandlerRLocked{}, kr) + lh.RUnlock() + if right-left > 0 { + hasOverlap = true + break + } + } + cd.elog.LazyPrintf("Key range overlaps with lower levels: %v", hasOverlap) + } + // Create iterators across all the tables involved first. var iters []y.Iterator if l == 0 { @@ -278,54 +297,90 @@ func (s *levelsController) compactBuildTables( it.Rewind() + readTs := s.kv.orc.readTs() // Start generating new tables. type newTableResult struct { table *table.Table err error } resultCh := make(chan newTableResult) - var i int - for ; it.Valid(); i++ { + var numBuilds int + var lastKey, skipKey []byte + for it.Valid() { timeStart := time.Now() builder := table.NewTableBuilder() + var numKeys, numSkips uint64 for ; it.Valid(); it.Next() { - if builder.ReachedCapacity(s.kv.opt.MaxTableSize) { - break + if !y.SameKey(it.Key(), lastKey) { + if builder.ReachedCapacity(s.kv.opt.MaxTableSize) { + // Only break if we are on a different key, and have reached capacity. We want + // to ensure that all versions of the key are stored in the same sstable, and + // not divided across multiple tables at the same level. + break + } + lastKey = y.SafeCopy(lastKey, it.Key()) + } + if len(skipKey) > 0 { + if y.SameKey(it.Key(), skipKey) { + numSkips++ + continue + } else { + skipKey = skipKey[:0] + } + } + + vs := it.Value() + version := y.ParseTs(it.Key()) + if version < readTs && isDeletedOrExpired(vs.Meta, vs.ExpiresAt) { + // If this version of the key is deleted or expired, skip all the rest of the + // versions. Ensure that we're only removing versions below readTs. + skipKey = y.SafeCopy(skipKey, it.Key()) + + if !hasOverlap { + // If no overlap, we can skip all the versions, by continuing here. + numSkips++ + continue // Skip adding this key. + } else { + // If this key range has overlap with lower levels, then keep the deletion + // marker with the latest version, discarding the rest. This logic here + // would not continue, but has set the skipKey for the future iterations. + } } + numKeys++ y.Check(builder.Add(it.Key(), it.Value())) } // It was true that it.Valid() at least once in the loop above, which means we // called Add() at least once, and builder is not Empty(). - y.AssertTrue(!builder.Empty()) - - cd.elog.LazyPrintf("LOG Compact. Iteration to generate one table took: %v\n", time.Since(timeStart)) - - fileID := s.reserveFileID() - go func(builder *table.Builder) { - defer builder.Close() - - fd, err := y.CreateSyncedFile(table.NewFilename(fileID, s.kv.opt.Dir), true) - if err != nil { - resultCh <- newTableResult{nil, errors.Wrapf(err, "While opening new table: %d", fileID)} - return - } + cd.elog.LazyPrintf("Added %d keys. Skipped %d keys.", numKeys, numSkips) + cd.elog.LazyPrintf("LOG Compact. Iteration took: %v\n", time.Since(timeStart)) + if !builder.Empty() { + numBuilds++ + fileID := s.reserveFileID() + go func(builder *table.Builder) { + defer builder.Close() + + fd, err := y.CreateSyncedFile(table.NewFilename(fileID, s.kv.opt.Dir), true) + if err != nil { + resultCh <- newTableResult{nil, errors.Wrapf(err, "While opening new table: %d", fileID)} + return + } - if _, err := fd.Write(builder.Finish()); err != nil { - resultCh <- newTableResult{nil, errors.Wrapf(err, "Unable to write to file: %d", fileID)} - return - } + if _, err := fd.Write(builder.Finish()); err != nil { + resultCh <- newTableResult{nil, errors.Wrapf(err, "Unable to write to file: %d", fileID)} + return + } - tbl, err := table.OpenTable(fd, s.kv.opt.TableLoadingMode) - // decrRef is added below. - resultCh <- newTableResult{tbl, errors.Wrapf(err, "Unable to open table: %q", fd.Name())} - }(builder) + tbl, err := table.OpenTable(fd, s.kv.opt.TableLoadingMode) + // decrRef is added below. + resultCh <- newTableResult{tbl, errors.Wrapf(err, "Unable to open table: %q", fd.Name())} + }(builder) + } } newTables := make([]*table.Table, 0, 20) - // Wait for all table builders to finish. var firstErr error - for x := 0; x < i; x++ { + for x := 0; x < numBuilds; x++ { res := <-resultCh newTables = append(newTables, res.table) if firstErr == nil { @@ -343,7 +398,7 @@ func (s *levelsController) compactBuildTables( if firstErr != nil { // An error happened. Delete all the newly created table files (by calling DecrRef // -- we're the only holders of a ref). - for j := 0; j < i; j++ { + for j := 0; j < numBuilds; j++ { if newTables[j] != nil { newTables[j].DecrRef() } @@ -446,8 +501,10 @@ func (s *levelsController) fillTables(cd *compactDef) bool { for _, t := range tbls { cd.thisSize = t.Size() cd.thisRange = keyRange{ - left: t.Smallest(), - right: t.Biggest(), + // We pick all the versions of the smallest and the biggest key. + left: y.KeyWithTs(y.ParseKey(t.Smallest()), math.MaxUint64), + // Note that version zero would be the rightmost key. + right: y.KeyWithTs(y.ParseKey(t.Biggest()), 0), } if s.cstatus.overlapsWith(cd.thisLevel.level, cd.thisRange) { continue @@ -486,40 +543,8 @@ func (s *levelsController) runCompactDef(l int, cd compactDef) (err error) { thisLevel := cd.thisLevel nextLevel := cd.nextLevel - if thisLevel.level >= 1 && len(cd.bot) == 0 { - y.AssertTrue(len(cd.top) == 1) - tbl := cd.top[0] - - // We write to the manifest _before_ we delete files (and after we created files). - changes := []*protos.ManifestChange{ - // The order matters here -- you can't temporarily have two copies of the same - // table id when reloading the manifest. - makeTableDeleteChange(tbl.ID()), - makeTableCreateChange(tbl.ID(), nextLevel.level), - } - if err := s.kv.manifest.addChanges(changes); err != nil { - return err - } - - // We have to add to nextLevel before we remove from thisLevel, not after. This way, we - // don't have a bug where reads would see keys missing from both levels. - - // Note: It's critical that we add tables (replace them) in nextLevel before deleting them - // in thisLevel. (We could finagle it atomically somehow.) Also, when reading we must - // read, or at least acquire s.RLock(), in increasing order by level, so that we don't skip - // a compaction. - - if err := nextLevel.replaceTables(cd.top); err != nil { - return err - } - if err := thisLevel.deleteTables(cd.top); err != nil { - return err - } - - cd.elog.LazyPrintf("\tLOG Compact-Move %d->%d smallest:%s biggest:%s took %v\n", - l, l+1, string(tbl.Smallest()), string(tbl.Biggest()), time.Since(timeStart)) - return nil - } + // Table should never be moved directly between levels, always be rewritten to allow discarding + // invalid versions. newTables, decr, err := s.compactBuildTables(l, cd) if err != nil { @@ -561,7 +586,7 @@ func (s *levelsController) doCompact(p compactionPriority) (bool, error) { y.AssertTrue(l+1 < s.kv.opt.MaxLevels) // Sanity check. cd := compactDef{ - elog: trace.New("Badger", "Compact"), + elog: trace.New(fmt.Sprintf("Badger.L%d", l), "Compact"), thisLevel: s.levels[l], nextLevel: s.levels[l+1], } @@ -704,3 +729,31 @@ func (s *levelsController) appendIterators( } return iters } + +type TableInfo struct { + ID uint64 + Level int + Left []byte + Right []byte +} + +func (s *levelsController) getTableInfo() (result []TableInfo) { + for _, l := range s.levels { + for _, t := range l.tables { + info := TableInfo{ + ID: t.ID(), + Level: l.level, + Left: t.Smallest(), + Right: t.Biggest(), + } + result = append(result, info) + } + } + sort.Slice(result, func(i, j int) bool { + if result[i].Level != result[j].Level { + return result[i].Level < result[j].Level + } + return result[i].ID < result[j].ID + }) + return +}