-
Notifications
You must be signed in to change notification settings - Fork 0
/
distributed.go
549 lines (494 loc) · 17.7 KB
/
distributed.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
package log
import (
"bytes"
"crypto/tls"
"fmt"
"io"
"net"
"os"
"path/filepath"
"time"
api "github.com/ac0mz/proglog/api/v1"
"github.com/hashicorp/raft"
raftboltdb "github.com/hashicorp/raft-boltdb"
"google.golang.org/protobuf/proto"
)
// DistributedLog は分散ログサーバが保持するログ情報を管理する。
type DistributedLog struct {
config Config
log *Log // 単一サーバでの複製を行わないログ
raftLog *logStore // raftで作成した分散複製ログ
raft *raft.Raft
}
func NewDistributedLog(dataDir string, config Config) (*DistributedLog, error) {
l := &DistributedLog{
config: config,
}
if err := l.setupLog(dataDir); err != nil {
return nil, err
}
if err := l.setupRaft(dataDir); err != nil {
return nil, err
}
return l, nil
}
// setupLog はサーバのログストアを作成し、ユーザのレコードをストアに保存する。
func (l *DistributedLog) setupLog(dataDir string) error {
logDir := filepath.Join(dataDir, "log")
if err := os.MkdirAll(logDir, 0755); err != nil {
return err
}
var err error
l.log, err = NewLog(logDir, l.config)
return err
}
// setupRaft はRaftサーバの設定を行い、インスタンスを作成する。
//
// NOTE: Raftインスタンスの構成要素 ※当該メソッドの設定対象
// - 与えられたコマンドを適用する有限ステートマシン (FSM: finite-state_machine)
// - 上記コマンドを保存するログストア (log_store)
// - クラスタ構成 (クラスタ内のサーバ、アドレス、現在のターム、サーバが投票した候補など重要なメタデータ) を
// 保存する安定ストア (stable_store; key-value_store; Boltを利用)
// - データのコンパクトなスナップショットを保存するスナップショットストア (snapshot_store)
// 必要なときに効率的にデータを復旧する
// - 他のRaftサーバと接続するために使うネットワークトランスポート
func (l *DistributedLog) setupRaft(dataDir string) (err error) {
fsm := &fsm{log: l.log}
logDir := filepath.Join(dataDir, "raft", "log")
if err = os.MkdirAll(logDir, 0755); err != nil {
return err
}
logConfig := l.config
logConfig.Segment.InitialOffset = 1 // raftの要件に従い、初期オフセットを1に設定
l.raftLog, err = newLogStore(logDir, logConfig)
if err != nil {
return err
}
stableStore, err := raftboltdb.NewBoltStore(
filepath.Join(dataDir, "raft", "stable"),
)
if err != nil {
return err
}
retain := 1 // 1つのスナップショットを保持する
snapshotStore, err := raft.NewFileSnapshotStore(
filepath.Join(dataDir, "raft"),
retain,
os.Stderr,
)
if err != nil {
return err
}
maxPool := 5
timeout := 10 * time.Second
transport := raft.NewNetworkTransport(
l.config.Raft.StreamLayer,
maxPool,
timeout,
os.Stderr,
)
config := raft.DefaultConfig()
config.LocalID = l.config.Raft.LocalID // サーバの一意なIDで設定必須
// 以下、テスト高速化用にタイムアウトを上書き
if l.config.Raft.HeartbeatTimeout != 0 {
config.HeartbeatTimeout = l.config.Raft.HeartbeatTimeout
}
if l.config.Raft.ElectionTimeout != 0 {
config.ElectionTimeout = l.config.Raft.ElectionTimeout
}
if l.config.Raft.LeaderLeaseTimeout != 0 {
config.LeaderLeaseTimeout = l.config.Raft.LeaderLeaseTimeout
}
if l.config.Raft.CommitTimeout != 0 {
config.CommitTimeout = l.config.Raft.CommitTimeout
}
l.raft, err = raft.NewRaft(
config,
fsm,
l.raftLog,
stableStore,
snapshotStore,
transport,
)
if err != nil {
return err
}
// NOTE: 以下ブロックは不要かも。今後の正誤表を要確認。
hasState, err := raft.HasExistingState(l.raftLog, stableStore, snapshotStore)
if err != nil {
return err
}
// 1台目のサーバの場合、ブートストラップを実行
if l.config.Raft.Bootstrap && !hasState {
config := raft.Configuration{
Servers: []raft.Server{{
ID: config.LocalID,
Address: raft.ServerAddress(l.config.Raft.BindAddr),
}},
}
err = l.raft.BootstrapCluster(config).Error()
}
return err
}
// Append はログにレコードを追加する。
func (l *DistributedLog) Append(record *api.Record) (uint64, error) {
res, err := l.apply(
AppendRequestType,
&api.ProduceRequest{Record: record},
)
if err != nil {
return 0, err
}
return res.(*api.ProduceResponse).Offset, nil
}
// apply はRaftのAPIにリクエストを適用し、そのレスポンスを返却する。
func (l *DistributedLog) apply(
reqType RequestType,
req proto.Message,
) (interface{}, error) {
var buf bytes.Buffer // Raftが複製するレコードのデータ
if _, err := buf.Write([]byte{byte(reqType)}); err != nil {
return nil, err
}
b, err := proto.Marshal(req)
if err != nil {
return nil, err
}
if _, err := buf.Write(b); err != nil {
return nil, err
}
timeout := 10 * time.Second
future := l.raft.Apply(buf.Bytes(), timeout) // レコードを複製し、リーダーのログにレコード追加
// 結果(エラーか正常終了か)が分かるまで待機
// ※エラーのパターンは、Raftが処理するコマンドに時間が掛かっている場合、サーバがシャットダウンした場合
if future.Error() != nil {
return nil, err
}
// FSMのApplyメソッドの結果を返却
res := future.Response()
// Raftは単一の値を返却するため、型アサーションで検査
if err, ok := res.(error); ok {
return nil, err
}
return res, nil
}
// Read はサーバのログからオフセットで指定されたレコードを読み出す。
// 緩やかな一貫性 (relaxed consistency) のため、Raftを経由せずに読み出し操作を行う。
//
// NOTE:
// 強い一貫性 (strong consistency) が必要な場合、読み出しは書き込みに対して
// 最新でなければならないためRaftを経由する必要があるが、読み出し効率が悪くなり性能が落ちる。
func (l *DistributedLog) Read(offset uint64) (*api.Record, error) {
return l.log.Read(offset)
}
// Join はRaftクラスタにサーバを追加する。(すべてのサーバは投票者として追加される)
//
// NOTE:
// Raftは AddNonVoter API で非投票者としてサーバ追加することもサポートしている。数多くのサーバに
// 状態を複製し、最終的に一貫性のある読み取り専用の状態を提供したい場合、非投票者サーバが有用となる。
// なぜなら投票権を持つサーバを追加する度に、リーダーは過半数を得るために多くのサーバと通信する必要があり、
// レプリケーションや選出に時間がかかる可能性が高くなるためである。
func (l *DistributedLog) Join(id, addr string) error {
configFuture := l.raft.GetConfiguration()
if err := configFuture.Error(); err != nil {
return err
}
serverID := raft.ServerID(id)
serverAddr := raft.ServerAddress(addr)
for _, srv := range configFuture.Configuration().Servers {
if srv.ID == serverID || srv.Address == serverAddr {
if srv.ID == serverID && srv.Address == serverAddr {
// サーバはすでに参加済のためスキップ
return nil
}
// 既存サーバを除去
removeFuture := l.raft.RemoveServer(serverID, 0, 0)
if err := removeFuture.Error(); err != nil {
return err
}
}
}
addFuture := l.raft.AddVoter(serverID, serverAddr, 0, 0)
if err := addFuture.Error(); err != nil {
return err
}
return nil
}
// Leave はサーバをRaftクラスタから除去する。リーダーを除去した場合、新たな選出が行われる。
func (l *DistributedLog) Leave(id string) error {
removeFuture := l.raft.RemoveServer(raft.ServerID(id), 0, 0)
return removeFuture.Error()
}
// WaitForLeader はクラスタがリーダーを選出するか、タイムアウトするまで待機する。
func (l *DistributedLog) WaitForLeader(timeout time.Duration) error {
timeoutc := time.After(timeout)
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-timeoutc:
return fmt.Errorf("timed out")
case <-ticker.C:
if l := l.raft.Leader(); l != "" {
return nil
}
}
}
}
// Close はRaftインスタンスをシャットダウンし、Raftのログストア及びローカルのログを閉じる。
func (l *DistributedLog) Close() error {
f := l.raft.Shutdown()
if err := f.Error(); err != nil {
return err
}
if err := l.raftLog.Log.Close(); err != nil {
return nil
}
return l.log.Close()
}
// GetServers はクラスタのサーバ情報一覧を返却する。
// サーバ情報にはクライアントが接続すべきサーバアドレスと、リーダかどうかのフラグを含む。
//
// NOTE:
// Raftの設定はクラスタ内のサーバで構成され、その設定には各サーバのID,アドレス,投票権(※)が含まれている。
// ※当該システムでは投票権は利用しない。
// そのためRaftは、クラスタ内のリーダーのアドレスを教えることができる。
func (l *DistributedLog) GetServers() ([]*api.Server, error) {
future := l.raft.GetConfiguration()
if err := future.Error(); err != nil {
return nil, err
}
var servers []*api.Server
for _, server := range future.Configuration().Servers {
servers = append(servers, &api.Server{
Id: string(server.ID),
RpcAddr: string(server.Address),
IsLeader: l.raft.Leader() == server.Address,
})
}
return servers, nil
}
var _ raft.FSM = (*fsm)(nil)
// fsm は有限ステートマシン (finite-state machine) として操作する対象のログを管理する。
type fsm struct {
log *Log
}
type RequestType uint8
const (
AppendRequestType RequestType = 0
)
// Apply はログエントリをコミット後にRaftから呼び出される。
func (f *fsm) Apply(record *raft.Log) interface{} {
buf := record.Data
reqType := RequestType(buf[0])
// リクエスト種別でどのコマンドを実行する (ロジックを含む対応メソッドを呼び出す) かを切り分け
switch reqType {
case AppendRequestType:
return f.applyAppend(buf[1:])
}
return nil
}
// applyAppend はローカルのログにレコードを追加する。
func (f *fsm) applyAppend(b []byte) interface{} {
var req api.ProduceRequest
if err := proto.Unmarshal(b, &req); err != nil {
return err
}
offset, err := f.log.Append(req.Record)
if err != nil {
return err
}
return &api.ProduceResponse{Offset: offset}
}
// Snapshot は定期的にRaftから呼び出され、状態 (FSMのログ) の point-in-time snapshot を取得する。
//
// 設定した SnapshotInterval, SnapshotThreshold に従ってRaftから呼び出される。
// - SnapshotInterval: Raftがsnapshotを取得すべきかを検査する間隔。デフォルトは2分。
// - SnapshotThreshold: 新規snapshotの作成前に、最後のsnapshotから何個のログを取得するか。デフォルトは8192個。
//
// NOTE: 当該スナップショットの2つの目的
// - 1つはRaftがすでに適用したコマンドのログを保存しないよう、Raftのログをコンパクトにする
// - リーダーがログ全体を何度も複製させずに、Raftが新規でサーバを起動できるようにする
func (f *fsm) Snapshot() (raft.FSMSnapshot, error) {
r := f.log.Reader()
return &snapshot{reader: r}, nil
}
var _ raft.FSMSnapshot = (*snapshot)(nil)
type snapshot struct {
reader io.Reader
}
// Persist はRaftから呼び出され、状態 (FSMのログ) をスナップショットストアに保存する。
//
// NOTE:
// 当該サービスではスナップショットストアとしてファイルを使用するため、
// スナップショットが完了すると、Raftのすべてのログデータを含むファイルを取得できる。
func (s *snapshot) Persist(sink raft.SnapshotSink) error {
if _, err := io.Copy(sink, s.reader); err != nil {
_ = sink.Cancel()
return err
}
return sink.Close()
}
// Release はスナップショットが終了すると、Raftから呼び出される。
func (s *snapshot) Release() {}
// Restore はスナップショットからFSMを復元するためにRaftから呼び出される。
//
// NOTE:
// あるサーバが失われた後に新たなサーバを追加した場合、失ったサーバのFSMを復元する状況において
// FSMの状態がリーダーの複製された状態と一致するよう、既存の状態を破棄する必要がある。
func (f *fsm) Restore(snapshot io.ReadCloser) error {
b := make([]byte, lenWidth)
var buf bytes.Buffer
for i := 0; ; i++ {
_, err := io.ReadFull(snapshot, b)
if err == io.EOF {
break // すべて読み出し終えたらループを抜ける
} else if err != nil {
return err
}
size := int64(enc.Uint64(b))
if _, err = io.CopyN(&buf, snapshot, size); err != nil {
return err
}
// 元のレコードを復元
record := &api.Record{}
if err = proto.Unmarshal(buf.Bytes(), record); err != nil {
return err
}
if i == 0 {
// 1件目のレコードの場合、初期オフセットとしてレコードのオフセットを設定し、
// 既存の状態を破棄 (初期オフセットを用いて新規セグメントを作成)
f.log.Config.Segment.InitialOffset = record.Offset
if err := f.log.Reset(); err != nil {
return err
}
}
if _, err = f.log.Append(record); err != nil {
return err
}
buf.Reset()
}
return nil
}
var _ raft.LogStore = (*logStore)(nil)
type logStore struct {
*Log
}
func newLogStore(dir string, c Config) (*logStore, error) {
log, err := NewLog(dir, c)
if err != nil {
return nil, err
}
return &logStore{log}, nil
}
// FirstIndex はRaftから呼び出され、最古のオフセット(Raftではインデックスと呼ぶ)を取得する。
func (l *logStore) FirstIndex() (uint64, error) {
return l.LowestOffset()
}
// LastIndex はRaftから呼び出され、最新のオフセット(Raftではインデックスと呼ぶ)を取得する。
func (l *logStore) LastIndex() (uint64, error) {
return l.HighestOffset()
}
// GetLog はRaftから呼び出され、indexを元にレコードを取得し、outに値を設定する。
func (l *logStore) GetLog(index uint64, out *raft.Log) error {
in, err := l.Read(index)
if err != nil {
return err
}
out.Data = in.Value
out.Index = in.Offset
out.Type = raft.LogType(in.Type)
out.Term = in.Term
return nil
}
// StoreLog はRaftから呼び出され、ログにレコードを追加する。
func (l *logStore) StoreLog(record *raft.Log) error {
return l.StoreLogs([]*raft.Log{record})
}
// StoreLogs はRaftから呼び出され、ログにレコードを追加する。
func (l *logStore) StoreLogs(records []*raft.Log) error {
for _, record := range records {
if _, err := l.Append(&api.Record{
Value: record.Data,
Term: record.Term,
Type: uint32(record.Type),
}); err != nil {
return err
}
}
return nil
}
// DeleteRange は古いレコードを削除する。
//
// NOTE:
// 本来DeleteRangeメソッドはオフセット間のレコードを削除する。
// 古いレコードやスナップショットに保存されているレコードを削除するためにRaftから呼び出される。
func (l *logStore) DeleteRange(min, max uint64) error {
return l.Truncate(max)
}
var _ raft.StreamLayer = (*StreamLayer)(nil)
// StreamLayer はRaftサーバと接続するにあたって低レベルなストリーム抽象化を提供するための、
// トランスポートのStreamLayerインタフェースを満たすメソッドを実装する。
type StreamLayer struct {
ln net.Listener
serverTLSConfig *tls.Config // サーバ間の暗号化通信における受信コネクションを受け入れるためのTLS設定
peerTLSConfig *tls.Config // サーバ間の暗号化通信における送信コネクションを作成するためのTLS設定
}
func NewStreamLayer(ln net.Listener, serverTLSConfig, peerTLSConfig *tls.Config) *StreamLayer {
return &StreamLayer{
ln: ln,
serverTLSConfig: serverTLSConfig,
peerTLSConfig: peerTLSConfig,
}
}
const RaftRPC = 1
// Dial はRaftクラスタ内における他サーバへの新たな発信コネクションを作成する。
//
// NOTE:
// サーバ接続の際、コネクション種別を識別するためにRaftRPCバイトを書き込み、
// ログのgRPCリクエストと同じポートでRaftを多重化できる。
// ストリームレイヤをピアTLSで設定することで、TLSクライアント側の接続が行われる。
func (s *StreamLayer) Dial(addr raft.ServerAddress, timeout time.Duration) (net.Conn, error) {
dialer := &net.Dialer{Timeout: timeout}
conn, err := dialer.Dial("tcp", string(addr))
if err != nil {
return nil, err
}
// Raft RPC であることを特定する
_, err = conn.Write([]byte{byte(RaftRPC)})
if err != nil {
return nil, err
}
if s.peerTLSConfig != nil {
conn = tls.Client(conn, s.peerTLSConfig)
}
return conn, nil
}
// Accept はDialメソッドに対応し、入ってくるコネクションを受け入れ、
// コネクション種別を識別するバイトを読み出し、サーバ側のTLS接続を作成する。
func (s *StreamLayer) Accept() (net.Conn, error) {
conn, err := s.ln.Accept()
if err != nil {
return nil, err
}
b := make([]byte, 1)
_, err = conn.Read(b)
if err != nil {
return nil, err
}
if !bytes.Equal([]byte{byte(RaftRPC)}, b) {
return nil, fmt.Errorf("not a raft rpc")
}
if s.serverTLSConfig != nil {
return tls.Server(conn, s.serverTLSConfig), nil
}
return conn, nil
}
// Close はリスナーをクローズする。
func (s *StreamLayer) Close() error {
return s.ln.Close()
}
// Addr はリスナーのアドレスを返却する。
func (s *StreamLayer) Addr() net.Addr {
return s.ln.Addr()
}