Commit 42b2a60

Perform incremental rollups instead of rollups at snapshot (#4410)
1 parent 04eafb1 commit 42b2a60

2 files changed: +142, -71 lines

posting/mvcc.go

Lines changed: 93 additions & 0 deletions
@@ -21,20 +21,105 @@ import (
 	"encoding/hex"
 	"math"
 	"strconv"
+	"sync"
 	"sync/atomic"
+	"time"

 	"github.com/dgraph-io/badger/v2"
+	bpb "github.com/dgraph-io/badger/v2/pb"
 	"github.com/dgraph-io/dgo/v2/protos/api"
 	"github.com/dgraph-io/dgraph/protos/pb"
 	"github.com/dgraph-io/dgraph/x"
+	"github.com/dgraph-io/ristretto/z"
+	"github.com/golang/glog"
 	"github.com/pkg/errors"
 )

+// incrRollupi is used to batch keys for rollup incrementally.
+type incrRollupi struct {
+	// keysCh is populated with batch of 64 keys that needs to be rolled up during reads
+	keysCh chan *[][]byte
+	// keysPool is sync.Pool to share the batched keys to rollup.
+	keysPool *sync.Pool
+}
+
 var (
 	// ErrTsTooOld is returned when a transaction is too old to be applied.
 	ErrTsTooOld = errors.Errorf("Transaction is too old")
+
+	// IncrRollup is used to batch keys for rollup incrementally.
+	IncrRollup = &incrRollupi{
+		keysCh: make(chan *[][]byte),
+		keysPool: &sync.Pool{
+			New: func() interface{} {
+				return new([][]byte)
+			},
+		},
+	}
 )

+// rollUpKey takes the given key's posting lists, rolls it up and writes back to badger
+func (ir *incrRollupi) rollUpKey(writer *TxnWriter, key []byte) error {
+	l, err := GetNoStore(key)
+	if err != nil {
+		return err
+	}
+
+	kvs, err := l.Rollup()
+	if err != nil {
+		return err
+	}
+
+	return writer.Write(&bpb.KVList{Kv: kvs})
+}
+
+func (ir *incrRollupi) addKeyToBatch(key []byte) {
+	batch := ir.keysPool.Get().(*[][]byte)
+	*batch = append(*batch, key)
+	if len(*batch) < 64 {
+		ir.keysPool.Put(batch)
+		return
+	}
+
+	select {
+	case ir.keysCh <- batch:
+	default:
+		// Drop keys and build the batch again. Lossy behavior.
+		*batch = (*batch)[:0]
+		ir.keysPool.Put(batch)
+	}
+}
+
+// Process will rollup batches of 64 keys in a go routine.
+func (ir *incrRollupi) Process() {
+	m := make(map[uint64]int64) // map hash(key) to ts. hash(key) to limit the size of the map.
+	limiter := time.NewTicker(100 * time.Millisecond)
+	writer := NewTxnWriter(pstore)
+
+	for batch := range ir.keysCh {
+		currTs := time.Now().Unix()
+		for _, key := range *batch {
+			hash := z.MemHash(key)
+			if elem, ok := m[hash]; !ok || (currTs-elem >= 10) {
+				// Key not present or Key present but last roll up was more than 10 sec ago.
+				// Add/Update map and rollup.
+				m[hash] = currTs
+				if err := ir.rollUpKey(writer, key); err != nil {
+					glog.Warningf("Error %v rolling up key %v\n", err, key)
+					continue
+				}
+			}
+		}
+		// clear the batch and put it back in Sync keysPool
+		*batch = (*batch)[:0]
+		ir.keysPool.Put(batch)
+
+		// throttle to 1 batch = 64 rollups per 100 ms.
+		<-limiter.C
+	}
+	// keysCh is closed. This should never happen.
+}
+
 // ShouldAbort returns whether the transaction should be aborted.
 func (txn *Txn) ShouldAbort() bool {
 	if txn == nil {

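
The additions above follow a lossy batch-and-throttle pattern: readers append keys to a sync.Pool-backed slice, a full batch of 64 keys is offered to a single consumer over an unbuffered channel (or dropped on the floor if the consumer is busy), and the consumer throttles itself with a ticker and skips keys it has rolled up within the last ten seconds. A minimal standalone sketch of that pattern, assuming generic names (Batcher, New, Add, Process, handle) that are not Dgraph identifiers:

// Illustrative sketch of the batch-and-throttle pattern; not Dgraph code.
package batch

import (
	"log"
	"sync"
	"time"
)

// Batcher collects keys into batches of up to 64 and hands full batches to a
// single consumer goroutine over an unbuffered channel.
type Batcher struct {
	ch   chan *[][]byte
	pool *sync.Pool
}

func New() *Batcher {
	return &Batcher{
		ch:   make(chan *[][]byte),
		pool: &sync.Pool{New: func() interface{} { return new([][]byte) }},
	}
}

// Add appends key to the current batch. Once 64 keys accumulate, the batch is
// offered to the consumer; if the consumer is busy, the keys are dropped and
// the slice is reused. Lossy by design, like addKeyToBatch above.
func (b *Batcher) Add(key []byte) {
	batch := b.pool.Get().(*[][]byte)
	*batch = append(*batch, key)
	if len(*batch) < 64 {
		b.pool.Put(batch)
		return
	}
	select {
	case b.ch <- batch:
	default:
		*batch = (*batch)[:0]
		b.pool.Put(batch)
	}
}

// Process handles at most one batch per 100ms and skips any key it has
// already handled within the last 10 seconds. (The commit bounds its dedup
// map by keying it on z.MemHash(key) rather than on the key itself.)
func (b *Batcher) Process(handle func(key []byte) error) {
	seen := make(map[string]int64) // key -> unix ts of last handling
	limiter := time.NewTicker(100 * time.Millisecond)
	for batch := range b.ch {
		now := time.Now().Unix()
		for _, key := range *batch {
			if last, ok := seen[string(key)]; ok && now-last < 10 {
				continue
			}
			seen[string(key)] = now
			if err := handle(key); err != nil {
				log.Printf("handling key %x: %v", key, err)
			}
		}
		*batch = (*batch)[:0]
		b.pool.Put(batch)
		<-limiter.C // throttle: at most one batch of 64 per 100ms
	}
}

In the commit itself, the single consumer is the Process goroutine started from InitAndStartNode (see worker/draft.go below), and the handler is rollUpKey, which writes the rolled-up posting list back through a TxnWriter.
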
@@ -144,6 +229,8 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
 	l := new(List)
 	l.key = key
 	l.plist = new(pb.PostingList)
+	const maxDeltaCount = 2
+	deltaCount := 0

 	// Iterates from highest Ts to lowest Ts
 	for it.Valid() {
@@ -188,6 +275,7 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
 			if err != nil {
 				return nil, err
 			}
+			deltaCount++
 		case BitSchemaPosting:
 			return nil, errors.Errorf(
 				"Trying to read schema in ReadPostingList for key: %s", hex.Dump(key))
@@ -200,6 +288,11 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
 		}
 		it.Next()
 	}
+
+	if deltaCount >= maxDeltaCount {
+		IncrRollup.addKeyToBatch(key)
+	}
+
 	return l, nil
 }

worker/draft.go

Lines changed: 49 additions & 71 deletions
@@ -420,13 +420,13 @@ func (n *node) processRollups() {
 			if readTs <= last {
 				break // Break out of the select case.
 			}
-			if err := n.rollupLists(readTs); err != nil {
+			if err := n.calcTabletSizes(readTs); err != nil {
 				// If we encounter error here, we don't need to do anything about
 				// it. Just let the user know.
 				glog.Errorf("Error while rolling up lists at %d: %v\n", readTs, err)
 			} else {
 				last = readTs // Update last only if we succeeded.
-				glog.Infof("List rollup at Ts %d: OK.\n", readTs)
+				glog.Infof("Last rollup at Ts %d: OK.\n", readTs)
 			}
 		}
 	}
@@ -1008,20 +1008,19 @@ func listWrap(kv *bpb.KV) *bpb.KVList {
 	return &bpb.KVList{Kv: []*bpb.KV{kv}}
 }

-// rollupLists would consolidate all the deltas that constitute one posting
-// list, and write back a complete posting list.
-func (n *node) rollupLists(readTs uint64) error {
-	writer := posting.NewTxnWriter(pstore)
+// calcTabletSizes updates the tablet sizes for the keys.
+func (n *node) calcTabletSizes(readTs uint64) error {
+	// We can now discard all invalid versions of keys below this ts.
+	pstore.SetDiscardTs(readTs)
+
+	if !n.AmLeader() {
+		// Only leader needs to calculate the tablet sizes.
+		return nil
+	}

-	// We're doing rollups. We should use this opportunity to calculate the tablet sizes.
-	amLeader := n.AmLeader()
 	m := new(sync.Map)

 	addTo := func(key []byte, delta int64) {
-		if !amLeader {
-			// Only leader needs to calculate the tablet sizes.
-			return
-		}
 		pk, err := x.Parse(key)

 		// Type keys should not count for tablet size calculations.
@@ -1043,7 +1042,7 @@ func (n *node) rollupLists(readTs uint64) error {
 	}

 	stream := pstore.NewStreamAt(readTs)
-	stream.LogPrefix = "Rolling up"
+	stream.LogPrefix = "Tablet Size Calculation"
 	stream.ChooseKey = func(item *badger.Item) bool {
 		switch item.UserMeta() {
 		case posting.BitSchemaPosting, posting.BitCompletePosting, posting.BitEmptyPosting:
@@ -1052,75 +1051,53 @@ func (n *node) rollupLists(readTs uint64) error {
 		case x.ByteUnused:
 			return false
 		default:
-			return true
+			// not doing rollups anymore.
+			return false
 		}
 	}
-	var numKeys uint64
-	stream.KeyToList = func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) {
-		l, err := posting.ReadPostingList(key, itr)
-		if err != nil {
-			return nil, err
-		}
-		atomic.AddUint64(&numKeys, 1)
-		kvs, err := l.Rollup()
-
-		// If there are multiple keys, the posting list was split into multiple
-		// parts. The key of the first part is the right key to use for tablet
-		// size calculations.
-		for _, kv := range kvs {
-			addTo(kvs[0].Key, int64(kv.Size()))
-		}

-		return &bpb.KVList{Kv: kvs}, err
+	stream.KeyToList = func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) {
+		return nil, nil // no-op
 	}
 	stream.Send = func(list *bpb.KVList) error {
-		return writer.Write(list)
+		return nil
 	}
 	if err := stream.Orchestrate(context.Background()); err != nil {
 		return err
 	}
-	if err := writer.Flush(); err != nil {
-		return err
-	}
-	// For all the keys, let's see if they're in the LRU cache. If so, we can roll them up.
-	glog.Infof("Rolled up %d keys. Done", atomic.LoadUint64(&numKeys))

-	// We can now discard all invalid versions of keys below this ts.
-	pstore.SetDiscardTs(readTs)
-
-	if amLeader {
-		// Only leader sends the tablet size updates to Zero. No one else does.
-		// doSendMembership is also being concurrently called from another goroutine.
-		go func() {
-			tablets := make(map[string]*pb.Tablet)
-			var total int64
-			m.Range(func(key, val interface{}) bool {
-				pred := key.(string)
-				size := atomic.LoadInt64(val.(*int64))
-				tablets[pred] = &pb.Tablet{
-					GroupId: n.gid,
-					Predicate: pred,
-					Space: size,
-				}
-				total += size
-				return true
-			})
-			// Update Zero with the tablet sizes. If Zero sees a tablet which does not belong to
-			// this group, it would send instruction to delete that tablet. There's an edge case
-			// here if the followers are still running Rollup, and happen to read a key before and
-			// write after the tablet deletion, causing that tablet key to resurface. Then, only the
-			// follower would have that key, not the leader.
-			// However, if the follower then becomes the leader, we'd be able to get rid of that
-			// key then. Alternatively, we could look into cancelling the Rollup if we see a
-			// predicate deletion.
-			if err := groups().doSendMembership(tablets); err != nil {
-				glog.Warningf("While sending membership to Zero. Error: %v", err)
-			} else {
-				glog.V(2).Infof("Sent tablet size update to Zero. Total size: %s",
-					humanize.Bytes(uint64(total)))
+	// Only leader sends the tablet size updates to Zero. No one else does.
+	// doSendMembership is also being concurrently called from another goroutine.
+	go func() {
+		tablets := make(map[string]*pb.Tablet)
+		var total int64
+		m.Range(func(key, val interface{}) bool {
+			pred := key.(string)
+			size := atomic.LoadInt64(val.(*int64))
+			tablets[pred] = &pb.Tablet{
+				GroupId: n.gid,
+				Predicate: pred,
+				Space: size,
 			}
-		}()
-	}
+			total += size
+			return true
+		})
+		// Update Zero with the tablet sizes. If Zero sees a tablet which does not belong to
+		// this group, it would send instruction to delete that tablet. There's an edge case
+		// here if the followers are still running Rollup, and happen to read a key before and
+		// write after the tablet deletion, causing that tablet key to resurface. Then, only the
+		// follower would have that key, not the leader.
+		// However, if the follower then becomes the leader, we'd be able to get rid of that
+		// key then. Alternatively, we could look into cancelling the Rollup if we see a
+		// predicate deletion.
+		if err := groups().doSendMembership(tablets); err != nil {
+			glog.Warningf("While sending membership to Zero. Error: %v", err)
+		} else {
+			glog.V(2).Infof("Sent tablet size update to Zero. Total size: %s",
+				humanize.Bytes(uint64(total)))
+		}
+	}()
+
 	return nil
 }
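
The tablet-size accounting that this function now focuses on hinges on a sync.Map of predicate -> *int64 counters: the addTo closure (its body is largely outside this hunk) bumps a predicate's counter with an atomic add, and the goroutine above drains the map with Range to build the pb.Tablet updates for Zero. A small sketch of that accumulate-then-report pattern, assuming an illustrative sizeTracker type that is not part of worker/draft.go:

// Illustrative only; sizeTracker and its methods are not Dgraph identifiers.
package tablet

import (
	"sync"
	"sync/atomic"
)

type sizeTracker struct {
	m sync.Map // predicate (string) -> *int64
}

// add accumulates delta bytes against a predicate. Safe for concurrent use,
// e.g. from stream callbacks.
func (t *sizeTracker) add(pred string, delta int64) {
	val, ok := t.m.Load(pred)
	if !ok {
		// LoadOrStore returns the existing counter if another goroutine won the race.
		val, _ = t.m.LoadOrStore(pred, new(int64))
	}
	atomic.AddInt64(val.(*int64), delta)
}

// snapshot drains the map into a plain map plus a grand total, the shape the
// leader needs when reporting tablet sizes.
func (t *sizeTracker) snapshot() (map[string]int64, int64) {
	out := make(map[string]int64)
	var total int64
	t.m.Range(func(key, val interface{}) bool {
		size := atomic.LoadInt64(val.(*int64))
		out[key.(string)] = size
		total += size
		return true
	})
	return out, total
}

In the hunk above, the equivalent of snapshot happens inline: the go func ranges over m, builds the map of pb.Tablet entries, and hands it to groups().doSendMembership.
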

@@ -1455,6 +1432,7 @@ func (n *node) InitAndStartNode() {
 	go n.processRollups()
 	go n.processApplyCh()
 	go n.BatchAndSendMessages()
+	go posting.IncrRollup.Process()
 	go n.Run()
 }
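
Taken together, the commit starts the rollup consumer once at node init (the new go posting.IncrRollup.Process() line above) and feeds it from the read path, where ReadPostingList enqueues a key after seeing maxDeltaCount (2) deltas. A small runnable sketch of that wiring, reusing the illustrative Batcher from the earlier sketch; the module path example.local/batch and the main function are hypothetical:

// Hypothetical wiring example; not Dgraph code.
package main

import (
	"fmt"
	"time"

	"example.local/batch" // assumed home of the Batcher sketch shown earlier
)

func main() {
	b := batch.New()

	// Startup: launch the single throttled consumer, mirroring
	// go posting.IncrRollup.Process() in InitAndStartNode.
	go b.Process(func(key []byte) error {
		fmt.Printf("rolling up %s\n", key)
		return nil
	})

	// Read path: pretend each read found two or more deltas and enqueued its
	// key, mirroring IncrRollup.addKeyToBatch in ReadPostingList.
	for i := 0; i < 1000; i++ {
		b.Add([]byte(fmt.Sprintf("key-%d", i)))
	}

	// Give the consumer a moment; it drains at most one batch of 64 per 100ms,
	// and full batches offered while it is busy are dropped.
	time.Sleep(2 * time.Second)
}
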
