Skip to content
Permalink
Browse files

Posting List Evictions

- Avoid acquiring a read lock on posting.List during LRU cache eviction,
by using atomics pendingTxns field, which gets incremented for every
mutation, and potentially reset to 0 after a commit.
- Only run evictions for 10ms out of a second.
- Remove done field from LRU cache. We don't need to block on it during
server shutdown anymore. Previously, we did because the PLs
were being written to disk during evictions, but that's no longer the
case.
  • Loading branch information...
manishrjain committed Nov 9, 2018
1 parent b5ac4d7 commit e2bcfdad058148395b082e093d1fe63846f1132f
Showing with 22 additions and 25 deletions.
  1. +18 −10 posting/list.go
  2. +0 −5 posting/lists.go
  3. +4 −6 posting/lru.go
  4. +0 −4 worker/worker.go
@@ -74,8 +74,10 @@ type List struct {
plist *pb.PostingList
mutationMap map[uint64]*pb.PostingList
minTs uint64 // commit timestamp of immutable layer, reject reads before this ts.
deleteMe int32 // Using atomic for this, to avoid expensive SetForDeletion operation.
estimatedSize int32

pendingTxns int32 // Using atomic for this, to avoid locking in SetForDeletion operation.
deleteMe int32 // Using atomic for this, to avoid expensive SetForDeletion operation.
}

// calculateSize would give you the size estimate. This is expensive, so run it carefully.
@@ -223,17 +225,12 @@ func (l *List) EstimatedSize() int32 {
}

// SetForDeletion will mark this List to be deleted, so no more mutations can be applied to this.
// Ensure that we don't acquire any locks during a call to this function, so the LRU cache can
// proceed smoothly.
func (l *List) SetForDeletion() bool {
if l.AlreadyLocked() {
if atomic.LoadInt32(&l.pendingTxns) > 0 {
return false
}
l.RLock()
defer l.RUnlock()
for _, plist := range l.mutationMap {
if plist.CommitTs == 0 {
return false
}
}
atomic.StoreInt32(&l.deleteMe, 1)
return true
}
@@ -368,6 +365,7 @@ func (l *List) addMutation(ctx context.Context, txn *Txn, t *pb.DirectedEdge) er

l.updateMutationLayer(mpost)
atomic.AddInt32(&l.estimatedSize, int32(mpost.Size()+16 /* various overhead */))
atomic.AddInt32(&l.pendingTxns, 1)
txn.AddKeys(string(l.key), conflictKey)
return nil
}
@@ -394,8 +392,18 @@ func (l *List) commitMutation(startTs, commitTs uint64) error {
if atomic.LoadInt32(&l.deleteMe) == 1 {
return ErrRetry
}

l.AssertLock()

// Check if we still have a pending txn when we return from this function.
defer func() {
for _, plist := range l.mutationMap {
if plist.CommitTs == 0 {
return // Got a pending txn.
}
}
atomic.StoreInt32(&l.pendingTxns, 0)
}()

plist, ok := l.mutationMap[startTs]
if !ok {
// It was already committed, might be happening due to replay.
@@ -25,7 +25,6 @@ import (
"runtime"
"strconv"
"strings"
"sync/atomic"
"time"

"golang.org/x/net/trace"
@@ -225,10 +224,6 @@ func Cleanup() {
closer.SignalAndWait()
}

func StopLRUEviction() {
atomic.StoreInt32(&lcache.done, 1)
}

// Get stores the List corresponding to key, if it's not there already.
// to lru cache and returns it.
//
@@ -23,7 +23,6 @@ import (
"container/list"
"context"
"sync"
"sync/atomic"
"time"

"github.com/dgraph-io/dgraph/x"
@@ -41,7 +40,6 @@ type listCache struct {
evicts uint64
ll *list.List
cache map[string]*list.Element
done int32
}

type CacheStats struct {
@@ -113,17 +111,17 @@ func (c *listCache) removeOldestLoop() {
defer ticker.Stop()
for range ticker.C {
c.removeOldest()
if atomic.LoadInt32(&c.done) > 0 {
return
}
}
}

func (c *listCache) removeOldest() {
c.Lock()
defer c.Unlock()

// Only allow evictions for 10ms out of a second.
deadline := time.Now().Add(10 * time.Millisecond)
ele := c.ll.Back()
for c.curSize > c.MaxSize && atomic.LoadInt32(&c.done) == 0 {
for c.curSize > c.MaxSize && time.Now().Before(deadline) {
if ele == nil {
if c.curSize < 0 {
c.curSize = 0
@@ -27,7 +27,6 @@ import (

"github.com/dgraph-io/badger"
"github.com/dgraph-io/dgraph/conn"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"

@@ -99,7 +98,4 @@ func BlockingStop() {

glog.Infof("Stopping worker server...")
workerServer.Stop()

// TODO: What is this for?
posting.StopLRUEviction()
}

0 comments on commit e2bcfda

Please sign in to comment.
You can’t perform that action at this time.