Skip to content

Commit

Permalink
Merge pull request #81 from doyoubi/MultiKeyCmd
Browse files Browse the repository at this point in the history
Support multi-key commands
  • Loading branch information
doyoubi committed Feb 23, 2020
2 parents 01e5637 + bfd4c54 commit 00dd893
Show file tree
Hide file tree
Showing 9 changed files with 266 additions and 59 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ scopeguard = "1.0.0"
itertools = "0.8.0"
futures-batch = "0.6.0"
config = "0.9"
btoi = "0.4.0"
btoi = "0.4.2"
crossbeam = "0.7.1"
crossbeam-channel = "0.4"
chashmap = "2.2.2"
Expand Down
2 changes: 1 addition & 1 deletion conf/server-proxy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ auto_select_db = true
slowlog_len = 1024

# In microseconds like redis.
slowlog_log_slower_than = 0
slowlog_log_slower_than = 20000

thread_number = 2

Expand Down
4 changes: 2 additions & 2 deletions src/proxy/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use tokio::net::TcpStream;
use tokio_util::codec::Decoder;

pub type BackendResult<T> = Result<T, BackendError>;
pub type TaskResult = Result<RespVec, CommandError>;
pub type CmdTaskResult = Result<RespVec, CommandError>;

pub trait CmdTaskResultHandler: Send + Sync + 'static {
type Task: CmdTask;
Expand Down Expand Up @@ -71,7 +71,7 @@ pub trait CmdTaskFactory {
) -> (
Self::Task,
// TODO: return indexed resp
Pin<Box<dyn Future<Output = TaskResult> + Send + 'static>>,
Pin<Box<dyn Future<Output = CmdTaskResult> + Send + 'static>>,
);
}

Expand Down
51 changes: 32 additions & 19 deletions src/proxy/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@ use crate::common::utils::byte_to_uppercase;
use crate::protocol::{RespPacket, RespSlice, RespVec};
use arrayvec::ArrayVec;
use futures::channel::oneshot;
use futures::task::{Context, Poll};
use futures::Future;
use pin_project::pin_project;
use std::convert::identity;
use std::error::Error;
use std::fmt;
use std::io;
use std::pin::Pin;
use std::result::Result;
use std::str;

Expand Down Expand Up @@ -93,6 +97,8 @@ pub enum DataCmdType {
STRLEN,
EVAL,
EVALSHA,
DEL,
EXISTS,
Others,
}

Expand Down Expand Up @@ -138,6 +144,8 @@ impl DataCmdType {
b"STRLEN" => DataCmdType::STRLEN,
b"EVAL" => DataCmdType::EVAL,
b"EVALSHA" => DataCmdType::EVALSHA,
b"DEL" => DataCmdType::DEL,
b"EXISTS" => DataCmdType::EXISTS,
_ => DataCmdType::Others,
}
}
Expand Down Expand Up @@ -243,6 +251,15 @@ impl TaskReply {
pub type CommandResult<T> = Result<Box<T>, CommandError>;
pub type TaskResult = Result<Box<TaskReply>, CommandError>;

pub fn new_command_pair() -> (CmdReplySender, CmdReplyReceiver) {
let (s, r) = oneshot::channel::<TaskResult>();
let reply_sender = CmdReplySender {
reply_sender: Some(s),
};
let reply_receiver = CmdReplyReceiver { reply_receiver: r };
(reply_sender, reply_receiver)
}

pub struct CmdReplySender {
reply_sender: Option<oneshot::Sender<TaskResult>>,
}
Expand All @@ -253,19 +270,6 @@ impl fmt::Debug for CmdReplySender {
}
}

pub struct CmdReplyReceiver {
reply_receiver: oneshot::Receiver<TaskResult>,
}

pub fn new_command_pair() -> (CmdReplySender, CmdReplyReceiver) {
let (s, r) = oneshot::channel::<TaskResult>();
let reply_sender = CmdReplySender {
reply_sender: Some(s),
};
let reply_receiver = CmdReplyReceiver { reply_receiver: r };
(reply_sender, reply_receiver)
}

impl CmdReplySender {
pub fn send(&mut self, res: TaskResult) -> Result<(), CommandError> {
// Must not send twice.
Expand Down Expand Up @@ -294,12 +298,21 @@ impl Drop for CmdReplySender {
}
}

impl CmdReplyReceiver {
pub async fn wait_response(self) -> Result<Box<TaskReply>, CommandError> {
self.reply_receiver
.await
.map_err(|_| CommandError::Canceled)
.and_then(identity)
#[pin_project]
pub struct CmdReplyReceiver {
#[pin]
reply_receiver: oneshot::Receiver<TaskResult>,
}

impl Future for CmdReplyReceiver {
type Output = TaskResult;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project().reply_receiver.poll(cx).map(|result| {
result
.map_err(|_| CommandError::Canceled)
.and_then(identity)
})
}
}

Expand Down

0 comments on commit 00dd893

Please sign in to comment.