-
Notifications
You must be signed in to change notification settings - Fork 4
/
tracker.go
66 lines (56 loc) · 1.66 KB
/
tracker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package command
import (
"github.com/ibalajiarun/go-consensus/peer/peerpb"
"github.com/ibalajiarun/go-consensus/pkg/command/commandpb"
"github.com/ibalajiarun/go-consensus/pkg/logger"
)
// Tracker tracks in-flight Raft proposals.
//
// It pairs result channels registered through Register with results
// delivered through Finish, whichever of the two arrives first.
// NOTE(review): no locking is visible in this type; presumably callers
// serialize access — confirm against the call sites.
type Tracker struct {
	// logger receives the debug messages emitted by Register and Finish.
	logger logger.Logger
	// requests holds, per command, the channel on which Finish sends the
	// result. Entries are removed once the result is sent or FinishAll runs.
	requests map[trackerID]chan<- *commandpb.CommandResult
	// responses holds results passed to Finish while no channel was
	// registered for the command; Register replays them to the caller.
	responses map[trackerID]*commandpb.CommandResult
}
// trackerID is the map key used by Tracker to identify one command.
type trackerID struct {
	// target is taken from the Target field of a command or command result.
	target peerpb.PeerID
	// id is taken from the Timestamp field of a command or command result.
	id uint64
}
// MakeTracker returns a Tracker that logs through log and starts with no
// registered requests and no stored responses.
func MakeTracker(log logger.Logger) Tracker {
	var t Tracker
	t.logger = log
	t.requests = map[trackerID]chan<- *commandpb.CommandResult{}
	t.responses = map[trackerID]*commandpb.CommandResult{}
	return t
}
// Register associates the channel c with the command identified by
// (cmd.Target, cmd.Timestamp).
//
// It returns true only when c was stored for later notification by Finish.
// It returns false when a result for the command is already stored (that
// result is sent on c before returning) or when another channel is already
// registered for the same command (c is then left untouched).
func (pr *Tracker) Register(cmd *commandpb.Command, c chan<- *commandpb.CommandResult) bool {
	pr.logger.Debugf("Registering response stream for %d %d", cmd.Target, cmd.Timestamp)

	key := trackerID{target: cmd.Target, id: cmd.Timestamp}

	// The result arrived before the registration: deliver it right away.
	if cached, done := pr.responses[key]; done {
		c <- cached
		return false
	}

	// The first channel registered for a command wins; later ones are rejected.
	if _, pending := pr.requests[key]; pending {
		return false
	}

	pr.requests[key] = c
	return true
}
// Finish delivers the result cp for the command identified by
// (cp.Target, cp.Timestamp).
//
// When a channel is registered for that command, the registration is
// removed and cp is sent on the channel. Otherwise cp is stored so that a
// later Register call for the same command can pick it up.
func (pr *Tracker) Finish(cp *commandpb.CommandResult) {
	pr.logger.Debugf("Notifying response stream for %d %d", cp.Target, cp.Timestamp)

	key := trackerID{target: cp.Target, id: cp.Timestamp}

	waiter, registered := pr.requests[key]
	if !registered {
		// Nobody is waiting yet: keep the result for a future Register.
		pr.responses[key] = cp
		return
	}

	// Drop the registration before sending, as each channel is notified once.
	delete(pr.requests, key)
	waiter <- cp
}
// FinishAll closes the channel of every registered proposal and removes the
// registration from the tracker. No result is sent on the channels, and
// stored responses are left untouched.
func (pr *Tracker) FinishAll() {
	// Removing entries from a map while ranging over it is permitted in Go.
	for key := range pr.requests {
		close(pr.requests[key])
		delete(pr.requests, key)
	}
}