From ffb28bd8dcd78aae7624da737465af3afa2bc85c Mon Sep 17 00:00:00 2001 From: aman-bansal Date: Thu, 4 Feb 2021 11:35:47 +0530 Subject: [PATCH] fixing implementation for ludicrous mode --- worker/change_data_ee.go | 42 +++++++++++++++++++--------------------- worker/draft.go | 5 +++++ 2 files changed, 25 insertions(+), 22 deletions(-) diff --git a/worker/change_data_ee.go b/worker/change_data_ee.go index 83f47fc5592..0a4e1040229 100644 --- a/worker/change_data_ee.go +++ b/worker/change_data_ee.go @@ -20,17 +20,13 @@ import ( "sync/atomic" "time" - "github.com/dgraph-io/ristretto/z" - "github.com/dgraph-io/dgraph/posting" + "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/types" - + "github.com/dgraph-io/dgraph/x" + "github.com/dgraph-io/ristretto/z" "github.com/golang/glog" - - "github.com/dgraph-io/dgraph/protos/pb" "go.etcd.io/etcd/raft/raftpb" - - "github.com/dgraph-io/dgraph/x" ) const defaultCDCConfig = "enabled=false; max_recovery=10000" @@ -183,20 +179,20 @@ func (cd *ChangeData) processCDCEvents() { continue } // In ludicrous events send the events as soon as you get it. - // We wont wait for oracle delta in case of ludicrous mode + // We wont wait for oracle delta in case of ludicrous mode. + // Since all mutations will eventually succeed. + // We can set the read ts here only. if x.WorkerConfig.LudicrousMode { if err := sendEvents(nil, events); err != nil { return } cd.cdcIndex = entry.Index + cd.updateMinReadTs(proposal.Mutations.StartTs) continue } if cd.pendingEvents[proposal.Mutations.StartTs] == nil { cd.pendingEvents[proposal.Mutations.StartTs] = make([]CDCEvent, 0) } - if proposal.Mutations.StartTs == 10005 { - glog.Infof("pending event is %v", proposal.Mutations) - } cd.pendingEvents[proposal.Mutations.StartTs] = append(cd.pendingEvents[proposal.Mutations.StartTs], events...) } @@ -204,20 +200,17 @@ func (cd *ChangeData) processCDCEvents() { if proposal.Delta != nil { for _, ts := range proposal.Delta.Txns { cd.maxReadTs = x.Max(cd.maxReadTs, ts.StartTs) - if !x.WorkerConfig.LudicrousMode { - pending := cd.pendingEvents[ts.StartTs] - if ts.CommitTs > 0 && len(pending) > 0 { - if err := sendEvents(ts, pending); err != nil { - return - } + pending := cd.pendingEvents[ts.StartTs] + if ts.CommitTs > 0 && len(pending) > 0 { + if err := sendEvents(ts, pending); err != nil { + return } - // delete from pending events once events are sent - delete(cd.pendingEvents, ts.StartTs) - _ = cd.evaluateAndSetMinReadTs() } + // delete from pending events once events are sent + delete(cd.pendingEvents, ts.StartTs) + _ = cd.evaluateAndSetMinReadTs() } } - cd.cdcIndex = entry.Index } } @@ -240,7 +233,6 @@ func (cd *ChangeData) processCDCEvents() { iter = 0 minTs := cd.evaluateAndSetMinReadTs() glog.V(2).Infof("proposing CDC minReadTs %d", minTs) - glog.V(2).Infof("lenght of pending events %d", len(cd.pendingEvents)) if err := groups().Node.proposeCDCMinReadTs(minTs); err != nil { glog.Errorf("not able to propose cdc minReadTs %v", err) } @@ -256,6 +248,12 @@ func (cd *ChangeData) processCDCEvents() { // thus making followers to clear raft logs between next proposal. // In this way we can loose some events. func (cd *ChangeData) evaluateAndSetMinReadTs() uint64 { + if cd == nil { + return math.MaxUint64 + } + if x.WorkerConfig.LudicrousMode { + return cd.getCDCMinReadTs() + } min := cd.maxReadTs for ts := range cd.pendingEvents { if ts < min { diff --git a/worker/draft.go b/worker/draft.go index 73a171ed39c..fea945d6694 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -893,6 +893,11 @@ func (n *node) retrieveSnapshot(snap pb.Snapshot) error { } func (n *node) proposeCDCMinReadTs(minTs uint64) error { + // in case of ludicrous mode it could be zero. no need to send it + if minTs == 0 { + return nil + } + proposal := &pb.Proposal{ CDCMinReadTs: minTs, }