Skip to content

Commit fe07e3e

Browse files
authored
Fix OOM: Only keep deltas in memory for a pending txn (#3349)
Currently, we bring and keep the entire posting list in memory for each pending txn, which remains there until the txn is committed or aborted. Mutations can easily touch a lot of data (including indices), which gets very expensive in terms of memory usage, causing OOMs during data loads. This PR fixes that issue by only keeping the deltas that need to be applied to the lists and discarding the lists as soon as mutation application is done. On a commit, these deltas are then directly written to disk. On a read from the same txn, we apply the delta onto a newly read posting list, so a pending txn can read back its own write. This PR dramatically reduces the memory usage when mutations are going on, avoiding OOMs.

Changes:
* Instead of keeping the entire posting list in memory, only keep the deltas. This significantly reduces the memory usage, in fact making it negligible.
* Keep track of max version per posting list and use that to avoid repeat commits.
* Revert changes to increment tool. They cause two counters to get created.
* Add txn.Update in the right places, so any PLs in cache get converted to diffs.
* Remove CommitToMemory.
1 parent 5b3b5d0 commit fe07e3e

File tree

10 files changed

+96
-98
lines changed

10 files changed

+96
-98
lines changed

posting/index.go

Lines changed: 7 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -509,41 +509,20 @@ func (r *rebuild) Run(ctx context.Context) error {
509509
}
510510
glog.V(1).Infof("Rebuild: Iteration done. Now committing at ts=%d\n", r.startTs)
511511

512-
// We must commit all the posting lists to memory, so they'd be picked up
513-
// during posting list rollup below.
514-
if err := txn.CommitToMemory(r.startTs); err != nil {
515-
return err
516-
}
512+
txn.Update() // Convert data into deltas.
517513

518514
// Now we write all the created posting lists to disk.
519515
writer := NewTxnWriter(pstore)
520-
for key := range txn.deltas {
521-
pl, err := txn.Get([]byte(key))
522-
if err != nil {
523-
return err
524-
}
525-
526-
le := pl.Length(r.startTs, 0)
527-
if le == 0 {
516+
for key, delta := range txn.cache.deltas {
517+
if len(delta) == 0 {
528518
continue
529519
}
530-
kvs, err := pl.Rollup()
531-
if err != nil {
520+
// We choose to write the PL at r.startTs, so it won't be read by txns,
521+
// which occurred before this schema mutation. Typically, we use
522+
// kv.Version as the timestamp.
523+
if err := writer.SetAt([]byte(key), delta, BitDeltaPosting, r.startTs); err != nil {
532524
return err
533525
}
534-
535-
for _, kv := range kvs {
536-
// We choose to write the PL at r.startTs, so it won't be read by txns,
537-
// which occurred before this schema mutation. Typically, we use
538-
// kv.Version as the timestamp.
539-
if err = writer.SetAt(kv.Key, kv.Value, kv.UserMeta[0], r.startTs); err != nil {
540-
return err
541-
}
542-
}
543-
// This locking is just to catch any future issues. We shouldn't need
544-
// to release this lock, because each posting list must only be accessed
545-
// once and never again.
546-
pl.Lock()
547526
}
548527
glog.V(1).Infoln("Rebuild: Flushing all writes.")
549528
return writer.Flush()

posting/index_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,10 +156,10 @@ func addMutation(t *testing.T, l *List, edge *pb.DirectedEdge, op uint32,
156156
require.NoError(t, err)
157157
}
158158

159+
txn.Update()
159160
writer := NewTxnWriter(pstore)
160161
require.NoError(t, txn.CommitToDisk(writer, commitTs))
161162
require.NoError(t, writer.Flush())
162-
require.NoError(t, txn.CommitToMemory(commitTs))
163163
}
164164

165165
const schemaVal = `

posting/list.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -462,7 +462,7 @@ func (l *List) addMutation(ctx context.Context, txn *Txn, t *pb.DirectedEdge) er
462462

463463
l.updateMutationLayer(mpost)
464464
atomic.AddInt32(&l.pendingTxns, 1)
465-
txn.AddKeys(string(l.key), conflictKey)
465+
txn.AddConflictKey(conflictKey)
466466
return nil
467467
}
468468

@@ -478,6 +478,15 @@ func (l *List) GetMutation(startTs uint64) []byte {
478478
return nil
479479
}
480480

481+
func (l *List) SetMutation(startTs uint64, data []byte) {
482+
pl := new(pb.PostingList)
483+
x.Check(pl.Unmarshal(data))
484+
485+
l.Lock()
486+
l.mutationMap[startTs] = pl
487+
l.Unlock()
488+
}
489+
481490
func (l *List) CommitMutation(startTs, commitTs uint64) error {
482491
l.Lock()
483492
defer l.Unlock()

posting/lists.go

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,11 +179,27 @@ func GetNoStore(key []byte) (rlist *List, err error) {
179179
type LocalCache struct {
180180
sync.RWMutex
181181

182+
startTs uint64
183+
184+
// The keys for these maps is a string representation of the Badger key for the posting list.
185+
// deltas keep track of the updates made by txn. These must be kept around until written to disk
186+
// during commit.
187+
deltas map[string][]byte
188+
189+
// max committed timestamp of the read posting lists.
190+
maxVersions map[string]uint64
191+
192+
// plists are posting lists in memory. They can be discarded to reclaim space.
182193
plists map[string]*List
183194
}
184195

185-
func NewLocalCache() *LocalCache {
186-
return &LocalCache{plists: make(map[string]*List)}
196+
func NewLocalCache(startTs uint64) *LocalCache {
197+
return &LocalCache{
198+
startTs: startTs,
199+
deltas: make(map[string][]byte),
200+
plists: make(map[string]*List),
201+
maxVersions: make(map[string]uint64),
202+
}
187203
}
188204

189205
func (lc *LocalCache) getNoStore(key string) *List {
@@ -218,5 +234,29 @@ func (lc *LocalCache) Get(key []byte) (*List, error) {
218234
if err != nil {
219235
return nil, err
220236
}
237+
// If we just brought this posting list into memory and we already have a delta for it, let's
238+
// apply it before returning the list.
239+
lc.RLock()
240+
if delta, ok := lc.deltas[skey]; ok && len(delta) > 0 {
241+
pl.SetMutation(lc.startTs, delta)
242+
}
243+
lc.RUnlock()
221244
return lc.Set(skey, pl), nil
222245
}
246+
247+
func (lc *LocalCache) UpdateDeltasAndDiscardLists() {
248+
lc.Lock()
249+
defer lc.Unlock()
250+
if len(lc.plists) == 0 {
251+
return
252+
}
253+
254+
for key, pl := range lc.plists {
255+
data := pl.GetMutation(lc.startTs)
256+
if len(data) > 0 {
257+
lc.deltas[key] = data
258+
}
259+
lc.maxVersions[key] = pl.maxVersion()
260+
}
261+
lc.plists = make(map[string]*List)
262+
}

posting/mvcc.go

Lines changed: 14 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,12 @@ import (
2323
"math"
2424
"strconv"
2525
"sync/atomic"
26-
"time"
2726

2827
"github.com/dgraph-io/badger"
2928
"github.com/dgraph-io/dgo/protos/api"
3029
"github.com/dgraph-io/dgraph/protos/pb"
3130
"github.com/dgraph-io/dgraph/x"
3231
farm "github.com/dgryski/go-farm"
33-
"github.com/golang/glog"
3432
)
3533

3634
var (
@@ -48,14 +46,12 @@ func (txn *Txn) ShouldAbort() bool {
4846
return atomic.LoadUint32(&txn.shouldAbort) > 0
4947
}
5048

51-
func (txn *Txn) AddKeys(key, conflictKey string) {
49+
func (txn *Txn) AddConflictKey(conflictKey string) {
5250
txn.Lock()
5351
defer txn.Unlock()
54-
if txn.deltas == nil || txn.conflicts == nil {
55-
txn.deltas = make(map[string]struct{})
52+
if txn.conflicts == nil {
5653
txn.conflicts = make(map[string]struct{})
5754
}
58-
txn.deltas[key] = struct{}{}
5955
if len(conflictKey) > 0 {
6056
txn.conflicts[conflictKey] = struct{}{}
6157
}
@@ -74,7 +70,9 @@ func (txn *Txn) Fill(ctx *api.TxnContext, gid uint32) {
7470
ctx.Keys = append(ctx.Keys, fps)
7571
}
7672
}
77-
for key := range txn.deltas {
73+
74+
txn.Update()
75+
for key := range txn.cache.deltas {
7876
pk := x.Parse([]byte(key))
7977
// Also send the group id that the predicate was being served by. This is useful when
8078
// checking if Zero should allow a commit during a predicate move.
@@ -92,13 +90,15 @@ func (txn *Txn) CommitToDisk(writer *TxnWriter, commitTs uint64) error {
9290
if commitTs == 0 {
9391
return nil
9492
}
93+
94+
cache := txn.cache
95+
cache.Lock()
96+
defer cache.Unlock()
97+
9598
var keys []string
96-
txn.Lock()
97-
// TODO: We can remove the deltas here. Now that we're using txn local cache.
98-
for key := range txn.deltas {
99+
for key := range cache.deltas {
99100
keys = append(keys, key)
100101
}
101-
txn.Unlock()
102102

103103
var idx int
104104
for idx < len(keys) {
@@ -108,15 +108,11 @@ func (txn *Txn) CommitToDisk(writer *TxnWriter, commitTs uint64) error {
108108
err := writer.Update(commitTs, func(btxn *badger.Txn) error {
109109
for ; idx < len(keys); idx++ {
110110
key := keys[idx]
111-
plist, err := txn.Get([]byte(key))
112-
if err != nil {
113-
return err
114-
}
115-
data := plist.GetMutation(txn.StartTs)
116-
if data == nil {
111+
data := cache.deltas[key]
112+
if len(data) == 0 {
117113
continue
118114
}
119-
if plist.maxVersion() >= commitTs {
115+
if ts := cache.maxVersions[key]; ts >= commitTs {
120116
// Skip write because we already have a write at a higher ts.
121117
// Logging here can cause a lot of output when doing Raft log replay. So, let's
122118
// not output anything here.
@@ -135,36 +131,6 @@ func (txn *Txn) CommitToDisk(writer *TxnWriter, commitTs uint64) error {
135131
return nil
136132
}
137133

138-
func (txn *Txn) CommitToMemory(commitTs uint64) error {
139-
txn.Lock()
140-
defer txn.Unlock()
141-
// TODO: Figure out what shouldAbort is for, and use it correctly. This should really be
142-
// shouldDiscard.
143-
// defer func() {
144-
// atomic.StoreUint32(&txn.shouldAbort, 1)
145-
// }()
146-
for key := range txn.deltas {
147-
inner:
148-
for {
149-
plist, err := txn.Get([]byte(key))
150-
if err != nil {
151-
return err
152-
}
153-
err = plist.CommitMutation(txn.StartTs, commitTs)
154-
switch err {
155-
case nil:
156-
break inner
157-
case ErrRetry:
158-
time.Sleep(5 * time.Millisecond)
159-
default:
160-
glog.Warningf("Error while committing to memory: %v\n", err)
161-
return err
162-
}
163-
}
164-
}
165-
return nil
166-
}
167-
168134
func unmarshalOrCopy(plist *pb.PostingList, item *badger.Item) error {
169135
return item.Value(func(val []byte) error {
170136
if len(val) == 0 {

posting/oracle.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,6 @@ type Txn struct {
4848
shouldAbort uint32
4949
// Fields which can changed after init
5050
sync.Mutex
51-
// Deltas keeps track of the posting list keys, and whether they should be considered for
52-
// conflict detection or not. When a txn is marked committed or aborted, we use the keys stored
53-
// here to determine which posting lists to get and update.
54-
deltas map[string]struct{}
5551

5652
// Keeps track of conflict keys that should be used to determine if this
5753
// transaction conflicts with another.
@@ -61,21 +57,25 @@ type Txn struct {
6157
// determine unhealthy, stale txns.
6258
lastUpdate time.Time
6359

64-
cache *LocalCache
60+
cache *LocalCache // This pointer does not get modified.
6561
}
6662

6763
func NewTxn(startTs uint64) *Txn {
6864
return &Txn{
6965
StartTs: startTs,
70-
cache: NewLocalCache(),
66+
cache: NewLocalCache(startTs),
7167
lastUpdate: time.Now(),
7268
}
7369
}
7470

7571
func (txn *Txn) Get(key []byte) (*List, error) {
7672
return txn.cache.Get(key)
7773
}
74+
func (txn *Txn) Update() {
75+
txn.cache.UpdateDeltasAndDiscardLists()
76+
}
7877

78+
// Store is used by tests.
7979
func (txn *Txn) Store(pl *List) *List {
8080
return txn.cache.Set(string(pl.key), pl)
8181
}
@@ -139,6 +139,12 @@ func (o *oracle) MinPendingStartTs() uint64 {
139139
return min
140140
}
141141

142+
func (o *oracle) NumPendingTxns() int {
143+
o.RLock()
144+
defer o.RUnlock()
145+
return len(o.pendingTxns)
146+
}
147+
142148
func (o *oracle) TxnOlderThan(dur time.Duration) (res []uint64) {
143149
o.RLock()
144150
defer o.RUnlock()
@@ -237,7 +243,7 @@ func (o *oracle) GetTxn(startTs uint64) *Txn {
237243
func (txn *Txn) matchesDelta(ok func(key []byte) bool) bool {
238244
txn.Lock()
239245
defer txn.Unlock()
240-
for key := range txn.deltas {
246+
for key := range txn.cache.deltas {
241247
if ok([]byte(key)) {
242248
return true
243249
}

worker/draft.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,9 @@ func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) (rerr
284284
return dy.ErrConflict
285285
}
286286

287+
// Discard the posting lists from cache to release memory at the end.
288+
defer txn.Update()
289+
287290
sort.Slice(m.Edges, func(i, j int) bool {
288291
ei := m.Edges[i]
289292
ej := m.Edges[j]
@@ -525,6 +528,7 @@ func (n *node) commitOrAbort(pkey string, delta *pb.OracleDelta) error {
525528
if txn == nil {
526529
return
527530
}
531+
txn.Update()
528532
err := x.RetryUntilSuccess(x.WorkerConfig.MaxRetries, 10*time.Millisecond, func() error {
529533
return txn.CommitToDisk(writer, commit)
530534
})
@@ -1145,6 +1149,9 @@ func (n *node) calculateSnapshot(discardN int) (*pb.Snapshot, error) {
11451149
return nil, err
11461150
}
11471151

1152+
if num := posting.Oracle().NumPendingTxns(); num > 0 {
1153+
glog.Infof("Num pending txns: %d", num)
1154+
}
11481155
// We can't rely upon the Raft entries to determine the minPendingStart,
11491156
// because there are many cases during mutations where we don't commit or
11501157
// abort the transaction. This might happen due to an early error thrown.

worker/mutation.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -503,15 +503,6 @@ func populateMutationMap(src *pb.Mutations) (map[uint32]*pb.Mutations, error) {
503503
return mm, nil
504504
}
505505

506-
func commitOrAbort(ctx context.Context, startTs, commitTs uint64) error {
507-
txn := posting.Oracle().GetTxn(startTs)
508-
if txn == nil {
509-
return nil
510-
}
511-
// Ensures that we wait till prewrite is applied
512-
return txn.CommitToMemory(commitTs)
513-
}
514-
515506
type res struct {
516507
err error
517508
ctx *api.TxnContext

worker/predicate_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,10 @@ func commitTransaction(t *testing.T, edge *pb.DirectedEdge, l *posting.List) {
7373

7474
commit := commitTs(startTs)
7575

76+
txn.Update()
7677
writer := posting.NewTxnWriter(pstore)
7778
require.NoError(t, txn.CommitToDisk(writer, commit))
7879
require.NoError(t, writer.Flush())
79-
require.NoError(t, txn.CommitToMemory(commit))
8080
}
8181

8282
// Hacky tests; change later.

worker/task.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -749,7 +749,7 @@ func processTask(ctx context.Context, q *pb.Query, gid uint32) (*pb.Result, erro
749749
qs.cache = posting.Oracle().CacheAt(q.ReadTs)
750750
}
751751
if qs.cache == nil {
752-
qs.cache = posting.NewLocalCache()
752+
qs.cache = posting.NewLocalCache(q.ReadTs)
753753
}
754754

755755
out, err := qs.helpProcessTask(ctx, q, gid)

0 commit comments

Comments
 (0)