Commit 0b8eb4c

Author: Ibrahim Jarif
fix(compaction): Use separate compactors for L0, L1 (#1466)
Related to #1459. This PR contains the following changes to compactions:

- Use a separate thread for compacting Level 0 and 1, and a separate one for the other levels.
- Pick levels to compact based on score.
- Stall Level 0 if compactions cannot keep up (we had added this in #1186).
- Limit the number of open table builders to 5 in compactions.
1 parent f671746 commit 0b8eb4c
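For orientation (not part of the diff): a minimal sketch of how the new compactor constraint surfaces to users, assuming the badger v2 options API; the directory path and compactor count here are illustrative.

package main

import (
	"log"

	badger "github.com/dgraph-io/badger/v2"
)

func main() {
	// As of this commit, Open rejects NumCompactors == 1: compactor 0 is
	// dedicated to L0/L1, so at least one more is needed for other levels.
	// Zero compactors is still allowed and disables compactions entirely.
	opt := badger.DefaultOptions("/tmp/badger").WithNumCompactors(3)
	db, err := badger.Open(opt)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}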

7 files changed: +124 −122 lines changed

badger/main.go

Lines changed: 1 addition & 1 deletion
@@ -29,7 +29,7 @@ func main() {
 	go func() {
 		for i := 8080; i < 9080; i++ {
 			fmt.Printf("Listening for /debug HTTP requests at port: %d\n", i)
-			if err := http.ListenAndServe(fmt.Sprintf("localhost:%d", i), nil); err != nil {
+			if err := http.ListenAndServe(fmt.Sprintf("0.0.0.0:%d", i), nil); err != nil {
 				fmt.Println("Port busy. Trying another one...")
 				continue
 			}

db.go

Lines changed: 8 additions & 2 deletions
@@ -192,6 +192,12 @@ func (db *DB) replayFunction() func(Entry, valuePointer) error {
 
 // Open returns a new DB object.
 func Open(opt Options) (db *DB, err error) {
+	// It's okay to have zero compactors which will disable all compactions but
+	// we cannot have just one compactor otherwise we will end up with all data
+	// on level 2.
+	if opt.NumCompactors == 1 {
+		return nil, errors.New("Cannot have 1 compactor. Need at least 2")
+	}
 	if opt.InMemory && (opt.Dir != "" || opt.ValueDir != "") {
 		return nil, errors.New("Cannot use badger in Disk-less mode with Dir or ValueDir set")
 	}

@@ -534,7 +540,7 @@ func (db *DB) close() (err error) {
 	// Force Compact L0
 	// We don't need to care about cstatus since no parallel compaction is running.
 	if db.opt.CompactL0OnClose {
-		err := db.lc.doCompact(compactionPriority{level: 0, score: 1.73})
+		err := db.lc.doCompact(173, compactionPriority{level: 0, score: 1.73})
 		switch err {
 		case errFillTables:
 			// This error only means that there might be enough tables to do a compaction. So, we

@@ -1461,7 +1467,7 @@ func (db *DB) Flatten(workers int) error {
 	errCh := make(chan error, 1)
 	for i := 0; i < workers; i++ {
 		go func() {
-			errCh <- db.lc.doCompact(cp)
+			errCh <- db.lc.doCompact(175, cp)
 		}()
 	}
 	var success int

level_handler.go

Lines changed: 2 additions & 3 deletions
@@ -188,9 +188,8 @@ func (s *levelHandler) tryAddLevel0Table(t *table.Table) bool {
 	// Need lock as we may be deleting the first table during a level 0 compaction.
 	s.Lock()
 	defer s.Unlock()
-	// Return false only if L0 is in memory and number of tables is more than number of
-	// ZeroTableStall. For on disk L0, we should just add the tables to the level.
-	if s.db.opt.KeepL0InMemory && len(s.tables) >= s.db.opt.NumLevelZeroTablesStall {
+	// Stall (by returning false) if we are above the specified stall setting for L0.
+	if len(s.tables) >= s.db.opt.NumLevelZeroTablesStall {
 		return false
 	}
 
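Note the behavioral change in this hunk: the stall no longer applies only when KeepL0InMemory is set. Once len(s.tables) reaches NumLevelZeroTablesStall, tryAddLevel0Table returns false for on-disk L0 as well, which is what lets writes back off until the dedicated L0/L1 compactor catches up.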

levels.go

Lines changed: 69 additions & 60 deletions
@@ -306,7 +306,7 @@ func (s *levelsController) dropPrefixes(prefixes [][]byte) error {
 		// function in logs, and forces a compaction.
 		dropPrefixes: prefixes,
 	}
-	if err := s.doCompact(cp); err != nil {
+	if err := s.doCompact(174, cp); err != nil {
 		opt.Warningf("While compacting level 0: %v", err)
 		return nil
 	}
@@ -366,11 +366,13 @@ func (s *levelsController) startCompact(lc *y.Closer) {
 	n := s.kv.opt.NumCompactors
 	lc.AddRunning(n - 1)
 	for i := 0; i < n; i++ {
-		go s.runWorker(lc)
+		// The worker with id=0 is dedicated to L0 and L1. This is not counted
+		// towards the user specified NumCompactors.
+		go s.runCompactor(i, lc)
 	}
 }
 
-func (s *levelsController) runWorker(lc *y.Closer) {
+func (s *levelsController) runCompactor(id int, lc *y.Closer) {
 	defer lc.Done()
 
 	randomDelay := time.NewTimer(time.Duration(rand.Int31n(1000)) * time.Millisecond)

@@ -381,7 +383,7 @@ func (s *levelsController) runWorker(lc *y.Closer) {
 		return
 	}
 
-	ticker := time.NewTicker(time.Second)
+	ticker := time.NewTicker(100 * time.Millisecond)
 	defer ticker.Stop()
 
 	for {
@@ -391,7 +393,15 @@ func (s *levelsController) runWorker(lc *y.Closer) {
 			prios := s.pickCompactLevels()
 		loop:
 			for _, p := range prios {
-				err := s.doCompact(p)
+				if id == 0 && p.level > 1 {
+					// If I'm ID zero, I only compact L0 and L1.
+					continue
+				}
+				if id != 0 && p.level <= 1 {
+					// If I'm ID non-zero, I do NOT compact L0 and L1.
+					continue
+				}
+				err := s.doCompact(id, p)
 				switch err {
 				case nil:
 					break loop
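The two continue checks above encode the whole routing scheme. Restated as a tiny standalone sketch (shouldCompact is a hypothetical helper for illustration, not part of this diff):

package main

import "fmt"

// shouldCompact mirrors the checks in runCompactor: compactor 0 owns L0 and
// L1 exclusively, while every other compactor only touches levels 2 and up.
func shouldCompact(id, level int) bool {
	if id == 0 {
		return level <= 1
	}
	return level > 1
}

func main() {
	fmt.Println(shouldCompact(0, 1)) // true: worker 0 takes L1
	fmt.Println(shouldCompact(0, 3)) // false: worker 0 skips L3
	fmt.Println(shouldCompact(2, 0)) // false: worker 2 skips L0
	fmt.Println(shouldCompact(2, 4)) // true: worker 2 takes L4
}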
@@ -453,10 +463,11 @@ func (s *levelsController) pickCompactLevels() (prios []compactionPriority) {
 			prios = append(prios, pri)
 		}
 	}
-	// We used to sort compaction priorities based on the score. But, we
-	// decided to compact based on the level, not the priority. So, upper
-	// levels (level 0, level 1, etc) always get compacted first, before the
-	// lower levels -- this allows us to avoid stalls.
+	// We should continue to sort the compaction priorities by score. Now that we have a dedicated
+	// compactor for L0 and L1, we don't need to sort by level here.
+	sort.Slice(prios, func(i, j int) bool {
+		return prios[i].score > prios[j].score
+	})
 	return prios
 }
 

@@ -541,15 +552,13 @@ nextTable:
 	// that would affect the snapshot view guarantee provided by transactions.
 	discardTs := s.kv.orc.discardAtOrBelow()
 
-	// Start generating new tables.
-	type newTableResult struct {
-		table *table.Table
-		err   error
-	}
-	resultCh := make(chan newTableResult)
 	var numBuilds, numVersions int
 	var lastKey, skipKey []byte
 	var vp valuePointer
+	var newTables []*table.Table
+	mu := new(sync.Mutex) // Guards newTables
+
+	inflightBuilders := y.NewThrottle(5)
 	for it.Valid() {
 		timeStart := time.Now()
 		dk, err := s.kv.registry.latestDataKey()
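The new inflightBuilders throttle is what implements the "limit the number of open table builders to 5" change. Below is a minimal sketch of the Do/Done/Finish pattern as it is used in this diff, assuming badger v2's y package; the loop body is a stand-in for the real table builds.

package main

import (
	"fmt"

	"github.com/dgraph-io/badger/v2/y"
)

func main() {
	th := y.NewThrottle(5)
	for i := 0; i < 20; i++ {
		// Do blocks until fewer than 5 workers are in flight; it returns an
		// error if an earlier worker already reported one via Done.
		if err := th.Do(); err != nil {
			break
		}
		go func(i int) {
			var err error // result of this worker, reported to the throttle
			defer func() { th.Done(err) }()
			fmt.Println("building table", i)
		}(i)
	}
	// Finish waits for all outstanding workers and returns the first error.
	if err := th.Finish(); err != nil {
		fmt.Println("build failed:", err)
	}
}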
@@ -646,19 +655,6 @@ nextTable:
 		// called Add() at least once, and builder is not Empty().
 		s.kv.opt.Debugf("LOG Compact. Added %d keys. Skipped %d keys. Iteration took: %v",
 			numKeys, numSkips, time.Since(timeStart))
-		build := func(fileID uint64) (*table.Table, error) {
-			fd, err := y.CreateSyncedFile(table.NewFilename(fileID, s.kv.opt.Dir), true)
-			if err != nil {
-				return nil, errors.Wrapf(err, "While opening new table: %d", fileID)
-			}
-
-			if _, err := fd.Write(builder.Finish()); err != nil {
-				return nil, errors.Wrapf(err, "Unable to write to file: %d", fileID)
-			}
-			tbl, err := table.OpenTable(fd, bopts)
-			// decrRef is added below.
-			return tbl, errors.Wrapf(err, "Unable to open table: %q", fd.Name())
-		}
 		if builder.Empty() {
 			// Cleanup builder resources:
 			builder.Finish()
@@ -667,49 +663,61 @@ nextTable:
 		}
 		numBuilds++
 		fileID := s.reserveFileID()
+		if err := inflightBuilders.Do(); err != nil {
+			// Can't return from here, until I decrRef all the tables that I built so far.
+			break
+		}
 		go func(builder *table.Builder) {
 			defer builder.Close()
-			var (
-				tbl *table.Table
-				err error
-			)
+			defer inflightBuilders.Done(err)
+
+			build := func(fileID uint64) (*table.Table, error) {
+				fd, err := y.CreateSyncedFile(table.NewFilename(fileID, s.kv.opt.Dir), true)
+				if err != nil {
+					return nil, errors.Wrapf(err, "While opening new table: %d", fileID)
+				}
+
+				if _, err := fd.Write(builder.Finish()); err != nil {
+					return nil, errors.Wrapf(err, "Unable to write to file: %d", fileID)
+				}
+				tbl, err := table.OpenTable(fd, bopts)
+				// decrRef is added below.
+				return tbl, errors.Wrapf(err, "Unable to open table: %q", fd.Name())
+			}
+
+			var tbl *table.Table
+			var err error
 			if s.kv.opt.InMemory {
 				tbl, err = table.OpenInMemoryTable(builder.Finish(), fileID, &bopts)
 			} else {
 				tbl, err = build(fileID)
 			}
-			resultCh <- newTableResult{tbl, err}
-		}(builder)
-	}
 
-	newTables := make([]*table.Table, 0, 20)
-	// Wait for all table builders to finish.
-	var firstErr error
-	for x := 0; x < numBuilds; x++ {
-		res := <-resultCh
-		newTables = append(newTables, res.table)
-		if firstErr == nil {
-			firstErr = res.err
-		}
+			// If we couldn't build the table, return fast.
+			if err != nil {
+				return
+			}
+
+			mu.Lock()
+			newTables = append(newTables, tbl)
+			mu.Unlock()
+		}(builder)
 	}
 
-	if firstErr == nil {
+	// Wait for all table builders to finish and also for newTables accumulator to finish.
+	err := inflightBuilders.Finish()
+	if err == nil {
 		// Ensure created files' directory entries are visible. We don't mind the extra latency
 		// from not doing this ASAP after all file creation has finished because this is a
 		// background operation.
-		firstErr = s.kv.syncDir(s.kv.opt.Dir)
+		err = s.kv.syncDir(s.kv.opt.Dir)
 	}
 
-	if firstErr != nil {
+	if err != nil {
 		// An error happened. Delete all the newly created table files (by calling DecrRef
 		// -- we're the only holders of a ref).
-		for j := 0; j < numBuilds; j++ {
-			if newTables[j] != nil {
-				_ = newTables[j].DecrRef()
-			}
-		}
-		errorReturn := errors.Wrapf(firstErr, "While running compaction for: %+v", cd)
-		return nil, nil, errorReturn
+		_ = decrRefs(newTables)
+		return nil, nil, errors.Wrapf(err, "while running compactions for: %+v", cd)
 	}
 
 	sort.Slice(newTables, func(i, j int) bool {
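Design-wise, this hunk swaps the old unbounded pipeline (spawn every builder goroutine, collect results from resultCh, track firstErr by hand) for a bounded one: the throttle caps concurrent builders at 5, successful tables are appended to the mutex-guarded newTables slice, and inflightBuilders.Finish() yields the single error value that the cleanup path needs.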
@@ -963,7 +971,7 @@ func (s *levelsController) runCompactDef(l int, cd compactDef) (err error) {
 var errFillTables = errors.New("Unable to fill tables")
 
 // doCompact picks some table on level l and compacts it away to the next level.
-func (s *levelsController) doCompact(p compactionPriority) error {
+func (s *levelsController) doCompact(id int, p compactionPriority) error {
 	l := p.level
 	y.AssertTrue(l+1 < s.kv.opt.MaxLevels) // Sanity check.
 

@@ -976,7 +984,7 @@
 	cd.elog.SetMaxEvents(100)
 	defer cd.elog.Finish()
 
-	s.kv.opt.Infof("Got compaction priority: %+v", p)
+	s.kv.opt.Debugf("[Compactor: %d] Attempting to run compaction: %+v", id, p)
 
 	// While picking tables to be compacted, both levels' tables are expected to
 	// remain unchanged.
@@ -992,16 +1000,17 @@
 	}
 	defer s.cstatus.delete(cd) // Remove the ranges from compaction status.
 
-	s.kv.opt.Infof("Running for level: %d\n", cd.thisLevel.level)
+	s.kv.opt.Infof("[Compactor: %d] Running compaction: %+v for level: %d\n",
+		id, p, cd.thisLevel.level)
 	s.cstatus.toLog(cd.elog)
 	if err := s.runCompactDef(l, cd); err != nil {
 		// This compaction couldn't be done successfully.
-		s.kv.opt.Warningf("LOG Compact FAILED with error: %+v: %+v", err, cd)
+		s.kv.opt.Warningf("[Compactor: %d] LOG Compact FAILED with error: %+v: %+v", id, err, cd)
 		return err
 	}
 
 	s.cstatus.toLog(cd.elog)
-	s.kv.opt.Infof("Compaction for level: %d DONE", cd.thisLevel.level)
+	s.kv.opt.Infof("[Compactor: %d] Compaction for level: %d DONE", id, cd.thisLevel.level)
 	return nil
 }

@@ -1025,7 +1034,7 @@ func (s *levelsController) addLevel0Table(t *table.Table) error {
 	// Stall. Make sure all levels are healthy before we unstall.
 	var timeStart time.Time
 	{
-		s.kv.opt.Debugf("STALLED STALLED STALLED: %v\n", time.Since(s.lastUnstalled))
+		s.kv.opt.Infof("STALLED STALLED STALLED: %v\n", time.Since(s.lastUnstalled))
 		s.cstatus.RLock()
 		for i := 0; i < s.kv.opt.MaxLevels; i++ {
 			s.kv.opt.Debugf("level=%d. Status=%s Size=%d\n",
