Skip to content

Commit

Permalink
fix(cubestore): Fix error: Internal: channel closed on the next reque…
Browse files Browse the repository at this point in the history
…st after cubestore cloud process got OOM (#5238)

* in work

* fix(cubestore): Fix error: Internal: channel closed on the next request after cubestore cloud process got OOM
  • Loading branch information
waralexrom committed Sep 22, 2022
1 parent 7626ed5 commit cb81fdb
Showing 1 changed file with 61 additions and 9 deletions.
70 changes: 61 additions & 9 deletions rust/cubestore/cubestore/src/cluster/worker_pool.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::fmt::Debug;
use std::marker::PhantomData;
use std::panic;
use std::process::Child;
use std::process::{Child, ExitStatus};
use std::sync::Arc;
use std::time::Duration;

Expand Down Expand Up @@ -111,6 +111,35 @@ impl<
}
}

struct ProcessHandleGuard {
handle: Child,
}

impl ProcessHandleGuard {
pub fn new(handle: Child) -> Self {
Self { handle }
}
pub fn try_wait(&mut self) -> std::io::Result<Option<ExitStatus>> {
self.handle.try_wait()
}
pub fn is_alive(&mut self) -> bool {
self.handle.try_wait().map_or(false, |r| r.is_none())
}
pub fn kill(&mut self) {
if let Err(e) = self.handle.kill() {
error!("Error during kill: {:?}", e);
}
}
}

impl Drop for ProcessHandleGuard {
fn drop(&mut self) {
if self.is_alive() {
self.kill();
}
}
}

pub struct WorkerProcess<
T: Debug + Serialize + DeserializeOwned + Sync + Send + 'static,
R: Serialize + DeserializeOwned + Sync + Send + 'static,
Expand Down Expand Up @@ -151,8 +180,8 @@ impl<
let process = self.spawn_process();

match process {
Ok((mut args_tx, mut res_rx, mut handle)) => {
scopeguard::defer!(<WorkerProcess<T, R, P>>::kill(&mut handle));
Ok((mut args_tx, mut res_rx, handle)) => {
let mut handle_guard = ProcessHandleGuard::new(handle);
loop {
let mut stopped_rx = self.stopped_rx.write().await;
let Message {
Expand All @@ -172,6 +201,35 @@ impl<
message
}
};
//Check if child process is killed
match handle_guard.try_wait() {
Ok(Some(_)) => {
error!(
"Worker process is killed, reshedule message in another process"
);
self.queue.push(Message {
message,
sender,
span,
dispatcher,
});
break;
}
Ok(None) => {}
Err(_) => {
error!(
"Can't read worker process status, reshedule message in another process"
);
self.queue.push(Message {
message,
sender,
span,
dispatcher,
});
break;
}
}

let process_message_res_timeout = tokio::time::timeout(
self.timeout,
self.process_message(message, args_tx, res_rx),
Expand Down Expand Up @@ -214,12 +272,6 @@ impl<
}
}

fn kill(handle: &mut Child) {
if let Err(e) = handle.kill() {
error!("Error during kill: {:?}", e);
}
}

#[instrument(level = "trace", skip(self, message, args_tx, res_rx))]
async fn process_message(
&self,
Expand Down

0 comments on commit cb81fdb

Please sign in to comment.