Skip to content

Commit

Permalink
log-backup: fix early return (tikv#13288)
Browse files Browse the repository at this point in the history
close tikv#13281

Fixed a bug that may cause data loss in log backup.

Signed-off-by: Yu Juncen <yu745514916@live.com>
Signed-off-by: fengou1 <feng.ou@pingcap.com>
  • Loading branch information
YuJuncen authored and fengou1 committed Aug 30, 2022
1 parent ad5736f commit 24df9a8
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 6 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion components/backup-stream/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ engine_traits = { path = "../engine_traits", default-features = false }
error_code = { path = "../error_code" }
# We cannot update the etcd-client to latest version because of the cyclic requirement.
# Also we need wait until https://github.com/etcdv3/etcd-client/pull/43/files to be merged.
etcd-client = { git = "https://github.com/yujuncen/etcd-client", rev = "e0321a1990ee561cf042973666c0db61c8d82364", features = ["pub-response-field", "tls"] }
etcd-client = { git = "https://github.com/pingcap/etcd-client", rev = "e0321a1990ee561cf042973666c0db61c8d82364", features = ["pub-response-field", "tls"] }
external_storage = { path = "../external_storage", default-features = false }
external_storage_export = { path = "../external_storage/export", default-features = false }
fail = "0.5"
Expand Down
9 changes: 8 additions & 1 deletion components/backup-stream/src/event_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,9 +391,16 @@ where
// we only need to record the disk throughput of this.
let (stat, disk_read) =
utils::with_record_read_throughput(|| event_loader.fill_entries());
// We must use the size of entry batch here to check whether we have progress.
// Or we may exit too early if there are only records that:
// - can be inlined into the `write` CF (hence they won't be written to the default CF)
// - are prewritten. (hence the batch will only contain `Prewrite` records).
// In this condition, ALL records generate no ApplyEvent (only lock changes),
// and we would exit after the first run of the loop :(
let no_progress = event_loader.entry_batch.is_empty();
let stat = stat?;
self.with_resolver(region, |r| event_loader.emit_entries_to(&mut events, r))?;
if events.is_empty() {
if no_progress {
metrics::INITIAL_SCAN_DURATION.observe(start.saturating_elapsed_secs());
return Ok(stats.stat);
}
Expand Down
73 changes: 70 additions & 3 deletions components/backup-stream/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ impl SuiteBuilder {
for id in 1..=(n as u64) {
suite.start_endpoint(id, use_v3);
}
// TODO: The current mock metastore (slash_etc) doesn't supports multi-version.
// We must wait until the endpoints get ready to watching the metastore, or some
// modifies may be lost. Either make Endpoint::with_client wait until watch did
// start or make slash_etc support multi-version, then we can get rid of this
Expand Down Expand Up @@ -318,6 +317,19 @@ impl Suite {
inserted
}

/// Commits `keys` at `commit_ts`, routing each key to the region that
/// currently owns it so that `must_kv_commit` is issued once per region.
fn commit_keys(&mut self, keys: Vec<Vec<u8>>, start_ts: TimeStamp, commit_ts: TimeStamp) {
    // Bucket the raw keys by the id of their owning region.
    let mut by_region: HashMap<u64, Vec<Vec<u8>>> = HashMap::new();
    for key in keys {
        let region_id = self
            .cluster
            .get_region_id(&Key::from_raw(&key).into_encoded());
        by_region.entry(region_id).or_default().push(key);
    }

    // Fire one commit request per region carrying that region's key set.
    for (region_id, region_keys) in by_region {
        self.must_kv_commit(region_id, region_keys, start_ts, commit_ts);
    }
}

fn just_commit_a_key(&mut self, key: Vec<u8>, start_ts: TimeStamp, commit_ts: TimeStamp) {
let enc_key = Key::from_raw(&key).into_encoded();
let region = self.cluster.get_region_id(&enc_key);
Expand Down Expand Up @@ -604,10 +616,13 @@ mod test {
errors::Error, metadata::MetadataClient, router::TaskSelector, GetCheckpointResult,
RegionCheckpointOperation, RegionSet, Task,
};
use pd_client::PdClient;
use tikv_util::{box_err, defer, info, HandyRwLock};
use txn_types::TimeStamp;
use txn_types::{Key, TimeStamp};

use crate::{make_record_key, make_split_key_at_record, run_async_test, SuiteBuilder};
use crate::{
make_record_key, make_split_key_at_record, mutation, run_async_test, SuiteBuilder,
};

#[test]
fn basic() {
Expand Down Expand Up @@ -650,6 +665,58 @@ mod test {
suite.cluster.shutdown();
}

/// This test tests whether we can handle some weird transactions and their
/// race with initial scanning.
/// Generally, those transactions:
/// - Have N mutations (N > 1024), whose values are all short enough to be
///   inlined in the `Write` CF.
/// - Commit the mutation subset M first. (for all m in M: Nth-Of-Key(m) >
///   1024)
/// ```text
/// |--...-----^------*---*-*--*-*-*-> (The line is the Key Space - from "" to inf)
///            +The 1024th key (* = committed mutation)
/// ```
/// - Before committing the remaining mutations, PiTR triggers initial
///   scanning.
/// - The remaining mutations are committed before the instant when initial
///   scanning gets the snapshot.
#[test]
fn with_split_txn() {
    let mut suite = super::SuiteBuilder::new_named("split_txn").use_v3().build();
    run_async_test(async {
        let start_ts = suite.cluster.pd_client.get_tso().await.unwrap();
        // 1959 keys: more than the 1024-entry batch the scanner fills at once.
        let keys = (1..1960).map(|i| make_record_key(1, i)).collect::<Vec<_>>();
        // Prewrite everything in one transaction with short, inlinable values.
        suite.must_kv_prewrite(
            1,
            keys.clone()
                .into_iter()
                .map(|k| mutation(k, b"hello, world".to_vec()))
                .collect(),
            make_record_key(1, 1913),
            start_ts,
        );
        let commit_ts = suite.cluster.pd_client.get_tso().await.unwrap();
        // Commit only the tail of the key space BEFORE the task registers,
        // so initial scanning starts while part of the txn is still locked.
        suite.commit_keys(keys[1913..].to_vec(), start_ts, commit_ts);
        suite.must_register_task(1, "test_split_txn");
        // Commit the rest; this races with initial scanning taking its snapshot.
        suite.commit_keys(keys[..1913].to_vec(), start_ts, commit_ts);
        suite.force_flush_files("test_split_txn");
        suite.wait_for_flush();
        // Every key, at commit_ts, must be present in the flushed backup files —
        // none of the split commits may be lost to an early scanner exit.
        let keys_encoded = keys
            .iter()
            .map(|v| {
                Key::from_raw(v.as_slice())
                    .append_ts(commit_ts)
                    .into_encoded()
            })
            .collect::<Vec<_>>();
        suite.check_for_write_records(
            suite.flushed_files.path(),
            keys_encoded.iter().map(Vec::as_slice),
        );
    });
    suite.cluster.shutdown();
}

#[test]
/// This case tests whether the backup can continue when the leader fails.
fn leader_down() {
Expand Down

0 comments on commit 24df9a8

Please sign in to comment.