
Optimise createTable in stream_writer.go #1132

Merged: 3 commits from ashish/stream-opt into master on Nov 28, 2019

Conversation

@ashish-goswami (Contributor) commented Nov 25, 2019

Found this while running the Dgraph bulk loader.

(pprof) top --cum
Showing nodes accounting for 20.62s, 65.09% of 31.68s total
Dropped 113 nodes (cum <= 0.16s)
Showing top 10 nodes out of 45
      flat  flat%   sum%        cum   cum%
         0     0%     0%     28.62s 90.34%  github.com/dgraph-io/badger/v2.(*sortedWriter).createTable
         0     0%     0%     28.62s 90.34%  github.com/dgraph-io/badger/v2.(*sortedWriter).send.func1
     0.84s  2.65%  2.65%     28.61s 90.31%  github.com/dgraph-io/badger/v2.(*levelHandler).replaceTables
         0     0%  2.65%     27.50s 86.81%  sort.Slice
     0.07s  0.22%  2.87%     27.50s 86.81%  sort.quickSort_func
     1.30s  4.10%  6.98%     26.78s 84.53%  sort.doPivot_func
     1.24s  3.91% 10.89%     25.86s 81.63%  github.com/dgraph-io/badger/v2.(*levelHandler).replaceTables.func1
     1.60s  5.05% 15.94%     17.55s 55.40%  github.com/dgraph-io/badger/v2/y.CompareKeys
     0.19s   0.6% 16.54%     15.95s 50.35%  bytes.Compare
    15.38s 48.55% 65.09%     15.38s 48.55%  cmpbody

In the createTable method of StreamWriter we call the levelHandler.replaceTables method. replaceTables adds the table to the levelHandler's table list and re-sorts the list by table.Smallest on every call. That sorting is required when tables are being added while Badger is also serving queries, but StreamWriter only writes data, so we can avoid sorting on every table addition. Once all tables have been added, we can sort the tables on every level by table.Smallest in a single pass.
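Roughly, the change swaps the sort-on-every-add behaviour for an append-only add plus one sort per level at Flush() time. The sketch below only shows the shape of the idea: the names follow the addTables/sortTables comments quoted in the review below (the merged version uses the singular addTable, as requested in review), and the real levelHandler carries more state than shown here.

package badger

import (
	"sort"
	"sync"

	"github.com/dgraph-io/badger/v2/table"
	"github.com/dgraph-io/badger/v2/y"
)

// levelHandler is trimmed down to the fields relevant to this sketch.
type levelHandler struct {
	sync.RWMutex
	tables    []*table.Table
	totalSize int64
}

// addTables only appends; unlike replaceTables it does not sort the list,
// which is safe because StreamWriter never serves reads while writing.
func (s *levelHandler) addTables(toAdd []*table.Table) {
	s.Lock()
	defer s.Unlock()
	s.tables = append(s.tables, toAdd...)
	for _, t := range toAdd {
		s.totalSize += t.Size()
		t.IncrRef()
	}
}

// sortTables orders the level's tables by their smallest key. StreamWriter
// calls this once per level during Flush(), after all addTables calls are done.
func (s *levelHandler) sortTables() {
	s.Lock()
	defer s.Unlock()
	sort.Slice(s.tables, func(i, j int) bool {
		return y.CompareKeys(s.tables[i].Smallest(), s.tables[j].Smallest()) < 0
	})
}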

I ran the program below on both master and this PR.

package main

import (
	"fmt"
	"io/ioutil"
	"os"

	badger "github.com/dgraph-io/badger"
	"github.com/dgraph-io/badger/pb"
)

// insert writes one small KV plus a stream-done marker for each of 100,000 streams.
func insert(sw *badger.StreamWriter) {
	var val [10]byte
	list := &pb.KVList{}
	for i := 0; i < 100000; i++ {
		list.Kv = list.Kv[:0]
		kv1 := &pb.KV{
			Key:      []byte(fmt.Sprintf("%4d", i)),
			Value:    val[:],
			Version:  1,
			StreamId: uint32(i),
		}
		kv2 := &pb.KV{
			StreamId:   uint32(i),
			StreamDone: true,
		}
		list.Kv = append(list.Kv, kv1, kv2)
		if err := sw.Write(list); err != nil {
			panic("sw.Write() failed " + err.Error())
		}

	}
}

func main() {
	dir, err := ioutil.TempDir(".", "badger-test")
	if err != nil {
		panic(err)
	}
	defer func() {
		if err := os.RemoveAll(dir); err != nil {
			panic(err)
		}
	}()
	opts := badger.DefaultOptions(dir)
	db, err := badger.Open(opts)
	if err != nil {
		panic("unable to open db " + err.Error())
	}
	defer func() {
		if err := db.Close(); err != nil {
			panic("unable to close db " + err.Error())
		}
	}()

	sw := db.NewStreamWriter()
	if err := sw.Prepare(); err != nil {
		panic("sw.Prepare() failed " + err.Error())
	}

	insert(sw)

	if err := sw.Flush(); err != nil {
		panic("sw.Flush() failed " + err.Error())
	}
}

Time to run:
Master: 2143.86s user 25.89s system 107% cpu 33:47.75 total
This PR: 63.28s user 22.79s system 23% cpu 6:13.93 total

Note: the time difference between master and this PR will grow even larger as the number of streams increases.


@coveralls commented Nov 25, 2019

Coverage increased (+0.2%) to 77.524% when pulling 66124f5 on ashish/stream-opt into 3eb4e72 on master.

@coveralls commented Nov 25, 2019

Coverage increased (+0.4%) to 77.714% when pulling cb1103d on ashish/stream-opt into 3eb4e72 on master.

@ashish-goswami marked this pull request as ready for review November 26, 2019 09:15
@manishrjain (Contributor) left a comment

Got a couple of comments, looks alright to me. Get @jarifibrahim to approve this.

Reviewable status: 0 of 2 files reviewed, 2 unresolved discussions (waiting on @ashish-goswami, @balajijinnah, @jarifibrahim, and @manishrjain)


level_handler.go, line 148 at r2 (raw file):

// and after all addTables calls, we can sort table list(check sortTable method).
// NOTE: addTables and sortTables duplicate some code from replaceTables().
func (s *levelHandler) addTables(toAdd []*table.Table) {

Say that this is being used only by stream writer.


stream_writer.go, line 460 at r2 (raw file):

	// We are not calling lhandler.replaceTables() here, as it sorts tables on every addition.
	// We can sort all tables only once during Flush() call.
	lhandler.addTables([]*table.Table{tbl})

lhandler.addTable() singular. This is the only usage.
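
For orientation, here is roughly where the two calls sit on the StreamWriter side. This is a sketch building on the levelHandler sketch in the description above, not the actual diff; the stub fields (lhandler, levels) are assumptions standing in for Badger's real plumbing.

// Stub types for this sketch only; the real sortedWriter and StreamWriter in
// Badger carry much more state (table builders, throttling, value log state).
type sortedWriter struct {
	lhandler *levelHandler // target level for this writer's tables
}

// createTable now appends without sorting (previously it called
// lhandler.replaceTables, which re-sorted the level on every call).
func (w *sortedWriter) createTable(tbl *table.Table) error {
	w.lhandler.addTables([]*table.Table{tbl})
	return nil
}

type StreamWriter struct {
	levels []*levelHandler
}

// Flush sorts each level exactly once, after every table has been added.
func (sw *StreamWriter) Flush() error {
	for _, lh := range sw.levels {
		lh.sortTables()
	}
	return nil
}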

@jarifibrahim (Contributor) left a comment

:lgtm: Thank you for adding the profile and benchmarks; it really helps in reviewing the PR. Looks good to me!

Reviewable status: 0 of 2 files reviewed, 3 unresolved discussions (waiting on @ashish-goswami, @balajijinnah, and @jarifibrahim)


level_handler.go, line 147 at r2 (raw file):

// this can be avoided(such as stream writer). We can just add tables to levelHandler's table list
// and after all addTables calls, we can sort table list(check sortTable method).
// NOTE: addTables and sortTables duplicate some code from replaceTables().

Please add a comment here saying "levelhandler.Sort() should be called after all addTables calls are done."

@ashish-goswami (Contributor, Author) left a comment

Reviewable status: 0 of 2 files reviewed, 3 unresolved discussions (waiting on @balajijinnah, @jarifibrahim, and @manishrjain)


level_handler.go, line 147 at r2 (raw file):

Previously, jarifibrahim (Ibrahim Jarif) wrote…

Please add a comment here saying "levelhandler.Sort() should be called after all addTables calls are done."

Done.


level_handler.go, line 148 at r2 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

Say that this is being used only by stream writer.

Modified the function comment to mention that it's a special case just for the stream writer.


stream_writer.go, line 460 at r2 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

lhandler.addTable() singular. This is the only usage.

Done.

@ashish-goswami dismissed manishrjain’s stale review November 28, 2019 17:14

Addressed all comments from Manish.

@ashish-goswami merged commit 407e5bd into master Nov 28, 2019
@ashish-goswami deleted the ashish/stream-opt branch November 28, 2019 17:19
jarifibrahim pushed a commit that referenced this pull request Mar 12, 2020
Optimise createTable in stream_writer.go (#1132)

This makes a huge difference with a large number of streams. Tested with 100,000 streams: running the stream writer took ~38 minutes on master vs ~6 minutes with this PR.

(cherry picked from commit 407e5bd)