Skip to content

Commit

Permalink
engine_rocks: Implement get_range_approximate_split_keys
Browse files Browse the repository at this point in the history
Signed-off-by: Brian Anderson <andersrb@gmail.com>
  • Loading branch information
brson committed Jul 15, 2020
1 parent b895cf0 commit 5051b30
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 4 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions components/engine_rocks/Cargo.toml
Expand Up @@ -20,6 +20,7 @@ sse = ["rocksdb/sse"]
[dependencies]
encryption = { path = "../encryption" }
engine_traits = { path = "../engine_traits" }
hex = "0.4"
keys = { path = "../keys" }
num_cpus = "1"
prometheus = { version = "0.8", features = ["nightly", "push"] }
Expand Down
105 changes: 101 additions & 4 deletions components/engine_rocks/src/range_properties.rs
@@ -1,7 +1,7 @@
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.

use std::path::Path;
use engine_traits::{RangePropertiesExt, Result, Range, CF_WRITE, CFHandleExt, MiscExt, TablePropertiesExt, TablePropertiesCollection, TableProperties, LARGE_CFS, CF_LOCK};
use engine_traits::{RangePropertiesExt, Result, Range, CF_WRITE, CFHandleExt, MiscExt, TablePropertiesExt, TablePropertiesCollection, TableProperties, LARGE_CFS, CF_LOCK, CF_DEFAULT};
use crate::engine::RocksEngine;
use crate::properties::{get_range_entries_and_versions, RangeProperties};

Expand Down Expand Up @@ -121,10 +121,107 @@ impl RangePropertiesExt for RocksEngine {
}

/// Computes approximate split keys for `range` using the largest column family.
///
/// The approximate size of `range` is queried for `CF_DEFAULT`, `CF_WRITE` and
/// `CF_LOCK`. The CF with the largest size is selected, `split_size` is scaled
/// by that CF's share of the summed size, and the actual key selection is
/// delegated to `get_range_approximate_split_keys_cf`.
///
/// # Errors
///
/// Returns an error when the size lookup for `CF_DEFAULT` or `CF_WRITE` fails,
/// when the summed size of all three CFs is zero, or when the per-CF
/// computation returns an error. A failed lookup for `CF_LOCK` is treated as
/// size 0.
fn get_range_approximate_split_keys(&self, range: Range, region_id: u64, split_size: u64, max_size: u64, batch_split_limit: u64) -> Result<Vec<Vec<u8>>> {
    let get_cf_size = |cf: &str| self.get_range_approximate_size_cf(cf, range, region_id, 0);
    let cfs = [
        (CF_DEFAULT, box_try!(get_cf_size(CF_DEFAULT))),
        (CF_WRITE, box_try!(get_cf_size(CF_WRITE))),
        // CF_LOCK doesn't have RangeProperties until v4.0, so we swallow the error for
        // backward compatibility.
        (CF_LOCK, get_cf_size(CF_LOCK).unwrap_or(0)),
    ];

    let total_size: u64 = cfs.iter().map(|(_, s)| s).sum();
    if total_size == 0 {
        return Err(box_err!("all CFs are empty"));
    }

    let &(cf, cf_size) = cfs
        .iter()
        .max_by_key(|(_, s)| *s)
        .expect("cfs is a fixed, non-empty array");

    // Assume the size of keys is uniformly distributed across the CFs, so the
    // chosen CF gets a share of `split_size` proportional to its size.
    //
    // The product of two byte counts can exceed `u64::MAX` for very large
    // ranges, so it is computed in `u128`. `cf_size` is one of the summands of
    // `total_size`, hence the quotient never exceeds `split_size` and the
    // narrowing cast back to `u64` is lossless.
    let cf_split_size =
        (u128::from(split_size) * u128::from(cf_size) / u128::from(total_size)) as u64;

    self.get_range_approximate_split_keys_cf(cf, range, region_id, cf_split_size, max_size, batch_split_limit)
}

fn get_range_approximate_split_keys_cf(&self, cfname: &str, range: Range, region_id: u64, split_size: u64, max_size: u64, batch_split_limit: u64) -> Result<Vec<Vec<u8>>> {
panic!()
fn get_range_approximate_split_keys_cf(&self, cfname: &str, range: Range, _region_id: u64, split_size: u64, max_size: u64, batch_split_limit: u64) -> Result<Vec<Vec<u8>>> {
let start_key = &range.start_key;
let end_key = &range.end_key;
let collection = box_try!(self.get_range_properties_cf(cfname, &start_key, &end_key));

let mut keys = vec![];
let mut total_size = 0;
for (_, v) in collection.iter() {
let props = box_try!(RangeProperties::decode(&v.user_collected_properties()));
total_size += props.get_approximate_size_in_range(&start_key, &end_key);

keys.extend(
props
.take_excluded_range(start_key, end_key)
.into_iter()
.map(|(k, _)| k),
);
}
if keys.len() == 1 {
return Ok(vec![]);
}
if keys.is_empty() || total_size == 0 || split_size == 0 {
return Err(box_err!(
"unexpected key len {} or total_size {} or split size {}, len of collection {}, cf {}, start {}, end {}",
keys.len(),
total_size,
split_size,
collection.len(),
cfname,
hex::encode_upper(&start_key),
hex::encode_upper(&end_key)
));
}
keys.sort();

// use total size of this range and the number of keys in this range to
// calculate the average distance between two keys, and we produce a
// split_key every `split_size / distance` keys.
let len = keys.len();
let distance = total_size as f64 / len as f64;
let n = (split_size as f64 / distance).ceil() as usize;
if n == 0 {
return Err(box_err!(
"unexpected n == 0, total_size: {}, split_size: {}, len: {}, distance: {}",
total_size,
split_size,
keys.len(),
distance
));
}

// cause first element of the iterator will always be returned by step_by(),
// so the first key returned may not the desired split key. Note that, the
// start key of region is not included, so we we drop first n - 1 keys.
//
// For example, the split size is `3 * distance`. And the numbers stand for the
// key in `RangeProperties`, `^` stands for produced split key.
//
// skip:
// start___1___2___3___4___5___6___7....
// ^ ^
//
// not skip:
// start___1___2___3___4___5___6___7....
// ^ ^ ^
let mut split_keys = keys
.into_iter()
.skip(n - 1)
.step_by(n)
.collect::<Vec<Vec<u8>>>();

if split_keys.len() as u64 > batch_split_limit {
split_keys.truncate(batch_split_limit as usize);
} else {
// make sure not to split when less than max_size for last part
let rest = (len % n) as u64;
if rest * distance as u64 + split_size < max_size {
split_keys.pop();
}
}
Ok(split_keys)
}
}

0 comments on commit 5051b30

Please sign in to comment.