Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 24 additions & 4 deletions crates/openshell-driver-kubernetes/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ const WORKSPACE_SENTINEL: &str = ".workspace-initialized";
#[derive(Clone)]
pub struct KubernetesComputeDriver {
client: Client,
watch_client: Client,
config: KubernetesComputeConfig,
}

Expand All @@ -117,17 +118,30 @@ impl std::fmt::Debug for KubernetesComputeDriver {

impl KubernetesComputeDriver {
pub async fn new(config: KubernetesComputeConfig) -> Result<Self, KubeError> {
let mut kube_config = match kube::Config::incluster() {
let base_config = match kube::Config::incluster() {
Ok(c) => c,
Err(_) => kube::Config::infer()
.await
.map_err(kube::Error::InferConfig)?,
};

let mut kube_config = base_config.clone();
kube_config.connect_timeout = Some(Duration::from_secs(10));
kube_config.read_timeout = Some(Duration::from_secs(30));
kube_config.write_timeout = Some(Duration::from_secs(30));
let client = Client::try_from(kube_config)?;
Ok(Self { client, config })

let mut watch_kube_config = base_config;
watch_kube_config.connect_timeout = Some(Duration::from_secs(10));
watch_kube_config.read_timeout = None;
watch_kube_config.write_timeout = Some(Duration::from_secs(30));
let watch_client = Client::try_from(watch_kube_config)?;

Ok(Self {
client,
watch_client,
config,
})
}

pub async fn capabilities(&self) -> Result<GetCapabilitiesResponse, String> {
Expand Down Expand Up @@ -155,6 +169,12 @@ impl KubernetesComputeDriver {
self.config.ssh_handshake_skew_secs
}

fn watch_api(&self) -> Api<DynamicObject> {
let gvk = GroupVersionKind::gvk(SANDBOX_GROUP, SANDBOX_VERSION, SANDBOX_KIND);
let resource = ApiResource::from_gvk(&gvk);
Api::namespaced_with(self.watch_client.clone(), &self.config.namespace, &resource)
}

fn api(&self) -> Api<DynamicObject> {
let gvk = GroupVersionKind::gvk(SANDBOX_GROUP, SANDBOX_VERSION, SANDBOX_KIND);
let resource = ApiResource::from_gvk(&gvk);
Expand Down Expand Up @@ -392,8 +412,8 @@ impl KubernetesComputeDriver {

pub async fn watch_sandboxes(&self) -> Result<WatchStream, String> {
let namespace = self.config.namespace.clone();
let sandbox_api = self.api();
let event_api: Api<KubeEventObj> = Api::namespaced(self.client.clone(), &namespace);
let sandbox_api = self.watch_api();
let event_api: Api<KubeEventObj> = Api::namespaced(self.watch_client.clone(), &namespace);
let mut sandbox_stream = watcher::watcher(sandbox_api, watcher::Config::default()).boxed();
let mut event_stream = watcher::watcher(event_api, watcher::Config::default()).boxed();
let (tx, rx) = mpsc::channel(256);
Expand Down
Loading