Support for inserting single predicate concurrently in ludicrous_mode #5535


Closed · ashish-goswami wants to merge 13 commits into master from ashish/conpred

Conversation

@ashish-goswami (Contributor) commented May 28, 2020

Fixes: #5403
Fixes: DGRAPH-1357

In ludicrous mode we apply mutations for a single predicate in the same order on all alpha nodes. This is done by having one channel per predicate: mutation edges are grouped by predicate and then pushed to the respective predicate's channel for further processing.
This works fine when edges are distributed across many predicates. But when the edges/RDFs span only a few predicates, a single channel becomes a bottleneck and limits insertion throughput.
This PR addresses the following:

  • Have a fixed number of channels (workerChannels) instead of one channel per predicate. This
    avoids creating a channel and a processing goroutine for predicates that have very few
    edges/records.
  • Use the (predicate, subject id) combination to distribute load across channels. This ensures an
    even load distribution across channels while still guaranteeing that mutations are applied in the
    same order on all alphas (see the sketch below).
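
A minimal sketch of the approach, using names from the review discussion below (`workerChan`, a pool of 32 channels, `z.MemHash` from ristretto); this is illustrative only, not the PR's final code:

```go
package main

import (
	"encoding/binary"
	"fmt"

	"github.com/dgraph-io/ristretto/z"
)

const numWorkerChans = 32 // fixed pool; the review suggests making this configurable

type subMutation struct {
	attr string
	uid  uint64
}

type executor struct {
	workerChan []chan *subMutation
}

func newExecutor() *executor {
	e := &executor{workerChan: make([]chan *subMutation, numWorkerChans)}
	for i := range e.workerChan {
		ch := make(chan *subMutation, 64)
		e.workerChan[i] = ch
		// One goroutine per channel applies mutations in arrival order,
		// so ordering per (attr, uid) key is preserved.
		go func() {
			for m := range ch {
				_ = m // apply the mutation here
			}
		}()
	}
	return e
}

// channelID hashes (predicate, subject uid) so that load spreads across the
// pool while the same key always lands on the same channel. r1 of this PR
// used z.MemHash; a later commit switches to farmhash so the mapping is also
// stable across processes (see the commit note further down).
func (e *executor) channelID(attr string, uid uint64) int {
	b := make([]byte, len(attr)+8)
	copy(b, attr)
	binary.BigEndian.PutUint64(b[len(attr):], uid)
	return int(z.MemHash(b) % uint64(len(e.workerChan)))
}

func main() {
	e := newExecutor()
	fmt.Println(e.channelID("pred_7", 42))
}
```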

Benchmarking:
I benchmarked on a system with Ubuntu 18, 64 GB RAM, and a 16-core processor.

Master:

| Dataset | Time to finish live loader | Alpha RAM |
| --- | --- | --- |
| 21M imdb dataset | 5m27s | 7.1 GB |
| 100M RDFs with a single predicate | 11m16s | 8 GB |
| ~100M RDFs with 1024 predicates (~97K RDFs/predicate) | 8m26s | 11 GB |

This PR:

| Dataset | Time to finish live loader | Alpha RAM |
| --- | --- | --- |
| 21M imdb dataset | 3m14s | 6.6 GB |
| 100M RDFs with a single predicate | 6m35s | 7.3 GB |
| ~100M RDFs with 1024 predicates (~97K RDFs/predicate) | 8m45s | 9.5 GB |

The 100M dataset was generated using the script below:

```go
package main

import (
	"bytes"
	"fmt"
	"os"
)

var (
	total int = 100000000 // total number of RDF records
	pred      = 1024      // number of distinct predicates
)

func main() {
	f, err := os.OpenFile("test.rdf", os.O_CREATE|os.O_RDWR, 0755)
	if err != nil {
		panic(err)
	}
	defer f.Close()

	totalPerPred := total / pred

	buf := bytes.NewBuffer(nil)
	count := 1
	for i := 0; i < totalPerPred; i++ {
		for j := 0; j < pred; j++ {
			rec := fmt.Sprintf(`_:record_%d <pred_%d> "value_%d" .`, count, j, count)
			buf.WriteString(rec)
			buf.WriteString("\n")
			count++
			// Flush the buffer to disk every 100K records.
			if count%100000 == 0 {
				buf.WriteTo(f)
				buf.Reset()
			}
		}
	}

	buf.WriteTo(f)
	if err := f.Sync(); err != nil {
		panic(err)
	}
	fmt.Println("Done writing to file: ", count)
}
```
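
(Presumably the single-predicate dataset in the tables above was generated with the same script and `pred = 1`.)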


@ashish-goswami changed the title from "WIP] Support for insert single predicate concurrently in ludicrous_mode" to "WIP] Support for inserting single predicate concurrently in ludicrous_mode" on May 28, 2020
@ashish-goswami changed the title from "WIP] Support for inserting single predicate concurrently in ludicrous_mode" to "[WIP] Support for inserting single predicate concurrently in ludicrous_mode" on May 28, 2020
@ashish-goswami ashish-goswami changed the base branch from master to ashish/exe-ramp May 28, 2020 15:09
@parasssh (Contributor) left a comment:

On further pondering, I think we don't even need locks now that we have plain channels instead of a map from string to channel.

Reviewable status: 0 of 1 files reviewed, 6 unresolved discussions (waiting on @ashish-goswami, @manishrjain, and @vvbalaji-dgraph)

a discussion (no related file):
Also, as we discussed, please see if we can add a UT or Systest around this. It doesn't have to be a large data set.



worker/executor.go, line 43 at r1 (raw file):

```go
	pendingSize int64

	sync.RWMutex
```

I don't think we need mutexes anymore. See my comment above. Go channels are natively thread-safe.


worker/executor.go, line 50 at r1 (raw file):

```go
func newExecutor() *executor {
	e := &executor{
		edgesChan: make([]chan *subMutation, 32), /* TODO: no of chans can be made configurable */
```

rename edgeChan to workerChan.


worker/executor.go, line 105 at r1 (raw file):

```go
// getChannelID obtains the channel for the given edge.
func (e *executor) getChannelID(edge *pb.DirectedEdge) int {
```

rename function to "findWorkerChan()"


worker/executor.go, line 106 at r1 (raw file):

```go
// getChannelID obtains the channel for the given edge.
func (e *executor) getChannelID(edge *pb.DirectedEdge) int {
	cid := z.MemHashString(edge.Attr+strconv.FormatUint(edge.Entity, 10)) % uint64(len(e.edgesChan))
```

Assuming doing this is cheap.


worker/executor.go, line 135 at r1 (raw file):

```go
	// RLock() in case the channel gets closed from underneath us.
	e.RLock()
```

We may not need locks at all.

```go
package worker

import (
	"encoding/binary"
	"math/rand"
	"strconv"
	"strings"
	"testing"
	"time"

	"github.com/dgraph-io/dgraph/x"
	"github.com/dgraph-io/ristretto/z"
)

func channelIDString(attr string, uid, numChan uint64) int {
	cid := z.MemHashString(attr+strconv.FormatUint(uid, 10)) % numChan
	return int(cid)
}

func channelIDBytes(attr string, uid, numChan uint64) int {
	attrBytes := make([]byte, len(attr)+8)
	x.AssertTrue(len(attr) == copy(attrBytes, attr))
	binary.BigEndian.PutUint64(attrBytes[len(attr):], uid)
	cid := z.MemHash(attrBytes) % numChan
	return int(cid)
}

func BenchmarkChannelIDString(b *testing.B) {
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	attr := strings.Repeat("a", 128)
	var id int
	for i := 0; i < b.N; i++ {
		id = channelIDString(attr, r.Uint64(), 32)
	}

	_ = id
}

func BenchmarkChannelIDBytes(b *testing.B) {
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	attr := strings.Repeat("a", 128)
	var id int
	for i := 0; i < b.N; i++ {
		id = channelIDBytes(attr, r.Uint64(), 32)
	}

	_ = id
}
```

```
goos: linux
goarch: amd64
pkg: github.com/dgraph-io/dgraph/worker
BenchmarkChannelIDString
BenchmarkChannelIDString-8   	 7678695	       160 ns/op	     191 B/op	       2 allocs/op
BenchmarkChannelIDBytes
BenchmarkChannelIDBytes-8    	15471546	        78.8 ns/op	     144 B/op	       1 allocs/op
PASS
ok  	github.com/dgraph-io/dgraph/worker	3.878s
```
@ashish-goswami (Contributor, author) left a comment:

Reviewable status: 0 of 107 files reviewed, 6 unresolved discussions (waiting on @ashish-goswami, @danielmai, @manishrjain, @martinmr, @MichaelJCompton, @parasssh, @pawanrawal, and @vvbalaji-dgraph)


worker/executor.go, line 43 at r1 (raw file):

Previously, parasssh wrote…

I don't think we need mutexes anymore. See my comment above. Go channels are natively thread-safe.

We still need the mutex, because closing a channel is not thread-safe with respect to concurrent sends.
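
A minimal sketch of the close-vs-send race that keeps the mutex around (hypothetical shape, assuming a `subMutation` payload type; not the PR's actual code):

```go
package worker

import "sync"

type subMutation struct{} // payload type, elided for the sketch

type executor struct {
	sync.RWMutex
	workerChan []chan *subMutation
	closed     bool
}

// send holds the read lock so the channel cannot be closed mid-send;
// many senders may proceed concurrently under RLock.
func (e *executor) send(i int, m *subMutation) {
	e.RLock()
	defer e.RUnlock()
	if !e.closed {
		e.workerChan[i] <- m
	}
}

// shutdown takes the write lock, excluding all senders, before closing.
// Closing a channel while another goroutine is sending on it would panic.
func (e *executor) shutdown() {
	e.Lock()
	defer e.Unlock()
	e.closed = true
	for _, ch := range e.workerChan {
		close(ch)
	}
}
```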


worker/executor.go, line 50 at r1 (raw file):

Previously, parasssh wrote…

rename edgeChan to workerChan.

Done.


worker/executor.go, line 106 at r1 (raw file):

Previously, parasssh wrote…

Assuming doing this is cheap.

I have changed it to the other method, as that was more performant.

@ashish-goswami ashish-goswami changed the base branch from ashish/exe-ramp to master May 29, 2020 14:58
@ashish-goswami changed the title from "[WIP] Support for inserting single predicate concurrently in ludicrous_mode" to "Support for inserting single predicate concurrently in ludicrous_mode" on Jun 1, 2020
AESHash is unique only within the scope of a single process, whereas farmhash is stable across
processes. Using farmhash ensures all alpha nodes get the same value for the same
(attr, uid) combination.
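
A compact sketch of the switch (assuming `github.com/dgryski/go-farm`, the farmhash binding Dgraph uses; names are illustrative):

```go
package main

import (
	"encoding/binary"
	"fmt"

	farm "github.com/dgryski/go-farm"
)

// channelIDFarm is deterministic across processes: every alpha computes the
// same channel index for the same (attr, uid). z.MemHash, by contrast, is
// AES-based and seeded per process, so two alphas could disagree.
func channelIDFarm(attr string, uid, numChan uint64) int {
	b := make([]byte, len(attr)+8)
	copy(b, attr)
	binary.BigEndian.PutUint64(b[len(attr):], uid)
	return int(farm.Fingerprint64(b) % numChan)
}

func main() {
	fmt.Println(channelIDFarm("pred_7", 42, 32)) // same output on every alpha
}
```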

```
[Decoder]: Using assembly version of decoder
badger 2020/06/01 21:52:43 INFO: All 0 tables opened in 0s
goos: linux
goarch: amd64
pkg: github.com/dgraph-io/dgraph/worker
BenchmarkChannelIDString
BenchmarkChannelIDString-8       	 6841705	       160 ns/op	     191 B/op	       2 allocs/op
BenchmarkChannelIDBytes
BenchmarkChannelIDBytes-8        	15204393	        77.1 ns/op	     144 B/op	       1 allocs/op
BenchmarkChannelIDBytes_Farm
BenchmarkChannelIDBytes_Farm-8   	16409292	        81.2 ns/op	     144 B/op	       1 allocs/op
PASS
```
```diff
@@ -109,7 +109,7 @@ func (e *executor) channelID(edge *pb.DirectedEdge) int {
 	b := make([]byte, len(attr)+8)
```
A reviewer (Contributor) commented:
Remove the hard-coded "8":
b := make([]byte, len(attr)+int(unsafe.Sizeof(uid)))

@ashish-goswami (Contributor, author) replied:
No need to use unsafe.Sizeof(uid), as a uint64 is always 8 bytes.

```diff
@@ -109,7 +109,7 @@ func (e *executor) channelID(edge *pb.DirectedEdge) int {
 	b := make([]byte, len(attr)+8)
 	x.AssertTrue(len(attr) == copy(b, attr))
 	binary.BigEndian.PutUint64(b[len(attr):len(attr)+8], uid)
```
The reviewer (Contributor) commented:
remove hard-coded "8".

@ashish-goswami (Contributor, author) replied:
same here.

The reviewer (Contributor) replied:
I mean not using hard-coded values.

@manishrjain (Contributor) left a comment:

Reviewable status: 0 of 1 files reviewed, 10 unresolved discussions (waiting on @ashish-goswami, @danielmai, @manishrjain, @martinmr, @MichaelJCompton, @parasssh, @pawanrawal, and @vvbalaji-dgraph)

a discussion (no related file):

Previously, parasssh wrote…

Also, as we discussed, please see if we can add a UT or Systest around this. It doesn't have to be a large data set.

:lgtm: Do add the Pause and Resume methods for schema updates.

Also needs the n.Applied and stuff to keep snapshots sane.



worker/executor.go, line 107 at r3 (raw file):

```go
// channelID obtains the channel for the given edge.
func (e *executor) channelID(edge *pb.DirectedEdge) int {
	attr := edge.Attr
```

Simple cheap hash: z.MemHash(edge.Attr) ^ uid

Might consider a map[string]uint64 to see if that would be faster (store the hash in it).


worker/executor.go, line 124 at r3 (raw file):

	rampMeter(&e.pendingSize, maxPendingEdgesSize, executorAddEdges)

	payloadMap := make(map[int]*subMutation)

You need a method by which you can wait for all edges corresponding to a predicate to be done. So, we can then apply the schema update. And then resume once schema update is done.

ex.Pause(pred)
ex.Resume(pred)
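
A rough sketch of what such a Pause/Resume gate might look like (entirely hypothetical, since the PR defers this to a follow-up, per the reply below; a real version would hook into the executor's worker channels):

```go
package worker

import "sync"

// predGate is a hypothetical sketch, not this PR's code: Pause blocks new
// edges for a predicate and waits for in-flight ones to drain, so a schema
// update can run; Resume lets edges flow again.
type predGate struct {
	mu       sync.Mutex
	cond     *sync.Cond
	inFlight map[string]int  // edges currently being applied, per predicate
	paused   map[string]bool // predicates paused for a schema update
}

func newPredGate() *predGate {
	g := &predGate{inFlight: map[string]int{}, paused: map[string]bool{}}
	g.cond = sync.NewCond(&g.mu)
	return g
}

// begin blocks while pred is paused, then registers one in-flight edge.
func (g *predGate) begin(pred string) {
	g.mu.Lock()
	for g.paused[pred] {
		g.cond.Wait()
	}
	g.inFlight[pred]++
	g.mu.Unlock()
}

// done deregisters an edge and wakes any waiting Pause.
func (g *predGate) done(pred string) {
	g.mu.Lock()
	g.inFlight[pred]--
	g.cond.Broadcast()
	g.mu.Unlock()
}

// Pause stops new edges for pred and waits for in-flight ones to drain.
func (g *predGate) Pause(pred string) {
	g.mu.Lock()
	g.paused[pred] = true
	for g.inFlight[pred] > 0 {
		g.cond.Wait()
	}
	g.mu.Unlock()
}

// Resume lets edges for pred flow again.
func (g *predGate) Resume(pred string) {
	g.mu.Lock()
	delete(g.paused, pred)
	g.cond.Broadcast()
	g.mu.Unlock()
}
```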

@ashish-goswami (Contributor, author) left a comment:

Reviewable status: 0 of 1 files reviewed, 10 unresolved discussions (waiting on @ashish-goswami, @danielmai, @manishrjain, @martinmr, @MichaelJCompton, @parasssh, @pawanrawal, and @vvbalaji-dgraph)


worker/executor.go, line 107 at r3 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

Simple cheap hash: z.MemHash(edge.Attr) ^ uid

Might consider a map[string]uint64 to see if that would be faster (store the hash in it).

The map-based implementation is turning out to be costlier. See the benchmarks below:

```go
package worker

import (
	"math/rand"
	"strings"
	"testing"
	"time"

	"github.com/dgraph-io/ristretto/z"
)

func BenchmarkHashCache(b *testing.B) {
	r := rand.New(rand.NewSource(time.Now().UnixNano()))

	var attrs []string
	for i := 0; i < 26; i++ {
		attrs = append(attrs, strings.Repeat(string(rune(i)), 16))
	}

	var hash uint64
	for i := 0; i < b.N; i++ {
		hash = z.MemHashString(attrs[r.Intn(len(attrs))])
	}

	_ = hash
}

func BenchmarkHashCache_map(b *testing.B) {
	r := rand.New(rand.NewSource(time.Now().UnixNano()))

	var attrs []string
	for i := 0; i < 26; i++ {
		attrs = append(attrs, strings.Repeat(string(rune(i)), 16))
	}

	var hash uint64
	m := make(map[string]uint64)
	for i := 0; i < b.N; i++ {
		attr := attrs[r.Intn(len(attrs))]
		if h, ok := m[attr]; ok {
			hash = h
			continue
		}
		hash = z.MemHashString(attr)
		m[attr] = hash
	}

	_ = hash
}
```

```
goos: linux
goarch: amd64
pkg: github.com/dgraph-io/dgraph/worker
BenchmarkHashCache
BenchmarkHashCache-8       	61054293	        19.9 ns/op	       0 B/op	       0 allocs/op
BenchmarkHashCache_map
BenchmarkHashCache_map-8   	22034090	        55.0 ns/op	       0 B/op	       0 allocs/op
PASS
ok  	github.com/dgraph-io/dgraph/worker	2.527s
```

worker/executor.go, line 124 at r3 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

You need a method by which you can wait for all edges corresponding to a predicate to be done. So, we can then apply the schema update. And then resume once schema update is done.

ex.Pause(pred)
ex.Resume(pred)

Will cover this in a separate PR.


@mangalaman93 mangalaman93 deleted the ashish/conpred branch February 9, 2023 09:54
Successfully merging this pull request may close these issues:

  • Run mutations concurrently per predicate in Ludicrous mode (#5403)