Skip to content

Commit

Permalink
set checkpoint to global checkpoint once start
Browse files Browse the repository at this point in the history
Signed-off-by: Yu Juncen <yujuncen@pingcap.com>
  • Loading branch information
YuJuncen committed May 16, 2022
1 parent c209007 commit d3408c1
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 34 deletions.
61 changes: 37 additions & 24 deletions components/backup-stream/src/async_task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,14 +283,15 @@ where
metrics::INITIAL_SCAN_REASON
.with_label_values(&["region-changed"])
.inc();
if let Err(e) = self
.observe_over_with_initial_data_from_checkpoint(
let r = async {
Result::Ok(self.observe_over_with_initial_data_from_checkpoint(
region,
for_task,
self.get_last_checkpoint_of(&for_task).await?,
handle.clone(),
)
.await
{
))
}
.await;
if let Err(e) = r {
try_send!(
self.scheduler,
Task::ModifyObserve(ObserveOp::NotifyFailToStartObserve {
Expand All @@ -304,9 +305,8 @@ where
}
}

async fn start_observe(&self, region: Region) {
let handle = ObserveHandle::new();
let result = match self.find_task_by_region(&region) {
async fn try_start_observe(&self, region: &Region, handle: ObserveHandle) -> Result<()> {
match self.find_task_by_region(&region) {
None => {
warn!(
"the region {:?} is register to no task but being observed (start_key = {}; end_key = {}; task_stat = {:?}): maybe stale, aborting",
Expand All @@ -315,20 +315,30 @@ where
utils::redact(&region.get_end_key()),
self.range_router
);
return;
}

Some(for_task) => {
self.observe_over_with_initial_data_from_checkpoint(
&region,
for_task,
handle.clone(),
)
.await
let tso = self.get_last_checkpoint_of(&for_task).await?;
// We should set the local checkpoint to the newly added task ASAP,
// or once the original leader store of this region gets rid of the slow-progress region,
// it may advance the global checkpoint unexpectedly.
// NOTE: maybe `spawn` here?
self.get_meta_client()
.set_local_task_checkpoint(&for_task, tso.into_inner())
.await?;
self.observe_over_with_initial_data_from_checkpoint(&region, tso, handle.clone());
}
};
}
Ok(())
}

if let Err(err) = result {
/// Borrows the metadata client stored in `self.meta_cli`.
///
/// # Panics
///
/// Panics when `self.meta_cli` holds no client, because the value is
/// unwrapped unconditionally.
fn get_meta_client(&self) -> &MetadataClient<S> {
    let client = self.meta_cli.as_ref();
    client.unwrap()
}

async fn start_observe(&self, region: Region) {
let handle = ObserveHandle::new();
if let Err(err) = self.try_start_observe(&region, handle.clone()).await {
try_send!(
self.scheduler,
Task::ModifyObserve(ObserveOp::NotifyFailToStartObserve {
Expand Down Expand Up @@ -391,14 +401,18 @@ where
Ok(())
}

async fn observe_over_with_initial_data_from_checkpoint(
/// Reads the global progress of `task` through the metadata client and
/// returns it as a [`TimeStamp`].
///
/// # Errors
///
/// Propagates the error returned by `global_progress_of_task`.
///
/// # Panics
///
/// Panics when `self.meta_cli` holds no client, because the value is
/// unwrapped unconditionally.
async fn get_last_checkpoint_of(&self, task: &str) -> Result<TimeStamp> {
    // Work on an owned copy of the client for the duration of the request.
    let client = self.meta_cli.as_ref().unwrap().clone();
    let global_progress = client.global_progress_of_task(task).await?;
    Ok(TimeStamp::new(global_progress))
}

fn observe_over_with_initial_data_from_checkpoint(
&self,
region: &Region,
task: String,
last_checkpoint: TimeStamp,
handle: ObserveHandle,
) -> Result<()> {
let meta_cli = self.meta_cli.as_ref().unwrap().clone();
let last_checkpoint = TimeStamp::new(meta_cli.global_progress_of_task(&task).await?);
) {
self.subs
.register_region(region, handle.clone(), Some(last_checkpoint));

Expand All @@ -417,7 +431,6 @@ where
region.get_id()
));
}
Ok(())
}

fn find_task_by_region(&self, r: &Region) -> Option<String> {
Expand Down
9 changes: 2 additions & 7 deletions components/backup-stream/src/endpoint.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0.

use std::{
convert::AsRef, fmt, marker::PhantomData, path::PathBuf, sync::Arc,
time::Duration,
};
use std::{convert::AsRef, fmt, marker::PhantomData, path::PathBuf, sync::Arc, time::Duration};

use concurrency_manager::ConcurrencyManager;
use engine_traits::KvEngine;
Expand All @@ -15,7 +12,6 @@ use kvproto::{
};
use online_config::ConfigChange;
use pd_client::PdClient;

use raftstore::{
coprocessor::{CmdBatch, ObserveHandle, RegionInfoProvider},
router::RaftStoreRouter,
Expand All @@ -36,7 +32,6 @@ use tokio::{
use tokio_stream::StreamExt;
use txn_types::TimeStamp;


use super::metrics::HANDLE_EVENT_DURATION_HISTOGRAM;
use crate::{
async_task_manager::RegionSubscriptionManager,
Expand Down Expand Up @@ -710,7 +705,7 @@ where
Error::from(err).report("failed to update service safe point!");
// don't give up?
}
if let Err(err) = meta_cli.step_task(&task, rts).await {
if let Err(err) = meta_cli.set_local_task_checkpoint(&task, rts).await {
err.report(format!("on flushing task {}", task));
// we can advance the progress at next time.
// return early so we won't be misled by the metrics.
Expand Down
2 changes: 1 addition & 1 deletion components/backup-stream/src/metadata/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ impl<Store: MetaStore> MetadataClient<Store> {
}

/// forward the progress of some task.
pub async fn step_task(&self, task_name: &str, ts: u64) -> Result<()> {
pub async fn set_local_task_checkpoint(&self, task_name: &str, ts: u64) -> Result<()> {
let now = Instant::now();
defer! {
super::metrics::METADATA_OPERATION_LATENCY.with_label_values(&["task_step"]).observe(now.saturating_elapsed().as_secs_f64())
Expand Down
4 changes: 2 additions & 2 deletions components/backup-stream/src/metadata/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,10 @@ async fn test_progress() -> Result<()> {
cli.insert_task_with_range(&task, &[]).await?;
let progress = cli.progress_of_task(&task.info.name).await?;
assert_eq!(progress, task.info.start_ts);
cli.step_task(&task.info.name, 42).await?;
cli.set_local_task_checkpoint(&task.info.name, 42).await?;
let progress = cli.progress_of_task(&task.info.name).await?;
assert_eq!(progress, 42);
cli.step_task(&task.info.name, 43).await?;
cli.set_local_task_checkpoint(&task.info.name, 43).await?;
let progress = cli.progress_of_task(&task.info.name).await?;
assert_eq!(progress, 43);
let other_store = MetadataClient::new(cli.meta_store.clone(), 43);
Expand Down

0 comments on commit d3408c1

Please sign in to comment.