Skip to content

Commit

Permalink
Discard key versions during compaction
Browse files Browse the repository at this point in the history
- In response to dgraph-io#464 .
- If a key version has deletion marker or is expired, we can safely drop
    all the older versions of that key.
- If there's no overlap from lower levels, we can even drop the first such version.
- To avoid an edge case bug, we need to ensure that all versions of the
    same key are contained by the same SSTable. This is reflected by not
    closing a table as long as more versions are found. And by picking key
    ranges to include all versions of the keys. Both these measures ensure
    that all versions at the same level get compacted together.
- To avoid another edge case bug, we need to ensure that we don't drop
    versions above the current `readTs`. Note that managed DB would
    therefore not result in any version being discarded. Handling that is an
    open problem.

Badger Info:
- Print out the key ranges for each SSTable via `badger info`.
- Open an available port when running the `badger` tool, so one can view the
    `/debug/request` and `/debug/events` links to understand what's going on
    behind the scenes.
  • Loading branch information
manishrjain committed May 1, 2018
1 parent c3ac2ff commit e597fb7
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 68 deletions.
29 changes: 29 additions & 0 deletions cmd/badger/cmd/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/dgraph-io/badger"
"github.com/dgraph-io/badger/table"
"github.com/dgraph-io/badger/y"
humanize "github.com/dustin/go-humanize"
"github.com/spf13/cobra"
)
Expand All @@ -46,6 +47,11 @@ to the Dgraph team.
fmt.Println("Error:", err.Error())
os.Exit(1)
}
err = tableInfo(sstDir, vlogDir)
if err != nil {
fmt.Println("Error:", err.Error())
os.Exit(1)
}
},
}

Expand All @@ -61,6 +67,29 @@ func dur(src, dst time.Time) string {
return humanize.RelTime(dst, src, "earlier", "later")
}

// tableInfo opens the DB at the given directories in read-only mode and
// prints, for every SSTable, its level, file ID, and the key range
// [smallest, biggest] it covers, with keys in hex and their versions.
func tableInfo(dir, valueDir string) error {
	// Open DB in read-only mode so an already-running badger process is
	// not disturbed.
	opts := badger.DefaultOptions
	// Use the function's arguments, not the package-level flag variables,
	// so the helper works for any caller-supplied paths.
	opts.Dir = dir
	opts.ValueDir = valueDir
	opts.ReadOnly = true

	db, err := badger.Open(opts)
	if err != nil {
		return err
	}
	defer db.Close()

	tables := db.Tables()
	for _, t := range tables {
		// Left/Right are full internal keys: user key plus version suffix.
		lk, lv := y.ParseKey(t.Left), y.ParseTs(t.Left)
		rk, rv := y.ParseKey(t.Right), y.ParseTs(t.Right)
		fmt.Printf("SSTable [L%d, %03d] [%20X, v%-10d -> %20X, v%-10d]\n",
			t.Level, t.ID, lk, lv, rk, rv)
	}
	return nil
}

func printInfo(dir, valueDir string) error {
if dir == "" {
return fmt.Errorf("--dir not supplied")
Expand Down
17 changes: 16 additions & 1 deletion cmd/badger/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,23 @@

package main

import "github.com/dgraph-io/badger/cmd/badger/cmd"
import (
	"fmt"
	"net"
	"net/http"

	"github.com/dgraph-io/badger/cmd/badger/cmd"
)

func main() {
	// Serve the /debug/requests and /debug/events pages (registered on the
	// default mux by golang.org/x/net/trace) on the first free port in
	// [8080, 9080). Runs in the background so the CLI is not blocked.
	go func() {
		for i := 8080; i < 9080; i++ {
			// Probe the port first so we only announce a port we actually
			// managed to bind, instead of printing "Listening" and then
			// discovering the port was busy.
			listener, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", i))
			if err != nil {
				continue // Port busy; try the next one.
			}
			fmt.Printf("Listening for /debug HTTP requests at port: %d\n", i)
			if err := http.Serve(listener, nil); err != nil {
				fmt.Println("Error serving /debug HTTP requests:", err)
			}
			return
		}
		fmt.Println("Unable to find a free port for /debug HTTP requests.")
	}()
	cmd.Execute()
}
8 changes: 6 additions & 2 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"fmt"
"log"
"math"
"sync"

"golang.org/x/net/trace"
Expand All @@ -37,7 +38,7 @@ type keyRange struct {
var infRange = keyRange{inf: true}

// String implements fmt.Stringer, rendering both boundary keys in hex
// (keys are binary and may not be printable).
func (r keyRange) String() string {
	return fmt.Sprintf("[left=%x, right=%x, inf=%v]", r.left, r.right, r.inf)
}

func (r keyRange) equals(dst keyRange) bool {
Expand Down Expand Up @@ -75,7 +76,10 @@ func getKeyRange(tables []*table.Table) keyRange {
biggest = tables[i].Biggest()
}
}
return keyRange{left: smallest, right: biggest}
return keyRange{
left: y.KeyWithTs(y.ParseKey(smallest), math.MaxUint64),
right: y.KeyWithTs(y.ParseKey(biggest), 0),
}
}

type levelCompactStatus struct {
Expand Down
4 changes: 4 additions & 0 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1207,6 +1207,10 @@ func (db *DB) GetSequence(key []byte, bandwidth uint64) (*Sequence, error) {
return seq, err
}

// Tables returns summary information (file ID, level, and key range) about
// every SSTable currently managed by the levels controller.
func (db *DB) Tables() []TableInfo {
	return db.lc.getTableInfo()
}

// MergeOperator represents a Badger merge operator.
type MergeOperator struct {
sync.RWMutex
Expand Down
183 changes: 118 additions & 65 deletions levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package badger

import (
"fmt"
"math"
"math/rand"
"os"
"sort"
Expand Down Expand Up @@ -262,6 +263,24 @@ func (s *levelsController) compactBuildTables(
topTables := cd.top
botTables := cd.bot

var hasOverlap bool
{
kr := getKeyRange(cd.top)
for i, lh := range s.levels {
if i <= l { // Skip upper levels.
continue
}
lh.RLock()
left, right := lh.overlappingTables(levelHandlerRLocked{}, kr)
lh.RUnlock()
if right-left > 0 {
hasOverlap = true
break
}
}
cd.elog.LazyPrintf("Key range overlaps with lower levels: %v", hasOverlap)
}

// Create iterators across all the tables involved first.
var iters []y.Iterator
if l == 0 {
Expand All @@ -278,54 +297,90 @@ func (s *levelsController) compactBuildTables(

it.Rewind()

readTs := s.kv.orc.readTs()
// Start generating new tables.
type newTableResult struct {
table *table.Table
err error
}
resultCh := make(chan newTableResult)
var i int
for ; it.Valid(); i++ {
var numBuilds int
var lastKey, skipKey []byte
for it.Valid() {
timeStart := time.Now()
builder := table.NewTableBuilder()
var numKeys, numSkips uint64
for ; it.Valid(); it.Next() {
if builder.ReachedCapacity(s.kv.opt.MaxTableSize) {
break
if !y.SameKey(it.Key(), lastKey) {
if builder.ReachedCapacity(s.kv.opt.MaxTableSize) {
// Only break if we are on a different key, and have reached capacity. We want
// to ensure that all versions of the key are stored in the same sstable, and
// not divided across multiple tables at the same level.
break
}
lastKey = y.SafeCopy(lastKey, it.Key())
}
if len(skipKey) > 0 {
if y.SameKey(it.Key(), skipKey) {
numSkips++
continue
} else {
skipKey = skipKey[:0]
}
}

vs := it.Value()
version := y.ParseTs(it.Key())
if version < readTs && isDeletedOrExpired(vs.Meta, vs.ExpiresAt) {
// If this version of the key is deleted or expired, skip all the rest of the
// versions. Ensure that we're only removing versions below readTs.
skipKey = y.SafeCopy(skipKey, it.Key())

if !hasOverlap {
// If no overlap, we can skip all the versions, by continuing here.
numSkips++
continue // Skip adding this key.
} else {
// If this key range has overlap with lower levels, then keep the deletion
// marker with the latest version, discarding the rest. This logic here
// would not continue, but has set the skipKey for the future iterations.
}
}
numKeys++
y.Check(builder.Add(it.Key(), it.Value()))
}
// It was true that it.Valid() at least once in the loop above, which means we
// called Add() at least once, and builder is not Empty().
y.AssertTrue(!builder.Empty())

cd.elog.LazyPrintf("LOG Compact. Iteration to generate one table took: %v\n", time.Since(timeStart))

fileID := s.reserveFileID()
go func(builder *table.Builder) {
defer builder.Close()

fd, err := y.CreateSyncedFile(table.NewFilename(fileID, s.kv.opt.Dir), true)
if err != nil {
resultCh <- newTableResult{nil, errors.Wrapf(err, "While opening new table: %d", fileID)}
return
}
cd.elog.LazyPrintf("Added %d keys. Skipped %d keys.", numKeys, numSkips)
cd.elog.LazyPrintf("LOG Compact. Iteration took: %v\n", time.Since(timeStart))
if !builder.Empty() {
numBuilds++
fileID := s.reserveFileID()
go func(builder *table.Builder) {
defer builder.Close()

fd, err := y.CreateSyncedFile(table.NewFilename(fileID, s.kv.opt.Dir), true)
if err != nil {
resultCh <- newTableResult{nil, errors.Wrapf(err, "While opening new table: %d", fileID)}
return
}

if _, err := fd.Write(builder.Finish()); err != nil {
resultCh <- newTableResult{nil, errors.Wrapf(err, "Unable to write to file: %d", fileID)}
return
}
if _, err := fd.Write(builder.Finish()); err != nil {
resultCh <- newTableResult{nil, errors.Wrapf(err, "Unable to write to file: %d", fileID)}
return
}

tbl, err := table.OpenTable(fd, s.kv.opt.TableLoadingMode)
// decrRef is added below.
resultCh <- newTableResult{tbl, errors.Wrapf(err, "Unable to open table: %q", fd.Name())}
}(builder)
tbl, err := table.OpenTable(fd, s.kv.opt.TableLoadingMode)
// decrRef is added below.
resultCh <- newTableResult{tbl, errors.Wrapf(err, "Unable to open table: %q", fd.Name())}
}(builder)
}
}

newTables := make([]*table.Table, 0, 20)

// Wait for all table builders to finish.
var firstErr error
for x := 0; x < i; x++ {
for x := 0; x < numBuilds; x++ {
res := <-resultCh
newTables = append(newTables, res.table)
if firstErr == nil {
Expand All @@ -343,7 +398,7 @@ func (s *levelsController) compactBuildTables(
if firstErr != nil {
// An error happened. Delete all the newly created table files (by calling DecrRef
// -- we're the only holders of a ref).
for j := 0; j < i; j++ {
for j := 0; j < numBuilds; j++ {
if newTables[j] != nil {
newTables[j].DecrRef()
}
Expand Down Expand Up @@ -446,8 +501,10 @@ func (s *levelsController) fillTables(cd *compactDef) bool {
for _, t := range tbls {
cd.thisSize = t.Size()
cd.thisRange = keyRange{
left: t.Smallest(),
right: t.Biggest(),
// We pick all the versions of the smallest and the biggest key.
left: y.KeyWithTs(y.ParseKey(t.Smallest()), math.MaxUint64),
// Note that version zero would be the rightmost key.
right: y.KeyWithTs(y.ParseKey(t.Biggest()), 0),
}
if s.cstatus.overlapsWith(cd.thisLevel.level, cd.thisRange) {
continue
Expand Down Expand Up @@ -486,40 +543,8 @@ func (s *levelsController) runCompactDef(l int, cd compactDef) (err error) {
thisLevel := cd.thisLevel
nextLevel := cd.nextLevel

if thisLevel.level >= 1 && len(cd.bot) == 0 {
y.AssertTrue(len(cd.top) == 1)
tbl := cd.top[0]

// We write to the manifest _before_ we delete files (and after we created files).
changes := []*protos.ManifestChange{
// The order matters here -- you can't temporarily have two copies of the same
// table id when reloading the manifest.
makeTableDeleteChange(tbl.ID()),
makeTableCreateChange(tbl.ID(), nextLevel.level),
}
if err := s.kv.manifest.addChanges(changes); err != nil {
return err
}

// We have to add to nextLevel before we remove from thisLevel, not after. This way, we
// don't have a bug where reads would see keys missing from both levels.

// Note: It's critical that we add tables (replace them) in nextLevel before deleting them
// in thisLevel. (We could finagle it atomically somehow.) Also, when reading we must
// read, or at least acquire s.RLock(), in increasing order by level, so that we don't skip
// a compaction.

if err := nextLevel.replaceTables(cd.top); err != nil {
return err
}
if err := thisLevel.deleteTables(cd.top); err != nil {
return err
}

cd.elog.LazyPrintf("\tLOG Compact-Move %d->%d smallest:%s biggest:%s took %v\n",
l, l+1, string(tbl.Smallest()), string(tbl.Biggest()), time.Since(timeStart))
return nil
}
// Table should never be moved directly between levels, always be rewritten to allow discarding
// invalid versions.

newTables, decr, err := s.compactBuildTables(l, cd)
if err != nil {
Expand Down Expand Up @@ -561,7 +586,7 @@ func (s *levelsController) doCompact(p compactionPriority) (bool, error) {
y.AssertTrue(l+1 < s.kv.opt.MaxLevels) // Sanity check.

cd := compactDef{
elog: trace.New("Badger", "Compact"),
elog: trace.New(fmt.Sprintf("Badger.L%d", l), "Compact"),
thisLevel: s.levels[l],
nextLevel: s.levels[l+1],
}
Expand Down Expand Up @@ -704,3 +729,31 @@ func (s *levelsController) appendIterators(
}
return iters
}

// TableInfo summarizes a single SSTable for informational tooling such as
// `badger info`.
type TableInfo struct {
	ID uint64    // Unique file ID of the table.
	Level int    // LSM tree level the table currently belongs to.
	Left []byte  // Smallest key in the table, including its version suffix.
	Right []byte // Biggest key in the table, including its version suffix.
}

// getTableInfo collects the ID, level, and key range of every table across
// all levels, sorted by (level, table ID).
func (s *levelsController) getTableInfo() (result []TableInfo) {
	for _, l := range s.levels {
		// Hold the level's read lock while walking l.tables, matching how
		// other readers access it, so a concurrent compaction replacing
		// tables does not race with us.
		l.RLock()
		for _, t := range l.tables {
			info := TableInfo{
				ID:    t.ID(),
				Level: l.level,
				Left:  t.Smallest(),
				Right: t.Biggest(),
			}
			result = append(result, info)
		}
		l.RUnlock()
	}
	// Deterministic output order: by level first, then by file ID.
	sort.Slice(result, func(i, j int) bool {
		if result[i].Level != result[j].Level {
			return result[i].Level < result[j].Level
		}
		return result[i].ID < result[j].ID
	})
	return
}

0 comments on commit e597fb7

Please sign in to comment.