// Copyright 2017 TiKV Project Authors. Licensed under Apache-2.0.
// #[PerformanceCriticalPath]
#[cfg(test)]
use std::sync::mpsc::Sender;
use std::{
borrow::Cow,
cmp,
cmp::{Ord, Ordering as CmpOrdering},
collections::VecDeque,
fmt::{self, Debug, Formatter},
mem,
ops::{Deref, DerefMut, Range as StdRange},
sync::{
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
mpsc::SyncSender,
Arc, Mutex,
},
time::Duration,
usize,
vec::Drain,
};
use batch_system::{
BasicMailbox, BatchRouter, BatchSystem, Config as BatchSystemConfig, Fsm, HandleResult,
HandlerBuilder, PollHandler, Priority,
};
use collections::{HashMap, HashMapEntry, HashSet};
use crossbeam::channel::{TryRecvError, TrySendError};
use engine_traits::{
DeleteStrategy, KvEngine, Mutable, PerfContext, PerfContextKind, RaftEngine,
RaftEngineReadOnly, Range as EngineRange, Snapshot, SstMetaInfo, WriteBatch, ALL_CFS,
CF_DEFAULT, CF_LOCK, CF_RAFT, CF_WRITE,
};
use fail::fail_point;
use kvproto::{
import_sstpb::SstMeta,
kvrpcpb::ExtraOp as TxnExtraOp,
metapb::{PeerRole, Region, RegionEpoch},
raft_cmdpb::{
AdminCmdType, AdminRequest, AdminResponse, ChangePeerRequest, CmdType, CommitMergeRequest,
RaftCmdRequest, RaftCmdResponse, Request,
},
raft_serverpb::{MergeState, PeerState, RaftApplyState, RaftTruncatedState, RegionLocalState},
};
use pd_client::{new_bucket_stats, BucketMeta, BucketStat};
use prometheus::local::LocalHistogram;
use raft::eraftpb::{
ConfChange, ConfChangeType, ConfChangeV2, Entry, EntryType, Snapshot as RaftSnapshot,
};
use raft_proto::ConfChangeI;
use smallvec::{smallvec, SmallVec};
use sst_importer::SstImporter;
use tikv_alloc::trace::TraceEvent;
use tikv_util::{
box_err, box_try,
config::{Tracker, VersionTrack},
debug, error, info,
memory::HeapSize,
mpsc::{loose_bounded, LooseBoundedSender, Receiver},
safe_panic, slow_log,
time::{duration_to_sec, Instant},
warn,
worker::Scheduler,
Either, MustConsumeVec,
};
use time::Timespec;
use tracker::GLOBAL_TRACKERS;
use uuid::Builder as UuidBuilder;
use self::memtrace::*;
use super::metrics::*;
use crate::{
bytes_capacity,
coprocessor::{
ApplyCtxInfo, Cmd, CmdBatch, CmdObserveInfo, CoprocessorHost, ObserveHandle, ObserveLevel,
RegionState,
},
store::{
cmd_resp,
entry_storage::{self, CachedEntries},
fsm::RaftPollerBuilder,
local_metrics::{RaftMetrics, TimeTracker},
memory::*,
metrics::*,
msg::{Callback, PeerMsg, ReadResponse, SignificantMsg},
peer::Peer,
peer_storage::{write_initial_apply_state, write_peer_state},
util,
util::{
admin_cmd_epoch_lookup, check_region_epoch, compare_region_epoch, is_learner,
ChangePeerI, ConfChangeKind, KeysInfoFormatter, LatencyInspector,
},
Config, RegionSnapshot, RegionTask,
},
Error, Result,
};
const DEFAULT_APPLY_WB_SIZE: usize = 4 * 1024;
const APPLY_WB_SHRINK_SIZE: usize = 1024 * 1024;
const SHRINK_PENDING_CMD_QUEUE_CAP: usize = 64;
const MAX_APPLY_BATCH_SIZE: usize = 64 * 1024 * 1024;
pub struct PendingCmd<S>
where
S: Snapshot,
{
pub index: u64,
pub term: u64,
pub cb: Option<Callback<S>>,
}
impl<S> PendingCmd<S>
where
S: Snapshot,
{
fn new(index: u64, term: u64, cb: Callback<S>) -> PendingCmd<S> {
PendingCmd {
index,
term,
cb: Some(cb),
}
}
}
impl<S> Drop for PendingCmd<S>
where
S: Snapshot,
{
fn drop(&mut self) {
if self.cb.is_some() {
safe_panic!(
"callback of pending command at [index: {}, term: {}] is leak",
self.index,
self.term
);
}
}
}
impl<S> Debug for PendingCmd<S>
where
S: Snapshot,
{
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(
f,
"PendingCmd [index: {}, term: {}, has_cb: {}]",
self.index,
self.term,
self.cb.is_some()
)
}
}
impl<S: Snapshot> HeapSize for PendingCmd<S> {}
/// Commands waiting to be committed and applied.
#[derive(Debug)]
pub struct PendingCmdQueue<S>
where
S: Snapshot,
{
normals: VecDeque<PendingCmd<S>>,
conf_change: Option<PendingCmd<S>>,
}
impl<S> PendingCmdQueue<S>
where
S: Snapshot,
{
fn new() -> PendingCmdQueue<S> {
PendingCmdQueue {
normals: VecDeque::new(),
conf_change: None,
}
}
fn pop_normal(&mut self, index: u64, term: u64) -> Option<PendingCmd<S>> {
self.normals.pop_front().and_then(|cmd| {
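            // Shrink the deque once it has grown large but is now mostly
            // empty, to bound its memory usage.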
if self.normals.capacity() > SHRINK_PENDING_CMD_QUEUE_CAP
&& self.normals.len() < SHRINK_PENDING_CMD_QUEUE_CAP
{
self.normals.shrink_to_fit();
}
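            // The front command belongs to a later entry than the one being
            // applied, so keep it queued for a future call.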
if (cmd.term, cmd.index) > (term, index) {
self.normals.push_front(cmd);
return None;
}
Some(cmd)
})
}
fn append_normal(&mut self, cmd: PendingCmd<S>) {
self.normals.push_back(cmd);
}
fn take_conf_change(&mut self) -> Option<PendingCmd<S>> {
        // A conf change is not affected by switching between follower and
        // leader, so there is no need to check the term.
self.conf_change.take()
}
// TODO: seems we don't need to separate conf change from normal entries.
fn set_conf_change(&mut self, cmd: PendingCmd<S>) {
self.conf_change = Some(cmd);
}
}
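// A minimal sketch of `pop_normal`'s contract (the values are hypothetical):
// a queued command is only popped once the applied entry catches up with it,
// where "catches up" is decided by comparing (term, index) pairs.
//
//   let mut q = PendingCmdQueue::new();
//   q.append_normal(cmd); // suppose cmd.term == 5 and cmd.index == 10
//   assert!(q.pop_normal(9, 5).is_none());  // entry (5, 9) is behind cmd
//   assert!(q.pop_normal(10, 5).is_some()); // entry (5, 10) reaches cmd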
#[derive(Default, Debug)]
pub struct ChangePeer {
pub index: u64,
    // The proposed ConfChangeV2 or (legacy) ConfChange.
    // A legacy ConfChange is converted to ConfChangeV2.
    pub conf_change: ConfChangeV2,
    // The change-peer requests that come along with the ConfChangeV2
    // or (legacy) ConfChange; for a legacy ConfChange it contains
    // exactly one element.
pub changes: Vec<ChangePeerRequest>,
pub region: Region,
}
pub struct Range {
pub cf: String,
pub start_key: Vec<u8>,
pub end_key: Vec<u8>,
}
impl Debug for Range {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(
f,
"{{ cf: {:?}, start_key: {:?}, end_key: {:?} }}",
self.cf,
log_wrappers::Value::key(&self.start_key),
log_wrappers::Value::key(&self.end_key)
)
}
}
impl Range {
fn new(cf: String, start_key: Vec<u8>, end_key: Vec<u8>) -> Range {
Range {
cf,
start_key,
end_key,
}
}
}
#[derive(Debug)]
pub enum ExecResult<S> {
ChangePeer(ChangePeer),
CompactLog {
state: RaftTruncatedState,
first_index: u64,
},
SplitRegion {
regions: Vec<Region>,
derived: Region,
new_split_regions: HashMap<u64, NewSplitPeer>,
},
PrepareMerge {
region: Region,
state: MergeState,
},
CommitMerge {
index: u64,
region: Region,
source: Region,
},
RollbackMerge {
region: Region,
commit: u64,
},
ComputeHash {
region: Region,
index: u64,
context: Vec<u8>,
snap: S,
},
VerifyHash {
index: u64,
context: Vec<u8>,
hash: Vec<u8>,
},
DeleteRange {
ranges: Vec<Range>,
},
IngestSst {
ssts: Vec<SstMetaInfo>,
},
TransferLeader {
term: u64,
},
}
/// The possible returned value when applying logs.
#[derive(Debug)]
pub enum ApplyResult<S> {
None,
Yield,
/// Additional result that needs to be sent back to raftstore.
Res(ExecResult<S>),
    /// The `CommitMerge` cannot be applied until the source peer has
    /// applied to the required position and updated the shared atomic
    /// value accordingly.
WaitMergeSource(Arc<AtomicU64>),
}
// The applied commands and their callbacks.
struct ApplyCallbackBatch<S>
where
S: Snapshot,
{
cmd_batch: Vec<CmdBatch>,
    // The max observe level of the current `Vec<CmdBatch>`.
batch_max_level: ObserveLevel,
cb_batch: MustConsumeVec<(Callback<S>, RaftCmdResponse)>,
}
impl<S: Snapshot> ApplyCallbackBatch<S> {
fn new() -> ApplyCallbackBatch<S> {
ApplyCallbackBatch {
cmd_batch: vec![],
batch_max_level: ObserveLevel::None,
cb_batch: MustConsumeVec::new("callback of apply callback batch"),
}
}
fn push_batch(&mut self, observe_info: &CmdObserveInfo, region_id: u64) {
let cb = CmdBatch::new(observe_info, region_id);
self.batch_max_level = cmp::max(self.batch_max_level, cb.level);
self.cmd_batch.push(cb);
}
fn push_cb(&mut self, cb: Callback<S>, resp: RaftCmdResponse) {
self.cb_batch.push((cb, resp));
}
fn push(
&mut self,
cb: Option<Callback<S>>,
cmd: Cmd,
observe_info: &CmdObserveInfo,
region_id: u64,
) {
if let Some(cb) = cb {
self.cb_batch.push((cb, cmd.response.clone()));
}
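        // `push_batch` must have been called for this region beforehand;
        // otherwise `last_mut()` returns `None` and the `unwrap` panics.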
self.cmd_batch
.last_mut()
.unwrap()
.push(observe_info, region_id, cmd);
}
}
pub trait Notifier<EK: KvEngine>: Send {
fn notify(&self, apply_res: Vec<ApplyRes<EK::Snapshot>>);
fn notify_one(&self, region_id: u64, msg: PeerMsg<EK>);
fn clone_box(&self) -> Box<dyn Notifier<EK>>;
}
struct ApplyContext<EK>
where
EK: KvEngine,
{
tag: String,
timer: Option<Instant>,
host: CoprocessorHost<EK>,
importer: Arc<SstImporter>,
region_scheduler: Scheduler<RegionTask<EK::Snapshot>>,
router: ApplyRouter<EK>,
notifier: Box<dyn Notifier<EK>>,
engine: EK,
applied_batch: ApplyCallbackBatch<EK::Snapshot>,
apply_res: Vec<ApplyRes<EK::Snapshot>>,
exec_log_index: u64,
exec_log_term: u64,
kv_wb: EK::WriteBatch,
kv_wb_last_bytes: u64,
kv_wb_last_keys: u64,
committed_count: usize,
    // Whether synchronizing the WAL is preferred.
sync_log_hint: bool,
// Whether to use the delete range API instead of deleting one by one.
use_delete_range: bool,
perf_context: EK::PerfContext,
yield_duration: Duration,
store_id: u64,
/// region_id -> (peer_id, is_splitting)
    /// Used for handling the race between splitting and creating a new peer.
    /// An uninitialized peer can be replaced by the one from splitting iff
    /// they are exactly the same peer.
pending_create_peers: Arc<Mutex<HashMap<u64, (u64, bool)>>>,
    /// We must delete the ingested files before invoking `callback` so that
    /// any ingest request reaching this peer can observe the update even if
    /// the leader has changed. We must also delete them only after the
    /// applied index has been persisted to the kvdb, because the entry may
    /// be replayed after a panic or power-off that happens after
    /// `SstImporter::delete` but before `WriteBatch::write`. Only once we
    /// are sure the entry will never be applied again can we delete the
    /// SST files.
delete_ssts: Vec<SstMetaInfo>,
    /// A self-defined engine may be slow to ingest ssts.
    /// It may move some elements of `delete_ssts` into `pending_delete_ssts`
    /// to delay their deletion. Otherwise we may lose data.
pending_delete_ssts: Vec<SstMetaInfo>,
/// The priority of this Handler.
priority: Priority,
    /// Whether to yield high-latency operations to the low-priority handler.
yield_high_latency_operation: bool,
/// The ssts waiting to be ingested in `write_to_db`.
pending_ssts: Vec<SstMetaInfo>,
    /// The pending latency inspectors, which are drained at the end of a
    /// write.
pending_latency_inspect: Vec<LatencyInspector>,
apply_wait: LocalHistogram,
apply_time: LocalHistogram,
key_buffer: Vec<u8>,
}
impl<EK> ApplyContext<EK>
where
EK: KvEngine,
{
pub fn new(
tag: String,
host: CoprocessorHost<EK>,
importer: Arc<SstImporter>,
region_scheduler: Scheduler<RegionTask<EK::Snapshot>>,
engine: EK,
router: ApplyRouter<EK>,
notifier: Box<dyn Notifier<EK>>,
cfg: &Config,
store_id: u64,
pending_create_peers: Arc<Mutex<HashMap<u64, (u64, bool)>>>,
priority: Priority,
) -> ApplyContext<EK> {
let kv_wb = engine.write_batch_with_cap(DEFAULT_APPLY_WB_SIZE);
ApplyContext {
tag,
timer: None,
host,
importer,
region_scheduler,
engine: engine.clone(),
router,
notifier,
kv_wb,
applied_batch: ApplyCallbackBatch::new(),
apply_res: vec![],
exec_log_index: 0,
exec_log_term: 0,
kv_wb_last_bytes: 0,
kv_wb_last_keys: 0,
committed_count: 0,
sync_log_hint: false,
use_delete_range: cfg.use_delete_range,
perf_context: engine.get_perf_context(cfg.perf_level, PerfContextKind::RaftstoreApply),
yield_duration: cfg.apply_yield_duration.0,
delete_ssts: vec![],
pending_delete_ssts: vec![],
store_id,
pending_create_peers,
priority,
yield_high_latency_operation: cfg.apply_batch_system.low_priority_pool_size > 0,
pending_ssts: vec![],
pending_latency_inspect: vec![],
apply_wait: APPLY_TASK_WAIT_TIME_HISTOGRAM.local(),
apply_time: APPLY_TIME_HISTOGRAM.local(),
key_buffer: Vec::with_capacity(1024),
}
}
/// Prepares for applying entries for `delegate`.
///
    /// The general apply flow for a delegate is:
    /// `prepare_for` -> `commit` [-> `commit` ...] -> `finish_for`.
    /// After all delegates are handled, the `write_to_db` method should be
    /// called.
pub fn prepare_for(&mut self, delegate: &mut ApplyDelegate<EK>) {
self.applied_batch
.push_batch(&delegate.observe_info, delegate.region.get_id());
}
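    // A minimal sketch of the call pattern described above (the driver code
    // is hypothetical):
    //
    //   ctx.prepare_for(&mut delegate);
    //   // ... apply a batch of committed entries ...
    //   ctx.commit(&mut delegate); // optional intermediate persist
    //   // ... apply the remaining entries ...
    //   ctx.finish_for(&mut delegate, results);
    //   ctx.write_to_db(); // once all delegates are handled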
    /// Commits all the changes that have been made for the delegate.
    /// `persistent` indicates whether to write the changes into RocksDB.
///
/// This call is valid only when it's between a `prepare_for` and
/// `finish_for`.
pub fn commit(&mut self, delegate: &mut ApplyDelegate<EK>) {
if delegate.last_flush_applied_index < delegate.apply_state.get_applied_index() {
delegate.write_apply_state(self.kv_wb_mut());
}
self.commit_opt(delegate, true);
}
fn commit_opt(&mut self, delegate: &mut ApplyDelegate<EK>, persistent: bool) {
delegate.update_metrics(self);
if persistent {
self.write_to_db();
self.prepare_for(delegate);
delegate.last_flush_applied_index = delegate.apply_state.get_applied_index()
}
self.kv_wb_last_bytes = self.kv_wb().data_size() as u64;
self.kv_wb_last_keys = self.kv_wb().count() as u64;
}
/// Writes all the changes into RocksDB.
/// If it returns true, all pending writes are persisted in engines.
pub fn write_to_db(&mut self) -> bool {
let need_sync = self.sync_log_hint;
        // There may be put and delete requests following an ingest request in
        // the same fsm. To guarantee the correct order, we must ingest the
        // pending ssts first and then persist the kv write batch to the
        // engine.
if !self.pending_ssts.is_empty() {
let tag = self.tag.clone();
self.importer
.ingest(&self.pending_ssts, &self.engine)
.unwrap_or_else(|e| {
panic!(
"{} failed to ingest ssts {:?}: {:?}",
tag, self.pending_ssts, e
);
});
self.pending_ssts = vec![];
}
if !self.kv_wb_mut().is_empty() {
self.perf_context.start_observe();
let mut write_opts = engine_traits::WriteOptions::new();
write_opts.set_sync(need_sync);
self.kv_wb().write_opt(&write_opts).unwrap_or_else(|e| {
panic!("failed to write to engine: {:?}", e);
});
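            // Gather the tracker tokens of all pending callbacks so that the
            // perf statistics of this write can be attributed to each
            // request.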
let trackers: Vec<_> = self
.applied_batch
.cb_batch
.iter()
.flat_map(|(cb, _)| cb.get_trackers())
.flat_map(|trackers| trackers.iter().map(|t| t.as_tracker_token()))
.flatten()
.collect();
self.perf_context.report_metrics(&trackers);
self.sync_log_hint = false;
let data_size = self.kv_wb().data_size();
if data_size > APPLY_WB_SHRINK_SIZE {
// Control the memory usage for the WriteBatch.
self.kv_wb = self.engine.write_batch_with_cap(DEFAULT_APPLY_WB_SIZE);
} else {
// Clear data, reuse the WriteBatch, this can reduce memory allocations and
// deallocations.
self.kv_wb_mut().clear();
}
self.kv_wb_last_bytes = 0;
self.kv_wb_last_keys = 0;
}
if !self.delete_ssts.is_empty() {
let tag = self.tag.clone();
for sst in self.delete_ssts.drain(..) {
self.importer.delete(&sst.meta).unwrap_or_else(|e| {
panic!("{} cleanup ingested file {:?}: {:?}", tag, sst, e);
});
}
}
        // Take the applied commands and their callbacks.
let ApplyCallbackBatch {
cmd_batch,
batch_max_level,
mut cb_batch,
} = mem::replace(&mut self.applied_batch, ApplyCallbackBatch::new());
        // Call it before invoking the callbacks to prevent a Commit from being
        // executed before its Prewrite is observed.
self.host
.on_flush_applied_cmd_batch(batch_max_level, cmd_batch, &self.engine);
// Invoke callbacks
let now = std::time::Instant::now();
for (cb, resp) in cb_batch.drain(..) {
for tracker in cb.get_trackers().iter().flat_map(|v| *v) {
tracker.observe(now, &self.apply_time, |t| &mut t.metrics.apply_time_nanos);
}
cb.invoke_with_response(resp);
}
self.apply_time.flush();
self.apply_wait.flush();
need_sync
}
/// Finishes `Apply`s for the delegate.
pub fn finish_for(
&mut self,
delegate: &mut ApplyDelegate<EK>,
results: VecDeque<ExecResult<EK::Snapshot>>,
) {
if !delegate.pending_remove {
delegate.write_apply_state(self.kv_wb_mut());
}
self.commit_opt(delegate, false);
self.apply_res.push(ApplyRes {
region_id: delegate.region_id(),
apply_state: delegate.apply_state.clone(),
exec_res: results,
metrics: delegate.metrics.clone(),
applied_term: delegate.applied_term,
bucket_stat: delegate.buckets.clone().map(Box::new),
});
}
pub fn delta_bytes(&self) -> u64 {
self.kv_wb().data_size() as u64 - self.kv_wb_last_bytes
}
pub fn delta_keys(&self) -> u64 {
self.kv_wb().count() as u64 - self.kv_wb_last_keys
}
#[inline]
pub fn kv_wb(&self) -> &EK::WriteBatch {
&self.kv_wb
}
#[inline]
pub fn kv_wb_mut(&mut self) -> &mut EK::WriteBatch {
&mut self.kv_wb
}
/// Flush all pending writes to engines.
/// If it returns true, all pending writes are persisted in engines.
pub fn flush(&mut self) -> bool {
        // TODO: this check is too hacky; it needs to be more verbose and less
        // buggy.
let t = match self.timer.take() {
Some(t) => t,
None => return false,
};
        // Write to the engine.
        // `raftstore.sync-log = true` means we must prevent data loss on power
        // failure. Take raft log GC as an example: we write the kv WAL first
        // and then the raft WAL. If a power failure happens, the raft WAL may
        // have been synced to disk while the kv WAL has not, so we honor the
        // sync-log flag here.
let is_synced = self.write_to_db();
if !self.apply_res.is_empty() {
fail_point!("before_nofity_apply_res");
let apply_res = mem::take(&mut self.apply_res);
self.notifier.notify(apply_res);
}
let elapsed = t.saturating_elapsed();
STORE_APPLY_LOG_HISTOGRAM.observe(duration_to_sec(elapsed) as f64);
for mut inspector in std::mem::take(&mut self.pending_latency_inspect) {
inspector.record_apply_process(elapsed);
inspector.finish();
}
slow_log!(
elapsed,
"{} handle ready {} committed entries",
self.tag,
self.committed_count
);
self.committed_count = 0;
is_synced
}
}
/// Calls the callback of `cmd` when the Region is removed.
fn notify_region_removed(region_id: u64, peer_id: u64, mut cmd: PendingCmd<impl Snapshot>) {
debug!(
"region is removed, notify commands";
"region_id" => region_id,
"peer_id" => peer_id,
"index" => cmd.index,
"term" => cmd.term
);
notify_req_region_removed(region_id, cmd.cb.take().unwrap());
}
pub fn notify_req_region_removed(region_id: u64, cb: Callback<impl Snapshot>) {
let region_not_found = Error::RegionNotFound(region_id);
let resp = cmd_resp::new_error(region_not_found);
cb.invoke_with_response(resp);
}
/// Calls the callback of `cmd` when it cannot be processed further.
fn notify_stale_command(
region_id: u64,
peer_id: u64,
term: u64,
mut cmd: PendingCmd<impl Snapshot>,
) {
info!(
"command is stale, skip";
"region_id" => region_id,
"peer_id" => peer_id,
"index" => cmd.index,
"term" => cmd.term
);
notify_stale_req(term, cmd.cb.take().unwrap());
}
pub fn notify_stale_req(term: u64, cb: Callback<impl Snapshot>) {
let resp = cmd_resp::err_resp(Error::StaleCommand, term);
cb.invoke_with_response(resp);
}
pub fn notify_stale_req_with_msg(term: u64, msg: String, cb: Callback<impl Snapshot>) {
let mut resp = cmd_resp::err_resp(Error::StaleCommand, term);
resp.mut_header().mut_error().set_message(msg);
cb.invoke_with_response(resp);
}
/// Checks if a write needs to be issued before handling the command.
fn should_write_to_engine(cmd: &RaftCmdRequest) -> bool {
if cmd.has_admin_request() {
match cmd.get_admin_request().get_cmd_type() {
            // ComputeHash requires an up-to-date snapshot.
AdminCmdType::ComputeHash |
// Merge needs to get the latest apply index.
AdminCmdType::CommitMerge |
AdminCmdType::RollbackMerge => return true,
_ => {}
}
}
// Some commands may modify keys covered by the current write batch, so we
// must write the current write batch to the engine first.
for req in cmd.get_requests() {
if req.has_delete_range() {
return true;
}
if req.has_ingest_sst() {
return true;
}
}
false
}
/// Checks if a write contains a high-latency operation.
fn has_high_latency_operation(cmd: &RaftCmdRequest) -> bool {
for req in cmd.get_requests() {
if req.has_delete_range() {
return true;
}
if req.has_ingest_sst() {
return true;
}
}
false
}
/// Checks if the WAL needs to be synced after handling the command.
fn should_sync_log(cmd: &RaftCmdRequest) -> bool {
if cmd.has_admin_request() {
if cmd.get_admin_request().get_cmd_type() == AdminCmdType::CompactLog {
            // We do not need to sync the WAL before compacting the log,
            // because this request will send a message to the raft_gc_log
            // thread to delete the entries before this index, instead of
            // deleting them in the apply thread directly.
return false;
}
return true;
}
for req in cmd.get_requests() {
        // After an ingest-sst command is applied, the SST files are deleted
        // quickly. As a result, the command cannot be handled again and must
        // be synced. See more in the Cleanup worker.
if req.has_ingest_sst() {
return true;
}
}
false
}
/// A struct that stores the state related to Merge.
///
/// When executing a `CommitMerge`, the source peer may not have applied
/// to the required index yet, so the target peer has to abort the current
/// execution and wait for it asynchronously.
///
/// When rolling the stack, all states required to recover are stored in
/// this struct.
/// TODO: check whether generator/coroutine is a good choice in this case.
struct WaitSourceMergeState {
    /// Indicates whether the source peer has applied to the required index.
    /// When the source peer is ready, this value is set to the region id of
    /// the source peer.
logs_up_to_date: Arc<AtomicU64>,
}
struct YieldState<EK>
where
EK: KvEngine,
{
/// All of the entries that need to continue to be applied after
/// the source peer has applied its logs.
pending_entries: Vec<Entry>,
    /// All of the messages that need to continue to be handled after
    /// the source peer has applied its logs and the pending entries
    /// have all been handled.
pending_msgs: Vec<Msg<EK>>,
    /// Cached heap size of itself.
heap_size: Option<usize>,
}
impl<EK> Debug for YieldState<EK>
where
EK: KvEngine,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("YieldState")
.field("pending_entries", &self.pending_entries.len())
.field("pending_msgs", &self.pending_msgs.len())
.finish()
}
}
impl Debug for WaitSourceMergeState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("WaitSourceMergeState")
.field("logs_up_to_date", &self.logs_up_to_date)
.finish()
}
}
#[derive(Debug, Clone)]
pub struct NewSplitPeer {
pub peer_id: u64,
// `None` => success,
// `Some(s)` => fail due to `s`.
pub result: Option<String>,
}
/// The apply delegate of a Region, responsible for handling the committed
/// raft log entries of that Region.
///
/// `Apply` is a term from Raft that means executing the actual commands.
/// In Raft, once some log entries are committed, every peer of the Raft
/// group applies them one by one. For write commands, it writes to or
/// deletes from the local engine; for admin commands, it performs some meta
/// change of the Raft group.
///
/// `Delegate` is just a structure that gathers all apply-related fields of a
/// Region. The apply worker receives all the apply tasks of the different
/// Regions located on this store, and it gets the corresponding apply
/// delegate to handle each task, which makes the code logic clearer.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct ApplyDelegate<EK>
where
EK: KvEngine,
{
/// The ID of the peer.
id: u64,
/// The term of the Region.
term: u64,
/// The Region information of the peer.
region: Region,
    /// The peer tag, "[region region_id] peer_id".
tag: String,
    /// Whether the delegate should be stopped from polling.
    /// A delegate can be stopped by a conf change, by a merge, or when
    /// requested by a destroy message.
stopped: bool,
/// The start time of the current round to execute commands.
handle_start: Option<Instant>,
    /// Set to true when the peer is removing itself because of
    /// `ConfChangeType::RemoveNode`; any following committed logs in the
    /// same Ready should then fail to apply.
pending_remove: bool,
/// The commands waiting to be committed and applied
pending_cmds: PendingCmdQueue<EK::Snapshot>,
/// The counter of pending request snapshots. See more in `Peer`.
pending_request_snapshot_count: Arc<AtomicUsize>,
    /// Indicates whether the peer is merging; if so, log compaction won't
    /// be performed.
is_merging: bool,
/// Records the epoch version after the last merge.
last_merge_version: u64,
yield_state: Option<YieldState<EK>>,
    /// A temporary state that keeps track of the progress of the source
    /// peer when `CommitMerge` cannot be executed yet.
wait_merge_state: Option<WaitSourceMergeState>,
    // ID of the last region that reported ready.
ready_source_region_id: u64,
    /// TiKV writes apply_state to the KV RocksDB, in one write batch together
    /// with the kv data.
    ///
    /// If we wrote it to the Raft RocksDB instead, apply_state and the kv
    /// data (Put, Delete) would be in separate WAL files. On a power failure,
    /// the apply_index might be synced to disk while the KV data is not,
    /// and we would lose data.
apply_state: RaftApplyState,
    /// The term of the raft log at the applied index.
applied_term: u64,
/// The latest flushed applied index.
last_flush_applied_index: u64,
/// Info about cmd observer.
observe_info: CmdObserveInfo,
    /// The local metrics, which are flushed periodically.
metrics: ApplyMetrics,
    /// Priority in the batch system. When applying commands that have high
    /// latency, we decrease the priority of the current fsm to reduce the
    /// impact on other normal commands.
priority: Priority,
/// To fetch Raft entries for applying if necessary.
#[derivative(Debug = "ignore")]
raft_engine: Box<dyn RaftEngineReadOnly>,
trace: ApplyMemoryTrace,
buckets: Option<BucketStat>,
}
impl<EK> ApplyDelegate<EK>
where
EK: KvEngine,
{
fn from_registration(reg: Registration) -> ApplyDelegate<EK> {
ApplyDelegate {
id: reg.id,
tag: format!("[region {}] {}", reg.region.get_id(), reg.id),
region: reg.region,
pending_remove: false,
last_flush_applied_index: reg.apply_state.get_applied_index(),
apply_state: reg.apply_state,
applied_term: reg.applied_term,
term: reg.term,
stopped: false,
handle_start: None,
ready_source_region_id: 0,
yield_state: None,
wait_merge_state: None,
is_merging: reg.is_merging,
pending_cmds: PendingCmdQueue::new(),
metrics: Default::default(),
last_merge_version: 0,
pending_request_snapshot_count: reg.pending_request_snapshot_count,
            // Use a default `CmdObserveInfo` because observing is disabled by
            // default.
observe_info: CmdObserveInfo::default(),
priority: Priority::Normal,
raft_engine: reg.raft_engine,
trace: ApplyMemoryTrace::default(),
buckets: None,
}
}
pub fn region_id(&self) -> u64 {
self.region.get_id()
}
pub fn id(&self) -> u64 {
self.id
}
/// Handles all the committed_entries, namely, applies the committed
/// entries.
fn handle_raft_committed_entries(
&mut self,
apply_ctx: &mut ApplyContext<EK>,
mut committed_entries_drainer: Drain<'_, Entry>,
) {
if committed_entries_drainer.len() == 0 {
return;
}
apply_ctx.prepare_for(self);
        // If we send multiple ConfChange commands, only the first one will be
        // proposed correctly; the others will be saved as normal entries with
        // no data, so we must re-propose these commands.
apply_ctx.committed_count += committed_entries_drainer.len();
let mut results = VecDeque::new();
while let Some(entry) = committed_entries_drainer.next() {
if self.pending_remove {
// This peer is about to be destroyed, skip everything.
break;
}
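            // Entries are expected to be applied in strictly increasing
            // index order.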
let expect_index = self.apply_state.get_applied_index() + 1;
if expect_index != entry.get_index() {