Merge pull request tikv#5 from joccau/cherry-pick-12770
log_backup: adapt the enhanced checkpoint model
joccau committed Jun 11, 2022
2 parents 0e445f8 + c9b5f06 commit b1d493c
Showing 19 changed files with 1,932 additions and 610 deletions.
689 changes: 283 additions & 406 deletions components/backup-stream/src/endpoint.rs

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion components/backup-stream/src/errors.rs
@@ -120,7 +120,10 @@ where
 #[macro_export(crate)]
 macro_rules! annotate {
     ($inner: expr, $message: expr) => {
-        Error::Other(tikv_util::box_err!("{}: {}", $message, $inner))
+        {
+            use tikv_util::box_err;
+            $crate::errors::Error::Other(box_err!("{}: {}", $message, $inner))
+        }
     };
     ($inner: expr, $format: literal, $($args: expr),+) => {
         annotate!($inner, format_args!($format, $($args),+))
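
The change above wraps the expansion in a block and switches to the fully qualified `$crate::errors::Error` path, so the macro expands correctly even in modules that never imported `Error` or `box_err!`. A minimal sketch of the hygiene issue, using `format!` as a stand-in for `tikv_util::box_err!` (module and function names here are illustrative, not TiKV's):

    pub mod errors {
        #[derive(Debug)]
        pub enum Error {
            Other(String),
        }
    }

    #[macro_export]
    macro_rules! annotate {
        ($inner: expr, $message: expr) => {
            // Fully qualified path: resolves regardless of the caller's imports.
            $crate::errors::Error::Other(format!("{}: {}", $message, $inner))
        };
    }

    mod caller {
        // Note: no `use crate::errors::Error;` here. The old expansion
        // `Error::Other(...)` would fail to compile in this module.
        pub fn check() -> Result<(), crate::errors::Error> {
            let io_err = std::io::Error::new(std::io::ErrorKind::Other, "disk full");
            Err(annotate!(io_err, "failed to load metadata"))
        }
    }

    fn main() {
        println!("{:?}", caller::check());
    }
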
67 changes: 43 additions & 24 deletions components/backup-stream/src/event_loader.rs
@@ -69,8 +69,12 @@ impl PendingMemoryQuota {
 /// EventLoader transforms data from the snapshot into ApplyEvent.
 pub struct EventLoader<S: Snapshot> {
     scanner: DeltaScanner<S>,
+    // pooling the memory.
+    entry_batch: EntryBatch,
 }
 
+const ENTRY_BATCH_SIZE: usize = 1024;
+
 impl<S: Snapshot> EventLoader<S> {
     pub fn load_from(
         snapshot: S,
@@ -93,20 +97,31 @@ impl<S: Snapshot> EventLoader<S> {
             from_ts, to_ts, region_id
         ))?;
 
-        Ok(Self { scanner })
+        Ok(Self {
+            scanner,
+            entry_batch: EntryBatch::with_capacity(ENTRY_BATCH_SIZE),
+        })
     }
 
+    /// Scan a batch of events from the snapshot, and save them into the internal buffer.
+    fn fill_entries(&mut self) -> Result<Statistics> {
+        assert!(
+            self.entry_batch.is_empty(),
+            "EventLoader: the entry batch isn't empty when filling entries, which is error-prone, please call `omit_entries` first. (len = {})",
+            self.entry_batch.len()
+        );
+        self.scanner.scan_entries(&mut self.entry_batch)?;
+        Ok(self.scanner.take_statistics())
+    }
+
-    /// scan a batch of events from the snapshot. Tracking the locks at the same time.
-    /// note: maybe make something like [`EntryBatch`] for reducing allocation.
-    fn scan_batch(
+    /// Drain the internal buffer, converting them to the [`ApplyEvents`],
+    /// and tracking the locks at the same time.
+    fn omit_entries_to(
         &mut self,
-        batch_size: usize,
         result: &mut ApplyEvents,
         resolver: &mut TwoPhaseResolver,
-    ) -> Result<Statistics> {
-        let mut b = EntryBatch::with_capacity(batch_size);
-        self.scanner.scan_entries(&mut b)?;
-        for entry in b.drain() {
+    ) -> Result<()> {
+        for entry in self.entry_batch.drain() {
             match entry {
                 TxnEntry::Prewrite {
                     default: (key, value),
@@ -149,7 +164,7 @@ impl<S: Snapshot> EventLoader<S> {
                 }
             }
         }
-        Ok(self.scanner.take_statistics())
+        Ok(())
     }
 }
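
The refactor above splits the old `scan_batch` into two steps so that one `EntryBatch` is pooled across iterations instead of being allocated per scan: `fill_entries` refills the buffer (and asserts it was drained), then `omit_entries_to` drains it into the `ApplyEvents` sink. A self-contained sketch of the same fill-then-drain pattern (types and names are illustrative, not TiKV's API):

    const BATCH_SIZE: usize = 1024;

    struct Loader {
        batch: Vec<u64>, // pooled buffer standing in for EntryBatch
    }

    impl Loader {
        fn fill(&mut self, src: &mut impl Iterator<Item = u64>) {
            // Mirrors the assert in `fill_entries`: refilling a non-empty
            // buffer would silently drop or duplicate entries.
            assert!(self.batch.is_empty(), "drain the batch before refilling");
            self.batch.extend(src.take(BATCH_SIZE));
        }

        fn drain_to(&mut self, out: &mut Vec<u64>) {
            // `drain` empties the buffer but keeps its capacity, so the next
            // `fill` writes into memory that is already allocated.
            out.extend(self.batch.drain(..));
        }
    }

    fn main() {
        let mut loader = Loader { batch: Vec::with_capacity(BATCH_SIZE) };
        let mut src = 0u64..4096;
        let mut out = Vec::new();
        loop {
            loader.fill(&mut src);
            if loader.batch.is_empty() {
                break;
            }
            loader.drain_to(&mut out);
        }
        assert_eq!(out.len(), 4096);
    }
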

@@ -158,15 +173,15 @@ impl<S: Snapshot> EventLoader<S> {
 /// Note: maybe we can merge those two structures?
 #[derive(Clone)]
 pub struct InitialDataLoader<E, R, RT> {
-    router: RT,
-    regions: R,
+    pub(crate) router: RT,
+    pub(crate) regions: R,
     // Note: maybe we can make it an abstract thing like `EventSink` with
     // method `async (KvEvent) -> Result<()>`?
-    sink: Router,
-    tracing: SubscriptionTracer,
-    scheduler: Scheduler<Task>,
-    quota: PendingMemoryQuota,
-    handle: tokio::runtime::Handle,
+    pub(crate) sink: Router,
+    pub(crate) tracing: SubscriptionTracer,
+    pub(crate) scheduler: Scheduler<Task>,
+    pub(crate) quota: PendingMemoryQuota,
+    pub(crate) handle: tokio::runtime::Handle,
 
     _engine: PhantomData<E>,
 }
Expand Down Expand Up @@ -215,12 +230,17 @@ where
             Error::RaftRequest(pbe) => {
                 !(pbe.has_epoch_not_match()
                     || pbe.has_not_leader()
-                    || pbe.get_message().contains("stale observe id"))
+                    || pbe.get_message().contains("stale observe id")
+                    || pbe.has_region_not_found())
             }
+            Error::RaftStore(raftstore::Error::RegionNotFound(_))
+            | Error::RaftStore(raftstore::Error::NotLeader(..)) => false,
             _ => true,
         };
+        e.report(format_args!(
+            "during getting initial snapshot for region {:?}; can retry = {}",
+            region, can_retry
+        ));
         last_err = match last_err {
             None => Some(e),
             Some(err) => Some(Error::Contextual {
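
The new match arms make region-lifecycle errors (`RegionNotFound`, `NotLeader`, epoch mismatch) abort the retry loop, while everything else stays retryable. A simplified sketch of that classification, using a hypothetical error enum rather than TiKV's:

    #[derive(Debug)]
    enum ScanError {
        EpochNotMatch,
        NotLeader,
        RegionNotFound,
        Io(String),
    }

    /// Lifecycle errors mean the region moved or vanished; retrying the same
    /// snapshot request cannot succeed, so the caller must re-observe instead.
    fn can_retry(e: &ScanError) -> bool {
        !matches!(
            e,
            ScanError::EpochNotMatch | ScanError::NotLeader | ScanError::RegionNotFound
        )
    }

    fn main() {
        assert!(!can_retry(&ScanError::RegionNotFound));
        assert!(can_retry(&ScanError::Io("timeout".into())));
    }
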
Expand Down Expand Up @@ -347,8 +367,8 @@ where
         let start = Instant::now();
         loop {
             let mut events = ApplyEvents::with_capacity(1024, region.id);
-            let stat =
-                self.with_resolver(region, |r| event_loader.scan_batch(1024, &mut events, r))?;
+            let stat = event_loader.fill_entries()?;
+            self.with_resolver(region, |r| event_loader.omit_entries_to(&mut events, r))?;
             if events.is_empty() {
                 metrics::INITIAL_SCAN_DURATION.observe(start.saturating_elapsed_secs());
                 return Ok(stats.stat);
Expand Down Expand Up @@ -376,6 +396,7 @@ where
         region: &Region,
         start_ts: TimeStamp,
         snap: impl Snapshot,
+        on_finish: impl FnOnce() + Send + 'static,
     ) -> Result<Statistics> {
         let _guard = self.handle.enter();
         // It is ok to sink more data than needed. So scan to +inf TS for convenience.
Expand Down Expand Up @@ -405,6 +426,7 @@ where
                     region_id
                 ));
             }
+            on_finish()
         });
         stats
     }
@@ -425,10 +447,7 @@ where
         // At that time, we have nowhere to record the lock status of this region.
         let success = try_send!(
             self.scheduler,
-            Task::ModifyObserve(ObserveOp::Start {
-                region: r.region,
-                needs_initial_scanning: true
-            })
+            Task::ModifyObserve(ObserveOp::Start { region: r.region })
         );
         if success {
             crate::observer::IN_FLIGHT_START_OBSERVE_MESSAGE.fetch_add(1, Ordering::SeqCst);
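
The signature change above threads an `on_finish: impl FnOnce() + Send + 'static` callback through the initial scan, and the spawned task invokes it once the region's snapshot has been fully sunk, presumably so the enhanced checkpoint model this commit adapts can track per-region progress. A rough sketch of the shape (assumed signatures and channel plumbing, not the actual TiKV code; assumes tokio with the "full" feature):

    use std::sync::mpsc;

    // The spawned task owns the callback and fires it exactly once when the
    // scan is done.
    fn spawn_scan(handle: &tokio::runtime::Handle, on_finish: impl FnOnce() + Send + 'static) {
        handle.spawn(async move {
            // ... run the initial scan and sink the events ...
            on_finish(); // signal completion to the caller
        });
    }

    #[tokio::main]
    async fn main() {
        let (tx, rx) = mpsc::channel();
        spawn_scan(&tokio::runtime::Handle::current(), move || {
            tx.send(()).expect("receiver alive");
        });
        // Park a blocking worker until the callback fires.
        tokio::task::spawn_blocking(move || rx.recv().unwrap())
            .await
            .unwrap();
    }
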
4 changes: 3 additions & 1 deletion components/backup-stream/src/lib.rs
@@ -1,4 +1,5 @@
 // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0.
+#![feature(slice_group_by)]
 #![feature(result_flattening)]
 #![feature(assert_matches)]
 #![feature(test)]
@@ -8,9 +9,10 @@ mod endpoint;
 pub mod errors;
 mod event_loader;
 pub mod metadata;
-mod metrics;
+pub(crate) mod metrics;
 pub mod observer;
 pub mod router;
+mod subscription_manager;
 mod subscription_track;
 mod utils;
 