This repository has been archived by the owner on May 22, 2023. It is now read-only.
/
recovery_coordinator.go
97 lines (81 loc) · 3.15 KB
/
recovery_coordinator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package consensus
import (
"github.com/golang/glog"
"github.com/heidi-ann/ios/msgs"
"reflect"
)
// runRecoveryCoordinator recovers the log entries in the range
// [startIndex, endIndex) (start inclusive, end exclusive) for the given view.
//
// It broadcasts a QueryRequest for the range to all peers and collects
// QueryResponses until config.Quorum.checkRecoveryQuorum is satisfied, keeping
// one candidate entry per index. The candidates are then re-stamped with view,
// marked uncommitted, and sent to this node's own coordinate channel; the
// function blocks until a coordinate response arrives.
//
// It returns true when recovery completes (immediately for an empty range)
// and false if a response carries a view greater than view, in which case this
// node steps down as recovery coordinator. An inverted range, a response from
// an older view, or two different request batches for the same index and view
// are reported with glog.Fatal. Responses that name a sender outside
// [0, config.N) or that carry fewer entries than requested are logged and
// ignored rather than indexed out of range.
func runRecoveryCoordinator(view int, startIndex int, endIndex int, peerNet *msgs.PeerNet, config ConfigAll) bool {
	if startIndex == endIndex {
		return true
	}
	if endIndex < startIndex {
		glog.Fatal("Invalid recovery range ", startIndex, endIndex)
	}
	// number of log indexes being recovered
	span := endIndex - startIndex
	glog.Info("Starting recovery for indexes ", startIndex, " to ", endIndex)

	// dispatch query to all
	query := msgs.QueryRequest{config.ID, view, startIndex, endIndex}
	peerNet.OutgoingBroadcast.Requests.Query <- query

	// collect responses; every index starts as a no-op placeholder which also
	// serves as the "no candidate seen yet" marker below
	noopEntry := msgs.Entry{View: 0, Committed: false, Requests: []msgs.ClientRequest{noop}}
	candidates := make([]msgs.Entry, span)
	for i := range candidates {
		candidates[i] = noopEntry
	}

	// check only one response is received per sender, index = node ID
	replied := make([]bool, config.N)
	for !config.Quorum.checkRecoveryQuorum(replied) {
		msg := <-peerNet.Incoming.Responses.Query
		if msg.Request != query {
			// response to some other query, not ours
			continue
		}
		res := msg.Response

		// the sender ID comes from a peer, so validate it before using it as
		// an index into replied
		sender := int(res.SenderID)
		if sender < 0 || sender >= len(replied) {
			glog.Warning("Ignoring response from unknown node ID ", res.SenderID)
			continue
		}
		// check this is not a duplicate
		if replied[sender] {
			glog.Warning("Response already received from ", res.SenderID)
			continue
		}

		// check view
		if res.View < view {
			glog.Fatal("Reply view is < current view, this should not have occurred")
		}
		if view < res.View {
			glog.Warning("Stepping down from recovery coordinator")
			return false
		}

		// a short response cannot be merged index by index; do not count it
		// towards the quorum
		if len(res.Entries) < span {
			glog.Warning("Ignoring response from ", res.SenderID, " with ", len(res.Entries), " entries, expected ", span)
			continue
		}

		replied[sender] = true
		for i := 0; i < span; i++ {
			entry := res.Entries[i]
			// a zero-valued entry means the sender has nothing at this index
			if reflect.DeepEqual(entry, msgs.Entry{}) {
				glog.V(1).Info("Log entry at index ", startIndex+i, " on node ID ", res.SenderID, " is missing")
				continue
			}
			// if committed, then done
			if entry.Committed {
				candidates[i] = entry
				// TODO: add early exit here
			}
			// if first entry, then new candidate
			if reflect.DeepEqual(candidates[i], noopEntry) {
				candidates[i] = entry
			}
			// if higher view than candidate then new candidate
			if entry.View > candidates[i].View {
				candidates[i] = entry
			}
			// if same view and different requests then panic!
			if entry.View == candidates[i].View && !reflect.DeepEqual(entry.Requests, candidates[i].Requests) {
				glog.Fatal("Same index has been issued more then once", entry.Requests, candidates[i].Requests)
			}
		}
	}
	glog.Info("New view phase is finished")

	// set the next view and mark as uncommitted
	// TODO: add shortcut to skip prepare phase if entries are already committed.
	for i := range candidates {
		candidates[i] = msgs.Entry{View: view, Committed: false, Requests: candidates[i].Requests}
	}
	coord := msgs.CoordinateRequest{config.ID, view, startIndex, endIndex, true, candidates}
	peerNet.OutgoingUnicast[config.ID].Requests.Coordinate <- coord
	<-peerNet.Incoming.Responses.Coordinate
	// TODO: check msg replies to the msg we just sent
	glog.Info("Recovery completed for indexes ", startIndex, " to ", endIndex)
	return true
}