Skip to content

Commit

Permalink
Surface errors while opening and closing value log.
Browse files Browse the repository at this point in the history
  • Loading branch information
manishrjain committed May 17, 2017
1 parent 2084940 commit 31c0bd2
Showing 1 changed file with 39 additions and 16 deletions.
55 changes: 39 additions & 16 deletions badger/value.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"bufio"
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"
"io/ioutil"
Expand All @@ -36,6 +35,7 @@ import (
"golang.org/x/net/trace"

"github.com/dgraph-io/badger/y"
"github.com/pkg/errors"

"github.com/bkaradzic/go-lz4"
)
Expand All @@ -62,14 +62,19 @@ type logFile struct {
}

// openReadOnly assumes that we have a write lock on logFile.
func (lf *logFile) openReadOnly() {
func (lf *logFile) openReadOnly() error {
var err error
lf.fd, err = os.OpenFile(lf.path, os.O_RDONLY, 0666)
y.Check(err)
if err != nil {
return errors.Wrapf(err, "Unable to open %q as RDONLY.", lf.path)
}

fi, err := lf.fd.Stat()
y.Check(err)
if err != nil {
return errors.Wrapf(err, "Unable to check stat for %q", lf.path)
}
lf.size = uint32(fi.Size())
return nil
}

func (lf *logFile) read(buf []byte, offset int64) error {
Expand Down Expand Up @@ -442,9 +447,11 @@ func (l *valueLog) fpath(fid uint16) string {
return fmt.Sprintf("%s/%06d.vlog", l.dirPath, fid)
}

func (l *valueLog) openOrCreateFiles() {
func (l *valueLog) openOrCreateFiles() error {
files, err := ioutil.ReadDir(l.dirPath)
y.Check(err)
if err != nil {
return errors.Wrapf(err, "Error while opening value log")
}

found := make(map[int]struct{})
for _, file := range files {
Expand All @@ -453,9 +460,11 @@ func (l *valueLog) openOrCreateFiles() {
}
fsz := len(file.Name())
fid, err := strconv.Atoi(file.Name()[:fsz-5])
y.Check(err)
if err != nil {
return errors.Wrapf(err, "Error while parsing value log id for file: %q", file.Name())
}
if _, ok := found[fid]; ok {
y.Fatalf("Found the same file twice: %d", fid)
return errors.Errorf("Found the same value log file twice: %d", fid)
}
found[fid] = struct{}{}

Expand All @@ -473,38 +482,52 @@ func (l *valueLog) openOrCreateFiles() {
lf := l.files[i]
if i == len(l.files)-1 {
lf.fd, err = y.OpenSyncedFile(l.fpath(lf.fid), l.opt.SyncWrites)
y.Check(err)
if err != nil {
return errors.Wrapf(err, "Unable to open value log file as RDWR")
}
l.maxFid = uint32(lf.fid)

} else {
lf.openReadOnly()
if err := lf.openReadOnly(); err != nil {
return err
}
}
}

// If no files are found, then create a new file.
if len(l.files) == 0 {
lf := &logFile{fid: 0, path: l.fpath(0)}
lf.fd, err = y.OpenSyncedFile(l.fpath(lf.fid), l.opt.SyncWrites)
y.Check(err)
if err != nil {
return errors.Wrapf(err, "Unable to open value log file as RDWR")
}
l.files = append(l.files, lf)
}
return nil
}

func (l *valueLog) Open(kv *KV, opt *Options) {
func (l *valueLog) Open(kv *KV, opt *Options) error {
l.dirPath = opt.Dir
l.openOrCreateFiles()
if err := l.openOrCreateFiles(); err != nil {
return errors.Wrapf(err, "Unable to open value log")
}
l.opt = *opt
l.kv = kv

l.elog = trace.NewEventLog("Badger", "Valuelog")
return nil
}

func (l *valueLog) Close() {
func (l *valueLog) Close() error {
l.elog.Printf("Stopping garbage collection of values.")
defer l.elog.Finish()

for _, f := range l.files {
y.Check(f.fd.Close())
if err := f.fd.Close(); err != nil {
return err
}
}
l.elog.Finish()
return nil
}

// Replay replays the value log. The kv provided is only valid for the lifetime of function call.
Expand Down

0 comments on commit 31c0bd2

Please sign in to comment.