Skip to content

Commit

Permalink
shim: state should be Mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
zzzzzzzzzy9 committed Jun 25, 2024
1 parent 47f2665 commit eb3c246
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 15 deletions.
11 changes: 5 additions & 6 deletions crates/runc-shim/src/processes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ pub trait ProcessLifecycle<P: Process> {
}

pub struct ProcessTemplate<S> {
// TODO: state should be Mutex
pub state: Status,
pub state: tokio::sync::Mutex<Status>,
pub id: String,
pub stdio: Stdio,
pub pid: i32,
Expand All @@ -88,7 +87,7 @@ pub struct ProcessTemplate<S> {
impl<S> ProcessTemplate<S> {
pub fn new(id: &str, stdio: Stdio, lifecycle: S) -> Self {
Self {
state: Status::CREATED,
state: tokio::sync::Mutex::new(Status::CREATED),
id: id.to_string(),
stdio,
pid: 0,
Expand All @@ -113,7 +112,7 @@ where
}

async fn set_exited(&mut self, exit_code: i32) {
self.state = Status::STOPPED;
*self.state.lock().await = Status::STOPPED;
self.exit_code = exit_code;
self.exited_at = Some(OffsetDateTime::now_utc());
// set wait_chan_tx to empty, to trigger the drop of the initialized Receiver.
Expand All @@ -127,7 +126,7 @@ where
async fn state(&self) -> Result<StateResponse> {
let mut resp = StateResponse::new();
resp.id = self.id.to_string();
resp.set_status(self.state);
resp.set_status(*self.state.lock().await);
resp.pid = self.pid as u32;
resp.terminal = self.stdio.terminal;
resp.stdin = self.stdio.stdin.to_string();
Expand All @@ -153,7 +152,7 @@ where

async fn wait_channel(&mut self) -> Result<Receiver<()>> {
let (tx, rx) = channel::<()>();
if self.state != Status::STOPPED {
if *self.state.lock().await != Status::STOPPED {
self.wait_chan_tx.push(tx);
}
Ok(rx)
Expand Down
20 changes: 11 additions & 9 deletions crates/runc-shim/src/runc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ impl ProcessFactory<ExecProcess> for RuncExecFactory {
async fn create(&self, req: &ExecProcessRequest) -> Result<ExecProcess> {
let p = get_spec_from_request(req)?;
Ok(ExecProcess {
state: Status::CREATED,
state: tokio::sync::Mutex::new(Status::CREATED),
id: req.exec_id.to_string(),
stdio: Stdio {
stdin: req.stdin.to_string(),
Expand Down Expand Up @@ -264,7 +264,7 @@ impl ProcessLifecycle<InitProcess> for RuncInitLifecycle {
if let Err(e) = self.runtime.start(p.id.as_str()).await {
return Err(runtime_error(&p.lifecycle.bundle, e, "OCI runtime start failed").await);
}
p.state = Status::RUNNING;
*p.state.lock().await = Status::RUNNING;
Ok(())
}

Expand Down Expand Up @@ -352,16 +352,17 @@ impl ProcessLifecycle<InitProcess> for RuncInitLifecycle {

#[cfg(target_os = "linux")]
async fn pause(&self, p: &mut InitProcess) -> Result<()> {
match p.state {
let mut state = p.state.lock().await;
match *state {
Status::UNKNOWN => Err(other!("cannot pause an unknown process")),
Status::CREATED => Err(other!("cannot pause a created process")),
Status::RUNNING => {
p.state = Status::PAUSING;
*state = Status::PAUSING;
if let Err(e) = self.runtime.pause(p.id.as_str()).await {
p.state = Status::RUNNING;
*state = Status::RUNNING;
return Err(runtime_error(&self.bundle, e, "OCI runtime pause failed").await);
}
p.state = Status::PAUSED;
*state = Status::PAUSED;
Ok(())
}
Status::STOPPED => Err(other!("cannot pause a stopped process")),
Expand All @@ -377,7 +378,8 @@ impl ProcessLifecycle<InitProcess> for RuncInitLifecycle {

#[cfg(target_os = "linux")]
async fn resume(&self, p: &mut InitProcess) -> Result<()> {
match p.state {
let mut state = p.state.lock().await;
match *state {
Status::UNKNOWN => Err(other!("cannot resume an unknown process")),
Status::CREATED => Err(other!("cannot resume a created process")),
Status::RUNNING => Err(other!("cannot resume a running process")),
Expand All @@ -386,7 +388,7 @@ impl ProcessLifecycle<InitProcess> for RuncInitLifecycle {
if let Err(e) = self.runtime.resume(p.id.as_str()).await {
return Err(runtime_error(&self.bundle, e, "OCI runtime pause failed").await);
}
p.state = Status::RUNNING;
*state = Status::RUNNING;
Ok(())
}
Status::PAUSING => Err(other!("cannot resume a pausing process")),
Expand Down Expand Up @@ -477,7 +479,7 @@ impl ProcessLifecycle<ExecProcess> for RuncExecLifecycle {
copy_io_or_console(p, socket, pio, p.lifecycle.exit_signal.clone()).await?;
let pid = read_file_to_str(pid_path).await?.parse::<i32>()?;
p.pid = pid;
p.state = Status::RUNNING;
*p.state.lock().await = Status::RUNNING;
Ok(())
}

Expand Down

0 comments on commit eb3c246

Please sign in to comment.