Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 17 additions & 12 deletions crates/runc-shim/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@ use std::{

use containerd_shim::{
api::{ExecProcessRequest, Options},
io_error, other, other_error,
util::IntoOption,
Error,
io_error, other, other_error, Error,
};
use log::{debug, warn};
use nix::{
Expand All @@ -44,7 +42,8 @@ use nix::{
};
use oci_spec::runtime::{LinuxNamespaceType, Spec};
use runc::{
io::{Io, NullIo, FIFO},
io::{IOOption, Io, NullIo},
PipedIo,
options::GlobalOpts,
Runc, Spawner,
};
Expand Down Expand Up @@ -77,8 +76,8 @@ pub struct ProcessIO {

pub fn create_io(
id: &str,
_io_uid: u32,
_io_gid: u32,
io_uid: u32,
io_gid: u32,
stdio: &Stdio,
) -> containerd_shim::Result<ProcessIO> {
let mut pio = ProcessIO::default();
Expand All @@ -101,19 +100,25 @@ pub fn create_io(

if scheme == FIFO_SCHEME {
debug!(
"create named pipe io for container {}, stdin: {}, stdout: {}, stderr: {}",
"create pipe io for container {}, stdin: {}, stdout: {}, stderr: {}",
id,
stdio.stdin.as_str(),
stdio.stdout.as_str(),
stdio.stderr.as_str()
);
let io = FIFO {
stdin: stdio.stdin.to_string().none_if(|x| x.is_empty()),
stdout: stdio.stdout.to_string().none_if(|x| x.is_empty()),
stderr: stdio.stderr.to_string().none_if(|x| x.is_empty()),

if stdio.stdin.is_empty() {
debug!("stdin is empty");
}
let opts = IOOption {
open_stdin: !stdio.stdin.is_empty(),
open_stdout: !stdio.stdout.is_empty(),
open_stderr: !stdio.stderr.is_empty(),
};
let io = PipedIo::new(io_uid, io_gid, &opts).unwrap();
pio.copy = true;

pio.io = Some(Arc::new(io));
pio.copy = false;
}
Ok(pio)
}
Expand Down
4 changes: 3 additions & 1 deletion crates/runc-shim/src/processes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use tokio::{
sync::oneshot::{channel, Receiver, Sender},
};

use crate::io::Stdio;
use crate::{common::ProcessIO, io::Stdio};

#[allow(dead_code)]
#[async_trait]
Expand Down Expand Up @@ -77,6 +77,7 @@ pub struct ProcessTemplate<S> {
pub state: Status,
pub id: String,
pub stdio: Stdio,
pub io: Option<Arc<ProcessIO>>,
pub pid: i32,
pub exit_code: i32,
pub exited_at: Option<OffsetDateTime>,
Expand All @@ -92,6 +93,7 @@ impl<S> ProcessTemplate<S> {
state: Status::CREATED,
id: id.to_string(),
stdio,
io: None,
pid: 0,
exit_code: 0,
exited_at: None,
Expand Down
33 changes: 28 additions & 5 deletions crates/runc-shim/src/runc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,10 @@ impl RuncFactory {
(Some(s), None)
} else {
let pio = create_io(&id, opts.io_uid, opts.io_gid, stdio)?;
create_opts.io = pio.io.as_ref().cloned();
(None, Some(pio))
let ref_pio = Arc::new(pio);
create_opts.io = ref_pio.io.clone();
init.io = Some(ref_pio.clone());
(None, Some(ref_pio))
};

let resp = init
Expand All @@ -178,6 +180,22 @@ impl RuncFactory {
}
return Err(runtime_error(bundle, e, "OCI runtime create failed").await);
}
if !init.stdio.stdin.is_empty() {
let stdin_clone = init.stdio.stdin.clone();
let stdin_w = init.stdin.clone();
// Open the write side in advance to make sure read side will not block,
// open it in another thread otherwise it will block too.
tokio::spawn(async move {
if let Ok(stdin_w_file) = OpenOptions::new()
.write(true)
.open(stdin_clone.as_str())
.await
{
let mut lock_guard = stdin_w.lock().unwrap();
*lock_guard = Some(stdin_w_file);
}
});
}
copy_io_or_console(init, socket, pio, init.lifecycle.exit_signal.clone()).await?;
let pid = read_file_to_str(pid_path).await?.parse::<i32>()?;
init.pid = pid;
Expand Down Expand Up @@ -232,6 +250,7 @@ impl ProcessFactory<ExecProcess> for RuncExecFactory {
stderr: req.stderr.to_string(),
terminal: req.terminal,
},
io: None,
pid: 0,
exit_code: 0,
exited_at: None,
Expand Down Expand Up @@ -299,6 +318,7 @@ impl ProcessLifecycle<InitProcess> for RuncInitLifecycle {
);
}
}

self.exit_signal.signal();
Ok(())
}
Expand Down Expand Up @@ -434,8 +454,10 @@ impl ProcessLifecycle<ExecProcess> for RuncExecLifecycle {
(Some(s), None)
} else {
let pio = create_io(&p.id, self.io_uid, self.io_gid, &p.stdio)?;
exec_opts.io = pio.io.as_ref().cloned();
(None, Some(pio))
let ref_pio = Arc::new(pio);
exec_opts.io = ref_pio.io.clone();
p.io = Some(ref_pio.clone());
(None, Some(ref_pio))
};
//TODO checkpoint support
let exec_result = self
Expand Down Expand Up @@ -698,7 +720,7 @@ where
async fn copy_io_or_console<P>(
p: &mut ProcessTemplate<P>,
socket: Option<ConsoleSocket>,
pio: Option<ProcessIO>,
pio: Option<Arc<ProcessIO>>,
exit_signal: Arc<ExitSignal>,
) -> Result<()> {
if p.stdio.terminal {
Expand Down Expand Up @@ -736,6 +758,7 @@ impl Spawner for ShimExecutor {
}
};
let pid = child.id().unwrap();

let (stdout, stderr, exit_code) = tokio::join!(
read_std(child.stdout),
read_std(child.stderr),
Expand Down
Loading