Skip to content

Commit 889989f

Browse files
authored
Reuse allocator (#7360)
Instead of creating a new z.Allocator for every encoder, this PR reuses the allocator. On a 21M dataset, bulk loader takes 2m22s on master, and only 1m35s on this PR. That's a major 35% performance improvement.
1 parent 4a2bc36 commit 889989f

File tree

4 files changed

+30
-15
lines changed

4 files changed

+30
-15
lines changed

codec/codec.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ type Encoder struct {
4747
BlockSize int
4848
pack *pb.UidPack
4949
uids []uint64
50-
alloc *z.Allocator
50+
Alloc *z.Allocator
5151
buf *bytes.Buffer
5252
}
5353

@@ -70,7 +70,7 @@ func (e *Encoder) packBlock() {
7070
}
7171

7272
// Allocate blocks manually.
73-
b := e.alloc.AllocateAligned(blockSize)
73+
b := e.Alloc.AllocateAligned(blockSize)
7474
block := (*pb.UidBlock)(unsafe.Pointer(&b[0]))
7575

7676
block.Base = e.uids[0]
@@ -106,7 +106,7 @@ func (e *Encoder) packBlock() {
106106
}
107107

108108
sz := len(e.buf.Bytes())
109-
block.Deltas = e.alloc.Allocate(sz)
109+
block.Deltas = e.Alloc.Allocate(sz)
110110
x.AssertTrue(sz == copy(block.Deltas, e.buf.Bytes()))
111111
e.pack.Blocks = append(e.pack.Blocks, block)
112112
}
@@ -117,10 +117,12 @@ var tagEncoder string = "enc"
117117
func (e *Encoder) Add(uid uint64) {
118118
if e.pack == nil {
119119
e.pack = &pb.UidPack{BlockSize: uint32(e.BlockSize)}
120-
e.alloc = z.NewAllocator(1024)
121-
e.alloc.Tag = tagEncoder
122120
e.buf = new(bytes.Buffer)
123121
}
122+
if e.Alloc == nil {
123+
e.Alloc = z.NewAllocator(1024)
124+
e.Alloc.Tag = tagEncoder
125+
}
124126

125127
size := len(e.uids)
126128
if size > 0 && !match32MSB(e.uids[size-1], uid) {
@@ -138,8 +140,8 @@ func (e *Encoder) Add(uid uint64) {
138140
// Done returns the final output of the encoder. This UidPack MUST BE FREED via a call to FreePack.
139141
func (e *Encoder) Done() *pb.UidPack {
140142
e.packBlock()
141-
if e.pack != nil && e.alloc != nil {
142-
e.pack.AllocRef = e.alloc.Ref
143+
if e.pack != nil && e.Alloc != nil {
144+
e.pack.AllocRef = e.Alloc.Ref
143145
}
144146
return e.pack
145147
}

dgraph/cmd/bulk/count_index.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,11 @@ func (c *countIndexer) writeIndex(buf *z.Buffer) {
132132
fmt.Printf("Writing count index for %q rev=%v\n", pk.Attr, pk.IsReverse())
133133
}
134134

135+
alloc := z.NewAllocator(8 << 20)
136+
defer alloc.Release()
137+
135138
var pl pb.PostingList
136-
encoder := codec.Encoder{BlockSize: 256}
139+
encoder := codec.Encoder{BlockSize: 256, Alloc: alloc}
137140

138141
outBuf := z.NewBuffer(5 << 20)
139142
defer outBuf.Release()
@@ -144,13 +147,13 @@ func (c *countIndexer) writeIndex(buf *z.Buffer) {
144147
}
145148

146149
kv := posting.MarshalPostingList(&pl, nil)
147-
codec.FreePack(pl.Pack)
148150
kv.Key = append([]byte{}, lastCe.Key()...)
149151
kv.Version = c.state.writeTs
150152
kv.StreamId = streamId
151153
badger.KVToBuffer(kv, outBuf)
152154

153-
encoder = codec.Encoder{BlockSize: 256}
155+
alloc.Reset()
156+
encoder = codec.Encoder{BlockSize: 256, Alloc: alloc}
154157
pl.Reset()
155158

156159
// Flush out the buffer.

dgraph/cmd/bulk/reduce.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -569,6 +569,12 @@ func (r *reducer) toList(req *encodeRequest) {
569569
freePostings = append(freePostings, p)
570570
}
571571

572+
alloc := z.NewAllocator(16 << 20)
573+
defer func() {
574+
// We put alloc.Release in defer because we reassign alloc for split posting lists.
575+
alloc.Release()
576+
}()
577+
572578
start, end, num := cbuf.StartOffset(), cbuf.StartOffset(), 0
573579
appendToList := func() {
574580
if num == 0 {
@@ -595,7 +601,8 @@ func (r *reducer) toList(req *encodeRequest) {
595601
}
596602
}
597603

598-
enc := codec.Encoder{BlockSize: 256}
604+
alloc.Reset()
605+
enc := codec.Encoder{BlockSize: 256, Alloc: alloc}
599606
var lastUid uint64
600607
slice, next := []byte{}, start
601608
for next >= 0 && (next < end || end == -1) {
@@ -629,7 +636,7 @@ func (r *reducer) toList(req *encodeRequest) {
629636
// the full pb.Posting type is used (which pb.y contains the
630637
// delta packed UID list).
631638
if numUids == 0 {
632-
codec.FreePack(pl.Pack)
639+
// No need to FreePack here because we are reusing alloc.
633640
return
634641
}
635642

@@ -657,6 +664,9 @@ func (r *reducer) toList(req *encodeRequest) {
657664
kvs, err := l.Rollup(nil)
658665
x.Check(err)
659666

667+
// Assign a new allocator, so we don't reset the one we were using during Rollup.
668+
alloc = z.NewAllocator(16 << 20)
669+
660670
for _, kv := range kvs {
661671
kv.StreamId = r.streamIdFor(pk.Attr)
662672
}
@@ -666,7 +676,7 @@ func (r *reducer) toList(req *encodeRequest) {
666676
}
667677
} else {
668678
kv := posting.MarshalPostingList(pl, nil)
669-
codec.FreePack(pl.Pack)
679+
// No need to FreePack here, because we are reusing alloc.
670680

671681
kv.Key = y.Copy(currentKey)
672682
kv.Version = writeVersionTs

t/t.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -485,12 +485,12 @@ func getPackages() []task {
485485
}
486486

487487
if !isValidPackageForSuite(pkg.ID) {
488-
fmt.Printf("Skipping pacakge %s as its not valid for the selected suite %s \n", pkg.ID, *suite)
488+
fmt.Printf("Skipping package %s as its not valid for the selected suite %s \n", pkg.ID, *suite)
489489
continue
490490
}
491491

492492
if has(skipPkgs, pkg.ID) {
493-
fmt.Printf("Skipping pacakge %s as its available in skip list \n", pkg.ID)
493+
fmt.Printf("Skipping package %s as its available in skip list \n", pkg.ID)
494494
continue
495495
}
496496

0 commit comments

Comments
 (0)