From 4476e273804abe376b5fbcf0a7cbd43856de0b8f Mon Sep 17 00:00:00 2001 From: tonyxuqqi Date: Wed, 11 May 2022 15:44:34 -0700 Subject: [PATCH 1/6] raftstore: fix split buckets bug (#12476) close tikv/tikv#12467 Signed-off-by: qi.xu Co-authored-by: qi.xu --- .../src/coprocessor/split_check/half.rs | 110 +++++++++++++++++- components/raftstore/src/store/fsm/peer.rs | 4 +- .../raftstore/src/store/worker/split_check.rs | 6 + 3 files changed, 117 insertions(+), 3 deletions(-) diff --git a/components/raftstore/src/coprocessor/split_check/half.rs b/components/raftstore/src/coprocessor/split_check/half.rs index df7503942b0..55a9bd48984 100644 --- a/components/raftstore/src/coprocessor/split_check/half.rs +++ b/components/raftstore/src/coprocessor/split_check/half.rs @@ -141,7 +141,7 @@ mod tests { }; use crate::{ coprocessor::{Config, CoprocessorHost}, - store::{BucketRange, SplitCheckRunner, SplitCheckTask}, + store::{BucketRange, CasualMessage, SplitCheckRunner, SplitCheckTask}, }; #[test] @@ -309,6 +309,114 @@ mod tests { must_generate_buckets(&rx, &exp_bucket_keys); } + #[test] + fn test_generate_region_bucket_with_deleting_data() { + let path = Builder::new().prefix("test-raftstore").tempdir().unwrap(); + let path_str = path.path().to_str().unwrap(); + let db_opts = DBOptions::default(); + let cfs_opts = ALL_CFS + .iter() + .map(|cf| { + let cf_opts = ColumnFamilyOptions::new(); + CFOptions::new(cf, cf_opts) + }) + .collect(); + let engine = engine_test::kv::new_engine_opt(path_str, db_opts, cfs_opts).unwrap(); + + let mut region = Region::default(); + region.set_id(1); + region.mut_peers().push(Peer::default()); + region.mut_region_epoch().set_version(2); + region.mut_region_epoch().set_conf_ver(5); + + let (tx, rx) = mpsc::sync_channel(100); + let cfg = Config { + region_max_size: ReadableSize(BUCKET_NUMBER_LIMIT as u64), + enable_region_bucket: true, + region_bucket_size: ReadableSize(20_u64), // so that each key below will form a bucket + ..Default::default() + }; + let mut runnable = + SplitCheckRunner::new(engine.clone(), tx.clone(), CoprocessorHost::new(tx, cfg)); + + // so bucket key will be all these keys + let mut exp_bucket_keys = vec![]; + for i in 0..11 { + let k = format!("{:04}", i).into_bytes(); + exp_bucket_keys.push(Key::from_raw(&k).as_encoded().clone()); + let k = keys::data_key(Key::from_raw(&k).as_encoded()); + engine.put_cf(CF_DEFAULT, &k, &k).unwrap(); + // Flush for every key so that we can know the exact middle key. 
+ engine.flush_cf(CF_DEFAULT, true).unwrap(); + } + runnable.run(SplitCheckTask::split_check( + region.clone(), + false, + CheckPolicy::Scan, + None, + )); + must_generate_buckets(&rx, &exp_bucket_keys); + + exp_bucket_keys.clear(); + + // use non-existing bucket-range to simulate deleted data + // [0001,0002] [00032, 00035], [0004,0006], [0012, 0015], [0016, 0017] + // non-empty empty non-empty empty empty + let mut starts = vec![format!("{:04}", 1).into_bytes()]; + let mut ends = vec![format!("{:04}", 2).into_bytes()]; + starts.push(format!("{:05}", 32).into_bytes()); + ends.push(format!("{:05}", 35).into_bytes()); + starts.push(format!("{:04}", 4).into_bytes()); + ends.push(format!("{:04}", 6).into_bytes()); + starts.push(format!("{:04}", 12).into_bytes()); + ends.push(format!("{:04}", 15).into_bytes()); + starts.push(format!("{:04}", 16).into_bytes()); + ends.push(format!("{:04}", 17).into_bytes()); + let mut bucket_range_list = vec![BucketRange( + Key::from_raw(&starts[0]).as_encoded().clone(), + Key::from_raw(&ends[0]).as_encoded().clone(), + )]; + for i in 1..starts.len() { + bucket_range_list.push(BucketRange( + Key::from_raw(&starts[i]).as_encoded().clone(), + Key::from_raw(&ends[i]).as_encoded().clone(), + )) + } + + runnable.run(SplitCheckTask::split_check( + region.clone(), + false, + CheckPolicy::Scan, + Some(bucket_range_list), + )); + + loop { + if let Ok(( + _, + CasualMessage::RefreshRegionBuckets { + region_epoch: _, + buckets, + bucket_ranges, + .. + }, + )) = rx.try_recv() + { + assert_eq!(buckets.len(), bucket_ranges.unwrap().len()); + assert_eq!(buckets.len(), 5); + for i in 0..5 { + if i == 0 || i == 2 { + assert!(!buckets[i].keys.is_empty()); + assert!(buckets[i].size > 0); + } else { + assert!(buckets[i].keys.is_empty()); + assert_eq!(buckets[i].size, 0); + } + } + break; + } + } + } + #[test] fn test_get_region_approximate_middle_cf() { let tmp = Builder::new() diff --git a/components/raftstore/src/store/fsm/peer.rs b/components/raftstore/src/store/fsm/peer.rs index bae9d404cbf..2bc8de94357 100644 --- a/components/raftstore/src/store/fsm/peer.rs +++ b/components/raftstore/src/store/fsm/peer.rs @@ -5176,7 +5176,7 @@ where meta.version = gen_bucket_version(self.fsm.peer.term(), current_version); region_buckets.meta = Arc::new(meta); } else { - info!( + debug!( "refresh_region_buckets re-generates buckets"; "region_id" => self.fsm.region_id(), ); @@ -5212,7 +5212,7 @@ where self.fsm.peer.region_buckets.as_ref().unwrap().meta.clone(), )); } - info!( + debug!( "finished on_refresh_region_buckets"; "region_id" => self.fsm.region_id(), "buckets count" => buckets_count, diff --git a/components/raftstore/src/store/worker/split_check.rs b/components/raftstore/src/store/worker/split_check.rs index 338d49a7b36..a1674f1718e 100644 --- a/components/raftstore/src/store/worker/split_check.rs +++ b/components/raftstore/src/store/worker/split_check.rs @@ -430,6 +430,12 @@ where || bucket_range_list.is_empty() && !skip_check_bucket { buckets.push(bucket); + // in case some range's data in bucket_range_list is deleted + if buckets.len() < bucket_range_list.len() { + let mut deleted_buckets = + vec![Bucket::default(); bucket_range_list.len() - buckets.len()]; + buckets.append(&mut deleted_buckets); + } if !bucket_range_list.is_empty() { assert_eq!(buckets.len(), bucket_range_list.len()); } From e16490d7850cdf21cce6be8e468397a349bfe78f Mon Sep 17 00:00:00 2001 From: Zak Zhao <57036248+joccau@users.noreply.github.com> Date: Thu, 12 May 2022 12:34:35 +0800 Subject: [PATCH 2/6] br: move 
the file v1_backupmeta_xxxx.meta to v1/backupmeta/xxxx.meta (#12475) close tikv/tikv#12474 Signed-off-by: joccau --- components/backup-stream/src/router.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/components/backup-stream/src/router.rs b/components/backup-stream/src/router.rs index 47b01fac35c..a65f914d263 100644 --- a/components/backup-stream/src/router.rs +++ b/components/backup-stream/src/router.rs @@ -992,8 +992,7 @@ impl MetadataInfo { fn path_to_meta(&self) -> String { format!( - // "/v1/backupmeta/{:012}-{}.meta", - "v1_backupmeta_{:012}-{}.meta", + "v1/backupmeta/{:012}-{}.meta", self.min_resolved_ts.unwrap_or_default(), uuid::Uuid::new_v4() ) @@ -1364,7 +1363,7 @@ mod tests { let entry = entry.unwrap(); let filename = entry.file_name(); println!("walking {}", entry.path().display()); - if filename.to_str().unwrap().contains("v1_backupmeta") { + if entry.path().extension() == Some(OsStr::new("meta")) { meta_count += 1; } else if entry.path().extension() == Some(OsStr::new("log")) { log_count += 1; From e5fe8c68aa8c731408b32d6fba97cf76bcc82d23 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Thu, 12 May 2022 19:50:35 +0800 Subject: [PATCH 3/6] metrics: add grafana panels for RawKV causal timestamp (#12494) ref tikv/tikv#11965 Add Grafana panels for RawKV causal timestamp. Signed-off-by: pingyu Co-authored-by: Ti Chi Robot --- metrics/grafana/tikv_raw.json | 550 ++++++++++++++++++++++++++++++++++ 1 file changed, 550 insertions(+) diff --git a/metrics/grafana/tikv_raw.json b/metrics/grafana/tikv_raw.json index c06feb9ab0e..f81ac801173 100644 --- a/metrics/grafana/tikv_raw.json +++ b/metrics/grafana/tikv_raw.json @@ -283,6 +283,556 @@ "repeat": "command", "title": "Read - $command", "type": "row" + }, + { + "collapsed": true, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 0 + }, + "id": 23, + "panels": [ + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "decimals": 1, + "description": "The number of get_ts requests", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 7 + }, + "hiddenSeries": false, + "id": 40, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "max": true, + "min": false, + "rightSide": true, + "show": true, + "sideWidth": 300, + "sort": "max", + "sortDesc": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.11", + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(rate(tikv_causal_ts_provider_get_ts_duration_seconds_count{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (result)", + "format": "time_series", + "interval": "", + "intervalFactor": 2, + "legendFormat": "{{result}}", + "metric": "tikv_causal_ts_provider_get_ts_duration_seconds_bucket", + "refId": "A", + "step": 10 + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Get_ts requests", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + 
"yaxes": [ + { + "format": "ops", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "decimals": 1, + "description": "The get_ts requests duration (P99)", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 7 + }, + "hiddenSeries": false, + "id": 42, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "hideEmpty": false, + "hideZero": true, + "max": true, + "min": false, + "rightSide": true, + "show": true, + "sideWidth": 300, + "sort": "max", + "sortDesc": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.11", + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "histogram_quantile(0.99, sum(rate(tikv_causal_ts_provider_get_ts_duration_seconds_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (le, result))", + "format": "time_series", + "interval": "", + "intervalFactor": 2, + "legendFormat": "{{result}}-P99", + "refId": "A", + "step": 10 + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "99% get_ts requests duration", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "s", + "label": null, + "logBase": 10, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "decimals": 1, + "description": "The number of TSO batch renew requests", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 15 + }, + "hiddenSeries": false, + "id": 44, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "max": true, + "min": false, + "rightSide": true, + "show": true, + "sideWidth": 300, + "sort": "max", + "sortDesc": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.11", + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(rate(tikv_causal_ts_provider_tso_batch_renew_duration_seconds_count{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (result, reason)", + "format": "time_series", + "interval": "", + "intervalFactor": 2, + 
"legendFormat": "{{result}}-{{reason}}", + "metric": "tikv_causal_ts_provider_tso_batch_renew_duration_seconds_bucket", + "refId": "A", + "step": 10 + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "TSO batch renew requests", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "ops", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "decimals": 1, + "description": "The TSO batch renew requests duration (P99)", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 15 + }, + "hiddenSeries": false, + "id": 46, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "hideEmpty": false, + "hideZero": true, + "max": true, + "min": false, + "rightSide": true, + "show": true, + "sideWidth": 300, + "sort": "max", + "sortDesc": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.11", + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "histogram_quantile(0.99, sum(rate(tikv_causal_ts_provider_tso_batch_renew_duration_seconds_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (le, result, reason))", + "format": "time_series", + "interval": "", + "intervalFactor": 2, + "legendFormat": "{{result}}-{{reason}}-P99", + "refId": "A", + "step": 10 + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "99% TSO batch renew requests duration", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "s", + "label": null, + "logBase": 10, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "decimals": 1, + "description": "The TSO batch size", + "editable": true, + "error": false, + "fieldConfig": { + "defaults": { + "unit": "none" + }, + "overrides": [] + }, + "fill": 3, + "fillGradient": 0, + "grid": {}, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 23 + }, + "hiddenSeries": false, + "id": 48, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "hideZero": false, + "max": true, + "min": false, + "rightSide": true, + "show": true, + "sideWidth": 250, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + 
"options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.11", + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": true, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(tikv_causal_ts_provider_tso_batch_size{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"})", + "format": "time_series", + "interval": "", + "intervalFactor": 2, + "legendFormat": "batch size", + "refId": "A", + "step": 10 + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "TSO batch size", + "tooltip": { + "msResolution": false, + "shared": true, + "sort": 2, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "none", + "label": null, + "logBase": 1, + "max": null, + "min": "0", + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + } + ], + "title": "Causal timestamp", + "type": "row" } ], "refresh": false, From 684abdbb4cf601236b4f35719c5118659404df1c Mon Sep 17 00:00:00 2001 From: tonyxuqqi Date: Thu, 12 May 2022 14:44:35 -0700 Subject: [PATCH 4/6] bump master version to 6.1-alpha (#12496) ref tikv/tikv#11966 bump version to 6.1-alpha Signed-off-by: qi.xu Co-authored-by: qi.xu --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1a063173c5d..646bcc9396c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5947,7 +5947,7 @@ dependencies = [ [[package]] name = "tikv" -version = "6.0.0-alpha" +version = "6.1.0-alpha" dependencies = [ "anyhow", "api_version", diff --git a/Cargo.toml b/Cargo.toml index 9ba58307da2..9c11cae2727 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "tikv" -version = "6.0.0-alpha" +version = "6.1.0-alpha" authors = ["The TiKV Authors"] description = "A distributed transactional key-value database powered by Rust and Raft" license = "Apache-2.0" From c5f1a4a8b6e951942c3f3f7ab99ec7396df3239c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B1=B1=E5=B2=9A?= <36239017+YuJuncen@users.noreply.github.com> Date: Fri, 13 May 2022 22:28:37 +0800 Subject: [PATCH 5/6] log-backup: tweaking log backup (#12497) close tikv/tikv#12477 tweaking log backup. 
Signed-off-by: Yu Juncen --- components/backup-stream/src/endpoint.rs | 107 ++++++++++++++++-- components/backup-stream/src/metrics.rs | 6 + components/backup-stream/src/router.rs | 71 ++++++++---- .../backup-stream/src/subscription_track.rs | 11 ++ components/error_code/src/backup_stream.rs | 2 +- src/config.rs | 2 + 6 files changed, 163 insertions(+), 36 deletions(-) diff --git a/components/backup-stream/src/endpoint.rs b/components/backup-stream/src/endpoint.rs index 86c77619b69..18d2c2c284c 100644 --- a/components/backup-stream/src/endpoint.rs +++ b/components/backup-stream/src/endpoint.rs @@ -5,7 +5,11 @@ use std::{convert::AsRef, fmt, marker::PhantomData, path::PathBuf, sync::Arc, ti use concurrency_manager::ConcurrencyManager; use engine_traits::KvEngine; use error_code::ErrorCodeExt; -use kvproto::{brpb::StreamBackupError, metapb::Region}; +use futures::{Future, FutureExt}; +use kvproto::{ + brpb::{StreamBackupError, StreamBackupTaskInfo}, + metapb::Region, +}; use online_config::ConfigChange; use pd_client::PdClient; use raft::StateRole; @@ -16,7 +20,9 @@ use raftstore::{ }; use tikv::config::BackupStreamConfig; use tikv_util::{ - box_err, debug, error, info, + box_err, + config::ReadableDuration, + debug, error, info, time::Instant, warn, worker::{Runnable, Scheduler}, @@ -91,6 +97,7 @@ where PathBuf::from(config.temp_path.clone()), scheduler.clone(), config.temp_file_size_limit_per_task.0, + config.max_flush_interval.0, ); if let Some(meta_client) = meta_client.as_ref() { @@ -164,6 +171,7 @@ where PathBuf::from(config.temp_path.clone()), scheduler.clone(), config.temp_file_size_limit_per_task.0, + config.max_flush_interval.0, ); if let Some(meta_client) = meta_client.as_ref() { @@ -214,15 +222,18 @@ where fn on_fatal_error(&self, task: String, err: Box) { // Let's pause the task locally first. - self.on_unregister(&task); + let start_ts = self.unload_task(&task).map(|task| task.start_ts); err.report(format_args!("fatal for task {}", err)); let meta_cli = self.get_meta_client(); let store_id = self.store_id; let sched = self.scheduler.clone(); + let register_safepoint = self + .register_service_safepoint(task.clone(), TimeStamp::new(start_ts.unwrap_or_default())); self.pool.block_on(async move { // TODO: also pause the task using the meta client. let err_fut = async { + register_safepoint.await?; meta_cli.pause(&task).await?; let mut last_error = StreamBackupError::new(); last_error.set_error_code(err.error_code().code.to_owned()); @@ -400,10 +411,10 @@ where self.on_unregister(&task_name); } TaskOp::PauseTask(task_name) => { - self.on_unregister(&task_name); + self.on_pause(&task_name); } TaskOp::ResumeTask(task) => { - self.on_register(task); + self.load_task(task); } } } @@ -447,6 +458,17 @@ where // register task ranges pub fn on_register(&self, task: StreamTask) { + let name = task.info.name.clone(); + let start_ts = task.info.start_ts; + self.load_task(task); + + metrics::STORE_CHECKPOINT_TS + .with_label_values(&[name.as_str()]) + .set(start_ts as _); + } + + /// Load the task into memory: this would make the endpint start to observe. 
+ fn load_task(&self, task: StreamTask) { if let Some(cli) = self.meta_client.as_ref() { let cli = cli.clone(); let init = self.make_initial_loader(); @@ -456,7 +478,18 @@ where "register backup stream task"; "task" => ?task, ); - + // clean the safepoint created at pause(if there is) + self.pool.spawn( + self.pd_client + .update_service_safe_point( + format!("{}-pause-guard", task.info.name), + TimeStamp::zero(), + Duration::new(0, 0), + ) + .map(|r| { + r.map_err(|err| Error::from(err).report("removing safe point for pausing")) + }), + ); self.pool.block_on(async move { let task_name = task.info.get_name(); match cli.ranges_of_task(task_name).await { @@ -501,23 +534,62 @@ where "failed to register backup stream task {} to router: ranges not found", task.info.get_name() )); - // TODO build a error handle mechanism #error 5 } } }); }; } - pub fn on_unregister(&self, task: &str) { + fn register_service_safepoint( + &self, + task_name: String, + // hint for make optimized safepoint when we cannot get that from `SubscriptionTracker` + start_ts: TimeStamp, + ) -> impl Future> + Send + 'static { + self.pd_client + .update_service_safe_point( + format!("{}-pause-guard", task_name), + self.subs.safepoint().max(start_ts), + ReadableDuration::hours(24).0, + ) + .map(|r| r.map_err(|err| err.into())) + } + + pub fn on_pause(&self, task: &str) { + let t = self.unload_task(task); + + if let Some(task) = t { + self.pool.spawn( + self.register_service_safepoint(task.name, TimeStamp::new(task.start_ts)) + .map(|r| { + r.map_err(|err| err.report("during setting service safepoint for task")) + }), + ); + } + } + + pub fn on_unregister(&self, task: &str) -> Option { + let info = self.unload_task(task); + + // reset the checkpoint ts of the task so it won't mislead the metrics. + metrics::STORE_CHECKPOINT_TS + .with_label_values(&[task]) + .set(0); + info + } + + /// unload a task from memory: this would stop observe the changes required by the task temporarily. + fn unload_task(&self, task: &str) -> Option { let router = self.range_router.clone(); - self.pool.block_on(async move { - router.unregister_task(task).await; - }); + let info = self + .pool + .block_on(async move { router.unregister_task(task).await }); // for now, we support one concurrent task only. // so simply clear all info would be fine. self.subs.clear(); self.observer.ranges.wl().clear(); + info } /// try advance the resolved ts by the pd tso. @@ -685,7 +757,12 @@ where ObserveOp::Start { region, needs_initial_scanning, - } => self.start_observe(region, needs_initial_scanning), + } => { + self.start_observe(region, needs_initial_scanning); + metrics::INITIAL_SCAN_REASON + .with_label_values(&["leader-changed"]) + .inc(); + } ObserveOp::Stop { ref region } => { self.subs.deregister_region(region, |_, _| true); } @@ -715,6 +792,9 @@ where region ) }); + metrics::INITIAL_SCAN_REASON + .with_label_values(&["region-changed"]) + .inc(); if let Err(e) = self.observe_over_with_initial_data_from_checkpoint( region, for_task, @@ -826,6 +906,9 @@ where .inc(); return Ok(()); } + metrics::INITIAL_SCAN_REASON + .with_label_values(&["retry"]) + .inc(); self.start_observe(region, true); Ok(()) } diff --git a/components/backup-stream/src/metrics.rs b/components/backup-stream/src/metrics.rs index 3f7c754a1fd..b3588978442 100644 --- a/components/backup-stream/src/metrics.rs +++ b/components/backup-stream/src/metrics.rs @@ -4,6 +4,12 @@ use lazy_static::lazy_static; use prometheus::*; lazy_static! 
{ + pub static ref INITIAL_SCAN_REASON: IntCounterVec = register_int_counter_vec!( + "tikv_log_backup_initial_scan_reason", + "The reason of doing initial scanning", + &["reason"] + ) + .unwrap(); pub static ref HANDLE_EVENT_DURATION_HISTOGRAM: HistogramVec = register_histogram_vec!( "tikv_stream_event_handle_duration_sec", "The duration of handling an cmd batch.", diff --git a/components/backup-stream/src/router.rs b/components/backup-stream/src/router.rs index a65f914d263..c3423043a64 100644 --- a/components/backup-stream/src/router.rs +++ b/components/backup-stream/src/router.rs @@ -19,7 +19,7 @@ use external_storage::{BackendConfig, UnpinReader}; use external_storage_export::{create_storage, ExternalStorage}; use futures::io::Cursor; use kvproto::{ - brpb::{DataFileInfo, FileType, Metadata}, + brpb::{DataFileInfo, FileType, Metadata, StreamBackupTaskInfo}, raft_cmdpb::CmdType, }; use openssl::hash::{Hasher, MessageDigest}; @@ -238,11 +238,17 @@ pub struct Router(Arc); impl Router { /// Create a new router with the temporary folder. - pub fn new(prefix: PathBuf, scheduler: Scheduler, temp_file_size_limit: u64) -> Self { + pub fn new( + prefix: PathBuf, + scheduler: Scheduler, + temp_file_size_limit: u64, + max_flush_interval: Duration, + ) -> Self { Self(Arc::new(RouterInner::new( prefix, scheduler, temp_file_size_limit, + max_flush_interval, ))) } } @@ -278,6 +284,8 @@ pub struct RouterInner { scheduler: Scheduler, /// The size limit of temporary file per task. temp_file_size_limit: u64, + /// The max duration the local data can be pending. + max_flush_interval: Duration, } impl std::fmt::Debug for RouterInner { @@ -291,13 +299,19 @@ impl std::fmt::Debug for RouterInner { } impl RouterInner { - pub fn new(prefix: PathBuf, scheduler: Scheduler, temp_file_size_limit: u64) -> Self { + pub fn new( + prefix: PathBuf, + scheduler: Scheduler, + temp_file_size_limit: u64, + max_flush_interval: Duration, + ) -> Self { RouterInner { ranges: SyncRwLock::new(SegmentMap::default()), tasks: Mutex::new(HashMap::default()), prefix, scheduler, temp_file_size_limit, + max_flush_interval, } } @@ -350,7 +364,7 @@ impl RouterInner { // register task info let prefix_path = self.prefix.join(&task_name); - let stream_task = StreamTaskInfo::new(prefix_path, task).await?; + let stream_task = StreamTaskInfo::new(prefix_path, task, self.max_flush_interval).await?; self.tasks .lock() .await @@ -362,14 +376,15 @@ impl RouterInner { Ok(()) } - pub async fn unregister_task(&self, task_name: &str) { - if self.tasks.lock().await.remove(task_name).is_some() { + pub async fn unregister_task(&self, task_name: &str) -> Option { + self.tasks.lock().await.remove(task_name).map(|t| { info!( "backup stream unregister task"; "task" => task_name, ); self.unregister_ranges(task_name); - } + t.task.info.clone() + }) } /// get the task name by a key. @@ -607,7 +622,7 @@ impl TempFileKey { } pub struct StreamTaskInfo { - task: StreamTask, + pub(crate) task: StreamTask, /// support external storage. eg local/s3. pub(crate) storage: Arc, /// The parent directory of temporary files. @@ -648,7 +663,11 @@ impl std::fmt::Debug for StreamTaskInfo { impl StreamTaskInfo { /// Create a new temporary file set at the `temp_dir`. 
- pub async fn new(temp_dir: PathBuf, task: StreamTask) -> Result { + pub async fn new( + temp_dir: PathBuf, + task: StreamTask, + flush_interval: Duration, + ) -> Result { tokio::fs::create_dir_all(&temp_dir).await?; let storage = Arc::from(create_storage( task.info.get_storage(), @@ -662,9 +681,7 @@ impl StreamTaskInfo { files: SlotMap::default(), flushing_files: RwLock::default(), last_flush_time: AtomicPtr::new(Box::into_raw(Box::new(Instant::now()))), - // TODO make this config set by config or task? - // Keep `0.2 * FLUSH_STORAGE_INTERVAL` for doing flushing. - flush_interval: Duration::from_secs((FLUSH_STORAGE_INTERVAL as f64 * 0.8).round() as _), + flush_interval, total_size: AtomicUsize::new(0), flushing: AtomicBool::new(false), flush_fail_count: AtomicUsize::new(0), @@ -765,7 +782,10 @@ impl StreamTaskInfo { } pub fn should_flush(&self) -> bool { - self.get_last_flush_time().saturating_elapsed() >= self.flush_interval + // When it doesn't flush since 0.8x of auto-flush interval, we get ready to start flushing. + // So that we will get a buffer for the cost of actual flushing. + self.get_last_flush_time().saturating_elapsed_secs() + >= self.flush_interval.as_secs_f64() * 0.8 } pub fn is_flushing(&self) -> bool { @@ -892,11 +912,6 @@ impl StreamTaskInfo { .with_label_values(&["generate_metadata"]) .observe(sw.lap().as_secs_f64()); - // There is no file to flush, don't write the meta file. - if metadata_info.files.is_empty() { - return Ok(rts); - } - // flush log file to storage. self.flush_log().await?; @@ -1222,7 +1237,7 @@ mod tests { #[test] fn test_register() { let (tx, _) = dummy_scheduler(); - let router = RouterInner::new(PathBuf::new(), tx, 1024); + let router = RouterInner::new(PathBuf::new(), tx, 1024, Duration::from_secs(300)); // -----t1.start-----t1.end-----t2.start-----t2.end------ // --|------------|----------|------------|-----------|-- // case1 case2 case3 case4 case5 @@ -1308,7 +1323,7 @@ mod tests { let tmp = std::env::temp_dir().join(format!("{}", uuid::Uuid::new_v4())); tokio::fs::create_dir_all(&tmp).await?; let (tx, rx) = dummy_scheduler(); - let router = RouterInner::new(tmp.clone(), tx, 32); + let router = RouterInner::new(tmp.clone(), tx, 32, Duration::from_secs(300)); let (stream_task, storage_path) = task("dummy".to_owned()).await?; must_register_table(&router, stream_task, 1).await; @@ -1464,7 +1479,12 @@ mod tests { test_util::init_log_for_test(); let (tx, _rx) = dummy_scheduler(); let tmp = std::env::temp_dir().join(format!("{}", uuid::Uuid::new_v4())); - let router = Arc::new(RouterInner::new(tmp.clone(), tx, 1)); + let router = Arc::new(RouterInner::new( + tmp.clone(), + tx, + 1, + Duration::from_secs(300), + )); let (task, _path) = task("error_prone".to_owned()).await?; must_register_table(router.as_ref(), task, 1).await; router @@ -1491,7 +1511,7 @@ mod tests { test_util::init_log_for_test(); let (tx, _rx) = dummy_scheduler(); let tmp = std::env::temp_dir().join(format!("{}", uuid::Uuid::new_v4())); - let router = RouterInner::new(tmp.clone(), tx, 32); + let router = RouterInner::new(tmp.clone(), tx, 32, Duration::from_secs(300)); let mut stream_task = StreamBackupTaskInfo::default(); stream_task.set_name("nothing".to_string()); stream_task.set_storage(create_noop_storage_backend()); @@ -1518,7 +1538,12 @@ mod tests { test_util::init_log_for_test(); let (tx, rx) = dummy_scheduler(); let tmp = std::env::temp_dir().join(format!("{}", uuid::Uuid::new_v4())); - let router = Arc::new(RouterInner::new(tmp.clone(), tx, 1)); + let router = 
Arc::new(RouterInner::new( + tmp.clone(), + tx, + 1, + Duration::from_secs(300), + )); let (task, _path) = task("flush_failure".to_owned()).await?; must_register_table(router.as_ref(), task, 1).await; router diff --git a/components/backup-stream/src/subscription_track.rs b/components/backup-stream/src/subscription_track.rs index 0b6bdec7d0f..c62c0ee823d 100644 --- a/components/backup-stream/src/subscription_track.rs +++ b/components/backup-stream/src/subscription_track.rs @@ -58,6 +58,17 @@ impl RegionSubscription { } impl SubscriptionTracer { + /// get the current safe point: data before this ts have already be flushed and be able to be GCed. + pub fn safepoint(&self) -> TimeStamp { + // use the current resolved_ts is safe because it is only advanced when flushing. + self.0 + .iter() + .map(|r| r.resolver.resolved_ts()) + .min() + // NOTE: Maybe use the current timestamp? + .unwrap_or(TimeStamp::zero()) + } + /// clear the current `SubscriptionTracer`. pub fn clear(&self) { self.0.retain(|_, v| { diff --git a/components/error_code/src/backup_stream.rs b/components/error_code/src/backup_stream.rs index a0e24d07b85..af404bec28a 100644 --- a/components/error_code/src/backup_stream.rs +++ b/components/error_code/src/backup_stream.rs @@ -1,7 +1,7 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. define_error_codes! { - "KV:StreamBackup:", + "KV:LogBackup:", ETCD => ("ETCD", "Error during requesting the meta store(etcd)", diff --git a/src/config.rs b/src/config.rs index 78110c4d228..9ad77e5f58c 100644 --- a/src/config.rs +++ b/src/config.rs @@ -2332,6 +2332,7 @@ impl Default for BackupConfig { #[serde(default)] #[serde(rename_all = "kebab-case")] pub struct BackupStreamConfig { + pub max_flush_interval: ReadableDuration, pub num_threads: usize, #[online_config(skip)] pub enable: bool, @@ -2352,6 +2353,7 @@ impl Default for BackupStreamConfig { fn default() -> Self { let cpu_num = SysQuota::cpu_cores_quota(); Self { + max_flush_interval: ReadableDuration::minutes(5), // use at most 50% of vCPU by default num_threads: (cpu_num * 0.5).clamp(1.0, 8.0) as usize, enable: false, From de208b5b4e97b134bc8b7daff7a4b304c618e9e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B1=B1=E5=B2=9A?= <36239017+YuJuncen@users.noreply.github.com> Date: Fri, 13 May 2022 22:54:38 +0800 Subject: [PATCH 6/6] log-backup: added quota for initial scanning (#12469) close tikv/tikv#12468 Added quota for initial scanning of log backup. 
Signed-off-by: Yu Juncen Co-authored-by: Ti Chi Robot Co-authored-by: kennytm --- Cargo.lock | 1 + components/backup-stream/Cargo.toml | 1 + components/backup-stream/src/endpoint.rs | 61 +++++++++++++++---- components/backup-stream/src/event_loader.rs | 46 +++++++++++++- components/backup-stream/src/router.rs | 9 ++- .../backup-stream/src/subscription_track.rs | 11 ++-- components/backup-stream/src/utils.rs | 13 +++- src/config.rs | 16 ++++- 8 files changed, 135 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 646bcc9396c..5a0dfca16e7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -500,6 +500,7 @@ dependencies = [ "url", "uuid", "walkdir", + "yatp", ] [[package]] diff --git a/components/backup-stream/Cargo.toml b/components/backup-stream/Cargo.toml index eec499f74bb..b83297c5a47 100644 --- a/components/backup-stream/Cargo.toml +++ b/components/backup-stream/Cargo.toml @@ -54,6 +54,7 @@ tokio-stream = "0.1" tokio-util = { version = "0.7", features = ["compat"] } txn_types = { path = "../txn_types", default-features = false } uuid = "0.8" +yatp = { git = "https://github.com/tikv/yatp.git", branch = "master" } [dev-dependencies] async-trait = "0.1" diff --git a/components/backup-stream/src/endpoint.rs b/components/backup-stream/src/endpoint.rs index 18d2c2c284c..5bb0fcb6ee6 100644 --- a/components/backup-stream/src/endpoint.rs +++ b/components/backup-stream/src/endpoint.rs @@ -34,12 +34,13 @@ use tokio::{ }; use tokio_stream::StreamExt; use txn_types::TimeStamp; +use yatp::task::callback::Handle as YatpHandle; -use super::metrics::{HANDLE_EVENT_DURATION_HISTOGRAM, HANDLE_KV_HISTOGRAM}; +use super::metrics::HANDLE_EVENT_DURATION_HISTOGRAM; use crate::{ annotate, errors::{Error, Result}, - event_loader::InitialDataLoader, + event_loader::{InitialDataLoader, PendingMemoryQuota}, metadata::{ store::{EtcdStore, MetaStore}, MetadataClient, MetadataEvent, StreamTask, @@ -67,6 +68,8 @@ pub struct Endpoint { pd_client: Arc, subs: SubscriptionTracer, concurrency_manager: ConcurrencyManager, + initial_scan_memory_quota: PendingMemoryQuota, + scan_pool: ScanPool, } impl Endpoint @@ -88,8 +91,10 @@ where pd_client: Arc, cm: ConcurrencyManager, ) -> Self { - let pool = create_tokio_runtime(config.num_threads, "br-stream") + // Always use 2 threads for I/O tasks. + let pool = create_tokio_runtime(config.io_threads, "br-stream") .expect("failed to create tokio runtime for backup stream worker."); + let scan_pool = create_scan_pool(config.num_threads); // TODO consider TLS? let meta_client = Some(cli); @@ -114,7 +119,8 @@ where }); pool.spawn(Self::starts_flush_ticks(range_router.clone())); } - + let initial_scan_memory_quota = + PendingMemoryQuota::new(config.initial_scan_pending_memory_quota.0 as _); info!("the endpoint of backup stream started"; "path" => %config.temp_path); Endpoint { meta_client, @@ -129,6 +135,8 @@ where pd_client, subs: Default::default(), concurrency_manager: cm, + initial_scan_memory_quota, + scan_pool, } } } @@ -152,8 +160,9 @@ where concurrency_manager: ConcurrencyManager, ) -> Endpoint { crate::metrics::STREAM_ENABLED.inc(); - let pool = create_tokio_runtime(config.num_threads, "backup-stream") + let pool = create_tokio_runtime(config.io_threads, "backup-stream") .expect("failed to create tokio runtime for backup stream worker."); + let scan_pool = create_scan_pool(config.num_threads); // TODO consider TLS? 
        let meta_client = match pool.block_on(etcd_client::Client::connect(&endpoints, None)) {
@@ -190,6 +199,8 @@ where
             pool.spawn(Self::starts_flush_ticks(range_router.clone()));
         }
+        let initial_scan_memory_quota =
+            PendingMemoryQuota::new(config.initial_scan_pending_memory_quota.0 as _);
         info!("the endpoint of stream backup started"; "path" => %config.temp_path);
         Endpoint {
             meta_client,
@@ -204,6 +215,8 @@ where
             pd_client,
             subs: Default::default(),
             concurrency_manager,
+            initial_scan_memory_quota,
+            scan_pool,
         }
     }
 }
@@ -380,7 +393,6 @@ where
                     utils::handle_on_event_result(&sched, router.on_events(kvs).await);
                     metrics::HEAP_MEMORY
                         .sub(total_size as _);
-                    HANDLE_KV_HISTOGRAM.observe(kv_count as _);
                     let time_cost = sw.lap().as_secs_f64();
                     if time_cost > SLOW_EVENT_THRESHOLD {
                         warn!("write to temp file too slow."; "time_cost" => ?time_cost, "region_id" => %region_id, "len" => %kv_count);
@@ -399,6 +411,8 @@ where
             self.range_router.clone(),
             self.subs.clone(),
             self.scheduler.clone(),
+            self.initial_scan_memory_quota.clone(),
+            self.pool.handle().clone(),
         )
     }
@@ -439,7 +453,7 @@ where
                 "end_key" => utils::redact(&end_key),
             );
         }
-        tokio::task::spawn_blocking(move || {
+        self.spawn_at_scan_pool(move || {
             let range_init_result = init.initialize_range(start_key.clone(), end_key.clone());
             match range_init_result {
                 Ok(()) => {
@@ -730,10 +744,15 @@ where
            })?;
         let region = region.clone();
-        self.pool.spawn_blocking(move || {
+        // we should not spawn initial scanning tasks to the tokio blocking pool
+        // because it is also used for converting sync File I/O to async (for now!).
+        // In that case, if we block on some resources (for example, the `MemoryQuota`)
+        // in those blocking threads, we may run into a ghostly deadlock.
+        self.spawn_at_scan_pool(move || {
+            let begin = Instant::now_coarse();
             match init.do_initial_scan(&region, last_checkpoint, snap) {
                 Ok(stat) => {
-                    info!("initial scanning of leader transforming finished!"; "statistics" => ?stat, "region" => %region.get_id(), "from_ts" => %last_checkpoint);
+                    info!("initial scanning of leader transforming finished!"; "takes" => ?begin.saturating_elapsed(), "region" => %region.get_id(), "from_ts" => %last_checkpoint);
                     utils::record_cf_stat("lock", &stat.lock);
                     utils::record_cf_stat("write", &stat.write);
                     utils::record_cf_stat("default", &stat.data);
@@ -744,6 +763,16 @@ where
         Ok(())
     }
+    // spawn a task at the scan pool.
+    fn spawn_at_scan_pool(&self, task: impl FnOnce() + Send + 'static) {
+        self.scan_pool.spawn(move |_: &mut YatpHandle<'_>| {
+            tikv_alloc::add_thread_memory_accessor();
+            let _io_guard = file_system::WithIOType::new(file_system::IOType::Replication);
+            task();
+            tikv_alloc::remove_thread_memory_accessor();
+        })
+    }
+
     fn find_task_by_region(&self, r: &Region) -> Option {
         self.range_router
             .find_task_by_range(&r.start_key, &r.end_key)
@@ -935,14 +964,24 @@ where
     }
 }
+type ScanPool = yatp::ThreadPool;
+
+/// Create a yatp pool for doing initial scanning.
+fn create_scan_pool(num_threads: usize) -> ScanPool {
+    yatp::Builder::new("log-backup-scan")
+        .max_thread_count(num_threads)
+        .build_callback_pool()
+}
+
 /// Create a standard tokio runtime
 /// (which allows io and time reactor, involve thread memory accessor),
 fn create_tokio_runtime(thread_count: usize, thread_name: &str) -> TokioResult {
     tokio::runtime::Builder::new_multi_thread()
         .thread_name(thread_name)
         // Maybe make it more configurable?
-        // currently, blocking threads would be used for incremental scanning.
- .max_blocking_threads(thread_count) + // currently, blocking threads would be used for tokio local I/O. + // (`File` API in `tokio::io` would use this pool.) + .max_blocking_threads(thread_count * 8) .worker_threads(thread_count) .enable_io() .enable_time() diff --git a/components/backup-stream/src/event_loader.rs b/components/backup-stream/src/event_loader.rs index b227a5dd542..8fb3735a1c7 100644 --- a/components/backup-stream/src/event_loader.rs +++ b/components/backup-stream/src/event_loader.rs @@ -1,6 +1,6 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. -use std::{marker::PhantomData, time::Duration}; +use std::{marker::PhantomData, sync::Arc, time::Duration}; use engine_traits::{KvEngine, CF_DEFAULT, CF_WRITE}; use futures::executor::block_on; @@ -17,6 +17,7 @@ use tikv::storage::{ Snapshot, Statistics, }; use tikv_util::{box_err, time::Instant, warn, worker::Scheduler}; +use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use txn_types::{Key, Lock, TimeStamp}; use crate::{ @@ -33,6 +34,34 @@ use crate::{ const MAX_GET_SNAPSHOT_RETRY: usize = 3; +#[derive(Clone)] +pub struct PendingMemoryQuota(Arc); + +impl std::fmt::Debug for PendingMemoryQuota { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PendingMemoryQuota") + .field("remain", &self.0.available_permits()) + .field("total", &self.0) + .finish() + } +} + +pub struct PendingMemory(OwnedSemaphorePermit); + +impl PendingMemoryQuota { + pub fn new(quota: usize) -> Self { + Self(Arc::new(Semaphore::new(quota))) + } + + pub fn pending(&self, size: usize) -> PendingMemory { + PendingMemory( + tokio::runtime::Handle::current() + .block_on(self.0.clone().acquire_many_owned(size as _)) + .expect("BUG: the semaphore is closed unexpectedly."), + ) + } +} + /// EventLoader transforms data from the snapshot into ApplyEvent. pub struct EventLoader { scanner: DeltaScanner, @@ -132,6 +161,8 @@ pub struct InitialDataLoader { sink: Router, tracing: SubscriptionTracer, scheduler: Scheduler, + quota: PendingMemoryQuota, + handle: tokio::runtime::Handle, _engine: PhantomData, } @@ -148,6 +179,8 @@ where sink: Router, tracing: SubscriptionTracer, sched: Scheduler, + quota: PendingMemoryQuota, + handle: tokio::runtime::Handle, ) -> Self { Self { router, @@ -156,6 +189,8 @@ where tracing, scheduler: sched, _engine: PhantomData, + quota, + handle, } } @@ -287,14 +322,19 @@ where return Ok(stats.stat); } stats.add_statistics(&stat); + let region_id = region.get_id(); let sink = self.sink.clone(); let event_size = events.size(); let sched = self.scheduler.clone(); + let permit = self.quota.pending(event_size); + debug!("sending events to router"; "size" => %event_size, "region" => %region_id); metrics::INCREMENTAL_SCAN_SIZE.observe(event_size as f64); metrics::HEAP_MEMORY.add(event_size as _); join_handles.push(tokio::spawn(async move { utils::handle_on_event_result(&sched, sink.on_events(events).await); metrics::HEAP_MEMORY.sub(event_size as _); + debug!("apply event done"; "size" => %event_size, "region" => %region_id); + drop(permit); })); } } @@ -305,6 +345,7 @@ where start_ts: TimeStamp, snap: impl Snapshot, ) -> Result { + let _guard = self.handle.enter(); // It is ok to sink more data than needed. So scan to +inf TS for convenance. let event_loader = EventLoader::load_from(snap, start_ts, TimeStamp::max(), region)?; let tr = self.tracing.clone(); @@ -317,7 +358,7 @@ where // TODO: use an `WaitGroup` with asynchronous support. 
tokio::spawn(async move { for h in join_handles { - if let Err(err) = tokio::join!(h).0 { + if let Err(err) = h.await { warn!("failed to join task."; "err" => %err); } } @@ -330,7 +371,6 @@ where region_id )); } - debug!("phase one done."; "region_id" => %region_id); }); stats } diff --git a/components/backup-stream/src/router.rs b/components/backup-stream/src/router.rs index c3423043a64..ac8692f0bdf 100644 --- a/components/backup-stream/src/router.rs +++ b/components/backup-stream/src/router.rs @@ -50,7 +50,7 @@ use crate::{ endpoint::Task, errors::{ContextualResultExt, Error}, metadata::StreamTask, - metrics::SKIP_KV_COUNTER, + metrics::{HANDLE_KV_HISTOGRAM, SKIP_KV_COUNTER}, subscription_track::TwoPhaseResolver, try_send, utils::{self, SegmentMap, Slot, SlotMap, StopWatch}, @@ -122,7 +122,11 @@ impl ApplyEvents { utils::redact(&value) ) }) { - Ok(lock) => resolver.track_lock(lock.ts, key), + Ok(lock) => { + if utils::should_track_lock(&lock) { + resolver.track_lock(lock.ts, key) + } + } Err(err) => err.report(format!("region id = {}", region_id)), } } @@ -446,6 +450,7 @@ impl RouterInner { pub async fn on_events(&self, kv: ApplyEvents) -> Vec<(String, Result<()>)> { use futures::FutureExt; + HANDLE_KV_HISTOGRAM.observe(kv.len() as _); let partitioned_events = kv.partition_by_range(&self.ranges.rl()); let tasks = partitioned_events .into_iter() diff --git a/components/backup-stream/src/subscription_track.rs b/components/backup-stream/src/subscription_track.rs index c62c0ee823d..9d59c0d3021 100644 --- a/components/backup-stream/src/subscription_track.rs +++ b/components/backup-stream/src/subscription_track.rs @@ -1,6 +1,6 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use dashmap::{mapref::one::RefMut, DashMap}; use kvproto::metapb::Region; @@ -112,18 +112,21 @@ impl SubscriptionTracer { #[inline(always)] pub fn warn_if_gap_too_huge(&self, ts: TimeStamp) { - if TimeStamp::physical_now() - ts.physical() >= 10 * 60 * 1000 + let gap = TimeStamp::physical_now() - ts.physical(); + if gap >= 10 * 60 * 1000 /* 10 mins */ { let far_resolver = self .0 .iter() .min_by_key(|r| r.value().resolver.resolved_ts()); - warn!("stream backup resolver ts advancing too slow"; + warn!("log backup resolver ts advancing too slow"; "far_resolver" => %{match far_resolver { Some(r) => format!("{:?}", r.value().resolver), None => "BUG[NoResolverButResolvedTSDoesNotAdvance]".to_owned() - }}); + }}, + "gap" => ?Duration::from_millis(gap), + ); } } diff --git a/components/backup-stream/src/utils.rs b/components/backup-stream/src/utils.rs index 1c7eff33009..c104a100b56 100644 --- a/components/backup-stream/src/utils.rs +++ b/components/backup-stream/src/utils.rs @@ -15,7 +15,7 @@ use raftstore::{coprocessor::RegionInfoProvider, RegionInfo}; use tikv::storage::CfStatistics; use tikv_util::{box_err, time::Instant, warn, worker::Scheduler, Either}; use tokio::sync::{Mutex, RwLock}; -use txn_types::Key; +use txn_types::{Key, Lock, LockType}; use crate::{ errors::{Error, Result}, @@ -390,6 +390,17 @@ pub fn handle_on_event_result(doom_messenger: &Scheduler, result: Vec<(Str } } +/// tests whether the lock should be tracked or skipped. +pub fn should_track_lock(l: &Lock) -> bool { + match l.lock_type { + LockType::Put | LockType::Delete => true, + // Lock or Pessimistic lock won't commit more data, + // (i.e. won't break the integration of data between [Lock.start_ts, get_ts())) + // it is safe for ignoring them and advancing resolved_ts. 
        LockType::Lock | LockType::Pessimistic => false,
+    }
+}
+
 #[cfg(test)]
 mod test {
     use crate::utils::SegmentMap;
diff --git a/src/config.rs b/src/config.rs
index 9ad77e5f58c..f33d9c79fea 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -2332,12 +2332,20 @@ impl Default for BackupConfig {
 #[serde(default)]
 #[serde(rename_all = "kebab-case")]
 pub struct BackupStreamConfig {
+    #[online_config(skip)]
     pub max_flush_interval: ReadableDuration,
+    #[online_config(skip)]
     pub num_threads: usize,
     #[online_config(skip)]
+    pub io_threads: usize,
+    #[online_config(skip)]
     pub enable: bool,
+    #[online_config(skip)]
     pub temp_path: String,
+    #[online_config(skip)]
     pub temp_file_size_limit_per_task: ReadableSize,
+    #[online_config(skip)]
+    pub initial_scan_pending_memory_quota: ReadableSize,
 }
 impl BackupStreamConfig {
@@ -2352,14 +2360,18 @@ impl BackupStreamConfig {
 impl Default for BackupStreamConfig {
     fn default() -> Self {
         let cpu_num = SysQuota::cpu_cores_quota();
+        let total_mem = SysQuota::memory_limit_in_bytes();
+        let quota_size = (total_mem as f64 * 0.1).min(ReadableSize::mb(512).0 as _);
         Self {
             max_flush_interval: ReadableDuration::minutes(5),
             // use at most 50% of vCPU by default
             num_threads: (cpu_num * 0.5).clamp(1.0, 8.0) as usize,
+            io_threads: 2,
             enable: false,
             // TODO: may be use raft store directory
             temp_path: String::new(),
             temp_file_size_limit_per_task: ReadableSize::mb(128),
+            initial_scan_pending_memory_quota: ReadableSize(quota_size as _),
         }
     }
 }
@@ -2662,8 +2674,8 @@ pub struct TiKvConfig {
     pub backup: BackupConfig,
     #[online_config(submodule)]
-    // The term "log-backup" and "backup-stream" points to the same object.
-    // But the product name is `log-backup`.
+    // The terms "log backup" and "backup stream" refer to the same feature.
+    // "log backup" should be the only product name exposed to the user.
     #[serde(rename = "log-backup")]
     pub backup_stream: BackupStreamConfig,
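
Taken together, the new `BackupStreamConfig` fields above are what a user would set under the `[log-backup]` section of the TiKV configuration file (the struct derives `#[serde(rename_all = "kebab-case")]` and `TiKvConfig` renames the submodule to `log-backup`), and since the fields are marked `#[online_config(skip)]`, changing them requires a restart. A minimal sketch of such a section, assuming the defaults shown in the diff and TiKV's usual human-readable duration/size syntax; the concrete values here are illustrative, not prescriptive:

    [log-backup]
    enable = false
    # at most 50% of the vCPU quota, clamped to [1, 8]
    num-threads = 4
    io-threads = 2
    max-flush-interval = "5m"
    temp-file-size-limit-per-task = "128MB"
    # 10% of total memory, capped at 512MB
    initial-scan-pending-memory-quota = "512MB"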