diff --git a/redis/src/cluster_async/mod.rs b/redis/src/cluster_async/mod.rs index e2a6844fe..7af3a7ef4 100644 --- a/redis/src/cluster_async/mod.rs +++ b/redis/src/cluster_async/mod.rs @@ -853,28 +853,36 @@ where nodes.sort_unstable(); nodes.dedup(); let nodes_len = nodes.len(); - let addresses_and_connections_iter = nodes.into_iter().map(|addr| async move { - if let Some(node) = connections.node_for_address(addr.as_str()) { - return (addr, Some(node)); - } - // If it's a DNS endpoint, it could have been stored in the existing connections vector using the resolved IP address instead of the DNS endpoint's name. - // We shall check if a connection is already exists under the resolved IP name. - let (host, port) = match get_host_and_port_from_addr(addr) { - Some((host, port)) => (host, port), - None => return (addr, None), - }; - let conn = get_socket_addrs(host, port) - .await - .ok() - .map(|mut socket_addresses| { - socket_addresses - .find_map(|addr| connections.node_for_address(&addr.to_string())) - }) - .unwrap_or(None); - (addr, conn) - }); - let addresses_and_connections_iter = - futures::future::join_all(addresses_and_connections_iter).await; + let addresses_and_connections_iter = stream::iter(nodes) + .fold( + Vec::with_capacity(nodes_len), + |mut addrs_and_conns, addr| async move { + if let Some(node) = connections.node_for_address(addr.as_str()) { + addrs_and_conns.push((addr, Some(node))); + return addrs_and_conns; + } + // If it's a DNS endpoint, it could have been stored in the existing connections vector using the resolved IP address instead of the DNS endpoint's name. + // We shall check if a connection is already exists under the resolved IP name. + let (host, port) = match get_host_and_port_from_addr(addr) { + Some((host, port)) => (host, port), + None => { + addrs_and_conns.push((addr, None)); + return addrs_and_conns; + } + }; + let conn = get_socket_addrs(host, port) + .await + .ok() + .map(|mut socket_addresses| { + socket_addresses + .find_map(|addr| connections.node_for_address(&addr.to_string())) + }) + .unwrap_or(None); + addrs_and_conns.push((addr, conn)); + addrs_and_conns + }, + ) + .await; let new_connections: ConnectionMap = stream::iter(addresses_and_connections_iter) .fold( ConnectionsMap(HashMap::with_capacity(nodes_len)),