From 7c495c71c945f611117595e10b20b0fcbdff11d4 Mon Sep 17 00:00:00 2001 From: ActivePeter <1020401660@qq.com> Date: Mon, 19 Feb 2024 05:30:06 -0800 Subject: [PATCH] feat(kv): batch operation & lock --- .github/workflows/ci.yml | 1 + .vscode/tasks.json | 17 + apps/_wasm_serverless_lib/src/lib.rs | 181 +++++++-- apps/longchain/Cargo.lock | 22 +- apps/longchain/Cargo.toml | 2 +- apps/longchain/app.yaml | 6 +- apps/longchain/src/lib.rs | 104 +++--- apps/word_count/src/lib.rs | 14 +- scripts/build/ans_build_demo_apps.yml | 1 + scripts/deploy_single_node/ans_build.yml | 2 +- scripts/deploy_single_node/run_node.sh | 1 + scripts/http_test.py | 24 +- scripts/install/run_ans_install_build.sh | 2 +- src/_back/storage/kv/local_kv/local_kv.rs | 20 +- .../storage/kv/local_kv/local_kv_kernel.rs | 2 +- .../storage/kv/local_kv/local_kv_sled.rs | 8 +- .../kv/raft_kv/async_raft_kernel/mod.rs | 6 +- .../kv/raft_kv/async_raft_kernel/network.rs | 12 +- .../kv/raft_kv/async_raft_kernel/storage.rs | 6 +- src/_back/storage/kv/raft_kv/mod.rs | 24 +- .../storage/kv/raft_kv/tikvraft_kernel/mod.rs | 22 +- src/config.rs | 3 + src/general/kv_interface.rs | 48 +-- src/general/network/msg_pack.rs | 53 ++- src/general/network/p2p.rs | 63 ++-- src/general/network/p2p_quic.rs | 1 + src/general/network/proto_src/kv.proto | 59 ++- src/general/network/proto_src/sche.proto | 2 +- src/master/http_handler.rs | 2 +- src/master/master_kv.rs | 189 ++++++++++ src/master/mod.rs | 1 + src/sys.rs | 50 ++- src/worker/function_event/kv_event.rs | 286 +++++++------- src/worker/kv_storage.rs | 4 + src/worker/kv_user_client.rs | 162 ++++---- src/worker/mod.rs | 1 + src/worker/wasm_host_funcs/kv.rs | 350 ++++++++++++++---- src/worker/wasm_host_funcs/mod.rs | 12 +- ws_derive/src/test.rs | 4 +- 39 files changed, 1227 insertions(+), 540 deletions(-) create mode 100644 .vscode/tasks.json create mode 100644 src/master/master_kv.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f8588bb..790ab3c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -26,6 +26,7 @@ jobs: - name: Build WasmEdge Image run: bash scripts/docker/WasmEdge/build_image.sh + - name: Build Waverless Image run: bash scripts/docker/Waverless/build_image.sh diff --git a/.vscode/tasks.json b/.vscode/tasks.json new file mode 100644 index 0000000..891156a --- /dev/null +++ b/.vscode/tasks.json @@ -0,0 +1,17 @@ +{ + // See https://go.microsoft.com/fwlink/?LinkId=733558 + // for the documentation about the tasks.json format + "version": "2.0.0", + "tasks": [ + { + "label": "cargo build", + "type": "shell", + "command": "cargo build --release" + }, + { + "label": "build all", + "type": "shell", + "command": "bash scripts/deploy_single_node/run_ans_build.sh" + }, + ] +} \ No newline at end of file diff --git a/apps/_wasm_serverless_lib/src/lib.rs b/apps/_wasm_serverless_lib/src/lib.rs index 76da246..ab3d529 100644 --- a/apps/_wasm_serverless_lib/src/lib.rs +++ b/apps/_wasm_serverless_lib/src/lib.rs @@ -7,40 +7,177 @@ use std::vec::Vec; // use wasmedge_bindgen_macro::*; extern "C" { - fn kv_set(kptr: *const u8, klen: i32, v: *const u8, vlen: i32); - fn kv_get_len(kptr: *const u8, klen: i32, vlen: &mut i32, id: &mut i32); - fn kv_get(id: i32, vptr: *const u8); + // fn kv_set(kptr: *const u8, klen: i32, v: *const u8, vlen: i32); + // fn kv_get_len(kptr: *const u8, klen: i32, vlen: &mut i32, id: &mut i32); + // fn kv_get(id: i32, vptr: *const u8); + fn kv_batch_ope(ope_ptr: *const i32, ope_len: i32, ope_id: &mut i32); + fn kv_batch_res(ope_id: i32, args_ptr: *const i32, args_len: i32); fn open_file(fname: *const u8, fnamelen: i32, fd: &mut i32); fn read_file_at(fd: i32, buf: *const u8, buflen: i32, offset: i32, readlen: &mut i32); } -pub fn kv_set_wrapper(key: &[u8], value: &[u8]) { - unsafe { - kv_set( - key.as_ptr(), - key.len() as i32, - value.as_ptr(), - value.len() as i32, - ) - }; +// pub enum KvOpe { +// Set(&[u8], &[u8]), +// Get(&[u8]), +// Delete(&[u8]), +// Lock(&[u8]), +// Unlock(u32), +// } + +const SET_ID: i32 = 1; +const GET_ID: i32 = 2; +const LOCK_ID: i32 = 3; +const DELETE_ID: i32 = 4; + +pub enum KvResult { + Set, + GetLen(i32), + Get(Option>), + Delete, + Lock(u32), + Unlock, +} + +impl KvResult { + fn one_ptr(&self) -> Option { + match self { + KvResult::Set => None, + KvResult::GetLen(len) => Some(len as *const i32 as i32), + KvResult::Get(vec) => vec.as_ref().map(|v| v.as_ptr() as i32), + KvResult::Delete => None, + KvResult::Lock(lockid) => Some(lockid as *const u32 as i32), + KvResult::Unlock => None, + } + } } -pub fn kv_get_wrapper(key: &[u8]) -> Vec { - unsafe { - let mut veclen: i32 = 0; - let mut id: i32 = 0; - kv_get_len(key.as_ptr(), key.len() as i32, &mut veclen, &mut id); +pub struct KvBatch { + batch_args: Vec, + results: Vec, +} - let mut vec = Vec::new(); - if veclen > 0 { - vec.resize(veclen as usize, 0); - kv_get(id, vec.as_ptr()); +impl KvBatch { + pub fn new() -> Self { + Self { + batch_args: vec![0], + results: Vec::new(), } + } + pub fn reset(mut self) -> Self { + self.batch_args.clear(); + self.batch_args.push(0); + self + } + pub fn then_set(mut self, key: &[u8], value: &[u8]) -> Self { + self.batch_args.push(SET_ID as i32); + self.batch_args.push(key.as_ptr() as i32); + self.batch_args.push(key.len() as i32); + self.batch_args.push(value.as_ptr() as i32); + self.batch_args.push(value.len() as i32); + self.results.push(KvResult::Set); + self + } + pub fn then_get(mut self, key: &[u8]) -> Self { + self.batch_args.push(GET_ID as i32); + self.batch_args.push(key.as_ptr() as i32); + self.batch_args.push(key.len() as i32); + self.results.push(KvResult::GetLen(0)); + self.batch_args + .push(self.results.iter().rev().next().unwrap().one_ptr().unwrap()); + + self + } + pub fn then_delete(mut self, key: &[u8]) -> Self { + self.batch_args.push(DELETE_ID as i32); + self.batch_args.push(key.as_ptr() as i32); + self.batch_args.push(key.len() as i32); + self.results.push(KvResult::Delete); - vec + self + } + pub fn then_lock(mut self, key: &[u8]) -> Self { + self.batch_args.push(LOCK_ID as i32); + self.batch_args.push(key.as_ptr() as i32); + self.batch_args.push(key.len() as i32); + self.batch_args.push(-1); + self.results.push(KvResult::Lock(0)); + self.batch_args + .push(self.results.iter().rev().next().unwrap().one_ptr().unwrap()); + + self + } + pub fn then_unlock(mut self, key: &[u8], id: u32) -> Self { + self.batch_args.push(LOCK_ID as i32); + self.batch_args.push(key.as_ptr() as i32); + self.batch_args.push(key.len() as i32); + self.batch_args.push(id as i32); + self.results.push(KvResult::Unlock); + + self + } + pub fn finally_call(mut self) -> Vec { + self.batch_args[0] = self.results.len() as i32; + println!("batch args: {:?}", self.batch_args); + let mut id = 0; + unsafe { + kv_batch_ope( + self.batch_args.as_ptr(), + self.batch_args.len() as i32, + &mut id, + ) + }; + self.batch_args.clear(); + for (ope_idx, res) in self.results.iter_mut().enumerate() { + let mut is_get_len = None; + match res { + KvResult::GetLen(len) => { + is_get_len = Some(*len); + } + _ => {} + } + if let Some(len) = is_get_len { + if len >= 0 { + let vec = vec![0; len as usize]; + *res = KvResult::Get(Some(vec)); + self.batch_args.push(ope_idx as i32); + self.batch_args.push(res.one_ptr().unwrap()); + } else { + *res = KvResult::Get(None); + } + } + } + unsafe { kv_batch_res(id, self.batch_args.as_ptr(), self.batch_args.len() as i32) }; + self.results } } +// pub fn kv_set_wrapper(key: &[u8], value: &[u8]) { +// unsafe { +// kv_set( +// key.as_ptr(), +// key.len() as i32, +// value.as_ptr(), +// value.len() as i32, +// ) +// }; +// } + +// pub fn kv_get_wrapper(key: &[u8]) -> Vec { +// unsafe { +// let mut veclen: i32 = 0; +// let mut id: i32 = 0; +// kv_get_len(key.as_ptr(), key.len() as i32, &mut veclen, &mut id); + +// let mut vec = Vec::new(); +// if veclen > 0 { +// vec.resize(veclen as usize, 0); +// kv_get(id, vec.as_ptr()); +// } + +// vec +// } +// } + pub struct HostFile { fd: i32, } diff --git a/apps/longchain/Cargo.lock b/apps/longchain/Cargo.lock index 2307fd0..08d410c 100644 --- a/apps/longchain/Cargo.lock +++ b/apps/longchain/Cargo.lock @@ -20,6 +20,17 @@ version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +[[package]] +name = "longchain" +version = "0.1.0" +dependencies = [ + "wasm-bindgen", + "wasm_serverless_lib", + "wasmedge-bindgen", + "wasmedge-bindgen-macro", + "wasmedge-wasi-helper", +] + [[package]] name = "once_cell" version = "1.19.0" @@ -152,14 +163,3 @@ name = "wasmedge-wasi-helper" version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd351a86b66f511cbeab0cb848cc9dbd06b4165689c2f992a74500fb809b19dd" - -[[package]] -name = "word_count" -version = "0.1.0" -dependencies = [ - "wasm-bindgen", - "wasm_serverless_lib", - "wasmedge-bindgen", - "wasmedge-bindgen-macro", - "wasmedge-wasi-helper", -] diff --git a/apps/longchain/Cargo.toml b/apps/longchain/Cargo.toml index 1831e40..3830789 100644 --- a/apps/longchain/Cargo.toml +++ b/apps/longchain/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "word_count" +name = "longchain" version = "0.1.0" edition = "2021" diff --git a/apps/longchain/app.yaml b/apps/longchain/app.yaml index 0d6777d..dd62477 100644 --- a/apps/longchain/app.yaml +++ b/apps/longchain/app.yaml @@ -5,7 +5,8 @@ fns: args: # - http_text: kvs: - chain_key_{}: [set] + # chain_lock: [lock] + chain_count: [set] chain_loop: # 函数输入参数为触发事件关联数据,比如http就是json(未适配),kv就是key @@ -15,4 +16,5 @@ fns: - kv_key: 0 # 用于表征数据消费关系,决策时直接将数据存到目标执行位置 kvs: - chain_key_{}: [set,get,delete] \ No newline at end of file + # chain_lock: [unlock] + chain_count: [set,get,delete] \ No newline at end of file diff --git a/apps/longchain/src/lib.rs b/apps/longchain/src/lib.rs index e877c76..0b879ae 100644 --- a/apps/longchain/src/lib.rs +++ b/apps/longchain/src/lib.rs @@ -2,6 +2,7 @@ use std::fs::File; use std::io::BufReader; use std::io::Read; use std::mem::ManuallyDrop; +use std::str::from_utf8; use wasm_serverless_lib::*; #[allow(unused_imports)] use wasmedge_bindgen::*; @@ -53,63 +54,58 @@ use wasmedge_wasi_helper::wasmedge_wasi_helper::_initialize; #[no_mangle] pub fn chain_begin() { - _initialize(); - let file_path = "files/random_words.txt"; - println!("split file start"); - // read file - let mut file = HostFile::open(file_path); - let mut slice_id = 0; - let mut offset: usize = 0; - // read 1 mb slice, align to \n, write to kv store - - // 读取 1 MB 的数据 - let mut buffer = Vec::with_capacity(1024 * 1024); - loop { - println!("buffer begin len {}", buffer.len()); - let len = file.read_at(offset, &mut buffer); - offset += len; - - println!("split file one slice with len {} {}", buffer.len(), len); - if buffer.len() == 0 { - break; - } - // find last \n - if let Some(last) = buffer - .iter() - .enumerate() - .rev() - .find(|(_, &b)| b == b'\n') - .map(|(i, _)| i) - { - println!("split file one slice with last \\n at{}", last); - kv_set_wrapper( - format!("wordcount_slice_{}", slice_id).as_bytes(), - &buffer[..last], + let res = KvBatch::new() + .then_lock("chain_lock".as_bytes()) + .then_get("chain_count".as_bytes()) + .finally_call(); + let lock_id = if let KvResult::Lock(lock_id) = res[0] { + lock_id + } else { + panic!("chain_begin lock failed"); + }; + let mut batch = KvBatch::new(); + if let KvResult::Get(Some(count)) = &res[1] { + let count_str = from_utf8(count).unwrap(); + println!("chain count: {}", count_str); + let count = count_str.parse::().unwrap(); + if count < 100 { + batch = batch.then_set( + "chain_count".as_bytes(), + format!("{}", count + 1).as_bytes(), ); - slice_id += 1; - if last + 1 < buffer.len() { - for i in last + 1..buffer.len() { - buffer[i - last - 1] = buffer[i]; - } - buffer.truncate(buffer.len() - last - 1); - } else { - buffer.clear(); - } - - println!("buffer end len {}", buffer.len()); } else { - break; + batch = batch.then_set("chain_count".as_bytes(), format!("{}", 0).as_bytes()); } + } else { + batch = batch.then_set("chain_count".as_bytes(), "1".as_bytes()); } + batch + .then_unlock("chain_lock".as_bytes(), lock_id) + .finally_call(); } -#[no_mangle] -pub fn chain_loop(key: *mut u8, key_len: u32) { - let key = unsafe { Vec::from_raw_parts(key, key_len as usize, key_len as usize) }; - // let val = kv_get_wrapper(&key); - println!( - "handle_one_slice k {}", - std::str::from_utf8(&key).unwrap(), - // std::str::from_utf8(&val).unwrap(), - ); -} +// #[no_mangle] +// pub fn chain_loop(key: *mut u8, key_len: u32) { +// let res = KvBatch::new().then_get("chain_count").finally_call(); +// match res[0] { +// KvResult::Get(value) => { +// if let Some(value) = value { +// let mut count = value.parse::().unwrap(); +// count += 1; +// if count < 10 { +// let key = unsafe { std::slice::from_raw_parts(key, key_len as usize) }; +// let key = std::str::from_utf8(key).unwrap(); +// KvBatch::new() +// .then_set(key, &value) +// .then_set("chain_count", &format!("{}", count)) +// .finally_call(); +// } else { +// KvBatch::new().then_unlock("chain_lock").finally_call(); +// } +// } +// } +// _ => { +// panic!("chain_loop get chain_count failed") +// } +// } +// } diff --git a/apps/word_count/src/lib.rs b/apps/word_count/src/lib.rs index b7f22f0..42f37f4 100644 --- a/apps/word_count/src/lib.rs +++ b/apps/word_count/src/lib.rs @@ -82,10 +82,16 @@ pub fn split_file() { .map(|(i, _)| i) { println!("split file one slice with last \\n at{}", last); - kv_set_wrapper( - format!("wordcount_slice_{}", slice_id).as_bytes(), - &buffer[..last], - ); + KvBatch::new() + .then_set( + format!("wordcount_slice_{}", slice_id).as_bytes(), + &buffer[..last], + ) + .finally_call(); + // kv_set_wrapper( + // format!("wordcount_slice_{}", slice_id).as_bytes(), + // &buffer[..last], + // ); slice_id += 1; if last + 1 < buffer.len() { for i in last + 1..buffer.len() { diff --git a/scripts/build/ans_build_demo_apps.yml b/scripts/build/ans_build_demo_apps.yml index 9686673..e12c242 100644 --- a/scripts/build/ans_build_demo_apps.yml +++ b/scripts/build/ans_build_demo_apps.yml @@ -6,5 +6,6 @@ loop: - fn2 - word_count + - longchain - name: App needed data shell: python3 _gen_app_need_data.py \ No newline at end of file diff --git a/scripts/deploy_single_node/ans_build.yml b/scripts/deploy_single_node/ans_build.yml index 9fc99d5..a71005e 100644 --- a/scripts/deploy_single_node/ans_build.yml +++ b/scripts/deploy_single_node/ans_build.yml @@ -4,7 +4,7 @@ - name: Build the application on the master node shell: cargo build --release - name: Run build ansible script - shell: ansible-playbook -i ../local_ansible_conf.ini build/ans_build_demo_apps.yml + shell: ansible-playbook -i local_ansible_conf.ini build/ans_build_demo_apps.yml args: chdir: ../ # mkdir -p scripts/deploy_single_node/test_dir/files diff --git a/scripts/deploy_single_node/run_node.sh b/scripts/deploy_single_node/run_node.sh index 043dfde..80f39ee 100644 --- a/scripts/deploy_single_node/run_node.sh +++ b/scripts/deploy_single_node/run_node.sh @@ -3,5 +3,6 @@ # 1: node_id export LANG=C.UTF-8 export RUST_BACKTRACE=1 +export RUST_LOG=info,wasm_serverless=debug NODE_ID=$1 target/release/wasm_serverless $NODE_ID scripts/deploy_single_node/test_dir \ No newline at end of file diff --git a/scripts/http_test.py b/scripts/http_test.py index 1092bd8..2432aad 100644 --- a/scripts/http_test.py +++ b/scripts/http_test.py @@ -1,13 +1,19 @@ import requests from pprint import pprint import time +import threading -ms = time.time()*1000.0 -res = requests.get('http://127.0.0.1:2501/word_count') -# json={'after_which': 0, -# 'order_by': 0, -# 'tags': [], -# 'search_str': "", -# 'price_range': []}, verify=False) -ms_ret = time.time()*1000.0 -print(res, ms_ret-ms) +def run_one(): + ms = time.time()*1000.0 + res = requests.get('http://127.0.0.1:2501/longchain') + # json={'after_which': 0, + # 'order_by': 0, + # 'tags': [], + # 'search_str': "", + # 'price_range': []}, verify=False) + ms_ret = time.time()*1000.0 + print(res, ms_ret-ms) + +# 10 concurrent requests by multi-threading +for i in range(10): + threading.Thread(target=run_one).start() \ No newline at end of file diff --git a/scripts/install/run_ans_install_build.sh b/scripts/install/run_ans_install_build.sh index f2e50e4..bee9dd3 100644 --- a/scripts/install/run_ans_install_build.sh +++ b/scripts/install/run_ans_install_build.sh @@ -1,2 +1,2 @@ export LANG=C.UTF-8 -ansible-playbook -vv scripts/install/ans_install_build.yml -i scripts/local_ansible_conf.ini \ No newline at end of file +ansible-playbook -vv scripts/install/ans_install_build.yml -i scripts/local_ansible_conf.ini diff --git a/src/_back/storage/kv/local_kv/local_kv.rs b/src/_back/storage/kv/local_kv/local_kv.rs index 16d7318..e1efaab 100644 --- a/src/_back/storage/kv/local_kv/local_kv.rs +++ b/src/_back/storage/kv/local_kv/local_kv.rs @@ -4,7 +4,7 @@ use std::collections::BTreeMap; use crate::{ network::proto::kv::{kv_response::KvPairOpt, KeyRange, KvPair}, result::WSResult, - storage::kv::kv_interface::{KVInterface, KVNode, SetOptions}, + storage::kv::kv_interface::{KvInterface, KvNode, SetOptions}, sys::{LogicalModule, LogicalModuleNewArgs}, util::JoinHandleWrapper, }; @@ -12,17 +12,17 @@ use async_trait::async_trait; use tokio::sync::RwLock; use ws_derive::LogicalModule; -// pub struct LocalKV { +// pub struct LocalKv { // k: K, // } #[derive(LogicalModule)] -pub struct LocalKVNode { +pub struct LocalKvNode { map: RwLock, Vec>>, } #[async_trait] -impl KVInterface for LocalKVNode { +impl KvInterface for LocalKvNode { async fn get(&self, key_range: KeyRange) -> WSResult> { let mut res = vec![]; if key_range.end.len() > 0 { @@ -47,19 +47,19 @@ impl KVInterface for LocalKVNode { } #[async_trait] -impl KVNode for LocalKVNode { +impl KvNode for LocalKvNode { async fn ready(&self) -> bool { true } } #[async_trait] -impl LogicalModule for LocalKVNode { +impl LogicalModule for LocalKvNode { fn inner_new(_args: LogicalModuleNewArgs) -> Self where Self: Sized, { - LocalKVNode { + LocalKvNode { map: RwLock::new(BTreeMap::new()), } } @@ -71,7 +71,7 @@ impl LogicalModule for LocalKVNode { } // #[async_trait] -// impl LocalKVRaw for LocalKV { +// impl LocalKvRaw for LocalKv { // #[inline] // async fn get(&self, key: &[u8], end: Option<&[u8]>) -> WSResult, Vec)>> { // self.k.get(key, end).await @@ -87,7 +87,7 @@ impl LogicalModule for LocalKVNode { // } // } -// impl LocalKV { +// impl LocalKv { // pub async fn get_spec(&self, k: &K, end: Option<&K>) -> WSResult> // where // K: LocalKey, @@ -147,7 +147,7 @@ impl LogicalModule for LocalKVNode { // type Value: DeserializeOwned + Serialize; // } -// pub trait KVPair { +// pub trait KvPair { // type Key: DeserializeOwned + Serialize; // type Value: DeserializeOwned + Serialize; // } diff --git a/src/_back/storage/kv/local_kv/local_kv_kernel.rs b/src/_back/storage/kv/local_kv/local_kv_kernel.rs index 064d5c0..b000313 100644 --- a/src/_back/storage/kv/local_kv/local_kv_kernel.rs +++ b/src/_back/storage/kv/local_kv/local_kv_kernel.rs @@ -5,7 +5,7 @@ use crate::result::WSResult; // use super::dist_kv::SetOptions; #[async_trait] -pub trait LocalKVRaw: Send + Sync + 'static { +pub trait LocalKvRaw: Send + Sync + 'static { async fn get(&self, key: &[u8], end: Option<&[u8]>) -> WSResult, Vec)>>; async fn set( &self, diff --git a/src/_back/storage/kv/local_kv/local_kv_sled.rs b/src/_back/storage/kv/local_kv/local_kv_sled.rs index ba8b2ea..1f65572 100644 --- a/src/_back/storage/kv/local_kv/local_kv_sled.rs +++ b/src/_back/storage/kv/local_kv/local_kv_sled.rs @@ -1,8 +1,8 @@ -// pub struct LocalKVSled {} +// pub struct LocalKvSled {} -// impl LocalKV for LocalKVSled { -// fn new() -> LocalKVSled { -// LocalKVSled {} +// impl LocalKv for LocalKvSled { +// fn new() -> LocalKvSled { +// LocalKvSled {} // } // fn get(&self, key: &str) -> Option { diff --git a/src/_back/storage/kv/raft_kv/async_raft_kernel/mod.rs b/src/_back/storage/kv/raft_kv/async_raft_kernel/mod.rs index d7bb5f6..98ea238 100644 --- a/src/_back/storage/kv/raft_kv/async_raft_kernel/mod.rs +++ b/src/_back/storage/kv/raft_kv/async_raft_kernel/mod.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use crate::{ result::{ErrCvt, WSResult}, - sys::{LogicalModuleNewArgs, MetaKVView, NodeID}, + sys::{LogicalModuleNewArgs, MetaKvView, NodeID}, util::JoinHandleWrapper, }; @@ -23,7 +23,7 @@ pub type MemRaft = Raft; #[derive(LogicalModule)] pub struct AsyncRaftModule { - view: MetaKVView, + view: MetaKvView, raft_module: RwLock>, } @@ -40,7 +40,7 @@ impl LogicalModule for AsyncRaftModule { Self: Sized, { Self { - view: MetaKVView::new(args.logical_modules_ref.clone()), + view: MetaKvView::new(args.logical_modules_ref.clone()), raft_module: RwLock::new(None), } } diff --git a/src/_back/storage/kv/raft_kv/async_raft_kernel/network.rs b/src/_back/storage/kv/raft_kv/async_raft_kernel/network.rs index e5f4731..fcfeeca 100644 --- a/src/_back/storage/kv/raft_kv/async_raft_kernel/network.rs +++ b/src/_back/storage/kv/raft_kv/async_raft_kernel/network.rs @@ -4,10 +4,10 @@ use super::{ AsyncRaftModule, }; use crate::{ - kv::raft_kv::RaftKVNode, + kv::raft_kv::RaftKvNode, network::proto, result::WSError, - sys::{MetaKVView, NodeID}, + sys::{MetaKvView, NodeID}, }; use anyhow::Result; use async_raft::{ @@ -23,11 +23,11 @@ use async_trait::async_trait; /// A type which emulates a network transport and implements the `RaftNetwork` trait. pub struct RaftRouter { // ... some internal state ... - view: MetaKVView, + view: MetaKvView, } impl RaftRouter { - pub fn new(view: MetaKVView) -> Self { + pub fn new(view: MetaKvView) -> Self { Self { view } } } @@ -260,7 +260,7 @@ impl AsyncRaftModule { // c // ); if let Some(raft_meta) = self_view.meta_kv() { - let reft_meta = raft_meta.downcast_ref::().unwrap(); + let reft_meta = raft_meta.downcast_ref::().unwrap(); match reft_meta.raft_inner.raft().vote(c.into()).await { Ok(res) => { // tracing::info!("handled vote request success: {:?}", res); @@ -300,7 +300,7 @@ impl AsyncRaftModule { // req // ); if let Some(raft_meta) = self_view.meta_kv() { - let reft_meta = raft_meta.downcast_ref::().unwrap(); + let reft_meta = raft_meta.downcast_ref::().unwrap(); match reft_meta.raft_inner.raft().append_entries(req.into()).await { Ok(res) => { // tracing::info!( diff --git a/src/_back/storage/kv/raft_kv/async_raft_kernel/storage.rs b/src/_back/storage/kv/raft_kv/async_raft_kernel/storage.rs index 26dfa0d..788c07f 100644 --- a/src/_back/storage/kv/raft_kv/async_raft_kernel/storage.rs +++ b/src/_back/storage/kv/raft_kv/async_raft_kernel/storage.rs @@ -17,7 +17,7 @@ use crate::kv::kv_interface::SetOptions; use crate::network::proto; use crate::network::proto::kv::kv_request::{KvDeleteRequest, KvPutRequest}; use crate::network::proto::kv::{KeyRange, KvPairs}; -use crate::sys::MetaKVView; +use crate::sys::MetaKvView; const ERR_INCONSISTENT_LOG: &str = "a query was received which was expecting data to be in place which does not exist in the log"; @@ -111,12 +111,12 @@ pub struct MemStore { hs: RwLock>, /// The current snapshot. current_snapshot: RwLock>, - view: MetaKVView, + view: MetaKvView, } impl MemStore { /// Create a new `MemStore` instance. - pub fn new(id: NodeId, view: MetaKVView) -> Self { + pub fn new(id: NodeId, view: MetaKvView) -> Self { let log = RwLock::new(BTreeMap::new()); let sm = RwLock::new(MemStoreStateMachine::default()); let hs = RwLock::new(None); diff --git a/src/_back/storage/kv/raft_kv/mod.rs b/src/_back/storage/kv/raft_kv/mod.rs index 31c4637..a047001 100644 --- a/src/_back/storage/kv/raft_kv/mod.rs +++ b/src/_back/storage/kv/raft_kv/mod.rs @@ -1,5 +1,5 @@ //! -//! # Meta Dist KV +//! # Meta Dist Kv //! //! stores basical meta data like router table //! @@ -7,15 +7,15 @@ // mod openraft_adapter; // pub mod tikvraft_proxy; mod async_raft_kernel; -// use self::tikvraft_proxy::TiKVRaftModule; +// use self::tikvraft_proxy::TiKvRaftModule; // use self::async_raft_kernel::storage::ClientRequest; use self::async_raft_kernel::storage::{ClientRequest, OpeType}; use super::{ - dist_kv::KVNode, - kv_interface::{KVInterface, SetOptions}, + dist_kv::KvNode, + kv_interface::{KvInterface, SetOptions}, }; use crate::{ network::proto::{ @@ -26,7 +26,7 @@ use crate::{ }, }, result::WSResult, - sys::{LogicalModule, LogicalModuleNewArgs, MetaKVView, NodeID}, + sys::{LogicalModule, LogicalModuleNewArgs, MetaKvView, NodeID}, util::JoinHandleWrapper, }; @@ -39,21 +39,21 @@ use ws_derive::LogicalModule; pub type RaftModule = async_raft_kernel::AsyncRaftModule; #[derive(LogicalModule)] -pub struct RaftKVNode { +pub struct RaftKvNode { pub raft_inner: RaftModule, - view: MetaKVView, + view: MetaKvView, } #[async_trait] -impl LogicalModule for RaftKVNode { +impl LogicalModule for RaftKvNode { fn inner_new(args: LogicalModuleNewArgs) -> Self where Self: Sized, { Self { raft_inner: RaftModule::new(args.clone()), - view: MetaKVView::new(args.logical_modules_ref.clone()), - // raft_module: TiKVRaftModule::new(args.clone()), + view: MetaKvView::new(args.logical_modules_ref.clone()), + // raft_module: TiKvRaftModule::new(args.clone()), } } async fn start(&self) -> WSResult> { @@ -66,7 +66,7 @@ impl LogicalModule for RaftKVNode { } #[async_trait] -impl KVNode for RaftKVNode { +impl KvNode for RaftKvNode { async fn ready(&self) -> bool { let res = self.raft_inner.raft().client_read().await; match res { @@ -80,7 +80,7 @@ impl KVNode for RaftKVNode { } #[async_trait] -impl KVInterface for RaftKVNode { +impl KvInterface for RaftKvNode { async fn get(&self, key_range: KeyRange) -> WSResult> { // get from local persist // self.view diff --git a/src/_back/storage/kv/raft_kv/tikvraft_kernel/mod.rs b/src/_back/storage/kv/raft_kv/tikvraft_kernel/mod.rs index 14ab653..2c8b640 100644 --- a/src/_back/storage/kv/raft_kv/tikvraft_kernel/mod.rs +++ b/src/_back/storage/kv/raft_kv/tikvraft_kernel/mod.rs @@ -20,7 +20,7 @@ use tokio::task::JoinHandle; use crate::{ module_iter::*, module_state_trans::LogicalModuleWaiter, - module_view::TiKVRaftModuleLMView, + module_view::TiKvRaftModuleLMView, network::p2p::P2PModule, network::serial::MsgPack, result::WSResult, @@ -38,19 +38,19 @@ pub enum RaftMsg { } #[derive(LogicalModuleParent, LogicalModule)] -pub struct TiKVRaftModule { - pub logical_modules_view: TiKVRaftModuleLMView, +pub struct TiKvRaftModule { + pub logical_modules_view: TiKvRaftModuleLMView, name: String, } -impl LogicalModule for TiKVRaftModule { +impl LogicalModule for TiKvRaftModule { fn inner_new(mut args: LogicalModuleNewArgs) -> Self where Self: Sized, { args.expand_parent_name(Self::self_name()); Self { - logical_modules_view: TiKVRaftModuleLMView::new(), + logical_modules_view: TiKvRaftModuleLMView::new(), name: args.parent_name, } } @@ -108,12 +108,12 @@ struct RaftThreadState { remaining_timeout: Duration, timeout: Duration, rx: std::sync::mpsc::Receiver, - view: TiKVRaftModuleLMView, + view: TiKvRaftModuleLMView, proprosed_join: bool, } impl RaftThreadState { - fn new(rx: std::sync::mpsc::Receiver, view: TiKVRaftModuleLMView) -> Self { + fn new(rx: std::sync::mpsc::Receiver, view: TiKvRaftModuleLMView) -> Self { let mut node = new_node(view.p2p().this_node.1); let timeout = Duration::from_millis(100); @@ -177,23 +177,23 @@ impl RaftThreadState { } pub fn new_tick_thread( - view: TiKVRaftModuleLMView, + view: TiKvRaftModuleLMView, rx: std::sync::mpsc::Receiver, mut waiter: LogicalModuleWaiter, ) { let mut state = RaftThreadState::new(rx, view); - sync_loop!("TiKVRaftModule::raft_tick", waiter, { state.tick() }); + sync_loop!("TiKvRaftModule::raft_tick", waiter, { state.tick() }); } fn handle_ready( - view: &TiKVRaftModuleLMView, + view: &TiKvRaftModuleLMView, rx: &mut std::sync::mpsc::Receiver, remaining_timeout: Duration, node: &mut RawNode, mut ready: Ready, ) { - fn handle_messages(view: &TiKVRaftModuleLMView, msgs: Vec) { + fn handle_messages(view: &TiKvRaftModuleLMView, msgs: Vec) { // Send messages to other peers. for msg in &msgs { let to = msg.to; diff --git a/src/config.rs b/src/config.rs index 2f94a90..1f770b0 100644 --- a/src/config.rs +++ b/src/config.rs @@ -15,6 +15,9 @@ pub struct NodesConfig { } impl NodesConfig { + pub fn node_cnt(&self) -> usize { + self.peers.len() + 1 + } pub fn this_node(&self) -> NodeID { self.this.0 } diff --git a/src/general/kv_interface.rs b/src/general/kv_interface.rs index 7bb848b..2667dcd 100644 --- a/src/general/kv_interface.rs +++ b/src/general/kv_interface.rs @@ -1,10 +1,29 @@ -use crate::{result::WSResult, sys::LogicalModule}; +use crate::{ + result::WSResult, + sys::{LogicalModule, NodeID}, +}; use async_trait::async_trait; -use downcast_rs::{impl_downcast, Downcast}; use super::network::proto; -pub struct SetOptions {} +pub struct KvOptions { + spec_node: Option, +} + +impl KvOptions { + pub fn new() -> KvOptions { + KvOptions { spec_node: None } + } + + pub fn spec_node(&self) -> Option { + self.spec_node + } + + pub fn with_spec_node(mut self, node: NodeID) -> KvOptions { + self.spec_node = Some(node); + self + } +} #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub enum KvOps { @@ -14,23 +33,10 @@ pub enum KvOps { } #[async_trait] -pub trait KVInterface: LogicalModule { - async fn get(&self, key_range: proto::kv::KeyRange) -> WSResult>; - async fn set( +pub trait KvInterface: LogicalModule { + async fn call( &self, - kvs: Vec, - opts: SetOptions, - ) -> WSResult>; -} - -impl SetOptions { - pub fn new() -> SetOptions { - SetOptions {} - } -} - -#[async_trait] -pub trait KVNode: KVInterface + Downcast { - async fn ready(&self) -> bool; + kv: proto::kv::KvRequests, + opt: KvOptions, + ) -> WSResult; } -impl_downcast!(KVNode); diff --git a/src/general/network/msg_pack.rs b/src/general/network/msg_pack.rs index 39ec08d..50ddd26 100644 --- a/src/general/network/msg_pack.rs +++ b/src/general/network/msg_pack.rs @@ -1,6 +1,9 @@ use downcast_rs::{impl_downcast, Downcast}; -use super::{p2p::MsgId, proto}; +use super::{ + p2p::MsgId, + proto::{self, kv::KvResponse}, +}; macro_rules! count_modules { ($module:ty) => {1u32}; @@ -49,13 +52,13 @@ define_msg_ids!( proto::raft::VoteResponse, proto::raft::AppendEntriesRequest, proto::raft::AppendEntriesResponse, - proto::kv::MetaKvRequest, - proto::kv::MetaKvResponse, proto::sche::MakeSchePlanReq, proto::sche::MakeSchePlanResp, proto::sche::DistributeTaskReq, proto::sche::DistributeTaskResp, - proto::metric::RscMetric + proto::metric::RscMetric, + proto::kv::KvRequests, + proto::kv::KvResponses ); pub trait RPCReq: MsgPack + Default { @@ -70,10 +73,6 @@ impl RPCReq for proto::raft::AppendEntriesRequest { type Resp = proto::raft::AppendEntriesResponse; } -impl RPCReq for proto::kv::MetaKvRequest { - type Resp = proto::kv::MetaKvResponse; -} - impl RPCReq for proto::sche::MakeSchePlanReq { type Resp = proto::sche::MakeSchePlanResp; } @@ -82,6 +81,44 @@ impl RPCReq for proto::sche::DistributeTaskReq { type Resp = proto::sche::DistributeTaskResp; } +impl RPCReq for proto::kv::KvRequests { + type Resp = proto::kv::KvResponses; +} + +pub trait KvResponseExt { + fn new_lock(lock_id: u32) -> KvResponse; + fn new_common(kvs: Vec) -> KvResponse; + fn lock_id(&self) -> Option; + fn common_kvs(&self) -> Option<&Vec>; +} + +impl KvResponseExt for KvResponse { + fn new_common(kvs: Vec) -> KvResponse { + KvResponse { + resp: Some(proto::kv::kv_response::Resp::CommonResp( + proto::kv::kv_response::KvResponse { kvs }, + )), + } + } + fn new_lock(lock_id: u32) -> KvResponse { + KvResponse { + resp: Some(proto::kv::kv_response::Resp::LockId(lock_id)), + } + } + fn lock_id(&self) -> Option { + match self.resp.as_ref().unwrap() { + proto::kv::kv_response::Resp::CommonResp(_) => None, + proto::kv::kv_response::Resp::LockId(id) => Some(*id), + } + } + fn common_kvs(&self) -> Option<&Vec> { + match self.resp.as_ref().unwrap() { + proto::kv::kv_response::Resp::CommonResp(resp) => Some(&resp.kvs), + proto::kv::kv_response::Resp::LockId(_) => None, + } + } +} + // impl MsgId for raft::prelude::Message { // fn msg_id(&self) -> u32 { // 0 diff --git a/src/general/network/p2p.rs b/src/general/network/p2p.rs index d234b70..156a713 100644 --- a/src/general/network/p2p.rs +++ b/src/general/network/p2p.rs @@ -198,6 +198,12 @@ impl RPCResponsor { pub async fn send_resp(&self, resp: R::Resp) -> WSResult<()> { self.responsor.send_resp(resp).await } + pub fn node_id(&self) -> NodeID { + self.responsor.node_id + } + pub fn task_id(&self) -> TaskId { + self.responsor.task_id + } } pub struct Responser { @@ -276,7 +282,7 @@ impl P2PModule { *b.downcast::().unwrap() } }; - + // tracing::debug!("dispatch from {} msg:{:?}", nid, msg); f( Responser { task_id, @@ -378,6 +384,7 @@ impl P2PModule { REQ: MsgPack, RESP: MsgPack, { + // tracing::debug!("call_rpc_inner req{:?}", r); // alloc from global let taskid: TaskId = self.next_task_id.fetch_add(1, Ordering::Relaxed); let (tx, rx) = tokio::sync::oneshot::channel::>(); @@ -398,25 +405,27 @@ impl P2PModule { return Ok(*resp); } - if self.rpc_holder.read().get(&(r.msg_id(), node_id)).is_none() { - let res = self - .rpc_holder - .write() - .insert((r.msg_id(), node_id), Default::default()); - assert!(res.is_none()); - } - let inner_lock = self - .rpc_holder - .read() - .get(&(r.msg_id(), node_id)) - .unwrap() - .clone(); - let _hold_lock = inner_lock.lock().await; + // if self.rpc_holder.read().get(&(r.msg_id(), node_id)).is_none() { + // let res = self + // .rpc_holder + // .write() + // .insert((r.msg_id(), node_id), Default::default()); + // assert!(res.is_none()); + // } + // let inner_lock = self + // .rpc_holder + // .read() + // .get(&(r.msg_id(), node_id)) + // .unwrap() + // .clone(); + // let _hold_lock = inner_lock.lock().await; // tracing::info!("holding lock msg:{} node:{}", r.msg_id(), node_id); let _ = self .waiting_tasks .insert((taskid, node_id), Some(tx).into()); + + // tracing::debug!("rpc send to node {} with taskid {}", node_id, taskid); match self .p2p_kernel .send(node_id, taskid, r.msg_id(), r.encode_to_vec()) @@ -431,15 +440,16 @@ impl P2PModule { Err(err) => { let _ = self.waiting_tasks.remove(&(taskid, node_id)).unwrap(); // tracing::info!("1stop holding lock msg:{} node:{}", r.msg_id(), node_id); + tracing::error!("rpc send failed: {:?}", err); return Err(err); } } - // TODO: handle timeout - if node_id == 3 { - // tracing::info!("waiting for response from {}", node_id); - } - // tracing::info!("2holding lock msg:{} node:{}", r.msg_id(), node_id); + // tracing::debug!( + // "rpc waiting for response from node {} for task {}", + // node_id, + // taskid + // ); let resp = match tokio::time::timeout(dur, rx).await { Ok(resp) => resp.unwrap_or_else(|err| { panic!("waiting for response failed: {:?}", err); @@ -448,20 +458,16 @@ impl P2PModule { // maybe removed or not let _ = self.waiting_tasks.remove(&(taskid, node_id)); // let _ = self.p2p_kernel.close(node_id).await; - if node_id == 3 { - tracing::warn!("rpc timeout: {:?} to node {}", err, node_id); - } + + tracing::error!("rpc timeout: {:?} to node {}", err, node_id); + // tracing::warn!("rpc timeout: {:?} to node {}", err, node_id); // tracing::info!("2stop holding lock msg:{} node:{}", r.msg_id(), node_id); return Err(WsNetworkConnErr::RPCTimout(node_id).into()); } }; - // tracing::info!("3holding lock msg:{} node:{}", r.msg_id(), node_id); - if node_id == 3 { - // tracing::info!("got response from {}", node_id); - } + let resp = resp.downcast::().unwrap(); - // tracing::info!("3stop holding lock msg:{} node:{}", r.msg_id(), node_id); Ok(*resp) } @@ -475,6 +481,7 @@ impl P2PModule { ) -> WSResult<()> { let read = self.dispatch_map.read(); if let Some(cb) = read.get(&id) { + // tracing::debug!("dispatch {} from: {}", id, nid); cb(nid, self, taskid, data)?; Ok(()) } else { diff --git a/src/general/network/p2p_quic.rs b/src/general/network/p2p_quic.rs index 417789f..1529fa4 100644 --- a/src/general/network/p2p_quic.rs +++ b/src/general/network/p2p_quic.rs @@ -351,6 +351,7 @@ async fn handle_connection( if let Some(WireMsg((head, _, bytes))) = msg { match deserialize_msg_id_task_id(&head) { Ok((msg_id, task_id)) => { + tracing::debug!("new to dispatch task_id: {}", task_id); // println!("Received from {remote_addr:?} --> {bytes:?}"); view.p2p().dispatch(remote_id, msg_id, task_id, bytes.into()); // if bytes == *MSG_MARCO { diff --git a/src/general/network/proto_src/kv.proto b/src/general/network/proto_src/kv.proto index d9b8aa1..72156f5 100644 --- a/src/general/network/proto_src/kv.proto +++ b/src/general/network/proto_src/kv.proto @@ -6,44 +6,63 @@ message KeyRange { bytes end=2; } -message KVPair { +message KvPair { bytes key=1; bytes value=2; } -message KVRequest { - message KVPutRequest{ - KVPairs kvs=1; +message KvRequest { + message KvPutRequest{ + KvPair kv=1; } - message KVGetRequest{ + message KvGetRequest{ KeyRange range=1; } - message KVDeleteRequest{ + message KvDeleteRequest{ KeyRange range=1; } + message KvLockRequest{ + bool read_or_write=1; + repeated uint32 release_id=2; + KeyRange range=3; + } oneof op { - KVPutRequest set=1; - KVGetRequest get=2; - KVDeleteRequest delete=3; + KvPutRequest set=1; + KvGetRequest get=2; + KvDeleteRequest delete=3; + KvLockRequest lock=4; } } -message KVPairs{ - repeated KVPair kvs=1; +message KvPairs{ + repeated KvPair kvs=1; } -message KVResponse{ - message KVPairOpt{ - KVPair kv=1; +message KvResponse{ + message KvResponse{ + repeated KvPair kvs=1; + } + oneof resp { + KvResponse common_resp=1; + uint32 lock_id=2; } - repeated KVPairOpt kvs=1; } -message MetaKVRequest{ - KVRequest request=1; +message KvRequests{ + string app=1; + string func=2; + repeated KvRequest requests=3; } -message MetaKVResponse{ - KVResponse response=1; -} \ No newline at end of file +message KvResponses{ + repeated KvResponse responses=1; +} + +// message MetaKvRequest{ +// KvRequest request=1; +// } + +// message MetaKvResponse{ +// KvResponse response=1; +// } \ No newline at end of file diff --git a/src/general/network/proto_src/sche.proto b/src/general/network/proto_src/sche.proto index edfbb1f..f0b7857 100644 --- a/src/general/network/proto_src/sche.proto +++ b/src/general/network/proto_src/sche.proto @@ -9,7 +9,7 @@ message MakeSchePlanReq{ string func=2; } enum TriggerType{ - SetKV = 0; + SetKv = 0; } repeated AppFn app_fns=1; TriggerType trigger_type=2; diff --git a/src/master/http_handler.rs b/src/master/http_handler.rs index 29b40fe..d7a6e42 100644 --- a/src/master/http_handler.rs +++ b/src/master/http_handler.rs @@ -49,7 +49,7 @@ impl LogicalModule for MasterHttpHandler { impl MasterHttpHandler { fn handle_prometheus(&self) -> Response { let mut body = String::new(); - tracing::info!("handle_prometheus"); + tracing::debug!("handle_prometheus"); encode(&mut body, &self.view.metric_observor().registry).unwrap(); let mut resp = (StatusCode::OK, body).into_response(); // hyper::header::CONTENT_TYPE, diff --git a/src/master/master_kv.rs b/src/master/master_kv.rs new file mode 100644 index 0000000..9382417 --- /dev/null +++ b/src/master/master_kv.rs @@ -0,0 +1,189 @@ +use std::{collections::HashMap, sync::Arc}; + +use async_trait::async_trait; +use parking_lot::RwLock; +use tokio::sync::Notify; +use ws_derive::LogicalModule; + +use crate::{ + general::network::{ + msg_pack::KvResponseExt, + p2p::{RPCHandler, RPCResponsor, TaskId}, + proto::{ + self, + kv::{KvRequests, KvResponse, KvResponses}, + }, + }, + result::WSResult, + sys::{LogicalModule, LogicalModuleNewArgs, MasterView, NodeID}, + util::JoinHandleWrapper, +}; + +#[derive(LogicalModule)] +pub struct MasterKv { + kv_map: RwLock, Vec>>, + lock_notifiers: RwLock, (NodeID, u32, Arc)>>, + rpc_handler: RPCHandler, + view: MasterView, +} + +#[async_trait] +impl LogicalModule for MasterKv { + fn inner_new(args: LogicalModuleNewArgs) -> Self + where + Self: Sized, + { + Self { + kv_map: RwLock::new(HashMap::new()), + lock_notifiers: RwLock::new(HashMap::new()), + rpc_handler: RPCHandler::default(), + view: MasterView::new(args.logical_modules_ref.clone()), + } + } + async fn start(&self) -> WSResult> { + let view = self.view.clone(); + self.rpc_handler + .regist(self.view.p2p(), move |responsor, reqs| { + let view = view.clone(); + let _ = tokio::spawn(async move { + view.master_kv().handle_kv_requests(reqs, responsor).await; + }); + Ok(()) + }); + + Ok(vec![]) + } +} + +impl MasterKv { + async fn handle_kv_requests( + &self, + reqs: proto::kv::KvRequests, + responsor: RPCResponsor, + ) { + let mut kv_responses = KvResponses { responses: vec![] }; + for req in reqs.requests { + kv_responses.responses.push(match req.op.unwrap() { + proto::kv::kv_request::Op::Set(set) => { + self.handle_kv_set(set, responsor.node_id()).await + } + proto::kv::kv_request::Op::Get(get) => self.handle_kv_get(get).await, + proto::kv::kv_request::Op::Delete(delete) => self.handle_kv_delete(delete).await, + proto::kv::kv_request::Op::Lock(lock) => { + self.handle_kv_lock(lock, responsor.node_id(), responsor.task_id()) + .await + } + }) + } + if let Err(err) = responsor.send_resp(kv_responses).await { + tracing::error!("handle kv requests error:{}", err); + }; + } + async fn handle_kv_set( + &self, + set: proto::kv::kv_request::KvPutRequest, + _from: NodeID, + ) -> KvResponse { + tracing::debug!("handle_kv_set:{:?}", set); + let mut kvs = vec![]; + if let Some(kv) = set.kv { + let key = kv.key.clone(); + let res = self.kv_map.write().insert(kv.key, kv.value); + if let Some(v) = res { + kvs.push(proto::kv::KvPair { key, value: v }); + } + } + + KvResponse::new_common(kvs) + } + async fn handle_kv_get(&self, get: proto::kv::kv_request::KvGetRequest) -> KvResponse { + tracing::debug!("handle_kv_get:{:?}", get); + let mut kvs = vec![]; + if let Some(v) = self.kv_map.read().get(&get.range.as_ref().unwrap().start) { + kvs.push(proto::kv::KvPair { + key: get.range.unwrap().start, + value: v.clone(), + }); + } + KvResponse::new_common(kvs) + } + async fn handle_kv_delete(&self, delete: proto::kv::kv_request::KvDeleteRequest) -> KvResponse { + tracing::debug!("handle_kv_delete:{:?}", delete); + let res = self + .kv_map + .write() + .remove(&delete.range.as_ref().unwrap().start); + let mut kvs = vec![]; + if let Some(v) = res { + kvs.push(proto::kv::KvPair { + key: delete.range.unwrap().start, + value: v, + }); + } + KvResponse::new_common(kvs) + } + async fn handle_kv_lock( + &self, + lock: proto::kv::kv_request::KvLockRequest, + from: NodeID, + task: TaskId, + ) -> KvResponse { + tracing::debug!("handle_kv_lock:{:?}", lock); + let mut notify_last = None; + loop { + if let Some(&release_id) = lock.release_id.get(0) { + tracing::debug!("unlock:{:?}", release_id); + // valid unlock: + // - is the owner + // - match verify id + let mut is_owner = false; + let mut write = self.lock_notifiers.write(); + if let Some((nodeid, real_release_id, _)) = + write.get(&lock.range.as_ref().unwrap().start) + { + if *nodeid == from && *real_release_id == release_id { + is_owner = true; + } + } + if is_owner { + tracing::debug!("unlock success"); + let (_, _, notify) = write.remove(&lock.range.as_ref().unwrap().start).unwrap(); + notify.notify_one(); + return KvResponse::new_common(vec![]); + } + } else { + // get, just get the lock + // the key creator will be the owner of the lock + let mut notify = None; + { + let mut write = self.lock_notifiers.write(); + let notify_to_insert = if let Some(notify) = notify_last.take() { + notify + } else { + Arc::new(Notify::new()) + }; + let _ = write + .entry(lock.range.as_ref().unwrap().start.clone()) + .and_modify(|v| { + tracing::debug!("lock already exists"); + notify = Some(v.2.clone()); + }) + .or_insert_with(|| { + tracing::debug!("lock not exists, preempt"); + (from, task, notify_to_insert) + }); + } + // didn't get the lock + if let Some(notify) = notify { + notify_last = Some(notify); + tracing::debug!("wait for other to release"); + // wait for release + notify_last.as_ref().unwrap().notified().await; + continue; + } else { + return KvResponse::new_lock(task); + } + } + } + } +} diff --git a/src/master/mod.rs b/src/master/mod.rs index 3febe02..9c37907 100644 --- a/src/master/mod.rs +++ b/src/master/mod.rs @@ -1,3 +1,4 @@ pub mod http_handler; pub mod master; +pub mod master_kv; pub mod metric_observor; diff --git a/src/sys.rs b/src/sys.rs index 2fd7afa..b546127 100644 --- a/src/sys.rs +++ b/src/sys.rs @@ -1,10 +1,14 @@ use crate::{ config::NodesConfig, general::{ - fs::Fs, metric_publisher::MetricPublisher, network::http_handler::HttpHandler, - network::p2p::P2PModule, + fs::Fs, + metric_publisher::MetricPublisher, + network::{http_handler::HttpHandler, p2p::P2PModule}, + }, + master::{ + http_handler::MasterHttpHandler, master::Master, master_kv::MasterKv, + metric_observor::MetricObservor, }, - master::{http_handler::MasterHttpHandler, master::Master, metric_observor::MetricObservor}, util, worker::{ executor::Executor, http_handler::WorkerHttpHandler, instance_manager::InstanceManager, @@ -12,7 +16,7 @@ use crate::{ }, }; use crate::{ - // kv::{data_router::DataRouter, data_router_client::DataRouterClient, kv_client::KVClient}, + // kv::{data_router::DataRouter, data_router_client::DataRouterClient, kv_client::KvClient}, // module_iter::LogicalModuleParent, // network::p2p::P2PModule, result::WSResult, @@ -165,7 +169,7 @@ macro_rules! logical_modules { // pub struct LogicalModules { // // #[sub] -// // pub kv_client: KVClient, // each module need a kv service +// // pub kv_client: KvClient, // each module need a kv service // // #[sub] // // #[view()] // // pub data_router_client: DataRouterClient, // kv_client_need a data_router service @@ -174,20 +178,20 @@ macro_rules! logical_modules { // // #[parent] // // pub p2p: P2PModule, // network basic service // // pub scheduler_node: Option, // scheduler service -// // pub general_kv_client: GKV::KVClient, -// // pub general_kv: Option, +// // pub general_kv_client: GKv::KvClient, +// // pub general_kv: Option, // // #[view(p2p, local_kv_client)] // // pub raft: Option>, -// pub meta_kv_client: Box, // get set key range +// pub meta_kv_client: Box, // get set key range // // #[view(p2p, raft)] -// // pub meta_kv: Option>, // run the raft or other consensus algorithm, handle meta_kv request +// // pub meta_kv: Option>, // run the raft or other consensus algorithm, handle meta_kv request // /// get set by metakv or generalkv directly -// pub local_kv_client: Box, +// pub local_kv_client: Box, // // handle request, local storage operations -// pub local_kv: Option>, +// pub local_kv: Option>, // // #[parent] // // pub data_router: Option, // data_router service -// // pub kv_node: Option, // kv service +// // pub kv_node: Option, // kv service // } #[derive(Clone)] @@ -310,6 +314,8 @@ logical_modules!( Option, master, Option, + master_kv, + Option, //////////////////////////// // worker worker, @@ -324,15 +330,15 @@ logical_modules!( Option ); -// logical_module_view_impl!(MetaKVClientView); -// logical_module_view_impl!(MetaKVClientView, meta_kv_client, Box); -// logical_module_view_impl!(MetaKVClientView, meta_kv, Option>); -// logical_module_view_impl!(MetaKVClientView, p2p, P2PModule); +// logical_module_view_impl!(MetaKvClientView); +// logical_module_view_impl!(MetaKvClientView, meta_kv_client, Box); +// logical_module_view_impl!(MetaKvClientView, meta_kv, Option>); +// logical_module_view_impl!(MetaKvClientView, p2p, P2PModule); -logical_module_view_impl!(MetaKVView); -// logical_module_view_impl!(MetaKVView, p2p, P2PModule); -// logical_module_view_impl!(MetaKVView, meta_kv, Option>); -// logical_module_view_impl!(MetaKVView, local_kv, Option>); +logical_module_view_impl!(MetaKvView); +// logical_module_view_impl!(MetaKvView, p2p, P2PModule); +// logical_module_view_impl!(MetaKvView, meta_kv, Option>); +// logical_module_view_impl!(MetaKvView, local_kv, Option>); logical_module_view_impl!(P2PView); logical_module_view_impl!(P2PView, p2p, P2PModule); @@ -398,6 +404,7 @@ logical_module_view_impl!(FsView, fs, Fs); logical_module_view_impl!(MasterView); logical_module_view_impl!(MasterView, p2p, P2PModule); logical_module_view_impl!(MasterView, master, Option); +logical_module_view_impl!(MasterView, master_kv, Option); fn modules_mut_ref(modules: &Arc) -> &mut LogicalModules { // let _ = SETTED_MODULES_COUNT.fetch_add(1, Ordering::SeqCst); @@ -440,6 +447,7 @@ impl LogicalModules { }, metric_observor: None, master: None, + master_kv: None, worker: None, kv_user_client: None, instance_manager: None, @@ -451,6 +459,7 @@ impl LogicalModules { if is_master { logical_modules.metric_observor = Some(MetricObservor::new(args.clone())); logical_modules.master = Some(Master::new(args.clone())); + logical_modules.master_kv = Some(MasterKv::new(args.clone())); } else { logical_modules.kv_user_client = Some(KvUserClient::new(args.clone())); logical_modules.instance_manager = Some(InstanceManager::new(args.clone())); @@ -471,6 +480,7 @@ impl LogicalModules { // master start_module_opt!(self, sys, metric_observor); start_module_opt!(self, sys, master); + start_module_opt!(self, sys, master_kv); //worker start_module_opt!(self, sys, worker); start_module_opt!(self, sys, kv_user_client); diff --git a/src/worker/function_event/kv_event.rs b/src/worker/function_event/kv_event.rs index 4cfc843..d73fed6 100644 --- a/src/worker/function_event/kv_event.rs +++ b/src/worker/function_event/kv_event.rs @@ -1,141 +1,155 @@ -use std::time::Duration; +// use std::time::Duration; -use crate::{ - general::{ - kv_interface::{KVInterface, KvOps, SetOptions}, - network::proto::{self}, - }, - util, - worker::{ - app_meta::{AppMetaManager, KeyPattern, KvMeta}, - executor::FunctionCtx, - kv_user_client::KvUserClient, - }, -}; +// use crate::{ +// general::{ +// kv_interface::{KvInterface, KvOps, KvOptions}, +// network::proto::{self, kv::KvPairs}, +// }, +// util, +// worker::{ +// app_meta::{AppMetaManager, KeyPattern, KvMeta}, +// executor::FunctionCtx, +// kv_user_client::KvUserClient, +// }, +// }; -async fn handle_set_event( - kv: proto::kv::KvPair, - kv_client: &KvUserClient, - prev_fn_ctx: &FunctionCtx, - triggers: Vec<(&String, &String, &KvMeta)>, -) { - let worker = kv_client.view.worker(); - let p2p = kv_client.view.instance_manager().view.p2p(); - let schecmd = match worker - .rpc_caller_make_sche - .call( - p2p, - p2p.nodes_config.get_master_node(), - proto::sche::MakeSchePlanReq { - app_fns: triggers - .iter() - .map(|(app, func, _)| proto::sche::make_sche_plan_req::AppFn { - app: app.to_string(), - func: func.to_string(), - }) - .collect::>(), - trigger_type: proto::sche::make_sche_plan_req::TriggerType::SetKv as i32, - }, - None, - ) - .await - { - Ok(schecmd) => schecmd, - Err(e) => { - tracing::error!("rpc call error: {:?}", e); - return; - } - }; - tracing::info!("got sche plan from master: {:?}", schecmd); - // 1. sync set kv - // TODO: use schecmd.data_target_node - let key = kv.key.clone(); - if let Ok(_res) = kv_client.set(vec![kv], SetOptions::new()).await { - for (&target_node, (app, func, _)) in schecmd.sche_target_node.iter().zip(triggers) { - let view = kv_client.view.clone(); - let app = app.to_owned(); - let func = func.to_owned(); - let key = key.clone(); +// async fn handle_set_event( +// kv: proto::kv::KvPair, +// kv_client: &KvUserClient, +// prev_fn_ctx: &FunctionCtx, +// triggers: Vec<(&String, &String, &KvMeta)>, +// ) { +// let worker = kv_client.view.worker(); +// let p2p = kv_client.view.instance_manager().view.p2p(); +// let schecmd = match worker +// .rpc_caller_make_sche +// .call( +// p2p, +// p2p.nodes_config.get_master_node(), +// proto::sche::MakeSchePlanReq { +// app_fns: triggers +// .iter() +// .map(|(app, func, _)| proto::sche::make_sche_plan_req::AppFn { +// app: app.to_string(), +// func: func.to_string(), +// }) +// .collect::>(), +// trigger_type: proto::sche::make_sche_plan_req::TriggerType::SetKv as i32, +// }, +// None, +// ) +// .await +// { +// Ok(schecmd) => schecmd, +// Err(e) => { +// tracing::error!("rpc call error: {:?}", e); +// return; +// } +// }; +// tracing::info!("got sche plan from master: {:?}", schecmd); +// // 1. sync set kv +// // TODO: use schecmd.data_target_node +// let key = kv.key.clone(); +// if let Ok(_res) = kv_client +// .call( +// proto::kv::KvRequests { +// requests: vec![proto::kv::KvRequest { +// op: Some(proto::kv::kv_request::Op::Set( +// proto::kv::kv_request::KvPutRequest { +// kvs: Some(KvPairs { kvs: vec![kv] }), +// }, +// )), +// }], +// }, +// KvOptions::new().with_spec_node(schecmd.data_target_node), +// ) +// .await +// { +// for (&target_node, (app, func, _)) in schecmd.sche_target_node.iter().zip(triggers) { +// let view = kv_client.view.clone(); +// let app = app.to_owned(); +// let func = func.to_owned(); +// let key = key.clone(); - let remote_sche_task = tokio::spawn(async move { - let sub_task = view.executor().register_sub_task(); - if let Err(err) = view - .worker() - .rpc_caller_distribute_task - .call( - view.p2p(), - target_node, - proto::sche::DistributeTaskReq { - app, - func, - task_id: sub_task, - trigger: Some(proto::sche::distribute_task_req::Trigger::KvKeySet(key)), - }, - // max wait time - Some(Duration::from_secs(60 * 30)), - ) - .await - { - tracing::error!("sche sub fn failed with err: {}", err); - } - }); - unsafe { - util::unsafe_mut(prev_fn_ctx) - .sub_waiters - .push(remote_sche_task) - } - } - } -} +// let remote_sche_task = tokio::spawn(async move { +// let sub_task = view.executor().register_sub_task(); +// if let Err(err) = view +// .worker() +// .rpc_caller_distribute_task +// .call( +// view.p2p(), +// target_node, +// proto::sche::DistributeTaskReq { +// app, +// func, +// task_id: sub_task, +// trigger: Some(proto::sche::distribute_task_req::Trigger::KvKeySet(key)), +// }, +// // max wait time +// Some(Duration::from_secs(60 * 30)), +// ) +// .await +// { +// tracing::error!("sche sub fn failed with err: {}", err); +// } +// }); +// unsafe { +// util::unsafe_mut(prev_fn_ctx) +// .sub_waiters +// .push(remote_sche_task) +// } +// } +// } +// } -pub async fn check_and_handle_event( - // record triggerd events - fn_ctx: &FunctionCtx, - // tigger pattern - pattern: &KeyPattern, - // kv operation - kv_client: &KvUserClient, - // app meta to get trigger infos - app_meta_man: &AppMetaManager, - // may trigger op - op: KvOps, - // kv to set - kv: proto::kv::KvPair, -) { - match op { - KvOps::Get | KvOps::Delete => { - tracing::warn!("kv event not support get/delete"); - return; - } - KvOps::Set => {} - } - tracing::info!("event trigger kv ope matched"); - let triggers = if let Some(triggers) = app_meta_man.get_pattern_triggers(&*pattern.0) { - if triggers.is_empty() { - return; - } - triggers - } else { - return; - }; - tracing::info!("kv pattern has potential triggers"); - // collect must consume triggers - let triggers: Vec<(&String, &String, &KvMeta)> = triggers - .iter() - .filter_map(|(app, func)| { - let maytrigger_fnmeta = app_meta_man - .get_app_meta(app) - .unwrap() - .get_fn_meta(func) - .unwrap(); +// pub async fn check_and_handle_event( +// // record triggerd events +// fn_ctx: &FunctionCtx, +// // tigger pattern +// pattern: &KeyPattern, +// // kv operation +// kv_client: &KvUserClient, +// // app meta to get trigger infos +// app_meta_man: &AppMetaManager, +// // may trigger op +// op: KvOps, +// // kv to set +// kv: proto::kv::KvPair, +// ) { +// match op { +// KvOps::Get | KvOps::Delete => { +// tracing::warn!("kv event not support get/delete"); +// return; +// } +// KvOps::Set => {} +// } +// tracing::info!("event trigger kv ope matched"); +// let triggers = if let Some(triggers) = app_meta_man.get_pattern_triggers(&*pattern.0) { +// if triggers.is_empty() { +// return; +// } +// triggers +// } else { +// return; +// }; +// tracing::info!("kv pattern has potential triggers"); +// // collect must consume triggers +// let triggers: Vec<(&String, &String, &KvMeta)> = triggers +// .iter() +// .filter_map(|(app, func)| { +// let maytrigger_fnmeta = app_meta_man +// .get_app_meta(app) +// .unwrap() +// .get_fn_meta(func) +// .unwrap(); - if let Some(kvmeta) = maytrigger_fnmeta.find_will_trigger_kv_event(pattern, op) { - Some((app, func, kvmeta)) - } else { - None - } - }) - .collect::>(); - tracing::info!("kv pattern has {} triggers", triggers.len()); - handle_set_event(kv, kv_client, fn_ctx, triggers).await; -} +// if let Some(kvmeta) = maytrigger_fnmeta.find_will_trigger_kv_event(pattern, op) { +// Some((app, func, kvmeta)) +// } else { +// None +// } +// }) +// .collect::>(); +// tracing::info!("kv pattern has {} triggers", triggers.len()); +// handle_set_event(kv, kv_client, fn_ctx, triggers).await; +// } diff --git a/src/worker/kv_storage.rs b/src/worker/kv_storage.rs index e69de29..3bc4059 100644 --- a/src/worker/kv_storage.rs +++ b/src/worker/kv_storage.rs @@ -0,0 +1,4 @@ +// pub struct KvStorage { +// // testmap: SkipMap, Vec>, +// pub view: KvStorageView, +// } diff --git a/src/worker/kv_user_client.rs b/src/worker/kv_user_client.rs index e2e1251..c89344c 100644 --- a/src/worker/kv_user_client.rs +++ b/src/worker/kv_user_client.rs @@ -1,26 +1,29 @@ +use std::time::Duration; + use crate::{ general::{ - kv_interface::{KVInterface, KvOps, SetOptions}, - network::proto::kv::{kv_response::KvPairOpt, KeyRange, KvPair}, + kv_interface::{KvInterface, KvOptions}, + network::{ + p2p::RPCCaller, + proto::kv::{KvRequests, KvResponses}, + }, }, - result::{WSResult, WsPermissionErr}, + result::WSResult, sys::{KvUserClientView, LogicalModule, LogicalModuleNewArgs}, - util::{JoinHandleWrapper, TryUtf8VecU8}, - worker::executor::FunctionCtx, + util::JoinHandleWrapper, }; use async_trait::async_trait; // use crossbeam_skiplist::SkipMap; use ws_derive::LogicalModule; -use super::function_event::kv_event; - -// use super::super::kv_interface::KVInterface; +// use super::super::kv_interface::KvInterface; #[derive(LogicalModule)] pub struct KvUserClient { // testmap: SkipMap, Vec>, pub view: KvUserClientView, + rpc_caller_kv: RPCCaller, } #[async_trait] @@ -37,9 +40,12 @@ impl LogicalModule for KvUserClient { Self { // testmap: SkipMap::new(), view: KvUserClientView::new(args.logical_modules_ref.clone()), + rpc_caller_kv: RPCCaller::default(), } } async fn start(&self) -> WSResult> { + self.rpc_caller_kv.regist(self.view.p2p()); + let all = vec![]; Ok(all) @@ -47,7 +53,7 @@ impl LogicalModule for KvUserClient { } // #[async_trait] -// impl KVInterface for RaftKVNode { +// impl KvInterface for RaftKvNode { // async fn get(&self, key_range: KeyRange) -> WSResult> { // // get data position from master // // get data from position @@ -59,7 +65,7 @@ impl LogicalModule for KvUserClient { lazy_static::lazy_static! { static ref KV_USER_CLIENT: Option=None; - // static ref RECENT_KV_CACHE: Cache>=Cache::>::builder() + // static ref RECENT_Kv_CACHE: Cache>=Cache::>::builder() // .time_to_live(Duration::from_secs(10)) // .weigher(|_key, value| -> u32 { value.len().try_into().unwrap_or(u32::MAX) }) // // This cache will hold up to 32MiB of values. @@ -74,76 +80,82 @@ pub fn kv_user_client() -> &'static KvUserClient { } #[async_trait] -impl KVInterface for KvUserClient { - async fn get(&self, _key_range: KeyRange) -> WSResult> { - // if let Some(res) = self.testmap.get(&key_range.start) { - // Ok(vec![KvPair { - // key: res.key().clone(), - // value: res.value().clone(), - // }]) - // } else - { - Ok(vec![]) +impl KvInterface for KvUserClient { + async fn call(&self, req: KvRequests, opt: KvOptions) -> WSResult { + if let Some(node_id) = opt.spec_node() { + self.rpc_caller_kv + .call( + self.view.p2p(), + node_id, + req, + Some(Duration::from_secs(60 * 30)), + ) + .await + } else { + // 1. dicide placement position + // 2. send data to the position + self.rpc_caller_kv + .call( + self.view.p2p(), + self.view.p2p().nodes_config.get_master_node(), + req, + Some(Duration::from_secs(60 * 30)), + ) + .await } } - async fn set(&self, _kvs: Vec, _opts: SetOptions) -> WSResult> { - // for kv in kvs { - // let _ = self.testmap.insert(kv.key, kv.value); - // } - Ok(vec![]) - } } impl KvUserClient { // pub async fn get_by_app_fn(&self, key_range: KeyRange) -> WSResult> {} - pub async fn set_by_app_fn( - &self, - mut kvs: Vec, - _opts: SetOptions, - fn_ctx: &FunctionCtx, - ) -> WSResult> { - tracing::info!("app called kv set"); - let app_meta_man = self.view.instance_manager().app_meta_manager.read().await; - // let key = kvs[0].key.clone(); - let patterns = { - let appmeta = app_meta_man.get_app_meta(&fn_ctx.app).unwrap(); - let fnmeta = appmeta.get_fn_meta(&fn_ctx.func).unwrap(); - - // make sure each key set is valid - let mut patterns = vec![]; - for kv in kvs.iter() { - if let Some(pattern) = fnmeta.match_key(&kv.key, KvOps::Set) { - patterns.push(pattern); - } else { - // no pattern matched - return Err(WsPermissionErr::AccessKeyPermissionDenied { - app: fn_ctx.app.clone(), - func: fn_ctx.func.clone(), - access_key: TryUtf8VecU8(kv.key.clone()), - } - .into()); - } - } - patterns - }; - - // TODO: handle errors - for (kv, pattern) in kvs.iter_mut().zip(&patterns) { - tracing::info!("app called kv set really"); - // one pattern may trigger multiple functions - kv_event::check_and_handle_event( - fn_ctx, - pattern, - self, - &*app_meta_man, - KvOps::Set, - std::mem::take(kv), - ) - .await; - } + // pub async fn set_by_app_fn( + // &self, + // mut kvs: Vec, + // _opts: KvOptions, + // fn_ctx: &FunctionCtx, + // ) -> WSResult> { + // tracing::info!("app called kv set"); + // let app_meta_man = self.view.instance_manager().app_meta_manager.read().await; - Ok(vec![]) - // // common plan - // self.set(kvs, SetOptions::new()).await - } + // // make sure each key set is valid + // let patterns = { + // let appmeta = app_meta_man.get_app_meta(&fn_ctx.app).unwrap(); + // let fnmeta = appmeta.get_fn_meta(&fn_ctx.func).unwrap(); + + // let mut patterns = vec![]; + // for kv in kvs.iter() { + // if let Some(pattern) = fnmeta.match_key(&kv.key, KvOps::Set) { + // patterns.push(pattern); + // } else { + // // no pattern matched + // return Err(WsPermissionErr::AccessKeyPermissionDenied { + // app: fn_ctx.app.clone(), + // func: fn_ctx.func.clone(), + // access_key: TryUtf8VecU8(kv.key.clone()), + // } + // .into()); + // } + // } + // patterns + // }; + + // // TODO: handle errors + // for (kv, pattern) in kvs.iter_mut().zip(&patterns) { + // tracing::info!("app called kv set really"); + // // one pattern may trigger multiple functions + // kv_event::check_and_handle_event( + // fn_ctx, + // pattern, + // self, + // &*app_meta_man, + // KvOps::Set, + // std::mem::take(kv), + // ) + // .await; + // } + + // Ok(vec![]) + // // // common plan + // // self.set(kvs, SetOptions::new()).await + // } } diff --git a/src/worker/mod.rs b/src/worker/mod.rs index 8ae20fc..7d3ad49 100644 --- a/src/worker/mod.rs +++ b/src/worker/mod.rs @@ -3,6 +3,7 @@ pub mod executor; pub mod function_event; pub mod http_handler; pub mod instance_manager; +pub mod kv_storage; pub mod kv_user_client; pub mod wasm; pub mod wasm_host_funcs; diff --git a/src/worker/wasm_host_funcs/kv.rs b/src/worker/wasm_host_funcs/kv.rs index f08c931..99d9df2 100644 --- a/src/worker/wasm_host_funcs/kv.rs +++ b/src/worker/wasm_host_funcs/kv.rs @@ -1,7 +1,13 @@ use super::{utils, HostFuncRegister}; use crate::general::{ - kv_interface::{KVInterface, SetOptions}, - network::proto::kv::{KeyRange, KvPair}, + kv_interface::{KvInterface, KvOptions}, + network::{ + msg_pack::KvResponseExt, + proto::{ + self, + kv::{KeyRange, KvPair, KvRequest, KvRequests, KvResponses}, + }, + }, }; use crate::worker::kv_user_client::kv_user_client; use moka::sync::Cache; @@ -16,96 +22,296 @@ use wasmedge_sdk::{ }; // fn kv_set(kptr: *const u8, klen: i32, v: *const u8, vlen: i32); -type KvSetArgs = (i32, i32, i32, i32); -#[cfg_attr(target_os = "linux", async_host_function)] -async fn kv_set_async( - // caller: CallingFrame, - // args: Vec, - caller: Caller, - args: Vec, - _ctx: *mut T, -) -> Result, HostFuncError> { - let key = utils::u8slice(&caller, args[0].to_i32(), args[1].to_i32()).to_owned(); - let value = utils::u8slice(&caller, args[2].to_i32(), args[3].to_i32()).to_owned(); - let cur_app_fn = utils::current_app_fn_ctx(&caller); - kv_user_client() - .set_by_app_fn(vec![KvPair { key, value }], SetOptions {}, unsafe { - cur_app_fn.0.as_ref() - }) - .await; - Ok(vec![]) -} +// type KvSetArgs = (i32, i32, i32, i32); +// #[cfg_attr(target_os = "linux", async_host_function)] +// async fn kv_set_async( +// // caller: CallingFrame, +// // args: Vec, +// caller: Caller, +// args: Vec, +// _ctx: *mut T, +// ) -> Result, HostFuncError> { +// let key = utils::u8slice(&caller, args[0].to_i32(), args[1].to_i32()).to_owned(); +// let value = utils::u8slice(&caller, args[2].to_i32(), args[3].to_i32()).to_owned(); +// let cur_app_fn = utils::current_app_fn_ctx(&caller); +// kv_user_client() +// .set_by_app_fn(KvPair { key, value }, KvOptions::new(), unsafe { +// cur_app_fn.0.as_ref() +// }) +// .await; +// Ok(vec![]) +// } +// #[cfg(target_os = "macos")] +// fn kv_set(k_ptr: i32, k_len: i32, v_ptr: i32, v_len: i32) { +// // let +// } + +// fn kv_get_len(kptr: *const u8, klen: i32, vlen: &mut i32, id: &mut i32); +// type KvGetLenArgs = (i32, i32, i32, i32); +// #[cfg_attr(target_os = "linux", async_host_function)] +// async fn kv_get_len_async( +// // caller: CallingFrame, +// // args: Vec, +// caller: Caller, +// args: Vec, +// _ctx: *mut T, +// ) -> Result, HostFuncError> { +// let key = utils::u8slice(&caller, args[0].to_i32(), args[1].to_i32()); +// let len = utils::mutref::(&caller, args[2].to_i32()); +// let id = utils::mutref::(&caller, args[3].to_i32()); +// if let Ok(mut res) = kv_user_client() +// .get(KeyRange { +// start: key.to_owned(), +// end: vec![], +// }) +// .await +// { +// if res.len() > 0 { +// let KvPair { key: _, value } = res.pop().unwrap(); +// let newid = NEXT_CACHE_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed); +// if newid < 0 { +// NEXT_CACHE_ID.store(0, std::sync::atomic::Ordering::Relaxed); +// } +// let newid = NEXT_CACHE_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed); +// *len = value.len() as i32; +// *id = newid; +// RECENT_Kv_CACHE.insert(newid, value); +// } else { +// *len = 0; +// } +// } else { +// *len = 0; +// } + +// // kv_user_client() +// Ok(vec![]) +// } + #[cfg(target_os = "macos")] -fn kv_set(k_ptr: i32, k_len: i32, v_ptr: i32, v_len: i32) { - // let +fn kv_get_len(caller: Caller, args: Vec) -> Result, HostFuncError> { + util::call_async_from_sync(kv_get_len_async(caller, args, std::ptr::null_mut::<()>())) } +// fn kv_get(id: i32, vlen: *const u8); +// type KvGetArgs = (i32, i32); +// #[cfg_attr(target_os = "linux", host_function)] +// fn kv_get(caller: Caller, args: Vec) -> Result, HostFuncError> { +// let id = args[0].to_i32(); +// if let Some(res) = RECENT_Kv_CACHE.remove(&id) { +// let retslice = utils::mutu8sclice(&caller, args[1].to_i32(), res.len() as i32).unwrap(); +// retslice.copy_from_slice(res.as_slice()); +// } + +// Ok(vec![]) +// } + lazy_static::lazy_static! { - static ref RECENT_KV_CACHE: Cache>=Cache::>::builder() + static ref RECENT_KV_CACHE: Cache=Cache::builder() .time_to_live(Duration::from_secs(10)) - .weigher(|_key, value| -> u32 { value.len().try_into().unwrap_or(u32::MAX) }) // This cache will hold up to 32MiB of values. - .max_capacity(32 * 1024 * 1024) + .max_capacity(10240) .build(); static ref NEXT_CACHE_ID: AtomicI32=AtomicI32::new(0); } -// fn kv_get_len(kptr: *const u8, klen: i32, vlen: &mut i32, id: &mut i32); -type KvGetLenArgs = (i32, i32, i32, i32); + +const SET_ID: usize = 1; +const GET_ID: usize = 2; +const LOCK_ID: usize = 3; +const DELETE_ID: usize = 4; + +type KvBatchOpe = (i32, i32, i32); #[cfg_attr(target_os = "linux", async_host_function)] -async fn kv_get_len_async( - // caller: CallingFrame, - // args: Vec, +async fn kv_batch_ope( caller: Caller, args: Vec, _ctx: *mut T, ) -> Result, HostFuncError> { - let key = utils::u8slice(&caller, args[0].to_i32(), args[1].to_i32()); - let len = utils::mutref::(&caller, args[2].to_i32()); - let id = utils::mutref::(&caller, args[3].to_i32()); - if let Ok(mut res) = kv_user_client() - .get(KeyRange { - start: key.to_owned(), - end: vec![], - }) + let opes_arg_ptr = args[0].to_i32(); + let opes_arg_len = args[1].to_i32(); + let opes_id = utils::mutref::(&caller, args[2].to_i32()); + let args = utils::i32slice(&caller, opes_arg_ptr, opes_arg_len); + let func_ctx = unsafe { utils::current_app_fn_ctx(&caller).0.as_ref() }; + + // request and response mem position + let ope_cnt = args[0]; + let mut requests: Vec = vec![]; + let mut cur_idx = 1; + // tracing::debug!("args:{:?}", args); + // Construct the requests + for _ in 0..ope_cnt { + let ope_type = args[cur_idx]; + match ope_type as usize { + // set + SET_ID => { + let key = utils::u8slice(&caller, args[cur_idx + 1], args[cur_idx + 2]); + let value = utils::u8slice(&caller, args[cur_idx + 3], args[cur_idx + 4]); + requests.push(KvRequest { + op: Some(proto::kv::kv_request::Op::Set( + proto::kv::kv_request::KvPutRequest { + kv: Some(KvPair { + key: key.to_owned(), + value: value.to_owned(), + }), + }, + )), + }); + cur_idx += 5; + } + // get + GET_ID => { + let key = utils::u8slice(&caller, args[cur_idx + 1], args[cur_idx + 2]); + // tracing::debug!("ptr:{} len:{} get key:{:?}", args[cur_idx + 1], args[cur_idx + 2],key); + requests.push(KvRequest { + op: Some(proto::kv::kv_request::Op::Get( + proto::kv::kv_request::KvGetRequest { + range: Some(KeyRange { + start: key.to_owned(), + end: vec![], + }), + }, + )), + }); + cur_idx += 4; + } + // lock + LOCK_ID => { + let key = utils::u8slice(&caller, args[cur_idx + 1], args[cur_idx + 2]); + // // first bit + // let read_or_write = args[cur_idx + 3] & 1 == 1; + // <0 means get + let release_id = args[cur_idx + 3]; + requests.push(KvRequest { + op: Some(proto::kv::kv_request::Op::Lock( + proto::kv::kv_request::KvLockRequest { + read_or_write: false, + release_id: if release_id < 0 { + vec![] + } else { + vec![release_id as u32] + }, + range: Some(KeyRange { + start: key.to_owned(), + end: vec![], + }), + }, + )), + }); + cur_idx += 5; + } + DELETE_ID => { + let key = utils::u8slice(&caller, args[cur_idx + 1], args[cur_idx + 2]); + requests.push(KvRequest { + op: Some(proto::kv::kv_request::Op::Delete( + proto::kv::kv_request::KvDeleteRequest { + range: Some(KeyRange { + start: key.to_owned(), + end: vec![], + }), + }, + )), + }); + cur_idx += 3; + } + _ => { + panic!("not implemented, reqs{:?},{:X}", requests, ope_type); + } + } + } + // tracing::debug!("requests:{:?}", requests); + match kv_user_client() + .call( + KvRequests { + requests, + app: func_ctx.app.clone(), + func: func_ctx.func.clone(), + }, + KvOptions::new(), + ) .await { - if res.len() > 0 { - let KvPair { key: _, value } = res.pop().unwrap(); - let newid = NEXT_CACHE_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - if newid < 0 { - NEXT_CACHE_ID.store(0, std::sync::atomic::Ordering::Relaxed); + Ok(res) => { + let id = NEXT_CACHE_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + // Write back the results to wasm runtime + let mut cur_idx = 1; + let mut resps = res.responses.iter(); + // Construct the requests + for _ in 0..ope_cnt { + let ope_type = args[cur_idx]; + match ope_type as usize { + // set + SET_ID => { + let _ = resps.next().unwrap(); + cur_idx += 5; + } + // get + GET_ID => { + let kvs = resps.next().unwrap().common_kvs().unwrap(); + // get len + *utils::mutref::(&caller, args[cur_idx + 3]) = if kvs.len() > 0 { + kvs.get(0).unwrap().value.len() as i32 + } else { + -1 + }; + cur_idx += 4; + } + // lock + LOCK_ID => { + if let Some(lockid) = resps.next().unwrap().lock_id() { + // lock id is allocated by the remote when call the lock + *utils::mutref::(&caller, args[cur_idx + 4]) = lockid; + } else { + // unlock, no response + } + cur_idx += 5; + } + DELETE_ID => { + let _ = resps.next().unwrap(); + cur_idx += 3; + } + _ => { + panic!("not implemented"); + } + } } - let newid = NEXT_CACHE_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - *len = value.len() as i32; - *id = newid; - RECENT_KV_CACHE.insert(newid, value); - } else { - *len = 0; + RECENT_KV_CACHE.insert(id, res); + *opes_id = id; + } + Err(err) => { + tracing::error!("kv batch ope error:{}", err); } - } else { - *len = 0; } - - // kv_user_client() Ok(vec![]) } -#[cfg(target_os = "macos")] -fn kv_get_len(caller: Caller, args: Vec) -> Result, HostFuncError> { - util::call_async_from_sync(kv_get_len_async(caller, args, std::ptr::null_mut::<()>())) -} - -// fn kv_get(id: i32, vlen: *const u8); -type KvGetArgs = (i32, i32); -#[cfg_attr(target_os = "linux", host_function)] -fn kv_get(caller: Caller, args: Vec) -> Result, HostFuncError> { +#[host_function] +fn kv_batch_res(caller: Caller, args: Vec) -> Result, HostFuncError> { let id = args[0].to_i32(); - if let Some(res) = RECENT_KV_CACHE.remove(&id) { - let retslice = utils::mutu8sclice(&caller, args[1].to_i32(), res.len() as i32).unwrap(); - retslice.copy_from_slice(res.as_slice()); - } + if let Some(res) = RECENT_KV_CACHE.get(&id) { + let args_ptr = args[1].to_i32(); + let args_len = args[2].to_i32(); + let args = utils::i32slice(&caller, args_ptr, args_len); + let mut cur_idx = 0; + while cur_idx < args.len() { + let ope_idx = args[cur_idx]; + if let Some(res) = res.responses.get(ope_idx as usize) { + if let Some(kvs) = res.common_kvs() { + if let Some(kv) = kvs.get(0) { + let slice = + utils::mutu8sclice(&caller, args[cur_idx + 1], kv.value.len() as i32) + .unwrap(); + slice.copy_from_slice(kvs.get(0).unwrap().value.as_slice()); + } + } else if let Some(_lock_id) = res.lock_id() { + // do nothing + } else { + panic!("not implemented"); + } + } + cur_idx += 2; + } + } Ok(vec![]) } @@ -114,11 +320,13 @@ pub(super) struct KvFuncsRegister; impl HostFuncRegister for KvFuncsRegister { fn register(&self, builder: ImportObjectBuilder) -> ImportObjectBuilder { builder - .with_async_func::("kv_set", kv_set_async, None) - .unwrap() - .with_async_func::("kv_get_len", kv_get_len_async, None) + .with_async_func::("kv_batch_ope", kv_batch_ope, None) .unwrap() - .with_func::("kv_get", kv_get, None) + .with_func::("kv_batch_res", kv_batch_res, None) .unwrap() + // .with_async_func::("kv_get_len", kv_get_len_async, None) + // .unwrap() + // .with_func::("kv_get", kv_get, None) + // .unwrap() } } diff --git a/src/worker/wasm_host_funcs/mod.rs b/src/worker/wasm_host_funcs/mod.rs index 6011952..bea23ca 100644 --- a/src/worker/wasm_host_funcs/mod.rs +++ b/src/worker/wasm_host_funcs/mod.rs @@ -41,17 +41,25 @@ mod utils { } pub fn u8slice<'a>(caller: &impl WasmCtx, ptr: i32, len: i32) -> &'a [u8] { - // tracing::info!("getting u8 slice"); + // tracing::debug!("u8slice ptr: {}, len: {}", ptr, len); let mem = caller .memory(0) .unwrap() .data_pointer(ptr as u32, len as u32) .unwrap(); let res = unsafe { std::slice::from_raw_parts(mem, len as usize) }; - // tracing::info!("got u8 slice"); res } + pub fn i32slice<'a>(caller: &impl WasmCtx, ptr: i32, len: i32) -> &'a [i32] { + let mem = caller + .memory(0) + .unwrap() + .data_pointer(ptr as u32, len as u32) + .unwrap(); + unsafe { std::slice::from_raw_parts(mem as *const i32, len as usize) } + } + pub fn mutu8sclice<'a>(caller: &impl WasmCtx, ptr: i32, len: i32) -> Option<&'a mut [u8]> { if let Ok(mem) = caller .memory(0) diff --git a/ws_derive/src/test.rs b/ws_derive/src/test.rs index 1d1aa24..84de76d 100644 --- a/ws_derive/src/test.rs +++ b/ws_derive/src/test.rs @@ -2,7 +2,7 @@ struct MetaKvClientView { inner: Option>, } impl MetaKvClientView { - fn local_kv_client(&self) -> &Box { + fn local_kv_client(&self) -> &Box { self.inner.upgrade().unwrap().local_kv_client } } @@ -10,7 +10,7 @@ struct LocalKvClientView { inner: Weak, } impl LocalKvClientView { - fn local_kv(&self) -> &Option> { + fn local_kv(&self) -> &Option> { self.inner.upgrade().unwrap().local_kv } }