diff --git a/Cargo.lock b/Cargo.lock index cbf4868d1e2..e58ac653e7f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1051,6 +1051,7 @@ dependencies = [ "configuration", "encryption", "engine_traits", + "hex 0.4.2", "keys", "kvproto", "lazy_static", diff --git a/components/engine_rocks/Cargo.toml b/components/engine_rocks/Cargo.toml index a2e427ad50a..daeeb9415be 100644 --- a/components/engine_rocks/Cargo.toml +++ b/components/engine_rocks/Cargo.toml @@ -20,6 +20,7 @@ sse = ["rocksdb/sse"] [dependencies] encryption = { path = "../encryption" } engine_traits = { path = "../engine_traits" } +hex = "0.4" keys = { path = "../keys" } num_cpus = "1" prometheus = { version = "0.8", features = ["nightly", "push"] } diff --git a/components/engine_rocks/src/range_properties.rs b/components/engine_rocks/src/range_properties.rs index 9abf4c670c7..d52e046e13f 100644 --- a/components/engine_rocks/src/range_properties.rs +++ b/components/engine_rocks/src/range_properties.rs @@ -1,7 +1,7 @@ // Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0. 
use std::path::Path; -use engine_traits::{RangePropertiesExt, Result, Range, CF_WRITE, CFHandleExt, MiscExt, TablePropertiesExt, TablePropertiesCollection, TableProperties, LARGE_CFS, CF_LOCK}; +use engine_traits::{RangePropertiesExt, Result, Range, CF_WRITE, CFHandleExt, MiscExt, TablePropertiesExt, TablePropertiesCollection, TableProperties, LARGE_CFS, CF_LOCK, CF_DEFAULT}; use crate::engine::RocksEngine; use crate::properties::{get_range_entries_and_versions, RangeProperties};
@@ -121,10 +121,134 @@ impl RangePropertiesExt for RocksEngine {
     }
 
+    /// Picks approximate split keys for `range` using the largest column family.
+    ///
+    /// The approximate sizes of `CF_DEFAULT`, `CF_WRITE` and `CF_LOCK` are summed, the
+    /// CF with the largest size is chosen, and `split_size` is scaled by that CF's share
+    /// of the total before delegating to `get_range_approximate_split_keys_cf`.
+    ///
+    /// Returns an error if the size lookup fails for `CF_DEFAULT` or `CF_WRITE`, or if
+    /// the three sizes add up to zero.
     fn get_range_approximate_split_keys(&self, range: Range, region_id: u64, split_size: u64, max_size: u64, batch_split_limit: u64) -> Result<Vec<Vec<u8>>> {
-        panic!()
+        let get_cf_size = |cf: &str| self.get_range_approximate_size_cf(cf, range, region_id, 0);
+        let cfs = [
+            (CF_DEFAULT, box_try!(get_cf_size(CF_DEFAULT))),
+            (CF_WRITE, box_try!(get_cf_size(CF_WRITE))),
+            // CF_LOCK doesn't have RangeProperties until v4.0, so we swallow the error for
+            // backward compatibility.
+            (CF_LOCK, get_cf_size(CF_LOCK).unwrap_or(0)),
+        ];
+
+        let total_size: u64 = cfs.iter().map(|(_, s)| s).sum();
+        if total_size == 0 {
+            return Err(box_err!("all CFs are empty"));
+        }
+
+        let (cf, cf_size) = cfs
+            .iter()
+            .max_by_key(|(_, s)| s)
+            .expect("cfs is a fixed, non-empty array");
+        // Assume key sizes are uniformly distributed across the CFs, so the chosen CF
+        // gets a share of `split_size` proportional to its share of the total size.
+        // The product is computed in u128 so `split_size * cf_size` cannot overflow u64;
+        // the quotient never exceeds `split_size`, so narrowing back to u64 is lossless.
+        let cf_split_size =
+            (u128::from(split_size) * u128::from(*cf_size) / u128::from(total_size)) as u64;
+
+        self.get_range_approximate_split_keys_cf(cf, range, region_id, cf_split_size, max_size, batch_split_limit)
     }
 
-    fn get_range_approximate_split_keys_cf(&self, cfname: &str, range: Range, region_id: u64, split_size: u64, max_size: u64, batch_split_limit: u64) -> Result<Vec<Vec<u8>>> {
-        panic!()
+    /// Computes approximate split keys for `range` within the column family `cfname`.
+    ///
+    /// Keys are gathered from the `RangeProperties` decoded from every entry returned by
+    /// `get_range_properties_cf`, sorted, and sampled every `n` keys, where `n` is
+    /// `split_size` divided by the average size per key (rounded up). At most
+    /// `batch_split_limit` keys are returned; when that limit is not exceeded, the last
+    /// key is dropped if `rest * distance + split_size < max_size`, with `rest` being the
+    /// number of keys left over after the last full step.
+    ///
+    /// Returns an empty vector when exactly one key was collected. Returns an error when
+    /// the properties cannot be read or decoded, or when no key was collected, the
+    /// accumulated size is zero, or `split_size` is zero.
+    fn get_range_approximate_split_keys_cf(&self, cfname: &str, range: Range, _region_id: u64, split_size: u64, max_size: u64, batch_split_limit: u64) -> Result<Vec<Vec<u8>>> {
+        let start_key = &range.start_key;
+        let end_key = &range.end_key;
+        let collection = box_try!(self.get_range_properties_cf(cfname, &start_key, &end_key));
+
+        let mut keys = vec![];
+        let mut total_size = 0;
+        for (_, v) in collection.iter() {
+            let props = box_try!(RangeProperties::decode(&v.user_collected_properties()));
+            total_size += props.get_approximate_size_in_range(&start_key, &end_key);
+
+            keys.extend(
+                props
+                    .take_excluded_range(start_key, end_key)
+                    .into_iter()
+                    .map(|(k, _)| k),
+            );
+        }
+        if keys.len() == 1 {
+            return Ok(vec![]);
+        }
+        if keys.is_empty() || total_size == 0 || split_size == 0 {
+            return Err(box_err!(
+                "unexpected key len {} or total_size {} or split size {}, len of collection {}, cf {}, start {}, end {}",
+                keys.len(),
+                total_size,
+                split_size,
+                collection.len(),
+                cfname,
+                hex::encode_upper(&start_key),
+                hex::encode_upper(&end_key)
+            ));
+        }
+        keys.sort();
+
+        // Use the total size of this range and the number of keys in this range to
+        // calculate the average distance between two keys, and produce a
+        // split key every `split_size / distance` keys.
+        let len = keys.len();
+        let distance = total_size as f64 / len as f64;
+        let n = (split_size as f64 / distance).ceil() as usize;
+        if n == 0 {
+            return Err(box_err!(
+                "unexpected n == 0, total_size: {}, split_size: {}, len: {}, distance: {}",
+                total_size,
+                split_size,
+                keys.len(),
+                distance
+            ));
+        }
+
+        // `step_by()` always yields the first element of the iterator, so without
+        // skipping, the first key returned might not be the desired split key. The start
+        // key of the region is not included in `keys`, so we drop the first `n - 1` keys.
+        //
+        // For example, the split size is `3 * distance`. And the numbers stand for the
+        // key in `RangeProperties`, `^` stands for produced split key.
+        //
+        // skip:
+        // start___1___2___3___4___5___6___7....
+        //                 ^           ^
+        //
+        // not skip:
+        // start___1___2___3___4___5___6___7....
+        //         ^           ^           ^
+        let mut split_keys = keys
+            .into_iter()
+            .skip(n - 1)
+            .step_by(n)
+            .collect::<Vec<Vec<u8>>>();
+
+        if split_keys.len() as u64 > batch_split_limit {
+            split_keys.truncate(batch_split_limit as usize);
+        } else {
+            // make sure not to split when less than max_size for last part
+            let rest = (len % n) as u64;
+            if rest * (distance as u64) + split_size < max_size {
+                split_keys.pop();
+            }
+        }
+        Ok(split_keys)
     }
 }