Skip to content

Commit

Permalink
async cluster: Group responses by response_policy. (redis-rs#888)
Browse files Browse the repository at this point in the history
* Add NOTBUSY error code.

* async cluster: Group responses by response_policy.

This change implements the response_policy tip in the async cluster.
https://redis.io/docs/reference/command-tips/#response_policy

This means that the results from fan-out commands will be aggregated
differently, based on the sent command.
  • Loading branch information
nihohit authored and altanozlu committed Aug 16, 2023
1 parent 7ded0ac commit 4395917
Show file tree
Hide file tree
Showing 5 changed files with 579 additions and 38 deletions.
141 changes: 105 additions & 36 deletions redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use crate::{
aio::{ConnectionLike, MultiplexedConnection},
cluster::{get_connection_info, parse_slots, slot_cmd},
cluster_client::{ClusterParams, RetryParams},
cluster_routing::{Redirect, Route, RoutingInfo, Slot, SlotMap},
cluster_routing::{Redirect, ResponsePolicy, Route, RoutingInfo, Slot, SlotMap},
Cmd, ConnectionInfo, ErrorKind, IntoConnectionInfo, RedisError, RedisFuture, RedisResult,
Value,
};
Expand Down Expand Up @@ -118,6 +118,7 @@ enum CmdArg<C> {
cmd: Arc<Cmd>,
func: fn(C, Arc<Cmd>) -> RedisFuture<'static, Response>,
routing: Option<RoutingInfo>,
response_policy: Option<ResponsePolicy>,
},
Pipeline {
pipeline: Arc<crate::Pipeline>,
Expand Down Expand Up @@ -533,64 +534,117 @@ where
Ok(())
}

async fn execute_on_all_nodes(
async fn execute_on_multiple_nodes(
func: fn(C, Arc<Cmd>) -> RedisFuture<'static, Response>,
cmd: &Arc<Cmd>,
only_primaries: bool,
core: Core<C>,
response_policy: Option<ResponsePolicy>,
) -> (OperationTarget, RedisResult<Response>) {
let read_guard = core.conn_lock.read().await;
let results = future::join_all(
read_guard
.1
.all_unique_addresses(only_primaries)
.into_iter()
.filter_map(|addr| read_guard.0.get(addr).cloned())
.map(|conn| {
(async {
let conn = conn.await;
func(conn, cmd.clone()).await
})
.boxed()
}),
)
.await;
let connections: Vec<(String, ConnectionFuture<C>)> = read_guard
.1
.all_unique_addresses(only_primaries)
.into_iter()
.filter_map(|addr| {
read_guard
.0
.get(addr)
.cloned()
.map(|conn| (addr.to_string(), conn))
})
.collect();
drop(read_guard);
let mut merged_results = Vec::with_capacity(results.len());

// TODO - we can have better error reporting here if we had an Error variant on Value.
for result in results {
match result {
Ok(response) => match response {
Response::Single(value) => merged_results.push(value),
Response::Multiple(_) => unreachable!(),
},
Err(_) => {
return (OperationTarget::FanOut, result);
}
let extract_result = |response| match response {
Response::Single(value) => value,
Response::Multiple(_) => unreachable!(),
};

let run_func = |(_, conn)| {
Box::pin(async move {
let conn = conn.await;
Ok(extract_result(func(conn, cmd.clone()).await?))
})
};

// TODO - once Value::Error will be merged, these will need to be updated to handle this new value.
let result = match response_policy {
Some(ResponsePolicy::AllSucceeded) => {
future::try_join_all(connections.into_iter().map(run_func))
.await
.map(|mut results| results.pop().unwrap()) // unwrap is safe, since at least one function succeeded
}
Some(ResponsePolicy::OneSucceeded) => {
future::select_ok(connections.into_iter().map(run_func))
.await
.map(|(result, _)| result)
}
Some(ResponsePolicy::OneSucceededNonEmpty) => {
future::select_ok(connections.into_iter().map(|tuple| {
Box::pin(async move {
let result = run_func(tuple).await?;
match result {
Value::Nil => Err((ErrorKind::ResponseError, "no value found").into()),
_ => Ok(result),
}
})
}))
.await
.map(|(result, _)| result)
}
Some(ResponsePolicy::Aggregate(op)) => {
future::try_join_all(connections.into_iter().map(run_func))
.await
.and_then(|results| crate::cluster_routing::aggregate(results, op))
}
Some(ResponsePolicy::AggregateLogical(op)) => {
future::try_join_all(connections.into_iter().map(run_func))
.await
.and_then(|results| crate::cluster_routing::logical_aggregate(results, op))
}
Some(ResponsePolicy::CombineArrays) => {
future::try_join_all(connections.into_iter().map(run_func))
.await
.and_then(crate::cluster_routing::combine_array_results)
}
Some(ResponsePolicy::Special) | None => {
// This is our assumption - if there's no coherent way to aggregate the responses, we just map each response to the sender, and pass it to the user.
// TODO - once RESP3 is merged, return a map value here.
// TODO - once Value::Error is merged, we can use join_all and report separate errors and also pass successes.
future::try_join_all(connections.into_iter().map(|(addr, conn)| async move {
let conn = conn.await;
Ok(Value::Bulk(vec![
Value::Data(addr.into_bytes()),
extract_result(func(conn, cmd.clone()).await?),
]))
}))
.await
.map(Value::Bulk)
}
}
.map(Response::Single);

(
OperationTarget::FanOut,
Ok(Response::Single(Value::Bulk(merged_results))),
)
(OperationTarget::FanOut, result)
}

async fn try_cmd_request(
cmd: Arc<Cmd>,
func: fn(C, Arc<Cmd>) -> RedisFuture<'static, Response>,
redirect: Option<Redirect>,
routing: Option<RoutingInfo>,
response_policy: Option<ResponsePolicy>,
core: Core<C>,
asking: bool,
) -> (OperationTarget, RedisResult<Response>) {
let route_option = match routing.as_ref().unwrap_or(&RoutingInfo::Random) {
RoutingInfo::AllNodes => {
return Self::execute_on_all_nodes(func, &cmd, false, core).await
return Self::execute_on_multiple_nodes(func, &cmd, false, core, response_policy)
.await
}
RoutingInfo::AllMasters => {
return Self::execute_on_all_nodes(func, &cmd, true, core).await
return Self::execute_on_multiple_nodes(func, &cmd, true, core, response_policy)
.await
}
RoutingInfo::Random => None,
RoutingInfo::SpecificNode(route) => Some(route),
Expand Down Expand Up @@ -619,8 +673,22 @@ where
let asking = matches!(&info.redirect, Some(Redirect::Ask(_)));

match info.cmd {
CmdArg::Cmd { cmd, func, routing } => {
Self::try_cmd_request(cmd, func, info.redirect, routing, core, asking).await
CmdArg::Cmd {
cmd,
func,
routing,
response_policy,
} => {
Self::try_cmd_request(
cmd,
func,
info.redirect,
routing,
response_policy,
core,
asking,
)
.await
}
CmdArg::Pipeline {
pipeline,
Expand Down Expand Up @@ -993,6 +1061,7 @@ where
})
},
routing: RoutingInfo::for_routable(cmd),
response_policy: RoutingInfo::response_policy(cmd),
},
sender,
})
Expand Down
158 changes: 158 additions & 0 deletions redis/src/cluster_routing.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::cmp::min;
use std::collections::{BTreeMap, HashSet};
use std::iter::Iterator;

Expand All @@ -7,6 +8,7 @@ use rand::thread_rng;
use crate::cmd::{Arg, Cmd};
use crate::commands::is_readonly_cmd;
use crate::types::Value;
use crate::{ErrorKind, RedisResult};

pub(crate) const SLOT_SIZE: u16 = 16384;

Expand All @@ -20,6 +22,30 @@ pub(crate) enum Redirect {
Ask(String),
}

#[derive(Debug, Clone, Copy)]
pub(crate) enum LogicalAggregateOp {
And,
// Or, omitted due to dead code warnings. ATM this value isn't constructed anywhere
}

#[derive(Debug, Clone, Copy)]
pub(crate) enum AggregateOp {
Min,
Sum,
// Max, omitted due to dead code warnings. ATM this value isn't constructed anywhere
}

#[derive(Debug, Clone, Copy)]
pub(crate) enum ResponsePolicy {
OneSucceeded,
OneSucceededNonEmpty,
AllSucceeded,
AggregateLogical(LogicalAggregateOp),
Aggregate(AggregateOp),
CombineArrays,
Special,
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub(crate) enum RoutingInfo {
AllNodes,
Expand All @@ -28,7 +54,139 @@ pub(crate) enum RoutingInfo {
SpecificNode(Route),
}

pub(crate) fn aggregate(values: Vec<Value>, op: AggregateOp) -> RedisResult<Value> {
let initial_value = match op {
AggregateOp::Min => i64::MAX,
AggregateOp::Sum => 0,
};
let result = values
.into_iter()
.fold(RedisResult::Ok(initial_value), |acc, curr| {
let mut acc = acc?;
let int = match curr {
Value::Int(int) => int,
_ => {
return Err((
ErrorKind::TypeError,
"expected array of integers as response",
)
.into());
}
};
acc = match op {
AggregateOp::Min => min(acc, int),
AggregateOp::Sum => acc + int,
};
Ok(acc)
})?;
Ok(Value::Int(result))
}

pub(crate) fn logical_aggregate(values: Vec<Value>, op: LogicalAggregateOp) -> RedisResult<Value> {
let initial_value = match op {
LogicalAggregateOp::And => true,
};
let results = values
.into_iter()
.fold(RedisResult::Ok(Vec::new()), |acc, curr| {
let acc = acc?;
let values = match curr {
Value::Bulk(values) => values,
_ => {
return Err((
ErrorKind::TypeError,
"expected array of integers as response",
)
.into());
}
};
let mut acc = if acc.is_empty() {
vec![initial_value; values.len()]
} else {
acc
};
for (index, value) in values.into_iter().enumerate() {
let int = match value {
Value::Int(int) => int,
_ => {
return Err((
ErrorKind::TypeError,
"expected array of integers as response",
)
.into());
}
};
acc[index] = match op {
LogicalAggregateOp::And => acc[index] && (int > 0),
};
}
Ok(acc)
})?;
Ok(Value::Bulk(
results
.into_iter()
.map(|result| Value::Int(result as i64))
.collect(),
))
}

pub(crate) fn combine_array_results(values: Vec<Value>) -> RedisResult<Value> {
let mut results = Vec::new();

for value in values {
match value {
Value::Bulk(values) => results.extend(values),
_ => {
return Err((ErrorKind::TypeError, "expected array of values as response").into());
}
}
}

Ok(Value::Bulk(results))
}

impl RoutingInfo {
pub(crate) fn response_policy<R>(r: &R) -> Option<ResponsePolicy>
where
R: Routable + ?Sized,
{
use ResponsePolicy::*;
let cmd = &r.command()?[..];
match cmd {
b"SCRIPT EXISTS" => Some(AggregateLogical(LogicalAggregateOp::And)),

b"DBSIZE" | b"DEL" | b"EXISTS" | b"SLOWLOG LEN" | b"TOUCH" | b"UNLINK" => {
Some(Aggregate(AggregateOp::Sum))
}

b"MSETNX" | b"WAIT" => Some(Aggregate(AggregateOp::Min)),

b"CONFIG SET" | b"FLUSHALL" | b"FLUSHDB" | b"FUNCTION DELETE" | b"FUNCTION FLUSH"
| b"FUNCTION LOAD" | b"FUNCTION RESTORE" | b"LATENCY RESET" | b"MEMORY PURGE"
| b"MSET" | b"PING" | b"SCRIPT FLUSH" | b"SCRIPT LOAD" | b"SLOWLOG RESET" => {
Some(AllSucceeded)
}

b"KEYS" | b"MGET" | b"SLOWLOG GET" => Some(CombineArrays),

b"FUNCTION KILL" | b"SCRIPT KILL" => Some(OneSucceeded),

// This isn't based on response_tips, but on the discussion here - https://github.com/redis/redis/issues/12410
b"RANDOMKEY" => Some(OneSucceededNonEmpty),

b"LATENCY GRAPH" | b"LATENCY HISTOGRAM" | b"LATENCY HISTORY" | b"LATENCY DOCTOR"
| b"LATENCY LATEST" => Some(Special),

b"FUNCTION STATS" => Some(Special),

b"MEMORY MALLOC-STATS" | b"MEMORY DOCTOR" | b"MEMORY STATS" => Some(Special),

b"INFO" => Some(Special),

_ => None,
}
}

pub(crate) fn for_routable<R>(r: &R) -> Option<RoutingInfo>
where
R: Routable + ?Sized,
Expand Down
1 change: 1 addition & 0 deletions redis/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ fn err_parser(line: &str) -> RedisError {
"CROSSSLOT" => ErrorKind::CrossSlot,
"MASTERDOWN" => ErrorKind::MasterDown,
"READONLY" => ErrorKind::ReadOnly,
"NOTBUSY" => ErrorKind::NotBusy,
code => return make_extension_error(code, pieces.next()),
};
match pieces.next() {
Expand Down

0 comments on commit 4395917

Please sign in to comment.