Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 29 additions & 6 deletions grpc/src/client/load_balancing/child_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
// production.

use std::collections::HashSet;
use std::error::Error;
use std::fmt::Debug;
use std::sync::Mutex;
use std::{collections::HashMap, hash::Hash, mem, sync::Arc};
Expand Down Expand Up @@ -287,7 +288,7 @@ where
&mut self,
child_updates: impl IntoIterator<Item = ChildUpdate<T>>,
channel_controller: &mut dyn ChannelController,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
) {
// Split the child updates into the IDs and builders, and the
// ResolverUpdates/LbConfigs.
let (ids_builders, updates): (Vec<_>, Vec<_>) = child_updates
Expand All @@ -313,7 +314,29 @@ where
);
self.resolve_child_controller(channel_controller, child_idx);
}
Ok(())
}

/// Forwards the `resolver_update` and `config` to all current children.
///
/// Returns the Result from calling into each child.
pub fn resolver_update(
&mut self,
resolver_update: ResolverUpdate,
config: Option<&LbConfig>,
channel_controller: &mut dyn ChannelController,
) -> Vec<Result<(), Box<dyn Error + Send + Sync>>> {
let mut res = Vec::with_capacity(self.children.len());
for child_idx in 0..self.children.len() {
let child = &mut self.children[child_idx];
let mut channel_controller = WrappedController::new(channel_controller);
res.push(child.policy.resolver_update(
resolver_update.clone(),
config,
&mut channel_controller,
));
self.resolve_child_controller(channel_controller, child_idx);
}
res
}

/// Forwards the incoming subchannel_update to the child that created the
Expand Down Expand Up @@ -513,7 +536,7 @@ mod test {
)),
});

assert!(child_manager.update(updates, tcc).is_ok());
child_manager.update(updates, tcc);
}

fn move_subchannel_to_state(
Expand Down Expand Up @@ -849,7 +872,7 @@ mod test {
child_update: Some((ResolverUpdate::default(), Some(cfg.clone()))),
}
});
child_manager.update(updates.clone(), &mut tcc).unwrap();
child_manager.update(updates.clone(), &mut tcc);

// Confirm that child one has requested work.
match rx_events.recv().await.unwrap() {
Expand All @@ -873,7 +896,7 @@ mod test {
// Now have both children request work.
children.lock().unwrap().insert(name2, ());

child_manager.update(updates.clone(), &mut tcc).unwrap();
child_manager.update(updates.clone(), &mut tcc);

// Confirm that both children requested work.
match rx_events.recv().await.unwrap() {
Expand All @@ -888,6 +911,6 @@ mod test {

// Perform one final call to resolver_update which asserts that both
// child policies had their work methods called.
child_manager.update(updates, &mut tcc).unwrap();
child_manager.update(updates, &mut tcc);
}
}
6 changes: 2 additions & 4 deletions grpc/src/client/load_balancing/graceful_switch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ struct GracefulSwitchLbConfig {
/// to to active and tear down the previously active policy.
#[derive(Debug)]
pub(crate) struct GracefulSwitchPolicy {
child_manager: ChildManager<()>, // Child ID is the name of the child policy.
child_manager: ChildManager<()>, // Child ID empty - only the name of the child LB policy matters.
last_update: Option<LbState>, // Saves the last output LbState to determine if an update is needed.
active_child_builder: Option<Arc<dyn LbPolicyBuilder>>,
}
Expand Down Expand Up @@ -69,9 +69,7 @@ impl LbPolicy for GracefulSwitchPolicy {
});
}

let res = self
.child_manager
.update(children.into_iter(), channel_controller)?;
self.child_manager.update(children, channel_controller);
self.update_picker(channel_controller);
Ok(())
}
Expand Down
5 changes: 3 additions & 2 deletions grpc/src/client/load_balancing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use crate::client::{
pub(crate) mod child_manager;
pub(crate) mod graceful_switch;
pub(crate) mod pick_first;
pub(crate) mod round_robin;

#[cfg(test)]
pub(crate) mod test_utils;
Expand Down Expand Up @@ -606,11 +607,11 @@ impl Picker for QueuingPicker {
}

#[derive(Debug)]
pub(crate) struct Failing {
pub(crate) struct FailingPicker {
pub error: String,
}

impl Picker for Failing {
impl Picker for FailingPicker {
fn pick(&self, _: &Request) -> PickResult {
PickResult::Fail(Status::unavailable(self.error.clone()))
}
Expand Down
Loading
Loading