diff --git a/posting/list_test.go b/posting/list_test.go index 4f588c7e1d7..9911dc60b45 100644 --- a/posting/list_test.go +++ b/posting/list_test.go @@ -475,10 +475,10 @@ func TestReadSingleValue(t *testing.T) { j = int(ol.minTs) } for ; j < i+6; j++ { - k, err := GetSingleValueForKey(key, uint64(j)) + tx := NewTxn(uint64(j)) + k, err := tx.cache.GetSinglePosting(key) require.NoError(t, err) - p := getFirst(t, k, uint64(j)) - checkValue(t, ol, string(p.Value), uint64(j)) + checkValue(t, ol, string(k.Postings[0].Value), uint64(j)) } } diff --git a/posting/lists.go b/posting/lists.go index ba41331c081..3d569766d7d 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -152,64 +152,12 @@ func (lc *LocalCache) SetIfAbsent(key string, updated *List) *List { return updated } -func (lc *LocalCache) getSingleInternal(key []byte, readFromDisk bool) (*List, error) { - getNewPlistNil := func() (*List, error) { - lc.RLock() - defer lc.RUnlock() - if lc.plists == nil { - pl, err := GetSingleValueForKey(key, lc.startTs) - return pl, err - } - return nil, nil - } - - if l, err := getNewPlistNil(); l != nil || err != nil { - return l, err - } - - skey := string(key) - if pl := lc.getNoStore(skey); pl != nil { - return pl, nil - } - - var pl *List - var err error - if readFromDisk { - pl, err = GetSingleValueForKey(key, lc.startTs) - } else { - pl = &List{ - key: key, - plist: new(pb.PostingList), - } - } - - // If we just brought this posting list into memory and we already have a delta for it, let's - // apply it before returning the list. - lc.RLock() - if delta, ok := lc.deltas[skey]; ok && len(delta) > 0 { - pl.setMutation(lc.startTs, delta) - } - lc.RUnlock() - return pl, err -} - func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error) { getNewPlistNil := func() (*List, error) { lc.RLock() defer lc.RUnlock() if lc.plists == nil { - if readFromDisk { - return getNew(key, pstore, lc.startTs) - } else { - pl := &List{ - key: key, - plist: new(pb.PostingList), - } - if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 { - pl.setMutation(lc.startTs, delta) - } - return pl, nil - } + return getNew(key, pstore, lc.startTs) } return nil, nil } @@ -248,60 +196,58 @@ func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error) } func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) { - pl := &pb.PostingList{} - validatePl := func() { - i := 0 - for _, postings := range pl.Postings { - if hasDeleteAll(postings) { - pl = nil - return - } - if postings.Op != Del { - pl.Postings[i] = postings - i++ + + getPostings := func() (*pb.PostingList, error) { + pl := &pb.PostingList{} + lc.RLock() + if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 { + err := pl.Unmarshal(delta) + if err != nil { + lc.RUnlock() + return pl, nil } } - pl.Postings = pl.Postings[:i] - } - lc.RLock() - if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 { - err := pl.Unmarshal(delta) lc.RUnlock() + + txn := pstore.NewTransactionAt(lc.startTs, false) + item, err := txn.Get(key) if err != nil { - validatePl() - return pl, nil + return pl, err } - } else { - lc.RUnlock() - } - txn := pstore.NewTransactionAt(lc.startTs, false) - item, err := txn.Get(key) - if err != nil { - validatePl() + err = item.Value(func(val []byte) error { + if err := pl.Unmarshal(val); err != nil { + return err + } + return nil + }) + return pl, err } - err = item.Value(func(val []byte) error { - if err := pl.Unmarshal(val); err != nil { - return err - } - return nil - }) - + pl, err := getPostings() + if err == badger.ErrKeyNotFound { + err = nil + } if err != nil { - validatePl() return pl, err } - validatePl() + // Filter and remove STAR_ALL and OP_DELETE Postings + idx := 0 + for _, postings := range pl.Postings { + if hasDeleteAll(postings) { + return nil, nil + } + if postings.Op != Del { + pl.Postings[idx] = postings + idx++ + } + } + pl.Postings = pl.Postings[:idx] return pl, nil } -func (lc *LocalCache) GetSingle(key []byte) (*List, error) { - return lc.getSingleInternal(key, true) -} - // Get retrieves the cached version of the list associated with the given key. func (lc *LocalCache) Get(key []byte) (*List, error) { return lc.getInternal(key, true) diff --git a/posting/mvcc.go b/posting/mvcc.go index e55fc77a527..d228ad10f0d 100644 --- a/posting/mvcc.go +++ b/posting/mvcc.go @@ -457,81 +457,6 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) { return l, nil } -func GetSingleValueForKey(key []byte, readTs uint64) (*List, error) { - cachedVal, ok := lCache.Get(key) - if ok { - l, ok := cachedVal.(*List) - if ok && l != nil { - // No need to clone the immutable layer or the key since mutations will not modify it. - lCopy := &List{ - minTs: l.minTs, - maxTs: l.maxTs, - key: key, - plist: l.plist, - } - l.RLock() - if l.mutationMap != nil { - lCopy.mutationMap = make(map[uint64]*pb.PostingList, len(l.mutationMap)) - for ts, pl := range l.mutationMap { - lCopy.mutationMap[ts] = proto.Clone(pl).(*pb.PostingList) - } - } - l.RUnlock() - return lCopy, nil - } - } - - if pstore.IsClosed() { - return nil, badger.ErrDBClosed - } - - l := new(List) - l.key = key - l.plist = new(pb.PostingList) - - txn := pstore.NewTransactionAt(readTs, false) - item, err := txn.Get(key) - if err != nil { - return l, err - } - - l.maxTs = x.Max(l.maxTs, item.Version()) - - switch item.UserMeta() { - case BitEmptyPosting: - l.minTs = item.Version() - case BitCompletePosting: - if err := unmarshalOrCopy(l.plist, item); err != nil { - return l, nil - } - l.minTs = item.Version() - - case BitDeltaPosting: - err := item.Value(func(val []byte) error { - pl := &pb.PostingList{} - if err := pl.Unmarshal(val); err != nil { - return err - } - pl.CommitTs = item.Version() - for _, mpost := range pl.Postings { - // commitTs, startTs are meant to be only in memory, not - // stored on disk. - mpost.CommitTs = item.Version() - } - if l.mutationMap == nil { - l.mutationMap = make(map[uint64]*pb.PostingList) - } - l.mutationMap[pl.CommitTs] = pl - return nil - }) - if err != nil { - return l, nil - } - } - - return l, nil -} - func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) { cachedVal, ok := lCache.Get(key) if ok {