Skip to content

Commit

Permalink
rocksdb: Compaction guard: split SST by region boundaries (tikv#8115)
Browse files Browse the repository at this point in the history
Signed-off-by: Yi Wu <yiwu@pingcap.com>
  • Loading branch information
yiwu-arbug committed Oct 9, 2020
1 parent 1f2b76f commit 4784818
Show file tree
Hide file tree
Showing 19 changed files with 581 additions and 62 deletions.
4 changes: 2 additions & 2 deletions cmd/src/bin/tikv-ctl.rs
Expand Up @@ -81,7 +81,7 @@ fn new_debug_executor(
let mut kv_db_opts = cfg.rocksdb.build_opt();
kv_db_opts.set_env(env.clone());
kv_db_opts.set_paranoid_checks(!skip_paranoid_checks);
let kv_cfs_opts = cfg.rocksdb.build_cf_opts(&cache);
let kv_cfs_opts = cfg.rocksdb.build_cf_opts(&cache, None);
let kv_path = PathBuf::from(kv_path).canonicalize().unwrap();
let kv_path = kv_path.to_str().unwrap();
let kv_db =
Expand All @@ -103,7 +103,7 @@ fn new_debug_executor(
if !cfg.raft_engine.enable {
let mut raft_db_opts = cfg.raftdb.build_opt();
raft_db_opts.set_env(env);
let raft_db_cf_opts = cfg.raftdb.build_cf_opts(&cache);
let raft_db_cf_opts = cfg.raftdb.build_cf_opts(&cache, None);
let raft_db = engine_rocks::raw_util::new_engine_opt(
&raft_path,
raft_db_opts,
Expand Down
13 changes: 10 additions & 3 deletions cmd/src/server.rs
Expand Up @@ -885,7 +885,8 @@ impl TiKVServer<RocksEngine> {
let config_raftdb = &self.config.raftdb;
let mut raft_db_opts = config_raftdb.build_opt();
raft_db_opts.set_env(env.clone());
let raft_db_cf_opts = config_raftdb.build_cf_opts(&block_cache);
let raft_db_cf_opts =
config_raftdb.build_cf_opts(&block_cache, Some(&self.region_info_accessor));
let raft_engine = engine_rocks::raw_util::new_engine_opt(
raft_db_path.to_str().unwrap(),
raft_db_opts,
Expand All @@ -897,7 +898,10 @@ impl TiKVServer<RocksEngine> {
let mut kv_db_opts = self.config.rocksdb.build_opt();
kv_db_opts.set_env(env);
kv_db_opts.add_event_listener(self.create_raftstore_compaction_listener());
let kv_cfs_opts = self.config.rocksdb.build_cf_opts(&block_cache);
let kv_cfs_opts = self
.config
.rocksdb
.build_cf_opts(&block_cache, Some(&self.region_info_accessor));
let db_path = self.store_path.join(Path::new(DEFAULT_ROCKSDB_SUB_DIR));
let kv_engine = engine_rocks::raw_util::new_engine_opt(
db_path.to_str().unwrap(),
Expand Down Expand Up @@ -956,7 +960,10 @@ impl TiKVServer<RaftLogEngine> {
let mut kv_db_opts = self.config.rocksdb.build_opt();
kv_db_opts.set_env(env);
kv_db_opts.add_event_listener(self.create_raftstore_compaction_listener());
let kv_cfs_opts = self.config.rocksdb.build_cf_opts(&block_cache);
let kv_cfs_opts = self
.config
.rocksdb
.build_cf_opts(&block_cache, Some(&self.region_info_accessor));
let db_path = self.store_path.join(Path::new(DEFAULT_ROCKSDB_SUB_DIR));
let kv_engine = engine_rocks::raw_util::new_engine_opt(
db_path.to_str().unwrap(),
Expand Down
5 changes: 4 additions & 1 deletion components/engine_panic/src/cf_options.rs
@@ -1,7 +1,7 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use crate::db_options::PanicTitanDBOptions;
use engine_traits::ColumnFamilyOptions;
use engine_traits::{ColumnFamilyOptions, SstPartitionerFactory};

pub struct PanicColumnFamilyOptions;

Expand Down Expand Up @@ -38,4 +38,7 @@ impl ColumnFamilyOptions for PanicColumnFamilyOptions {
fn get_disable_auto_compactions(&self) -> bool {
panic!()
}
fn set_sst_partitioner_factory<F: SstPartitionerFactory>(&mut self, factory: F) {
panic!()
}
}
9 changes: 7 additions & 2 deletions components/engine_rocks/src/cf_options.rs
@@ -1,7 +1,7 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use crate::db_options::RocksTitanDBOptions;
use engine_traits::ColumnFamilyOptions;
use crate::{db_options::RocksTitanDBOptions, sst_partitioner::RocksSstPartitionerFactory};
use engine_traits::{ColumnFamilyOptions, SstPartitionerFactory};
use rocksdb::ColumnFamilyOptions as RawCFOptions;

#[derive(Clone)]
Expand Down Expand Up @@ -59,4 +59,9 @@ impl ColumnFamilyOptions for RocksColumnFamilyOptions {
fn get_disable_auto_compactions(&self) -> bool {
self.0.get_disable_auto_compactions()
}

fn set_sst_partitioner_factory<F: SstPartitionerFactory>(&mut self, factory: F) {
self.0
.set_sst_partitioner_factory(RocksSstPartitionerFactory(factory));
}
}
6 changes: 4 additions & 2 deletions components/engine_rocks/src/lib.rs
Expand Up @@ -47,16 +47,18 @@ mod logger;
pub use crate::logger::*;
mod misc;
pub use crate::misc::*;
pub mod range_properties;
mod snapshot;
pub use crate::range_properties::*;
pub use crate::snapshot::*;
mod sst;
pub use crate::sst::*;
mod sst_partitioner;
pub use crate::sst_partitioner::*;
mod table_properties;
pub use crate::table_properties::*;
mod write_batch;
pub use crate::write_batch::*;
pub mod range_properties;
pub use crate::range_properties::*;

mod engine_iterator;
pub use crate::engine_iterator::*;
Expand Down
58 changes: 58 additions & 0 deletions components/engine_rocks/src/sst_partitioner.rs
@@ -0,0 +1,58 @@
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.

use std::ffi::CString;

pub struct RocksSstPartitionerFactory<F: engine_traits::SstPartitionerFactory>(pub F);

impl<F: engine_traits::SstPartitionerFactory> rocksdb::SstPartitionerFactory
for RocksSstPartitionerFactory<F>
{
type Partitioner = RocksSstPartitioner<F::Partitioner>;

fn name(&self) -> &CString {
self.0.name()
}

fn create_partitioner(
&self,
context: &rocksdb::SstPartitionerContext,
) -> Option<Self::Partitioner> {
let ctx = engine_traits::SstPartitionerContext {
is_full_compaction: context.is_full_compaction,
is_manual_compaction: context.is_manual_compaction,
output_level: context.output_level,
smallest_key: context.smallest_key,
largest_key: context.largest_key,
};
self.0
.create_partitioner(&ctx)
.map(|p| RocksSstPartitioner(p))
}
}

pub struct RocksSstPartitioner<P: engine_traits::SstPartitioner>(P);

impl<P: engine_traits::SstPartitioner> rocksdb::SstPartitioner for RocksSstPartitioner<P> {
fn should_partition(
&self,
request: &rocksdb::SstPartitionerRequest,
) -> rocksdb::SstPartitionerResult {
let req = engine_traits::SstPartitionerRequest {
prev_user_key: request.prev_user_key,
current_user_key: request.current_user_key,
current_output_file_size: request.current_output_file_size,
};
match self.0.should_partition(&req) {
engine_traits::SstPartitionerResult::NotRequired => {
rocksdb::SstPartitionerResult::NotRequired
}
engine_traits::SstPartitionerResult::Required => {
rocksdb::SstPartitionerResult::Required
}
}
}

fn can_do_trivial_move(&self, smallest_key: &[u8], largest_key: &[u8]) -> bool {
self.0.can_do_trivial_move(smallest_key, largest_key)
}
}
3 changes: 2 additions & 1 deletion components/engine_traits/src/cf_options.rs
@@ -1,6 +1,6 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use crate::db_options::TitanDBOptions;
use crate::{db_options::TitanDBOptions, sst_partitioner::SstPartitionerFactory};

pub trait ColumnFamilyOptions {
type TitanDBOptions: TitanDBOptions;
Expand All @@ -15,4 +15,5 @@ pub trait ColumnFamilyOptions {
fn set_titandb_options(&mut self, opts: &Self::TitanDBOptions);
fn get_target_file_size_base(&self) -> u64;
fn get_disable_auto_compactions(&self) -> bool;
fn set_sst_partitioner_factory<F: SstPartitionerFactory>(&mut self, factory: F);
}
2 changes: 2 additions & 0 deletions components/engine_traits/src/lib.rs
Expand Up @@ -297,6 +297,8 @@ mod encryption;
pub use crate::encryption::*;
mod properties;
pub use crate::properties::*;
mod sst_partitioner;
pub use crate::sst_partitioner::*;
mod range_properties;
pub use crate::range_properties::*;

Expand Down
40 changes: 40 additions & 0 deletions components/engine_traits/src/sst_partitioner.rs
@@ -0,0 +1,40 @@
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.

use std::ffi::CString;

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SstPartitionerRequest<'a> {
pub prev_user_key: &'a [u8],
pub current_user_key: &'a [u8],
pub current_output_file_size: u64,
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SstPartitionerResult {
NotRequired,
Required,
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SstPartitionerContext<'a> {
pub is_full_compaction: bool,
pub is_manual_compaction: bool,
pub output_level: i32,
pub smallest_key: &'a [u8],
pub largest_key: &'a [u8],
}

pub trait SstPartitioner {
fn should_partition(&self, req: &SstPartitionerRequest) -> SstPartitionerResult;
fn can_do_trivial_move(&self, smallest_key: &[u8], largest_key: &[u8]) -> bool;
}

pub trait SstPartitionerFactory: Sync + Send {
// Lifetime of the partitioner can be changed to be bounded by the factory's lifetime once
// generic associated types is supported.
// https://github.com/rust-lang/rfcs/blob/master/text/1598-generic_associated_types.md
type Partitioner: SstPartitioner + 'static;

fn name(&self) -> &CString;
fn create_partitioner(&self, context: &SstPartitionerContext) -> Option<Self::Partitioner>;
}
67 changes: 66 additions & 1 deletion components/raftstore/src/coprocessor/region_info_accessor.rs
@@ -1,7 +1,7 @@
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.

use std::collections::BTreeMap;
use std::collections::Bound::{Excluded, Unbounded};
use std::collections::Bound::{Excluded, Included, Unbounded};
use std::fmt::{Display, Formatter, Result as FmtResult};
use std::sync::{mpsc, Arc, Mutex};
use std::time::Duration;
Expand Down Expand Up @@ -84,6 +84,11 @@ pub enum RegionInfoQuery {
region_id: u64,
callback: Callback<Option<RegionInfo>>,
},
GetRegionsInRange {
start_key: Vec<u8>,
end_key: Vec<u8>,
callback: Callback<Vec<Region>>,
},
/// Gets all contents from the collection. Only used for testing.
DebugDump(mpsc::Sender<(RegionsMap, RegionRangesMap)>),
}
Expand All @@ -98,6 +103,14 @@ impl Display for RegionInfoQuery {
RegionInfoQuery::FindRegionById { region_id, .. } => {
write!(f, "FindRegionById(region_id: {})", region_id)
}
RegionInfoQuery::GetRegionsInRange {
start_key, end_key, ..
} => write!(
f,
"GetRegionsInRange(start_key: {}, end_key: {})",
hex::encode_upper(start_key),
hex::encode_upper(end_key)
),
RegionInfoQuery::DebugDump(_) => write!(f, "DebugDump"),
}
}
Expand Down Expand Up @@ -361,6 +374,23 @@ impl RegionCollector {
callback(self.regions.get(&region_id).cloned());
}

pub fn handle_get_regions_in_range(
&self,
start_key: Vec<u8>,
end_key: Vec<u8>,
callback: Callback<Vec<Region>>,
) {
let mut regions = vec![];
for (_, region_id) in self
.region_ranges
.range((Included(start_key), Included(end_key)))
{
let region_info = &self.regions[region_id];
regions.push(region_info.region.clone());
}
callback(regions);
}

fn handle_raftstore_event(&mut self, event: RaftStoreEvent) {
{
let region = event.get_region();
Expand Down Expand Up @@ -419,6 +449,13 @@ impl Runnable for RegionCollector {
} => {
self.handle_find_region_by_id(region_id, callback);
}
RegionInfoQuery::GetRegionsInRange {
start_key,
end_key,
callback,
} => {
self.handle_get_regions_in_range(start_key, end_key, callback);
}
RegionInfoQuery::DebugDump(tx) => {
tx.send((self.regions.clone(), self.region_ranges.clone()))
.unwrap();
Expand Down Expand Up @@ -514,6 +551,10 @@ pub trait RegionInfoProvider: Send + Clone + 'static {
) -> Result<()> {
unimplemented!()
}

fn get_regions_in_range(&self, _start_key: &[u8], _end_key: &[u8]) -> Result<Vec<Region>> {
unimplemented!()
}
}

impl RegionInfoProvider for RegionInfoAccessor {
Expand All @@ -540,6 +581,30 @@ impl RegionInfoProvider for RegionInfoAccessor {
.schedule(msg)
.map_err(|e| box_err!("failed to send request to region collector: {:?}", e))
}

fn get_regions_in_range(&self, start_key: &[u8], end_key: &[u8]) -> Result<Vec<Region>> {
let (tx, rx) = mpsc::channel();
let msg = RegionInfoQuery::GetRegionsInRange {
start_key: start_key.to_vec(),
end_key: end_key.to_vec(),
callback: Box::new(move |regions| {
if let Err(e) = tx.send(regions) {
warn!("failed to send get_regions_in_range result: {:?}", e);
}
}),
};
self.scheduler
.schedule(msg)
.map_err(|e| box_err!("failed to send request to region collector: {:?}", e))
.and_then(|_| {
rx.recv().map_err(|e| {
box_err!(
"failed to receive get_regions_in_range result from region collector: {:?}",
e
)
})
})
}
}

#[cfg(test)]
Expand Down

0 comments on commit 4784818

Please sign in to comment.