-
Notifications
You must be signed in to change notification settings - Fork 72
/
replica.go
93 lines (80 loc) · 2.27 KB
/
replica.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package m2paxos
import (
"github.com/ailidani/paxi"
"github.com/ailidani/paxi/log"
)
// Replica is WPaxos replica node
type Replica struct {
paxi.Node
paxi map[paxi.Key]*kpaxos
}
// NewReplica create new Replica instance
func NewReplica(id paxi.ID) *Replica {
r := new(Replica)
r.Node = paxi.NewNode(id)
r.paxi = make(map[paxi.Key]*kpaxos)
r.Register(paxi.Request{}, r.handleRequest)
r.Register(Prepare{}, r.handlePrepare)
r.Register(Promise{}, r.handlePromise)
r.Register(Accept{}, r.handleAccept)
r.Register(Accepted{}, r.handleAccepted)
r.Register(Commit{}, r.handleCommit)
r.Register(LeaderChange{}, r.handleLeaderChange)
return r
}
func (r *Replica) init(key paxi.Key) {
if _, exists := r.paxi[key]; !exists {
r.paxi[key] = newKPaxos(key, r.Node)
}
}
func (r *Replica) handleRequest(m paxi.Request) {
log.Debugf("Replica %s received %v\n", r.ID(), m)
key := m.Command.Key
r.init(key)
p := r.paxi[key]
if p.IsLeader() || p.Ballot() == 0 {
p.HandleRequest(m)
to := p.Hit(m.NodeID)
if to != "" && to.Zone() != r.ID().Zone() {
p.Send(to, LeaderChange{
Key: key,
To: to,
From: r.ID(),
Ballot: p.Ballot(),
})
}
} else {
go r.Forward(p.Leader(), m)
}
}
func (r *Replica) handlePrepare(m Prepare) {
log.Debugf("Replica %s ===[%v]===>>> Replica %s\n", m.Ballot.ID(), m, r.ID())
r.init(m.Key)
r.paxi[m.Key].HandleP1a(m.P1a)
}
func (r *Replica) handlePromise(m Promise) {
log.Debugf("Replica %s ===[%v]===>>> Replica %s\n", m.ID, m, r.ID())
r.paxi[m.Key].HandleP1b(m.P1b)
}
func (r *Replica) handleAccept(m Accept) {
log.Debugf("Replica %s ===[%v]===>>> Replica %s\n", m.Ballot.ID(), m, r.ID())
r.init(m.Key)
r.paxi[m.Key].HandleP2a(m.P2a)
}
func (r *Replica) handleAccepted(m Accepted) {
log.Debugf("Replica %s ===[%v]===>>> Replica %s\n", m.ID, m, r.ID())
r.paxi[m.Key].HandleP2b(m.P2b)
}
func (r *Replica) handleCommit(m Commit) {
log.Debugf("Replica %s ===[%v]===>>> Replica %s\n", m.Ballot.ID(), m, r.ID())
r.init(m.Key)
r.paxi[m.Key].HandleP3(m.P3)
}
func (r *Replica) handleLeaderChange(m LeaderChange) {
log.Debugf("Replica %s ===[%v]===>>> Replica %s\n", m.From, m, r.ID())
p := r.paxi[m.Key]
if m.Ballot == p.Ballot() && m.To == r.ID() {
// log.Debugf("Replica %s : change leader of key %d\n", r.ID(), m.Key)
p.P1a()
}
}