diff --git a/controller/src/util/instance_action.rs b/controller/src/util/instance_action.rs index b0d83d254..862cfc340 100644 --- a/controller/src/util/instance_action.rs +++ b/controller/src/util/instance_action.rs @@ -13,7 +13,8 @@ use futures::{StreamExt, TryStreamExt}; use k8s_openapi::api::batch::v1::JobSpec; use k8s_openapi::api::core::v1::{Pod, PodSpec}; use kube::api::{Api, ListParams}; -use kube_runtime::watcher::{watcher, Event}; +use kube_runtime::watcher::{default_backoff, watcher, Event}; +use kube_runtime::WatchStreamExt; use log::{error, info, trace}; use std::collections::HashMap; use std::sync::Arc; @@ -92,12 +93,19 @@ async fn internal_do_instance_watch( ) -> Result<(), Box> { trace!("internal_do_instance_watch - enter"); let resource = Api::::all(kube_interface.get_kube_client()); - let watcher = watcher(resource, ListParams::default()); + let watcher = watcher(resource, ListParams::default()).backoff(default_backoff()); let mut informer = watcher.boxed(); let mut first_event = true; - // Currently, this does not handle None except to break the - // while. - while let Some(event) = informer.try_next().await? { + // Currently, this does not handle None except to break the loop. + loop { + let event = match informer.try_next().await { + Err(e) => { + error!("Error during watch: {}", e); + continue; + } + Ok(None) => break, + Ok(Some(event)) => event, + }; // Aquire lock to ensure cleanup_instance_and_configuration_svcs and the // inner loop handle_instance call in internal_do_instance_watch // cannot execute at the same time. diff --git a/controller/src/util/node_watcher.rs b/controller/src/util/node_watcher.rs index 9da65a68a..98079dcab 100644 --- a/controller/src/util/node_watcher.rs +++ b/controller/src/util/node_watcher.rs @@ -9,8 +9,9 @@ use akri_shared::{ use futures::{StreamExt, TryStreamExt}; use k8s_openapi::api::core::v1::{Node, NodeStatus}; use kube::api::{Api, ListParams}; -use kube_runtime::watcher::{watcher, Event}; -use log::{info, trace}; +use kube_runtime::watcher::{default_backoff, watcher, Event}; +use kube_runtime::WatchStreamExt; +use log::{error, info, trace}; use std::collections::HashMap; /// Node states that NodeWatcher is interested in @@ -55,13 +56,20 @@ impl NodeWatcher { trace!("watch - enter"); let kube_interface = k8s::KubeImpl::new().await?; let resource = Api::::all(kube_interface.get_kube_client()); - let watcher = watcher(resource, ListParams::default()); + let watcher = watcher(resource, ListParams::default()).backoff(default_backoff()); let mut informer = watcher.boxed(); let mut first_event = true; - // Currently, this does not handle None except to break the - // while. - while let Some(event) = informer.try_next().await? { + // Currently, this does not handle None except to break the loop. + loop { + let event = match informer.try_next().await { + Err(e) => { + error!("Error during watch: {}", e); + continue; + } + Ok(None) => break, + Ok(Some(event)) => event, + }; self.handle_node(event, &kube_interface, &mut first_event) .await?; } diff --git a/controller/src/util/pod_watcher.rs b/controller/src/util/pod_watcher.rs index 0ef202ead..829257733 100644 --- a/controller/src/util/pod_watcher.rs +++ b/controller/src/util/pod_watcher.rs @@ -13,8 +13,9 @@ use async_std::sync::Mutex; use futures::{StreamExt, TryStreamExt}; use k8s_openapi::api::core::v1::{Pod, ServiceSpec}; use kube::api::{Api, ListParams}; -use kube_runtime::watcher::{watcher, Event}; -use log::{info, trace}; +use kube_runtime::watcher::{default_backoff, watcher, Event}; +use kube_runtime::WatchStreamExt; +use log::{error, info, trace}; use std::{collections::HashMap, sync::Arc}; type PodSlice = [Pod]; @@ -122,28 +123,31 @@ impl BrokerPodWatcher { } /// This watches for broker Pod events - pub async fn watch( - &mut self, - ) -> Result<(), Box> { + pub async fn watch(&mut self) -> anyhow::Result<()> { trace!("watch - enter"); let kube_interface = k8s::KubeImpl::new().await?; let resource = Api::::all(kube_interface.get_kube_client()); let watcher = watcher( resource, ListParams::default().labels(AKRI_CONFIGURATION_LABEL_NAME), - ); + ) + .backoff(default_backoff()); let mut informer = watcher.boxed(); let synchronization = Arc::new(Mutex::new(())); let mut first_event = true; loop { - // Currently, this does not handle None except to break the - // while. - while let Some(event) = informer.try_next().await? { - let _lock = synchronization.lock().await; - self.handle_pod(event, &kube_interface, &mut first_event) - .await?; - } + let event = match informer.try_next().await { + Err(e) => { + error!("Error during watch: {}", e); + continue; + } + Ok(None) => return Err(anyhow::anyhow!("Watch stream ended")), + Ok(Some(event)) => event, + }; + let _lock = synchronization.lock().await; + self.handle_pod(event, &kube_interface, &mut first_event) + .await?; } }