Skip to content

Commit

Permalink
Don't read posting lists from disk when mutating indices. (#3695)
Browse files Browse the repository at this point in the history
Adding index mutations can be performed without reading the posting
lists from disk. This change modifies the indexing process to avoid
reading any list from disk. It should result in performance
improvements, especially when trying to modify big index posting lists.
  • Loading branch information
martinmr committed Jul 23, 2019
1 parent b7beb08 commit d697ca0
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 9 deletions.
8 changes: 4 additions & 4 deletions posting/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (txn *Txn) addIndexMutation(ctx context.Context, edge *pb.DirectedEdge,
token string) error {
key := x.IndexKey(edge.Attr, token)

plist, err := txn.Get(key)
plist, err := txn.cache.GetFromDelta(key)
if err != nil {
return err
}
Expand Down Expand Up @@ -169,7 +169,7 @@ func (txn *Txn) addReverseMutationHelper(ctx context.Context, plist *List,

func (txn *Txn) addReverseMutation(ctx context.Context, t *pb.DirectedEdge) error {
key := x.ReverseKey(t.Attr, t.ValueId)
plist, err := txn.Get(key)
plist, err := txn.cache.GetFromDelta(key)
if err != nil {
return err
}
Expand Down Expand Up @@ -256,7 +256,7 @@ func (l *List) handleDeleteAll(ctx context.Context, edge *pb.DirectedEdge,
func (txn *Txn) addCountMutation(ctx context.Context, t *pb.DirectedEdge, count uint32,
reverse bool) error {
key := x.CountKey(t.Attr, count, reverse)
plist, err := txn.Get(key)
plist, err := txn.cache.GetFromDelta(key)
if err != nil {
return err
}
Expand Down Expand Up @@ -940,7 +940,7 @@ func rebuildListType(ctx context.Context, rb *IndexRebuild) error {

// Ensure that list is in the cache run by txn. Otherwise, nothing would
// get updated.
txn.cache.SetIfAbsent(string(pl.key), pl)
pl = txn.cache.SetIfAbsent(string(pl.key), pl)
if err := pl.addMutation(ctx, txn, t); err != nil {
return err
}
Expand Down
32 changes: 27 additions & 5 deletions posting/lists.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,7 @@ func (lc *LocalCache) SetIfAbsent(key string, updated *List) *List {
return updated
}

// Get retrieves the cached version of the list associated with the given key.
func (lc *LocalCache) Get(key []byte) (*List, error) {
func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error) {
if lc == nil {
return getNew(key, pstore)
}
Expand All @@ -231,10 +230,21 @@ func (lc *LocalCache) Get(key []byte) (*List, error) {
return pl, nil
}

pl, err := getNew(key, pstore)
if err != nil {
return nil, err
var pl *List
if readFromDisk {
var err error
pl, err = getNew(key, pstore)
if err != nil {
return nil, err
}
} else {
pl = &List{
key: key,
mutationMap: make(map[uint64]*pb.PostingList),
plist: new(pb.PostingList),
}
}

// If we just brought this posting list into memory and we already have a delta for it, let's
// apply it before returning the list.
lc.RLock()
Expand All @@ -245,6 +255,18 @@ func (lc *LocalCache) Get(key []byte) (*List, error) {
return lc.SetIfAbsent(skey, pl), nil
}

// Get returns the cached list stored under key. It delegates to getInternal
// with readFromDisk enabled, so a list that is not yet cached is fetched
// through the disk-reading path before being returned.
func (lc *LocalCache) Get(key []byte) (*List, error) {
	const readFromDisk = true
	return lc.getInternal(key, readFromDisk)
}

// GetFromDelta returns the cached list stored under key without reading the
// list from disk; only the deltas that already exist are applied. It is meant
// for callers that will only modify the posting list and never read it
// (e.g. adding index mutations).
func (lc *LocalCache) GetFromDelta(key []byte) (*List, error) {
	const readFromDisk = false
	return lc.getInternal(key, readFromDisk)
}

// UpdateDeltasAndDiscardLists updates the delta cache before removing the stored posting lists.
func (lc *LocalCache) UpdateDeltasAndDiscardLists() {
lc.Lock()
Expand Down

0 comments on commit d697ca0

Please sign in to comment.