Skip to content

Commit

Permalink
Optimize createTable in stream_writer.go (#1132)
Browse files Browse the repository at this point in the history
In createTable method of StreamWriter we are calling levelHandler.replaceTables method. This method adds table to leveHandler tables and sorts table based on table.Smallest. This sorting is required if we are adding tables and also querying Badger. In StreamWriter we just write data and hence we can avoid sorting on every addition of table. After we are done adding all tables, we can sort tables on all levels based on table.Smallest.
This creates huge difference in case of large number of streams. I tested it on 100,000 streams
time, to completely run stream writer on master was ~38 minutes vs ~6 minuntes on this PR.
  • Loading branch information
ashish-goswami committed Nov 28, 2019
1 parent f5b6321 commit 407e5bd
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 3 deletions.
25 changes: 25 additions & 0 deletions level_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,31 @@ func (s *levelHandler) replaceTables(toDel, toAdd []*table.Table) error {
return decrRefs(toDel)
}

// addTable adds toAdd table to levelHandler. Normally when we add tables to levelHandler, we sort
// tables based on table.Smallest. This is required for correctness of the system. But in case of
// stream writer this can be avoided. We can just add tables to levelHandler's table list
// and after all addTable calls, we can sort table list(check sortTable method).
// NOTE: levelHandler.sortTables() should be called after call addTable calls are done.
func (s *levelHandler) addTable(t *table.Table) {
s.Lock()
defer s.Unlock()

s.totalSize += t.Size() // Increase totalSize first.
t.IncrRef()
s.tables = append(s.tables, t)
}

// sortTables sorts tables of levelHandler based on table.Smallest.
// Normally it should be called after all addTable calls.
func (s *levelHandler) sortTables() {
s.RLock()
defer s.RUnlock()

sort.Slice(s.tables, func(i, j int) bool {
return y.CompareKeys(s.tables[i].Smallest(), s.tables[j].Smallest()) < 0
})
}

func decrRefs(tables []*table.Table) error {
for _, table := range tables {
if err := table.DecrRef(); err != nil {
Expand Down
13 changes: 10 additions & 3 deletions stream_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,11 @@ func (sw *StreamWriter) Flush() error {
return err
}

// Sort tables at the end.
for _, l := range sw.db.lc.levels {
l.sortTables()
}

// Now sync the directories, so all the files are registered.
if sw.db.opt.ValueDir != sw.db.opt.Dir {
if err := sw.db.syncDir(sw.db.opt.ValueDir); err != nil {
Expand Down Expand Up @@ -459,9 +464,11 @@ func (w *sortedWriter) createTable(builder *table.Builder) error {
if err := w.db.manifest.addChanges([]*pb.ManifestChange{change}); err != nil {
return err
}
if err := lhandler.replaceTables([]*table.Table{}, []*table.Table{tbl}); err != nil {
return err
}

// We are not calling lhandler.replaceTables() here, as it sorts tables on every addition.
// We can sort all tables only once during Flush() call.
lhandler.addTable(tbl)

// Release the ref held by OpenTable.
_ = tbl.DecrRef()
w.db.opt.Infof("Table created: %d at level: %d for stream: %d. Size: %s\n",
Expand Down

0 comments on commit 407e5bd

Please sign in to comment.