Commit c736b19

Snapshot streaming:
Typically, a follower asks the leader to send it a snapshot. We witnessed a rare edge case in which the leader changes after it has asked the follower to fetch a snapshot. The follower then sends its snapshot request to a node that is now another follower, which rejects it for not being the leader, and Dgraph ends up in an infinite loop of failing requests. We avoid this loop by no longer checking whether the receiver of the request is the leader. Any follower should be able to service the request once it is past the read timestamp. That's what this PR does.
1 parent 2a68d54 commit c736b19
worker/snapshot.go

Lines changed: 9 additions & 6 deletions
@@ -26,7 +26,6 @@ import (
 	"github.com/dgraph-io/dgraph/conn"
 	"github.com/dgraph-io/dgraph/posting"
 	"github.com/dgraph-io/dgraph/protos/pb"
-	"github.com/dgraph-io/dgraph/x"
 )
 
 const (
@@ -108,6 +107,15 @@ func doStreamSnapshot(snap *pb.Snapshot, out pb.Worker_StreamSnapshotServer) err
 	// might be OK. Otherwise, we'd want to version the schemas as well. Currently, they're stored
 	// at timestamp=1.
 
+	// We no longer check if this node is the leader, because the leader can switch between snapshot
+	// requests. Therefore, we wait until this node has reached snap.ReadTs, before servicing the
+	// request. Any other node in the group should have the same data as the leader, once it is past
+	// the read timestamp.
+	glog.Infof("Waiting to reach timestamp: %d", snap.ReadTs)
+	if err := posting.Oracle().WaitForTs(out.Context(), snap.ReadTs); err != nil {
+		return err
+	}
+
 	var num int
 	stream := pstore.NewStreamAt(snap.ReadTs)
 	stream.LogPrefix = "Sending Snapshot"
@@ -150,11 +158,6 @@ func (w *grpcWorker) StreamSnapshot(stream pb.Worker_StreamSnapshotServer) error
 	atomic.AddInt32(&n.streaming, 1)
 	defer atomic.AddInt32(&n.streaming, -1)
 
-	if !x.IsTestRun() {
-		if !n.AmLeader() {
-			return errNotLeader
-		}
-	}
 	snap, err := stream.Recv()
 	if err != nil {
 		// If we don't even receive a request (here or if no StreamSnapshot is called), we can't