diff --git a/cmd/badger/cmd/info.go b/cmd/badger/cmd/info.go
index 3ca0ec509..0d23d3aaf 100644
--- a/cmd/badger/cmd/info.go
+++ b/cmd/badger/cmd/info.go
@@ -27,6 +27,7 @@ import (
 
 	"github.com/dgraph-io/badger"
 	"github.com/dgraph-io/badger/table"
+	"github.com/dgraph-io/badger/y"
 	humanize "github.com/dustin/go-humanize"
 	"github.com/spf13/cobra"
 )
@@ -46,6 +47,11 @@ to the Dgraph team.
 			fmt.Println("Error:", err.Error())
 			os.Exit(1)
 		}
+		err = tableInfo(sstDir, vlogDir)
+		if err != nil {
+			fmt.Println("Error:", err.Error())
+			os.Exit(1)
+		}
 	},
 }
 
@@ -61,6 +67,29 @@ func dur(src, dst time.Time) string {
 	return humanize.RelTime(dst, src, "earlier", "later")
 }
 
+func tableInfo(dir, valueDir string) error {
+	// Open DB in read-only mode. Use the parameters (not the package-level
+	// sstDir/vlogDir flags) so the function is usable with any directories.
+	opts := badger.DefaultOptions
+	opts.Dir = dir
+	opts.ValueDir = valueDir
+	opts.ReadOnly = true
+
+	db, err := badger.Open(opts)
+	if err != nil {
+		return err
+	}
+	defer db.Close()
+
+	tables := db.Tables()
+	for _, t := range tables {
+		lk, lv := y.ParseKey(t.Left), y.ParseTs(t.Left)
+		rk, rv := y.ParseKey(t.Right), y.ParseTs(t.Right)
+		fmt.Printf("SSTable [L%d, %03d] [%20X, v%-10d -> %20X, v%-10d]\n",
+			t.Level, t.ID, lk, lv, rk, rv)
+	}
+	return nil
+}
+
 func printInfo(dir, valueDir string) error {
 	if dir == "" {
 		return fmt.Errorf("--dir not supplied")
diff --git a/cmd/badger/main.go b/cmd/badger/main.go
index 518711790..b817bc327 100644
--- a/cmd/badger/main.go
+++ b/cmd/badger/main.go
@@ -16,8 +16,23 @@
 
 package main
 
-import "github.com/dgraph-io/badger/cmd/badger/cmd"
+import (
+	"fmt"
+	"net/http"
+
+	"github.com/dgraph-io/badger/cmd/badger/cmd"
+)
 
 func main() {
+	go func() {
+		for i := 8080; i < 9080; i++ {
+			fmt.Printf("Listening for /debug HTTP requests at port: %d\n", i)
+			if err := http.ListenAndServe(fmt.Sprintf("localhost:%d", i), nil); err != nil {
+				fmt.Println("Port busy. Trying another one...")
+				continue
+
+			}
+		}
+	}()
 	cmd.Execute()
 }
diff --git a/compaction.go b/compaction.go
index 23824ae17..00be8f6ef 100644
--- a/compaction.go
+++ b/compaction.go
@@ -20,6 +20,7 @@ import (
 	"bytes"
 	"fmt"
 	"log"
+	"math"
 	"sync"
 
 	"golang.org/x/net/trace"
@@ -37,7 +38,7 @@ type keyRange struct {
 var infRange = keyRange{inf: true}
 
 func (r keyRange) String() string {
-	return fmt.Sprintf("[left=%q, right=%q, inf=%v]", r.left, r.right, r.inf)
+	return fmt.Sprintf("[left=%x, right=%x, inf=%v]", r.left, r.right, r.inf)
 }
 
 func (r keyRange) equals(dst keyRange) bool {
@@ -75,7 +76,10 @@ func getKeyRange(tables []*table.Table) keyRange {
 			biggest = tables[i].Biggest()
 		}
 	}
-	return keyRange{left: smallest, right: biggest}
+	return keyRange{
+		left:  y.KeyWithTs(y.ParseKey(smallest), math.MaxUint64),
+		right: y.KeyWithTs(y.ParseKey(biggest), 0),
+	}
 }
 
 type levelCompactStatus struct {
diff --git a/db.go b/db.go
index c88e98813..82476ebfe 100644
--- a/db.go
+++ b/db.go
@@ -1207,6 +1207,10 @@ func (db *DB) GetSequence(key []byte, bandwidth uint64) (*Sequence, error) {
 	return seq, err
 }
 
+func (db *DB) Tables() []TableInfo {
+	return db.lc.getTableInfo()
+}
+
 // MergeOperator represents a Badger merge operator.
 type MergeOperator struct {
 	sync.RWMutex
diff --git a/levels.go b/levels.go
index 9d8232e5d..60df3df4b 100644
--- a/levels.go
+++ b/levels.go
@@ -18,6 +18,7 @@ package badger
 
 import (
 	"fmt"
+	"math"
 	"math/rand"
 	"os"
 	"sort"
@@ -262,6 +263,24 @@ func (s *levelsController) compactBuildTables(
 	topTables := cd.top
 	botTables := cd.bot
 
+	var hasOverlap bool
+	{
+		kr := getKeyRange(cd.top)
+		for i, lh := range s.levels {
+			if i <= l { // Skip upper levels.
+				continue
+			}
+			lh.RLock()
+			left, right := lh.overlappingTables(levelHandlerRLocked{}, kr)
+			lh.RUnlock()
+			if right-left > 0 {
+				hasOverlap = true
+				break
+			}
+		}
+		cd.elog.LazyPrintf("Key range overlaps with lower levels: %v", hasOverlap)
+	}
+
 	// Create iterators across all the tables involved first.
var iters []y.Iterator if l == 0 { @@ -278,54 +297,90 @@ func (s *levelsController) compactBuildTables( it.Rewind() + readTs := s.kv.orc.readTs() // Start generating new tables. type newTableResult struct { table *table.Table err error } resultCh := make(chan newTableResult) - var i int - for ; it.Valid(); i++ { + var numBuilds int + var lastKey, skipKey []byte + for it.Valid() { timeStart := time.Now() builder := table.NewTableBuilder() + var numKeys, numSkips uint64 for ; it.Valid(); it.Next() { - if builder.ReachedCapacity(s.kv.opt.MaxTableSize) { - break + if !y.SameKey(it.Key(), lastKey) { + if builder.ReachedCapacity(s.kv.opt.MaxTableSize) { + // Only break if we are on a different key, and have reached capacity. We want + // to ensure that all versions of the key are stored in the same sstable, and + // not divided across multiple tables at the same level. + break + } + lastKey = y.SafeCopy(lastKey, it.Key()) + } + if len(skipKey) > 0 { + if y.SameKey(it.Key(), skipKey) { + numSkips++ + continue + } else { + skipKey = skipKey[:0] + } + } + + vs := it.Value() + version := y.ParseTs(it.Key()) + if version < readTs && isDeletedOrExpired(vs.Meta, vs.ExpiresAt) { + // If this version of the key is deleted or expired, skip all the rest of the + // versions. Ensure that we're only removing versions below readTs. + skipKey = y.SafeCopy(skipKey, it.Key()) + + if !hasOverlap { + // If no overlap, we can skip all the versions, by continuing here. + numSkips++ + continue // Skip adding this key. + } else { + // If this key range has overlap with lower levels, then keep the deletion + // marker with the latest version, discarding the rest. This logic here + // would not continue, but has set the skipKey for the future iterations. + } } + numKeys++ y.Check(builder.Add(it.Key(), it.Value())) } // It was true that it.Valid() at least once in the loop above, which means we // called Add() at least once, and builder is not Empty(). 
-		y.AssertTrue(!builder.Empty())
-
-		cd.elog.LazyPrintf("LOG Compact. Iteration to generate one table took: %v\n", time.Since(timeStart))
-
-		fileID := s.reserveFileID()
-		go func(builder *table.Builder) {
-			defer builder.Close()
-
-			fd, err := y.CreateSyncedFile(table.NewFilename(fileID, s.kv.opt.Dir), true)
-			if err != nil {
-				resultCh <- newTableResult{nil, errors.Wrapf(err, "While opening new table: %d", fileID)}
-				return
-			}
+		cd.elog.LazyPrintf("Added %d keys. Skipped %d keys.", numKeys, numSkips)
+		cd.elog.LazyPrintf("LOG Compact. Iteration took: %v\n", time.Since(timeStart))
+		if !builder.Empty() {
+			numBuilds++
+			fileID := s.reserveFileID()
+			go func(builder *table.Builder) {
+				defer builder.Close()
+
+				fd, err := y.CreateSyncedFile(table.NewFilename(fileID, s.kv.opt.Dir), true)
+				if err != nil {
+					resultCh <- newTableResult{nil, errors.Wrapf(err, "While opening new table: %d", fileID)}
+					return
+				}
 
-			if _, err := fd.Write(builder.Finish()); err != nil {
-				resultCh <- newTableResult{nil, errors.Wrapf(err, "Unable to write to file: %d", fileID)}
-				return
-			}
+				if _, err := fd.Write(builder.Finish()); err != nil {
+					resultCh <- newTableResult{nil, errors.Wrapf(err, "Unable to write to file: %d", fileID)}
+					return
+				}
 
-			tbl, err := table.OpenTable(fd, s.kv.opt.TableLoadingMode)
-			// decrRef is added below.
-			resultCh <- newTableResult{tbl, errors.Wrapf(err, "Unable to open table: %q", fd.Name())}
-		}(builder)
+				tbl, err := table.OpenTable(fd, s.kv.opt.TableLoadingMode)
+				// decrRef is added below.
+				resultCh <- newTableResult{tbl, errors.Wrapf(err, "Unable to open table: %q", fd.Name())}
+			}(builder)
+		}
 	}
 
 	newTables := make([]*table.Table, 0, 20)
-
 	// Wait for all table builders to finish.
 	var firstErr error
-	for x := 0; x < i; x++ {
+	for x := 0; x < numBuilds; x++ {
 		res := <-resultCh
 		newTables = append(newTables, res.table)
 		if firstErr == nil {
@@ -343,7 +398,7 @@ func (s *levelsController) compactBuildTables(
 
 	if firstErr != nil {
 		// An error happened.  Delete all the newly created table files (by calling DecrRef
 		// -- we're the only holders of a ref).
-		for j := 0; j < i; j++ {
+		for j := 0; j < numBuilds; j++ {
 			if newTables[j] != nil {
 				newTables[j].DecrRef()
 			}
@@ -446,8 +501,10 @@ func (s *levelsController) fillTables(cd *compactDef) bool {
 	for _, t := range tbls {
 		cd.thisSize = t.Size()
 		cd.thisRange = keyRange{
-			left:  t.Smallest(),
-			right: t.Biggest(),
+			// We pick all the versions of the smallest and the biggest key.
+			left: y.KeyWithTs(y.ParseKey(t.Smallest()), math.MaxUint64),
+			// Note that version zero would be the rightmost key.
+			right: y.KeyWithTs(y.ParseKey(t.Biggest()), 0),
 		}
 		if s.cstatus.overlapsWith(cd.thisLevel.level, cd.thisRange) {
 			continue
@@ -486,40 +543,8 @@ func (s *levelsController) runCompactDef(l int, cd compactDef) (err error) {
 	thisLevel := cd.thisLevel
 	nextLevel := cd.nextLevel
 
-	if thisLevel.level >= 1 && len(cd.bot) == 0 {
-		y.AssertTrue(len(cd.top) == 1)
-		tbl := cd.top[0]
-
-		// We write to the manifest _before_ we delete files (and after we created files).
-		changes := []*protos.ManifestChange{
-			// The order matters here -- you can't temporarily have two copies of the same
-			// table id when reloading the manifest.
-			makeTableDeleteChange(tbl.ID()),
-			makeTableCreateChange(tbl.ID(), nextLevel.level),
-		}
-		if err := s.kv.manifest.addChanges(changes); err != nil {
-			return err
-		}
-
-		// We have to add to nextLevel before we remove from thisLevel, not after. This way, we
-		// don't have a bug where reads would see keys missing from both levels.
-
-		// Note: It's critical that we add tables (replace them) in nextLevel before deleting them
-		// in thisLevel. (We could finagle it atomically somehow.) Also, when reading we must
-		// read, or at least acquire s.RLock(), in increasing order by level, so that we don't skip
-		// a compaction.
-
-		if err := nextLevel.replaceTables(cd.top); err != nil {
-			return err
-		}
-		if err := thisLevel.deleteTables(cd.top); err != nil {
-			return err
-		}
-
-		cd.elog.LazyPrintf("\tLOG Compact-Move %d->%d smallest:%s biggest:%s took %v\n",
-			l, l+1, string(tbl.Smallest()), string(tbl.Biggest()), time.Since(timeStart))
-		return nil
-	}
+	// Table should never be moved directly between levels, always be rewritten to allow discarding
+	// invalid versions.
 
 	newTables, decr, err := s.compactBuildTables(l, cd)
 	if err != nil {
@@ -561,7 +586,7 @@ func (s *levelsController) doCompact(p compactionPriority) (bool, error) {
 	y.AssertTrue(l+1 < s.kv.opt.MaxLevels) // Sanity check.
 
 	cd := compactDef{
-		elog:      trace.New("Badger", "Compact"),
+		elog:      trace.New(fmt.Sprintf("Badger.L%d", l), "Compact"),
 		thisLevel: s.levels[l],
 		nextLevel: s.levels[l+1],
 	}
@@ -704,3 +729,31 @@ func (s *levelsController) appendIterators(
 	}
 	return iters
 }
+
+type TableInfo struct {
+	ID    uint64
+	Level int
+	Left  []byte
+	Right []byte
+}
+
+func (s *levelsController) getTableInfo() (result []TableInfo) {
+	for _, l := range s.levels {
+		for _, t := range l.tables {
+			info := TableInfo{
+				ID:    t.ID(),
+				Level: l.level,
+				Left:  t.Smallest(),
+				Right: t.Biggest(),
+			}
+			result = append(result, info)
+		}
+	}
+	sort.Slice(result, func(i, j int) bool {
+		if result[i].Level != result[j].Level {
+			return result[i].Level < result[j].Level
+		}
+		return result[i].ID < result[j].ID
+	})
+	return
+}