Fix OOM: Only keep deltas in memory for a pending txn (#3349)

Currently, we bring the entire posting list into memory for each pending txn and keep it there until the txn is committed or aborted. Mutations can easily touch a lot of data (including indices), which gets very expensive in terms of memory usage and causes OOMs during data loads.

This PR fixes that by keeping only the deltas that need to be applied to the lists and discarding the lists as soon as mutation application is done. On commit, these deltas are written directly to disk. On a read from the same txn, we apply the delta onto a freshly read posting list, so a pending txn can read back its own writes.
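
Concretely, the only per-txn state that needs to survive a mutation is a map from posting-list key to its pending delta: a read overlays that delta on a freshly loaded list, and a commit just iterates the map and writes each delta. Below is a minimal, self-contained sketch of that shape; the names (localCache, readFromDisk, etc.) are illustrative only and are not Dgraph's actual types or APIs.

```go
// Minimal sketch of the idea, not Dgraph's actual types or API: the txn cache
// holds only serialized deltas per key; reads overlay the pending delta on a
// freshly loaded list, and commit streams the deltas straight to disk.
package main

import "fmt"

// list stands in for a posting list read from disk.
type list struct {
	committed string // data already persisted
	pending   string // this txn's uncommitted delta, if any
}

// readFromDisk is a stand-in for loading a posting list from Badger.
func readFromDisk(key string) *list {
	return &list{committed: "disk(" + key + ")"}
}

type localCache struct {
	startTs uint64
	deltas  map[string][]byte // key -> delta to overlay on reads and write on commit
}

// Get re-reads the list and layers the txn's own delta on top, so a pending
// txn can read back its own write without keeping the whole list in memory.
func (c *localCache) Get(key string) *list {
	l := readFromDisk(key)
	if d, ok := c.deltas[key]; ok && len(d) > 0 {
		l.pending = string(d)
	}
	return l
}

// CommitToDisk writes only the deltas; no posting lists were retained.
func (c *localCache) CommitToDisk(commitTs uint64, write func(key string, delta []byte, ts uint64)) {
	for key, delta := range c.deltas {
		if len(delta) == 0 {
			continue
		}
		write(key, delta, commitTs)
	}
}

func main() {
	c := &localCache{startTs: 5, deltas: map[string][]byte{"name": []byte("+alice")}}
	fmt.Printf("%+v\n", c.Get("name")) // &{committed:disk(name) pending:+alice}
	c.CommitToDisk(7, func(key string, delta []byte, ts uint64) {
		fmt.Printf("write %s delta=%q at ts=%d\n", key, delta, ts)
	})
}
```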

This PR dramatically reduces memory usage while mutations are in progress, avoiding OOMs.

Changes:
* Instead of keeping the entire posting list in memory, only keep the deltas. This significantly reduces memory usage, in fact making it negligible.
* Keep track of the max version per posting list and use that to avoid repeat commits.
* Revert changes to the increment tool. They cause two counters to get created.
* Add txn.Update in the right places, so any PLs in the cache get converted to diffs (see the sketch after this list).
* Remove CommitToMemory
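
Several of these items (converting cached PLs into diffs, tracking max versions, and skipping repeat commits) fit together in one small structure. The sketch below is illustrative only, under assumed names (txnCache, updateDeltasAndDiscardLists, commitToDisk), and is not the actual implementation.

```go
// Sketch (hypothetical names): after a mutation is applied, any full posting
// lists held by the txn cache are reduced to their serialized deltas and then
// dropped, so the memory can be reclaimed. maxVersions is recorded so a later
// commit can skip keys that already have a write at an equal or higher
// timestamp (e.g. during Raft log replay).
package main

import "fmt"

type postingList struct {
	delta      []byte // this txn's uncommitted mutation, serialized
	maxVersion uint64 // highest committed version seen in the list
}

type txnCache struct {
	deltas      map[string][]byte
	maxVersions map[string]uint64
	plists      map[string]*postingList // full lists; expensive to keep around
}

// updateDeltasAndDiscardLists keeps only the small deltas and version info,
// and throws away the full posting lists.
func (c *txnCache) updateDeltasAndDiscardLists() {
	for key, pl := range c.plists {
		if len(pl.delta) > 0 {
			c.deltas[key] = pl.delta
		}
		c.maxVersions[key] = pl.maxVersion
	}
	c.plists = make(map[string]*postingList)
}

// commitToDisk writes each delta unless the list already has a newer write.
func (c *txnCache) commitToDisk(commitTs uint64, write func(key string, delta []byte, ts uint64)) {
	for key, delta := range c.deltas {
		if len(delta) == 0 || c.maxVersions[key] >= commitTs {
			continue
		}
		write(key, delta, commitTs)
	}
}

func main() {
	c := &txnCache{
		deltas:      map[string][]byte{},
		maxVersions: map[string]uint64{},
		plists: map[string]*postingList{
			"fresh": {delta: []byte("+alice"), maxVersion: 3},
			"stale": {delta: []byte("+bob"), maxVersion: 12}, // a write at ts=12 already exists
		},
	}
	c.updateDeltasAndDiscardLists()
	fmt.Println("lists still cached:", len(c.plists)) // 0
	c.commitToDisk(7, func(key string, delta []byte, ts uint64) {
		fmt.Printf("writing %s %q at ts=%d\n", key, delta, ts) // only "fresh" is written
	})
}
```
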
manishrjain committed May 2, 2019
1 parent 5b3b5d0 commit fe07e3e8a859f94411af7869e5ab1c72450e0a20
Showing with 96 additions and 98 deletions.
  1. +7 −28 posting/index.go
  2. +1 −1 posting/index_test.go
  3. +10 −1 posting/list.go
  4. +42 −2 posting/lists.go
  5. +14 −48 posting/mvcc.go
  6. +13 −7 posting/oracle.go
  7. +7 −0 worker/draft.go
  8. +0 −9 worker/mutation.go
  9. +1 −1 worker/predicate_test.go
  10. +1 −1 worker/task.go
@@ -509,41 +509,20 @@ func (r *rebuild) Run(ctx context.Context) error {
}
glog.V(1).Infof("Rebuild: Iteration done. Now committing at ts=%d\n", r.startTs)

- // We must commit all the posting lists to memory, so they'd be picked up
- // during posting list rollup below.
- if err := txn.CommitToMemory(r.startTs); err != nil {
- return err
- }
+ txn.Update() // Convert data into deltas.

// Now we write all the created posting lists to disk.
writer := NewTxnWriter(pstore)
- for key := range txn.deltas {
- pl, err := txn.Get([]byte(key))
- if err != nil {
- return err
- }
-
- le := pl.Length(r.startTs, 0)
- if le == 0 {
+ for key, delta := range txn.cache.deltas {
+ if len(delta) == 0 {
continue
}
- kvs, err := pl.Rollup()
- if err != nil {
+ // We choose to write the PL at r.startTs, so it won't be read by txns,
+ // which occurred before this schema mutation. Typically, we use
+ // kv.Version as the timestamp.
+ if err := writer.SetAt([]byte(key), delta, BitDeltaPosting, r.startTs); err != nil {
return err
}
-
- for _, kv := range kvs {
- // We choose to write the PL at r.startTs, so it won't be read by txns,
- // which occurred before this schema mutation. Typically, we use
- // kv.Version as the timestamp.
- if err = writer.SetAt(kv.Key, kv.Value, kv.UserMeta[0], r.startTs); err != nil {
- return err
- }
- }
- // This locking is just to catch any future issues. We shouldn't need
- // to release this lock, because each posting list must only be accessed
- // once and never again.
- pl.Lock()
}
glog.V(1).Infoln("Rebuild: Flushing all writes.")
return writer.Flush()
@@ -156,10 +156,10 @@ func addMutation(t *testing.T, l *List, edge *pb.DirectedEdge, op uint32,
require.NoError(t, err)
}

+ txn.Update()
writer := NewTxnWriter(pstore)
require.NoError(t, txn.CommitToDisk(writer, commitTs))
require.NoError(t, writer.Flush())
- require.NoError(t, txn.CommitToMemory(commitTs))
}

const schemaVal = `
@@ -462,7 +462,7 @@ func (l *List) addMutation(ctx context.Context, txn *Txn, t *pb.DirectedEdge) er

l.updateMutationLayer(mpost)
atomic.AddInt32(&l.pendingTxns, 1)
- txn.AddKeys(string(l.key), conflictKey)
+ txn.AddConflictKey(conflictKey)
return nil
}

@@ -478,6 +478,15 @@ func (l *List) GetMutation(startTs uint64) []byte {
return nil
}

func (l *List) SetMutation(startTs uint64, data []byte) {
pl := new(pb.PostingList)
x.Check(pl.Unmarshal(data))

l.Lock()
l.mutationMap[startTs] = pl
l.Unlock()
}

func (l *List) CommitMutation(startTs, commitTs uint64) error {
l.Lock()
defer l.Unlock()
@@ -179,11 +179,27 @@ func GetNoStore(key []byte) (rlist *List, err error) {
type LocalCache struct {
sync.RWMutex

startTs uint64

// The keys for these maps is a string representation of the Badger key for the posting list.
// deltas keep track of the updates made by txn. These must be kept around until written to disk
// during commit.
deltas map[string][]byte

// max committed timestamp of the read posting lists.
maxVersions map[string]uint64

// plists are posting lists in memory. They can be discarded to reclaim space.
plists map[string]*List
}

- func NewLocalCache() *LocalCache {
- return &LocalCache{plists: make(map[string]*List)}
+ func NewLocalCache(startTs uint64) *LocalCache {
+ return &LocalCache{
+ startTs: startTs,
+ deltas: make(map[string][]byte),
+ plists: make(map[string]*List),
+ maxVersions: make(map[string]uint64),
+ }
}

func (lc *LocalCache) getNoStore(key string) *List {
@@ -218,5 +234,29 @@ func (lc *LocalCache) Get(key []byte) (*List, error) {
if err != nil {
return nil, err
}
// If we just brought this posting list into memory and we already have a delta for it, let's
// apply it before returning the list.
lc.RLock()
if delta, ok := lc.deltas[skey]; ok && len(delta) > 0 {
pl.SetMutation(lc.startTs, delta)
}
lc.RUnlock()
return lc.Set(skey, pl), nil
}

func (lc *LocalCache) UpdateDeltasAndDiscardLists() {
lc.Lock()
defer lc.Unlock()
if len(lc.plists) == 0 {
return
}

for key, pl := range lc.plists {
data := pl.GetMutation(lc.startTs)
if len(data) > 0 {
lc.deltas[key] = data
}
lc.maxVersions[key] = pl.maxVersion()
}
lc.plists = make(map[string]*List)
}
@@ -23,14 +23,12 @@ import (
"math"
"strconv"
"sync/atomic"
"time"

"github.com/dgraph-io/badger"
"github.com/dgraph-io/dgo/protos/api"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
farm "github.com/dgryski/go-farm"
"github.com/golang/glog"
)

var (
@@ -48,14 +46,12 @@ func (txn *Txn) ShouldAbort() bool {
return atomic.LoadUint32(&txn.shouldAbort) > 0
}

- func (txn *Txn) AddKeys(key, conflictKey string) {
+ func (txn *Txn) AddConflictKey(conflictKey string) {
txn.Lock()
defer txn.Unlock()
- if txn.deltas == nil || txn.conflicts == nil {
- txn.deltas = make(map[string]struct{})
+ if txn.conflicts == nil {
txn.conflicts = make(map[string]struct{})
}
- txn.deltas[key] = struct{}{}
if len(conflictKey) > 0 {
txn.conflicts[conflictKey] = struct{}{}
}
@@ -74,7 +70,9 @@ func (txn *Txn) Fill(ctx *api.TxnContext, gid uint32) {
ctx.Keys = append(ctx.Keys, fps)
}
}
- for key := range txn.deltas {

+ txn.Update()
+ for key := range txn.cache.deltas {
pk := x.Parse([]byte(key))
// Also send the group id that the predicate was being served by. This is useful when
// checking if Zero should allow a commit during a predicate move.
@@ -92,13 +90,15 @@ func (txn *Txn) CommitToDisk(writer *TxnWriter, commitTs uint64) error {
if commitTs == 0 {
return nil
}

+ cache := txn.cache
+ cache.Lock()
+ defer cache.Unlock()

var keys []string
- txn.Lock()
- // TODO: We can remove the deltas here. Now that we're using txn local cache.
- for key := range txn.deltas {
+ for key := range cache.deltas {
keys = append(keys, key)
}
- txn.Unlock()

var idx int
for idx < len(keys) {
@@ -108,15 +108,11 @@ func (txn *Txn) CommitToDisk(writer *TxnWriter, commitTs uint64) error {
err := writer.Update(commitTs, func(btxn *badger.Txn) error {
for ; idx < len(keys); idx++ {
key := keys[idx]
- plist, err := txn.Get([]byte(key))
- if err != nil {
- return err
- }
- data := plist.GetMutation(txn.StartTs)
- if data == nil {
+ data := cache.deltas[key]
+ if len(data) == 0 {
continue
}
- if plist.maxVersion() >= commitTs {
+ if ts := cache.maxVersions[key]; ts >= commitTs {
// Skip write because we already have a write at a higher ts.
// Logging here can cause a lot of output when doing Raft log replay. So, let's
// not output anything here.
@@ -135,36 +131,6 @@ func (txn *Txn) CommitToDisk(writer *TxnWriter, commitTs uint64) error {
return nil
}

func (txn *Txn) CommitToMemory(commitTs uint64) error {
txn.Lock()
defer txn.Unlock()
// TODO: Figure out what shouldAbort is for, and use it correctly. This should really be
// shouldDiscard.
// defer func() {
// atomic.StoreUint32(&txn.shouldAbort, 1)
// }()
for key := range txn.deltas {
inner:
for {
plist, err := txn.Get([]byte(key))
if err != nil {
return err
}
err = plist.CommitMutation(txn.StartTs, commitTs)
switch err {
case nil:
break inner
case ErrRetry:
time.Sleep(5 * time.Millisecond)
default:
glog.Warningf("Error while committing to memory: %v\n", err)
return err
}
}
}
return nil
}

func unmarshalOrCopy(plist *pb.PostingList, item *badger.Item) error {
return item.Value(func(val []byte) error {
if len(val) == 0 {
@@ -48,10 +48,6 @@ type Txn struct {
shouldAbort uint32
// Fields which can changed after init
sync.Mutex
// Deltas keeps track of the posting list keys, and whether they should be considered for
// conflict detection or not. When a txn is marked committed or aborted, we use the keys stored
// here to determine which posting lists to get and update.
deltas map[string]struct{}

// Keeps track of conflict keys that should be used to determine if this
// transaction conflicts with another.
@@ -61,21 +57,25 @@ type Txn struct {
// determine unhealthy, stale txns.
lastUpdate time.Time

- cache *LocalCache
+ cache *LocalCache // This pointer does not get modified.
}

func NewTxn(startTs uint64) *Txn {
return &Txn{
StartTs: startTs,
- cache: NewLocalCache(),
+ cache: NewLocalCache(startTs),
lastUpdate: time.Now(),
}
}

func (txn *Txn) Get(key []byte) (*List, error) {
return txn.cache.Get(key)
}
func (txn *Txn) Update() {
txn.cache.UpdateDeltasAndDiscardLists()
}

// Store is used by tests.
func (txn *Txn) Store(pl *List) *List {
return txn.cache.Set(string(pl.key), pl)
}
@@ -139,6 +139,12 @@ func (o *oracle) MinPendingStartTs() uint64 {
return min
}

func (o *oracle) NumPendingTxns() int {
o.RLock()
defer o.RUnlock()
return len(o.pendingTxns)
}

func (o *oracle) TxnOlderThan(dur time.Duration) (res []uint64) {
o.RLock()
defer o.RUnlock()
@@ -237,7 +243,7 @@ func (o *oracle) GetTxn(startTs uint64) *Txn {
func (txn *Txn) matchesDelta(ok func(key []byte) bool) bool {
txn.Lock()
defer txn.Unlock()
- for key := range txn.deltas {
+ for key := range txn.cache.deltas {
if ok([]byte(key)) {
return true
}
@@ -284,6 +284,9 @@ func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) (rerr
return dy.ErrConflict
}

// Discard the posting lists from cache to release memory at the end.
defer txn.Update()

sort.Slice(m.Edges, func(i, j int) bool {
ei := m.Edges[i]
ej := m.Edges[j]
@@ -525,6 +528,7 @@ func (n *node) commitOrAbort(pkey string, delta *pb.OracleDelta) error {
if txn == nil {
return
}
txn.Update()
err := x.RetryUntilSuccess(x.WorkerConfig.MaxRetries, 10*time.Millisecond, func() error {
return txn.CommitToDisk(writer, commit)
})
@@ -1145,6 +1149,9 @@ func (n *node) calculateSnapshot(discardN int) (*pb.Snapshot, error) {
return nil, err
}

if num := posting.Oracle().NumPendingTxns(); num > 0 {
glog.Infof("Num pending txns: %d", num)
}
// We can't rely upon the Raft entries to determine the minPendingStart,
// because there are many cases during mutations where we don't commit or
// abort the transaction. This might happen due to an early error thrown.
@@ -503,15 +503,6 @@ func populateMutationMap(src *pb.Mutations) (map[uint32]*pb.Mutations, error) {
return mm, nil
}

func commitOrAbort(ctx context.Context, startTs, commitTs uint64) error {
txn := posting.Oracle().GetTxn(startTs)
if txn == nil {
return nil
}
// Ensures that we wait till prewrite is applied
return txn.CommitToMemory(commitTs)
}

type res struct {
err error
ctx *api.TxnContext
@@ -73,10 +73,10 @@ func commitTransaction(t *testing.T, edge *pb.DirectedEdge, l *posting.List) {

commit := commitTs(startTs)

+ txn.Update()
writer := posting.NewTxnWriter(pstore)
require.NoError(t, txn.CommitToDisk(writer, commit))
require.NoError(t, writer.Flush())
- require.NoError(t, txn.CommitToMemory(commit))
}

// Hacky tests change laster
@@ -749,7 +749,7 @@ func processTask(ctx context.Context, q *pb.Query, gid uint32) (*pb.Result, erro
qs.cache = posting.Oracle().CacheAt(q.ReadTs)
}
if qs.cache == nil {
- qs.cache = posting.NewLocalCache()
+ qs.cache = posting.NewLocalCache(q.ReadTs)
}

out, err := qs.helpProcessTask(ctx, q, gid)
