From 5deb96979054d77c1be1cb92b680399cb97be0ce Mon Sep 17 00:00:00 2001 From: Aaron <69273634+aaron-congo@users.noreply.github.com> Date: Mon, 13 May 2024 22:25:32 -0700 Subject: [PATCH] Python: add BZPOPMIN and BZPOPMAX commands (#1399) * Python: add BZPOPMIN and BZPOPMAX commands (#266) * Update PR link * PR suggestions * Fix rust --- CHANGELOG.md | 2 + glide-core/src/client/value_conversion.rs | 88 +++++++++++++++++++ python/python/glide/async_commands/core.py | 72 +++++++++++++++ .../glide/async_commands/transaction.py | 46 ++++++++++ python/python/tests/test_async_client.py | 80 +++++++++++++++++ python/python/tests/test_transaction.py | 12 ++- 6 files changed, 296 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a4063e650f..341b625a15 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,8 @@ * Python: Added ZRANGESTORE command ([#1377](https://github.com/aws/glide-for-redis/pull/1377)) * Python: Added ZDIFFSTORE command ([#1378](https://github.com/aws/glide-for-redis/pull/1378)) * Python: Added ZDIFF command ([#1401](https://github.com/aws/glide-for-redis/pull/1401)) +* Python: Added BZPOPMIN and BZPOPMAX commands ([#1399](https://github.com/aws/glide-for-redis/pull/1399)) + #### Fixes * Python: Fix typing error "‘type’ object is not subscriptable" ([#1203](https://github.com/aws/glide-for-redis/pull/1203)) diff --git a/glide-core/src/client/value_conversion.rs b/glide-core/src/client/value_conversion.rs index 426d9a93f3..0f92daa325 100644 --- a/glide-core/src/client/value_conversion.rs +++ b/glide-core/src/client/value_conversion.rs @@ -22,6 +22,7 @@ pub(crate) enum ExpectedReturnType { ArrayOfArraysOfDoubleOrNull, ArrayOfKeyValuePairs, ZMPopReturnType, + KeyWithMemberAndScore, } pub(crate) fn convert_to_expected_type( @@ -320,6 +321,28 @@ pub(crate) fn convert_to_expected_type( ) .into()), }, + // Used by BZPOPMIN/BZPOPMAX, which return an array consisting of the key of the sorted set that was popped, the popped member, and its score. + // RESP2 returns the score as a string, but RESP3 returns the score as a double. Here we convert string scores into type double. + ExpectedReturnType::KeyWithMemberAndScore => match value { + Value::Nil => Ok(value), + Value::Array(ref array) if array.len() == 3 && matches!(array[2], Value::Double(_)) => { + Ok(value) + } + Value::Array(mut array) + if array.len() == 3 + && matches!(array[2], Value::BulkString(_) | Value::SimpleString(_)) => + { + array[2] = + convert_to_expected_type(array[2].clone(), Some(ExpectedReturnType::Double))?; + Ok(Value::Array(array)) + } + _ => Err(( + ErrorKind::TypeError, + "Response couldn't be converted to an array containing a key, member, and score", + format!("(response was {:?})", value), + ) + .into()), + }, } } @@ -454,6 +477,7 @@ pub(crate) fn expected_type_for_cmd(cmd: &Cmd) -> Option { b"ZRANK" | b"ZREVRANK" => cmd .position(b"WITHSCORE") .map(|_| ExpectedReturnType::ZRankReturnType), + b"BZPOPMIN" | b"BZPOPMAX" => Some(ExpectedReturnType::KeyWithMemberAndScore), b"SPOP" => { if cmd.arg_idx(2).is_some() { Some(ExpectedReturnType::Set) @@ -815,6 +839,70 @@ mod tests { )); } + #[test] + fn convert_bzpopmin_bzpopmax() { + assert!(matches!( + expected_type_for_cmd( + redis::cmd("BZPOPMIN") + .arg("myzset1") + .arg("myzset2") + .arg("1") + ), + Some(ExpectedReturnType::KeyWithMemberAndScore) + )); + + assert!(matches!( + expected_type_for_cmd( + redis::cmd("BZPOPMAX") + .arg("myzset1") + .arg("myzset2") + .arg("1") + ), + Some(ExpectedReturnType::KeyWithMemberAndScore) + )); + + let array_with_double_score = Value::Array(vec![ + Value::BulkString(b"key1".to_vec()), + Value::BulkString(b"member1".to_vec()), + Value::Double(2.0), + ]); + let result = convert_to_expected_type( + array_with_double_score.clone(), + Some(ExpectedReturnType::KeyWithMemberAndScore), + ) + .unwrap(); + assert_eq!(array_with_double_score, result); + + let array_with_string_score = Value::Array(vec![ + Value::BulkString(b"key1".to_vec()), + Value::BulkString(b"member1".to_vec()), + Value::BulkString(b"2.0".to_vec()), + ]); + let result = convert_to_expected_type( + array_with_string_score.clone(), + Some(ExpectedReturnType::KeyWithMemberAndScore), + ) + .unwrap(); + assert_eq!(array_with_double_score, result); + + let converted_nil_value = + convert_to_expected_type(Value::Nil, Some(ExpectedReturnType::KeyWithMemberAndScore)) + .unwrap(); + assert_eq!(Value::Nil, converted_nil_value); + + let array_with_unexpected_length = Value::Array(vec![ + Value::BulkString(b"key1".to_vec()), + Value::BulkString(b"member1".to_vec()), + Value::Double(2.0), + Value::Double(2.0), + ]); + assert!(convert_to_expected_type( + array_with_unexpected_length, + Some(ExpectedReturnType::KeyWithMemberAndScore) + ) + .is_err()); + } + #[test] fn convert_zank_zrevrank_only_if_withsocres_is_included() { assert!(matches!( diff --git a/python/python/glide/async_commands/core.py b/python/python/glide/async_commands/core.py index 4267bd7c63..729d71bc70 100644 --- a/python/python/glide/async_commands/core.py +++ b/python/python/glide/async_commands/core.py @@ -2285,6 +2285,42 @@ async def zpopmax( ), ) + async def bzpopmax( + self, keys: List[str], timeout: float + ) -> Optional[List[Union[str, float]]]: + """ + Pops the member with the highest score from the first non-empty sorted set, with the given keys being checked in + the order that they are given. Blocks the connection when there are no members to remove from any of the given + sorted sets. + + When in cluster mode, all keys must map to the same hash slot. + + `BZPOPMAX` is the blocking variant of `ZPOPMAX`. + + `BZPOPMAX` is a client blocking command, see https://github.com/aws/glide-for-redis/wiki/General-Concepts#blocking-commands for more details and best practices. + + See https://valkey.io/commands/bzpopmax for more details. + + Args: + keys (List[str]): The keys of the sorted sets. + timeout (float): The number of seconds to wait for a blocking operation to complete. + A value of 0 will block indefinitely. + + Returns: + Optional[List[Union[str, float]]]: An array containing the key where the member was popped out, the member itself, + and the member score. If no member could be popped and the `timeout` expired, returns None. + + Examples: + >>> await client.zadd("my_sorted_set1", {"member1": 10.0, "member2": 5.0}) + 2 # Two elements have been added to the sorted set at "my_sorted_set1". + >>> await client.bzpopmax(["my_sorted_set1", "my_sorted_set2"], 0.5) + ['my_sorted_set1', 'member1', 10.0] # "member1" with a score of 10.0 has been removed from "my_sorted_set1". + """ + return cast( + Optional[List[Union[str, float]]], + await self._execute_command(RequestType.BZPopMax, keys + [str(timeout)]), + ) + async def zpopmin( self, key: str, count: Optional[int] = None ) -> Mapping[str, float]: @@ -2317,6 +2353,42 @@ async def zpopmin( ), ) + async def bzpopmin( + self, keys: List[str], timeout: float + ) -> Optional[List[Union[str, float]]]: + """ + Pops the member with the lowest score from the first non-empty sorted set, with the given keys being checked in + the order that they are given. Blocks the connection when there are no members to remove from any of the given + sorted sets. + + When in cluster mode, all keys must map to the same hash slot. + + `BZPOPMIN` is the blocking variant of `ZPOPMIN`. + + `BZPOPMIN` is a client blocking command, see https://github.com/aws/glide-for-redis/wiki/General-Concepts#blocking-commands for more details and best practices. + + See https://valkey.io/commands/bzpopmin for more details. + + Args: + keys (List[str]): The keys of the sorted sets. + timeout (float): The number of seconds to wait for a blocking operation to complete. + A value of 0 will block indefinitely. + + Returns: + Optional[List[Union[str, float]]]: An array containing the key where the member was popped out, the member itself, + and the member score. If no member could be popped and the `timeout` expired, returns None. + + Examples: + >>> await client.zadd("my_sorted_set1", {"member1": 10.0, "member2": 5.0}) + 2 # Two elements have been added to the sorted set at "my_sorted_set1". + >>> await client.bzpopmin(["my_sorted_set1", "my_sorted_set2"], 0.5) + ['my_sorted_set1', 'member2', 5.0] # "member2" with a score of 5.0 has been removed from "my_sorted_set1". + """ + return cast( + Optional[List[Union[str, float]]], + await self._execute_command(RequestType.BZPopMin, keys + [str(timeout)]), + ) + async def zrange( self, key: str, diff --git a/python/python/glide/async_commands/transaction.py b/python/python/glide/async_commands/transaction.py index 43ad39e59d..6bc11d754d 100644 --- a/python/python/glide/async_commands/transaction.py +++ b/python/python/glide/async_commands/transaction.py @@ -1624,6 +1624,29 @@ def zpopmax( RequestType.ZPopMax, [key, str(count)] if count else [key] ) + def bzpopmax(self: TTransaction, keys: List[str], timeout: float) -> TTransaction: + """ + Pops the member with the highest score from the first non-empty sorted set, with the given keys being checked in + the order that they are given. Blocks the connection when there are no members to remove from any of the given + sorted sets. + + `BZPOPMAX` is the blocking variant of `ZPOPMAX`. + + `BZPOPMAX` is a client blocking command, see https://github.com/aws/glide-for-redis/wiki/General-Concepts#blocking-commands for more details and best practices. + + See https://valkey.io/commands/bzpopmax for more details. + + Args: + keys (List[str]): The keys of the sorted sets. + timeout (float): The number of seconds to wait for a blocking operation to complete. + A value of 0 will block indefinitely. + + Command response: + Optional[List[Union[str, float]]]: An array containing the key where the member was popped out, the member itself, + and the member score. If no member could be popped and the `timeout` expired, returns None. + """ + return self.append_command(RequestType.BZPopMax, keys + [str(timeout)]) + def zpopmin( self: TTransaction, key: str, count: Optional[int] = None ) -> TTransaction: @@ -1647,6 +1670,29 @@ def zpopmin( RequestType.ZPopMin, [key, str(count)] if count else [key] ) + def bzpopmin(self: TTransaction, keys: List[str], timeout: float) -> TTransaction: + """ + Pops the member with the lowest score from the first non-empty sorted set, with the given keys being checked in + the order that they are given. Blocks the connection when there are no members to remove from any of the given + sorted sets. + + `BZPOPMIN` is the blocking variant of `ZPOPMIN`. + + `BZPOPMIN` is a client blocking command, see https://github.com/aws/glide-for-redis/wiki/General-Concepts#blocking-commands for more details and best practices. + + See https://valkey.io/commands/bzpopmin for more details. + + Args: + keys (List[str]): The keys of the sorted sets. + timeout (float): The number of seconds to wait for a blocking operation to complete. + A value of 0 will block indefinitely. + + Command response: + Optional[List[Union[str, float]]]: An array containing the key where the member was popped out, the member itself, + and the member score. If no member could be popped and the `timeout` expired, returns None. + """ + return self.append_command(RequestType.BZPopMin, keys + [str(timeout)]) + def zrange( self: TTransaction, key: str, diff --git a/python/python/tests/test_async_client.py b/python/python/tests/test_async_client.py index 7d87aaa382..e1b163eb04 100644 --- a/python/python/tests/test_async_client.py +++ b/python/python/tests/test_async_client.py @@ -1833,6 +1833,46 @@ async def test_zpopmin(self, redis_client: TRedisClient): assert await redis_client.zpopmin("non_exisitng_key") == {} + @pytest.mark.parametrize("cluster_mode", [True, False]) + @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) + async def test_bzpopmin(self, redis_client: TRedisClient): + key1 = f"{{testKey}}:{get_random_string(10)}" + key2 = f"{{testKey}}:{get_random_string(10)}" + non_existing_key = f"{{testKey}}:non_existing_key" + + assert await redis_client.zadd(key1, {"a": 1.0, "b": 1.5}) == 2 + assert await redis_client.zadd(key2, {"c": 2.0}) == 1 + assert await redis_client.bzpopmin([key1, key2], 0.5) == [key1, "a", 1.0] + assert await redis_client.bzpopmin([non_existing_key, key2], 0.5) == [ + key2, + "c", + 2.0, + ] + assert await redis_client.bzpopmin(["non_existing_key"], 0.5) is None + + # invalid argument - key list must not be empty + with pytest.raises(RequestError): + await redis_client.bzpopmin([], 0.5) + + # key exists, but it is not a sorted set + assert await redis_client.set("foo", "value") == OK + with pytest.raises(RequestError): + await redis_client.bzpopmin(["foo"], 0.5) + + # same-slot requirement + if isinstance(redis_client, RedisClusterClient): + with pytest.raises(RequestError) as e: + await redis_client.bzpopmin(["abc", "zxy", "lkn"], 0.5) + assert "CrossSlot" in str(e) + + async def endless_bzpopmin_call(): + await redis_client.bzpopmin(["non_existent_key"], 0) + + # bzpopmax is called against a non-existing key with no timeout, but we wrap the call in an asyncio timeout to + # avoid having the test block forever + with pytest.raises(asyncio.TimeoutError): + await asyncio.wait_for(endless_bzpopmin_call(), timeout=0.5) + @pytest.mark.parametrize("cluster_mode", [True, False]) @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) async def test_zpopmax(self, redis_client: TRedisClient): @@ -1852,6 +1892,46 @@ async def test_zpopmax(self, redis_client: TRedisClient): assert await redis_client.zpopmax("non_exisitng_key") == {} + @pytest.mark.parametrize("cluster_mode", [True, False]) + @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) + async def test_bzpopmax(self, redis_client: TRedisClient): + key1 = f"{{testKey}}:{get_random_string(10)}" + key2 = f"{{testKey}}:{get_random_string(10)}" + non_existing_key = f"{{testKey}}:non_existing_key" + + assert await redis_client.zadd(key1, {"a": 1.0, "b": 1.5}) == 2 + assert await redis_client.zadd(key2, {"c": 2.0}) == 1 + assert await redis_client.bzpopmax([key1, key2], 0.5) == [key1, "b", 1.5] + assert await redis_client.bzpopmax([non_existing_key, key2], 0.5) == [ + key2, + "c", + 2.0, + ] + assert await redis_client.bzpopmax(["non_existing_key"], 0.5) is None + + # invalid argument - key list must not be empty + with pytest.raises(RequestError): + await redis_client.bzpopmax([], 0.5) + + # key exists, but it is not a sorted set + assert await redis_client.set("foo", "value") == OK + with pytest.raises(RequestError): + await redis_client.bzpopmax(["foo"], 0.5) + + # same-slot requirement + if isinstance(redis_client, RedisClusterClient): + with pytest.raises(RequestError) as e: + await redis_client.bzpopmax(["abc", "zxy", "lkn"], 0.5) + assert "CrossSlot" in str(e) + + async def endless_bzpopmax_call(): + await redis_client.bzpopmax(["non_existent_key"], 0) + + # bzpopmax is called against a non-existing key with no timeout, but we wrap the call in an asyncio timeout to + # avoid having the test block forever + with pytest.raises(asyncio.TimeoutError): + await asyncio.wait_for(endless_bzpopmax_call(), timeout=0.5) + @pytest.mark.parametrize("cluster_mode", [True, False]) @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) async def test_zrange_by_index(self, redis_client: TRedisClient): diff --git a/python/python/tests/test_transaction.py b/python/python/tests/test_transaction.py index 83bac56b35..adb8634ce2 100644 --- a/python/python/tests/test_transaction.py +++ b/python/python/tests/test_transaction.py @@ -228,12 +228,16 @@ async def transaction_test( args.append([2.0, 3.0]) transaction.zrangestore(key8, key8, RangeByIndex(0, -1)) args.append(3) - transaction.zpopmin(key8) - args.append({"two": 2.0}) + transaction.bzpopmin([key8], 0.5) + args.append([key8, "two", 2.0]) + transaction.bzpopmax([key8], 0.5) + args.append([key8, "four", 4.0]) transaction.zpopmax(key8) - args.append({"four": 4}) + args.append({"three": 3.0}) + transaction.zpopmin(key8) + args.append({}) transaction.zremrangebyscore(key8, InfBound.NEG_INF, InfBound.POS_INF) - args.append(1) + args.append(0) transaction.zremrangebylex(key8, InfBound.NEG_INF, InfBound.POS_INF) args.append(0) transaction.zdiffstore(key8, [key8, key8])