diff --git a/db.go b/db.go
index d4f1e06ae..d124ab8de 100644
--- a/db.go
+++ b/db.go
@@ -755,13 +755,6 @@ var requestPool = sync.Pool{
 }
 
 func (db *DB) writeToLSM(b *request) error {
-	// We should check the length of b.Prts and b.Entries only when badger is not
-	// running in InMemory mode. In InMemory mode, we don't write anything to the
-	// value log and that's why the length of b.Ptrs will always be zero.
-	if !db.opt.InMemory && len(b.Ptrs) != len(b.Entries) {
-		return errors.Errorf("Ptrs and Entries don't match: %+v", b)
-	}
-
 	for i, entry := range b.Entries {
 		var err error
 		if entry.skipVlogAndSetThreshold(db.valueThreshold()) {
@@ -826,6 +819,7 @@ func (db *DB) writeRequests(reqs []*request) error {
 		}
 		count += len(b.Entries)
 		var i uint64
+		var err error
 		for err = db.ensureRoomForWrite(); err == errNoRoom; err = db.ensureRoomForWrite() {
 			i++
 			if i%100 == 0 {
@@ -1010,10 +1004,16 @@ func arenaSize(opt Options) int64 {
 
 // buildL0Table builds a new table from the memtable.
 func buildL0Table(ft flushTask, bopts table.Options) *table.Builder {
-	iter := ft.mt.sl.NewIterator()
+	var iter y.Iterator
+	if ft.itr != nil {
+		iter = ft.itr
+	} else {
+		iter = ft.mt.sl.NewUniIterator(false)
+	}
 	defer iter.Close()
+
 	b := table.NewTableBuilder(bopts)
-	for iter.SeekToFirst(); iter.Valid(); iter.Next() {
+	for iter.Rewind(); iter.Valid(); iter.Next() {
 		if len(ft.dropPrefixes) > 0 && hasAnyPrefixes(iter.Key(), ft.dropPrefixes) {
 			continue
 		}
@@ -1029,16 +1029,13 @@ func buildL0Table(ft flushTask, bopts table.Options) *table.Builder {
 
 type flushTask struct {
 	mt           *memTable
+	itr          y.Iterator
 	dropPrefixes [][]byte
 }
 
 // handleFlushTask must be run serially.
 func (db *DB) handleFlushTask(ft flushTask) error {
-	// There can be a scenario, when empty memtable is flushed.
-	if ft.mt.sl.Empty() {
-		return nil
-	}
-
+	// ft.mt could be nil with ft.itr being the valid field.
 	bopts := buildTableOptions(db)
 	builder := buildL0Table(ft, bopts)
 	defer builder.Close()
@@ -1074,11 +1071,51 @@ func (db *DB) handleFlushTask(ft flushTask) error {
 func (db *DB) flushMemtable(lc *z.Closer) error {
 	defer lc.Done()
 
+	var sz int64
+	var itrs []y.Iterator
+	var mts []*memTable
+	slurp := func() {
+		for {
+			select {
+			case more, ok := <-db.flushChan:
+				if !ok {
+					return
+				}
+				if more.mt == nil {
+					continue
+				}
+				sl := more.mt.sl
+				itrs = append(itrs, sl.NewUniIterator(false))
+				mts = append(mts, more.mt)
+
+				sz += sl.MemSize()
+				if sz > db.opt.MemTableSize {
+					return
+				}
+			default:
+				return
+			}
+		}
+	}
+
 	for ft := range db.flushChan {
 		if ft.mt == nil {
 			// We close db.flushChan now, instead of sending a nil ft.mt.
 			continue
 		}
+		sz = ft.mt.sl.MemSize()
+		// Reset of itrs, mts etc. is being done below.
+		y.AssertTrue(len(itrs) == 0 && len(mts) == 0)
+		itrs = append(itrs, ft.mt.sl.NewUniIterator(false))
+		mts = append(mts, ft.mt)
+
+		// Pick more memtables, so we can really fill up the L0 table.
+		slurp()
+
+		// db.opt.Infof("Picked %d memtables. Size: %d\n", len(itrs), sz)
+		ft.mt = nil
+		ft.itr = table.NewMergeIterator(itrs, false)
+
 		for {
 			err := db.handleFlushTask(ft)
 			if err == nil {
@@ -1089,9 +1126,11 @@ func (db *DB) flushMemtable(lc *z.Closer) error {
 				// which would arrive here would match db.imm[0], because we acquire a
 				// lock over DB when pushing to flushChan.
 				// TODO: This logic is dirty AF. Any change and this could easily break.
-				y.AssertTrue(ft.mt == db.imm[0])
-				db.imm = db.imm[1:]
-				ft.mt.DecrRef() // Return memory.
+				for _, mt := range mts {
+					y.AssertTrue(mt == db.imm[0])
+					db.imm = db.imm[1:]
+					mt.DecrRef() // Return memory.
+				}
 				db.lock.Unlock()
 
 				break
@@ -1100,6 +1139,8 @@ func (db *DB) flushMemtable(lc *z.Closer) error {
 			db.opt.Errorf("Failure while flushing memtable to disk: %v. Retrying...\n", err)
 			time.Sleep(time.Second)
 		}
+		// Reset everything.
+		itrs, mts, sz = itrs[:0], mts[:0], 0
 	}
 	return nil
 }
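For readers following the patch: `buildL0Table` can now consume either a single memtable's skiplist iterator or a pre-built merge iterator, and `flushMemtable` opportunistically drains additional queued memtables from `flushChan` (up to the memtable size budget) so several small memtables are flushed as one L0 table. Below is a minimal, standalone sketch of that pick-and-merge pattern; the names (`batch`, `slurp`, `sizeBudget`, `flushCh`) are illustrative stand-ins, not Badger's `memTable`, `y.Iterator`, or `table.NewMergeIterator`.

```go
package main

import (
	"fmt"
	"sort"
)

// batch stands in for a memtable: a sorted run of keys plus its in-memory size.
type batch struct {
	keys []string
	size int64
}

func main() {
	flushCh := make(chan *batch, 8)
	// Pretend several small memtables are already queued for flushing.
	flushCh <- &batch{keys: []string{"b", "e"}, size: 2}
	flushCh <- &batch{keys: []string{"a", "d"}, size: 2}
	flushCh <- &batch{keys: []string{"c", "f"}, size: 2}

	const sizeBudget = 5 // plays the role of db.opt.MemTableSize

	first := <-flushCh
	picked := []*batch{first}
	sz := first.size

	// Non-blocking drain: keep pulling queued batches until the channel is
	// empty or the size budget is exceeded, mirroring the select/default
	// loop in the patch.
	slurp := func() {
		for {
			select {
			case more, ok := <-flushCh:
				if !ok {
					return
				}
				picked = append(picked, more)
				sz += more.size
				if sz > sizeBudget {
					return
				}
			default:
				return
			}
		}
	}
	slurp()

	// Combine the sorted runs into one ordered stream (a plain sort stands in
	// for the merge iterator here) and "build" a single table from it.
	var merged []string
	for _, b := range picked {
		merged = append(merged, b.keys...)
	}
	sort.Strings(merged)
	fmt.Printf("flushing %d memtables (size %d) as one table: %v\n", len(picked), sz, merged)
}
```

The `select` with a `default` case is what keeps the drain non-blocking: the flusher never waits for more memtables to arrive, it only batches whatever has already been queued, and the size cap bounds how much ends up in one table.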