@@ -19,7 +19,6 @@ package badger
1919import (
2020 "bytes"
2121 "fmt"
22- "math/rand"
2322 "os"
2423 "sort"
2524 "strings"
@@ -337,46 +336,61 @@ func (s *levelsController) dropPrefix(prefix []byte) error {
337336}
338337
339338func (s * levelsController ) startCompact (lc * y.Closer ) {
340- n := s .kv .opt .NumCompactors
341- lc .AddRunning (n - 1 )
342- for i := 0 ; i < n ; i ++ {
343- go s .runWorker (lc )
344- }
339+ go s .runWorker (lc )
345340}
346341
347342func (s * levelsController ) runWorker (lc * y.Closer ) {
348343 defer lc .Done ()
349344
350- randomDelay := time .NewTimer (time .Duration (rand .Int31n (1000 )) * time .Millisecond )
351- select {
352- case <- randomDelay .C :
353- case <- lc .HasBeenClosed ():
354- randomDelay .Stop ()
355- return
345+ var wg sync.WaitGroup
346+
347+ semCh := make (chan struct {}, s .kv .opt .NumCompactors )
348+ cdCh := make (chan compactDef )
349+
350+ for i := 0 ; i < s .kv .opt .NumCompactors ; i ++ {
351+ wg .Add (1 )
352+ go func () {
353+ defer wg .Done ()
354+
355+ for cd := range cdCh {
356+ err := s .runCompact (cd )
357+ if err != nil {
358+ s .kv .opt .Warningf ("While running compaction: %v\n " , err )
359+ }
360+ <- semCh
361+ }
362+ }()
356363 }
357364
358- ticker := time .NewTicker (time .Second )
359- defer ticker .Stop ()
365+ defer func () {
366+ close (cdCh )
367+ wg .Wait ()
368+ }()
360369
370+ compactionLoop:
361371 for {
362372 select {
363- // Can add a done channel or other stuff.
364- case <- ticker .C :
373+ case <- lc .HasBeenClosed ():
374+ break compactionLoop
375+ case semCh <- struct {}{}:
365376 prios := s .pickCompactLevels ()
366- loop:
367377 for _ , p := range prios {
368- err := s .doCompact (p )
369- switch err {
370- case nil :
371- break loop
372- case errFillTables :
373- // pass
374- default :
375- s .kv .opt .Warningf ("While running doCompact: %v\n " , err )
378+ cd , err := s .prepareCompact (p )
379+ if err == errFillTables {
380+ // Try to schedule the next level
381+ continue
376382 }
383+
384+ cdCh <- cd
385+ continue compactionLoop
377386 }
378- case <- lc .HasBeenClosed ():
379- return
387+
388+ // Nothing to do, return the item we borrowed.
389+ <- semCh
390+
391+ // Now wait a bit.
392+ // TODO: this could be replaced by a "there is a new table at L0" event
393+ time .Sleep (10 * time .Millisecond )
380394 }
381395 }
382396}
@@ -906,6 +920,15 @@ var errFillTables = errors.New("Unable to fill tables")
906920
907921// doCompact picks some table on level l and compacts it away to the next level.
908922func (s * levelsController ) doCompact (p compactionPriority ) error {
923+ cd , err := s .prepareCompact (p )
924+ if err != nil {
925+ return err
926+ }
927+
928+ return s .runCompact (cd )
929+ }
930+
931+ func (s * levelsController ) prepareCompact (p compactionPriority ) (compactDef , error ) {
909932 l := p .level
910933 y .AssertTrue (l + 1 < s .kv .opt .MaxLevels ) // Sanity check.
911934
@@ -916,29 +939,36 @@ func (s *levelsController) doCompact(p compactionPriority) error {
916939 dropPrefix : p .dropPrefix ,
917940 }
918941 cd .elog .SetMaxEvents (100 )
919- defer cd .elog .Finish ()
920942
921- s .kv .opt .Infof ("Got compaction priority: %+v" , p )
943+ s .kv .opt .Debugf ("Got compaction priority: %+v" , p )
922944
923945 // While picking tables to be compacted, both levels' tables are expected to
924946 // remain unchanged.
925947 if l == 0 {
926948 if ! s .fillTablesL0 (& cd ) {
927- return errFillTables
949+ cd .elog .Finish ()
950+ return cd , errFillTables
928951 }
929952
930953 } else {
931954 if ! s .fillTables (& cd ) {
932- return errFillTables
955+ cd .elog .Finish ()
956+ return cd , errFillTables
933957 }
934958 }
959+
960+ return cd , nil
961+ }
962+
963+ func (s * levelsController ) runCompact (cd compactDef ) error {
964+ defer cd .elog .Finish ()
935965 defer s .cstatus .delete (cd ) // Remove the ranges from compaction status.
936966
937- s .kv .opt .Infof ("Running for level: %d \n " , cd .thisLevel .level )
967+ s .kv .opt .Infof ("Running compaction %d->%d \n " , cd .thisLevel . level , cd . nextLevel .level )
938968 s .cstatus .toLog (cd .elog )
939- if err := s .runCompactDef (l , cd ); err != nil {
969+ if err := s .runCompactDef (cd . thisLevel . level , cd ); err != nil {
940970 // This compaction couldn't be done successfully.
941- s .kv .opt .Warningf ("LOG Compact FAILED with error: %+v: %+v" , err , cd )
971+ s .kv .opt .Warningf ("Compact FAILED with error: %+v: %+v" , err , cd )
942972 return err
943973 }
944974
0 commit comments