
Commit 9109186

Authored by manishrjain, பாலாஜி, martinmr, Ibrahim Jarif, and harshil
perf: Various optimizations to the bulk loader (#6412)
With this change, the bulk loader runs fine over very large datasets involving billions of edges, without running out of memory on a 64 GB RAM machine. Tested by loading a client dataset and the entire Stack Overflow dataset.

Changes:
- Modifies codec.Encoder to use memory allocated via z.Calloc. This makes UidPack use manually managed memory, and adds the corresponding FreePack calls across the codebase.
- pb.MapEntry is replaced with a simpler struct, allocated via z.Buffer.
- countIndexEntry is likewise replaced with a simpler struct, allocated via z.Buffer.
- Partition keys are picked once every partition_mb of data.
- Map size is set to 2 GB by default; after compression this comes to around 300-400 MB.
- Instead of allocating slices, sorting is done over offsets; entries are then accessed directly in the z.Buffer in sorted-offset order (random access rather than a serial scan).
- Prints detailed stats from jemalloc every minute.
- Prints memory usage from both Go and jemalloc.
- Lists generated via toList are no longer accumulated until write time. Instead, they are pushed to a buffered channel to better control memory usage (see the sketch after this message).
- If the reducer has over 1 GB of encoding requests in flight, it throttles further requests.

Commits:
* Do not use all CPUs for encoding. Only use as many as specified in flags.
* Allocate UidBlocks via Calloc.
* Print out memory allocation as well.
* Remove prevoffset logic.
* Allocate mapEntries via Calloc too. Try to release memory allocated for Packs.
* Fix a bug in posting list split code where the main pack keeps the blocks allocated to the parts.
* Try without doing mapEntry allocations manually.
* Report memory stats every minute.
* Add a throttle to avoid sending out too much data for encoding at once.
* Compiling version of the removal of the pb.MapEntry buffer.
* Make the bulk loader work with the new MapEntry system.
* Don't stop encoding. Only pause what is being sent out for encoding.
* Switch to z.Buffer and use z.CallocNoRef.
* Avoid doing encoding again if we already have a clean posting list. Also avoid creating arrays for postings during split.
* Generate a histogram of buffers.
* Do not deallocate Pack twice.
* Encode requests of 4 GB and over.
* Generate a histogram of key to count.
* Do not accumulate all the UIDs in memory.
* Free packs on encoders used outside the bulk loader.
* Print out the keys for giant buffers.
* Add a histogram of entries per key. Avoid duplicate partition keys.
* Use channels for kvs and splits.
* Print memstats in Mapper as well.
* Use memhash instead of creating strings.
* Count indexing is memory efficient now.
* Changed partition to take 4 MB.
* Sort the offsets before encoding.
* Fixed up the count indexing code. Added a new flag, partition_mb, to let users specify how often to pick partition keys.

Co-authored-by: பாலாஜி <balaji@dgraph.io>
Co-authored-by: Martin Martinez Rivera <martinmr@dgraph.io>
Co-authored-by: Ibrahim Jarif <ibrahim@dgraph.io>
Co-authored-by: harshil <harshil@dgraph.io>
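To make the buffered-channel point above concrete, here is a minimal, self-contained Go sketch. It is not the loader's actual code; the req type and the buffer size of 8 are made up for illustration. The idea is that a bounded channel makes producers block once the buffer fills, so pending work can never pile up in memory the way an accumulated slice can.

package main

import (
	"fmt"
	"sync"
)

// req is a hypothetical stand-in for one batch of data headed for encoding.
type req struct{ payload []byte }

func main() {
	// A small buffer bounds how far producers can run ahead of the encoder.
	// Once 8 requests are pending, the send below blocks. This is the
	// memory-control idea behind pushing toList output to a channel instead
	// of accumulating it until write time.
	kvCh := make(chan req, 8)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() { // consumer: stands in for the reduce/encode stage
		defer wg.Done()
		for r := range kvCh {
			fmt.Println("encoding", len(r.payload), "bytes")
		}
	}()

	for i := 0; i < 32; i++ {
		kvCh <- req{payload: make([]byte, 1024)} // blocks when the buffer is full
	}
	close(kvCh)
	wg.Wait()
}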
1 parent: 0fab710

File tree: 17 files changed, +749 and -747 lines

algo/uidlist_test.go

Lines changed: 7 additions & 0 deletions
@@ -330,6 +330,8 @@ func BenchmarkListIntersectRandom(b *testing.B) {
 		if j < len(dst2.Uids) {
 			b.Errorf("Unexpected error in intersection")
 		}
+
+		codec.FreePack(compressedUids)
 	}

 	randomTests(10240, 0.3)
@@ -395,6 +397,8 @@ func BenchmarkListIntersectRatio(b *testing.B) {
 		if j < len(dst2.Uids) {
 			b.Errorf("Unexpected error in intersection")
 		}
+
+		codec.FreePack(compressedUids)
 	}
 }

@@ -464,6 +468,7 @@ func TestIntersectCompressedWithLinJump(t *testing.T) {
 			actual := make([]uint64, 0)
 			IntersectCompressedWithLinJump(&dec, otherNums, &actual)
 			require.Equal(t, commonNums, actual)
+			codec.FreePack(pack)
 		}
 	}
 }
@@ -488,6 +493,7 @@ func TestIntersectCompressedWithBin(t *testing.T) {
 			actual := make([]uint64, 0)
 			IntersectCompressedWithBin(&dec, otherNums, &actual)
 			require.Equal(t, commonNums, actual)
+			codec.FreePack(pack)
 		}
 	}
 }
@@ -513,6 +519,7 @@ func TestIntersectCompressedWithBinMissingSize(t *testing.T) {
 			actual := make([]uint64, 0)
 			IntersectCompressedWithBin(&dec, otherNums, &actual)
 			require.Equal(t, commonNums, actual)
+			codec.FreePack(pack)
 		}
 	}
 }

codec/benchmark/benchmark.go

Lines changed: 2 additions & 1 deletion
@@ -121,7 +121,8 @@ func benchmarkPack(trials int, chunks *chunks) int {
 	for i := range times {
 		start := time.Now()
 		for _, c := range chunks.data {
-			codec.Encode(c, 256)
+			pack := codec.Encode(c, 256)
+			codec.FreePack(pack)
 			// bp128.DeltaPack(c)
 		}
 		times[i] = int(time.Since(start).Nanoseconds())

codec/codec.go

Lines changed: 46 additions & 3 deletions
@@ -20,9 +20,11 @@ import (
 	"bytes"
 	"math"
 	"sort"
+	"unsafe"

 	"github.com/dgraph-io/dgraph/protos/pb"
 	"github.com/dgraph-io/dgraph/x"
+	"github.com/dgraph-io/ristretto/z"
 	"github.com/dgryski/go-groupvarint"
 )

@@ -46,15 +48,55 @@ type Encoder struct {
 	uids []uint64
 }

+var blockSize = int(unsafe.Sizeof(pb.UidBlock{}))
+
+// AllocateBlock allocates a block via Calloc. This block must be released via a call to
+// FreeBlock.
+func AllocateBlock() *pb.UidBlock {
+	// Allocate blocks manually.
+	// TODO: Avoid calling z.Calloc repeatedly. Instead, use the allocator. The tricky question
+	// here is how a FreePack call would be able to trace the allocator.
+	b := z.CallocNoRef(blockSize)
+	if len(b) == 0 {
+		return &pb.UidBlock{}
+	}
+	return (*pb.UidBlock)(unsafe.Pointer(&b[0]))
+}
+
+// FreeBlock releases a previously manually allocated UidBlock.
+func FreeBlock(ub *pb.UidBlock) {
+	buf := (*[z.MaxArrayLen]byte)(unsafe.Pointer(ub))[:blockSize:blockSize]
+	z.Free(buf)
+}
+
+// FreePack releases the deltas and blocks of a manually allocated UidPack.
+func FreePack(pack *pb.UidPack) {
+	if pack == nil {
+		return
+	}
+	for _, b := range pack.Blocks {
+		z.Free(b.Deltas)
+		FreeBlock(b)
+	}
+}
+
 func (e *Encoder) packBlock() {
 	if len(e.uids) == 0 {
 		return
 	}
-	block := &pb.UidBlock{Base: e.uids[0], NumUids: uint32(len(e.uids))}
+
+	// Allocate blocks manually.
+	block := AllocateBlock()
+	block.Base = e.uids[0]
+	block.NumUids = uint32(len(e.uids))
+
 	last := e.uids[0]
 	e.uids = e.uids[1:]

-	var out bytes.Buffer
+	var out z.Buffer
+	// We are not releasing the allocated memory here. Instead, block.Deltas must be
+	// released at the end, via FreePack.
+
 	buf := make([]byte, 17)
 	tmpUids := make([]uint32, 4)
 	for {
@@ -79,6 +121,7 @@ func (e *Encoder) packBlock() {
 		e.uids = e.uids[4:]
 	}

+	// TODO: Instead of one z.Buffer for every delta, allocate one big one.
 	block.Deltas = out.Bytes()
 	e.pack.Blocks = append(e.pack.Blocks, block)
 }
@@ -102,7 +145,7 @@ func (e *Encoder) Add(uid uint64) {
 	}
 }

-// Done returns the final output of the encoder.
+// Done returns the final output of the encoder. This UidPack MUST BE FREED via a call to FreePack.
 func (e *Encoder) Done() *pb.UidPack {
 	e.packBlock()
 	return e.pack
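The diff above changes the ownership contract for UidPack: blocks and deltas now live in Calloc'd memory, so every pack produced by Encode or Encoder.Done must be released with FreePack, as the test diffs below do. A minimal sketch of the resulting usage pattern (the uid values are arbitrary; Encode, Decode, and FreePack are the functions shown in this commit):

package main

import (
	"fmt"

	"github.com/dgraph-io/dgraph/codec"
)

func main() {
	uids := []uint64{5, 8, 13, 21, 34}

	// Encode builds a UidPack whose blocks live in manually managed
	// (Calloc'd) memory, so the caller owns the pack and must free it.
	pack := codec.Encode(uids, 256)
	defer codec.FreePack(pack)

	// Decode materializes the uids back into a regular Go slice.
	fmt.Println(codec.Decode(pack, 0)) // [5 8 13 21 34]
}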
codec/codec_test.go

Lines changed: 9 additions & 1 deletion
@@ -47,7 +47,8 @@ func TestUidPack(t *testing.T) {
 	rand.Seed(time.Now().UnixNano())

 	// Some edge case tests.
-	Encode([]uint64{}, 128)
+	pack := Encode([]uint64{}, 128)
+	FreePack(pack)
 	require.Equal(t, 0, ApproxLen(&pb.UidPack{}))
 	require.Equal(t, 0, len(Decode(&pb.UidPack{}, 0)))

@@ -63,6 +64,7 @@ func TestUidPack(t *testing.T) {
 		require.Equal(t, len(expected), ExactLen(pack))
 		actual := Decode(pack, 0)
 		require.Equal(t, expected, actual)
+		FreePack(pack)
 	}
 }

@@ -73,6 +75,7 @@ func TestSeek(t *testing.T) {
 		enc.Add(uint64(i))
 	}
 	pack := enc.Done()
+	defer FreePack(pack)
 	dec := Decoder{Pack: pack}

 	tests := []struct {
@@ -121,6 +124,7 @@ func TestLinearSeek(t *testing.T) {
 		enc.Add(uint64(i))
 	}
 	pack := enc.Done()
+	defer FreePack(pack)
 	dec := Decoder{Pack: pack}

 	for i := 0; i < 2*N; i += 10 {
@@ -150,6 +154,7 @@ func TestDecoder(t *testing.T) {
 		expected = append(expected, uint64(i))
 	}
 	pack := enc.Done()
+	defer FreePack(pack)

 	dec := Decoder{Pack: pack}
 	for i := 3; i < N; i += 3 {
@@ -215,6 +220,7 @@ func benchmarkUidPackEncode(b *testing.B, blockSize int) {
 	for i := 0; i < b.N; i++ {
 		pack := Encode(uids, blockSize)
 		out, err := pack.Marshal()
+		FreePack(pack)
 		if err != nil {
 			b.Fatalf("Error marshaling uid pack: %s", err.Error())
 		}
@@ -249,6 +255,7 @@ func benchmarkUidPackDecode(b *testing.B, blockSize int) {

 	pack := Encode(uids, blockSize)
 	data, err := pack.Marshal()
+	defer FreePack(pack)
 	x.Check(err)
 	b.Logf("Output size: %s. Compression: %.2f",
 		humanize.Bytes(uint64(len(data))),
@@ -301,6 +308,7 @@ func newUidPack(data []uint64) *pb.UidPack {

 func TestCopyUidPack(t *testing.T) {
 	pack := newUidPack([]uint64{1, 2, 3, 4, 5})
+	defer FreePack(pack)
 	copy := CopyUidPack(pack)
 	require.Equal(t, Decode(pack, 0), Decode(copy, 0))
 }
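The commit message also mentions printing memory usage from both Go and jemalloc every minute. A sketch of what such a reporter could look like, assuming ristretto/z exposes NumAllocBytes returning the bytes currently held via Calloc (that helper is an assumption here, not something shown in this diff):

package main

import (
	"log"
	"runtime"
	"time"

	"github.com/dgraph-io/ristretto/z"
)

// reportMemory logs Go heap usage alongside manually managed (jemalloc)
// usage once a minute. z.NumAllocBytes is assumed to return the number of
// bytes currently allocated via z.Calloc / z.CallocNoRef.
func reportMemory() {
	for range time.Tick(time.Minute) {
		var ms runtime.MemStats
		runtime.ReadMemStats(&ms)
		log.Printf("Go heap in use: %d MB, manually allocated: %d MB",
			ms.HeapInuse>>20, z.NumAllocBytes()>>20)
	}
}

func main() {
	go reportMemory()
	time.Sleep(3 * time.Minute) // let a few reports print
}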
