Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for BZPOPMIN and BZPOPMAX #237

Merged
merged 2 commits into from Nov 24, 2020
Merged

Conversation

cfeitong
Copy link
Contributor

@cfeitong cfeitong commented Nov 24, 2020

No description provided.

diff --git a/docs/command_table.json b/docs/command_table.json
index 38aa4c4..21d3f1e 100644
--- a/docs/command_table.json
+++ b/docs/command_table.json
@@ -49,11 +49,11 @@
     },
     "bzpopmax": {
         "desc": "",
-        "supported": false
+        "supported": true
     },
     "bzpopmin": {
         "desc": "",
-        "supported": false
+        "supported": true
     },
     "client": {
         "desc": "",
diff --git a/docs/command_table.md b/docs/command_table.md
index 17bf80c..49a2d26 100644
--- a/docs/command_table.md
+++ b/docs/command_table.md
@@ -12,8 +12,8 @@
 | blpop | True | User MUST specify timeout. |
 | brpop | True | User MUST specify timeout. |
 | brpoplpush | True | User MUST specify timeout. |
-| bzpopmax | False |  |
-| bzpopmin | False |  |
+| bzpopmax | True |  |
+| bzpopmin | True |  |
 | client | False |  |
 | cluster | True | Only support the following sub commands: NODES, SLOTS, KEYSLOT. |
 | command | False |  |
diff --git a/src/proxy/command.rs b/src/proxy/command.rs
index 9a35b47..eecee9c 100644
--- a/src/proxy/command.rs
+++ b/src/proxy/command.rs
@@ -129,6 +129,8 @@ pub enum DataCmdType {
     ZREMRANGEBYLEX,
     ZREMRANGEBYRANK,
     ZREMRANGEBYSCORE,
+    BZPOPMIN,
+    BZPOPMAX,
     // Key commands
     EXPIRE,
     EXPIREAT,
@@ -207,6 +209,8 @@ impl DataCmdType {
             b"UNLINK" => DataCmdType::UNLINK,
             b"ZPOPMAX" => DataCmdType::ZPOPMAX,
             b"ZPOPMIN" => DataCmdType::ZPOPMIN,
+            b"BZPOPMAX" => DataCmdType::BZPOPMAX,
+            b"BZPOPMIN" => DataCmdType::BZPOPMIN,
             b"ZREM" => DataCmdType::ZREM,
             b"ZREMRANGEBYLEX" => DataCmdType::ZREMRANGEBYLEX,
             b"ZREMRANGEBYRANK" => DataCmdType::ZREMRANGEBYRANK,
diff --git a/src/proxy/executor.rs b/src/proxy/executor.rs
index 5906aec..93c9205 100644
--- a/src/proxy/executor.rs
+++ b/src/proxy/executor.rs
@@ -30,6 +30,8 @@ use std::str;
 use std::sync::{self, Arc};
 use std::time::Duration;

+type NonBlockingCommandsWithKey = Vec<(Vec<u8>, RespVec)>;
+
 pub struct SharedForwardHandler<F: RedisClientFactory, C: ConnFactory<Pkt = RespPacket>> {
     handler: sync::Arc<ForwardHandler<F, C>>,
 }
@@ -533,11 +535,13 @@ where
                     "EXISTS",
                 )))
             }
-            DataCmdType::BLPOP | DataCmdType::BRPOP | DataCmdType::BRPOPLPUSH => {
-                CmdReplyFuture::Right(Box::pin(
-                    self.handle_list_blocking_commands(cmd_ctx, reply_receiver),
-                ))
-            }
+            DataCmdType::BLPOP
+            | DataCmdType::BRPOP
+            | DataCmdType::BRPOPLPUSH
+            | DataCmdType::BZPOPMIN
+            | DataCmdType::BZPOPMAX => CmdReplyFuture::Right(Box::pin(
+                self.handle_blocking_commands(cmd_ctx, reply_receiver),
+            )),
             _ => {
                 self.handle_single_key_data_cmd(cmd_ctx);
                 CmdReplyFuture::Left(reply_receiver)
@@ -756,60 +760,38 @@ where
         reply_receiver.await
     }

-    async fn handle_list_blocking_commands(
+    async fn handle_blocking_commands(
         &self,
         cmd_ctx: CmdCtx,
         reply_receiver: CmdReplyReceiver,
     ) -> TaskResult {
         let data_cmd_type = cmd_ctx.get_data_cmd_type();

-        let (non_blocking_cmd_name, lrpop) = match data_cmd_type {
-            DataCmdType::BLPOP => ("LPOP", true),
-            DataCmdType::BRPOP => ("RPOP", true),
-            DataCmdType::BRPOPLPUSH => ("RPOPLPUSH", false),
-            _ => {
-                let cmd_name = cmd_ctx
-                    .get_cmd()
-                    .get_command_name()
-                    .map(|s| s.to_string())
-                    .unwrap_or_else(String::new);
-                cmd_ctx.set_resp_result(Ok(Resp::Error(
-                    format!("ERR unexpected command name '{}'", cmd_name).into_bytes(),
-                )));
+        let non_blocking_cmd_name = match Self::get_non_blocking_name(&cmd_ctx, data_cmd_type) {
+            Ok(non_blocking_cmd_name) => non_blocking_cmd_name,
+            // unexpected command name, return early
+            Err(resp) => {
+                cmd_ctx.set_resp_result(Ok(resp));
                 return reply_receiver.await;
             }
         };

-        let arg_len = match (data_cmd_type, cmd_ctx.get_cmd().get_command_len()) {
-            (DataCmdType::BLPOP, Some(len)) if len > 2 => len,
-            (DataCmdType::BRPOP, Some(len)) if len > 2 => len,
-            (DataCmdType::BRPOPLPUSH, Some(len)) if len == 4 => len,
-            _ => {
-                let cmd_name = cmd_ctx
-                    .get_cmd()
-                    .get_command_name()
-                    .map(|s| s.to_string())
-                    .unwrap_or_else(String::new);
-                cmd_ctx.set_resp_result(Ok(Resp::Error(
-                    format!("ERR invalid argument number for {:?}", cmd_name).into_bytes(),
-                )));
+        let arg_len = match Self::get_command_arg_len(&cmd_ctx, data_cmd_type) {
+            Ok(len) => len,
+            // invalid number of arguments, return early
+            Err(resp) => {
+                cmd_ctx.set_resp_result(Ok(resp));
                 return reply_receiver.await;
             }
         };

-        let timeout = match cmd_ctx.get_cmd().get_command_last_element() {
-            None => {
-                cmd_ctx.set_resp_result(Ok(Resp::Error(b"ERR wrong number of arguments".to_vec())));
+        let timeout = match Self::get_blocking_command_timeout(&cmd_ctx) {
+            Ok(timeout) => timeout,
+            // fail to parse required timeout, return early
+            Err(resp) => {
+                cmd_ctx.set_resp_result(Ok(resp));
                 return reply_receiver.await;
             }
-            Some(last) => match btoi::btou::<u64>(last) {
-                Err(_) => {
-                    cmd_ctx
-                        .set_resp_result(Ok(Resp::Error(b"ERR invalid timeout argument".to_vec())));
-                    return reply_receiver.await;
-                }
-                Ok(timeout) => timeout,
-            },
         };

         if !self.config.active_redirection {
@@ -827,38 +809,18 @@ where
         let factory = CmdCtxFactory::default();
         let mut retry_num = 0;
         loop {
-            let mut cmds = vec![];
-            if lrpop {
-                // BLPOP, BRPOP
-                // exclude the timeout argument
-                for i in 1..(arg_len - 1) {
-                    let key = match cmd_ctx.get_cmd().get_command_element(i) {
-                        None => break, // invalid state
-                        Some(key) => key.to_vec(),
-                    };
-                    let non_blocking_cmd =
-                        vec![non_blocking_cmd_name.to_string().into_bytes(), key.clone()];
-                    let arr: Vec<RespVec> = non_blocking_cmd
-                        .into_iter()
-                        .map(|s| Resp::Bulk(BulkStr::Str(s)))
-                        .collect();
-                    let resp = Resp::Arr(Array::Arr(arr));
-                    cmds.push((key, resp));
-                }
-            } else {
-                // BRPOPLPUSH
-                let mut resp = cmd_ctx.get_cmd().get_resp_slice().map(|b| b.to_vec());
-                change_bulk_array_element(
-                    &mut resp,
-                    0,
-                    non_blocking_cmd_name.to_string().into_bytes(),
-                );
-                if let Resp::Arr(Array::Arr(ref mut resps)) = resp {
-                    resps.pop(); // pop out the timeout argument
+            let cmds = match Self::transfer_cmd_from_blocking_to_nonblocking(
+                &cmd_ctx,
+                data_cmd_type,
+                arg_len,
+                non_blocking_cmd_name,
+            ) {
+                Ok(cmds) => cmds,
+                Err(resp) => {
+                    cmd_ctx.set_resp_result(Ok(resp));
+                    return reply_receiver.await;
                 }
-                // BRPOPLPUSH does not need to care about key.
-                cmds.push((vec![], resp));
-            }
+            };

             for (key, non_blocking_cmd) in cmds.into_iter() {
                 let (sub_cmd_ctx, fut) =
@@ -873,27 +835,21 @@ where
                     Ok(resp) => resp,
                 };

-                match resp {
-                    Resp::Bulk(BulkStr::Nil) if timeout == 0 || retry_num < timeout => {}
-                    Resp::Bulk(BulkStr::Nil) if lrpop => {
-                        // BLPOP, BRPOP need to change resposne to Array::Nil.
-                        cmd_ctx.set_resp_result(Ok(Resp::Arr(Array::Nil)));
-                        return reply_receiver.await;
-                    }
-                    Resp::Bulk(BulkStr::Str(s)) if lrpop => {
-                        // BLPOP, BRPOP need to include the key.
-                        let resp = Resp::Arr(Array::Arr(vec![
-                            Resp::Bulk(BulkStr::Str(key)),
-                            Resp::Bulk(BulkStr::Str(s)),
-                        ]));
-                        cmd_ctx.set_resp_result(Ok(resp));
-                        return reply_receiver.await;
+                if Self::is_empty_resp(&resp) && (timeout == 0 || retry_num < timeout) {
+                    continue;
+                }
+
+                let resp = match data_cmd_type {
+                    DataCmdType::BLPOP | DataCmdType::BRPOP => {
+                        Self::adjust_lrpop_response(resp, key)
                     }
-                    resp => {
-                        cmd_ctx.set_resp_result(Ok(resp));
-                        return reply_receiver.await;
+                    DataCmdType::BZPOPMIN | DataCmdType::BZPOPMAX => {
+                        Self::adjust_zpop_response(resp, key)
                     }
-                }
+                    _ => resp,
+                };
+                cmd_ctx.set_resp_result(Ok(resp));
+                return reply_receiver.await;
             }

             retry_num += 1;
@@ -958,6 +914,138 @@ where
     fn handle_umsync(&self, cmd_ctx: CmdCtx) {
         self.manager.send_sync_task(cmd_ctx);
     }
+
+    fn get_non_blocking_name(
+        cmd_ctx: &CmdCtx,
+        data_cmd_type: DataCmdType,
+    ) -> Result<&'static str, RespVec> {
+        match data_cmd_type {
+            DataCmdType::BLPOP => Ok("LPOP"),
+            DataCmdType::BRPOP => Ok("RPOP"),
+            DataCmdType::BRPOPLPUSH => Ok("RPOPLPUSH"),
+            DataCmdType::BZPOPMIN => Ok("ZPOPMIN"),
+            DataCmdType::BZPOPMAX => Ok("ZPOPMAX"),
+            _ => {
+                let cmd_name = cmd_ctx
+                    .get_cmd()
+                    .get_command_name()
+                    .map(|s| s.to_string())
+                    .unwrap_or_else(String::new);
+                Err(Resp::Error(
+                    format!("ERR unexpected command name '{}'", cmd_name).into_bytes(),
+                ))
+            }
+        }
+    }
+
+    fn get_command_arg_len(cmd_ctx: &CmdCtx, data_cmd_type: DataCmdType) -> Result<usize, RespVec> {
+        match (data_cmd_type, cmd_ctx.get_cmd().get_command_len()) {
+            (DataCmdType::BLPOP, Some(len)) if len > 2 => Ok(len),
+            (DataCmdType::BRPOP, Some(len)) if len > 2 => Ok(len),
+            (DataCmdType::BRPOPLPUSH, Some(len)) if len == 4 => Ok(len),
+            (DataCmdType::BZPOPMIN, Some(len)) if len > 2 => Ok(len),
+            (DataCmdType::BZPOPMAX, Some(len)) if len > 2 => Ok(len),
+            _ => {
+                let cmd_name = cmd_ctx
+                    .get_cmd()
+                    .get_command_name()
+                    .map(|s| s.to_string())
+                    .unwrap_or_else(String::new);
+                Err(Resp::Error(
+                    format!("ERR invalid argument number for {:?}", cmd_name).into_bytes(),
+                ))
+            }
+        }
+    }
+
+    fn is_empty_resp(resp: &RespVec) -> bool {
+        match resp {
+            Resp::Bulk(BulkStr::Nil) => true,
+            Resp::Arr(Array::Arr(arr)) if arr.is_empty() => true,
+            _ => false,
+        }
+    }
+
+    fn get_blocking_command_timeout(cmd_ctx: &CmdCtx) -> Result<u64, RespVec> {
+        cmd_ctx
+            .get_cmd()
+            .get_command_last_element()
+            .ok_or_else(|| Resp::Error(b"ERR wrong number of arguments".to_vec()))
+            .and_then(|last| {
+                btoi::btou::<u64>(last)
+                    .map_err(|_| Resp::Error(b"ERR invalid timeout argument".to_vec()))
+            })
+    }
+
+    fn transfer_cmd_from_blocking_to_nonblocking(
+        cmd_ctx: &CmdCtx,
+        data_cmd_type: DataCmdType,
+        arg_len: usize,
+        non_blocking_cmd_name: &str,
+    ) -> Result<NonBlockingCommandsWithKey, RespVec> {
+        let mut cmds = vec![];
+        use DataCmdType::*;
+        match data_cmd_type {
+            BLPOP | BRPOP | BZPOPMIN | BZPOPMAX => {
+                // exclude the timeout argument
+                for i in 1..(arg_len - 1) {
+                    let key = match cmd_ctx.get_cmd().get_command_element(i) {
+                        None => break, // invalid state
+                        Some(key) => key.to_vec(),
+                    };
+                    let non_blocking_cmd =
+                        vec![non_blocking_cmd_name.to_string().into_bytes(), key.clone()];
+                    let arr: Vec<RespVec> = non_blocking_cmd
+                        .into_iter()
+                        .map(|s| Resp::Bulk(BulkStr::Str(s)))
+                        .collect();
+                    let resp = Resp::Arr(Array::Arr(arr));
+                    cmds.push((key, resp));
+                }
+            }
+            BRPOPLPUSH => {
+                let mut resp = cmd_ctx.get_cmd().get_resp_slice().map(|b| b.to_vec());
+                change_bulk_array_element(
+                    &mut resp,
+                    0,
+                    non_blocking_cmd_name.to_string().into_bytes(),
+                );
+                if let Resp::Arr(Array::Arr(ref mut resps)) = resp {
+                    resps.pop(); // pop out the timeout argument
+                }
+                // BRPOPLPUSH does not need to care about key.
+                cmds.push((vec![], resp));
+            }
+            _ => return Err(Resp::Error(b"ERR unsupported blocking command".to_vec())),
+        }
+        Ok(cmds)
+    }
+
+    fn adjust_lrpop_response(resp: RespVec, key: Vec<u8>) -> RespVec {
+        match resp {
+            Resp::Bulk(BulkStr::Nil) => {
+                // BLPOP, BRPOP need to change resposne to Array::Nil.
+                return Resp::Arr(Array::Nil);
+            }
+            Resp::Bulk(BulkStr::Str(s)) => Resp::Arr(Array::Arr(vec![
+                Resp::Bulk(BulkStr::Str(key)),
+                Resp::Bulk(BulkStr::Str(s)),
+            ])),
+            _ => resp,
+        }
+    }
+
+    fn adjust_zpop_response(resp: RespVec, key: Vec<u8>) -> RespVec {
+        match resp {
+            Resp::Arr(Array::Arr(arr)) if arr.len() == 0 => Resp::Bulk(BulkStr::Nil),
+            Resp::Arr(Array::Arr(arr)) if arr.len() == 2 => {
+                let mut ret = vec![Resp::Bulk(BulkStr::Str(key))];
+                ret.extend(arr);
+                Resp::Arr(Array::Arr(ret))
+            }
+            _ => resp,
+        }
+    }
 }

 impl<F, C> CmdCtxHandler for ForwardHandler<F, C>
src/proxy/executor.rs Outdated Show resolved Hide resolved
src/proxy/executor.rs Outdated Show resolved Hide resolved
@doyoubi
Copy link
Owner

doyoubi commented Nov 24, 2020

Thanks for the PR! Very nice work.

@doyoubi doyoubi merged commit 78184f3 into doyoubi:master Nov 24, 2020
@doyoubi doyoubi mentioned this pull request Dec 8, 2020
25 tasks
This was referenced May 9, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants