Fix(Zero): Fix how Zero snapshots and purge works (#7096)
Zero snapshots were being calculated in a roundabout way. This got worse with a recently introduced bug, which caused Zeros to not take snapshots if keyCommits were not purged.

This PR makes the Zero leader calculate snapshots based on the checkpoints from Alpha leaders (instead of their snapshots). It then proposes this snapshot to the Zero group, causing all members to create the snapshot and purge the keyCommit map at the same time.
manishrjain committed Dec 10, 2020
1 parent fccb5bc commit 5efdfbf
Showing 9 changed files with 897 additions and 430 deletions.
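The gist of the change, condensed into a runnable sketch. This is a hedged simulation, not Dgraph code: the `entry` struct and the in-memory log stand in for `pb.ZeroProposal` entries in Zero's Raft store, and locking, error handling, and the actual Raft proposal are elided.

```go
package main

import (
	"fmt"
	"math"
)

// entry stands in for a decoded Raft log entry carrying a txn commit.
type entry struct {
	index    uint64
	commitTs uint64
}

func main() {
	// 1. Checkpoints reported by each Alpha group leader (via UpdateMembership).
	checkpointPerGroup := map[uint32]uint64{1: 105, 2: 98, 3: 120}

	// 2. The Zero leader takes the minimum checkpoint across groups.
	discardBelow := uint64(math.MaxUint64)
	for _, ts := range checkpointPerGroup {
		if ts < discardBelow {
			discardBelow = ts
		}
	}

	// 3. Scan the Raft log for the last entry whose commit ts falls below the
	// minimum checkpoint; everything up to that index can be snapshotted away.
	log := []entry{{1, 90}, {2, 95}, {3, 101}, {4, 110}}
	var snapshotIndex uint64
	for _, e := range log {
		if e.commitTs > 0 && e.commitTs < discardBelow {
			snapshotIndex = e.index
		}
	}

	// 4. The leader would now propose {snapshotIndex, discardBelow} to the Zero
	// group; on apply, every member purges its oracle below discardBelow and
	// creates a Raft snapshot at snapshotIndex.
	fmt.Printf("propose snapshot at index %d, checkpoint ts %d\n",
		snapshotIndex, discardBelow) // index 2, ts 98
}
```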
1 change: 1 addition & 0 deletions compose/run.sh
@@ -47,4 +47,5 @@ Info "rebuilding dgraph ..."
# The up command handles that automatically

Info "bringing up containers"
docker-compose -p dgraph down
docker-compose --compatibility -p dgraph up --force-recreate --remove-orphans
29 changes: 4 additions & 25 deletions dgraph/cmd/zero/oracle.go
@@ -47,14 +47,11 @@ type Oracle struct {
keyCommit *z.Tree // fp(key) -> commitTs. Used to detect conflict.
maxAssigned uint64 // max transaction assigned by us.

// timestamp at the time of start of server or when it became leader. Used to detect conflicts.
tmax uint64
// All transactions with startTs < startTxnTs return true for hasConflict.
startTxnTs uint64
subscribers map[int]chan pb.OracleDelta
updates chan *pb.OracleDelta
doneUntil y.WaterMark
syncMarks []syncMark
}

// Init initializes the oracle.
@@ -105,6 +102,10 @@ func (o *Oracle) purgeBelow(minTs uint64) {

o.Lock()
defer o.Unlock()

// Set startTxnTs so that every txn with a start ts less than this would be aborted.
o.startTxnTs = minTs

// Dropping would be cheaper if abort/commits map is sharded
for ts := range o.commits {
if ts < minTs {
@@ -121,7 +122,6 @@ func (o *Oracle) purgeBelow(minTs uint64) {
}
o.keyCommit.DeleteBelow(minTs)
timer.Record("deleteBelow")
o.tmax = minTs
glog.V(2).Infof("Purged below ts:%d, len(o.commits):%d, keyCommit: [before: %+v, after: %+v].\n",
minTs, len(o.commits), stats, o.keyCommit.Stats())
if timer.Total() > time.Second {
@@ -267,7 +267,6 @@ func (o *Oracle) updateCommitStatusHelper(index uint64, src *api.TxnContext) bool {
} else {
o.commits[src.StartTs] = src.CommitTs
}
o.syncMarks = append(o.syncMarks, syncMark{index: index, ts: src.StartTs})
return true
}

@@ -488,26 +487,6 @@ func (s *Server) Oracle(_ *api.Payload, server pb.Zero_OracleServer) error {
}
}

// SyncedUntil returns the timestamp up to which all the nodes have synced.
func (s *Server) SyncedUntil() uint64 {
s.orc.Lock()
defer s.orc.Unlock()
// Find max index with timestamp less than tmax
var idx int
for i, sm := range s.orc.syncMarks {
idx = i
if sm.ts >= s.orc.tmax {
break
}
}
var syncUntil uint64
if idx > 0 {
syncUntil = s.orc.syncMarks[idx-1].index
}
s.orc.syncMarks = s.orc.syncMarks[idx:]
return syncUntil
}

// TryAbort attempts to abort the given transactions which are not already committed.
func (s *Server) TryAbort(ctx context.Context,
txns *pb.TxnTimestamps) (*pb.OracleDelta, error) {
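The new startTxnTs field replaces the tmax/syncMarks bookkeeping removed above: once purgeBelow raises it, any transaction that started before the purge floor must be treated as conflicting, because its conflict keys may already have been dropped from keyCommit. A minimal sketch of that semantic, hypothetical and simplified (the real Oracle guards state with a mutex and keeps key fingerprints in a z.Tree):

```go
package main

import "fmt"

type oracle struct {
	startTxnTs uint64            // txns starting below this are auto-conflicted
	commits    map[uint64]uint64 // startTs -> commitTs
}

// purgeBelow drops commit records below minTs and raises the conflict floor.
func (o *oracle) purgeBelow(minTs uint64) {
	o.startTxnTs = minTs
	for ts := range o.commits {
		if ts < minTs {
			delete(o.commits, ts)
		}
	}
}

// hasConflict conservatively aborts any txn older than the purge floor,
// since the keys it conflicts on may already have been purged.
func (o *oracle) hasConflict(startTs uint64) bool {
	return startTs < o.startTxnTs
}

func main() {
	o := &oracle{commits: map[uint64]uint64{90: 92, 100: 103}}
	o.purgeBelow(95)
	fmt.Println(o.hasConflict(90))  // true: predates the purge floor
	fmt.Println(o.hasConflict(100)) // false: its state is still tracked
}
```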
148 changes: 127 additions & 21 deletions dgraph/cmd/zero/raft.go
@@ -296,6 +296,32 @@ func (n *node) handleTabletProposal(tablet *pb.Tablet) error {
return nil
}

func (n *node) applySnapshot(snap *pb.ZeroSnapshot) error {
existing, err := n.Store.Snapshot()
if err != nil {
return err
}
if existing.Metadata.Index >= snap.Index {
glog.V(2).Infof("Skipping snapshot at %d, because found one at %d\n",
snap.Index, existing.Metadata.Index)
return nil
}
n.server.orc.purgeBelow(snap.CheckpointTs)

data, err := snap.Marshal()
x.Check(err)

for {
// CreateSnapshot should never be allowed to fail; retry until it succeeds.
err := n.Store.CreateSnapshot(snap.Index, n.ConfState(), data)
if err == nil {
break
}
glog.Warningf("Error while calling CreateSnapshot: %v. Retrying...", err)
}
return nil
}

func (n *node) applyProposal(e raftpb.Entry) (string, error) {
var p pb.ZeroProposal
// Raft commits empty entry on becoming a leader.
@@ -334,13 +360,6 @@ func (n *node) applyProposal(e raftpb.Entry) (string, error) {
group.SnapshotTs = x.Max(group.SnapshotTs, ts)
}
}
purgeTs := uint64(math.MaxUint64)
for _, group := range state.Groups {
purgeTs = x.Min(purgeTs, group.SnapshotTs)
}
if purgeTs < math.MaxUint64 {
n.server.orc.purgeBelow(purgeTs)
}
}
if p.Member != nil {
if err := n.handleMemberProposal(p.Member); err != nil {
@@ -371,6 +390,11 @@ func (n *node) applyProposal(e raftpb.Entry) (string, error) {
expiry := time.Unix(state.License.ExpiryTs, 0).UTC()
state.License.Enabled = time.Now().UTC().Before(expiry)
}
if p.Snapshot != nil {
if err := n.applySnapshot(p.Snapshot); err != nil {
glog.Errorf("While applying snapshot: %v\n", err)
}
}

switch {
case p.MaxLeaseId > state.MaxLeaseId:
@@ -647,35 +671,117 @@ func (n *node) checkQuorum(closer *z.Closer) {

func (n *node) snapshotPeriodically(closer *z.Closer) {
defer closer.Done()
ticker := time.NewTicker(10 * time.Second)
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()

for {
select {
case <-ticker.C:
n.trySnapshot(1000)
if err := n.calculateAndProposeSnapshot(); err != nil {
glog.Errorf("While calculateAndProposeSnapshot: %v", err)
}

case <-closer.HasBeenClosed():
return
}
}
}

func (n *node) trySnapshot(skip uint64) {
existing, err := n.Store.Snapshot()
x.Checkf(err, "Unable to get existing snapshot")
si := existing.Metadata.Index
idx := n.server.SyncedUntil()
if idx <= si+skip {
return
// calculateAndProposeSnapshot works by tracking Alpha group leaders' checkpoint timestamps. It
// finds the minimum checkpoint ts across these groups, say Tmin, and then iterates over the Zero
// Raft log to determine which entries below Tmin can be discarded. It uses that information to
// calculate a snapshot, which it proposes to the other Zeros. When the proposal arrives via Raft,
// all Zeros apply it to themselves via applySnapshot in raft.Ready.
func (n *node) calculateAndProposeSnapshot() error {
// Only run this on the leader.
if !n.AmLeader() {
return nil
}

data, err := n.server.MarshalMembershipState()
x.Check(err)
_, span := otrace.StartSpan(n.ctx, "Calculate.Snapshot",
otrace.WithSampler(otrace.AlwaysSample()))
defer span.End()

err = n.Store.CreateSnapshot(idx, n.ConfState(), data)
x.Checkf(err, "While creating snapshot")
glog.Infof("Writing snapshot at index: %d, applied mark: %d\n", idx, n.Applied.DoneUntil())
// We calculate the minimum checkpoint timestamp across all the groups.
discardBelow := uint64(math.MaxUint64)
{
s := n.server
s.RLock()
if len(s.state.Groups) != len(s.checkpointPerGroup) {
glog.Infof("Skipping creating a snapshot. Num groups: %d, Num max assigned: %d",
len(s.state.Groups), len(s.checkpointPerGroup))
s.RUnlock()
return nil
}
for _, ts := range s.checkpointPerGroup {
discardBelow = x.Min(discardBelow, ts)
}
s.RUnlock()
}

first, err := n.Store.FirstIndex()
if err != nil {
span.Annotatef(nil, "FirstIndex error: %v", err)
return err
}
last, err := n.Store.LastIndex()
if err != nil {
span.Annotatef(nil, "LastIndex error: %v", err)
return err
}

span.Annotatef(nil, "First index: %d. Last index: %d. Discard Below: %d",
first, last, discardBelow)

var snapshotIndex uint64
for batchFirst := first; batchFirst <= last; {
entries, err := n.Store.Entries(batchFirst, last+1, 256<<20)
if err != nil {
span.Annotatef(nil, "Error: %v", err)
return err
}
// Exit early from the loop if no entries were found.
if len(entries) == 0 {
break
}
for _, entry := range entries {
if entry.Type != raftpb.EntryNormal {
continue
}
var p pb.ZeroProposal
if err := p.Unmarshal(entry.Data); err != nil {
span.Annotatef(nil, "Error: %v", err)
return err
}
if txn := p.Txn; txn != nil {
if txn.CommitTs > 0 && txn.CommitTs < discardBelow {
snapshotIndex = entry.Index
}
}
}
batchFirst = entries[len(entries)-1].Index + 1
}
if snapshotIndex == 0 {
return nil
}
span.Annotatef(nil, "Taking snapshot at: %d", snapshotIndex)
state := n.server.membershipState()

zs := &pb.ZeroSnapshot{
Index: snapshotIndex,
CheckpointTs: discardBelow,
State: state,
}
glog.V(2).Infof("Proposing snapshot at Index: %d Checkpoint Ts: %d\n",
zs.Index, zs.CheckpointTs)
zp := &pb.ZeroProposal{Snapshot: zs}
if err = n.proposeAndWait(n.ctx, zp); err != nil {
glog.Errorf("Error while proposing snapshot: %v\n", err)
span.Annotatef(nil, "Error while proposing snapshot: %v", err)
return err
}
span.Annotatef(nil, "Snapshot proposed: Done")
return nil
}

const tickDur = 100 * time.Millisecond
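calculateAndProposeSnapshot pages through the Raft log in bounded batches (a 256 MB budget per Entries call in the diff) rather than loading it whole, and only a committed transaction below the checkpoint advances the candidate snapshot index. A self-contained sketch of the pagination pattern, with a toy store standing in for n.Store and a deliberately tiny batch size so the loop iterates:

```go
package main

import "fmt"

type entry struct{ index, commitTs uint64 }

// store is a stand-in for the Raft storage interface used in the diff.
type store struct{ log []entry }

func (s *store) first() uint64 { return s.log[0].index }
func (s *store) last() uint64  { return s.log[len(s.log)-1].index }

// entries returns log entries in [lo, hi), capped at max entries per call.
func (s *store) entries(lo, hi, max uint64) []entry {
	var out []entry
	for _, e := range s.log {
		if e.index >= lo && e.index < hi && uint64(len(out)) < max {
			out = append(out, e)
		}
	}
	return out
}

func main() {
	s := &store{log: []entry{{1, 90}, {2, 95}, {3, 0}, {4, 97}, {5, 110}}}
	const discardBelow = 100

	var snapshotIndex uint64
	for batchFirst := s.first(); batchFirst <= s.last(); {
		batch := s.entries(batchFirst, s.last()+1, 2) // tiny batch for the demo
		if len(batch) == 0 {
			break // exit early if no entries were found
		}
		for _, e := range batch {
			// Only committed txns below the checkpoint advance the index;
			// commitTs == 0 (aborted or non-txn entries) is skipped.
			if e.commitTs > 0 && e.commitTs < discardBelow {
				snapshotIndex = e.index
			}
		}
		batchFirst = batch[len(batch)-1].index + 1
	}
	fmt.Println("snapshot at index", snapshotIndex) // 4
}
```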
2 changes: 0 additions & 2 deletions dgraph/cmd/zero/run.go
@@ -291,8 +291,6 @@ func run() {
_ = httpListener.Close()
// Stop Raft.
st.node.closer.SignalAndWait()
// Try to generate a snapshot before the shutdown.
st.node.trySnapshot(0)
// Stop all internal requests.
_ = grpcListener.Close()

13 changes: 12 additions & 1 deletion dgraph/cmd/zero/zero.go
@@ -75,6 +75,8 @@ type Server struct {

moveOngoing chan struct{}
blockCommitsOn *sync.Map

checkpointPerGroup map[uint32]uint64
}

// Init initializes the zero server.
@@ -96,6 +98,7 @@ func (s *Server) Init() {
s.closer = z.NewCloser(2) // grpc and http
s.blockCommitsOn = new(sync.Map)
s.moveOngoing = make(chan struct{}, 1)
s.checkpointPerGroup = make(map[uint32]uint64)

go s.rebalanceTablets()
}
@@ -346,7 +349,7 @@ func (s *Server) createProposals(dst *pb.Group) ([]*pb.ZeroProposal, error) {

s.RLock()
defer s.RUnlock()
// There is only one member.
// There is only one member. We use a for loop because we don't know what the mid is.
for mid, dstMember := range dst.Members {
group, has := s.state.Groups[dstMember.GroupId]
if !has {
@@ -635,6 +638,14 @@ func (s *Server) ShouldServe(

// UpdateMembership updates the membership of the given group.
func (s *Server) UpdateMembership(ctx context.Context, group *pb.Group) (*api.Payload, error) {
// Only the Zero leader would get these membership updates.
if ts := group.GetCheckpointTs(); ts > 0 {
for _, m := range group.GetMembers() {
s.Lock()
s.checkpointPerGroup[m.GetGroupId()] = ts
s.Unlock()
}
}
proposals, err := s.createProposals(group)
if err != nil {
// Sleep here so the caller doesn't keep on retrying indefinitely, creating a busy
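checkpointPerGroup is the leader-side bookkeeping that feeds calculateAndProposeSnapshot: each UpdateMembership call from a group leader overwrites that group's checkpoint. A simplified, hypothetical sketch (a plain sync.RWMutex stands in for the Server's embedded lock, and zero timestamps are ignored, mirroring the ts > 0 guard in the diff):

```go
package main

import (
	"fmt"
	"sync"
)

type server struct {
	sync.RWMutex
	checkpointPerGroup map[uint32]uint64
}

// recordCheckpoint stores the checkpoint ts reported by a group's leader.
// A later report for the same group overwrites the earlier one.
func (s *server) recordCheckpoint(groupID uint32, ts uint64) {
	if ts == 0 {
		return // no checkpoint carried in this membership update
	}
	s.Lock()
	defer s.Unlock()
	s.checkpointPerGroup[groupID] = ts
}

func main() {
	s := &server{checkpointPerGroup: make(map[uint32]uint64)}
	s.recordCheckpoint(1, 105)
	s.recordCheckpoint(1, 110) // newer report wins
	s.recordCheckpoint(2, 98)

	s.RLock()
	fmt.Println(s.checkpointPerGroup) // map[1:110 2:98]
	s.RUnlock()
}
```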
8 changes: 8 additions & 0 deletions protos/pb.proto
@@ -139,6 +139,7 @@ message Group {
map<string, Tablet> tablets = 2; // Predicate + others are key.
uint64 snapshot_ts = 3; // Stores Snapshot transaction ts.
uint64 checksum = 4; // Stores a checksum.
uint64 checkpoint_ts = 5; // Stores checkpoint ts as seen by leader.
}

message License {
@@ -159,6 +160,7 @@ message ZeroProposal {
string key = 8; // Used as unique identifier for proposal id.
string cid = 9; // Used as unique identifier for the cluster.
License license = 10;
ZeroSnapshot snapshot = 11; // Used to make Zeros take a snapshot.
}

// MembershipState is used to pack together the current membership state of all the nodes
@@ -267,6 +269,12 @@ message Snapshot {
uint64 since_ts = 5;
}

message ZeroSnapshot {
uint64 index = 1;
uint64 checkpoint_ts = 2;
MembershipState state = 5;
}

message RestoreRequest {
uint32 group_id = 1;
uint64 restore_ts = 2;
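Since ZeroSnapshot is carried as a new field on ZeroProposal, snapshot proposals travel the existing Raft proposal path with no extra plumbing. A hedged sketch of the encode/decode round-trip, assuming the gogo/protobuf-generated types in github.com/dgraph-io/dgraph/protos/pb (the Marshal/Unmarshal methods match their usage in raft.go above); these helpers are illustrative, not part of the commit:

```go
// Hypothetical helpers; assume the generated "pb" package from protos/pb.proto.
package zero

import "github.com/dgraph-io/dgraph/protos/pb"

// encodeSnapshotProposal wraps a ZeroSnapshot in a ZeroProposal, as the
// leader does before handing it to the Raft proposal machinery.
func encodeSnapshotProposal(index, checkpointTs uint64,
	state *pb.MembershipState) ([]byte, error) {
	zp := &pb.ZeroProposal{
		Snapshot: &pb.ZeroSnapshot{
			Index:        index,
			CheckpointTs: checkpointTs,
			State:        state,
		},
	}
	return zp.Marshal()
}

// decodeSnapshotProposal is the inverse, as done when a committed Raft
// entry is unmarshalled back into a ZeroProposal.
func decodeSnapshotProposal(data []byte) (*pb.ZeroSnapshot, error) {
	var zp pb.ZeroProposal
	if err := zp.Unmarshal(data); err != nil {
		return nil, err
	}
	return zp.Snapshot, nil
}
```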
