Skip to content

Commit

Permalink
Add interfaces to get region local state (tikv#51)
Browse files Browse the repository at this point in the history
Signed-off-by: Zhigao Tong <tongzhigao@pingcap.com>

Conflicts:
	components/raftstore/src/engine_store_ffi/interfaces.rs
	raftstore-proxy/ffi/src/RaftStoreProxyFFI/@Version
	raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h

Conflicts:
	components/raftstore/src/engine_store_ffi/interfaces.rs
	components/raftstore/src/engine_store_ffi/mod.rs
	components/server/src/server.rs
	components/server/src/util.rs
	raftstore-proxy/ffi/src/RaftStoreProxyFFI/@Version
	raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h
  • Loading branch information
solotzg committed Apr 8, 2022
1 parent c5a9813 commit 463976a
Show file tree
Hide file tree
Showing 7 changed files with 291 additions and 314 deletions.
407 changes: 134 additions & 273 deletions components/raftstore/src/engine_store_ffi/interfaces.rs

Large diffs are not rendered by default.

145 changes: 119 additions & 26 deletions components/raftstore/src/engine_store_ffi/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use encryption::DataKeyManager;
use engine_rocks::get_env;
use engine_rocks::{RocksSstIterator, RocksSstReader};
use engine_traits::{
EncryptionKeyManager, EncryptionMethod, FileEncryptionInfo, Iterator, SeekKey, SstReader,
CF_DEFAULT, CF_LOCK, CF_WRITE,
EncryptionKeyManager, EncryptionMethod, FileEncryptionInfo, Iterator, Peekable, SeekKey,
SstReader, CF_DEFAULT, CF_LOCK, CF_WRITE,
};
use kvproto::{kvrpcpb, metapb, raft_cmdpb};
use protobuf::Message;
Expand All @@ -25,8 +25,8 @@ pub use crate::engine_store_ffi::interfaces::root::DB::{
WriteCmdType, WriteCmdsView,
};
use crate::engine_store_ffi::interfaces::root::DB::{
ConstRawVoidPtr, FileEncryptionInfoRaw, RaftStoreProxyPtr, RawCppPtrType, RawCppStringPtr,
SSTReaderInterfaces, SSTView, SSTViewVec, RAFT_STORE_PROXY_MAGIC_NUMBER,
ConstRawVoidPtr, FileEncryptionInfoRaw, KVGetStatus, RaftStoreProxyPtr, RawCppPtrType,
RawCppStringPtr, SSTReaderInterfaces, SSTView, SSTViewVec, RAFT_STORE_PROXY_MAGIC_NUMBER,
RAFT_STORE_PROXY_VERSION,
};
use crate::store::LockCFFileReader;
Expand All @@ -53,15 +53,70 @@ impl<T> UnwrapExternCFunc<T> for std::option::Option<T> {
}

pub struct RaftStoreProxy {
pub status: AtomicU8,
pub key_manager: Option<Arc<DataKeyManager>>,
pub read_index_client: Box<dyn read_index_helper::ReadIndex>,
status: AtomicU8,
key_manager: Option<Arc<DataKeyManager>>,
read_index_client: Box<dyn read_index_helper::ReadIndex>,
kv_engine: std::sync::RwLock<Option<engine_rocks::RocksEngine>>,
}

pub trait RaftStoreProxyFFI: Sync {
fn set_status(&mut self, s: RaftProxyStatus);
fn get_value_cf<F>(&self, cf: &str, key: &[u8], cb: F)
where
F: FnOnce(Result<Option<&[u8]>, String>);
fn set_kv_engine(&mut self, kv_engine: Option<engine_rocks::RocksEngine>);
}

impl RaftStoreProxy {
pub fn set_status(&mut self, s: RaftProxyStatus) {
pub fn new(
status: AtomicU8,
key_manager: Option<Arc<DataKeyManager>>,
read_index_client: Box<dyn read_index_helper::ReadIndex>,
kv_engine: std::sync::RwLock<Option<engine_rocks::RocksEngine>>,
) -> Self {
RaftStoreProxy {
status,
key_manager,
read_index_client,
kv_engine,
}
}
}

impl RaftStoreProxyFFI for RaftStoreProxy {
fn set_kv_engine(&mut self, kv_engine: Option<engine_rocks::RocksEngine>) {
let mut lock = self.kv_engine.write().unwrap();
*lock = kv_engine;
}

fn set_status(&mut self, s: RaftProxyStatus) {
self.status.store(s as u8, Ordering::SeqCst);
}

fn get_value_cf<F>(&self, cf: &str, key: &[u8], cb: F)
where
F: FnOnce(Result<Option<&[u8]>, String>),
{
let kv_engine_lock = self.kv_engine.read().unwrap();
let kv_engine = kv_engine_lock.as_ref();
if kv_engine.is_none() {
cb(Err(format!("KV engine is not initialized")));
return;
}
let value = kv_engine.unwrap().get_value_cf(cf, key);
match value {
Ok(v) => {
if let Some(x) = v {
cb(Ok(Some(&x)));
} else {
cb(Ok(None));
}
}
Err(e) => {
cb(Err(format!("{}", e)));
}
}
}
}

impl RaftStoreProxyPtr {
Expand All @@ -81,7 +136,43 @@ impl From<&RaftStoreProxy> for RaftStoreProxyPtr {
}
}

#[no_mangle]
unsafe extern "C" fn ffi_get_region_local_state(
proxy_ptr: RaftStoreProxyPtr,
region_id: u64,
data: RawVoidPtr,
error_msg: *mut RawCppStringPtr,
) -> KVGetStatus {
assert!(!proxy_ptr.is_null());

let region_state_key = keys::region_state_key(region_id);
let mut res = KVGetStatus::NotFound;
proxy_ptr
.as_ref()
.get_value_cf(engine_traits::CF_RAFT, &region_state_key, |value| {
match value {
Ok(v) => {
if let Some(buff) = v {
get_engine_store_server_helper().set_pb_msg_by_bytes(
interfaces::root::DB::MsgPBType::RegionLocalState,
data,
buff.into(),
);
res = KVGetStatus::Ok;
} else {
res = KVGetStatus::NotFound;
}
}
Err(e) => {
let msg = get_engine_store_server_helper().gen_cpp_string(e.as_ref());
*error_msg = msg;
res = KVGetStatus::Error;
}
};
});

return res;
}

pub extern "C" fn ffi_handle_get_proxy_status(proxy_ptr: RaftStoreProxyPtr) -> RaftProxyStatus {
unsafe {
let r = proxy_ptr.as_ref().status.load(Ordering::SeqCst);
Expand Down Expand Up @@ -112,17 +203,20 @@ pub extern "C" fn ffi_encryption_method(
pub extern "C" fn ffi_batch_read_index(
proxy_ptr: RaftStoreProxyPtr,
view: CppStrVecView,
res: RawVoidPtr,
timeout_ms: u64,
) -> RawVoidPtr {
fn_insert_batch_read_index_resp: Option<unsafe extern "C" fn(RawVoidPtr, BaseBuffView, u64)>,
) {
assert!(!proxy_ptr.is_null());
debug_assert!(fn_insert_batch_read_index_resp.is_some());
if view.len != 0 {
assert_ne!(view.view, std::ptr::null());
}
unsafe {
let mut req_vec = Vec::with_capacity(view.len as usize);
for i in 0..view.len as usize {
let mut req = kvrpcpb::ReadIndexRequest::default();
let p = &(*view.view.offset(i as isize));
let p = &(*view.view.add(i));
assert_ne!(p.data, std::ptr::null());
assert_ne!(p.len, 0);
req.merge_from_bytes(p.to_slice()).unwrap();
Expand All @@ -132,17 +226,11 @@ pub extern "C" fn ffi_batch_read_index(
.as_ref()
.read_index_client
.batch_read_index(req_vec, Duration::from_millis(timeout_ms));
let res = get_engine_store_server_helper().gen_batch_read_index_res(resp.len() as u64);
assert_ne!(res, std::ptr::null_mut());
for (r, region_id) in &resp {
let r = ProtoMsgBaseBuff::new(r);
get_engine_store_server_helper().insert_batch_read_index_resp(
res,
r.borrow().into(),
*region_id,
);
(fn_insert_batch_read_index_resp.into_inner())(res, r.borrow().into(), *region_id)
}
res
}
}

Expand Down Expand Up @@ -384,6 +472,7 @@ impl RaftStoreProxyFFIHelper {
fn_next: Some(ffi_next),
fn_gc: Some(ffi_gc),
},
fn_get_region_local_state: Some(ffi_get_region_local_state),
}
}
}
Expand Down Expand Up @@ -570,10 +659,12 @@ impl From<&Vec<SSTView>> for SSTViewVec {
}
}

unsafe impl Sync for EngineStoreServerHelper {}

impl EngineStoreServerHelper {
fn gc_raw_cpp_ptr(&self, ptr: *mut ::std::os::raw::c_void, tp: RawCppPtrType) {
unsafe {
(self.fn_gc_raw_cpp_ptr.into_inner())(self.inner, ptr, tp);
(self.fn_gc_raw_cpp_ptr.into_inner())(ptr, tp);
}
}

Expand Down Expand Up @@ -689,17 +780,19 @@ impl EngineStoreServerHelper {
pub fn handle_check_terminated(&self) -> bool {
unsafe { (self.fn_handle_check_terminated.into_inner())(self.inner) != 0 }
}

fn gen_cpp_string(&self, buff: &[u8]) -> RawCppStringPtr {
debug_assert!(self.fn_gen_cpp_string.is_some());
unsafe { (self.fn_gen_cpp_string.into_inner())(buff.into()).into_raw() as RawCppStringPtr }
}

fn gen_batch_read_index_res(&self, cap: u64) -> RawVoidPtr {
unsafe { (self.fn_gen_batch_read_index_res.into_inner())(cap) }
}

fn insert_batch_read_index_resp(&self, data: RawVoidPtr, buf: BaseBuffView, region_id: u64) {
unsafe { (self.fn_insert_batch_read_index_resp.into_inner())(data, buf, region_id) }
fn set_pb_msg_by_bytes(
&self,
type_: interfaces::root::DB::MsgPBType,
ptr: RawVoidPtr,
buff: BaseBuffView,
) {
debug_assert!(self.fn_set_pb_msg_by_bytes.is_some());
unsafe { (self.fn_set_pb_msg_by_bytes.into_inner())(type_, ptr, buff) }
}

pub fn handle_http_request(&self, path: &str) -> HttpRequestRes {
Expand Down
16 changes: 10 additions & 6 deletions components/server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ use crate::raft_engine_switch::{check_and_dump_raft_db, check_and_dump_raft_engi
use crate::{memory::*, setup::*};
use raftstore::engine_store_ffi::{
get_engine_store_server_helper, EngineStoreServerStatus, RaftProxyStatus, RaftStoreProxy,
RaftStoreProxyFFIHelper, ReadIndexClient,
RaftStoreProxyFFI, RaftStoreProxyFFIHelper, ReadIndexClient,
};
use std::sync::atomic::{AtomicBool, AtomicU8};

Expand Down Expand Up @@ -139,14 +139,15 @@ pub unsafe fn run_tikv(config: TiKvConfig) {
tikv.init_fs();
tikv.init_yatp();
tikv.init_encryption();
let mut proxy = RaftStoreProxy {
status: AtomicU8::new(RaftProxyStatus::Idle as u8),
key_manager: tikv.encryption_key_manager.clone(),
read_index_client: Box::new(ReadIndexClient::new(
let mut proxy = RaftStoreProxy::new(
AtomicU8::new(RaftProxyStatus::Idle as u8),
tikv.encryption_key_manager.clone(),
Box::new(ReadIndexClient::new(
tikv.router.clone(),
SysQuota::cpu_cores_quota() as usize * 2,
)),
};
std::sync::RwLock::new(None),
);

let proxy_helper = RaftStoreProxyFFIHelper::new(&proxy);

Expand All @@ -173,6 +174,9 @@ pub unsafe fn run_tikv(config: TiKvConfig) {
let (engines, engines_info) = tikv.init_raw_engines(Some(limiter.clone()));
limiter.set_low_priority_io_adjustor_if_needed(Some(engines_info.clone()));
tikv.init_engines(engines.clone());
{
proxy.set_kv_engine(Some(engines.kv.clone()));
}
let server_config = tikv.init_servers();
tikv.register_services();
tikv.init_metrics_flusher(fetcher, engines_info);
Expand Down
4 changes: 2 additions & 2 deletions raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//ef3c0a05cdfd04ac7f95eac4a3be74d2//501000//
//acb3bd3f66d23dc9275b051d2ebb5a00//501001//
#pragma once
#include <cstdint>
namespace DB { constexpr uint32_t RAFT_STORE_PROXY_VERSION = 501000; }
namespace DB { constexpr uint32_t RAFT_STORE_PROXY_VERSION = 501001; }
4 changes: 4 additions & 0 deletions raftstore-proxy/ffi/src/RaftStoreProxyFFI/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,8 @@
namespace DB {
using ConstRawVoidPtr = const void *;
using RawVoidPtr = void *;

struct RawCppString;
using RawCppStringPtr = RawCppString *;

} // namespace DB
2 changes: 0 additions & 2 deletions raftstore-proxy/ffi/src/RaftStoreProxyFFI/EncryptionFFI.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
#include "Common.h"

namespace DB {
struct RawCppString;
using RawCppStringPtr = RawCppString *;
enum class FileEncryptionRes : uint8_t {
Disabled = 0,
Ok,
Expand Down
27 changes: 22 additions & 5 deletions raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,18 @@ struct SSTReaderInterfaces {
void (*fn_gc)(SSTReaderPtr, ColumnFamilyType);
};

enum class MsgPBType : uint32_t {
ReadIndexResponse = 0,
ServerInfoResponse,
RegionLocalState,
};

enum class KVGetStatus : uint32_t {
Ok = 0,
Error,
NotFound,
};

struct RaftStoreProxyFFIHelper {
RaftStoreProxyPtr proxy_ptr;
RaftProxyStatus (*fn_handle_get_proxy_status)(RaftStoreProxyPtr);
Expand All @@ -133,9 +145,14 @@ struct RaftStoreProxyFFIHelper {
BaseBuffView);
FileEncryptionInfoRaw (*fn_handle_link_file)(RaftStoreProxyPtr, BaseBuffView,
BaseBuffView);
RawVoidPtr (*fn_handle_batch_read_index)(RaftStoreProxyPtr, CppStrVecView,
uint64_t);
void (*fn_handle_batch_read_index)(
RaftStoreProxyPtr, CppStrVecView, RawVoidPtr, uint64_t,
void (*fn_insert_batch_read_index_resp)(RawVoidPtr, BaseBuffView,
uint64_t)); // To remove
SSTReaderInterfaces sst_reader_interfaces;
KVGetStatus (*fn_get_region_local_state)(RaftStoreProxyPtr,
uint64_t region_id, RawVoidPtr data,
RawCppStringPtr *error_msg);
};

struct EngineStoreServerHelper {
Expand Down Expand Up @@ -167,8 +184,8 @@ struct EngineStoreServerHelper {
HttpRequestRes (*fn_handle_http_request)(EngineStoreServerWrap *,
BaseBuffView);
uint8_t (*fn_check_http_uri_available)(BaseBuffView);
void (*fn_gc_raw_cpp_ptr)(EngineStoreServerWrap *, RawVoidPtr, RawCppPtrType);
RawVoidPtr (*fn_gen_batch_read_index_res)(uint64_t);
void (*fn_insert_batch_read_index_resp)(RawVoidPtr, BaseBuffView, uint64_t);
void (*fn_gc_raw_cpp_ptr)(RawVoidPtr, RawCppPtrType);
void (*fn_set_pb_msg_by_bytes)(MsgPBType type, RawVoidPtr ptr,
BaseBuffView buff);
};
} // namespace DB

0 comments on commit 463976a

Please sign in to comment.