Skip to content

Commit

Permalink
Add interfaces to get region local state (tikv#51)
Browse files Browse the repository at this point in the history
Signed-off-by: Zhigao Tong <tongzhigao@pingcap.com>
  • Loading branch information
solotzg committed Feb 24, 2022
1 parent b9d508d commit 1e3f15f
Show file tree
Hide file tree
Showing 11 changed files with 341 additions and 91 deletions.
63 changes: 43 additions & 20 deletions components/raftstore/src/engine_store_ffi/interfaces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,19 @@ pub mod root {
use self::super::super::root;
pub type ConstRawVoidPtr = *const ::std::os::raw::c_void;
pub type RawVoidPtr = *mut ::std::os::raw::c_void;
#[repr(C)]
#[derive(Debug)]
pub struct RawCppString {
_unused: [u8; 0],
}
pub type RawCppStringPtr = *mut root::DB::RawCppString;
#[repr(u8)]
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
pub enum ColumnFamilyType {
Lock = 0,
Write = 1,
Default = 2,
}
#[repr(C)]
#[derive(Debug)]
pub struct RawCppString {
_unused: [u8; 0],
}
pub type RawCppStringPtr = *mut root::DB::RawCppString;
#[repr(u8)]
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
pub enum FileEncryptionRes {
Expand Down Expand Up @@ -215,6 +215,20 @@ pub mod root {
),
>,
}
#[repr(u32)]
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
pub enum MsgPBType {
ReadIndexResponse = 0,
ServerInfoResponse = 1,
RegionLocalState = 2,
}
#[repr(u32)]
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
pub enum KVGetStatus {
Ok = 0,
Error = 1,
NotFound = 2,
}
#[repr(C)]
#[derive(Debug)]
pub struct RaftStoreProxyFFIHelper {
Expand Down Expand Up @@ -263,6 +277,13 @@ pub mod root {
arg2: root::DB::CppStrVecView,
arg3: root::DB::RawVoidPtr,
arg4: u64,
fn_insert_batch_read_index_resp: ::std::option::Option<
unsafe extern "C" fn(
arg1: root::DB::RawVoidPtr,
arg2: root::DB::BaseBuffView,
arg3: u64,
),
>,
),
>,
pub sst_reader_interfaces: root::DB::SSTReaderInterfaces,
Expand Down Expand Up @@ -301,6 +322,14 @@ pub mod root {
pub fn_poll_timer_task: ::std::option::Option<
unsafe extern "C" fn(task: root::DB::RawVoidPtr, waker: root::DB::RawVoidPtr) -> u8,
>,
pub fn_get_region_local_state: ::std::option::Option<
unsafe extern "C" fn(
arg1: root::DB::RaftStoreProxyPtr,
region_id: u64,
data: root::DB::RawVoidPtr,
error_msg: *mut root::DB::RawCppStringPtr,
) -> root::DB::KVGetStatus,
>,
}
#[repr(C)]
#[derive(Debug)]
Expand Down Expand Up @@ -382,19 +411,6 @@ pub mod root {
pub fn_gc_raw_cpp_ptr: ::std::option::Option<
unsafe extern "C" fn(arg1: root::DB::RawVoidPtr, arg2: root::DB::RawCppPtrType),
>,
pub fn_insert_batch_read_index_resp: ::std::option::Option<
unsafe extern "C" fn(
arg1: root::DB::RawVoidPtr,
arg2: root::DB::BaseBuffView,
arg3: u64,
),
>,
pub fn_set_read_index_resp: ::std::option::Option<
unsafe extern "C" fn(arg1: root::DB::RawVoidPtr, arg2: root::DB::BaseBuffView),
>,
pub fn_set_server_info_resp: ::std::option::Option<
unsafe extern "C" fn(arg1: root::DB::BaseBuffView, arg2: root::DB::RawVoidPtr),
>,
pub fn_get_config: ::std::option::Option<
unsafe extern "C" fn(
arg1: *mut root::DB::EngineStoreServerWrap,
Expand All @@ -407,8 +423,15 @@ pub mod root {
arg2: root::DB::BaseBuffView,
),
>,
pub fn_set_pb_msg_by_bytes: ::std::option::Option<
unsafe extern "C" fn(
type_: root::DB::MsgPBType,
ptr: root::DB::RawVoidPtr,
buff: root::DB::BaseBuffView,
),
>,
}
pub const RAFT_STORE_PROXY_VERSION: u64 = 7432805842186575727;
pub const RAFT_STORE_PROXY_VERSION: u64 = 1236987175086361028;
pub const RAFT_STORE_PROXY_MAGIC_NUMBER: u32 = 324508639;
}
}
171 changes: 135 additions & 36 deletions components/raftstore/src/engine_store_ffi/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use encryption::DataKeyManager;
use engine_rocks::encryption::get_env;
use engine_rocks::{RocksSstIterator, RocksSstReader};
use engine_traits::{
EncryptionKeyManager, EncryptionMethod, FileEncryptionInfo, Iterator, SeekKey, SstReader,
CF_DEFAULT, CF_LOCK, CF_WRITE,
EncryptionKeyManager, EncryptionMethod, FileEncryptionInfo, Iterator, Peekable, SeekKey,
SstReader, CF_DEFAULT, CF_LOCK, CF_WRITE,
};
use kvproto::{kvrpcpb, metapb, raft_cmdpb};
use protobuf::Message;
Expand All @@ -21,12 +21,12 @@ pub use read_index_helper::ReadIndexClient;
pub use crate::engine_store_ffi::interfaces::root::DB::{
BaseBuffView, ColumnFamilyType, CppStrVecView, EngineStoreApplyRes, EngineStoreServerHelper,
EngineStoreServerStatus, FileEncryptionRes, FsStats, HttpRequestRes, HttpRequestStatus,
RaftCmdHeader, RaftProxyStatus, RaftStoreProxyFFIHelper, RawCppPtr, RawVoidPtr, SSTReaderPtr,
StoreStats, WriteCmdType, WriteCmdsView,
KVGetStatus, RaftCmdHeader, RaftProxyStatus, RaftStoreProxyFFIHelper, RawCppPtr,
RawCppStringPtr, RawVoidPtr, SSTReaderPtr, StoreStats, WriteCmdType, WriteCmdsView,
};
use crate::engine_store_ffi::interfaces::root::DB::{
ConstRawVoidPtr, FileEncryptionInfoRaw, RaftStoreProxyPtr, RawCppPtrType, RawCppStringPtr,
RawRustPtr, SSTReaderInterfaces, SSTView, SSTViewVec, RAFT_STORE_PROXY_MAGIC_NUMBER,
ConstRawVoidPtr, FileEncryptionInfoRaw, RaftStoreProxyPtr, RawCppPtrType, RawRustPtr,
SSTReaderInterfaces, SSTView, SSTViewVec, RAFT_STORE_PROXY_MAGIC_NUMBER,
RAFT_STORE_PROXY_VERSION,
};
use crate::store::LockCFFileReader;
Expand Down Expand Up @@ -54,15 +54,70 @@ impl<T> UnwrapExternCFunc<T> for std::option::Option<T> {
}

pub struct RaftStoreProxy {
pub status: AtomicU8,
pub key_manager: Option<Arc<DataKeyManager>>,
pub read_index_client: Box<dyn read_index_helper::ReadIndex>,
status: AtomicU8,
key_manager: Option<Arc<DataKeyManager>>,
read_index_client: Box<dyn read_index_helper::ReadIndex>,
kv_engine: std::sync::RwLock<Option<engine_rocks::RocksEngine>>,
}

pub trait RaftStoreProxyFFI: Sync {
fn set_status(&mut self, s: RaftProxyStatus);
fn get_value_cf<F>(&self, cf: &str, key: &[u8], cb: F)
where
F: FnOnce(Result<Option<&[u8]>, String>);
fn set_kv_engine(&mut self, kv_engine: Option<engine_rocks::RocksEngine>);
}

impl RaftStoreProxy {
pub fn set_status(&mut self, s: RaftProxyStatus) {
pub fn new(
status: AtomicU8,
key_manager: Option<Arc<DataKeyManager>>,
read_index_client: Box<dyn read_index_helper::ReadIndex>,
kv_engine: std::sync::RwLock<Option<engine_rocks::RocksEngine>>,
) -> Self {
RaftStoreProxy {
status,
key_manager,
read_index_client,
kv_engine,
}
}
}

impl RaftStoreProxyFFI for RaftStoreProxy {
fn set_kv_engine(&mut self, kv_engine: Option<engine_rocks::RocksEngine>) {
let mut lock = self.kv_engine.write().unwrap();
*lock = kv_engine;
}

fn set_status(&mut self, s: RaftProxyStatus) {
self.status.store(s as u8, Ordering::SeqCst);
}

fn get_value_cf<F>(&self, cf: &str, key: &[u8], cb: F)
where
F: FnOnce(Result<Option<&[u8]>, String>),
{
let kv_engine_lock = self.kv_engine.read().unwrap();
let kv_engine = kv_engine_lock.as_ref();
if kv_engine.is_none() {
cb(Err(format!("KV engine is not initialized")));
return;
}
let value = kv_engine.unwrap().get_value_cf(cf, key);
match value {
Ok(v) => {
if let Some(x) = v {
cb(Ok(Some(&x)));
} else {
cb(Ok(None));
}
}
Err(e) => {
cb(Err(format!("{}", e)));
}
}
}
}

impl RaftStoreProxyPtr {
Expand All @@ -82,6 +137,43 @@ impl From<&RaftStoreProxy> for RaftStoreProxyPtr {
}
}

unsafe extern "C" fn ffi_get_region_local_state(
proxy_ptr: RaftStoreProxyPtr,
region_id: u64,
data: RawVoidPtr,
error_msg: *mut RawCppStringPtr,
) -> KVGetStatus {
assert!(!proxy_ptr.is_null());

let region_state_key = keys::region_state_key(region_id);
let mut res = KVGetStatus::NotFound;
proxy_ptr
.as_ref()
.get_value_cf(engine_traits::CF_RAFT, &region_state_key, |value| {
match value {
Ok(v) => {
if let Some(buff) = v {
get_engine_store_server_helper().set_pb_msg_by_bytes(
interfaces::root::DB::MsgPBType::RegionLocalState,
data,
buff.into(),
);
res = KVGetStatus::Ok;
} else {
res = KVGetStatus::NotFound;
}
}
Err(e) => {
let msg = get_engine_store_server_helper().gen_cpp_string(e.as_ref());
*error_msg = msg;
res = KVGetStatus::Error;
}
};
});

return res;
}

pub extern "C" fn ffi_handle_get_proxy_status(proxy_ptr: RaftStoreProxyPtr) -> RaftProxyStatus {
unsafe {
let r = proxy_ptr.as_ref().status.load(Ordering::SeqCst);
Expand Down Expand Up @@ -111,8 +203,10 @@ pub extern "C" fn ffi_batch_read_index(
view: CppStrVecView,
res: RawVoidPtr,
timeout_ms: u64,
fn_insert_batch_read_index_resp: Option<unsafe extern "C" fn(RawVoidPtr, BaseBuffView, u64)>,
) {
assert!(!proxy_ptr.is_null());
debug_assert!(fn_insert_batch_read_index_resp.is_some());
if view.len != 0 {
assert_ne!(view.view, std::ptr::null());
}
Expand All @@ -132,7 +226,8 @@ pub extern "C" fn ffi_batch_read_index(
.batch_read_index(req_vec, time::Duration::from_millis(timeout_ms));
assert_ne!(res, std::ptr::null_mut());
for (r, region_id) in &resp {
get_engine_store_server_helper().insert_batch_read_index_resp(res, r, *region_id);
let r = ProtoMsgBaseBuff::new(r);
(fn_insert_batch_read_index_resp.into_inner())(res, Pin::new(&r).into(), *region_id)
}
}
}
Expand Down Expand Up @@ -516,6 +611,7 @@ impl RaftStoreProxyFFIHelper {
fn_gc_rust_ptr: Some(ffi_gc_rust_ptr),
fn_make_timer_task: Some(ffi_make_timer_task),
fn_poll_timer_task: Some(ffi_poll_timer_task),
fn_get_region_local_state: Some(ffi_get_region_local_state),
}
}
}
Expand Down Expand Up @@ -721,7 +817,7 @@ impl From<Pin<&Vec<SSTView>>> for SSTViewVec {

unsafe impl Sync for EngineStoreServerHelper {}

pub fn set_server_info_resp(res: BaseBuffView, ptr: RawVoidPtr) {
pub fn set_server_info_resp(res: &kvproto::diagnosticspb::ServerInfoResponse, ptr: RawVoidPtr) {
get_engine_store_server_helper().set_server_info_resp(res, ptr)
}

Expand Down Expand Up @@ -859,27 +955,13 @@ impl EngineStoreServerHelper {
unsafe { (self.fn_gen_cpp_string.into_inner())(buff.into()).into_raw() as RawCppStringPtr }
}

fn insert_batch_read_index_resp(
&self,
data: RawVoidPtr,
r: &kvrpcpb::ReadIndexResponse,
region_id: u64,
) {
debug_assert!(self.fn_insert_batch_read_index_resp.is_some());
let r = ProtoMsgBaseBuff::new(r);
unsafe {
(self.fn_insert_batch_read_index_resp.into_inner())(
data,
Pin::new(&r).into(),
region_id,
)
}
}

fn set_read_index_resp(&self, data: RawVoidPtr, r: &kvrpcpb::ReadIndexResponse) {
debug_assert!(self.fn_set_read_index_resp.is_some());
let r = ProtoMsgBaseBuff::new(r);
unsafe { (self.fn_set_read_index_resp.into_inner())(data, Pin::new(&r).into()) }
fn set_read_index_resp(&self, ptr: RawVoidPtr, r: &kvrpcpb::ReadIndexResponse) {
let buff = ProtoMsgBaseBuff::new(r);
self.set_pb_msg_by_bytes(
interfaces::root::DB::MsgPBType::ReadIndexResponse,
ptr,
Pin::new(&buff).into(),
)
}

pub fn handle_http_request(
Expand Down Expand Up @@ -914,10 +996,27 @@ impl EngineStoreServerHelper {
unsafe { (self.fn_check_http_uri_available.into_inner())(path.as_bytes().into()) != 0 }
}

pub fn set_server_info_resp(&self, res: BaseBuffView, ptr: RawVoidPtr) {
debug_assert!(self.fn_set_server_info_resp.is_some());
fn set_pb_msg_by_bytes(
&self,
type_: interfaces::root::DB::MsgPBType,
ptr: RawVoidPtr,
buff: BaseBuffView,
) {
debug_assert!(self.fn_set_pb_msg_by_bytes.is_some());
unsafe { (self.fn_set_pb_msg_by_bytes.into_inner())(type_, ptr, buff) }
}

unsafe { (self.fn_set_server_info_resp.into_inner())(res, ptr) }
pub fn set_server_info_resp(
&self,
res: &kvproto::diagnosticspb::ServerInfoResponse,
ptr: RawVoidPtr,
) {
let buff = ProtoMsgBaseBuff::new(res);
self.set_pb_msg_by_bytes(
interfaces::root::DB::MsgPBType::ServerInfoResponse,
ptr,
Pin::new(&buff).into(),
)
}

pub fn get_config(&self, full: bool) -> Vec<u8> {
Expand Down
Loading

0 comments on commit 1e3f15f

Please sign in to comment.