Skip to content
103 changes: 73 additions & 30 deletions value.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"bytes"
"encoding/binary"
"fmt"
"hash/crc32"
"io"
"io/ioutil"
"math/rand"
Expand Down Expand Up @@ -103,6 +104,9 @@ func (lf *logFile) sync() error {
}

// errStop is a sentinel returned by a logEntry callback to halt iteration
// early without reporting an error to the caller.
var errStop = errors.New("Stop iteration")

// entryHashTable is the CRC32 table (Castagnoli polynomial) used to
// checksum every value-log entry.
var entryHashTable = crc32.MakeTable(crc32.Castagnoli)

// entryHashSize is the number of bytes the CRC32 checksum occupies at the
// end of each value-log entry.
const entryHashSize = 4

// logEntry is the callback type passed to logFile.iterate; it is invoked
// once per decoded entry together with the value pointer locating that
// entry in the log file.
type logEntry func(e Entry, vp valuePointer) error

Expand All @@ -114,34 +118,31 @@ func (f *logFile) iterate(offset uint32, fn logEntry) error {
return y.Wrap(err)
}

read := func(r *bufio.Reader, buf []byte) error {
for {
n, err := r.Read(buf)
if err != nil {
return err
}
if n == len(buf) {
return nil
}
buf = buf[n:]
}
}

reader := bufio.NewReader(f.fd)
var hbuf [headerBufSize]byte
var h header
k := make([]byte, 1<<10)
v := make([]byte, 1<<20)

truncate := false
recordOffset := offset
for {
if err = read(reader, hbuf[:]); err == io.EOF {
break
hash := crc32.New(entryHashTable)
tee := io.TeeReader(reader, hash)

if _, err = io.ReadFull(tee, hbuf[:]); err != nil {
if err == io.EOF {
break
} else if err == io.ErrUnexpectedEOF {
truncate = true
break
}
return err
}

var e Entry
e.offset = recordOffset
_, hlen := h.Decode(hbuf[:])
h.Decode(hbuf[:])
vl := int(h.vlen)
if cap(v) < vl {
v = make([]byte, 2*vl)
Expand All @@ -154,21 +155,43 @@ func (f *logFile) iterate(offset uint32, fn logEntry) error {
e.Key = k[:kl]
e.Value = v[:vl]

if err = read(reader, e.Key); err != nil {
if _, err = io.ReadFull(tee, e.Key); err != nil {
if err == io.EOF || err == io.ErrUnexpectedEOF {
truncate = true
break
}
return err
}
e.Meta = h.meta
e.UserMeta = h.userMeta
e.casCounter = h.casCounter
e.CASCounterCheck = h.casCounterCheck
if err = read(reader, e.Value); err != nil {
if _, err = io.ReadFull(tee, e.Value); err != nil {
if err == io.EOF || err == io.ErrUnexpectedEOF {
truncate = true
break
}
return err
}

var crcBuf [entryHashSize]byte
if _, err = io.ReadFull(reader, crcBuf[:]); err != nil {
if err == io.EOF || err == io.ErrUnexpectedEOF {
truncate = true
break
}
return err
}
crc := binary.BigEndian.Uint32(crcBuf[:])
if crc != hash.Sum32() {
truncate = true
break
}

var vp valuePointer

recordOffset += uint32(hlen + kl + vl)
vp.Len = uint32(len(hbuf)) + h.klen + h.vlen
vp.Len = headerBufSize + h.klen + h.vlen + entryHashSize
recordOffset += vp.Len

vp.Offset = e.offset
vp.Fid = f.fid
Expand All @@ -180,6 +203,13 @@ func (f *logFile) iterate(offset uint32, fn logEntry) error {
return y.Wrap(err)
}
}

if truncate {
if err := f.fd.Truncate(int64(recordOffset)); err != nil {
return err
}
}

return nil
}

Expand Down Expand Up @@ -319,10 +349,14 @@ func encodeEntry(e *Entry, buf *bytes.Buffer) (int, error) {
var headerEnc [headerBufSize]byte
h.Encode(headerEnc[:])

buf.Write(headerEnc[:])
buf.Write(e.Key)
buf.Write(e.Value)
return len(headerEnc) + len(e.Key) + len(e.Value), nil
hash := crc32.New(entryHashTable)
w := io.MultiWriter(hash, buf)

w.Write(headerEnc[:])
w.Write(e.Key)
w.Write(e.Value)
binary.Write(buf, binary.BigEndian, hash.Sum32())
return len(headerEnc) + len(e.Key) + len(e.Value) + entryHashSize, nil
}

func (e Entry) print(prefix string) {
Expand Down Expand Up @@ -353,15 +387,14 @@ func (h header) Encode(out []byte) {
binary.BigEndian.PutUint64(out[18:26], h.casCounterCheck)
}

// Decodes h from buf. Returns buf without header and number of bytes read.
func (h *header) Decode(buf []byte) ([]byte, int) {
// Decodes h from buf.
func (h *header) Decode(buf []byte) {
h.klen = binary.BigEndian.Uint32(buf[0:4])
h.vlen = binary.BigEndian.Uint32(buf[4:8])
h.meta = buf[8]
h.userMeta = buf[9]
h.casCounter = binary.BigEndian.Uint64(buf[10:18])
h.casCounterCheck = binary.BigEndian.Uint64(buf[18:26])
return buf[26:], 26
}

type valuePointer struct {
Expand Down Expand Up @@ -679,14 +712,24 @@ func (l *valueLog) Read(p valuePointer, s *y.Slice) (e Entry, err error) {
return e, err
}
var h header
buf, _ = h.Decode(buf)
h.Decode(buf)
n := uint32(headerBufSize)

e.Key = buf[0:h.klen]
e.Key = buf[n : n+h.klen]
n += h.klen
e.Meta = h.meta
e.UserMeta = h.userMeta
e.casCounter = h.casCounter
e.CASCounterCheck = h.casCounterCheck
e.Value = buf[h.klen : h.klen+h.vlen]
e.Value = buf[n : n+h.vlen]
n += h.vlen

storedCRC := binary.BigEndian.Uint32(buf[n:])
calculatedCRC := crc32.Checksum(buf[:n], entryHashTable)
if storedCRC != calculatedCRC {
return e, errors.New("CRC checksum mismatch")
}

return e, nil
}

Expand Down
127 changes: 126 additions & 1 deletion value_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"testing"

"github.com/dgraph-io/badger/y"
Expand Down Expand Up @@ -56,7 +57,7 @@ func TestValueBasic(t *testing.T) {

log.write([]*request{b})
require.Len(t, b.Ptrs, 2)
fmt.Printf("Pointer written: %+v %+v", b.Ptrs[0], b.Ptrs[1])
fmt.Printf("Pointer written: %+v %+v\n", b.Ptrs[0], b.Ptrs[1])

e, err := log.Read(b.Ptrs[0], nil)
e2, err := log.Read(b.Ptrs[1], nil)
Expand Down Expand Up @@ -213,6 +214,130 @@ func TestValueGC2(t *testing.T) {
}
}

// Shared key/value fixtures for the vlog checksum and truncation tests
// below. Keys are chosen so they sort (and therefore iterate) in order
// k1 < k2 < k3.
var (
	k1 = []byte("k1")
	k2 = []byte("k2")
	k3 = []byte("k3")
	v1 = []byte("value1")
	v2 = []byte("value2")
	v3 = []byte("value3")
)

// TestChecksums verifies that an entry whose CRC does not match its payload
// is dropped (and the vlog truncated at that point) when Badger starts up,
// and that subsequent writes land cleanly after the truncation point.
func TestChecksums(t *testing.T) {
	dir, err := ioutil.TempDir("", "badger")
	require.NoError(t, err)
	defer os.RemoveAll(dir)

	// Set up SST with K1=V1, so K1 survives independently of the vlog we
	// are about to overwrite.
	kv, err := NewKV(getTestOptions(dir))
	require.NoError(t, err)
	require.NoError(t, kv.Set(k1, v1, 0))
	require.NoError(t, kv.Close())

	// Use a vlog with K1=V1 and a (corrupted) K2=V2. Flipping the last
	// byte invalidates the trailing CRC32 of the K2 entry only.
	buf := createVlog(t, []*Entry{
		&Entry{Key: k1, Value: v1},
		&Entry{Key: k2, Value: v2},
	})
	buf[len(buf)-1]++ // Corrupt last byte
	require.NoError(t, ioutil.WriteFile(filepath.Join(dir, "000000.vlog"), buf, 0777))

	// K1 should exist, but K2 shouldn't: replay must stop at the entry
	// whose checksum fails.
	kv, err = NewKV(getTestOptions(dir))
	require.NoError(t, err)
	var item KVItem
	require.NoError(t, kv.Get(k1, &item))
	require.Equal(t, item.Value(), v1)
	ok, err := kv.Exists(k2)
	require.NoError(t, err)
	require.False(t, ok)
	// Write K3 at the end of the vlog; it should go where the corrupt K2
	// entry used to be.
	require.NoError(t, kv.Set(k3, v3, 0))
	require.NoError(t, kv.Close())

	// The vlog should contain K1 and K3 (K2 was lost when Badger started up
	// last due to checksum failure).
	kv, err = NewKV(getTestOptions(dir))
	require.NoError(t, err)
	iter := kv.NewIterator(IteratorOptions{FetchValues: true})
	iter.Seek(k1)
	require.True(t, iter.Valid())
	it := iter.Item()
	require.Equal(t, it.Key(), k1)
	require.Equal(t, it.Value(), v1)
	iter.Next()
	require.True(t, iter.Valid())
	it = iter.Item()
	require.Equal(t, it.Key(), k3)
	require.Equal(t, it.Value(), v3)
	iter.Close()
	require.NoError(t, kv.Close())
}

// TestPartialAppendToValueLog simulates a crash mid-append (a vlog whose
// last entry is cut short) and verifies that Badger truncates the partial
// entry on startup, keeps the preceding complete entries, and can append
// new entries afterwards.
func TestPartialAppendToValueLog(t *testing.T) {
	dir, err := ioutil.TempDir("", "badger")
	require.NoError(t, err)
	defer os.RemoveAll(dir)

	// Create skeleton files.
	kv, err := NewKV(getTestOptions(dir))
	require.NoError(t, err)
	require.NoError(t, kv.Close())

	// Create truncated vlog to simulate a partial append: chopping 6 bytes
	// leaves the K2 entry incomplete (its value+CRC tail is missing).
	buf := createVlog(t, []*Entry{
		&Entry{Key: k1, Value: v1},
		&Entry{Key: k2, Value: v2},
	})
	buf = buf[:len(buf)-6]
	require.NoError(t, ioutil.WriteFile(filepath.Join(dir, "000000.vlog"), buf, 0777))

	// Badger should now start up, but with only K1.
	kv, err = NewKV(getTestOptions(dir))
	require.NoError(t, err)
	var item KVItem
	require.NoError(t, kv.Get(k1, &item))
	ok, err := kv.Exists(k2)
	require.NoError(t, err)
	require.False(t, ok)
	require.Equal(t, item.Key(), k1)
	require.Equal(t, item.Value(), v1)

	// When K3 is set, it should be persisted after a restart.
	require.NoError(t, kv.Set(k3, v3, 0))
	require.NoError(t, kv.Close())
	kv, err = NewKV(getTestOptions(dir))
	require.NoError(t, err)
	checkKeys(t, kv, [][]byte{k1, k3})
	require.NoError(t, kv.Close())
}

// createVlog writes entries through a throw-away Badger instance in a temp
// directory and returns the raw bytes of the resulting first value-log
// file ("000000.vlog"). The temp directory is removed before returning.
func createVlog(t *testing.T, entries []*Entry) []byte {
	tmp, err := ioutil.TempDir("", "badger")
	require.NoError(t, err)
	defer os.RemoveAll(tmp)

	db, err := NewKV(getTestOptions(tmp))
	require.NoError(t, err)
	require.NoError(t, db.BatchSet(entries))
	require.NoError(t, db.Close())

	data, err := ioutil.ReadFile(filepath.Join(tmp, "000000.vlog"))
	require.NoError(t, err)
	return data
}

// checkKeys seeks to keys[0] and asserts that the iterator yields exactly
// the given keys, in order. keys must be non-empty and sorted in iteration
// order.
func checkKeys(t *testing.T, kv *KV, keys [][]byte) {
	iter := kv.NewIterator(IteratorOptions{})
	// Fix: the iterator was never closed, leaking its resources; every
	// other iterator in this file is closed after use.
	defer iter.Close()
	i := 0
	for iter.Seek(keys[0]); iter.Valid(); iter.Next() {
		require.Equal(t, iter.Item().Key(), keys[i])
		i++
	}
	require.Equal(t, i, len(keys))
}

func BenchmarkReadWrite(b *testing.B) {
rwRatio := []float32{
0.1, 0.2, 0.5, 1.0,
Expand Down