@@ -36,6 +36,7 @@ import (
3636 "github.com/dgraph-io/dgraph/types/facets"
3737 "github.com/dgraph-io/dgraph/x"
3838 "github.com/pkg/errors"
39+ "github.com/golang/protobuf/proto"
3940)
4041
4142var (
@@ -310,7 +311,7 @@ func hasDeleteAll(mpost *pb.Posting) bool {
310311}
311312
312313// Ensure that you either abort the uncommitted postings or commit them before calling me.
313- func (l * List ) updateMutationLayer (mpost * pb.Posting ) {
314+ func (l * List ) updateMutationLayer (mpost * pb.Posting , singleUidUpdate bool ) error {
314315 l .AssertLock ()
315316 x .AssertTrue (mpost .Op == Set || mpost .Op == Del )
316317
@@ -322,26 +323,60 @@ func (l *List) updateMutationLayer(mpost *pb.Posting) {
322323 l .mutationMap = make (map [uint64 ]* pb.PostingList )
323324 }
324325 l .mutationMap [mpost .StartTs ] = plist
325- return
326+ return nil
326327 }
328+
327329 plist , ok := l .mutationMap [mpost .StartTs ]
328330 if ! ok {
329- plist := & pb.PostingList {}
330- plist .Postings = append (plist .Postings , mpost )
331+ plist = & pb.PostingList {}
331332 if l .mutationMap == nil {
332333 l .mutationMap = make (map [uint64 ]* pb.PostingList )
333334 }
334335 l .mutationMap [mpost .StartTs ] = plist
335- return
336336 }
337+
338+ if singleUidUpdate {
339+ // This handles the special case when adding a value to predicates of type uid.
340+ // The current value should be deleted in favor of this value. This needs to
341+ // be done because the fingerprint for the value is not math.MaxUint64 as is
342+ // the case with the rest of the scalar predicates.
343+ plist := & pb.PostingList {}
344+ plist .Postings = append (plist .Postings , mpost )
345+
346+ err := l .iterate (mpost .StartTs , 0 , func (obj * pb.Posting ) error {
347+ // Ignore values which have the same uid as they will get replaced
348+ // by the current value.
349+ if obj .Uid == mpost .Uid {
350+ return nil
351+ }
352+
353+ // Mark all other values as deleted. By the end of the iteration, the
354+ // list of postings will contain deleted operations and only one set
355+ // for the mutation stored in mpost.
356+ objCopy := proto .Clone (obj ).(* pb.Posting )
357+ objCopy .Op = Del
358+ plist .Postings = append (plist .Postings , objCopy )
359+ return nil
360+ })
361+ if err != nil {
362+ return err
363+ }
364+
365+ // Update the mutation map with the new plist. Return here since the code below
366+ // does not apply for predicates of type uid.
367+ l .mutationMap [mpost .StartTs ] = plist
368+ return nil
369+ }
370+
337371 // Even if we have a delete all in this transaction, we should still pick up any updates since.
338372 for i , prev := range plist .Postings {
339373 if prev .Uid == mpost .Uid {
340374 plist .Postings [i ] = mpost
341- return
375+ return nil
342376 }
343377 }
344378 plist .Postings = append (plist .Postings , mpost )
379+ return nil
345380}
346381
347382// TypeID returns the typeid of destination vertex
@@ -400,17 +435,26 @@ func (l *List) addMutationInternal(ctx context.Context, txn *Txn, t *pb.Directed
400435 t .ValueId = fingerprintEdge (t )
401436 mpost .Uid = t .ValueId
402437 }
403- l .updateMutationLayer (mpost )
404438
405- // We ensure that commit marks are applied to posting lists in the right
406- // order. We can do so by proposing them in the same order as received by the Oracle delta
407- // stream from Zero, instead of in goroutines.
408- var conflictKey uint64
439+ // Check whether this mutation is an update for a predicate of type uid.
409440 pk , err := x .Parse (l .key )
410441 if err != nil {
411442 return errors .Wrapf (err , "cannot parse key when adding mutation to list with key %s" ,
412443 hex .EncodeToString (l .key ))
413444 }
445+ pred , ok := schema .State ().Get (t .Attr )
446+ isSingleUidUpdate := ok && ! pred .GetList () && pred .GetValueType () == pb .Posting_UID &&
447+ pk .IsData () && mpost .Op == Set && mpost .PostingType == pb .Posting_REF
448+
449+ if err != l .updateMutationLayer (mpost , isSingleUidUpdate ) {
450+ return errors .Wrapf (err , "cannot update mutation layer of key %s with value %+v" ,
451+ hex .EncodeToString (l .key ), mpost )
452+ }
453+
454+ // We ensure that commit marks are applied to posting lists in the right
455+ // order. We can do so by proposing them in the same order as received by the Oracle delta
456+ // stream from Zero, instead of in goroutines.
457+ var conflictKey uint64
414458 switch {
415459 case schema .State ().HasNoConflict (t .Attr ):
416460 break
0 commit comments