diff --git a/redis/src/cluster_async/mod.rs b/redis/src/cluster_async/mod.rs
index 611d0a434..5e2cf0f06 100644
--- a/redis/src/cluster_async/mod.rs
+++ b/redis/src/cluster_async/mod.rs
@@ -36,7 +36,7 @@ use crate::{
     aio::{ConnectionLike, MultiplexedConnection},
     cluster::{get_connection_info, parse_slots, slot_cmd},
     cluster_client::{ClusterParams, RetryParams},
-    cluster_routing::{Redirect, Route, RoutingInfo, Slot, SlotMap},
+    cluster_routing::{Redirect, ResponsePolicy, Route, RoutingInfo, Slot, SlotMap},
     Cmd, ConnectionInfo, ErrorKind, IntoConnectionInfo, RedisError, RedisFuture, RedisResult,
     Value,
 };
@@ -118,6 +118,7 @@ enum CmdArg<C> {
     Cmd {
         cmd: Arc<Cmd>,
         func: fn(C, Arc<Cmd>) -> RedisFuture<'static, Response>,
         routing: Option<RoutingInfo>,
+        response_policy: Option<ResponsePolicy>,
     },
     Pipeline {
         pipeline: Arc<crate::Pipeline>,
@@ -533,48 +534,98 @@ where
         Ok(())
     }
 
-    async fn execute_on_all_nodes(
+    async fn execute_on_multiple_nodes(
         func: fn(C, Arc<Cmd>) -> RedisFuture<'static, Response>,
         cmd: &Arc<Cmd>,
         only_primaries: bool,
         core: Core<C>,
+        response_policy: Option<ResponsePolicy>,
     ) -> (OperationTarget, RedisResult<Response>) {
         let read_guard = core.conn_lock.read().await;
-        let results = future::join_all(
-            read_guard
-                .1
-                .all_unique_addresses(only_primaries)
-                .into_iter()
-                .filter_map(|addr| read_guard.0.get(addr).cloned())
-                .map(|conn| {
-                    (async {
-                        let conn = conn.await;
-                        func(conn, cmd.clone()).await
-                    })
-                    .boxed()
-                }),
-        )
-        .await;
+        let connections: Vec<(String, ConnectionFuture<C>)> = read_guard
+            .1
+            .all_unique_addresses(only_primaries)
+            .into_iter()
+            .filter_map(|addr| {
+                read_guard
+                    .0
+                    .get(addr)
+                    .cloned()
+                    .map(|conn| (addr.to_string(), conn))
+            })
+            .collect();
         drop(read_guard);
 
-        let mut merged_results = Vec::with_capacity(results.len());
-        // TODO - we can have better error reporting here if we had an Error variant on Value.
-        for result in results {
-            match result {
-                Ok(response) => match response {
-                    Response::Single(value) => merged_results.push(value),
-                    Response::Multiple(_) => unreachable!(),
-                },
-                Err(_) => {
-                    return (OperationTarget::FanOut, result);
-                }
+        let extract_result = |response| match response {
+            Response::Single(value) => value,
+            Response::Multiple(_) => unreachable!(),
+        };
+
+        let run_func = |(_, conn)| {
+            Box::pin(async move {
+                let conn = conn.await;
+                Ok(extract_result(func(conn, cmd.clone()).await?))
+            })
+        };
+
+        // TODO - once Value::Error is merged, these will need to be updated to handle this new value.
+        let result = match response_policy {
+            Some(ResponsePolicy::AllSucceeded) => {
+                future::try_join_all(connections.into_iter().map(run_func))
+                    .await
+                    .map(|mut results| results.pop().unwrap()) // unwrap is safe, since at least one function succeeded
+            }
+            Some(ResponsePolicy::OneSucceeded) => {
+                future::select_ok(connections.into_iter().map(run_func))
+                    .await
+                    .map(|(result, _)| result)
+            }
+            Some(ResponsePolicy::OneSucceededNonEmpty) => {
+                future::select_ok(connections.into_iter().map(|tuple| {
+                    Box::pin(async move {
+                        let result = run_func(tuple).await?;
+                        match result {
+                            Value::Nil => Err((ErrorKind::ResponseError, "no value found").into()),
+                            _ => Ok(result),
+                        }
+                    })
+                }))
+                .await
+                .map(|(result, _)| result)
+            }
+            Some(ResponsePolicy::Aggregate(op)) => {
+                future::try_join_all(connections.into_iter().map(run_func))
+                    .await
+                    .and_then(|results| crate::cluster_routing::aggregate(results, op))
+            }
+            Some(ResponsePolicy::AggregateLogical(op)) => {
+                future::try_join_all(connections.into_iter().map(run_func))
+                    .await
+                    .and_then(|results| crate::cluster_routing::logical_aggregate(results, op))
+            }
+            Some(ResponsePolicy::CombineArrays) => {
+                future::try_join_all(connections.into_iter().map(run_func))
+                    .await
+                    .and_then(crate::cluster_routing::combine_array_results)
+            }
+            Some(ResponsePolicy::Special) | None => {
+                // This is our assumption - if there's no coherent way to aggregate the responses, we just map each response to its sender and pass it to the user.
+                // TODO - once RESP3 is merged, return a map value here.
+                // TODO - once Value::Error is merged, we can use join_all and report separate errors and also pass successes.
+                future::try_join_all(connections.into_iter().map(|(addr, conn)| async move {
+                    let conn = conn.await;
+                    Ok(Value::Bulk(vec![
+                        Value::Data(addr.into_bytes()),
+                        extract_result(func(conn, cmd.clone()).await?),
+                    ]))
+                }))
+                .await
+                .map(Value::Bulk)
             }
         }
+        .map(Response::Single);
 
-        (
-            OperationTarget::FanOut,
-            Ok(Response::Single(Value::Bulk(merged_results))),
-        )
+        (OperationTarget::FanOut, result)
     }
 
@@ -582,15 +633,18 @@ where
     async fn try_cmd_request(
         cmd: Arc<Cmd>,
         func: fn(C, Arc<Cmd>) -> RedisFuture<'static, Response>,
         redirect: Option<Redirect>,
         routing: Option<RoutingInfo>,
+        response_policy: Option<ResponsePolicy>,
         core: Core<C>,
         asking: bool,
     ) -> (OperationTarget, RedisResult<Response>) {
         let route_option = match routing.as_ref().unwrap_or(&RoutingInfo::Random) {
             RoutingInfo::AllNodes => {
-                return Self::execute_on_all_nodes(func, &cmd, false, core).await
+                return Self::execute_on_multiple_nodes(func, &cmd, false, core, response_policy)
+                    .await
             }
             RoutingInfo::AllMasters => {
-                return Self::execute_on_all_nodes(func, &cmd, true, core).await
+                return Self::execute_on_multiple_nodes(func, &cmd, true, core, response_policy)
+                    .await
             }
             RoutingInfo::Random => None,
             RoutingInfo::SpecificNode(route) => Some(route),
@@ -619,8 +673,22 @@ where
         let asking = matches!(&info.redirect, Some(Redirect::Ask(_)));
 
         match info.cmd {
-            CmdArg::Cmd { cmd, func, routing } => {
-                Self::try_cmd_request(cmd, func, info.redirect, routing, core, asking).await
+            CmdArg::Cmd {
+                cmd,
+                func,
+                routing,
+                response_policy,
+            } => {
+                Self::try_cmd_request(
+                    cmd,
+                    func,
+                    info.redirect,
+                    routing,
+                    response_policy,
+                    core,
+                    asking,
+                )
+                .await
             }
             CmdArg::Pipeline {
                 pipeline,
@@ -993,6 +1061,7 @@ where
                     })
                 },
                 routing: RoutingInfo::for_routable(cmd),
+                response_policy: RoutingInfo::response_policy(cmd),
            },
             sender,
         })
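Reviewer note: the dispatch above leans on two `futures` combinators with different failure semantics — `try_join_all` (used for `AllSucceeded`, `Aggregate*`, and `CombineArrays`) short-circuits on the first error, while `select_ok` (used for the `OneSucceeded*` policies) fails only once every future has failed. A minimal, self-contained sketch of that distinction, outside the patch; `Reply`, `all_succeeded`, and `one_succeeded` are illustrative names, not part of the codebase:

```rust
use futures::future::{select_ok, try_join_all, FutureExt};

// Stand-in for one node's reply; the real code returns RedisResult<Value>.
type Reply = Result<i64, String>;

// `AllSucceeded`-style: fail fast on the first error, otherwise keep one
// representative value (the unwrap is safe because callers pass at least one
// reply, mirroring the patch's own unwrap comment).
async fn all_succeeded(replies: Vec<Reply>) -> Result<i64, String> {
    let futs = replies.into_iter().map(|r| async move { r });
    try_join_all(futs).await.map(|mut v| v.pop().unwrap())
}

// `OneSucceeded`-style: the first success wins; an error surfaces only if
// every node failed. `select_ok` needs `Unpin` futures, hence `.boxed()`.
async fn one_succeeded(replies: Vec<Reply>) -> Result<i64, String> {
    let futs = replies.into_iter().map(|r| async move { r }.boxed());
    select_ok(futs).await.map(|(value, _remaining)| value)
}

fn main() {
    futures::executor::block_on(async {
        assert_eq!(all_succeeded(vec![Ok(1), Ok(2)]).await, Ok(2));
        assert!(all_succeeded(vec![Ok(1), Err("down".into())]).await.is_err());
        assert_eq!(one_succeeded(vec![Err("down".into()), Ok(7)]).await, Ok(7));
    });
}
```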
diff --git a/redis/src/cluster_routing.rs b/redis/src/cluster_routing.rs
index 9a5a84c93..9e966d8c2 100644
--- a/redis/src/cluster_routing.rs
+++ b/redis/src/cluster_routing.rs
@@ -1,3 +1,4 @@
+use std::cmp::min;
 use std::collections::{BTreeMap, HashSet};
 use std::iter::Iterator;
 
@@ -7,6 +8,7 @@ use rand::thread_rng;
 use crate::cmd::{Arg, Cmd};
 use crate::commands::is_readonly_cmd;
 use crate::types::Value;
+use crate::{ErrorKind, RedisResult};
 
 pub(crate) const SLOT_SIZE: u16 = 16384;
 
@@ -20,6 +22,30 @@ pub(crate) enum Redirect {
     Ask(String),
 }
 
+#[derive(Debug, Clone, Copy)]
+pub(crate) enum LogicalAggregateOp {
+    And,
+    // Or, omitted due to dead code warnings. ATM this value isn't constructed anywhere
+}
+
+#[derive(Debug, Clone, Copy)]
+pub(crate) enum AggregateOp {
+    Min,
+    Sum,
+    // Max, omitted due to dead code warnings. ATM this value isn't constructed anywhere
+}
+
+#[derive(Debug, Clone, Copy)]
+pub(crate) enum ResponsePolicy {
+    OneSucceeded,
+    OneSucceededNonEmpty,
+    AllSucceeded,
+    AggregateLogical(LogicalAggregateOp),
+    Aggregate(AggregateOp),
+    CombineArrays,
+    Special,
+}
+
 #[derive(Debug, Clone, Copy, PartialEq)]
 pub(crate) enum RoutingInfo {
     AllNodes,
@@ -28,7 +54,139 @@ pub(crate) enum RoutingInfo {
     SpecificNode(Route),
 }
 
+pub(crate) fn aggregate(values: Vec<Value>, op: AggregateOp) -> RedisResult<Value> {
+    let initial_value = match op {
+        AggregateOp::Min => i64::MAX,
+        AggregateOp::Sum => 0,
+    };
+    let result = values
+        .into_iter()
+        .fold(RedisResult::Ok(initial_value), |acc, curr| {
+            let mut acc = acc?;
+            let int = match curr {
+                Value::Int(int) => int,
+                _ => {
+                    return Err((
+                        ErrorKind::TypeError,
+                        "expected array of integers as response",
+                    )
+                    .into());
+                }
+            };
+            acc = match op {
+                AggregateOp::Min => min(acc, int),
+                AggregateOp::Sum => acc + int,
+            };
+            Ok(acc)
+        })?;
+    Ok(Value::Int(result))
+}
+
+pub(crate) fn logical_aggregate(values: Vec<Value>, op: LogicalAggregateOp) -> RedisResult<Value> {
+    let initial_value = match op {
+        LogicalAggregateOp::And => true,
+    };
+    let results = values
+        .into_iter()
+        .fold(RedisResult::Ok(Vec::new()), |acc, curr| {
+            let acc = acc?;
+            let values = match curr {
+                Value::Bulk(values) => values,
+                _ => {
+                    return Err((
+                        ErrorKind::TypeError,
+                        "expected array of integers as response",
+                    )
+                    .into());
+                }
+            };
+            let mut acc = if acc.is_empty() {
+                vec![initial_value; values.len()]
+            } else {
+                acc
+            };
+            for (index, value) in values.into_iter().enumerate() {
+                let int = match value {
+                    Value::Int(int) => int,
+                    _ => {
+                        return Err((
+                            ErrorKind::TypeError,
+                            "expected array of integers as response",
+                        )
+                        .into());
+                    }
+                };
+                acc[index] = match op {
+                    LogicalAggregateOp::And => acc[index] && (int > 0),
+                };
+            }
+            Ok(acc)
+        })?;
+    Ok(Value::Bulk(
+        results
+            .into_iter()
+            .map(|result| Value::Int(result as i64))
+            .collect(),
+    ))
+}
+
+pub(crate) fn combine_array_results(values: Vec<Value>) -> RedisResult<Value> {
+    let mut results = Vec::new();
+
+    for value in values {
+        match value {
+            Value::Bulk(values) => results.extend(values),
+            _ => {
+                return Err((ErrorKind::TypeError, "expected array of values as response").into());
+            }
+        }
+    }
+
+    Ok(Value::Bulk(results))
+}
+
 impl RoutingInfo {
+    pub(crate) fn response_policy<R>(r: &R) -> Option<ResponsePolicy>
+    where
+        R: Routable + ?Sized,
+    {
+        use ResponsePolicy::*;
+        let cmd = &r.command()?[..];
+        match cmd {
+            b"SCRIPT EXISTS" => Some(AggregateLogical(LogicalAggregateOp::And)),
+
+            b"DBSIZE" | b"DEL" | b"EXISTS" | b"SLOWLOG LEN" | b"TOUCH" | b"UNLINK" => {
+                Some(Aggregate(AggregateOp::Sum))
+            }
+
+            b"MSETNX" | b"WAIT" => Some(Aggregate(AggregateOp::Min)),
+
+            b"CONFIG SET" | b"FLUSHALL" | b"FLUSHDB" | b"FUNCTION DELETE" | b"FUNCTION FLUSH"
+            | b"FUNCTION LOAD" | b"FUNCTION RESTORE" | b"LATENCY RESET" | b"MEMORY PURGE"
+            | b"MSET" | b"PING" | b"SCRIPT FLUSH" | b"SCRIPT LOAD" | b"SLOWLOG RESET" => {
+                Some(AllSucceeded)
+            }
+
+            b"KEYS" | b"MGET" | b"SLOWLOG GET" => Some(CombineArrays),
+
+            b"FUNCTION KILL" | b"SCRIPT KILL" => Some(OneSucceeded),
+
+            // This isn't based on response_tips, but on the discussion here - https://github.com/redis/redis/issues/12410
+            b"RANDOMKEY" => Some(OneSucceededNonEmpty),
+
+            b"LATENCY GRAPH" | b"LATENCY HISTOGRAM" | b"LATENCY HISTORY" | b"LATENCY DOCTOR"
+            | b"LATENCY LATEST" => Some(Special),
+
+            b"FUNCTION STATS" => Some(Special),
+
+            b"MEMORY MALLOC-STATS" | b"MEMORY DOCTOR" | b"MEMORY STATS" => Some(Special),
+
+            b"INFO" => Some(Special),
+
+            _ => None,
+        }
+    }
+
     pub(crate) fn for_routable<R>(r: &R) -> Option<RoutingInfo>
     where
         R: Routable + ?Sized,
diff --git a/redis/src/parser.rs b/redis/src/parser.rs
index 847278eac..b802e8418 100644
--- a/redis/src/parser.rs
+++ b/redis/src/parser.rs
@@ -74,6 +74,7 @@ fn err_parser(line: &str) -> RedisError {
         "CROSSSLOT" => ErrorKind::CrossSlot,
         "MASTERDOWN" => ErrorKind::MasterDown,
         "READONLY" => ErrorKind::ReadOnly,
+        "NOTBUSY" => ErrorKind::NotBusy,
         code => return make_extension_error(code, pieces.next()),
     };
     match pieces.next() {
diff --git a/redis/src/types.rs b/redis/src/types.rs
index 54df338b5..d994698c7 100644
--- a/redis/src/types.rs
+++ b/redis/src/types.rs
@@ -128,6 +128,8 @@ pub enum ErrorKind {
     NoValidReplicasFoundBySentinel,
     /// At least one sentinel connection info is required
     EmptySentinelList,
+    /// Attempted to kill a script/function while it wasn't executing
+    NotBusy,
 
     #[cfg(feature = "json")]
     /// Error Serializing a struct to JSON form
@@ -597,6 +599,7 @@ impl RedisError {
             ErrorKind::CrossSlot => Some("CROSSSLOT"),
             ErrorKind::MasterDown => Some("MASTERDOWN"),
             ErrorKind::ReadOnly => Some("READONLY"),
+            ErrorKind::NotBusy => Some("NOTBUSY"),
             _ => match self.repr {
                 ErrorRepr::ExtensionError(ref code, _) => Some(code),
                 _ => None,
@@ -627,6 +630,7 @@ impl RedisError {
             ErrorKind::MasterNameNotFoundBySentinel => "master name not found by sentinel",
             ErrorKind::NoValidReplicasFoundBySentinel => "no valid replicas found by sentinel",
             ErrorKind::EmptySentinelList => "empty sentinel list",
+            ErrorKind::NotBusy => "not busy",
             #[cfg(feature = "json")]
             ErrorKind::Serialize => "serializing",
             ErrorKind::RESP3NotSupported => "resp3 is not supported by server",
@@ -778,6 +782,7 @@ impl RedisError {
             ErrorKind::CrossSlot => false,
             ErrorKind::ClientError => false,
             ErrorKind::EmptySentinelList => false,
+            ErrorKind::NotBusy => false,
             #[cfg(feature = "json")]
             ErrorKind::Serialize => false,
             ErrorKind::RESP3NotSupported => false,
diff --git a/redis/tests/test_cluster_async.rs b/redis/tests/test_cluster_async.rs
index 031073aa5..b4f3dd56e 100644
--- a/redis/tests/test_cluster_async.rs
+++ b/redis/tests/test_cluster_async.rs
@@ -712,7 +712,7 @@ fn test_cluster_fan_out_to_all_nodes() {
 }
 
 #[test]
-fn test_cluster_fan_out_out_once_to_each_primary_when_no_replicas_are_available() {
+fn test_cluster_fan_out_once_to_each_primary_when_no_replicas_are_available() {
     test_cluster_fan_out(
         "CONFIG SET",
         vec![6379, 6381],
@@ -732,7 +732,7 @@
 }
 
 #[test]
-fn test_cluster_fan_out_out_once_even_if_primary_has_multiple_slot_ranges() {
+fn test_cluster_fan_out_once_even_if_primary_has_multiple_slot_ranges() {
     test_cluster_fan_out(
         "CONFIG SET",
         vec![6379, 6380, 6381, 6382],
@@ -761,6 +761,314 @@ fn test_cluster_fan_out_out_once_even_if_primary_has_multiple_slot_ranges() {
     );
 }
 
+#[test]
+fn test_cluster_fan_out_and_aggregate_numeric_response_with_min() {
+    let name = "test_cluster_fan_out_and_aggregate_numeric_response";
+    let mut cmd = Cmd::new();
+    cmd.arg("SLOWLOG").arg("LEN");
+
+    let MockEnv {
+        runtime,
+        async_connection: mut connection,
+        handler: _handler,
+        ..
+    } = MockEnv::with_client_builder(
+        ClusterClient::builder(vec![&*format!("redis://{name}")])
+            .retries(0)
+            .read_from_replicas(),
+        name,
+        move |received_cmd: &[u8], port| {
+            respond_startup_with_replica_using_config(name, received_cmd, None)?;
+
+            let res = 6383 - port as i64;
+            Err(Ok(Value::Int(res))) // this results in 1,2,3,4
+        },
+    );
+
+    let result = runtime
+        .block_on(cmd.query_async::<_, i64>(&mut connection))
+        .unwrap();
+    assert_eq!(result, 10, "{result}");
+}
+
+#[test]
+fn test_cluster_fan_out_and_aggregate_logical_array_response() {
+    let name = "test_cluster_fan_out_and_aggregate_logical_array_response";
+    let mut cmd = Cmd::new();
+    cmd.arg("SCRIPT")
+        .arg("EXISTS")
+        .arg("foo")
+        .arg("bar")
+        .arg("baz")
+        .arg("barvaz");
+
+    let MockEnv {
+        runtime,
+        async_connection: mut connection,
+        handler: _handler,
+        ..
+    } = MockEnv::with_client_builder(
+        ClusterClient::builder(vec![&*format!("redis://{name}")])
+            .retries(0)
+            .read_from_replicas(),
+        name,
+        move |received_cmd: &[u8], port| {
+            respond_startup_with_replica_using_config(name, received_cmd, None)?;
+
+            if port == 6381 {
+                return Err(Ok(Value::Bulk(vec![
+                    Value::Int(0),
+                    Value::Int(0),
+                    Value::Int(1),
+                    Value::Int(1),
+                ])));
+            } else if port == 6379 {
+                return Err(Ok(Value::Bulk(vec![
+                    Value::Int(0),
+                    Value::Int(1),
+                    Value::Int(0),
+                    Value::Int(1),
+                ])));
+            }
+
+            panic!("unexpected port {port}");
+        },
+    );
+
+    let result = runtime
+        .block_on(cmd.query_async::<_, Vec<i64>>(&mut connection))
+        .unwrap();
+    assert_eq!(result, vec![0, 0, 0, 1], "{result:?}");
+}
+
+#[test]
+fn test_cluster_fan_out_and_return_one_succeeded_response() {
+    let name = "test_cluster_fan_out_and_return_one_succeeded_response";
+    let mut cmd = Cmd::new();
+    cmd.arg("SCRIPT").arg("KILL");
+    let MockEnv {
+        runtime,
+        async_connection: mut connection,
+        handler: _handler,
+        ..
+    } = MockEnv::with_client_builder(
+        ClusterClient::builder(vec![&*format!("redis://{name}")])
+            .retries(0)
+            .read_from_replicas(),
+        name,
+        move |received_cmd: &[u8], port| {
+            respond_startup_with_replica_using_config(name, received_cmd, None)?;
+            if port == 6381 {
+                return Err(Ok(Value::Okay));
+            } else if port == 6379 {
+                return Err(Err((
+                    ErrorKind::NotBusy,
+                    "No scripts in execution right now",
+                )
+                    .into()));
+            }
+
+            panic!("unexpected port {port}");
+        },
+    );
+
+    let result = runtime
+        .block_on(cmd.query_async::<_, Value>(&mut connection))
+        .unwrap();
+    assert_eq!(result, Value::Okay, "{result:?}");
+}
+
+#[test]
+fn test_cluster_fan_out_and_fail_one_succeeded_if_there_are_no_successes() {
+    let name = "test_cluster_fan_out_and_fail_one_succeeded_if_there_are_no_successes";
+    let mut cmd = Cmd::new();
+    cmd.arg("SCRIPT").arg("KILL");
+    let MockEnv {
+        runtime,
+        async_connection: mut connection,
+        handler: _handler,
+        ..
+    } = MockEnv::with_client_builder(
+        ClusterClient::builder(vec![&*format!("redis://{name}")])
+            .retries(0)
+            .read_from_replicas(),
+        name,
+        move |received_cmd: &[u8], _port| {
+            respond_startup_with_replica_using_config(name, received_cmd, None)?;
+
+            Err(Err((
+                ErrorKind::NotBusy,
+                "No scripts in execution right now",
+            )
+                .into()))
+        },
+    );
+
+    let result = runtime
+        .block_on(cmd.query_async::<_, Value>(&mut connection))
+        .unwrap_err();
+    assert_eq!(result.kind(), ErrorKind::NotBusy, "{:?}", result.kind());
+}
+
+#[test]
+fn test_cluster_fan_out_and_return_all_succeeded_response() {
+    let name = "test_cluster_fan_out_and_return_all_succeeded_response";
+    let cmd = cmd("FLUSHALL");
+    let MockEnv {
+        runtime,
+        async_connection: mut connection,
+        handler: _handler,
+        ..
+    } = MockEnv::with_client_builder(
+        ClusterClient::builder(vec![&*format!("redis://{name}")])
+            .retries(0)
+            .read_from_replicas(),
+        name,
+        move |received_cmd: &[u8], _port| {
+            respond_startup_with_replica_using_config(name, received_cmd, None)?;
+            Err(Ok(Value::Okay))
+        },
+    );
+
+    let result = runtime
+        .block_on(cmd.query_async::<_, Value>(&mut connection))
+        .unwrap();
+    assert_eq!(result, Value::Okay, "{result:?}");
+}
+
+#[test]
+fn test_cluster_fan_out_and_fail_all_succeeded_if_there_is_a_single_failure() {
+    let name = "test_cluster_fan_out_and_fail_all_succeeded_if_there_is_a_single_failure";
+    let cmd = cmd("FLUSHALL");
+    let MockEnv {
+        runtime,
+        async_connection: mut connection,
+        handler: _handler,
+        ..
+    } = MockEnv::with_client_builder(
+        ClusterClient::builder(vec![&*format!("redis://{name}")])
+            .retries(0)
+            .read_from_replicas(),
+        name,
+        move |received_cmd: &[u8], port| {
+            respond_startup_with_replica_using_config(name, received_cmd, None)?;
+            if port == 6381 {
+                return Err(Err((
+                    ErrorKind::NotBusy,
+                    "No scripts in execution right now",
+                )
+                    .into()));
+            }
+            Err(Ok(Value::Okay))
+        },
+    );
+
+    let result = runtime
+        .block_on(cmd.query_async::<_, Value>(&mut connection))
+        .unwrap_err();
+    assert_eq!(result.kind(), ErrorKind::NotBusy, "{:?}", result.kind());
+}
+
+#[test]
+fn test_cluster_fan_out_and_return_one_succeeded_ignoring_empty_values() {
+    let name = "test_cluster_fan_out_and_return_one_succeeded_ignoring_empty_values";
+    let cmd = cmd("RANDOMKEY");
+    let MockEnv {
+        runtime,
+        async_connection: mut connection,
+        handler: _handler,
+        ..
+    } = MockEnv::with_client_builder(
+        ClusterClient::builder(vec![&*format!("redis://{name}")])
+            .retries(0)
+            .read_from_replicas(),
+        name,
+        move |received_cmd: &[u8], port| {
+            respond_startup_with_replica_using_config(name, received_cmd, None)?;
+            if port == 6381 {
+                return Err(Ok(Value::Data("foo".as_bytes().to_vec())));
+            }
+            Err(Ok(Value::Nil))
+        },
+    );
+
+    let result = runtime
+        .block_on(cmd.query_async::<_, String>(&mut connection))
+        .unwrap();
+    assert_eq!(result, "foo", "{result:?}");
+}
+
+#[test]
+fn test_cluster_fan_out_and_return_map_of_results_for_special_response_policy() {
+    let name = "foo";
+    let mut cmd = Cmd::new();
+    cmd.arg("LATENCY").arg("LATEST");
+    let MockEnv {
+        runtime,
+        async_connection: mut connection,
+        handler: _handler,
+        ..
+    } = MockEnv::with_client_builder(
+        ClusterClient::builder(vec![&*format!("redis://{name}")])
+            .retries(0)
+            .read_from_replicas(),
+        name,
+        move |received_cmd: &[u8], port| {
+            respond_startup_with_replica_using_config(name, received_cmd, None)?;
+            Err(Ok(Value::Data(format!("latency: {port}").into_bytes())))
+        },
+    );
+
+    // TODO once RESP3 is in, return this as a map
+    let mut result = runtime
+        .block_on(cmd.query_async::<_, Vec<Vec<String>>>(&mut connection))
+        .unwrap();
+    result.sort();
+    assert_eq!(
+        result,
+        vec![
+            vec![format!("{name}:6379"), "latency: 6379".to_string()],
+            vec![format!("{name}:6380"), "latency: 6380".to_string()],
+            vec![format!("{name}:6381"), "latency: 6381".to_string()],
+            vec![format!("{name}:6382"), "latency: 6382".to_string()]
+        ],
+        "{result:?}"
+    );
+}
+
+#[test]
+fn test_cluster_fan_out_and_combine_arrays_of_values() {
+    let name = "foo";
+    let cmd = cmd("KEYS");
+    let MockEnv {
+        runtime,
+        async_connection: mut connection,
+        handler: _handler,
+        ..
+    } = MockEnv::with_client_builder(
+        ClusterClient::builder(vec![&*format!("redis://{name}")])
+            .retries(0)
+            .read_from_replicas(),
+        name,
+        move |received_cmd: &[u8], port| {
+            respond_startup_with_replica_using_config(name, received_cmd, None)?;
+            Err(Ok(Value::Bulk(vec![Value::Data(
+                format!("key:{port}").into_bytes(),
+            )])))
+        },
+    );
+
+    let mut result = runtime
+        .block_on(cmd.query_async::<_, Vec<String>>(&mut connection))
+        .unwrap();
+    result.sort();
+    assert_eq!(
+        result,
+        vec![format!("key:6379"), format!("key:6381"),],
+        "{result:?}"
+    );
+}
+
 #[test]
 fn test_async_cluster_with_username_and_password() {
     let cluster = TestClusterContext::new_with_cluster_client_builder(3, 0, |builder| {
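Reviewer note: until the RESP3 map TODO lands, a `Special`/`None` fan-out reply reaches the caller as a bulk of `[address, value]` pairs, which is exactly what the LATENCY LATEST test above asserts. A sketch of consuming that shape on the caller side, outside the patch; the `pairs_from_special_reply` helper is illustrative, not part of the crate's API:

```rust
use redis::Value;

// Split a Special-policy fan-out reply into (node address, per-node value)
// pairs. Entries that don't match the [address, value] shape are skipped.
fn pairs_from_special_reply(reply: Value) -> Vec<(String, Value)> {
    let Value::Bulk(entries) = reply else { return Vec::new() };
    entries
        .into_iter()
        .filter_map(|entry| match entry {
            Value::Bulk(mut pair) if pair.len() == 2 => {
                let value = pair.pop().unwrap();
                let addr = match pair.pop().unwrap() {
                    Value::Data(bytes) => String::from_utf8_lossy(&bytes).into_owned(),
                    other => format!("{other:?}"),
                };
                Some((addr, value))
            }
            _ => None,
        })
        .collect()
}

fn main() {
    // The shape returned by the Special/None arm in execute_on_multiple_nodes.
    let reply = Value::Bulk(vec![
        Value::Bulk(vec![
            Value::Data(b"node1:6379".to_vec()),
            Value::Data(b"latency: 6379".to_vec()),
        ]),
        Value::Bulk(vec![
            Value::Data(b"node2:6380".to_vec()),
            Value::Data(b"latency: 6380".to_vec()),
        ]),
    ]);
    for (addr, value) in pairs_from_special_reply(reply) {
        println!("{addr} -> {value:?}");
    }
}
```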