From 7d5120c8f6536befdcc46681c07e146d4a21f014 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Wed, 12 Apr 2017 12:42:45 +1000 Subject: [PATCH 1/5] Various small tweaks --- badger/db.go | 63 +++++++++--------- badger/table_handler.go | 138 ++++++++++++++++++++-------------------- memtable/memtable.go | 4 +- table/iterator.go | 2 - value/value.go | 17 +++-- 5 files changed, 111 insertions(+), 113 deletions(-) diff --git a/badger/db.go b/badger/db.go index 679758cad..55236153f 100644 --- a/badger/db.go +++ b/badger/db.go @@ -28,7 +28,6 @@ import ( // DBOptions are params for creating DB object. type DBOptions struct { Dir string - WriteBufferSize int // Memtable size. NumLevelZeroTables int // Maximum number of Level 0 tables before we start compacting. NumLevelZeroTablesStall int // If we hit this number of Level 0 tables, we will stall until level 0 is compacted away. LevelOneSize int64 // Maximum total size for Level 1. @@ -42,14 +41,13 @@ type DBOptions struct { var DefaultDBOptions = DBOptions{ Dir: "/tmp", - WriteBufferSize: 1 << 20, // Size of each memtable. NumLevelZeroTables: 5, NumLevelZeroTablesStall: 10, - LevelOneSize: 11 << 20, + LevelOneSize: 256 << 20, MaxLevels: 7, - NumCompactWorkers: 3, - MaxTableSize: 2 << 20, - LevelSizeMultiplier: 5, + NumCompactWorkers: 3, // MRJN: Can we increase these? + MaxTableSize: 64 << 20, + LevelSizeMultiplier: 10, ValueThreshold: 20, Verbose: true, } @@ -82,16 +80,36 @@ func NewDB(opt DBOptions) *DB { func (s *DB) Close() { } -func (s *DB) getMemImm() (*memtable.Memtable, *memtable.Memtable) { +func (s *DB) getMemTables() (*memtable.Memtable, *memtable.Memtable) { s.RLock() defer s.RUnlock() return s.mem, s.imm } +func decodeValue(val []byte, vlog *value.Log) []byte { + if (val[0] & value.BitDelete) != 0 { + // Tombstone encountered. + return nil + } + if (val[0] & value.BitValuePointer) == 0 { + return val[1:] + } + + var vp value.Pointer + vp.Decode(val[1:]) + entry, err := vlog.Read(vp) + y.Checkf(err, "Unable to read from value log: %+v", vp) + + if (entry.Value[0] & value.BitDelete) == 0 { // Not tombstone. + return entry.Value + } + return []byte{} +} + // getValueHelper returns the value in memtable or disk for given key. // Note that value will include meta byte. -func (s *DB) getValueHelper(key []byte) []byte { - mem, imm := s.getMemImm() // Lock should be released. +func (s *DB) get(key []byte) []byte { + mem, imm := s.getMemTables() // Lock should be released. if mem != nil { if v := mem.Get(key); v != nil { return v @@ -105,28 +123,9 @@ func (s *DB) getValueHelper(key []byte) []byte { return s.lc.get(key) } -func decodeValue(val []byte, vlog *value.Log) []byte { - if (val[0] & value.BitDelete) != 0 { - // Tombstone encountered. - return nil - } - if (val[0] & value.BitValueOffset) == 0 { - return val[1:] - } - var vp value.Pointer - vp.Decode(val[1:]) - var out []byte - vlog.Read(vp, func(entry value.Entry) { - if (entry.Value[0] & value.BitDelete) == 0 { // Not tombstone. - out = entry.Value - } - }) - return out -} - // Get looks for key and returns value. If not found, return nil. func (s *DB) Get(key []byte) []byte { - val := s.getValueHelper(key) + val := s.get(key) if val == nil { return nil } @@ -148,7 +147,7 @@ func (s *DB) Write(entries []value.Entry) error { if len(entry.Value) < s.opt.ValueThreshold { // Will include deletion / tombstone case. 
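			// Small values (and tombstones, which carry empty values) are stored
			// inline in the memtable; anything at or above ValueThreshold leaves
			// behind only a 12-byte pointer into the value log.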
s.mem.Put(entry.Key, entry.Value, entry.Meta) } else { - s.mem.Put(entry.Key, ptrs[i].Encode(offsetBuf[:]), entry.Meta|value.BitValueOffset) + s.mem.Put(entry.Key, ptrs[i].Encode(offsetBuf[:]), entry.Meta|value.BitValuePointer) } } return nil @@ -175,7 +174,7 @@ func (s *DB) Delete(key []byte) error { } func (s *DB) makeRoomForWrite() error { - if s.mem.MemUsage() < s.opt.WriteBufferSize { + if s.mem.MemUsage() < s.opt.MaxTableSize { // Nothing to do. We have enough space. return nil } @@ -244,7 +243,7 @@ func (s *DB) NewIterator() y.Iterator { // Imagine you add level0 first, then add imm. In between, the initial imm might be moved into // level0, and be completely missed. On the other hand, if you add imm first and it got moved // to level 0, you would just have that data appear twice which is fine. - mem, imm := s.getMemImm() + mem, imm := s.getMemTables() var iters []y.Iterator if mem != nil { iters = append(iters, mem.NewIterator()) diff --git a/badger/table_handler.go b/badger/table_handler.go index afaab503d..dcd8b1b2c 100644 --- a/badger/table_handler.go +++ b/badger/table_handler.go @@ -1,31 +1,31 @@ package badger import ( - "fmt" - "math/rand" - "os" - "path/filepath" - "sync/atomic" - - "github.com/dgraph-io/badger/table" - "github.com/dgraph-io/badger/y" + "fmt" + "math/rand" + "os" + "path/filepath" + "sync/atomic" + + "github.com/dgraph-io/badger/table" + "github.com/dgraph-io/badger/y" ) type tableHandler struct { - ref int32 // For file garbage collection. + ref int32 // For file garbage collection. - // The following are initialized once and const. - smallest, biggest []byte // Smallest and largest keys. - fd *os.File // Owns fd. - table *table.Table // Does not own fd. - id uint64 + // The following are initialized once and const. + smallest, biggest []byte // Smallest and largest keys. + fd *os.File // Owns fd. + table *table.Table // Does not own fd. + id uint64 } // tableIterator is a thin wrapper around table.TableIterator. // For example, it does reference counting. type tableIterator struct { - table *tableHandler - it y.Iterator // From the actual table. + table *tableHandler + it y.Iterator // From the actual table. } func (s *tableIterator) Next() { s.it.Next() } @@ -36,75 +36,77 @@ func (s *tableIterator) Valid() bool { return s.it.Valid() } func (s *tableIterator) Name() string { return "TableHandlerIterator" } func (s *tableIterator) Close() { - s.it.Close() - s.table.decrRef() // Important. + s.it.Close() + s.table.decrRef() // Important. } func (s *tableHandler) newIterator() y.Iterator { - s.incrRef() // Important. - return &tableIterator{ - table: s, - it: s.table.NewIterator(), - } + s.incrRef() // Important. + return &tableIterator{ + table: s, + it: s.table.NewIterator(), + } } func (s *tableHandler) incrRef() { - atomic.AddInt32(&s.ref, 1) + atomic.AddInt32(&s.ref, 1) } func (s *tableHandler) decrRef() { - newRef := atomic.AddInt32(&s.ref, -1) - if newRef == 0 { - filename := s.fd.Name() - y.Check(s.fd.Close()) - os.Remove(filename) - } + newRef := atomic.AddInt32(&s.ref, -1) + if newRef == 0 { + // We can safely delete this file, because for all the current files, we always have + // at least one reference pointing to them. + filename := s.fd.Name() + y.Check(s.fd.Close()) + os.Remove(filename) + } } func (s *tableHandler) size() int64 { return s.table.Size() } // newTableHandler returns a new table given file. Please remember to decrRef. 
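// The handler takes ownership of f and starts with ref == 1; decrRef closes
// and deletes the underlying file once the reference count reaches zero.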
func newTableHandler(id uint64, f *os.File) (*tableHandler, error) { - t, err := table.OpenTable(f) - if err != nil { - return nil, err - } - out := &tableHandler{ - id: id, - fd: f, - table: t, - ref: 1, // Caller is given one reference. - } - - it := t.NewIterator() - it.SeekToFirst() - y.AssertTrue(it.Valid()) - out.smallest, _ = it.KeyValue() - - it2 := t.NewIterator() // For now, safer to use a different iterator. - it2.SeekToLast() - y.AssertTrue(it2.Valid()) - out.biggest, _ = it2.KeyValue() - - // Make sure we did populate smallest and biggest. - y.AssertTrue(len(out.smallest) > 0) // We do not allow empty keys... - y.AssertTrue(len(out.biggest) > 0) - // It is possible that smallest=biggest. In that case, table has only one element. - return out, nil + t, err := table.OpenTable(f) + if err != nil { + return nil, err + } + out := &tableHandler{ + id: id, + fd: f, + table: t, + ref: 1, // Caller is given one reference. + } + + it := t.NewIterator() + it.SeekToFirst() + y.AssertTrue(it.Valid()) + out.smallest, _ = it.KeyValue() + + it2 := t.NewIterator() // For now, safer to use a different iterator. + it2.SeekToLast() + y.AssertTrue(it2.Valid()) + out.biggest, _ = it2.KeyValue() + + // Make sure we did populate smallest and biggest. + y.AssertTrue(len(out.smallest) > 0) // We do not allow empty keys... + y.AssertTrue(len(out.biggest) > 0) + // It is possible that smallest=biggest. In that case, table has only one element. + return out, nil } // tempFile returns a unique filename and the uint64. func tempFile(dir string) (uint64, *os.File) { - var id uint64 - for { - id = rand.Uint64() - name := fmt.Sprintf("table_%016x", id) - filename := filepath.Join(dir, name) - if _, err := os.Stat(filename); os.IsNotExist(err) { - // File does not exist. - fd, err := os.Create(filename) - y.Check(err) - return id, fd - } - } + var id uint64 + for { + id = rand.Uint64() + name := fmt.Sprintf("table_%016x", id) + filename := filepath.Join(dir, name) + if _, err := os.Stat(filename); os.IsNotExist(err) { + // File does not exist. + fd, err := os.Create(filename) + y.Check(err) + return id, fd + } + } } diff --git a/memtable/memtable.go b/memtable/memtable.go index f3668b11b..81698b243 100644 --- a/memtable/memtable.go +++ b/memtable/memtable.go @@ -116,8 +116,8 @@ func (s *Memtable) Get(key []byte) []byte { } // MemUsage returns an approximate mem usage. -func (s *Memtable) MemUsage() int { - return s.arena.MemUsage() +func (s *Memtable) MemUsage() int64 { + return int64(s.arena.MemUsage()) } func (s *Memtable) DebugString() string { diff --git a/table/iterator.go b/table/iterator.go index 85f07a150..5a04f690f 100644 --- a/table/iterator.go +++ b/table/iterator.go @@ -18,8 +18,6 @@ package table import ( "bytes" - // "errors" - // "fmt" "io" "math" "sort" diff --git a/value/value.go b/value/value.go index e035689d3..fdc62185a 100644 --- a/value/value.go +++ b/value/value.go @@ -32,8 +32,8 @@ import ( // Values have their first byte being byteData or byteDelete. This helps us distinguish between // a key that has never been seen and a key that has been explicitly deleted. const ( - BitDelete = 1 - BitValueOffset = 2 + BitDelete = 1 // Set if the key has been deleted. + BitValuePointer = 2 // Set if the value is NOT stored directly next to key. ) type Log struct { @@ -70,11 +70,11 @@ type Pointer struct { Offset uint64 } -// Encode encodes Pointer into byte buffer. We don't return because this can avoid mem allocation. +// Encode encodes Pointer into byte buffer. 
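// The caller supplies b (at least 12 bytes) so no allocation is needed; the
// layout is a big-endian 4-byte Len followed by a big-endian 8-byte Offset.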
func (p Pointer) Encode(b []byte) []byte { y.AssertTrue(len(b) >= 12) binary.BigEndian.PutUint32(b[:4], p.Len) - binary.BigEndian.PutUint64(b[4:12], uint64(p.Offset)) // Might want to use uint64 for Offset. + binary.BigEndian.PutUint64(b[4:12], uint64(p.Offset)) return b[:12] } @@ -140,11 +140,11 @@ func (l *Log) Write(entries []Entry) ([]Pointer, error) { } // Read reads the value log at a given location. -func (l *Log) Read(p Pointer, fn func(Entry)) error { - var e Entry +func (l *Log) Read(p Pointer) (e Entry, err error) { + // func (l *Log) Read(p Pointer, fn func(Entry)) error { buf := make([]byte, p.Len) if _, err := l.fd.ReadAt(buf, int64(p.Offset)); err != nil { - return err + return e, err } var h header buf = h.Decode(buf) @@ -152,6 +152,5 @@ func (l *Log) Read(p Pointer, fn func(Entry)) error { e.Meta = buf[h.klen] buf = buf[h.klen+1:] e.Value = buf[0:h.vlen] - fn(e) - return nil + return e, nil } From 2fc19f42fd0ae16c48baa34f93209a6eed5997aa Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Wed, 12 Apr 2017 16:11:44 +1000 Subject: [PATCH 2/5] Some more minor tweaks --- badger/db.go | 5 +++-- badger/table_handler.go | 1 + memtable/memtable.go | 4 ++-- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/badger/db.go b/badger/db.go index 55236153f..549ace452 100644 --- a/badger/db.go +++ b/badger/db.go @@ -45,7 +45,7 @@ var DefaultDBOptions = DBOptions{ NumLevelZeroTablesStall: 10, LevelOneSize: 256 << 20, MaxLevels: 7, - NumCompactWorkers: 3, // MRJN: Can we increase these? + NumCompactWorkers: 3, // Max possible = num levels / 2. MaxTableSize: 64 << 20, LevelSizeMultiplier: 10, ValueThreshold: 20, @@ -186,10 +186,11 @@ func (s *DB) makeRoomForWrite() error { s.mem = memtable.NewMemtable() // Note: It is important to start the compaction within this lock. - // Otherwise, you might be compactng the wrong imm! + // Otherwise, you might be compacting the wrong imm! s.immWg.Add(1) go func(imm *memtable.Memtable) { defer s.immWg.Done() + fileID, f := tempFile(s.opt.Dir) y.Check(imm.WriteLevel0Table(f)) tbl, err := newTableHandler(fileID, f) diff --git a/badger/table_handler.go b/badger/table_handler.go index dcd8b1b2c..38f63b27d 100644 --- a/badger/table_handler.go +++ b/badger/table_handler.go @@ -83,6 +83,7 @@ func newTableHandler(id uint64, f *os.File) (*tableHandler, error) { y.AssertTrue(it.Valid()) out.smallest, _ = it.KeyValue() + // TODO: We shouldn't need to create another iterator. it2 := t.NewIterator() // For now, safer to use a different iterator. it2.SeekToLast() y.AssertTrue(it2.Valid()) diff --git a/memtable/memtable.go b/memtable/memtable.go index 81698b243..84c98654f 100644 --- a/memtable/memtable.go +++ b/memtable/memtable.go @@ -69,8 +69,8 @@ func (s *Memtable) WriteLevel0Table(f *os.File) error { return err } } - f.Write(b.Finish()) - return nil + _, err := f.Write(b.Finish()) + return err } // Iterator is an iterator over memtable. From a16272e9e6f3dcc1cd648bf1863bfbd906cf9576 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Wed, 12 Apr 2017 16:15:35 +1000 Subject: [PATCH 3/5] Remove older func def --- value/value.go | 1 - 1 file changed, 1 deletion(-) diff --git a/value/value.go b/value/value.go index fdc62185a..b2ae6c450 100644 --- a/value/value.go +++ b/value/value.go @@ -141,7 +141,6 @@ func (l *Log) Write(entries []Entry) ([]Pointer, error) { // Read reads the value log at a given location. 
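// The entry is decoded from a buffer allocated per call, so its fields stay
// valid after Read returns.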
func (l *Log) Read(p Pointer) (e Entry, err error) { - // func (l *Log) Read(p Pointer, fn func(Entry)) error { buf := make([]byte, p.Len) if _, err := l.fd.ReadAt(buf, int64(p.Offset)); err != nil { return e, err From d679c17ecb527541e62957b564b0c75cbffa3f81 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Wed, 12 Apr 2017 19:26:01 +1000 Subject: [PATCH 4/5] Store and replay value log offset. --- badger/db.go | 81 ++++++++++++++++++++++-------- badger/db_test.go | 65 ++++++++++++++++++++++-- badger/levels.go | 17 ++++--- badger/levels_test.go | 2 +- {memtable => mem}/memtable.go | 24 ++++----- {memtable => mem}/memtable_test.go | 12 ++--- value/value.go | 46 +++++++++++++++++ 7 files changed, 198 insertions(+), 49 deletions(-) rename {memtable => mem}/memtable.go (86%) rename {memtable => mem}/memtable_test.go (93%) diff --git a/badger/db.go b/badger/db.go index 549ace452..92406af80 100644 --- a/badger/db.go +++ b/badger/db.go @@ -17,16 +17,18 @@ package badger import ( + "encoding/binary" + "fmt" "path/filepath" "sync" - "github.com/dgraph-io/badger/memtable" + "github.com/dgraph-io/badger/mem" "github.com/dgraph-io/badger/value" "github.com/dgraph-io/badger/y" ) -// DBOptions are params for creating DB object. -type DBOptions struct { +// Options are params for creating DB object. +type Options struct { Dir string NumLevelZeroTables int // Maximum number of Level 0 tables before we start compacting. NumLevelZeroTablesStall int // If we hit this number of Level 0 tables, we will stall until level 0 is compacted away. @@ -37,9 +39,10 @@ type DBOptions struct { LevelSizeMultiplier int ValueThreshold int // If value size >= this threshold, we store offsets in value log. Verbose bool + DoNotCompact bool } -var DefaultDBOptions = DBOptions{ +var DefaultOptions = Options{ Dir: "/tmp", NumLevelZeroTables: 5, NumLevelZeroTablesStall: 10, @@ -50,29 +53,45 @@ var DefaultDBOptions = DBOptions{ LevelSizeMultiplier: 10, ValueThreshold: 20, Verbose: true, + DoNotCompact: false, // Only for testing. } type DB struct { sync.RWMutex // Guards imm, mem. - imm *memtable.Memtable // Immutable, memtable being flushed. - mem *memtable.Memtable - immWg sync.WaitGroup // Nonempty when flushing immutable memtable. - opt DBOptions - lc *levelsController - vlog value.Log + imm *mem.Table // Immutable, memtable being flushed. + mt *mem.Table + immWg sync.WaitGroup // Nonempty when flushing immutable memtable. + opt Options + lc *levelsController + vlog value.Log + voffset uint64 } // NewDB returns a new DB object. Compact levels are created as well. -func NewDB(opt DBOptions) *DB { +func NewDB(opt Options) *DB { y.AssertTrue(len(opt.Dir) > 0) out := &DB{ - mem: memtable.NewMemtable(), + mt: mem.NewTable(), opt: opt, // Make a copy. 
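		// The levels controller also starts the compaction workers.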
lc: newLevelsController(opt), } vlogPath := filepath.Join(opt.Dir, "vlog") out.vlog.Open(vlogPath) + + val := out.Get(Head) + var voffset uint64 + if len(val) == 0 { + voffset = 0 + } else { + voffset = binary.BigEndian.Uint64(val) + } + + fn := func(k, v []byte, meta byte) { + out.mt.Put(k, v, meta) + } + out.vlog.Replay(voffset, fn) + return out } @@ -80,10 +99,10 @@ func NewDB(opt DBOptions) *DB { func (s *DB) Close() { } -func (s *DB) getMemTables() (*memtable.Memtable, *memtable.Memtable) { +func (s *DB) getMemTables() (*mem.Table, *mem.Table) { s.RLock() defer s.RUnlock() - return s.mem, s.imm + return s.mt, s.imm } func decodeValue(val []byte, vlog *value.Log) []byte { @@ -132,11 +151,22 @@ func (s *DB) Get(key []byte) []byte { return decodeValue(val, &s.vlog) } +func (s *DB) updateOffset(ptrs []value.Pointer) { + ptr := ptrs[len(ptrs)-1] + + s.Lock() + defer s.Unlock() + if s.voffset < ptr.Offset { + s.voffset = ptr.Offset + uint64(ptr.Len) + } +} + // Write applies a list of value.Entry to our memtable. func (s *DB) Write(entries []value.Entry) error { ptrs, err := s.vlog.Write(entries) y.Check(err) y.AssertTrue(len(ptrs) == len(entries)) + s.updateOffset(ptrs) if err := s.makeRoomForWrite(); err != nil { return err @@ -145,9 +175,9 @@ func (s *DB) Write(entries []value.Entry) error { var offsetBuf [20]byte for i, entry := range entries { if len(entry.Value) < s.opt.ValueThreshold { // Will include deletion / tombstone case. - s.mem.Put(entry.Key, entry.Value, entry.Meta) + s.mt.Put(entry.Key, entry.Value, entry.Meta) } else { - s.mem.Put(entry.Key, ptrs[i].Encode(offsetBuf[:]), entry.Meta|value.BitValuePointer) + s.mt.Put(entry.Key, ptrs[i].Encode(offsetBuf[:]), entry.Meta|value.BitValuePointer) } } return nil @@ -173,22 +203,33 @@ func (s *DB) Delete(key []byte) error { }) } +var ( + Head = []byte("_head_") +) + func (s *DB) makeRoomForWrite() error { - if s.mem.MemUsage() < s.opt.MaxTableSize { + if s.mt.MemUsage() < s.opt.MaxTableSize { // Nothing to do. We have enough space. return nil } s.immWg.Wait() // Make sure we finish flushing immutable memtable. s.Lock() + if s.voffset > 0 { + fmt.Printf("Storing offset: %v\n", s.voffset) + offset := make([]byte, 8) + binary.BigEndian.PutUint64(offset, s.voffset) + s.mt.Put(Head, offset, 0) + } + // Changing imm, mem requires lock. - s.imm = s.mem - s.mem = memtable.NewMemtable() + s.imm = s.mt + s.mt = mem.NewTable() // Note: It is important to start the compaction within this lock. // Otherwise, you might be compacting the wrong imm! s.immWg.Add(1) - go func(imm *memtable.Memtable) { + go func(imm *mem.Table) { defer s.immWg.Done() fileID, f := tempFile(s.opt.Dir) diff --git a/badger/db_test.go b/badger/db_test.go index df4ac921e..030cf5891 100644 --- a/badger/db_test.go +++ b/badger/db_test.go @@ -17,7 +17,10 @@ package badger import ( + "encoding/binary" "fmt" + "io/ioutil" + "os" "testing" "github.com/stretchr/testify/require" @@ -26,7 +29,7 @@ import ( ) func TestDBWrite(t *testing.T) { - db := NewDB(DefaultDBOptions) + db := NewDB(DefaultOptions) var entries []value.Entry for i := 0; i < 100; i++ { entries = append(entries, value.Entry{ @@ -38,7 +41,7 @@ func TestDBWrite(t *testing.T) { } func TestDBGet(t *testing.T) { - db := NewDB(DefaultDBOptions) + db := NewDB(DefaultOptions) require.NoError(t, db.Put([]byte("key1"), []byte("val1"))) require.EqualValues(t, "val1", db.Get([]byte("key1"))) @@ -59,7 +62,7 @@ func TestDBGet(t *testing.T) { // Put a lot of data to move some data to disk. 
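// This exercises gets served from the memtable, the immutable memtable, and
// the on-disk levels.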
// WARNING: This test might take a while but it should pass! func TestDBGetMore(t *testing.T) { - db := NewDB(DefaultDBOptions) + db := NewDB(DefaultOptions) // n := 500000 n := 100000 for i := 0; i < n; i++ { @@ -115,7 +118,7 @@ func TestDBGetMore(t *testing.T) { // Put a lot of data to move some data to disk. Then iterate. func TestDBIterateBasic(t *testing.T) { - db := NewDB(DefaultDBOptions) + db := NewDB(DefaultOptions) defer db.Close() // n := 500000 @@ -140,3 +143,57 @@ func TestDBIterateBasic(t *testing.T) { } require.EqualValues(t, n, count) } + +func TestCrash(t *testing.T) { + dir, err := ioutil.TempDir("", "") + require.Nil(t, err) + defer os.RemoveAll(dir) + + opt := DefaultOptions + opt.MaxTableSize = 1 << 20 + opt.Dir = dir + opt.DoNotCompact = true + + db := NewDB(opt) + var keys [][]byte + for i := 0; i < 150000; i++ { + k := []byte(fmt.Sprintf("%09d", i)) + keys = append(keys, k) + } + + entries := make([]value.Entry, 0, 10) + for _, k := range keys { + e := value.Entry{ + Key: k, + Value: k, + } + entries = append(entries, e) + + if len(entries) == 10 { + err := db.Write(entries) + require.Nil(t, err) + entries = entries[:0] + } + } + + for _, k := range keys { + require.Equal(t, k, db.Get(k)) + } + + db2 := NewDB(opt) + for _, k := range keys { + require.Equal(t, k, db2.Get(k), "Key: %s", k) + } + + db.lc.tryCompact(1) + db.lc.tryCompact(1) + val := db.lc.levels[1].get(Head) + require.True(t, len(val) > 0) + voffset := binary.BigEndian.Uint64(val) + fmt.Printf("level 1 val: %v\n", voffset) + + db3 := NewDB(opt) + for _, k := range keys { + require.Equal(t, k, db3.Get(k), "Key: %s", k) + } +} diff --git a/badger/levels.go b/badger/levels.go index 602414853..6934b7a23 100644 --- a/badger/levels.go +++ b/badger/levels.go @@ -41,7 +41,7 @@ type levelHandler struct { // The following are initialized once and const. level int maxTotalSize int64 - opt DBOptions + opt Options } type levelsController struct { @@ -52,7 +52,7 @@ type levelsController struct { // The following are initialized once and const. levels []*levelHandler - opt DBOptions + opt Options } var ( @@ -164,14 +164,14 @@ func (s *levelHandler) overlappingTables(begin, end []byte) (int, int) { return left, right } -func newLevelHandler(opt DBOptions, level int) *levelHandler { +func newLevelHandler(opt Options, level int) *levelHandler { return &levelHandler{ level: level, opt: opt, } } -func newLevelsController(opt DBOptions) *levelsController { +func newLevelsController(opt Options) *levelsController { y.AssertTrue(opt.NumLevelZeroTablesStall > opt.NumLevelZeroTables) s := &levelsController{ opt: opt, @@ -190,12 +190,17 @@ func newLevelsController(opt DBOptions) *levelsController { } } for i := 0; i < s.opt.NumCompactWorkers; i++ { - go s.compact(i) + go s.runWorker(i) } return s } -func (s *levelsController) compact(workerID int) { +func (s *levelsController) runWorker(workerID int) { + if s.opt.DoNotCompact { + fmt.Println("NOT running any compactions due to DB options.") + return + } + time.Sleep(time.Duration(rand.Int31n(1000)) * time.Millisecond) timeChan := time.Tick(10 * time.Millisecond) for { diff --git a/badger/levels_test.go b/badger/levels_test.go index 504223b0e..1c3f61a09 100644 --- a/badger/levels_test.go +++ b/badger/levels_test.go @@ -59,7 +59,7 @@ func extractTable(table *tableHandler) [][]string { // TestDoCompact tests the merging logic which is done in internal doCompact function. // We might remove this internal test eventually. 
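// It hand-builds small tables and verifies the result of merging them.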
func TestDoCompact(t *testing.T) { - c := newLevelsController(DefaultDBOptions) + c := newLevelsController(DefaultOptions) t0 := buildTable(t, [][]string{ {"k2", "z2"}, {"k22", "z22"}, diff --git a/memtable/memtable.go b/mem/memtable.go similarity index 86% rename from memtable/memtable.go rename to mem/memtable.go index 84c98654f..84ba32143 100644 --- a/memtable/memtable.go +++ b/mem/memtable.go @@ -14,7 +14,7 @@ * limitations under the License. */ -package memtable +package mem import ( "fmt" @@ -26,8 +26,8 @@ import ( "github.com/dgraph-io/badger/y" ) -// Memtable is a thin wrapper over Skiplist, at least for now. -type Memtable struct { +// Table is a thin wrapper over Skiplist, at least for now. +type Table struct { table *skl.Skiplist arena *y.Arena } @@ -39,9 +39,9 @@ const ( byteDelete = 1 ) -// NewMemtable creates a new memtable. Input is the user key comparator. -func NewMemtable() *Memtable { - return &Memtable{ +// NewTable creates a new memtable. Input is the user key comparator. +func NewTable() *Table { + return &Table{ arena: new(y.Arena), table: skl.NewSkiplist(), } @@ -50,7 +50,7 @@ func NewMemtable() *Memtable { // Put sets a key-value pair. We don't use onlyIfAbsent now. And we ignore the old value returned // by the skiplist. These can be used later on to support more operations, e.g., GetOrCreate can // be a Put with an empty value with onlyIfAbsent=true. -func (s *Memtable) Put(key, value []byte, meta byte) { +func (s *Table) Put(key, value []byte, meta byte) { data := s.arena.Allocate(len(key) + len(value) + 1) y.AssertTrue(len(key) == copy(data[:len(key)], key)) v := data[len(key):] @@ -60,7 +60,7 @@ func (s *Memtable) Put(key, value []byte, meta byte) { } // WriteLevel0Table flushes memtable. It drops deleteValues. -func (s *Memtable) WriteLevel0Table(f *os.File) error { +func (s *Table) WriteLevel0Table(f *os.File) error { iter := s.NewIterator() b := table.NewTableBuilder() defer b.Close() @@ -79,7 +79,7 @@ type Iterator struct { } // NewIterator returns a memtable iterator. -func (s *Memtable) NewIterator() *Iterator { +func (s *Table) NewIterator() *Iterator { return &Iterator{iter: s.table.NewIterator()} } @@ -106,7 +106,7 @@ func (s *Iterator) KeyValue() ([]byte, []byte) { } // Get looks up a key. This includes the meta byte. -func (s *Memtable) Get(key []byte) []byte { +func (s *Table) Get(key []byte) []byte { v := s.table.Get(key) if v == nil { // This is different from unsafe.Pointer(nil). @@ -116,11 +116,11 @@ func (s *Memtable) Get(key []byte) []byte { } // MemUsage returns an approximate mem usage. -func (s *Memtable) MemUsage() int64 { +func (s *Table) MemUsage() int64 { return int64(s.arena.MemUsage()) } -func (s *Memtable) DebugString() string { +func (s *Table) DebugString() string { it := s.NewIterator() it.SeekToFirst() k1, _ := it.KeyValue() diff --git a/memtable/memtable_test.go b/mem/memtable_test.go similarity index 93% rename from memtable/memtable_test.go rename to mem/memtable_test.go index 153f30b79..e30dcd3bb 100644 --- a/memtable/memtable_test.go +++ b/mem/memtable_test.go @@ -14,7 +14,7 @@ * limitations under the License. 
*/ -package memtable +package mem import ( "fmt" @@ -26,7 +26,7 @@ import ( "github.com/dgraph-io/badger/y" ) -func extract(m *Memtable) ([]string, []string) { +func extract(m *Table) ([]string, []string) { var keys, vals []string it := m.NewIterator() for it.SeekToFirst(); it.Valid(); it.Next() { @@ -42,7 +42,7 @@ func extract(m *Memtable) ([]string, []string) { } func TestBasic(t *testing.T) { - m := NewMemtable() + m := NewTable() require.NotNil(t, m) m.Put([]byte("somekey"), []byte("hohoho"), 0) m.Put([]byte("somekey"), []byte("hahaha"), 0) @@ -63,7 +63,7 @@ func TestBasic(t *testing.T) { } func TestMemUssage(t *testing.T) { - m := NewMemtable() + m := NewTable() for i := 0; i < 10000; i++ { m.Put([]byte(fmt.Sprintf("k%05d", i)), []byte(fmt.Sprintf("v%05d", i)), 0) } @@ -72,7 +72,7 @@ func TestMemUssage(t *testing.T) { } func TestMergeIterator(t *testing.T) { - m := NewMemtable() + m := NewTable() it := m.NewIterator() mergeIt := y.NewMergeIterator([]y.Iterator{it}) require.False(t, mergeIt.Valid()) @@ -80,7 +80,7 @@ func TestMergeIterator(t *testing.T) { // BenchmarkAdd-4 1000000 1289 ns/op func BenchmarkAdd(b *testing.B) { - m := NewMemtable() + m := NewTable() for i := 0; i < b.N; i++ { m.Put([]byte(fmt.Sprintf("k%09d", i)), []byte(fmt.Sprintf("v%09d", i)), 0) } diff --git a/value/value.go b/value/value.go index b2ae6c450..028b2c195 100644 --- a/value/value.go +++ b/value/value.go @@ -17,8 +17,10 @@ package value import ( + "bufio" "bytes" "encoding/binary" + "fmt" "io" "os" "sync" @@ -90,6 +92,50 @@ func (l *Log) Open(fname string) { y.Check(err) } +func (l *Log) Replay(offset uint64, fn func(k, v []byte, meta byte)) { + fmt.Printf("Seeking at offset: %v\n", offset) + + read := func(r *bufio.Reader, buf []byte) error { + for { + n, err := r.Read(buf) + if err != nil { + return err + } + if n == len(buf) { + return nil + } + buf = buf[n:] + } + } + + _, err := l.fd.Seek(int64(offset), 0) + y.Check(err) + reader := bufio.NewReader(l.fd) + + hbuf := make([]byte, 8) + var h header + var count int + for { + if err := read(reader, hbuf); err == io.EOF { + break + } + h.Decode(hbuf) + // fmt.Printf("[%d] Header read: %+v\n", count, h) + + k := make([]byte, h.klen) + v := make([]byte, h.vlen) + + y.Check(read(reader, k)) + meta, err := reader.ReadByte() + y.Check(err) + y.Check(read(reader, v)) + + fn(k, v, meta) + count++ + } + fmt.Printf("Replayed %d KVs\n", count) +} + var bufPool = sync.Pool{ New: func() interface{} { return &bytes.Buffer{} From 096297ee29bdd6f50ceb6e8045a1b7e1ce9571f6 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Wed, 12 Apr 2017 19:33:04 +1000 Subject: [PATCH 5/5] Do db.Get instead of picking up value from levels. --- badger/db_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/badger/db_test.go b/badger/db_test.go index 030cf5891..574cf7acc 100644 --- a/badger/db_test.go +++ b/badger/db_test.go @@ -169,7 +169,7 @@ func TestCrash(t *testing.T) { } entries = append(entries, e) - if len(entries) == 10 { + if len(entries) == 100 { err := db.Write(entries) require.Nil(t, err) entries = entries[:0] @@ -187,7 +187,8 @@ func TestCrash(t *testing.T) { db.lc.tryCompact(1) db.lc.tryCompact(1) - val := db.lc.levels[1].get(Head) + val := db.Get(Head) + // val := db.lc.levels[1].get(Head) require.True(t, len(val) > 0) voffset := binary.BigEndian.Uint64(val) fmt.Printf("level 1 val: %v\n", voffset)
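Taken together, the patches settle on a simple storage contract: every value kept in the LSM tree is prefixed with a meta byte (BitDelete marks a tombstone, BitValuePointer marks an indirect value), and an indirect value's payload is a 12-byte big-endian Pointer (4-byte Len, 8-byte Offset) into the value log. Below is a minimal standalone sketch of that round trip; it assumes Len is a uint32 (consistent with the PutUint32 call in Encode) and uses lowercase stand-in names rather than the package's own types.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

const (
	bitDelete       = 1 // Tombstone: the key was deleted.
	bitValuePointer = 2 // The value is a pointer into the value log.
)

// pointer mirrors value.Pointer: a 4-byte length plus an 8-byte offset,
// both big-endian, always 12 bytes on the wire.
type pointer struct {
	len    uint32
	offset uint64
}

// encode writes p into b, which must hold at least 12 bytes, and returns
// the encoded slice. No allocation happens here.
func (p pointer) encode(b []byte) []byte {
	binary.BigEndian.PutUint32(b[:4], p.len)
	binary.BigEndian.PutUint64(b[4:12], p.offset)
	return b[:12]
}

// decode is the inverse of encode.
func (p *pointer) decode(b []byte) {
	p.len = binary.BigEndian.Uint32(b[:4])
	p.offset = binary.BigEndian.Uint64(b[4:12])
}

func main() {
	// A large value: the tree stores meta byte + encoded pointer.
	var buf [12]byte
	stored := append([]byte{bitValuePointer},
		pointer{len: 42, offset: 1 << 20}.encode(buf[:])...)

	// Reading back: the meta byte says how to interpret the payload.
	meta, payload := stored[0], stored[1:]
	switch {
	case meta&bitDelete != 0:
		fmt.Println("tombstone; key was deleted")
	case meta&bitValuePointer != 0:
		var p pointer
		p.decode(payload)
		fmt.Printf("indirect: read %d bytes at value log offset %d\n", p.len, p.offset)
	default:
		fmt.Printf("inline value: %q\n", payload)
	}
}
```

A fixed 12-byte encoding is also what lets DB.Write reuse a single stack buffer (offsetBuf) for every entry instead of allocating per write.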