/
raft.go
165 lines (141 loc) · 3.89 KB
/
raft.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
package server
import (
"context"
"log"
"net/http"
"strconv"
"strings"
"sync"
"time"
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/raft/v3/raftpb"
stats "go.etcd.io/etcd/server/v3/etcdserver/api/v2stats"
"go.uber.org/zap"
"github.com/beihai0xff/puff/pkg/types"
"github.com/beihai0xff/puff/server/transport"
)
// raftNodeConfig carries the static configuration used to construct and
// start a raftNode.
type raftNodeConfig struct {
	id      uint64            // client ID for raft session
	peerMap map[uint64]string // peer ID -> peer address, including this node
	logger  *zap.Logger
	// to check if msg receiver is removed from cluster
	isIDRemoved func(id uint64) bool
	raft.Node
	raftStorage *raft.MemoryStorage
	// heartbeat is the interval between heartbeat message sends.
	heartbeat time.Duration // for logging
}
// raftNode wraps an etcd raft.Node together with its storage, transport,
// and tick machinery.
type raftNode struct {
	tickMu *sync.Mutex // serializes calls to tick()
	node   raft.Node   // the started raft state machine (set in startNode)
	// MemoryStorage is an in-memory implementation provided by etcd raft;
	// it does not persist anything to disk.
	raftStorage *raft.MemoryStorage
	// transport specifies the transport to send and receive msgs to members.
	// Sending messages MUST NOT block. It is okay to drop messages, since
	// clients should timeout and reissue their messages.
	// If transport is nil, server will panic.
	transport transport.Transporter
	// ticker provides a periodic clock that triggers the Tick method.
	ticker *time.Ticker
	raftNodeConfig
	// done signals shutdown of the run loop; NOTE(review): nothing in this
	// file closes it — confirm a stop path exists elsewhere.
	done <-chan struct{}
}
// newRaftNode constructs a raftNode from config, prepares the in-memory
// raft storage and heartbeat ticker, and starts the node in a background
// goroutine. It returns immediately; startup proceeds asynchronously.
func newRaftNode(config raftNodeConfig) *raftNode {
	n := &raftNode{
		raftNodeConfig: config,
		// tickMu must be allocated here: tick() locks it unconditionally,
		// and the original left it nil, causing a nil-pointer dereference.
		tickMu:      new(sync.Mutex),
		raftStorage: raft.NewMemoryStorage(),
		done:        make(chan struct{}),
	}
	if n.heartbeat == 0 {
		// A zero-value Ticker has a nil channel that never fires,
		// effectively disabling heartbeat-driven ticks.
		n.ticker = &time.Ticker{}
	} else {
		n.ticker = time.NewTicker(n.heartbeat)
	}
	go n.startNode()
	return n
}
// startNode builds the raft.Config, starts the underlying etcd raft state
// machine with every peer in peerMap as an initial cluster member, wires
// up the gRPC transport, and launches the HTTP server and the main event
// loop as goroutines.
func (rn *raftNode) startNode() {
	// All entries in peerMap (including this node) form the initial
	// cluster configuration.
	peers := []raft.Peer{}
	for i := range rn.peerMap {
		peers = append(peers, raft.Peer{ID: i})
	}
	c := &raft.Config{
		ID:              rn.id,
		ElectionTick:    10, // election timeout, in ticks
		HeartbeatTick:   1,  // heartbeat every tick
		Storage:         rn.raftStorage,
		MaxSizePerMsg:   4096,
		MaxInflightMsgs: 256,
	}
	rn.node = raft.StartNode(c, peers)
	// TODO: implement the transport module
	rn.transport = &transport.GRPCTransport{
		Logger:      rn.logger,
		ID:          types.ID(rn.id),
		ClusterID:   0x1000,
		Raft:        rn, // raftNode implements Process/IsIDRemoved/Report* for the transport
		ServerStats: stats.NewServerStats("", ""),
		LeaderStats: stats.NewLeaderStats(rn.logger, strconv.FormatUint(rn.id, 10)),
		// NOTE(review): ErrorC is never drained here — confirm the
		// transport tolerates a blocked error channel.
		ErrorC: make(chan error),
	}
	rn.transport.Start()
	// Register every remote peer with the transport; skip ourselves.
	for peer, addr := range rn.peerMap {
		if peer != rn.id {
			rn.transport.AddPeer(types.ID(peer), []string{addr})
		}
	}
	go rn.serveRaft()
	go rn.run()
}
// serveRaft runs an HTTP server that handles incoming raft transport
// traffic for this node. It listens on all interfaces using the port
// portion of this node's peer address and blocks until the server exits.
func (rn *raftNode) serveRaft() {
	peerURL := rn.peerMap[rn.id]
	// Take the trailing ":port" of the peer address so the server binds
	// every interface on that port.
	addr := peerURL[strings.LastIndex(peerURL, ":"):]
	server := http.Server{
		Addr:    addr,
		Handler: rn.transport.Handler(),
	}
	// The original silently discarded this error; surface it so a bind
	// failure (port in use, bad address) is not invisible.
	if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
		log.Printf("raft http server on node %v stopped: %v", rn.id, err)
	}
}
// run is the main raft event loop. It drives the logical clock every
// 100ms, and for each Ready batch it persists the snapshot and entries to
// storage, sends outbound messages, applies committed entries, and then
// acknowledges the batch with Advance. It exits when done is closed.
func (rn *raftNode) run() {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			rn.node.Tick()
		case rd := <-rn.node.Ready():
			// Per the raft.Ready contract, persist Snapshot and Entries
			// to storage before sending messages to other peers.
			if !raft.IsEmptySnap(rd.Snapshot) {
				if err := rn.raftStorage.ApplySnapshot(rd.Snapshot); err != nil {
					log.Printf("apply snapshot on node %v: %v", rn.id, err)
				}
			}
			if err := rn.raftStorage.Append(rd.Entries); err != nil {
				log.Printf("append entries on node %v: %v", rn.id, err)
			}
			rn.transport.Send(rd.Messages)
			for _, entry := range rd.CommittedEntries {
				// TODO: handle Entries
				switch entry.Type {
				case raftpb.EntryNormal:
					log.Printf("Receive committed data on node %v: %v\n", rn.id, string(entry.Data))
				case raftpb.EntryConfChange:
					// The original handled only V2 conf changes; V1
					// entries were silently dropped.
					var cc raftpb.ConfChange
					if err := cc.Unmarshal(entry.Data); err != nil {
						log.Printf("unmarshal conf change on node %v: %v", rn.id, err)
						continue
					}
					rn.node.ApplyConfChange(cc)
				case raftpb.EntryConfChangeV2:
					var cc raftpb.ConfChangeV2
					if err := cc.Unmarshal(entry.Data); err != nil {
						log.Printf("unmarshal conf change v2 on node %v: %v", rn.id, err)
						continue
					}
					rn.node.ApplyConfChange(cc)
				}
			}
			rn.node.Advance()
		case <-rn.done:
			// stop raft state machine and thus stop the Transport.
			rn.transport.Stop()
			return
		}
	}
}
// tick advances the raft logical clock by one tick.
// raft.Node does not have locks in Raft package, so tickMu serializes
// concurrent callers.
func (rn *raftNode) tick() {
	rn.tickMu.Lock()
	defer rn.tickMu.Unlock()
	// The original called rn.Tick(), which dispatches through the
	// raft.Node embedded in raftNodeConfig — that interface is never
	// assigned (startNode sets rn.node), so it panicked on a nil
	// interface. Tick the started node instead.
	rn.node.Tick()
}
// Process steps a raft message received from a peer (via the transport)
// into the local raft state machine. It implements the transport's Raft
// interface.
func (rn *raftNode) Process(ctx context.Context, m raftpb.Message) error {
	return rn.node.Step(ctx, m)
}
// IsIDRemoved reports whether the member with the given ID has been
// removed from the cluster. The original always returned false, ignoring
// the isIDRemoved callback provided in raftNodeConfig; delegate to it
// when set, falling back to false when it is nil.
func (rn *raftNode) IsIDRemoved(id uint64) bool {
	if rn.isIDRemoved != nil {
		return rn.isIDRemoved(id)
	}
	return false
}
// ReportUnreachable is a no-op stub required by the transport's Raft
// interface. NOTE(review): a full implementation would presumably forward
// to rn.node.ReportUnreachable — confirm this is intentionally deferred.
func (rn *raftNode) ReportUnreachable(id uint64) {}

// ReportSnapshot is a no-op stub required by the transport's Raft
// interface. NOTE(review): a full implementation would presumably forward
// to rn.node.ReportSnapshot — confirm this is intentionally deferred.
func (rn *raftNode) ReportSnapshot(id uint64, status raft.SnapshotStatus) {}