Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
db1723c
add a src for handler
flaneur2020 Aug 17, 2025
43d234b
rename as cmd executor
flaneur2020 Aug 17, 2025
0decb3f
tune the command
flaneur2020 Aug 17, 2025
3090623
add mpmc channel
flaneur2020 Aug 17, 2025
fc0692e
add scaffold for workers
flaneur2020 Aug 17, 2025
5f83a2f
add the scaffold
flaneur2020 Aug 17, 2025
04f5de8
fix
flaneur2020 Aug 17, 2025
b15b00a
add cancellation_token
flaneur2020 Aug 17, 2025
c38abf6
add run_worker
flaneur2020 Aug 17, 2025
90836e4
handle cancellation_token in run_worker
flaneur2020 Aug 17, 2025
ec73cbb
add logs over worker
flaneur2020 Aug 17, 2025
409c733
tune the log
flaneur2020 Aug 17, 2025
f505204
add close()
flaneur2020 Aug 17, 2025
66d3cb5
fix build
flaneur2020 Aug 17, 2025
6f0356c
fix build
flaneur2020 Aug 17, 2025
e90fe7b
allow client to pass across tasks
flaneur2020 Aug 17, 2025
f997ab9
wrap Arc over client
flaneur2020 Aug 17, 2025
448592a
fix build
flaneur2020 Aug 17, 2025
029541a
feat: add runtime support to executor with worker threads
flaneur2020 Aug 17, 2025
9b38317
tune the comment
flaneur2020 Aug 17, 2025
1820c9a
tune the comment
flaneur2020 Aug 17, 2025
bc92292
add a test cae
flaneur2020 Aug 17, 2025
1fbf5b4
fix locking in Client
flaneur2020 Aug 17, 2025
404e7c8
add Default
flaneur2020 Aug 17, 2025
bcabd55
ehance the comment
flaneur2020 Aug 17, 2025
88f2e1b
fix
flaneur2020 Aug 17, 2025
4acd3c9
chore rename
flaneur2020 Aug 17, 2025
5c37b4c
add todo
flaneur2020 Aug 17, 2025
a677275
check the ls
flaneur2020 Aug 17, 2025
595b939
fix execute()
flaneur2020 Aug 17, 2025
8fe859c
merge origin/main
flaneur2020 Sep 6, 2025
0fa409a
cargo fmt
flaneur2020 Sep 6, 2025
5442c5e
fix license header
flaneur2020 Sep 6, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 43 additions & 23 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ members = [
"src/server",
"src/conf",
"src/cmd",
"src/executor",
"src/client"
]

Expand Down Expand Up @@ -52,13 +53,22 @@ tokio = { version = "1", features = ["full"] }
snafu = "0.8"
tempfile = "3.8"
crc16 = "0.4"
async-channel = "2.5.0"
tokio-util = "0.7.16"
async-trait = "0.1"
foyer = { version = "0.18", features = ["nightly"] }

## workspaces members
storage = { path = "src/storage" }
kstd = { path = "src/kstd" }
common-macro = { path = "src/common/macro" }
net = { path = "src/net" }
resp = { path = "src/resp" }
server = { path = "src/server" }
conf = { path = "src/conf" }
cmd = { path = "src/cmd" }
executor = { path = "src/executor" }
client = { path = "src/client" }

[profile.dev]
split-debuginfo = "unpacked"
Expand Down
2 changes: 2 additions & 0 deletions src/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,5 @@ edition = "2021"
[dependencies]
async-trait = "0.1"
resp = { path = "../resp" }
parking_lot = { workspace = true }
tokio = { version = "1.0", features = ["sync"] }
90 changes: 57 additions & 33 deletions src/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use async_trait::async_trait;
use resp::RespData;
use tokio::sync::Mutex;

#[async_trait]
pub trait StreamTrait: Send + Sync {
Expand All @@ -25,73 +28,94 @@ pub trait StreamTrait: Send + Sync {
}

pub struct Client {
stream: Box<dyn StreamTrait>,
// using tokio::sync::Mutex may has risks to pass this Client object across
// tokio runtimes. we may require to figure out how to make a refactor to
// avoid passing Client across runtimes.
stream: Mutex<Box<dyn StreamTrait>>,
ctx: parking_lot::Mutex<ClientContext>,
}

struct ClientContext {
// TODO: use &[Vec<u8>], need lifetime.
argv: Vec<Vec<u8>>,
// Client name.
name: Vec<u8>,
cmd_name: Vec<u8>,
name: Arc<Vec<u8>>,
cmd_name: Arc<Vec<u8>>,
key: Vec<u8>,
reply: RespData,
}

impl Client {
pub fn new(stream: Box<dyn StreamTrait>) -> Self {
Self {
stream,
argv: Vec::default(),
name: Vec::default(),
cmd_name: Vec::default(),
key: Vec::default(),
reply: RespData::default(),
stream: Mutex::new(stream),
ctx: parking_lot::Mutex::new(ClientContext {
argv: Vec::default(),
name: Arc::new(Vec::default()),
cmd_name: Arc::new(Vec::default()),
key: Vec::default(),
reply: RespData::default(),
}),
}
}

pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
self.stream.read(buf).await
pub async fn read(&self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
let mut stream = self.stream.lock().await;
stream.read(buf).await
}

pub async fn write(&mut self, data: &[u8]) -> Result<usize, std::io::Error> {
self.stream.write(data).await
pub async fn write(&self, data: &[u8]) -> Result<usize, std::io::Error> {
let mut stream = self.stream.lock().await;
stream.write(data).await
}

pub fn set_argv(&mut self, argv: &[Vec<u8>]) {
self.argv = argv.to_vec()
pub fn set_argv(&self, argv: &[Vec<u8>]) {
let mut ctx = self.ctx.lock();
ctx.argv = argv.to_vec();
}

pub fn argv(&self) -> &[Vec<u8>] {
&self.argv
pub fn argv(&self) -> Vec<Vec<u8>> {
let ctx = self.ctx.lock();
ctx.argv.clone()
}

pub fn set_name(&mut self, name: &[u8]) {
self.name = name.to_vec()
pub fn set_name(&self, name: &[u8]) {
let mut ctx = self.ctx.lock();
ctx.name = Arc::new(name.to_vec());
}

pub fn name(&self) -> &[u8] {
&self.name
pub fn name(&self) -> Arc<Vec<u8>> {
let ctx = self.ctx.lock();
ctx.name.clone()
}

pub fn set_cmd_name(&mut self, name: &[u8]) {
self.cmd_name = name.to_vec()
pub fn set_cmd_name(&self, name: &[u8]) {
let mut ctx = self.ctx.lock();
ctx.cmd_name = Arc::new(name.to_vec());
}

pub fn cmd_name(&self) -> &[u8] {
&self.cmd_name
pub fn cmd_name(&self) -> Arc<Vec<u8>> {
let ctx = self.ctx.lock();
ctx.cmd_name.clone()
}

pub fn set_key(&mut self, key: &[u8]) {
self.key = key.to_vec()
pub fn set_key(&self, key: &[u8]) {
let mut ctx = self.ctx.lock();
ctx.key = key.to_vec();
}

pub fn key(&self) -> &[u8] {
&self.key
pub fn key(&self) -> Vec<u8> {
let ctx = self.ctx.lock();
ctx.key.clone()
}

pub fn reply_mut(&mut self) -> &mut RespData {
&mut self.reply
pub fn set_reply(&self, reply: RespData) {
let mut ctx = self.ctx.lock();
ctx.reply = reply;
}

pub fn take_reply(&mut self) -> RespData {
std::mem::take(&mut self.reply)
pub fn take_reply(&self) -> RespData {
let mut ctx = self.ctx.lock();
std::mem::take(&mut ctx.reply)
}
}
12 changes: 6 additions & 6 deletions src/cmd/src/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,26 +46,26 @@ impl Cmd for GetCmd {
impl_cmd_meta!();
impl_cmd_clone_box!();

fn do_initial(&self, client: &mut Client) -> bool {
fn do_initial(&self, client: &Client) -> bool {
let key = client.argv()[1].clone();
client.set_key(&key);
true
}
Comment on lines +49 to 53
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Guard against argv underflow to avoid panic

argv[1] indexing will panic on malformed or short requests. Return an error and short-circuit when arity is not satisfied.

Apply:

-    fn do_initial(&self, client: &Client) -> bool {
-        let key = client.argv()[1].clone();
-        client.set_key(&key);
-        true
-    }
+    fn do_initial(&self, client: &Client) -> bool {
+        let argv = client.argv();
+        if argv.len() < 2 {
+            client.set_reply(RespData::Error(
+                "ERR wrong number of arguments for 'get' command".to_string().into(),
+            ));
+            return false;
+        }
+        let key = argv[1].clone();
+        client.set_key(&key);
+        true
+    }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
fn do_initial(&self, client: &Client) -> bool {
let key = client.argv()[1].clone();
client.set_key(&key);
true
}
fn do_initial(&self, client: &Client) -> bool {
let argv = client.argv();
if argv.len() < 2 {
client.set_reply(RespData::Error(
"ERR wrong number of arguments for 'get' command".to_string().into(),
));
return false;
}
let key = argv[1].clone();
client.set_key(&key);
true
}
🤖 Prompt for AI Agents
In src/cmd/src/get.rs around lines 49 to 53, the code indexes argv[1] without
checking length which can panic for short/malformed requests; before accessing
argv[1] validate that client.argv().len() > 1 and if not, short-circuit by
returning false and avoid accessing argv; also emit an error response to the
client (e.g., send a protocol error or call the client's error/response method)
so the caller gets a clear failure instead of crashing.


fn do_cmd(&self, client: &mut Client, storage: Arc<Storage>) {
fn do_cmd(&self, client: &Client, storage: Arc<Storage>) {
let key = client.key();
let result = storage.get(key);
let result = storage.get(&key);

match result {
Ok(value) => {
*client.reply_mut() = RespData::BulkString(Some(value.into()));
client.set_reply(RespData::BulkString(Some(value.into())));
}
Err(e) => match e {
storage::error::Error::KeyNotFound { .. } => {
*client.reply_mut() = RespData::BulkString(None);
client.set_reply(RespData::BulkString(None));
}
_ => {
*client.reply_mut() = RespData::Error(format!("ERR {e}").into());
client.set_reply(RespData::Error(format!("ERR {e}").into()));
}
},
}
Expand Down
Loading
Loading