Skip to content

Use WAL instead of Vlog for crash recovery. #1535

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 25 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 57 additions & 12 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,35 @@ func (db *DB) replayFunction() func(Entry, valuePointer) error {

var txn []txnEntry
var lastCommit uint64
var vbuf bytes.Buffer
vlog := &db.vlog
vlogWriter := &logWriter{
ft: vlogFile,
lf: vlog.vlog.getCurrentFile(),
buf: &vbuf,
lw: &vlog.vlog,
vlog: vlog,
}

toVlog := func(e Entry) (*valuePointer, error) {
vlogWriter.lf = vlog.vlog.getCurrentFile()
y.AssertTrue(vlogWriter.lf != nil)
// write to the vlog if needed.
var p valuePointer
if err := vlogWriter.writeToLogFile(e, nil, &p); err != nil {
return nil, err
}
vlog.vlog.numEntriesWritten++
// TODO(Naman): Do we need to flush to disk now? This is done to have wal entries in the same
// file in write()
if vlog.vlog.offset()+uint32(vbuf.Len()) > uint32(vlog.opt.ValueLogFileSize) ||
vlog.vlog.numEntriesWritten > uint32(vlog.opt.ValueLogMaxEntries) {
if err := vlogWriter.flushWrites(); err != nil {
return nil, err
}
}
return &p, nil
}

toLSM := func(nk []byte, vs y.ValueStruct) {
for err := db.ensureRoomForWrite(); err != nil; err = db.ensureRoomForWrite() {
Expand Down Expand Up @@ -137,7 +166,12 @@ func (db *DB) replayFunction() func(Entry, valuePointer) error {
nv = make([]byte, len(e.Value))
copy(nv, e.Value)
} else {
nv = vp.Encode()
// Write to vlog and get the value pointer to vlog
vlogP, err := toVlog(e)
if err != nil {
return errors.Wrapf(err, "Error while getting value pointer to vlog during replay")
}
nv = vlogP.Encode()
meta = meta | bitValuePointer
}
// Update vhead. If the crash happens while replay was in progress
Expand All @@ -164,6 +198,9 @@ func (db *DB) replayFunction() func(Entry, valuePointer) error {
for _, t := range txn {
toLSM(t.nk, t.v)
}
if err := vlogWriter.flushWrites(); err != nil {
return err
}
txn = txn[:0]
lastCommit = 0

Expand Down Expand Up @@ -861,7 +898,8 @@ func (db *DB) writeRequests(reqs []*request) error {
return errors.Wrap(err, "writeRequests")
}
db.Lock()
db.updateHead(b.Ptrs)
// Update the head pointer for wal. b.Ptrs is the value pointer to vlog.
db.updateHead([]valuePointer{b.head})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a comment.

db.Unlock()
}
done(nil)
Expand Down Expand Up @@ -1296,29 +1334,36 @@ func (db *DB) RunValueLogGC(discardRatio float64) error {
return ErrInvalidRequest
}

// startLevel is the level from which we should search for the head key. When badger is running
// with KeepL0InMemory flag, all tables on L0 are kept in memory. This means we should pick head
// key from Level 1 onwards because if we pick the headkey from Level 0 we might end up losing
// data. See test TestL0GCBug.
// Pick a log file and run GC
return db.vlog.runGC(discardRatio)
}

func (db *DB) getPersistedHead() (*valuePointer, error) {
// startLevel is the level from which we should search for the head key.
// When badger is running with KeepL0InMemory flag, all tables on L0 are
// kept in memory. This means we should pick head
// key from Level 1 onwards because if we pick the headkey from Level 0 we
// might end up losing data. See test TestL0GCBug.
startLevel := 0
if db.opt.KeepL0InMemory {
startLevel = 1
}
// Find head on disk

// Need to pass key with timestamp. LSM get removes the last 8 bytes and
// compares the key.
headKey := y.KeyWithTs(head, math.MaxUint64)
// Need to pass with timestamp, lsm get removes the last 8 bytes and compares key

// Find the head on disk.
val, err := db.lc.get(headKey, nil, startLevel)
if err != nil {
return errors.Wrap(err, "Retrieving head from on-disk LSM")
return nil, errors.Wrap(err, "Retrieving head from on-disk LSM")
}

var head valuePointer
if len(val.Value) > 0 {
head.Decode(val.Value)
}

// Pick a log file and run GC
return db.vlog.runGC(discardRatio, head)
return &head, nil
}

// Size returns the size of lsm and value log files in bytes. It can be used to decide how often to
Expand Down
7 changes: 4 additions & 3 deletions db2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestTruncateVlogWithClose(t *testing.T) {

// Close the DB.
require.NoError(t, db.Close())
require.NoError(t, os.Truncate(path.Join(dir, "000000.vlog"), 4090))
require.NoError(t, os.Truncate(path.Join(dir, "000000.wal"), 4090))

// Reopen and write some new data.
db, err = Open(opt)
Expand Down Expand Up @@ -697,6 +697,7 @@ func TestL0GCBug(t *testing.T) {
// Step 3 - Re-open the same badger. We should be able to read all the data
// inserted in the first step.
func TestWindowsDataLoss(t *testing.T) {
// TODO(ibrahim): Do we need this test?
if runtime.GOOS != "windows" {
t.Skip("The test is only for Windows.")
}
Expand Down Expand Up @@ -742,8 +743,8 @@ func TestWindowsDataLoss(t *testing.T) {
}
// Don't use vlog.Close here. We don't want to fix the file size. Only un-mmap
// the data so that we can truncate the file during the next vlog.Open.
require.NoError(t, y.Munmap(db.vlog.filesMap[db.vlog.maxFid].fmap))
for _, f := range db.vlog.filesMap {
require.NoError(t, y.Munmap(db.vlog.vlog.filesMap[db.vlog.vlog.maxFid].fmap))
for _, f := range db.vlog.vlog.filesMap {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Call vlog as logManager

require.NoError(t, f.fd.Close())
}
require.NoError(t, db.registry.Close())
Expand Down
29 changes: 25 additions & 4 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ import (
"path/filepath"
"runtime"
"sort"
"strings"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/dgraph-io/badger/v2/options"
Expand Down Expand Up @@ -110,6 +112,20 @@ func txnDelete(t *testing.T, kv *DB, key []byte) {
require.NoError(t, txn.Commit())
}

// getSuffixedFiles lists the entries of dir whose file names end with the
// given suffix. It returns an error if the directory cannot be read.
func getSuffixedFiles(dir string, suffix string) ([]os.FileInfo, error) {
	entries, err := ioutil.ReadDir(dir)
	if err != nil {
		return nil, errFile(err, dir, "Unable to open dir.")
	}
	var matched []os.FileInfo
	for _, entry := range entries {
		if !strings.HasSuffix(entry.Name(), suffix) {
			continue
		}
		matched = append(matched, entry)
	}
	return matched, nil
}

// Opens a badger db and runs a test on it.
func runBadgerTest(t *testing.T, opts *Options, test func(t *testing.T, db *DB)) {
dir, err := ioutil.TempDir("", "badger-test")
Expand Down Expand Up @@ -1683,7 +1699,7 @@ func TestReadOnly(t *testing.T) {
return
}
require.Contains(t, err.Error(), "Another process is using this Badger database")
db.Close()
require.NoError(t, db.Close())

// Open one read-only
opts.ReadOnly = true
Expand Down Expand Up @@ -1766,9 +1782,14 @@ func TestLSMOnly(t *testing.T) {

db, err = Open(opts)
require.NoError(t, err)

defer db.Close()
require.NoError(t, db.RunValueLogGC(0.2))
vp, err := db.getPersistedHead()
db.Close()
require.NoError(t, err)
assert.LessOrEqual(t, db.vlog.wal.sortedFids()[0], vp.Fid)
// There should be no vlog files as value size is less than ValueThreshold.
vlogFiles, err := getSuffixedFiles(dir, vlogSuffix)
require.NoError(t, err)
require.Equal(t, 0, len(vlogFiles))
}

// This test function is doing some intricate sorcery.
Expand Down
2 changes: 1 addition & 1 deletion options.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func (opt Options) WithReadOnly(val bool) Options {

// WithTruncate returns a new Options value with Truncate set to the given value.
//
// Truncate indicates whether value log files should be truncated to delete corrupt data, if any.
// Truncate indicates whether wal files should be truncated to delete corrupt data, if any.
// This option is ignored when ReadOnly is true.
//
// The default value of Truncate is false.
Expand Down
Loading