diff --git a/db.go b/db.go index 3ac6a2d95..0467e4f85 100644 --- a/db.go +++ b/db.go @@ -758,13 +758,6 @@ var requestPool = sync.Pool{ } func (db *DB) writeToLSM(b *request) error { - // We should check the length of b.Prts and b.Entries only when badger is not - // running in InMemory mode. In InMemory mode, we don't write anything to the - // value log and that's why the length of b.Ptrs will always be zero. - if !db.opt.InMemory && len(b.Ptrs) != len(b.Entries) { - return errors.Errorf("Ptrs and Entries don't match: %+v", b) - } - for i, entry := range b.Entries { var err error if entry.skipVlogAndSetThreshold(db.valueThreshold()) { @@ -829,6 +822,7 @@ func (db *DB) writeRequests(reqs []*request) error { } count += len(b.Entries) var i uint64 + var err error for err = db.ensureRoomForWrite(); err == errNoRoom; err = db.ensureRoomForWrite() { i++ if i%100 == 0 { @@ -945,7 +939,8 @@ func (db *DB) doWrites(lc *z.Closer) { // batchSet applies a list of badger.Entry. If a request level error occurs it // will be returned. -// Check(kv.BatchSet(entries)) +// +// Check(kv.BatchSet(entries)) func (db *DB) batchSet(entries []*Entry) error { req, err := db.sendToWriteCh(entries) if err != nil { @@ -958,9 +953,10 @@ func (db *DB) batchSet(entries []*Entry) error { // batchSetAsync is the asynchronous version of batchSet. It accepts a callback // function which is called when all the sets are complete. If a request level // error occurs, it will be passed back via the callback. -// err := kv.BatchSetAsync(entries, func(err error)) { -// Check(err) -// } +// +// err := kv.BatchSetAsync(entries, func(err error) { +// Check(err) +// } func (db *DB) batchSetAsync(entries []*Entry, f func(error)) error { req, err := db.sendToWriteCh(entries) if err != nil { @@ -1011,10 +1007,16 @@ func arenaSize(opt Options) int64 { // buildL0Table builds a new table from the memtable. 
func buildL0Table(ft flushTask, bopts table.Options) *table.Builder { - iter := ft.mt.sl.NewIterator() + var iter y.Iterator + if ft.itr != nil { + iter = ft.itr + } else { + iter = ft.mt.sl.NewUniIterator(false) + } defer iter.Close() + b := table.NewTableBuilder(bopts) - for iter.SeekToFirst(); iter.Valid(); iter.Next() { + for iter.Rewind(); iter.Valid(); iter.Next() { if len(ft.dropPrefixes) > 0 && hasAnyPrefixes(iter.Key(), ft.dropPrefixes) { continue } @@ -1030,16 +1032,13 @@ func buildL0Table(ft flushTask, bopts table.Options) *table.Builder { type flushTask struct { mt *memTable + itr y.Iterator dropPrefixes [][]byte } // handleFlushTask must be run serially. func (db *DB) handleFlushTask(ft flushTask) error { - // There can be a scenario, when empty memtable is flushed. - if ft.mt.sl.Empty() { - return nil - } - + // ft.mt could be nil with ft.itr being the valid field. bopts := buildTableOptions(db) builder := buildL0Table(ft, bopts) defer builder.Close() @@ -1075,11 +1074,48 @@ func (db *DB) handleFlushTask(ft flushTask) error { func (db *DB) flushMemtable(lc *z.Closer) error { defer lc.Done() + var sz int64 + var itrs []y.Iterator + var mts []*memTable + slurp := func() { + for { + select { + case more := <-db.flushChan: + if more.mt == nil { + return + } + sl := more.mt.sl + itrs = append(itrs, sl.NewUniIterator(false)) + mts = append(mts, more.mt) + + sz += sl.MemSize() + if sz > db.opt.MemTableSize { + return + } + default: + return + } + } + } + for ft := range db.flushChan { if ft.mt == nil { // We close db.flushChan now, instead of sending a nil ft.mt. continue } + sz = ft.mt.sl.MemSize() + // Reset of itrs, mts etc. is being done below. + y.AssertTrue(len(itrs) == 0 && len(mts) == 0) + itrs = append(itrs, ft.mt.sl.NewUniIterator(false)) + mts = append(mts, ft.mt) + + // Pick more memtables, so we can really fill up the L0 table. + slurp() + + // db.opt.Infof("Picked %d memtables. 
Size: %d\n", len(itrs), sz) + ft.mt = nil + ft.itr = table.NewMergeIterator(itrs, false) + for { err := db.handleFlushTask(ft) if err == nil { @@ -1090,9 +1126,11 @@ func (db *DB) flushMemtable(lc *z.Closer) error { // which would arrive here would match db.imm[0], because we acquire a // lock over DB when pushing to flushChan. // TODO: This logic is dirty AF. Any change and this could easily break. - y.AssertTrue(ft.mt == db.imm[0]) - db.imm = db.imm[1:] - ft.mt.DecrRef() // Return memory. + for _, mt := range mts { + y.AssertTrue(mt == db.imm[0]) + db.imm = db.imm[1:] + mt.DecrRef() // Return memory. + } db.lock.Unlock() break @@ -1101,6 +1139,8 @@ func (db *DB) flushMemtable(lc *z.Closer) error { db.opt.Errorf("Failure while flushing memtable to disk: %v. Retrying...\n", err) time.Sleep(time.Second) } + // Reset everything. + itrs, mts, sz = itrs[:0], mts[:0], 0 } return nil } @@ -1719,16 +1759,16 @@ func (db *DB) dropAll() (func(), error) { } // DropPrefix would drop all the keys with the provided prefix. It does this in the following way: -// - Stop accepting new writes. -// - Stop memtable flushes before acquiring lock. Because we're acquring lock here -// and memtable flush stalls for lock, which leads to deadlock -// - Flush out all memtables, skipping over keys with the given prefix, Kp. -// - Write out the value log header to memtables when flushing, so we don't accidentally bring Kp -// back after a restart. -// - Stop compaction. -// - Compact L0->L1, skipping over Kp. -// - Compact rest of the levels, Li->Li, picking tables which have Kp. -// - Resume memtable flushes, compactions and writes. +// - Stop accepting new writes. +// - Stop memtable flushes before acquiring lock. Because we're acquiring lock here +// and memtable flush stalls for lock, which leads to deadlock +// - Flush out all memtables, skipping over keys with the given prefix, Kp. 
+// - Write out the value log header to memtables when flushing, so we don't accidentally bring Kp +// back after a restart. +// - Stop compaction. +// - Compact L0->L1, skipping over Kp. +// - Compact rest of the levels, Li->Li, picking tables which have Kp. +// - Resume memtable flushes, compactions and writes. func (db *DB) DropPrefix(prefixes ...[]byte) error { if len(prefixes) == 0 { return nil diff --git a/options.go b/options.go index 40a055fcc..50907107f 100644 --- a/options.go +++ b/options.go @@ -148,7 +148,7 @@ func DefaultOptions(path string) Options { NumCompactors: 4, // Run at least 2 compactors. Zero-th compactor prioritizes L0. NumLevelZeroTables: 5, NumLevelZeroTablesStall: 15, - NumMemtables: 5, + NumMemtables: 15, BloomFalsePositive: 0.01, BlockSize: 4 * 1024, SyncWrites: false,