From c819cf941c39cc5afe917115844935271cfdca89 Mon Sep 17 00:00:00 2001 From: Seth Jennings Date: Tue, 21 Apr 2026 10:34:42 -0500 Subject: [PATCH] fix(k8s-driver): use dedicated kube client without read_timeout for watches The 30s read_timeout on the shared kube client was killing the long-lived watch streams during idle periods, causing a reconnect cycle every 30 seconds. Use a separate client with no read_timeout for watch_sandboxes so the streams stay open indefinitely. --- .../openshell-driver-kubernetes/src/driver.rs | 28 ++++++++++++++++--- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/crates/openshell-driver-kubernetes/src/driver.rs b/crates/openshell-driver-kubernetes/src/driver.rs index f70054805..444e0f55d 100644 --- a/crates/openshell-driver-kubernetes/src/driver.rs +++ b/crates/openshell-driver-kubernetes/src/driver.rs @@ -102,6 +102,7 @@ const WORKSPACE_SENTINEL: &str = ".workspace-initialized"; #[derive(Clone)] pub struct KubernetesComputeDriver { client: Client, + watch_client: Client, config: KubernetesComputeConfig, } @@ -117,17 +118,30 @@ impl std::fmt::Debug for KubernetesComputeDriver { impl KubernetesComputeDriver { pub async fn new(config: KubernetesComputeConfig) -> Result { - let mut kube_config = match kube::Config::incluster() { + let base_config = match kube::Config::incluster() { Ok(c) => c, Err(_) => kube::Config::infer() .await .map_err(kube::Error::InferConfig)?, }; + + let mut kube_config = base_config.clone(); kube_config.connect_timeout = Some(Duration::from_secs(10)); kube_config.read_timeout = Some(Duration::from_secs(30)); kube_config.write_timeout = Some(Duration::from_secs(30)); let client = Client::try_from(kube_config)?; - Ok(Self { client, config }) + + let mut watch_kube_config = base_config; + watch_kube_config.connect_timeout = Some(Duration::from_secs(10)); + watch_kube_config.read_timeout = None; + watch_kube_config.write_timeout = Some(Duration::from_secs(30)); + let watch_client = Client::try_from(watch_kube_config)?; + + Ok(Self { + client, + watch_client, + config, + }) } pub async fn capabilities(&self) -> Result { @@ -155,6 +169,12 @@ impl KubernetesComputeDriver { self.config.ssh_handshake_skew_secs } + fn watch_api(&self) -> Api { + let gvk = GroupVersionKind::gvk(SANDBOX_GROUP, SANDBOX_VERSION, SANDBOX_KIND); + let resource = ApiResource::from_gvk(&gvk); + Api::namespaced_with(self.watch_client.clone(), &self.config.namespace, &resource) + } + fn api(&self) -> Api { let gvk = GroupVersionKind::gvk(SANDBOX_GROUP, SANDBOX_VERSION, SANDBOX_KIND); let resource = ApiResource::from_gvk(&gvk); @@ -392,8 +412,8 @@ impl KubernetesComputeDriver { pub async fn watch_sandboxes(&self) -> Result { let namespace = self.config.namespace.clone(); - let sandbox_api = self.api(); - let event_api: Api = Api::namespaced(self.client.clone(), &namespace); + let sandbox_api = self.watch_api(); + let event_api: Api = Api::namespaced(self.watch_client.clone(), &namespace); let mut sandbox_stream = watcher::watcher(sandbox_api, watcher::Config::default()).boxed(); let mut event_stream = watcher::watcher(event_api, watcher::Config::default()).boxed(); let (tx, rx) = mpsc::channel(256);