// Copyright 2019-present PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package raftstore

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"math"
	"sync/atomic"
	"time"
	"unsafe"

	"github.com/ngaut/unistore/tikv/mvcc"
	"github.com/ngaut/unistore/tikv/raftstore/raftlog"
	"github.com/pingcap/kvproto/pkg/eraftpb"
	"github.com/pingcap/kvproto/pkg/metapb"
	"github.com/pingcap/kvproto/pkg/pdpb"
	"github.com/pingcap/kvproto/pkg/raft_cmdpb"
	rspb "github.com/pingcap/kvproto/pkg/raft_serverpb"
	"github.com/pingcap/log"
	"github.com/zhangjinpeng1987/raft"
)
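
/// ReadyICPair pairs a raft Ready with the InvokeContext produced while saving its state,
/// as returned by HandleRaftReadyAppend.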
type ReadyICPair struct {
Ready raft.Ready
IC *InvokeContext
}
type StaleState int
const (
StaleStateValid StaleState = 0 + iota
StaleStateToValidate
StaleStateLeaderMissing
)
type ReqCbPair struct {
Req *raft_cmdpb.RaftCmdRequest
Cb *Callback
}
type ReadIndexRequest struct {
id uint64
cmds []*ReqCbPair
renewLeaseTime *time.Time
}
func NewReadIndexRequest(id uint64, cmds []*ReqCbPair, renewLeaseTime *time.Time) *ReadIndexRequest {
return &ReadIndexRequest{
id: id,
cmds: cmds,
renewLeaseTime: renewLeaseTime,
}
}
func (r *ReadIndexRequest) binaryId() []byte {
var buf = make([]byte, 8)
binary.BigEndian.PutUint64(buf, r.id)
return buf
}
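
/// ReadIndexQueue queues pending read index requests; entries beyond `readyCnt` are
/// treated as uncommitted and dropped by `ClearUncommitted`, which replies to their
/// callbacks with a stale-command error.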
type ReadIndexQueue struct {
idAllocator uint64
reads []*ReadIndexRequest
readyCnt int
}
func (q *ReadIndexQueue) PopFront() *ReadIndexRequest {
if len(q.reads) > 0 {
req := q.reads[0]
q.reads = q.reads[1:]
return req
}
return nil
}
func NotifyStaleReq(term uint64, cb *Callback) {
cb.Done(ErrRespStaleCommand(term))
}
func NotifyReqRegionRemoved(regionId uint64, cb *Callback) {
regionNotFound := &ErrRegionNotFound{RegionId: regionId}
resp := ErrResp(regionNotFound)
cb.Done(resp)
}
func (r *ReadIndexQueue) NextId() uint64 {
r.idAllocator += 1
return r.idAllocator
}
func (r *ReadIndexQueue) ClearUncommitted(term uint64) {
uncommitted := r.reads[r.readyCnt:]
r.reads = r.reads[:r.readyCnt]
for _, read := range uncommitted {
for _, reqCbPair := range read.cmds {
NotifyStaleReq(term, reqCbPair.Cb)
}
read.cmds = nil
}
}
type ProposalMeta struct {
Index uint64
Term uint64
RenewLeaseTime *time.Time
}
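
/// ProposalQueue keeps ProposalMeta entries in proposal order so the propose time of a
/// committed entry can be looked up later (see findProposeTime).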
type ProposalQueue struct {
queue []*ProposalMeta
}
func (q *ProposalQueue) PopFront(term uint64) *ProposalMeta {
if len(q.queue) == 0 || q.queue[0].Term > term {
return nil
}
meta := q.queue[0]
q.queue = q.queue[1:]
return meta
}
func (q *ProposalQueue) Push(meta *ProposalMeta) {
q.queue = append(q.queue, meta)
}
func (q *ProposalQueue) Clear() {
for i := range q.queue {
q.queue[i] = nil
}
q.queue = q.queue[:0]
}
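
/// ProposalContext flags are attached to proposals (via `ToBytes`) to mark entries that
/// need special handling: log sync, split, and prepare-merge.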
const (
ProposalContext_SyncLog ProposalContext = 1
ProposalContext_Split ProposalContext = 1 << 1
ProposalContext_PrepareMerge ProposalContext = 1 << 2
)
type ProposalContext byte
func (c ProposalContext) ToBytes() []byte {
return []byte{byte(c)}
}
func NewProposalContextFromBytes(ctx []byte) *ProposalContext {
var res ProposalContext
l := len(ctx)
if l == 0 {
return nil
} else if l == 1 {
res = ProposalContext(ctx[0])
} else {
panic(fmt.Sprintf("Invalid ProposalContext %v", ctx))
}
return &res
}
func (c *ProposalContext) contains(flag ProposalContext) bool {
return byte(*c)&byte(flag) != 0
}
func (c *ProposalContext) insert(flag ProposalContext) {
*c |= flag
}
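
/// PeerStat accumulates the bytes and keys written by this peer; the counters are reported
/// to PD in region heartbeats (see HeartbeatPd).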
type PeerStat struct {
WrittenBytes uint64
WrittenKeys uint64
}
/// A struct that stores the state to wait for `PrepareMerge` apply result.
///
/// When handling the apply result of a `CommitMerge`, the source peer may not
/// have handled the apply result of the `PrepareMerge` yet, so the target peer has
/// to abort the current handling process and wait for it asynchronously.
type WaitApplyResultState struct {
/// The following apply results waiting to be handled, including the `CommitMerge`.
/// These will be handled once `ReadyToMerge` is true.
results []*applyTaskRes
/// It is used by target peer to check whether the apply result of `PrepareMerge` is handled.
readyToMerge *uint32
}
type RecentAddedPeer struct {
RejectDurationAsSecs uint64
Id uint64
AddedTime time.Time
}
func NewRecentAddedPeer(rejectDurationAsSecs uint64) *RecentAddedPeer {
return &RecentAddedPeer{
RejectDurationAsSecs: rejectDurationAsSecs,
Id: 0,
AddedTime: time.Now(),
}
}
func (r *RecentAddedPeer) Update(id uint64, now time.Time) {
r.Id = id
r.AddedTime = now
}
func (r *RecentAddedPeer) Contains(id uint64) bool {
if r.Id == id {
now := time.Now()
elapsedSecs := now.Sub(r.AddedTime).Seconds()
return uint64(elapsedSecs) < r.RejectDurationAsSecs
}
return false
}
/// `ConsistencyState` is used for consistency checks.
type ConsistencyState struct {
LastCheckTime time.Time
// (computed_result_or_to_be_verified, index, hash)
Index uint64
Hash []byte
}
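
/// DestroyPeerJob describes the cleanup left after MaybeDestroy: whether the peer storage
/// was initialized and whether the removal has to be finished asynchronously.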
type DestroyPeerJob struct {
Initialized bool
AsyncRemove bool
RegionId uint64
Peer *metapb.Peer
}
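
/// Peer ties a raft state machine (`RaftGroup`) to its persistent state (`peerStorage`)
/// and tracks the bookkeeping raftstore needs: pending proposals and reads, peer
/// heartbeats, lease state, and split/merge progress.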
type Peer struct {
Meta *metapb.Peer
regionId uint64
RaftGroup *raft.RawNode
peerStorage *PeerStorage
proposals *ProposalQueue
applyProposals []*proposal
pendingReads *ReadIndexQueue
peerCache map[uint64]*metapb.Peer
// Record the last instant of each peer's heartbeat response.
PeerHeartbeats map[uint64]time.Time
/// Record the instants of peers being added into the configuration.
/// Remove them after they are not pending any more.
PeersStartPendingTime map[uint64]time.Time
RecentAddedPeer *RecentAddedPeer
	/// An inaccurate difference in region size since the last reset.
	SizeDiffHint uint64
	/// Number of deleted keys since the last reset.
	deleteKeysHint uint64
	/// Approximate size of the region.
	ApproximateSize *uint64
	/// Approximate number of keys in the region.
	ApproximateKeys *uint64
CompactionDeclinedBytes uint64
ConsistencyState *ConsistencyState
Tag string
// Index of last scheduled committed raft log.
LastApplyingIdx uint64
LastCompactedIdx uint64
	// The index of the latest urgent proposal.
lastUrgentProposalIdx uint64
// The index of the latest committed split command.
lastCommittedSplitIdx uint64
	// Approximate size of logs that are applied but not compacted yet.
RaftLogSizeHint uint64
PendingRemove bool
// The index of the latest committed prepare merge command.
lastCommittedPrepareMergeIdx uint64
PendingMergeState *rspb.MergeState
leaderMissingTime *time.Time
leaderLease *Lease
leaderChecker leaderChecker
// If a snapshot is being applied asynchronously, messages should not be sent.
pendingMessages []eraftpb.Message
PendingMergeApplyResult *WaitApplyResultState
PeerStat PeerStat
}
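
/// NewPeer creates a Peer backed by a fresh PeerStorage and raft RawNode. If the region
/// has exactly one peer and it lives on this store, the new peer campaigns immediately.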
func NewPeer(storeId uint64, cfg *Config, engines *Engines, region *metapb.Region, regionSched chan<- task,
peer *metapb.Peer) (*Peer, error) {
if peer.GetId() == InvalidID {
return nil, fmt.Errorf("invalid peer id")
}
tag := fmt.Sprintf("[region %v] %v", region.GetId(), peer.GetId())
ps, err := NewPeerStorage(engines, region, regionSched, peer.GetId(), tag)
if err != nil {
return nil, err
}
appliedIndex := ps.AppliedIndex()
raftCfg := &raft.Config{
ID: peer.GetId(),
ElectionTick: cfg.RaftElectionTimeoutTicks,
HeartbeatTick: cfg.RaftHeartbeatTicks,
MaxSizePerMsg: cfg.RaftMaxSizePerMsg,
MaxInflightMsgs: cfg.RaftMaxInflightMsgs,
Applied: appliedIndex,
CheckQuorum: true,
PreVote: cfg.Prevote,
Storage: ps,
}
raftGroup, err := raft.NewRawNode(raftCfg, nil)
if err != nil {
return nil, err
}
now := time.Now()
p := &Peer{
Meta: peer,
regionId: region.GetId(),
RaftGroup: raftGroup,
peerStorage: ps,
proposals: new(ProposalQueue),
pendingReads: new(ReadIndexQueue),
peerCache: make(map[uint64]*metapb.Peer),
PeerHeartbeats: make(map[uint64]time.Time),
PeersStartPendingTime: make(map[uint64]time.Time),
RecentAddedPeer: NewRecentAddedPeer(uint64(cfg.RaftRejectTransferLeaderDuration.Seconds())),
ConsistencyState: &ConsistencyState{
LastCheckTime: now,
Index: RaftInvalidIndex,
},
leaderMissingTime: &now,
Tag: tag,
LastApplyingIdx: appliedIndex,
lastUrgentProposalIdx: math.MaxInt64,
leaderLease: NewLease(cfg.RaftStoreMaxLeaderLease),
}
p.leaderChecker.peerID = p.PeerId()
p.leaderChecker.region = unsafe.Pointer(region)
p.leaderChecker.term.Store(p.Term())
p.leaderChecker.appliedIndexTerm.Store(ps.appliedIndexTerm)
// If this region has only one peer and I am the one, campaign directly.
if len(region.GetPeers()) == 1 && region.GetPeers()[0].GetStoreId() == storeId {
err = p.RaftGroup.Campaign()
if err != nil {
return nil, err
}
}
return p, nil
}
func (p *Peer) getEventContext() *PeerEventContext {
return &PeerEventContext{
LeaderChecker: &p.leaderChecker,
RegionId: p.regionId,
}
}
func (p *Peer) insertPeerCache(peer *metapb.Peer) {
p.peerCache[peer.GetId()] = peer
}
func (p *Peer) removePeerCache(peerID uint64) {
delete(p.peerCache, peerID)
}
func (p *Peer) getPeerFromCache(peerID uint64) *metapb.Peer {
if peer, ok := p.peerCache[peerID]; ok {
return peer
}
for _, peer := range p.peerStorage.Region().GetPeers() {
if peer.GetId() == peerID {
p.insertPeerCache(peer)
return peer
}
}
return nil
}
/// Register self to applyMsgs so that the peer is then usable.
/// Also trigger `RegionChangeEvent::Create` here.
func (p *Peer) Activate(applyMsgs *applyMsgs) {
applyMsgs.appendMsg(p.regionId, NewMsg(MsgTypeApplyRegistration, newRegistration(p)))
}
func (p *Peer) nextProposalIndex() uint64 {
return p.RaftGroup.Raft.RaftLog.LastIndex() + 1
}
/// Tries to destroy itself. Returns a job (if needed) to do more cleaning tasks.
func (p *Peer) MaybeDestroy() *DestroyPeerJob {
if p.PendingRemove {
log.S().Infof("%v is being destroyed, skip", p.Tag)
return nil
}
initialized := p.peerStorage.isInitialized()
asyncRemove := false
if p.IsApplyingSnapshot() {
if !p.Store().CancelApplyingSnap() {
log.S().Infof("%v stale peer %v is applying snapshot", p.Tag, p.Meta.Id)
return nil
}
		// There are no tasks in the apply/local read workers.
asyncRemove = false
} else {
asyncRemove = initialized
}
p.PendingRemove = true
p.leaderChecker.invalid.Store(true)
return &DestroyPeerJob{
AsyncRemove: asyncRemove,
Initialized: initialized,
RegionId: p.regionId,
Peer: p.Meta,
}
}
/// Does the real destroy task which includes:
/// 1. Set the region to tombstone;
/// 2. Clear data;
/// 3. Notify all pending requests.
func (p *Peer) Destroy(engine *Engines, keepData bool) error {
start := time.Now()
region := p.Region()
log.S().Infof("%v begin to destroy", p.Tag)
// Set Tombstone state explicitly
kvWB := new(WriteBatch)
raftWB := new(WriteBatch)
if err := p.Store().clearMeta(kvWB, raftWB); err != nil {
return err
}
var mergeState *rspb.MergeState
if p.PendingMergeState != nil {
mergeState = p.PendingMergeState
}
WritePeerState(kvWB, region, rspb.PeerState_Tombstone, mergeState)
	// Write the KV RocksDB first in case a restart happens between the two writes.
	// TODO: sync = ctx.cfg.sync_log
if err := kvWB.WriteToKV(engine.kv); err != nil {
return err
}
if err := raftWB.WriteToRaft(engine.raft); err != nil {
return err
}
if p.Store().isInitialized() && !keepData {
		// If a panic occurs while deleting the data and raft log, the dirty data
		// will be cleared by applying a newer snapshot or by a restart.
if err := p.Store().ClearData(); err != nil {
log.S().Errorf("%v failed to schedule clear data task %v", p.Tag, err)
}
}
for _, read := range p.pendingReads.reads {
for _, r := range read.cmds {
NotifyReqRegionRemoved(region.Id, r.Cb)
}
read.cmds = nil
}
p.pendingReads.reads = nil
for _, proposal := range p.applyProposals {
NotifyReqRegionRemoved(region.Id, proposal.cb)
}
p.applyProposals = nil
log.S().Infof("%v destroy itself, takes %v", p.Tag, time.Now().Sub(start))
return nil
}
func (p *Peer) isInitialized() bool {
return p.peerStorage.isInitialized()
}
func (p *Peer) Region() *metapb.Region {
return p.peerStorage.Region()
}
/// Set the region of a peer.
///
/// This will update the region of the peer; the caller must ensure the region
/// has been persisted to durable storage.
func (p *Peer) SetRegion(region *metapb.Region) {
if p.Region().GetRegionEpoch().GetVersion() < region.GetRegionEpoch().GetVersion() {
		// Epoch version changed, disable read on the local reader for this region.
p.leaderLease.ExpireRemoteLease()
}
p.Store().SetRegion(region)
	// Always update leaderChecker's region to avoid stale region info after a follower
	// becomes a leader.
if !p.PendingRemove {
atomic.StorePointer(&p.leaderChecker.region, unsafe.Pointer(region))
}
}
func (p *Peer) PeerId() uint64 {
return p.Meta.GetId()
}
func (p *Peer) GetRaftStatus() *raft.Status {
return p.RaftGroup.Status()
}
func (p *Peer) LeaderId() uint64 {
return p.RaftGroup.Raft.Lead
}
func (p *Peer) IsLeader() bool {
return p.RaftGroup.Raft.State == raft.StateLeader
}
func (p *Peer) GetRole() raft.StateType {
return p.RaftGroup.Raft.State
}
func (p *Peer) Store() *PeerStorage {
return p.peerStorage
}
func (p *Peer) IsApplyingSnapshot() bool {
return p.Store().IsApplyingSnapshot()
}
/// Returns `true` if the raft group has replicated a snapshot but not committed it yet.
func (p *Peer) HasPendingSnapshot() bool {
return p.RaftGroup.GetSnap() != nil
}
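
/// Send sends raft messages through the transport. When a MsgTimeoutNow is sent (leader
/// transfer), the leader lease is moved to the suspect state for lease safety.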
func (p *Peer) Send(trans Transport, msgs []eraftpb.Message) error {
for _, msg := range msgs {
msgType := msg.MsgType
err := p.sendRaftMessage(msg, trans)
if err != nil {
return err
}
switch msgType {
case eraftpb.MessageType_MsgTimeoutNow:
			// After a leader transfer procedure is triggered, the lease for
			// the old leader may be expired earlier than usual, since a new leader
			// may be elected and the old leader doesn't step down due to
			// a network partition from the new leader.
			// For lease safety during leader transfer, transition `leader_lease`
			// to suspect.
p.leaderLease.Suspect(time.Now())
default:
}
}
return nil
}
/// Steps the raft message.
func (p *Peer) Step(m *eraftpb.Message) error {
if p.IsLeader() && m.GetFrom() != InvalidID {
p.PeerHeartbeats[m.GetFrom()] = time.Now()
// As the leader we know we are not missing.
p.leaderMissingTime = nil
} else if m.GetFrom() == p.LeaderId() {
		// As a non-leader role, receiving a message from the leader means the leader is not missing.
p.leaderMissingTime = nil
}
return p.RaftGroup.Step(*m)
}
/// Checks and updates `peer_heartbeats` for the peer.
func (p *Peer) CheckPeers() {
if !p.IsLeader() {
if len(p.PeerHeartbeats) > 0 {
p.PeerHeartbeats = make(map[uint64]time.Time)
}
return
}
if len(p.PeerHeartbeats) == len(p.Region().GetPeers()) {
return
}
	// Insert heartbeats in case some peers never respond to heartbeats.
region := p.Region()
for _, peer := range region.GetPeers() {
if _, ok := p.PeerHeartbeats[peer.GetId()]; !ok {
p.PeerHeartbeats[peer.GetId()] = time.Now()
}
}
}
/// Collects all down peers.
func (p *Peer) CollectDownPeers(maxDuration time.Duration) []*pdpb.PeerStats {
downPeers := make([]*pdpb.PeerStats, 0)
for _, peer := range p.Region().GetPeers() {
if peer.GetId() == p.Meta.GetId() {
continue
}
if hb, ok := p.PeerHeartbeats[peer.GetId()]; ok {
elapsed := time.Since(hb)
if elapsed > maxDuration {
stats := &pdpb.PeerStats{
Peer: peer,
DownSeconds: uint64(elapsed.Seconds()),
}
downPeers = append(downPeers, stats)
}
}
}
return downPeers
}
/// Collects all pending peers and updates `peers_start_pending_time`.
func (p *Peer) CollectPendingPeers() []*metapb.Peer {
pendingPeers := make([]*metapb.Peer, 0, len(p.Region().GetPeers()))
status := p.RaftGroup.Status()
truncatedIdx := p.Store().truncatedIndex()
// status.Progress includes learner progress
for id, progress := range status.Progress {
if id == p.Meta.GetId() {
continue
}
if progress.Match < truncatedIdx {
if peer := p.getPeerFromCache(id); peer != nil {
pendingPeers = append(pendingPeers, peer)
if _, ok := p.PeersStartPendingTime[id]; !ok {
now := time.Now()
p.PeersStartPendingTime[id] = now
log.S().Debugf("%v peer %v start pending at %v", p.Tag, id, now)
}
}
}
}
return pendingPeers
}
func (p *Peer) clearPeersStartPendingTime() {
for id := range p.PeersStartPendingTime {
delete(p.PeersStartPendingTime, id)
}
}
/// Returns `true` if any new peer catches up with the leader in replicating logs,
/// and updates `PeersStartPendingTime` if needed.
func (p *Peer) AnyNewPeerCatchUp(peerId uint64) bool {
if len(p.PeersStartPendingTime) == 0 {
return false
}
if !p.IsLeader() {
p.clearPeersStartPendingTime()
return false
}
if startPendingTime, ok := p.PeersStartPendingTime[peerId]; ok {
truncatedIdx := p.Store().truncatedIndex()
progress, ok := p.RaftGroup.Raft.Prs[peerId]
if !ok {
progress, ok = p.RaftGroup.Raft.LearnerPrs[peerId]
}
if ok {
if progress.Match >= truncatedIdx {
delete(p.PeersStartPendingTime, peerId)
elapsed := time.Since(startPendingTime)
log.S().Debugf("%v peer %v has caught up logs, elapsed: %v", p.Tag, peerId, elapsed)
return true
}
}
}
return false
}
func (p *Peer) CheckStaleState(cfg *Config) StaleState {
if p.IsLeader() {
// Leaders always have valid state.
//
		// We update `leaderMissingTime` in `Step`. However, a region with only one peer
		// does not send any raft messages, so we have to check and update it before
		// reporting stale states.
p.leaderMissingTime = nil
return StaleStateValid
}
naivePeer := !p.isInitialized() || p.RaftGroup.Raft.IsLearner
// Updates the `leader_missing_time` according to the current state.
//
	// If we are checking this, it means we suspect the leader might be missing.
	// Record the time when we are called, so we can check later whether the leader
	// has been missing for longer than it should be.
if p.leaderMissingTime == nil {
now := time.Now()
p.leaderMissingTime = &now
return StaleStateValid
} else {
elapsed := time.Since(*p.leaderMissingTime)
if elapsed >= cfg.MaxLeaderMissingDuration {
// Resets the `leader_missing_time` to avoid sending the same tasks to
// PD worker continuously during the leader missing timeout.
now := time.Now()
p.leaderMissingTime = &now
return StaleStateToValidate
} else if elapsed >= cfg.AbnormalLeaderMissingDuration && !naivePeer {
			// A peer is considered to be in the leader-missing state
			// if it's initialized but is isolated from its leader, or
			// something bad happens such that the raft group cannot elect a leader.
return StaleStateLeaderMissing
}
return StaleStateValid
}
}
func (p *Peer) OnRoleChanged(observer PeerEventObserver, ready *raft.Ready) {
ss := ready.SoftState
if ss != nil {
if ss.RaftState == raft.StateLeader {
// The local read can only be performed after a new leader has applied
// the first empty entry on its term. After that the lease expiring time
// should be updated to
// send_to_quorum_ts + max_lease
// as the comments in `Lease` explain.
// It is recommended to update the lease expiring time right after
// this peer becomes leader because it's more convenient to do it here and
// it has no impact on the correctness.
p.MaybeRenewLeaderLease(time.Now())
if !p.PendingRemove {
p.leaderChecker.term.Store(p.Term())
}
observer.OnRoleChange(p.getEventContext().RegionId, ss.RaftState)
} else if ss.RaftState == raft.StateFollower {
p.leaderLease.Expire()
observer.OnRoleChange(p.getEventContext().RegionId, ss.RaftState)
}
}
}
func (p *Peer) ReadyToHandlePendingSnap() bool {
// If apply worker is still working, written apply state may be overwritten
// by apply worker. So we have to wait here.
// Please note that committed_index can't be used here. When applying a snapshot,
// a stale heartbeat can make the leader think follower has already applied
// the snapshot, and send remaining log entries, which may increase committed_index.
return p.LastApplyingIdx == p.Store().AppliedIndex()
}
func (p *Peer) readyToHandleRead() bool {
	// 1. There may be some values that are not applied by this leader yet but were applied
	//    by the old leader, if applied_index_term isn't equal to the current term.
	// 2. There may be a stale read if the old leader splits really slowly:
	//    the new region may have already elected a new leader while
	//    the old leader still thinks it owns the split range.
	// 3. There may be a stale read if the target leader is in another store and has
	//    applied the commit merge and written new values, but the sibling peer in
	//    this store has not applied the commit merge, so this leader is not ready
	//    to read until the merge is rolled back.
return p.Store().appliedIndexTerm == p.Term() && !p.isSplitting() && !p.isMerging()
}
func (p *Peer) isSplitting() bool {
return p.lastCommittedSplitIdx > p.Store().AppliedIndex()
}
func (p *Peer) isMerging() bool {
return p.lastCommittedPrepareMergeIdx > p.Store().AppliedIndex() || p.PendingMergeState != nil
}
func (p *Peer) TakeApplyProposals() *regionProposal {
if len(p.applyProposals) == 0 {
return nil
}
props := p.applyProposals
p.applyProposals = nil
return newRegionProposal(p.PeerId(), p.regionId, props)
}
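
/// HandleRaftReadyAppend drives the append phase of a raft ready: it flushes messages that
/// were delayed by a snapshot, skips handling while a snapshot is still being applied, sends
/// the leader's messages eagerly, and persists the ready state into the given write batches,
/// returning the Ready paired with its InvokeContext.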
func (p *Peer) HandleRaftReadyAppend(trans Transport, applyMsgs *applyMsgs, kvWB, raftWB *WriteBatch, observer PeerEventObserver) *ReadyICPair {
if p.PendingRemove {
return nil
}
if p.Store().CheckApplyingSnap() {
		// If we continue to handle all the messages, it may cause too many messages because
		// the leader will send all the remaining messages to this follower, which can lead
		// to a full message queue under high load.
log.S().Debugf("%v still applying snapshot, skip further handling", p.Tag)
return nil
}
if len(p.pendingMessages) > 0 {
messages := p.pendingMessages
p.pendingMessages = nil
if err := p.Send(trans, messages); err != nil {
log.S().Warnf("%v clear snapshot pengding messages err: %v", p.Tag, err)
}
}
if p.HasPendingSnapshot() && !p.ReadyToHandlePendingSnap() {
log.S().Debugf("%v [apply_id: %v, last_applying_idx: %v] is not ready to apply snapshot.", p.Tag, p.Store().AppliedIndex(), p.LastApplyingIdx)
return nil
}
if p.peerStorage.genSnapTask != nil {
applyMsgs.appendMsg(p.regionId, Msg{
Type: MsgTypeApplySnapshot,
Data: p.peerStorage.genSnapTask,
})
p.peerStorage.genSnapTask = nil
}
if !p.RaftGroup.HasReadySince(&p.LastApplyingIdx) {
return nil
}
log.S().Debugf("%v handle raft ready", p.Tag)
ready := p.RaftGroup.ReadySince(p.LastApplyingIdx)
// TODO: workaround for:
// in kvproto/eraftpb, we use *SnapshotMetadata
// but in etcd, they use SnapshotMetadata
if ready.Snapshot.GetMetadata() == nil {
ready.Snapshot.Metadata = &eraftpb.SnapshotMetadata{}
}
p.OnRoleChanged(observer, &ready)
	// The leader can write to disk and replicate to the followers concurrently.
	// For more details, check raft thesis 10.2.1.
if p.IsLeader() {
if err := p.Send(trans, ready.Messages); err != nil {
log.S().Warnf("%v leader send message err: %v", p.Tag, err)
}
ready.Messages = ready.Messages[:0]
}
invokeCtx, err := p.Store().SaveReadyState(kvWB, raftWB, &ready)
if err != nil {
panic(fmt.Sprintf("failed to handle raft ready, error: %v", err))
}
return &ReadyICPair{Ready: ready, IC: invokeCtx}
}
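
/// PostRaftReadyPersistent runs after the ready state has been persisted: it finishes
/// snapshot bookkeeping in the peer storage, refreshes the peer meta if applying a snapshot
/// changed its role, sends (or defers) the follower's messages, and activates the peer when
/// a snapshot result is ready.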
func (p *Peer) PostRaftReadyPersistent(trans Transport, applyMsgs *applyMsgs, ready *raft.Ready, invokeCtx *InvokeContext) *ApplySnapResult {
if invokeCtx.hasSnapshot() {
		// When applying a snapshot, there are no logs that are applied but not yet compacted.
p.RaftLogSizeHint = 0
}
applySnapResult := p.Store().PostReadyPersistent(invokeCtx)
if applySnapResult != nil && p.Meta.GetRole() == metapb.PeerRole_Learner {
// The peer may change from learner to voter after snapshot applied.
var pr *metapb.Peer
for _, peer := range p.Region().GetPeers() {
if peer.GetId() == p.Meta.GetId() {
pr = &metapb.Peer{
Id: peer.Id,
StoreId: peer.StoreId,
Role: peer.Role,
}
}
}
if !PeerEqual(pr, p.Meta) {
log.S().Infof("%v meta changed in applying snapshot, before %v, after %v", p.Tag, p.Meta, pr)
p.Meta = pr
}
}
if !p.IsLeader() {
if p.IsApplyingSnapshot() {
p.pendingMessages = ready.Messages
ready.Messages = nil
} else {
if err := p.Send(trans, ready.Messages); err != nil {
log.S().Warnf("%v follower send messages err: %v", p.Tag, err)
}
}
}
if applySnapResult != nil {
p.Activate(applyMsgs)
}
return applySnapResult
}
// Try to renew leader lease.
func (p *Peer) MaybeRenewLeaderLease(ts time.Time) {
	// A non-leader peer should never have a leader lease.
	// A splitting leader should not renew its lease.
	// Because we split regions asynchronously, the leader may read stale results
	// if splitting runs slow on the leader.
	// A merging leader should not renew its lease.
	// Because we merge regions asynchronously, the leader may read stale results
	// if commit merge runs slow on sibling peers.
if !p.IsLeader() || p.isSplitting() || p.isMerging() {
return
}
p.leaderLease.Renew(ts)
remoteLease := p.leaderLease.MaybeNewRemoteLease(p.Term())
if !p.PendingRemove && remoteLease != nil {
atomic.StorePointer(&p.leaderChecker.leaderLease, unsafe.Pointer(remoteLease))
}
}
func (p *Peer) MaybeCampaign(parentIsLeader bool) bool {
// The peer campaigned when it was created, no need to do it again.
if len(p.Region().GetPeers()) <= 1 || !parentIsLeader {
return false
}
	// If the last peer is the leader of the region before the split, it's natural for
	// it to become the leader of the new split region.
p.RaftGroup.Campaign()
return true
}
func (p *Peer) findProposeTime(index, term uint64) *time.Time {
for {
meta := p.proposals.PopFront(term)
if meta == nil {
return nil
}
if meta.Index == index && meta.Term == term {
return meta.RenewLeaseTime
}
}
}
func (p *Peer) Term() uint64 {
return p.RaftGroup.Raft.Term
}
func (p *Peer) Stop() {
p.Store().CancelApplyingSnap()
}
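
/// HeartbeatPd schedules a region heartbeat task to the PD worker, carrying the down and
/// pending peers, write statistics, and the approximate region size and key count.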
func (p *Peer) HeartbeatPd(pdScheduler chan<- task) {
pdScheduler <- task{
tp: taskTypePDHeartbeat,
data: &pdRegionHeartbeatTask{
region: p.Region(),
peer: p.Meta,
downPeers: p.CollectDownPeers(time.Minute * 5),
pendingPeers: p.CollectPendingPeers(),
writtenBytes: p.PeerStat.WrittenBytes,
writtenKeys: p.PeerStat.WrittenKeys,
approximateSize: p.ApproximateSize,
approximateKeys: p.ApproximateKeys,
},
}
}
func (p *Peer) sendRaftMessage(msg eraftpb.Message, trans Transport) error {
sendMsg := new(rspb.RaftMessage)
sendMsg.RegionId = p.regionId
// set current epoch
sendMsg.RegionEpoch = &metapb.RegionEpoch{
ConfVer: p.Region().RegionEpoch.ConfVer,
Version: p.Region().RegionEpoch.Version,
}
fromPeer := *p.Meta
toPeer := p.getPeerFromCache(msg.To)
if toPeer == nil {
return fmt.Errorf("failed to lookup recipient peer %v in region %v", msg.To, p.regionId)
}
log.S().Debugf("%v, send raft msg %v from %v to %v", p.Tag, msg.MsgType, fromPeer.Id, toPeer.Id)
sendMsg.FromPeer = &fromPeer
sendMsg.ToPeer = toPeer
	// There could be two cases:
	// 1. Target peer already exists but has not established communication with the leader yet.
	// 2. Target peer was newly added due to a membership change or region split, but it has
	//    not been created yet.
	// For both cases the region start key and end key are attached in the RequestVote and
	// Heartbeat messages, so the store of that peer can check whether to create a new peer
	// when receiving these messages, or just wait for a pending region split to be performed
	// later.
if p.Store().isInitialized() && isInitialMsg(&msg) {
sendMsg.StartKey = append([]byte{}, p.Region().StartKey...)
sendMsg.EndKey = append([]byte{}, p.Region().EndKey...)
}
sendMsg.Message = &msg
return trans.Send(sendMsg)
}
func (p *Peer) HandleRaftReadyApply(kv *mvcc.DBBundle, applyMsgs *applyMsgs, ready *raft.Ready) {
	// Calling `HandleRaftCommittedEntries` directly here may lead to inconsistency.
	// In some cases, there will be some pending committed entries when applying a
	// snapshot. If we call `HandleRaftCommittedEntries` directly, these updates
	// will be written to disk. Because we apply snapshots asynchronously, these
	// updates will soon be removed. But the soft state of raft is still updated
	// in memory. Hence, when handling ready next time, these updates won't be included
	// in `ready.committed_entries` again, which will lead to inconsistency.