Skip to content

Commit

Permalink
refresh_slots - change DNS resolver calls to run sequentially (redis-…
Browse files Browse the repository at this point in the history
…rs#61)

Fix dns resolve delay
  • Loading branch information
alexander-shabanov committed Oct 29, 2023
1 parent 168b9c5 commit db09996
Showing 1 changed file with 30 additions and 22 deletions.
52 changes: 30 additions & 22 deletions redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -853,28 +853,36 @@ where
nodes.sort_unstable();
nodes.dedup();
let nodes_len = nodes.len();
let addresses_and_connections_iter = nodes.into_iter().map(|addr| async move {
if let Some(node) = connections.node_for_address(addr.as_str()) {
return (addr, Some(node));
}
// If it's a DNS endpoint, it could have been stored in the existing connections vector using the resolved IP address instead of the DNS endpoint's name.
// We shall check if a connection is already exists under the resolved IP name.
let (host, port) = match get_host_and_port_from_addr(addr) {
Some((host, port)) => (host, port),
None => return (addr, None),
};
let conn = get_socket_addrs(host, port)
.await
.ok()
.map(|mut socket_addresses| {
socket_addresses
.find_map(|addr| connections.node_for_address(&addr.to_string()))
})
.unwrap_or(None);
(addr, conn)
});
let addresses_and_connections_iter =
futures::future::join_all(addresses_and_connections_iter).await;
let addresses_and_connections_iter = stream::iter(nodes)
.fold(
Vec::with_capacity(nodes_len),
|mut addrs_and_conns, addr| async move {
if let Some(node) = connections.node_for_address(addr.as_str()) {
addrs_and_conns.push((addr, Some(node)));
return addrs_and_conns;
}
// If it's a DNS endpoint, it could have been stored in the existing connections vector using the resolved IP address instead of the DNS endpoint's name.
// We shall check if a connection is already exists under the resolved IP name.
let (host, port) = match get_host_and_port_from_addr(addr) {
Some((host, port)) => (host, port),
None => {
addrs_and_conns.push((addr, None));
return addrs_and_conns;
}
};
let conn = get_socket_addrs(host, port)
.await
.ok()
.map(|mut socket_addresses| {
socket_addresses
.find_map(|addr| connections.node_for_address(&addr.to_string()))
})
.unwrap_or(None);
addrs_and_conns.push((addr, conn));
addrs_and_conns
},
)
.await;
let new_connections: ConnectionMap<C> = stream::iter(addresses_and_connections_iter)
.fold(
ConnectionsMap(HashMap::with_capacity(nodes_len)),
Expand Down

0 comments on commit db09996

Please sign in to comment.