diff --git a/badger/db.go b/badger/db.go index 549ace452..92406af80 100644 --- a/badger/db.go +++ b/badger/db.go @@ -17,16 +17,18 @@ package badger import ( + "encoding/binary" + "fmt" "path/filepath" "sync" - "github.com/dgraph-io/badger/memtable" + "github.com/dgraph-io/badger/mem" "github.com/dgraph-io/badger/value" "github.com/dgraph-io/badger/y" ) -// DBOptions are params for creating DB object. -type DBOptions struct { +// Options are params for creating DB object. +type Options struct { Dir string NumLevelZeroTables int // Maximum number of Level 0 tables before we start compacting. NumLevelZeroTablesStall int // If we hit this number of Level 0 tables, we will stall until level 0 is compacted away. @@ -37,9 +39,10 @@ type DBOptions struct { LevelSizeMultiplier int ValueThreshold int // If value size >= this threshold, we store offsets in value log. Verbose bool + DoNotCompact bool } -var DefaultDBOptions = DBOptions{ +var DefaultOptions = Options{ Dir: "/tmp", NumLevelZeroTables: 5, NumLevelZeroTablesStall: 10, @@ -50,29 +53,45 @@ var DefaultDBOptions = DBOptions{ LevelSizeMultiplier: 10, ValueThreshold: 20, Verbose: true, + DoNotCompact: false, // Only for testing. } type DB struct { sync.RWMutex // Guards imm, mem. - imm *memtable.Memtable // Immutable, memtable being flushed. - mem *memtable.Memtable - immWg sync.WaitGroup // Nonempty when flushing immutable memtable. - opt DBOptions - lc *levelsController - vlog value.Log + imm *mem.Table // Immutable, memtable being flushed. + mt *mem.Table + immWg sync.WaitGroup // Nonempty when flushing immutable memtable. + opt Options + lc *levelsController + vlog value.Log + voffset uint64 } // NewDB returns a new DB object. Compact levels are created as well. -func NewDB(opt DBOptions) *DB { +func NewDB(opt Options) *DB { y.AssertTrue(len(opt.Dir) > 0) out := &DB{ - mem: memtable.NewMemtable(), + mt: mem.NewTable(), opt: opt, // Make a copy. lc: newLevelsController(opt), } vlogPath := filepath.Join(opt.Dir, "vlog") out.vlog.Open(vlogPath) + + val := out.Get(Head) + var voffset uint64 + if len(val) == 0 { + voffset = 0 + } else { + voffset = binary.BigEndian.Uint64(val) + } + + fn := func(k, v []byte, meta byte) { + out.mt.Put(k, v, meta) + } + out.vlog.Replay(voffset, fn) + return out } @@ -80,10 +99,10 @@ func NewDB(opt DBOptions) *DB { func (s *DB) Close() { } -func (s *DB) getMemTables() (*memtable.Memtable, *memtable.Memtable) { +func (s *DB) getMemTables() (*mem.Table, *mem.Table) { s.RLock() defer s.RUnlock() - return s.mem, s.imm + return s.mt, s.imm } func decodeValue(val []byte, vlog *value.Log) []byte { @@ -132,11 +151,22 @@ func (s *DB) Get(key []byte) []byte { return decodeValue(val, &s.vlog) } +func (s *DB) updateOffset(ptrs []value.Pointer) { + ptr := ptrs[len(ptrs)-1] + + s.Lock() + defer s.Unlock() + if s.voffset < ptr.Offset { + s.voffset = ptr.Offset + uint64(ptr.Len) + } +} + // Write applies a list of value.Entry to our memtable. func (s *DB) Write(entries []value.Entry) error { ptrs, err := s.vlog.Write(entries) y.Check(err) y.AssertTrue(len(ptrs) == len(entries)) + s.updateOffset(ptrs) if err := s.makeRoomForWrite(); err != nil { return err @@ -145,9 +175,9 @@ func (s *DB) Write(entries []value.Entry) error { var offsetBuf [20]byte for i, entry := range entries { if len(entry.Value) < s.opt.ValueThreshold { // Will include deletion / tombstone case. - s.mem.Put(entry.Key, entry.Value, entry.Meta) + s.mt.Put(entry.Key, entry.Value, entry.Meta) } else { - s.mem.Put(entry.Key, ptrs[i].Encode(offsetBuf[:]), entry.Meta|value.BitValuePointer) + s.mt.Put(entry.Key, ptrs[i].Encode(offsetBuf[:]), entry.Meta|value.BitValuePointer) } } return nil @@ -173,22 +203,33 @@ func (s *DB) Delete(key []byte) error { }) } +var ( + Head = []byte("_head_") +) + func (s *DB) makeRoomForWrite() error { - if s.mem.MemUsage() < s.opt.MaxTableSize { + if s.mt.MemUsage() < s.opt.MaxTableSize { // Nothing to do. We have enough space. return nil } s.immWg.Wait() // Make sure we finish flushing immutable memtable. s.Lock() + if s.voffset > 0 { + fmt.Printf("Storing offset: %v\n", s.voffset) + offset := make([]byte, 8) + binary.BigEndian.PutUint64(offset, s.voffset) + s.mt.Put(Head, offset, 0) + } + // Changing imm, mem requires lock. - s.imm = s.mem - s.mem = memtable.NewMemtable() + s.imm = s.mt + s.mt = mem.NewTable() // Note: It is important to start the compaction within this lock. // Otherwise, you might be compacting the wrong imm! s.immWg.Add(1) - go func(imm *memtable.Memtable) { + go func(imm *mem.Table) { defer s.immWg.Done() fileID, f := tempFile(s.opt.Dir) diff --git a/badger/db_test.go b/badger/db_test.go index df4ac921e..574cf7acc 100644 --- a/badger/db_test.go +++ b/badger/db_test.go @@ -17,7 +17,10 @@ package badger import ( + "encoding/binary" "fmt" + "io/ioutil" + "os" "testing" "github.com/stretchr/testify/require" @@ -26,7 +29,7 @@ import ( ) func TestDBWrite(t *testing.T) { - db := NewDB(DefaultDBOptions) + db := NewDB(DefaultOptions) var entries []value.Entry for i := 0; i < 100; i++ { entries = append(entries, value.Entry{ @@ -38,7 +41,7 @@ func TestDBWrite(t *testing.T) { } func TestDBGet(t *testing.T) { - db := NewDB(DefaultDBOptions) + db := NewDB(DefaultOptions) require.NoError(t, db.Put([]byte("key1"), []byte("val1"))) require.EqualValues(t, "val1", db.Get([]byte("key1"))) @@ -59,7 +62,7 @@ func TestDBGet(t *testing.T) { // Put a lot of data to move some data to disk. // WARNING: This test might take a while but it should pass! func TestDBGetMore(t *testing.T) { - db := NewDB(DefaultDBOptions) + db := NewDB(DefaultOptions) // n := 500000 n := 100000 for i := 0; i < n; i++ { @@ -115,7 +118,7 @@ func TestDBGetMore(t *testing.T) { // Put a lot of data to move some data to disk. Then iterate. func TestDBIterateBasic(t *testing.T) { - db := NewDB(DefaultDBOptions) + db := NewDB(DefaultOptions) defer db.Close() // n := 500000 @@ -140,3 +143,58 @@ func TestDBIterateBasic(t *testing.T) { } require.EqualValues(t, n, count) } + +func TestCrash(t *testing.T) { + dir, err := ioutil.TempDir("", "") + require.Nil(t, err) + defer os.RemoveAll(dir) + + opt := DefaultOptions + opt.MaxTableSize = 1 << 20 + opt.Dir = dir + opt.DoNotCompact = true + + db := NewDB(opt) + var keys [][]byte + for i := 0; i < 150000; i++ { + k := []byte(fmt.Sprintf("%09d", i)) + keys = append(keys, k) + } + + entries := make([]value.Entry, 0, 10) + for _, k := range keys { + e := value.Entry{ + Key: k, + Value: k, + } + entries = append(entries, e) + + if len(entries) == 100 { + err := db.Write(entries) + require.Nil(t, err) + entries = entries[:0] + } + } + + for _, k := range keys { + require.Equal(t, k, db.Get(k)) + } + + db2 := NewDB(opt) + for _, k := range keys { + require.Equal(t, k, db2.Get(k), "Key: %s", k) + } + + db.lc.tryCompact(1) + db.lc.tryCompact(1) + val := db.Get(Head) + // val := db.lc.levels[1].get(Head) + require.True(t, len(val) > 0) + voffset := binary.BigEndian.Uint64(val) + fmt.Printf("level 1 val: %v\n", voffset) + + db3 := NewDB(opt) + for _, k := range keys { + require.Equal(t, k, db3.Get(k), "Key: %s", k) + } +} diff --git a/badger/levels.go b/badger/levels.go index 602414853..6934b7a23 100644 --- a/badger/levels.go +++ b/badger/levels.go @@ -41,7 +41,7 @@ type levelHandler struct { // The following are initialized once and const. level int maxTotalSize int64 - opt DBOptions + opt Options } type levelsController struct { @@ -52,7 +52,7 @@ type levelsController struct { // The following are initialized once and const. levels []*levelHandler - opt DBOptions + opt Options } var ( @@ -164,14 +164,14 @@ func (s *levelHandler) overlappingTables(begin, end []byte) (int, int) { return left, right } -func newLevelHandler(opt DBOptions, level int) *levelHandler { +func newLevelHandler(opt Options, level int) *levelHandler { return &levelHandler{ level: level, opt: opt, } } -func newLevelsController(opt DBOptions) *levelsController { +func newLevelsController(opt Options) *levelsController { y.AssertTrue(opt.NumLevelZeroTablesStall > opt.NumLevelZeroTables) s := &levelsController{ opt: opt, @@ -190,12 +190,17 @@ func newLevelsController(opt DBOptions) *levelsController { } } for i := 0; i < s.opt.NumCompactWorkers; i++ { - go s.compact(i) + go s.runWorker(i) } return s } -func (s *levelsController) compact(workerID int) { +func (s *levelsController) runWorker(workerID int) { + if s.opt.DoNotCompact { + fmt.Println("NOT running any compactions due to DB options.") + return + } + time.Sleep(time.Duration(rand.Int31n(1000)) * time.Millisecond) timeChan := time.Tick(10 * time.Millisecond) for { diff --git a/badger/levels_test.go b/badger/levels_test.go index 504223b0e..1c3f61a09 100644 --- a/badger/levels_test.go +++ b/badger/levels_test.go @@ -59,7 +59,7 @@ func extractTable(table *tableHandler) [][]string { // TestDoCompact tests the merging logic which is done in internal doCompact function. // We might remove this internal test eventually. func TestDoCompact(t *testing.T) { - c := newLevelsController(DefaultDBOptions) + c := newLevelsController(DefaultOptions) t0 := buildTable(t, [][]string{ {"k2", "z2"}, {"k22", "z22"}, diff --git a/memtable/memtable.go b/mem/memtable.go similarity index 86% rename from memtable/memtable.go rename to mem/memtable.go index 84c98654f..84ba32143 100644 --- a/memtable/memtable.go +++ b/mem/memtable.go @@ -14,7 +14,7 @@ * limitations under the License. */ -package memtable +package mem import ( "fmt" @@ -26,8 +26,8 @@ import ( "github.com/dgraph-io/badger/y" ) -// Memtable is a thin wrapper over Skiplist, at least for now. -type Memtable struct { +// Table is a thin wrapper over Skiplist, at least for now. +type Table struct { table *skl.Skiplist arena *y.Arena } @@ -39,9 +39,9 @@ const ( byteDelete = 1 ) -// NewMemtable creates a new memtable. Input is the user key comparator. -func NewMemtable() *Memtable { - return &Memtable{ +// NewTable creates a new memtable. Input is the user key comparator. +func NewTable() *Table { + return &Table{ arena: new(y.Arena), table: skl.NewSkiplist(), } @@ -50,7 +50,7 @@ func NewMemtable() *Memtable { // Put sets a key-value pair. We don't use onlyIfAbsent now. And we ignore the old value returned // by the skiplist. These can be used later on to support more operations, e.g., GetOrCreate can // be a Put with an empty value with onlyIfAbsent=true. -func (s *Memtable) Put(key, value []byte, meta byte) { +func (s *Table) Put(key, value []byte, meta byte) { data := s.arena.Allocate(len(key) + len(value) + 1) y.AssertTrue(len(key) == copy(data[:len(key)], key)) v := data[len(key):] @@ -60,7 +60,7 @@ func (s *Memtable) Put(key, value []byte, meta byte) { } // WriteLevel0Table flushes memtable. It drops deleteValues. -func (s *Memtable) WriteLevel0Table(f *os.File) error { +func (s *Table) WriteLevel0Table(f *os.File) error { iter := s.NewIterator() b := table.NewTableBuilder() defer b.Close() @@ -79,7 +79,7 @@ type Iterator struct { } // NewIterator returns a memtable iterator. -func (s *Memtable) NewIterator() *Iterator { +func (s *Table) NewIterator() *Iterator { return &Iterator{iter: s.table.NewIterator()} } @@ -106,7 +106,7 @@ func (s *Iterator) KeyValue() ([]byte, []byte) { } // Get looks up a key. This includes the meta byte. -func (s *Memtable) Get(key []byte) []byte { +func (s *Table) Get(key []byte) []byte { v := s.table.Get(key) if v == nil { // This is different from unsafe.Pointer(nil). @@ -116,11 +116,11 @@ func (s *Memtable) Get(key []byte) []byte { } // MemUsage returns an approximate mem usage. -func (s *Memtable) MemUsage() int64 { +func (s *Table) MemUsage() int64 { return int64(s.arena.MemUsage()) } -func (s *Memtable) DebugString() string { +func (s *Table) DebugString() string { it := s.NewIterator() it.SeekToFirst() k1, _ := it.KeyValue() diff --git a/memtable/memtable_test.go b/mem/memtable_test.go similarity index 93% rename from memtable/memtable_test.go rename to mem/memtable_test.go index 153f30b79..e30dcd3bb 100644 --- a/memtable/memtable_test.go +++ b/mem/memtable_test.go @@ -14,7 +14,7 @@ * limitations under the License. */ -package memtable +package mem import ( "fmt" @@ -26,7 +26,7 @@ import ( "github.com/dgraph-io/badger/y" ) -func extract(m *Memtable) ([]string, []string) { +func extract(m *Table) ([]string, []string) { var keys, vals []string it := m.NewIterator() for it.SeekToFirst(); it.Valid(); it.Next() { @@ -42,7 +42,7 @@ func extract(m *Memtable) ([]string, []string) { } func TestBasic(t *testing.T) { - m := NewMemtable() + m := NewTable() require.NotNil(t, m) m.Put([]byte("somekey"), []byte("hohoho"), 0) m.Put([]byte("somekey"), []byte("hahaha"), 0) @@ -63,7 +63,7 @@ func TestBasic(t *testing.T) { } func TestMemUssage(t *testing.T) { - m := NewMemtable() + m := NewTable() for i := 0; i < 10000; i++ { m.Put([]byte(fmt.Sprintf("k%05d", i)), []byte(fmt.Sprintf("v%05d", i)), 0) } @@ -72,7 +72,7 @@ func TestMemUssage(t *testing.T) { } func TestMergeIterator(t *testing.T) { - m := NewMemtable() + m := NewTable() it := m.NewIterator() mergeIt := y.NewMergeIterator([]y.Iterator{it}) require.False(t, mergeIt.Valid()) @@ -80,7 +80,7 @@ func TestMergeIterator(t *testing.T) { // BenchmarkAdd-4 1000000 1289 ns/op func BenchmarkAdd(b *testing.B) { - m := NewMemtable() + m := NewTable() for i := 0; i < b.N; i++ { m.Put([]byte(fmt.Sprintf("k%09d", i)), []byte(fmt.Sprintf("v%09d", i)), 0) } diff --git a/value/value.go b/value/value.go index b2ae6c450..028b2c195 100644 --- a/value/value.go +++ b/value/value.go @@ -17,8 +17,10 @@ package value import ( + "bufio" "bytes" "encoding/binary" + "fmt" "io" "os" "sync" @@ -90,6 +92,50 @@ func (l *Log) Open(fname string) { y.Check(err) } +func (l *Log) Replay(offset uint64, fn func(k, v []byte, meta byte)) { + fmt.Printf("Seeking at offset: %v\n", offset) + + read := func(r *bufio.Reader, buf []byte) error { + for { + n, err := r.Read(buf) + if err != nil { + return err + } + if n == len(buf) { + return nil + } + buf = buf[n:] + } + } + + _, err := l.fd.Seek(int64(offset), 0) + y.Check(err) + reader := bufio.NewReader(l.fd) + + hbuf := make([]byte, 8) + var h header + var count int + for { + if err := read(reader, hbuf); err == io.EOF { + break + } + h.Decode(hbuf) + // fmt.Printf("[%d] Header read: %+v\n", count, h) + + k := make([]byte, h.klen) + v := make([]byte, h.vlen) + + y.Check(read(reader, k)) + meta, err := reader.ReadByte() + y.Check(err) + y.Check(read(reader, v)) + + fn(k, v, meta) + count++ + } + fmt.Printf("Replayed %d KVs\n", count) +} + var bufPool = sync.Pool{ New: func() interface{} { return &bytes.Buffer{}