diff --git a/CHANGELOG.md b/CHANGELOG.md
index e1745f68..738d2449 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,11 @@
+## 6.2.2
+
+* Fix cluster replica discovery with ElastiCache
+* Fix cluster replica `READONLY` usage
+
 ## 6.2.1
 
-* Fix cluster failover with paused nodes.
+* Fix cluster failover with paused nodes
 
 ## 6.2.0
 
diff --git a/Cargo.toml b/Cargo.toml
index 403bc0ba..ce4d4e9f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "fred"
-version = "6.2.1"
+version = "6.2.2"
 authors = ["Alec Embke <aembke@gmail.com>"]
 edition = "2021"
 description = "An async Redis client built on Tokio."
diff --git a/src/clients/node.rs b/src/clients/node.rs
index 67d967ed..1274ae06 100644
--- a/src/clients/node.rs
+++ b/src/clients/node.rs
@@ -41,6 +41,46 @@ use crate::{
 /// A struct for interacting with individual nodes in a cluster.
 ///
 /// See [with_cluster_node](crate::clients::RedisClient::with_cluster_node) for more information.
+///
+/// ```
+/// # use fred::prelude::*;
+/// async fn example(client: &RedisClient) -> Result<(), RedisError> {
+///   // discover servers via the `RedisConfig` or active connections
+///   let connections = client.active_connections().await?;
+///
+///   // ping each node in the cluster individually
+///   for server in connections.into_iter() {
+///     let _: () = client.with_cluster_node(server).ping().await?;
+///   }
+///
+///   // or use the cached cluster routing table to discover servers
+///   let servers = client
+///     .cached_cluster_state()
+///     .expect("Failed to read cached cluster state")
+///     .unique_primary_nodes();
+///   for server in servers {
+///     // verify the server address with `CLIENT INFO`
+///     let server_addr = client
+///       .with_cluster_node(&server)
+///       .client_info::<String>()
+///       .await?
+///       .split(" ")
+///       .find_map(|s| {
+///         let parts: Vec<&str> = s.split("=").collect();
+///         if parts[0] == "laddr" {
+///           Some(parts[1].to_owned())
+///         } else {
+///           None
+///         }
+///       })
+///       .expect("Failed to read or parse client info.");
+///
+///     assert_eq!(server_addr, server.to_string());
+///   }
+///
+///   Ok(())
+/// }
+/// ```
 #[derive(Clone)]
 pub struct Node {
   inner: Arc<RedisClientInner>,
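Editor's note: the example above verifies addresses via `CLIENT INFO`, but `with_cluster_node` itself accepts anything implementing `Into<Server>`, so a fixed address also works. A minimal sketch (address illustrative):

```rust
use fred::{prelude::*, types::Server};

async fn ping_node(client: &RedisClient) -> Result<(), RedisError> {
  // target one node directly; the server must exist in the cluster
  let server = Server::new("127.0.0.1", 30001);
  let _: () = client.with_cluster_node(server).ping().await?;
  Ok(())
}
```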
diff --git a/src/clients/redis.rs b/src/clients/redis.rs
index 6d1053df..70fb98d7 100644
--- a/src/clients/redis.rs
+++ b/src/clients/redis.rs
@@ -242,53 +242,12 @@ impl RedisClient {
     Pipeline::from(self.clone())
   }
 
-  /// Send subsequent commands to the provided cluster node.
+  /// Send commands to the provided cluster node.
   ///
   /// The caller will receive a `RedisErrorKind::Cluster` error if the provided server does not exist.
   ///
   /// The client will still automatically follow `MOVED` errors via this interface. Callers may not notice this, but
   /// incorrect server arguments here could result in unnecessary calls to refresh the cached cluster routing table.
-  ///
-  /// ```
-  /// # use fred::prelude::*;
-  ///
-  /// async fn example(client: &RedisClient) -> Result<(), RedisError> {
-  ///   // discover servers via the `RedisConfig` or active connections
-  ///   let connections = client.active_connections().await?;
-  ///
-  ///   // ping each node in the cluster individually
-  ///   for server in connections.into_iter() {
-  ///     let _: () = client.with_cluster_node(server).ping().await?;
-  ///   }
-  ///
-  ///   // or use the cached cluster routing table to discover servers
-  ///   let servers = client
-  ///     .cached_cluster_state()
-  ///     .expect("Failed to read cached cluster state")
-  ///     .unique_primary_nodes();
-  ///   for server in servers {
-  ///     // verify the server address with `CLIENT INFO`
-  ///     let server_addr = client
-  ///       .with_cluster_node(&server)
-  ///       .client_info::<String>()
-  ///       .await?
-  ///       .split(" ")
-  ///       .find_map(|s| {
-  ///         let parts: Vec<&str> = s.split("=").collect();
-  ///         if parts[0] == "laddr" {
-  ///           Some(parts[1].to_owned())
-  ///         } else {
-  ///           None
-  ///         }
-  ///       })
-  ///       .expect("Failed to read or parse client info.");
-  ///
-  ///     assert_eq!(server_addr, server.to_string());
-  ///   }
-  ///
-  ///   Ok(())
-  /// }
-  /// ```
   pub fn with_cluster_node<S>(&self, server: S) -> Node
   where
     S: Into<Server>,
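Editor's note: the notes removed below described how clustered replicas may answer read-only commands with `MOVED` redirections. With this PR, replica connections send `READONLY` during setup (see `src/router/replicas.rs` below), which avoids most of those redirections. A hedged sketch of a replica read (key name illustrative, `replicas` feature assumed):

```rust
use fred::prelude::*;

async fn replica_read(client: &RedisClient) -> Result<(), RedisError> {
  // route the read to a replica; the client follows any MOVED redirection
  let _: Option<String> = client.replicas().get("bar").await?;
  Ok(())
}
```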
diff --git a/src/clients/replica.rs b/src/clients/replica.rs
index b06fa1ae..adbc1e18 100644
--- a/src/clients/replica.rs
+++ b/src/clients/replica.rs
@@ -35,46 +35,6 @@ use tokio::sync::oneshot::channel as oneshot_channel;
 /// or when any connection closes.
 ///
 /// [Redis replication is asynchronous](https://redis.io/docs/management/replication/).
-// ### Cluster Replication
-//
-// In a clustered deployment replicas may redirect callers back to primary nodes, even with read-only commands,
-// depending on the server configuration. The client will automatically follow these redirections, but callers should
-// be aware of this behavior for monitoring or tracing purposes.
-//
-// #### Example
-//
-// ```bash
-// // connect to a primary node, print cluster and replica info, and `GET bar`
-// foo@d85c70fd4fc0:/project$ redis-cli -h 172.21.0.5 -p 30001
-// 172.21.0.5:30001> cluster nodes
-// 60ca8d301ef624956e847e6e6ecc865a36513bbe 172.21.0.3:30001@40001 slave f837e4056f564ab7fd69c24264279a1bd81d6420 0 1674165394000 3 connected
-// ddc30573f0c7ee1f79d7f263e2f83d7b83ad0ba0 172.21.0.8:30001@40001 slave 101b2a992c6c909d807d4c5fbd149bcc28e63ef8 0 1674165396000 2 connected
-// 101b2a992c6c909d807d4c5fbd149bcc28e63ef8 172.21.0.2:30001@40001 master - 0 1674165395807 2 connected 5461-10922
-// 38a7f9d3e440a37adf42f2ceddd9ad52bfb4186e 172.21.0.7:30001@40001 slave bd48cbd28cd927a284bab4424bd41b077a25acb6 0 1674165396810 1 connected
-// f837e4056f564ab7fd69c24264279a1bd81d6420 172.21.0.4:30001@40001 master - 0 1674165395000 3 connected 10923-16383
-// bd48cbd28cd927a284bab4424bd41b077a25acb6 172.21.0.5:30001@40001 myself,master - 0 1674165393000 1 connected 0-5460
-// 172.21.0.5:30001> info replication
-// # Replication
-// role:master
-// connected_slaves:1
-// slave0:ip=172.21.0.7,port=30001,state=online,offset=183696,lag=0
-// [truncated]
-// 172.21.0.5:30001> get bar
-// "2"
-//
-// // connect to the associated replica and `GET bar`
-// foo@d85c70fd4fc0:/project$ redis-cli -h 172.21.0.7 -p 30001
-// 172.21.0.7:30001> role
-// 1) "slave"
-// 2) "172.21.0.5"
-// 3) (integer) 30001
-// 4) "connected"
-// 5) (integer) 185390
-// 172.21.0.7:30001> get bar
-// (error) MOVED 5061 172.21.0.5:30001
-// ```
-//
-// **This can result in unexpected latency or errors depending on the client configuration.**
 #[derive(Clone)]
 #[cfg_attr(docsrs, doc(cfg(feature = "replicas")))]
 pub struct Replicas {
diff --git a/src/modules/mocks.rs b/src/modules/mocks.rs
index 09d3f5e6..f2d15a71 100644
--- a/src/modules/mocks.rs
+++ b/src/modules/mocks.rs
@@ -223,7 +223,7 @@ impl Mocks for SimpleMap {
 ///
 /// ```rust
 /// #[tokio::test]
-/// async fn should_use_echo_mock() {
+/// async fn should_use_buffer_mock() {
 ///   let buffer = Arc::new(Buffer::new());
 ///   let config = RedisConfig {
 ///     mocks: buffer.clone(),
diff --git a/src/protocol/cluster.rs b/src/protocol/cluster.rs
index 18388915..377552a4 100644
--- a/src/protocol/cluster.rs
+++ b/src/protocol/cluster.rs
@@ -43,8 +43,9 @@ fn check_metadata_hostname(data: &HashMap<String, String>) -> Option<&str> {
 /// The implementation here does the following:
 /// 1. If `server[0]` is a hostname then use that.
 /// 2. If `server[0]` is an IP address, then check `server[3]` for a "hostname" metadata field and use that if found.
-/// Otherwise use the IP address in `server[0]`. 3. If `server[0]` is null, but `server[3]` has a "hostname" metadata
-/// field, then use the metadata field. Otherwise use `default_host`.
+/// Otherwise use the IP address in `server[0]`.
+/// 3. If `server[0]` is null, but `server[3]` has a "hostname" metadata field, then use the metadata field. Otherwise
+/// use `default_host`.
 ///
 ///
 fn parse_cluster_slot_hostname(server: &[RedisValue], default_host: &str) -> Result<String, RedisError> {
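Editor's note: the three rules above amount to a preference order: hostname in `server[0]`, then metadata hostname, then IP, then `default_host`. A standalone sketch of that order (simplified inputs, not fred's actual signature):

```rust
use std::net::IpAddr;

// Simplified sketch of the hostname preference described above.
fn preferred_host(first: Option<&str>, metadata_hostname: Option<&str>, default_host: &str) -> String {
  match first {
    // 1. `server[0]` is a hostname: use it directly
    Some(host) if host.parse::<IpAddr>().is_err() => host.to_owned(),
    // 2. `server[0]` is an IP address: prefer a "hostname" metadata field
    Some(ip) => metadata_hostname.unwrap_or(ip).to_owned(),
    // 3. `server[0]` is null: fall back to metadata, then `default_host`
    None => metadata_hostname.unwrap_or(default_host).to_owned(),
  }
}
```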
@@ -114,6 +115,37 @@ fn parse_node_block(data: &Vec<RedisValue>, default_host: &str) -> Option<(Strin
   Some((hostname, port, primary, id))
 }
 
+/// Parse the optional trailing replica nodes in each `CLUSTER SLOTS` slot range block.
+#[cfg(feature = "replicas")]
+fn parse_cluster_slot_replica_nodes(slot_range: Vec<RedisValue>, default_host: &str) -> Vec<Server> {
+  slot_range
+    .into_iter()
+    .filter_map(|value| {
+      let server_block: Vec<RedisValue> = match value.convert() {
+        Ok(v) => v,
+        Err(_) => {
+          warn!("Skip replica CLUSTER SLOTS block from {}", default_host);
+          return None;
+        },
+      };
+
+      let (host, port) = match parse_node_block(&server_block, default_host) {
+        Some((h, p, _, _)) => (ArcStr::from(h), p),
+        None => {
+          warn!("Skip replica CLUSTER SLOTS block from {}", default_host);
+          return None;
+        },
+      };
+
+      Some(Server {
+        host,
+        port,
+        tls_server_name: None,
+      })
+    })
+    .collect()
+}
+
 /// Parse the cluster slot range and associated server blocks.
 fn parse_cluster_slot_nodes(mut slot_range: Vec<RedisValue>, default_host: &str) -> Result<SlotRange, RedisError> {
   if slot_range.len() < 3 {
@@ -140,17 +172,18 @@ fn parse_cluster_slot_nodes(mut slot_range: Vec<RedisValue>, default_host: &str)
       ));
     },
   };
-  let primary = Server {
-    host,
-    port,
-    tls_server_name: None,
-  };
 
   Ok(SlotRange {
     start,
     end,
-    primary,
     id,
+    primary: Server {
+      host,
+      port,
+      tls_server_name: None,
+    },
+    #[cfg(feature = "replicas")]
+    replicas: parse_cluster_slot_replica_nodes(slot_range, default_host),
   })
 }
 
@@ -172,6 +205,11 @@ pub fn parse_cluster_slots(frame: RedisValue, default_host: &str) -> Result, default_host: &str) {
   for slot_range in ranges.iter_mut() {
     slot_range.primary.set_tls_server_name(policy, default_host);
+
+    #[cfg(feature = "replicas")]
+    for server in slot_range.replicas.iter_mut() {
+      server.set_tls_server_name(policy, default_host);
+    }
   }
 }
 
@@ -327,6 +365,12 @@ mod tests {
     for slot_range in ranges.iter() {
       assert_ne!(slot_range.primary.host, "default-host");
      assert_eq!(slot_range.primary.tls_server_name, Some("default-host".into()));
+
+      #[cfg(feature = "replicas")]
+      for replica in slot_range.replicas.iter() {
+        assert_ne!(replica.host, "default-host");
+        assert_eq!(replica.tls_server_name, Some("default-host".into()));
+      }
     }
   }
 
@@ -342,6 +386,12 @@ mod tests {
       assert_ne!(slot_range.primary.host, "default-host");
       // since there's a metadata hostname then expect that instead of the default host
       assert_ne!(slot_range.primary.tls_server_name, Some("default-host".into()));
+
+      #[cfg(feature = "replicas")]
+      for replica in slot_range.replicas.iter() {
+        assert_ne!(replica.host, "default-host");
+        assert_ne!(replica.tls_server_name, Some("default-host".into()));
+      }
     }
   }
 
@@ -356,6 +406,12 @@ mod tests {
     for slot_range in ranges.iter() {
      assert_ne!(slot_range.primary.host, "default-host");
      assert_eq!(slot_range.primary.tls_server_name, Some("foobarbaz".into()));
+
+      #[cfg(feature = "replicas")]
+      for replica in slot_range.replicas.iter() {
+        assert_ne!(replica.host, "default-host");
+        assert_eq!(replica.tls_server_name, Some("foobarbaz".into()));
+      }
     }
   }
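Editor's note: for context on the parsing above, each `CLUSTER SLOTS` entry is a nested array holding the slot bounds, the primary's node block, and zero or more trailing replica blocks, which `parse_cluster_slot_replica_nodes` consumes after the first three elements are popped. A hypothetical entry sketched with plain tuples (addresses and IDs illustrative):

```rust
// Hypothetical CLUSTER SLOTS entry sketched with plain tuples instead of
// RedisValue frames: (start, end, primary block, trailing replica blocks).
type NodeBlock = (&'static str, u16, &'static str);

fn example_slot_entry() -> (u16, u16, NodeBlock, Vec<NodeBlock>) {
  (
    0,
    5460,
    ("127.0.0.1", 30001, "09dbe9720cda62f7865eabc5fd8857c5d2678366"),
    vec![("127.0.0.1", 30004, "60ca8d301ef624956e847e6e6ecc865a36513bbe")],
  )
}
```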
@@ -366,34 +422,52 @@ mod tests {
     let actual = parse_cluster_slots(input, "bad-host").expect("Failed to parse input");
 
     let expected = vec![
       SlotRange {
-        start:   0,
-        end:     5460,
-        primary: Server {
+        start:    0,
+        end:      5460,
+        primary:  Server {
           host: "host-1.redis.example.com".into(),
           port: 30001,
           tls_server_name: None,
         },
-        id:      "09dbe9720cda62f7865eabc5fd8857c5d2678366".into(),
+        id:       "09dbe9720cda62f7865eabc5fd8857c5d2678366".into(),
+        #[cfg(feature = "replicas")]
+        replicas: vec![Server {
+          host: "host-2.redis.example.com".into(),
+          port: 30004,
+          tls_server_name: None,
+        }],
       },
       SlotRange {
-        start:   5461,
-        end:     10922,
-        primary: Server {
+        start:    5461,
+        end:      10922,
+        primary:  Server {
           host: "host-3.redis.example.com".into(),
           port: 30002,
           tls_server_name: None,
         },
-        id:      "c9d93d9f2c0c524ff34cc11838c2003d8c29e013".into(),
+        id:       "c9d93d9f2c0c524ff34cc11838c2003d8c29e013".into(),
+        #[cfg(feature = "replicas")]
+        replicas: vec![Server {
+          host: "host-4.redis.example.com".into(),
+          port: 30005,
+          tls_server_name: None,
+        }],
       },
       SlotRange {
-        start:   10923,
-        end:     16383,
-        primary: Server {
+        start:    10923,
+        end:      16383,
+        primary:  Server {
           host: "host-5.redis.example.com".into(),
           port: 30003,
           tls_server_name: None,
         },
-        id:      "044ec91f325b7595e76dbcb18cc688b6a5b434a1".into(),
+        id:       "044ec91f325b7595e76dbcb18cc688b6a5b434a1".into(),
+        #[cfg(feature = "replicas")]
+        replicas: vec![Server {
+          host: "host-6.redis.example.com".into(),
+          port: 30006,
+          tls_server_name: None,
+        }],
       },
     ];
     assert_eq!(actual, expected);
@@ -406,34 +480,52 @@ mod tests {
     let actual = parse_cluster_slots(input, "bad-host").expect("Failed to parse input");
 
     let expected = vec![
       SlotRange {
-        start:   0,
-        end:     5460,
-        primary: Server {
+        start:    0,
+        end:      5460,
+        primary:  Server {
           host: "127.0.0.1".into(),
           port: 30001,
           tls_server_name: None,
         },
-        id:      "09dbe9720cda62f7865eabc5fd8857c5d2678366".into(),
+        id:       "09dbe9720cda62f7865eabc5fd8857c5d2678366".into(),
+        #[cfg(feature = "replicas")]
+        replicas: vec![Server {
+          host: "127.0.0.1".into(),
+          port: 30004,
+          tls_server_name: None,
+        }],
       },
       SlotRange {
-        start:   5461,
-        end:     10922,
-        primary: Server {
+        start:    5461,
+        end:      10922,
+        primary:  Server {
           host: "127.0.0.1".into(),
           port: 30002,
           tls_server_name: None,
         },
-        id:      "c9d93d9f2c0c524ff34cc11838c2003d8c29e013".into(),
+        id:       "c9d93d9f2c0c524ff34cc11838c2003d8c29e013".into(),
+        #[cfg(feature = "replicas")]
+        replicas: vec![Server {
+          host: "127.0.0.1".into(),
+          port: 30005,
+          tls_server_name: None,
+        }],
       },
       SlotRange {
-        start:   10923,
-        end:     16383,
-        primary: Server {
+        start:    10923,
+        end:      16383,
+        primary:  Server {
           host: "127.0.0.1".into(),
           port: 30003,
           tls_server_name: None,
         },
-        id:      "044ec91f325b7595e76dbcb18cc688b6a5b434a1".into(),
+        id:       "044ec91f325b7595e76dbcb18cc688b6a5b434a1".into(),
+        #[cfg(feature = "replicas")]
+        replicas: vec![Server {
+          host: "127.0.0.1".into(),
+          port: 30006,
+          tls_server_name: None,
+        }],
      },
     ];
     assert_eq!(actual, expected);
"127.0.0.1".into(), + port: 30006, + tls_server_name: None, + }], }, ]; assert_eq!(actual, expected); @@ -582,34 +692,52 @@ mod tests { let actual = parse_cluster_slots(input, "fake-host").expect("Failed to parse input"); let expected = vec![ SlotRange { - start: 0, - end: 5460, - primary: Server { + start: 0, + end: 5460, + primary: Server { host: "fake-host".into(), port: 30001, tls_server_name: None, }, - id: "09dbe9720cda62f7865eabc5fd8857c5d2678366".into(), + id: "09dbe9720cda62f7865eabc5fd8857c5d2678366".into(), + #[cfg(feature = "replicas")] + replicas: vec![Server { + host: "fake-host".into(), + port: 30004, + tls_server_name: None, + }], }, SlotRange { - start: 5461, - end: 10922, - primary: Server { + start: 5461, + end: 10922, + primary: Server { host: "fake-host".into(), port: 30002, tls_server_name: None, }, - id: "c9d93d9f2c0c524ff34cc11838c2003d8c29e013".into(), + id: "c9d93d9f2c0c524ff34cc11838c2003d8c29e013".into(), + #[cfg(feature = "replicas")] + replicas: vec![Server { + host: "fake-host".into(), + port: 30005, + tls_server_name: None, + }], }, SlotRange { - start: 10923, - end: 16383, - primary: Server { + start: 10923, + end: 16383, + primary: Server { host: "fake-host".into(), port: 30003, tls_server_name: None, }, - id: "044ec91f325b7595e76dbcb18cc688b6a5b434a1".into(), + id: "044ec91f325b7595e76dbcb18cc688b6a5b434a1".into(), + #[cfg(feature = "replicas")] + replicas: vec![Server { + host: "fake-host".into(), + port: 30006, + tls_server_name: None, + }], }, ]; assert_eq!(actual, expected); diff --git a/src/protocol/connection.rs b/src/protocol/connection.rs index 146a00d0..791944c0 100644 --- a/src/protocol/connection.rs +++ b/src/protocol/connection.rs @@ -39,7 +39,7 @@ use crate::protocol::tls::TlsConnector; #[cfg(feature = "replicas")] use crate::{ protocol::{connection, responders::ResponseKind}, - router::replicas, + types::RedisValue, }; #[cfg(feature = "enable-rustls")] use std::convert::TryInto; @@ -727,35 +727,63 @@ impl RedisTransport { .await } - /// Run and parse the output from `INFO replication`. + /// Send `READONLY` to the server. #[cfg(feature = "replicas")] - pub async fn info_replication( + pub async fn readonly(&mut self, inner: &Arc, timeout: Option) -> Result<(), RedisError> { + if !inner.config.server.is_clustered() { + return Ok(()); + } + + let timeout = connection_timeout(timeout); + utils::apply_timeout( + async { + _debug!(inner, "Sending READONLY to {}", self.server); + let command = RedisCommand::new(RedisCommandKind::Readonly, vec![]); + let response = self.request_response(command, inner.is_resp3()).await?; + let _ = protocol_utils::frame_to_single_result(response)?; + + Ok::<_, RedisError>(()) + }, + timeout, + ) + .await + } + + /// Send the `ROLE` command to the server. + #[cfg(feature = "replicas")] + pub async fn role( &mut self, inner: &Arc, timeout: Option, - ) -> Result, RedisError> { + ) -> Result { let timeout = connection_timeout(timeout); - let command = RedisCommand::new(RedisCommandKind::Info, vec!["replication".into()]); + let command = RedisCommand::new(RedisCommandKind::Role, vec![]); utils::apply_timeout( async { self .request_response(command, inner.is_resp3()) .await - .map(|f| f.as_str().map(|s| s.to_owned())) + .and_then(protocol_utils::frame_to_results_raw) }, timeout, ) .await } + /// Discover connected replicas via the ROLE command. 
+  #[cfg(feature = "replicas")]
+  pub async fn discover_replicas(&mut self, inner: &Arc<RedisClientInner>) -> Result<Vec<Server>, RedisError> {
+    self
+      .role(inner, None)
+      .await
+      .and_then(protocol_utils::parse_master_role_replicas)
+  }
+
+  /// Discover connected replicas via the ROLE command.
   #[cfg(not(feature = "replicas"))]
-  pub async fn info_replication(
-    &mut self,
-    _: &Arc<RedisClientInner>,
-    _: Option<u64>,
-  ) -> Result<Option<String>, RedisError> {
-    Ok(None)
+  pub async fn discover_replicas(&mut self, _: &Arc<RedisClientInner>) -> Result<Vec<Server>, RedisError> {
+    Ok(Vec::new())
   }
 
   /// Split the transport into reader/writer halves.
@@ -857,18 +885,13 @@ impl RedisWriter {
   }
 
   #[cfg(feature = "replicas")]
-  pub async fn info_replication(&mut self, inner: &Arc<RedisClientInner>) -> Result<Vec<Server>, RedisError> {
-    let command = RedisCommand::new(RedisCommandKind::Info, vec!["replication".into()]);
-    let frame = connection::request_response(inner, self, command, None)
-      .await?
-      .as_str()
-      .map(|s| s.to_owned())
-      .ok_or(RedisError::new(
-        RedisErrorKind::Replica,
-        "Failed to read replication info.",
-      ))?;
-
-    Ok(replicas::parse_info_replication(frame))
+  pub async fn discover_replicas(&mut self, inner: &Arc<RedisClientInner>) -> Result<Vec<Server>, RedisError> {
+    let command = RedisCommand::new(RedisCommandKind::Role, vec![]);
+    let role = connection::request_response(inner, self, command, None)
+      .await
+      .and_then(protocol_utils::frame_to_results_raw)?;
+
+    protocol_utils::parse_master_role_replicas(role)
   }
 
   /// Check if the connection is connected and can send frames.
@@ -937,10 +960,20 @@ impl RedisWriter {
   ///
   /// Returns the in-flight commands that had not received a response.
   pub async fn graceful_close(mut self) -> CommandBuffer {
-    let _ = self.sink.close().await;
-    if let Some(mut reader) = self.reader {
-      let _ = reader.wait().await;
-    }
+    let timeout = globals().default_connection_timeout_ms();
+    let _ = utils::apply_timeout(
+      async {
+        let _ = self.sink.close().await;
+        if let Some(mut reader) = self.reader {
+          let _ = reader.wait().await;
+        }
+
+        Ok::<_, RedisError>(())
+      },
+      timeout,
+    )
+    .await;
+
     self.buffer.lock().drain(..).collect()
   }
 }
diff --git a/src/protocol/tls.rs b/src/protocol/tls.rs
index 250bebaa..c24d7773 100644
--- a/src/protocol/tls.rs
+++ b/src/protocol/tls.rs
@@ -96,7 +96,8 @@ impl Eq for TlsHostMapping {}
 ///
 /// Note: the `hostnames` field is only necessary to use with certain clustered deployments.
 ///
-/// ```rust no_compile no_run
+/// ```rust no_run
+/// # use fred::types::*;
 /// let config = TlsConfig {
 ///   // or use `TlsConnector::default_rustls()`
 ///   connector: TlsConnector::default_native_tls(),
diff --git a/src/protocol/types.rs b/src/protocol/types.rs
index b71f75bc..473ff275 100644
--- a/src/protocol/types.rs
+++ b/src/protocol/types.rs
@@ -373,13 +373,17 @@ impl ValueScanInner {
 #[derive(Debug, Clone, Eq, PartialEq)]
 pub struct SlotRange {
   /// The start of the hash slot range.
-  pub start:   u16,
+  pub start:    u16,
   /// The end of the hash slot range.
-  pub end:     u16,
+  pub end:      u16,
   /// The primary server owner.
-  pub primary: Server,
+  pub primary:  Server,
   /// The internal ID assigned by the server.
-  pub id:      ArcStr,
+  pub id:       ArcStr,
+  /// Replica node owners.
+  #[cfg(feature = "replicas")]
+  #[cfg_attr(docsrs, doc(cfg(feature = "replicas")))]
+  pub replicas: Vec<Server>,
 }
 
 /// The cached view of the cluster used by the client to route commands to the correct cluster nodes.
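Editor's note: the `role()` helper above feeds `parse_master_role_replicas` (added in `src/protocol/utils.rs` below). On a primary, `ROLE` replies with `["master", <offset>, [[ip, port, offset], ...]]`; a simplified sketch of that extraction with plain types (not fred's `RedisValue` API):

```rust
/// Simplified model of a primary's ROLE reply: ("master", offset, [(ip, port, offset)]).
fn replicas_from_role(reply: (&str, u64, Vec<(String, u16, u64)>)) -> Vec<(String, u16)> {
  let (role, _offset, replicas) = reply;
  if role != "master" {
    // replica and sentinel nodes return a different shape
    return Vec::new();
  }
  replicas.into_iter().map(|(ip, port, _)| (ip, port)).collect()
}
```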
@@ -444,6 +448,24 @@ impl ClusterRouting {
     protocol_utils::binary_search(&self.data, slot).map(|idx| &self.data[idx].primary)
   }
 
+  /// Read the replicas associated with the provided primary node based on the cached CLUSTER SLOTS response.
+  #[cfg(feature = "replicas")]
+  #[cfg_attr(docsrs, doc(cfg(feature = "replicas")))]
+  pub fn replicas(&self, primary: &Server) -> Vec<Server> {
+    self
+      .data
+      .iter()
+      .fold(BTreeSet::new(), |mut replicas, slot| {
+        if slot.primary == *primary {
+          replicas.extend(slot.replicas.clone());
+        }
+
+        replicas
+      })
+      .into_iter()
+      .collect()
+  }
+
   /// Read the number of hash slot ranges in the cluster.
   pub fn len(&self) -> usize {
     self.data.len()
diff --git a/src/protocol/utils.rs b/src/protocol/utils.rs
index bb55a1b5..b744da84 100644
--- a/src/protocol/utils.rs
+++ b/src/protocol/utils.rs
@@ -1191,6 +1191,35 @@ pub fn parse_cluster_info(data: Resp3Frame) -> Result<ClusterInfo, RedisError> {
   }
 }
 
+/// Parse the replicas from the ROLE response returned from a master/primary node.
+#[cfg(feature = "replicas")]
+pub fn parse_master_role_replicas(data: RedisValue) -> Result<Vec<Server>, RedisError> {
+  let mut role: Vec<RedisValue> = data.convert()?;
+
+  if role.len() == 3 {
+    if role[0].as_str().map(|s| s == "master").unwrap_or(false) {
+      let replicas: Vec<RedisValue> = role[2].take().convert()?;
+
+      Ok(
+        replicas
+          .into_iter()
+          .filter_map(|value| {
+            value
+              .convert::<(String, u16, String)>()
+              .ok()
+              .map(|(host, port, _)| Server::new(host, port))
+          })
+          .collect(),
+      )
+    } else {
+      Ok(Vec::new())
+    }
+  } else {
+    // we're talking to a replica or sentinel node
+    Ok(Vec::new())
+  }
+}
+
 fn frame_to_f64(frame: &Resp3Frame) -> Result<f64, RedisError> {
   match frame {
     Resp3Frame::Double { ref data, .. } => Ok(*data),
diff --git a/src/router/commands.rs b/src/router/commands.rs
index 3b90997b..33a9af16 100644
--- a/src/router/commands.rs
+++ b/src/router/commands.rs
@@ -95,7 +95,7 @@ async fn write_with_backpressure(
     Some(command) => command,
     None => return Err(RedisError::new(RedisErrorKind::Unknown, "Missing command.")),
   };
-  // FIXME clean this up
+  // TODO clean this up
   let rx = match _backpressure {
     Some(backpressure) => match backpressure.wait(inner, &mut command).await {
       Ok(Some(rx)) => Some(rx),
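Editor's note: the new `ClusterRouting::replicas` reader composes with `cached_cluster_state` from the earlier doc example; a hypothetical helper (assumes the `replicas` feature):

```rust
use fred::{prelude::*, types::Server};

// Read the cached replica nodes for a primary, or an empty list when the
// routing table has not been cached yet.
fn cached_replicas(client: &RedisClient, primary: &Server) -> Vec<Server> {
  client
    .cached_cluster_state()
    .map(|state| state.replicas(primary))
    .unwrap_or_default()
}
```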
diff --git a/src/router/mod.rs b/src/router/mod.rs
index 7dee4fb9..83541af2 100644
--- a/src/router/mod.rs
+++ b/src/router/mod.rs
@@ -140,13 +140,14 @@ impl Connections {
     }
   }
 
+  /// Discover and return a mapping of replica nodes to their associated primary node.
   #[cfg(feature = "replicas")]
   pub async fn replica_map(&mut self, inner: &Arc<RedisClientInner>) -> Result<HashMap<Server, Server>, RedisError> {
     Ok(match self {
       Connections::Centralized { ref mut writer } | Connections::Sentinel { ref mut writer } => {
         if let Some(writer) = writer {
           writer
-            .info_replication(inner)
+            .discover_replicas(inner)
             .await?
             .into_iter()
             .map(|replica| (replica, writer.server.clone()))
@@ -159,8 +160,19 @@ impl Connections {
         let mut out = HashMap::with_capacity(writers.len());
 
         for (primary, writer) in writers.iter_mut() {
-          for replica in writer.info_replication(inner).await? {
-            out.insert(replica, primary.clone());
+          let replicas = inner
+            .with_cluster_state(|state| Ok(state.replicas(primary)))
+            .ok()
+            .unwrap_or(Vec::new());
+
+          if replicas.is_empty() {
+            for replica in writer.discover_replicas(inner).await? {
+              out.insert(replica, primary.clone());
+            }
+          } else {
+            for replica in replicas.into_iter() {
+              out.insert(replica, primary.clone());
+            }
           }
         }
         out
@@ -877,7 +889,14 @@ impl Router {
     self.sync_network_timeout_state();
 
     if result.is_ok() {
-      self.sync_replicas().await
+      if let Err(e) = self.sync_replicas().await {
+        warn!("{}: Error syncing replicas: {:?}", self.inner.id, e);
+        if !self.inner.ignore_replica_reconnect_errors() {
+          return Err(e);
+        }
+      }
+
+      Ok(())
     } else {
       result
     }
diff --git a/src/router/replicas.rs b/src/router/replicas.rs
index 52f6ddb3..ec509d55 100644
--- a/src/router/replicas.rs
+++ b/src/router/replicas.rs
@@ -6,7 +6,7 @@ use crate::{
   interfaces,
   modules::inner::RedisClientInner,
   protocol::{
-    command::{RedisCommand, RedisCommandKind, RouterCommand},
+    command::{RedisCommand, RouterCommand},
     connection,
     connection::{CommandBuffer, RedisWriter},
   },
@@ -15,7 +15,7 @@ use crate::{
 };
 #[cfg(feature = "replicas")]
 use std::{
-  collections::{HashMap, VecDeque},
+  collections::{BTreeSet, HashMap, VecDeque},
   fmt,
   fmt::Formatter,
   sync::Arc,
@@ -232,7 +232,7 @@ impl Replicas {
     }
 
     for (replica, primary) in self.routing.to_map() {
-      let _ = self.add_connection(inner, primary, replica, true).await?;
+      let _ = self.add_connection(inner, primary, replica, false).await?;
     }
 
     Ok(())
@@ -271,6 +271,7 @@ impl Replicas {
     let _ = transport.setup(inner, None).await?;
 
     let (_, writer) = if inner.config.server.is_clustered() {
+      let _ = transport.readonly(inner, None).await?;
       connection::split_and_initialize(inner, transport, true, clustered::spawn_reader_task)?
     } else {
       connection::split_and_initialize(inner, transport, true, centralized::spawn_reader_task)?
@@ -340,28 +341,67 @@ impl Replicas {
     self.routing.to_map()
   }
 
-  /// Check if the provided connection has any known replica nodes, and if so add them to the cached routing table.
-  pub async fn check_replicas(
+  /// Discover and connect to replicas via the `ROLE` command.
+  pub async fn sync_by_role(
     &mut self,
     inner: &Arc<RedisClientInner>,
     primary: &mut RedisWriter,
   ) -> Result<(), RedisError> {
-    let command = RedisCommand::new(RedisCommandKind::Info, vec!["replication".into()]);
-    let frame = connection::request_response(inner, primary, command, None)
-      .await?
-      .as_str()
-      .map(|s| s.to_owned())
-      .ok_or(RedisError::new(
-        RedisErrorKind::Replica,
-        "Failed to read replication info.",
-      ))?;
-
-    for replica in parse_info_replication(frame) {
+    for replica in primary.discover_replicas(inner).await? {
       self.routing.add(primary.server.clone(), replica);
     }
+
+    Ok(())
+  }
+
+  /// Discover and connect to replicas by inspecting the cached `CLUSTER SLOTS` state.
+  pub fn sync_by_cached_cluster_state(
+    &mut self,
+    inner: &Arc<RedisClientInner>,
+    primary: &Server,
+  ) -> Result<(), RedisError> {
+    let replicas: Vec<Server> = inner.with_cluster_state(|state| {
+      Ok(
+        state
+          .slots()
+          .iter()
+          .fold(BTreeSet::new(), |mut replicas, slot| {
+            if slot.primary == *primary {
+              replicas.extend(slot.replicas.clone());
+            }
+
+            replicas
+          })
+          .into_iter()
+          .collect(),
+      )
+    })?;
+
+    for replica in replicas.into_iter() {
+      self.routing.add(primary.clone(), replica);
+    }
+
     Ok(())
   }
 
+  /// Check if the provided connection has any known replica nodes, and if so add them to the cached routing table.
+  pub async fn check_replicas(
+    &mut self,
+    inner: &Arc<RedisClientInner>,
+    primary: &mut RedisWriter,
+  ) -> Result<(), RedisError> {
+    if inner.config.server.is_clustered() {
+      if let Err(_) = self.sync_by_cached_cluster_state(inner, &primary.server) {
+        _warn!(inner, "Failed to discover replicas via cached CLUSTER SLOTS.");
+        self.sync_by_role(inner, primary).await
+      } else {
+        Ok(())
+      }
+    } else {
+      self.sync_by_role(inner, primary).await
+    }
+  }
+
   /// Send a command to one of the replicas associated with the provided primary server.
   pub async fn write_command(
     &mut self,
@@ -457,47 +497,6 @@ impl Replicas {
   }
 }
 
-/// Parse the `INFO replication` response for replica node server identifiers.
-#[cfg(feature = "replicas")]
-pub fn parse_info_replication(frame: String) -> Vec<Server> {
-  let mut replicas = Vec::new();
-  for line in frame.lines() {
-    if line.trim().starts_with("slave") {
-      let values = match line.split(":").last() {
-        Some(values) => values,
-        None => continue,
-      };
-
-      let parts: Vec<&str> = values.split(",").collect();
-      if parts.len() < 2 {
-        continue;
-      }
-
-      let (mut host, mut port) = (None, None);
-      for kv in parts.into_iter() {
-        let parts: Vec<&str> = kv.split("=").collect();
-        if parts.len() != 2 {
-          continue;
-        }
-
-        if parts[0] == "ip" {
-          host = Some(parts[1].to_owned());
-        } else if parts[0] == "port" {
-          port = parts[1].parse::<u16>().ok();
-        }
-      }
-
-      if let Some(host) = host {
-        if let Some(port) = port {
-          replicas.push(Server::new(host, port));
-        }
-      }
-    }
-  }
-
-  replicas
-}
-
 #[cfg(all(feature = "replicas", any(feature = "enable-native-tls", feature = "enable-rustls")))]
 pub fn map_replica_tls_names(inner: &Arc<RedisClientInner>, primary: &Server, replica: &mut Server) {
   let policy = match inner.config.tls {
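Editor's note: `replica_map` above now prefers the cached `CLUSTER SLOTS` view and only queries each primary via `ROLE` when the cache has no replicas for it; `check_replicas` in `src/router/replicas.rs` applies the same preference. A simplified sketch of that fallback with plain types (hypothetical helper, not fred's internals):

```rust
use std::collections::HashMap;

// Build a (replica -> primary) routing map: use cached replicas when
// available, otherwise ask the primary directly via a ROLE lookup.
fn build_replica_map(
  primaries: &[String],
  cached: &HashMap<String, Vec<String>>,     // primary -> cached replicas
  query_role: impl Fn(&str) -> Vec<String>,  // ROLE lookup per primary
) -> HashMap<String, String> {
  let mut out = HashMap::new();
  for primary in primaries {
    let cached_replicas = cached.get(primary).cloned().unwrap_or_default();
    let replicas = if cached_replicas.is_empty() {
      query_role(primary)
    } else {
      cached_replicas
    };
    for replica in replicas {
      out.insert(replica, primary.clone());
    }
  }
  out
}
```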
diff --git a/src/types/config.rs b/src/types/config.rs
index e187c134..50cb8e78 100644
--- a/src/types/config.rs
+++ b/src/types/config.rs
@@ -309,7 +309,8 @@ pub struct PerformanceConfig {
   pub auto_pipeline: bool,
   /// The maximum number of times the client will attempt to send a command.
   ///
-  /// This value be incremented on a command whenever the connection closes while the command is in-flight.
+  /// This value will be incremented whenever the connection closes while the command is in-flight or following a
+  /// MOVED/ASK error.
   ///
   /// Default: `3`
   pub max_command_attempts: u32,
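Editor's note: these are the knobs exercised by the new integration test at the end of this diff; a hedged sketch of the same configuration (assumes the `replicas` feature and defaults elsewhere):

```rust
use fred::prelude::*;
use fred::types::{PerformanceConfig, ReplicaConfig};

// Replica reads with no primary fallback and a single command attempt, so
// failed replica reads surface immediately instead of being retried.
fn strict_replica_setup(mut config: RedisConfig, mut perf: PerformanceConfig) -> (RedisConfig, PerformanceConfig) {
  config.replica = ReplicaConfig {
    lazy_connections: true,
    primary_fallback: false,
    ignore_reconnection_errors: true,
    ..ReplicaConfig::default()
  };
  perf.max_command_attempts = 1;
  (config, perf)
}
```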
diff --git a/tests/docker/runners/bash/sentinel-features.sh b/tests/docker/runners/bash/sentinel-features.sh
index d1255585..3416b8e3 100755
--- a/tests/docker/runners/bash/sentinel-features.sh
+++ b/tests/docker/runners/bash/sentinel-features.sh
@@ -10,7 +10,7 @@ do
   fi
 done
 
-FEATURES="network-logs sentinel-tests sentinel-auth"
+FEATURES="network-logs debug-ids sentinel-tests sentinel-auth replicas"
 
 if [ -z "$FRED_CI_NEXTEST" ]; then
   cargo test --release --lib --tests --features "$FEATURES" -- --test-threads=1 "$@"
diff --git a/tests/integration/clustered.rs b/tests/integration/clustered.rs
index 7d2d0fae..a01f8e1a 100644
--- a/tests/integration/clustered.rs
+++ b/tests/integration/clustered.rs
@@ -67,6 +67,8 @@ mod other {
   #[cfg(feature = "replicas")]
   cluster_test!(other, should_replica_set_and_get_not_lazy);
   #[cfg(feature = "replicas")]
+  cluster_test!(other, should_use_cluster_replica_without_redirection);
+  #[cfg(feature = "replicas")]
   cluster_test!(other, should_pipeline_with_replicas);
 }
 
diff --git a/tests/integration/other/mod.rs b/tests/integration/other/mod.rs
index 096cc6bf..7e536cea 100644
--- a/tests/integration/other/mod.rs
+++ b/tests/integration/other/mod.rs
@@ -25,6 +25,8 @@ use tokio::time::sleep;
 
 #[cfg(feature = "subscriber-client")]
 use fred::clients::SubscriberClient;
+#[cfg(feature = "replicas")]
+use fred::types::ReplicaConfig;
 #[cfg(feature = "dns")]
 use fred::types::Resolve;
 #[cfg(feature = "partial-tracing")]
@@ -491,6 +493,32 @@ pub async fn should_pipeline_with_replicas(client: RedisClient, _: RedisConfig)
   Ok(())
 }
 
+#[cfg(feature = "replicas")]
+pub async fn should_use_cluster_replica_without_redirection(
+  client: RedisClient,
+  _: RedisConfig,
+) -> Result<(), RedisError> {
+  let mut config = client.client_config();
+  config.replica = ReplicaConfig {
+    lazy_connections: true,
+    primary_fallback: false,
+    ignore_reconnection_errors: true,
+    ..ReplicaConfig::default()
+  };
+  let mut perf = client.perf_config();
+  perf.max_command_attempts = 1;
+  let policy = client.client_reconnect_policy();
+
+  let client = RedisClient::new(config, Some(perf), policy);
+  let _ = client.connect();
+  let _ = client.wait_for_connect().await?;
+
+  let _: () = client.replicas().get("foo").await?;
+  let _: () = client.incr("foo").await?;
+
+  Ok(())
+}
+
 pub async fn should_gracefully_quit(client: RedisClient, _: RedisConfig) -> Result<(), RedisError> {
   let client = client.clone_new();
   let connection = client.connect();