
feat(ludicrous): Run mutations from the same predicate concurrently in ludicrous mode #6060

Merged
merged 23 commits into from Sep 1, 2020


Contributor

@harshil-goel commented Jul 23, 2020

Added a topological sort to the ludicrous-mode executor to parallelize the execution of mutations within a single predicate.
Master, 21 million dataset: 3m 15s
This PR, 21 million dataset: 2m 25s

Fixes #DGRAPH-1357



Contributor

@manishrjain left a comment

Change the title to say "run mutations from the same predicate concurrently in ludicrous mode". Looks alright to me. Get it reviewed by someone else, and be careful about the increased memory usage.

Reviewed 1 of 2 files at r1, 1 of 1 files at r3.
Reviewable status: all files reviewed, 4 unresolved discussions (waiting on @harshil-goel)


worker/executor.go, line 71 at r3 (raw file):

func generateConflictKeys(p *subMutation) []uint64 {
	keys := make([]uint64, 0)

Move this closer to usage.

var keys []uint64


worker/executor.go, line 81 at r3 (raw file):

		}

		if schema.State().IsList(edge.Attr) {

Only run serially if there is a count index, not on IsList.

use uniq[0]
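One plausible reading of this comment, as a sketch: derive a single uint64 conflict key per edge, and make it predicate-wide only when the predicate has a count index, presumably because count-index entries are keyed by count rather than by subject, so edges on different subjects can touch the same postings. The function name, its parameters and the FNV hashing are assumptions for illustration, not the PR's implementation.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"hash/fnv"
)

// conflictKey is a hypothetical helper mapping an edge to one uint64.
// With a count index, every edge on the predicate gets the same key, so those
// mutations run serially; otherwise only edges on the same
// (predicate, subject) pair conflict.
func conflictKey(attr string, subject uint64, hasCountIndex bool) uint64 {
	h := fnv.New64a()
	h.Write([]byte(attr))
	if !hasCountIndex {
		var buf [8]byte
		binary.BigEndian.PutUint64(buf[:], subject)
		h.Write(buf[:])
	}
	return h.Sum64()
}

func main() {
	// Count-indexed predicate: different subjects still conflict.
	fmt.Println(conflictKey("friend", 1, true) == conflictKey("friend", 2, true)) // true
	// No count index: different subjects can run concurrently.
	fmt.Println(conflictKey("friend", 1, false) == conflictKey("friend", 2, false)) // false
}
```

A hash collision between unrelated keys only makes two mutations wait on each other unnecessarily; it never lets conflicting mutations run together.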


worker/executor.go, line 180 at r3 (raw file):

		for _, i := range toRun {
			go func(j *mutation) {
				e.workerChan <- j

See whether you need channels at all. You could just create goroutines to do the work, using a throttle to limit how many goroutines you create.
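A sketch of the suggested approach. The throttle type below is a hypothetical stand-in built from a buffered channel and a sync.WaitGroup, not y.Throttle: the real one's Do returns an error and its Done takes one (as in e.throttle.Done(nil) later in this thread), and Finish is added here only so the example can wait for its goroutines.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// throttle is a hypothetical stand-in shaped like the Do/Done calls in the PR.
// Do blocks while limit goroutines are already running.
type throttle struct {
	slots chan struct{}
	wg    sync.WaitGroup
}

func newThrottle(limit int) *throttle {
	return &throttle{slots: make(chan struct{}, limit)}
}

// Do reserves a slot; call it before starting the goroutine.
func (t *throttle) Do() {
	t.slots <- struct{}{}
	t.wg.Add(1)
}

// Done releases the slot reserved by Do.
func (t *throttle) Done() {
	<-t.slots
	t.wg.Done()
}

// Finish waits until every reserved slot has been released.
func (t *throttle) Finish() { t.wg.Wait() }

// runLimited starts n goroutines through a throttle of size limit and returns
// how many ran and the highest number observed running at once.
func runLimited(n, limit int) (total, peak int64) {
	th := newThrottle(limit)
	var inFlight int64
	for i := 0; i < n; i++ {
		th.Do()
		go func() {
			defer th.Done()
			cur := atomic.AddInt64(&inFlight, 1)
			for { // record the maximum concurrency seen so far
				p := atomic.LoadInt64(&peak)
				if cur <= p || atomic.CompareAndSwapInt64(&peak, p, cur) {
					break
				}
			}
			atomic.AddInt64(&total, 1)
			atomic.AddInt64(&inFlight, -1)
		}()
	}
	th.Finish()
	return total, peak
}

func main() {
	total, peak := runLimited(100, 4)
	fmt.Println(total, peak <= 4) // 100 true
}
```

Calling Do in the spawning loop, before the go statement, is what bounds the number of live goroutines; calling it inside the goroutine would only bound how many are doing work, not how many exist.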


worker/executor.go, line 193 at r3 (raw file):

	for payload := range ch {
		conflicts := generateConflictKeys(payload)
		m := &mutation{m: payload, keys: conflicts, outEdges: make(map[uint64]*mutation), graph: g, inDeg: 0}

100 chars.

Contributor Author

@harshil-goel left a comment

Reviewable status: 0 of 2 files reviewed, 4 unresolved discussions (waiting on @manishrjain)


worker/executor.go, line 71 at r3 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

Move this closer to usage.

var keys []uint64

Done.


worker/executor.go, line 81 at r3 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

Only run serially if there is a count index, not on IsList.

use uniq[0]

Done.


worker/executor.go, line 180 at r3 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

See whether you need channels at all. You could just create goroutines to do the work, using a throttle to limit how many goroutines you create.

Done.


worker/executor.go, line 193 at r3 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

100 chars.

Done.

@harshil-goel changed the title feat(ludicrous): Added topological sort to ludicrous mode executor feat(ludicrous): Run mutations from the same predicate concurrently in ludicrous mode Aug 25, 2020
@harshil-goel marked this pull request as ready for review August 25, 2020 11:28
Contributor

@ashish-goswami left a comment

Reviewable status: 0 of 2 files reviewed, 19 unresolved discussions (waiting on @harshil-goel, @manishrjain, @martinmr, and @vvbalaji-dgraph)


posting/list.go, line 437 at r4 (raw file):

}

func GetConflictKeys(pk x.ParsedKey, key []byte, t *pb.DirectedEdge) uint64 {

It's returning a single uint64; should we call it GetConflictKey instead of GetConflictKeys?


worker/executor.go, line 59 at r4 (raw file):

func newExecutor(applied *y.WaterMark) *executor {
	runtime.SetBlockProfileRate(1)

Remove this line.


worker/executor.go, line 64 at r4 (raw file):

		closer:   y.NewCloser(0),
		applied:  applied,
		throttle: y.NewThrottle(2000),

A comment about the default value of 2000 would be nice.


worker/executor.go, line 72 at r4 (raw file):

func generateTokenKeys(nq *pb.DirectedEdge, tokenizers []tok.Tokenizer) ([]uint64, error) {
	keys := make([]uint64, len(tokenizers))

Looks like it should be keys := make([]uint64, 0, len(tokenizers))
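Why the change matters, shown with hypothetical helper names: make([]uint64, n) creates a slice that already holds n zeros, so appending afterwards leaves those zeros in front of the real keys. In a conflict-key list that would likely add a spurious key 0 shared by many mutations and serialize them needlessly.

```go
package main

import "fmt"

// collectWithLen preallocates length instead of capacity: the slice starts out
// holding len(tokens) zeros, and append adds the real keys after them.
func collectWithLen(tokens []uint64) []uint64 {
	keys := make([]uint64, len(tokens))
	for _, t := range tokens {
		keys = append(keys, t)
	}
	return keys
}

// collectWithCap preallocates only capacity, so append fills from index 0.
func collectWithCap(tokens []uint64) []uint64 {
	keys := make([]uint64, 0, len(tokens))
	for _, t := range tokens {
		keys = append(keys, t)
	}
	return keys
}

func main() {
	tokens := []uint64{7, 8}
	fmt.Println(collectWithLen(tokens)) // [0 0 7 8]
	fmt.Println(collectWithCap(tokens)) // [7 8]
}
```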


worker/executor.go, line 82 at r4 (raw file):

		schemaVal, err := types.Convert(storageVal, types.TypeID(nq.GetValueType()))
		if err != nil {
			errs = append(errs, err.Error())

Instead of calling err.Error(), let's just keep a []error at the top.
Also, if there is an error here, should we continue further processing?


worker/executor.go, line 87 at r4 (raw file):

			nq.Lang))
		if err != nil {
			errs = append(errs, err.Error())

Same here: should we continue processing even if there is an error?


worker/executor.go, line 122 at r4 (raw file):

		keys, err := generateTokenKeys(edge, tokenizers)
		for _, i := range keys {

instead of i call it key


worker/executor.go, line 138 at r4 (raw file):

type mutation struct {
	m     *subMutation

we can call it sm.


worker/executor.go, line 139 at r4 (raw file):

type mutation struct {
	m     *subMutation
	keys  []uint64

we can call it conflictKeys.


worker/executor.go, line 142 at r4 (raw file):

	inDeg int

	outEdges map[uint64]*mutation

we should call it dependent mutations.


worker/executor.go, line 194 at r4 (raw file):

		dependent.inDeg -= 1
		if dependent.inDeg == 0 {
			x.Check(e.throttle.Do())

Not sure if we should crash here.


worker/executor.go, line 197 at r4 (raw file):

			go func(d *mutation) {
				e.worker(d)
			}(dependent)

go e.worker(dependent)


worker/executor.go, line 202 at r4 (raw file):

	for _, c := range mut.keys {
		i := 0

we should rename it to something else.


worker/executor.go, line 205 at r4 (raw file):

		arr := mut.graph.conflicts[c]

		for _, x := range arr {

Add a comment that we are deleting the mutation here.
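A sketch of the deletion step this comment refers to, assuming graph.conflicts maps each conflict key to a slice of pending mutations (as mut.graph.conflicts[c] above suggests) and that the caller holds the graph lock. The loop variable is named key rather than i, in the spirit of the rename requested above; mutation and removeFromConflicts are hypothetical.

```go
package main

import "fmt"

// mutation is a hypothetical placeholder for the executor's node type.
type mutation struct{ id int }

// removeFromConflicts deletes a finished mutation from each per-key list and
// drops keys that no longer have any mutation attached.
func removeFromConflicts(conflicts map[uint64][]*mutation, keys []uint64, done *mutation) {
	for _, key := range keys {
		arr := conflicts[key]
		kept := arr[:0] // filter in place, reusing the backing array
		for _, other := range arr {
			if other != done {
				kept = append(kept, other)
			}
		}
		// Clear the now-unused tail so the backing array does not keep
		// finished mutations reachable.
		for i := len(kept); i < len(arr); i++ {
			arr[i] = nil
		}
		if len(kept) == 0 {
			delete(conflicts, key)
		} else {
			conflicts[key] = kept
		}
	}
}

func main() {
	a, b := &mutation{id: 1}, &mutation{id: 2}
	conflicts := map[uint64][]*mutation{10: {a, b}, 20: {a}}
	removeFromConflicts(conflicts, []uint64{10, 20}, a)
	fmt.Println(len(conflicts), conflicts[10][0].id) // 1 2
}
```

Deleting empty keys and clearing the slice tail both keep the map from growing with finished mutations, which matters given the memory-usage concern raised earlier in the review.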


worker/executor.go, line 261 at r4 (raw file):

			x.Check(e.throttle.Do())
			go func() {
				e.worker(m)

go e.worker(m)

Contributor Author

@harshil-goel left a comment

Reviewable status: 0 of 3 files reviewed, 19 unresolved discussions (waiting on @ashish-goswami, @manishrjain, @martinmr, and @vvbalaji-dgraph)


posting/list.go, line 437 at r4 (raw file):

Previously, ashish-goswami (Ashish Goswami) wrote…

It's returning a single uint64; should we call it GetConflictKey instead of GetConflictKeys?

Done.


worker/executor.go, line 59 at r4 (raw file):

Previously, ashish-goswami (Ashish Goswami) wrote…

Remove this line.

Done.


worker/executor.go, line 64 at r4 (raw file):

Previously, ashish-goswami (Ashish Goswami) wrote…

A comment about the default value of 2000 would be nice.

Done.


worker/executor.go, line 72 at r4 (raw file):

Previously, ashish-goswami (Ashish Goswami) wrote…

Looks like it should be keys := make([]uint64, 0, len(tokenizers))

Done.


worker/executor.go, line 82 at r4 (raw file):

Previously, ashish-goswami (Ashish Goswami) wrote…

Instead of calling err.Error(), let's just keep a []error at the top.
Also, if there is an error here, should we continue further processing?

Yes. We care about whatever conflict keys we can generate, so that conflicting mutations are as unlikely as possible to run together.
I used string instead of error because we need to join these errors later, and would need to convert them to strings to do that anyway.


worker/executor.go, line 87 at r4 (raw file):

Previously, ashish-goswami (Ashish Goswami) wrote…

Same here: should we continue processing even if there is an error?

Done.


worker/executor.go, line 122 at r4 (raw file):

Previously, ashish-goswami (Ashish Goswami) wrote…

instead of i call it key

Done.


worker/executor.go, line 138 at r4 (raw file):

Previously, ashish-goswami (Ashish Goswami) wrote…

we can call it sm.

Done.


worker/executor.go, line 139 at r4 (raw file):

Previously, ashish-goswami (Ashish Goswami) wrote…

we can call it conflictKeys.

Done.


worker/executor.go, line 142 at r4 (raw file):

Previously, ashish-goswami (Ashish Goswami) wrote…

we should call it dependent mutations.

Done.


worker/executor.go, line 194 at r4 (raw file):

Previously, ashish-goswami (Ashish Goswami) wrote…

Not sure if we should crash here.

We always return nil, so it wouldn't crash. If it crashes, it's a bug.


worker/executor.go, line 197 at r4 (raw file):

Previously, ashish-goswami (Ashish Goswami) wrote…

go e.worker(dependent)

Done.


worker/executor.go, line 202 at r4 (raw file):

Previously, ashish-goswami (Ashish Goswami) wrote…

we should rename it to something else.

Done.


worker/executor.go, line 205 at r4 (raw file):

Previously, ashish-goswami (Ashish Goswami) wrote…

Add a comment that we are deleting the mutation here.

Done.


worker/executor.go, line 261 at r4 (raw file):

Previously, ashish-goswami (Ashish Goswami) wrote…

go e.worker(m)

Done.

Contributor

@parasssh left a comment

Can you add unit tests?

}

func newExecutor(applied *y.WaterMark) *executor {
	ex := &executor{
		predChan: make(map[string]chan *subMutation),
		closer:   y.NewCloser(0),
		applied:  applied,
		throttle: y.NewThrottle(2000), // Run 2000 mutations at a time.
Contributor

should we make this configurable?

}
}

keys := make([]uint64, 0)
Contributor

as discussed, we can just return the map instead of the slice.
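A minimal sketch of returning a set rather than a slice, assuming map[uint64]struct{} as the set type (the PR's exact type is not shown here): duplicate keys collapse, so a mutation is never registered twice against the same key. The helper name is hypothetical.

```go
package main

import "fmt"

// conflictKeySet is a hypothetical helper that gathers conflict keys into a
// set, so a key produced by several edges of one mutation is recorded once.
func conflictKeySet(keys ...uint64) map[uint64]struct{} {
	set := make(map[uint64]struct{}, len(keys))
	for _, k := range keys {
		set[k] = struct{}{}
	}
	return set
}

func main() {
	set := conflictKeySet(10, 20, 10)
	_, has20 := set[20]
	fmt.Println(len(set), has20) // 2 true
}
```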

Contributor Author

@harshil-goel left a comment

Reviewable status: 0 of 5 files reviewed, 21 unresolved discussions (waiting on @ashish-goswami, @manishrjain, @martinmr, @parasssh, and @vvbalaji-dgraph)


worker/executor.go, line 62 at r5 (raw file):

Previously, parasssh wrote…

should we make this configurable?

Done.


worker/executor.go, line 127 at r5 (raw file):

Previously, parasssh wrote…

as discussed, we can just return the map instead of the slice.

Done.

Contributor

@jarifibrahim left a comment

Got some comments.

Reviewed 1 of 2 files at r5, 1 of 4 files at r6.
Reviewable status: 2 of 5 files reviewed, 25 unresolved discussions (waiting on @ashish-goswami, @harshil-goel, @manishrjain, @martinmr, @parasssh, and @vvbalaji-dgraph)


worker/executor.go, line 94 at r6 (raw file):

	if len(errs) > 0 {
		return keys, fmt.Errorf(strings.Join(errs, "\n"))

Shouldn't this be nil, fmt....?


worker/executor.go, line 106 at r6 (raw file):

		pk, err := x.Parse(key)
		if err != nil {
			continue

Why are we skipping an error? Do we expect x.Parse to fail in any case?


worker/executor.go, line 118 at r6 (raw file):

		}

		tokens, err := generateTokenKeys(edge, tokenizers)

err should be checked before you use tokens.


worker/executor.go, line 181 at r6 (raw file):

	e.throttle.Done(nil)

	mut.graph.Lock()

defer mut.graph.Unlock(). Right now the unlock is at the end of the function, which makes it hard to find.

Contributor Author

@harshil-goel left a comment

Reviewable status: 2 of 5 files reviewed, 25 unresolved discussions (waiting on @ashish-goswami, @jarifibrahim, @manishrjain, @martinmr, @parasssh, and @vvbalaji-dgraph)


worker/executor.go, line 94 at r6 (raw file):

Previously, jarifibrahim (Ibrahim Jarif) wrote…

Shouldn't this be nil, fmt....?

No. We need all the keys we can get to make sure the data's safe.
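The pattern being defended here, sketched with a hypothetical parseAll over strings in place of tokenization: keep every key that can be produced and return the collected failures alongside it, so the caller can still use the partial result. The sketch uses errors.New for the joined message, since passing a non-constant string to fmt.Errorf, as in the excerpt above, treats it as a format string and would misinterpret any '%' inside an error message.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
	"strings"
)

// parseAll is a hypothetical illustration: keep every key that can be
// produced and report the failures together, instead of discarding the
// partial result on the first error.
func parseAll(inputs []string) ([]uint64, error) {
	keys := make([]uint64, 0, len(inputs))
	var errs []string
	for _, in := range inputs {
		k, err := strconv.ParseUint(in, 10, 64)
		if err != nil {
			errs = append(errs, err.Error())
			continue
		}
		keys = append(keys, k)
	}
	if len(errs) > 0 {
		// errors.New, not fmt.Errorf: the joined text is data, not a format string.
		return keys, errors.New(strings.Join(errs, "\n"))
	}
	return keys, nil
}

func main() {
	keys, err := parseAll([]string{"1", "x", "3"})
	fmt.Println(keys, err != nil) // [1 3] true
}
```

Callers of such a function must use the returned keys even when err is non-nil, which is the behavior described in the reply above.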


worker/executor.go, line 106 at r6 (raw file):

Previously, jarifibrahim (Ibrahim Jarif) wrote…

Why are we skipping an error? Do we expect x.Parse to fail in any case?

No, we don't. But let's log in any case.


worker/executor.go, line 118 at r6 (raw file):

Previously, jarifibrahim (Ibrahim Jarif) wrote…

err should be checked before you use tokens.

We continue if err is not nil; at this point err is always nil.


worker/executor.go, line 181 at r6 (raw file):

Previously, jarifibrahim (Ibrahim Jarif) wrote…

defer mut.graph.Unlock(). Right now the unlock is at the end of the function, which makes it hard to find.

Done.

Contributor

@jarifibrahim left a comment

:lgtm:

Reviewable status: 2 of 5 files reviewed, 23 unresolved discussions (waiting on @ashish-goswami, @harshil-goel, @jarifibrahim, @manishrjain, @martinmr, @parasssh, and @vvbalaji-dgraph)


worker/executor.go, line 106 at r7 (raw file):

		pk, err := x.Parse(key)
		if err != nil {
			glog.V(2).Info("Error in generating conflic keys", err)

conflic => conflict

@harshil-goel merged commit d799fd0 into master Sep 1, 2020
@harshil-goel deleted the harshil-goel/lud-lie branch September 1, 2020 10:32
5 participants