controller: Retry when watch errors occur
According to the kube-rs documentation, when a watch error occurs (e.g.
because the kube API is temporarily unavailable), the `Watcher` still
forwards the error, but will retry if the receiver keeps waiting for a
new event.

This commit changes how the `Watcher` is used so that those errors are
handled and the `Watcher` is allowed to retry. A backoff mechanism is
also set up so we don't end up retrying forever.

Signed-off-by: Nicolas Belouin <nicolas.belouin@suse.com>
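
For illustration only, here is a minimal, self-contained sketch of the retry pattern this commit adopts; it is not code from this repository. It assumes kube/kube-runtime versions close to those the controller used at the time, plus tokio and anyhow. watch_with_retry and handle_event are hypothetical names standing in for the real handle_instance/handle_node/handle_pod paths, and eprintln! stands in for the controller's error! logging.

use futures::{StreamExt, TryStreamExt};
use k8s_openapi::api::core::v1::Pod;
use kube::{api::{Api, ListParams}, Client};
use kube_runtime::watcher::{default_backoff, watcher, Event};
use kube_runtime::WatchStreamExt;

// Hypothetical stand-in for the controller's per-event handlers.
async fn handle_event(event: Event<Pod>) {
    match event {
        Event::Applied(pod) => println!("applied: {:?}", pod.metadata.name),
        Event::Deleted(pod) => println!("deleted: {:?}", pod.metadata.name),
        Event::Restarted(pods) => println!("restarted with {} pods", pods.len()),
    }
}

// Watch Pods, letting the watcher retry transient API errors using
// kube-runtime's default backoff instead of bubbling the error up and exiting.
async fn watch_with_retry(client: Client) -> anyhow::Result<()> {
    let pods = Api::<Pod>::all(client);
    let mut stream = watcher(pods, ListParams::default())
        .backoff(default_backoff())
        .boxed();
    loop {
        match stream.try_next().await {
            // The watcher surfaces the error but keeps retrying as long as
            // we keep polling the stream, so just log it and continue.
            Err(e) => eprintln!("watch error (will retry): {}", e),
            // The stream ended; treat this as fatal.
            Ok(None) => anyhow::bail!("watch stream ended"),
            Ok(Some(event)) => handle_event(event).await,
        }
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let client = Client::try_default().await?;
    watch_with_retry(client).await
}

The same shape appears in each of the three watchers changed below; pod_watcher.rs additionally switches its return type to anyhow::Result<()> so that a terminated watch stream can be reported as an error.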
diconico07 committed Mar 9, 2023
1 parent 179aeff commit 846662e
Showing 3 changed files with 44 additions and 24 deletions.
18 changes: 13 additions & 5 deletions controller/src/util/instance_action.rs
@@ -13,7 +13,8 @@ use futures::{StreamExt, TryStreamExt};
use k8s_openapi::api::batch::v1::JobSpec;
use k8s_openapi::api::core::v1::{Pod, PodSpec};
use kube::api::{Api, ListParams};
use kube_runtime::watcher::{watcher, Event};
use kube_runtime::watcher::{default_backoff, watcher, Event};
use kube_runtime::WatchStreamExt;
use log::{error, info, trace};
use std::collections::HashMap;
use std::sync::Arc;
@@ -92,12 +93,19 @@ async fn internal_do_instance_watch(
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
trace!("internal_do_instance_watch - enter");
let resource = Api::<Instance>::all(kube_interface.get_kube_client());
let watcher = watcher(resource, ListParams::default());
let watcher = watcher(resource, ListParams::default()).backoff(default_backoff());
let mut informer = watcher.boxed();
let mut first_event = true;
// Currently, this does not handle None except to break the
// while.
while let Some(event) = informer.try_next().await? {
// Currently, this does not handle None except to break the loop.
loop {
let event = match informer.try_next().await {
Err(e) => {
error!("Error during watch: {}", e);
continue;
}
Ok(None) => break,
Ok(Some(event)) => event,
};
// Acquire lock to ensure cleanup_instance_and_configuration_svcs and the
// inner loop handle_instance call in internal_do_instance_watch
// cannot execute at the same time.
20 changes: 14 additions & 6 deletions controller/src/util/node_watcher.rs
@@ -9,8 +9,9 @@ use akri_shared::{
use futures::{StreamExt, TryStreamExt};
use k8s_openapi::api::core::v1::{Node, NodeStatus};
use kube::api::{Api, ListParams};
use kube_runtime::watcher::{watcher, Event};
use log::{info, trace};
use kube_runtime::watcher::{default_backoff, watcher, Event};
use kube_runtime::WatchStreamExt;
use log::{error, info, trace};
use std::collections::HashMap;

/// Node states that NodeWatcher is interested in
@@ -55,13 +56,20 @@ impl NodeWatcher {
trace!("watch - enter");
let kube_interface = k8s::KubeImpl::new().await?;
let resource = Api::<Node>::all(kube_interface.get_kube_client());
let watcher = watcher(resource, ListParams::default());
let watcher = watcher(resource, ListParams::default()).backoff(default_backoff());
let mut informer = watcher.boxed();
let mut first_event = true;

// Currently, this does not handle None except to break the
// while.
while let Some(event) = informer.try_next().await? {
// Currently, this does not handle None except to break the loop.
loop {
let event = match informer.try_next().await {
Err(e) => {
error!("Error during watch: {}", e);
continue;
}
Ok(None) => break,
Ok(Some(event)) => event,
};
self.handle_node(event, &kube_interface, &mut first_event)
.await?;
}
30 changes: 17 additions & 13 deletions controller/src/util/pod_watcher.rs
@@ -13,8 +13,9 @@ use async_std::sync::Mutex;
use futures::{StreamExt, TryStreamExt};
use k8s_openapi::api::core::v1::{Pod, ServiceSpec};
use kube::api::{Api, ListParams};
use kube_runtime::watcher::{watcher, Event};
use log::{info, trace};
use kube_runtime::watcher::{default_backoff, watcher, Event};
use kube_runtime::WatchStreamExt;
use log::{error, info, trace};
use std::{collections::HashMap, sync::Arc};

type PodSlice = [Pod];
@@ -122,28 +123,31 @@ impl BrokerPodWatcher {
}

/// This watches for broker Pod events
pub async fn watch(
&mut self,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
pub async fn watch(&mut self) -> anyhow::Result<()> {
trace!("watch - enter");
let kube_interface = k8s::KubeImpl::new().await?;
let resource = Api::<Pod>::all(kube_interface.get_kube_client());
let watcher = watcher(
resource,
ListParams::default().labels(AKRI_CONFIGURATION_LABEL_NAME),
);
)
.backoff(default_backoff());
let mut informer = watcher.boxed();
let synchronization = Arc::new(Mutex::new(()));
let mut first_event = true;

loop {
// Currently, this does not handle None except to break the
// while.
while let Some(event) = informer.try_next().await? {
let _lock = synchronization.lock().await;
self.handle_pod(event, &kube_interface, &mut first_event)
.await?;
}
let event = match informer.try_next().await {
Err(e) => {
error!("Error during watch: {}", e);
continue;
}
Ok(None) => return Err(anyhow::anyhow!("Watch stream ended")),
Ok(Some(event)) => event,
};
let _lock = synchronization.lock().await;
self.handle_pod(event, &kube_interface, &mut first_event)
.await?;
}
}
