Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix early stdio close due to multiple ownership #1655

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 0 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion conmon-rs/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ strum = { version = "0.25.0", features = ["derive"] }
tempfile = "3.8.0"
tokio = { version = "1.32.0", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt", "rt-multi-thread", "signal", "time"] }
tokio-eventfd = "0.2.1"
tokio-fd = "0.3.0"
tokio-seqpacket = "0.7.0"
tokio-util = { version = "0.7.8", features = ["compat"] }
tracing = "0.1.37"
Expand Down
10 changes: 3 additions & 7 deletions conmon-rs/server/src/container_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,13 @@ use nix::errno::Errno;
use std::{
fmt,
marker::Unpin,
os::unix::io::{FromRawFd, RawFd},
path::{Path, PathBuf},
sync::Arc,
};
use strum::AsRefStr;
use tempfile::Builder;
use tokio::{
fs::File,
io::{AsyncRead, AsyncReadExt, AsyncWriteExt},
io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
select,
sync::{
mpsc::{UnboundedReceiver, UnboundedSender},
Expand Down Expand Up @@ -310,12 +308,10 @@ impl ContainerIO {
}

pub async fn read_loop_stdin(
fd: RawFd,
mut writer: impl AsyncWrite + Unpin,
mut attach: SharedContainerAttach,
token: CancellationToken,
) -> Result<()> {
let mut writer = unsafe { File::from_raw_fd(fd) };

loop {
// While we're not processing input from a caller, and logically should be able to
// catch a Message::Done here, it doesn't quite work that way.
Expand Down Expand Up @@ -346,7 +342,7 @@ impl ContainerIO {
}
}

async fn handle_stdin_data(data: &[u8], writer: &mut File) -> Result<()> {
async fn handle_stdin_data(data: &[u8], mut writer: impl AsyncWrite + Unpin) -> Result<()> {
debug!("Got {} attach bytes", data.len());

writer
Expand Down
5 changes: 1 addition & 4 deletions conmon-rs/server/src/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use crate::{
};
use anyhow::Result;
use getset::Getters;
use std::os::unix::io::AsRawFd;
use tokio::{
process::{ChildStderr, ChildStdin, ChildStdout},
sync::mpsc::{self, UnboundedReceiver, UnboundedSender},
Expand Down Expand Up @@ -68,9 +67,7 @@ impl Streams {
if let Some(stdin) = stdin {
task::spawn(
async move {
if let Err(e) =
ContainerIO::read_loop_stdin(stdin.as_raw_fd(), attach, token).await
{
if let Err(e) = ContainerIO::read_loop_stdin(stdin, attach, token).await {
error!("Stdin read loop failure: {:#}", e);
}
}
Expand Down
121 changes: 94 additions & 27 deletions conmon-rs/server/src/terminal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,32 @@ use crate::{
container_log::SharedContainerLog,
listener::{DefaultListener, Listener},
};
use anyhow::{bail, format_err, Context, Result};
use anyhow::{format_err, Context as _, Result};
use getset::{Getters, MutGetters, Setters};
use libc::{self, winsize, TIOCSWINSZ};
use nix::sys::termios::{self, OutputFlags, SetArg};
use nix::{
fcntl::{self, FcntlArg, OFlag},
sys::termios::{self, OutputFlags, SetArg},
};
use sendfd::RecvWithFd;
use std::{
convert::TryFrom,
io::{Error as IOError, ErrorKind},
os::unix::{fs::PermissionsExt, io::RawFd},
io::{self, ErrorKind, Read, Write},
os::{
fd::{AsRawFd, FromRawFd, OwnedFd},
unix::{fs::PermissionsExt, io::RawFd},
},
path::PathBuf,
sync::mpsc::Sender as StdSender,
pin::Pin,
sync::{mpsc::Sender as StdSender, Arc, Weak},
task::{ready, Context, Poll},
};
use tokio::{
fs,
io::{AsyncWriteExt, Interest},
io::{unix::AsyncFd, AsyncRead, AsyncWrite, AsyncWriteExt, Interest, ReadBuf},
net::UnixStream,
sync::mpsc::{self, Receiver, Sender, UnboundedReceiver},
task,
};
use tokio_fd::AsyncFd;
use tokio_util::sync::CancellationToken;
use tracing::{debug, debug_span, error, trace, Instrument};

Expand All @@ -34,13 +40,13 @@ pub struct Terminal {
#[getset(get = "pub")]
path: PathBuf,

connected_rx: Receiver<RawFd>,
connected_rx: Receiver<OwnedFd>,

#[getset(get = "pub", get_mut = "pub")]
message_rx: Option<UnboundedReceiver<Message>>,

#[getset(get, set)]
tty: Option<RawFd>,
tty: Option<Weak<TerminalFd>>,

logger: SharedContainerLog,
attach: SharedContainerAttach,
Expand All @@ -55,7 +61,7 @@ struct Config {
ready_tx: StdSender<()>,

#[get]
connected_tx: Sender<RawFd>,
connected_tx: Sender<OwnedFd>,
}

impl Terminal {
Expand Down Expand Up @@ -102,24 +108,24 @@ impl Terminal {
.recv()
.await
.context("receive connected channel")?;
self.set_tty(fd.into());
let fd = Arc::new(TerminalFd::new(fd)?);
self.set_tty(Arc::downgrade(&fd).into());

debug!("Changing terminal settings");
let mut term = termios::tcgetattr(fd)?;
let mut term = termios::tcgetattr(fd.as_raw_fd())?;
term.output_flags |= OutputFlags::ONLCR;
termios::tcsetattr(fd, SetArg::TCSANOW, &term)?;

let stdio = AsyncFd::try_from(fd)?;
termios::tcsetattr(fd.as_raw_fd(), SetArg::TCSANOW, &term)?;

let attach_clone = self.attach.clone();
let logger_clone = self.logger.clone();
let (message_tx, message_rx) = mpsc::unbounded_channel();
self.message_rx = Some(message_rx);

task::spawn(
task::spawn({
let fd = fd.clone();
async move {
if let Err(e) = ContainerIO::read_loop(
stdio,
&*fd,
Pipe::StdOut,
logger_clone,
message_tx,
Expand All @@ -131,14 +137,14 @@ impl Terminal {
}
Ok::<_, anyhow::Error>(())
}
.instrument(debug_span!("read_loop")),
);
.instrument(debug_span!("read_loop"))
});

if stdin {
let attach_clone = self.attach.clone();
task::spawn(
async move {
if let Err(e) = ContainerIO::read_loop_stdin(fd, attach_clone, token).await {
if let Err(e) = ContainerIO::read_loop_stdin(&*fd, attach_clone, token).await {
error!("Stdin read loop failure: {:#}", e);
}
}
Expand All @@ -158,15 +164,16 @@ impl Terminal {
ws_xpixel: 0,
ws_ypixel: 0,
};
let tty = self.tty().as_ref().and_then(Weak::upgrade);
match unsafe {
libc::ioctl(
self.tty().context("terminal not connected")?,
tty.context("terminal not connected")?.as_raw_fd(),
TIOCSWINSZ,
&ws,
)
} {
0 => Ok(()),
_ => Err(IOError::last_os_error().into()),
_ => Err(io::Error::last_os_error().into()),
}
}

Expand Down Expand Up @@ -202,6 +209,9 @@ impl Terminal {

match stream.recv_with_fd(&mut data_buffer, &mut fd_buffer) {
Ok((_, fd_read)) => {
// take ownership of the received file descriptor (prevents fd leak in case of error)
let fd = (fd_read != 0).then(|| unsafe { OwnedFd::from_raw_fd(fd_buffer[0]) });

// Allow only one single read
let path = config.path();
debug!("Removing socket path {}", path.display());
Expand All @@ -210,13 +220,13 @@ impl Terminal {
debug!("Shutting down receiver stream");
stream.shutdown().await?;

if fd_read == 0 {
if fd.is_none() {
error!("No file descriptor received");
bail!("got no file descriptor");
}

let fd = fd.context("got no file descriptor")?;

debug!("Received terminal file descriptor");
let fd = fd_buffer[0];

config
.connected_tx
Expand Down Expand Up @@ -278,7 +288,7 @@ mod tests {
match stream.send_with_fd(b"test", &[res.master]) {
Ok(_) => break,
Err(ref e) if e.kind() == ErrorKind::WouldBlock => continue,
Err(e) => bail!(e),
Err(e) => anyhow::bail!(e),
}
}
}
Expand All @@ -293,3 +303,60 @@ mod tests {
Ok(())
}
}

#[derive(Debug)]
struct TerminalFd(AsyncFd<std::fs::File>);

impl TerminalFd {
fn new(fd: OwnedFd) -> io::Result<Self> {
let flags = fcntl::fcntl(fd.as_raw_fd(), FcntlArg::F_GETFL)?;
let flags = OFlag::from_bits_truncate(flags) | OFlag::O_NONBLOCK;
fcntl::fcntl(fd.as_raw_fd(), FcntlArg::F_SETFL(flags))?;
AsyncFd::new(fd.into()).map(Self)
}
}

impl AsRawFd for TerminalFd {
fn as_raw_fd(&self) -> RawFd {
self.0.as_raw_fd()
}
}

impl AsyncRead for &TerminalFd {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context,
buf: &mut ReadBuf,
) -> Poll<io::Result<()>> {
loop {
let mut guard = ready!(self.0.poll_read_ready(cx))?;
match guard.try_io(|inner| inner.get_ref().read(buf.initialize_unfilled())) {
Ok(n) => {
buf.advance(n?);
break Poll::Ready(Ok(()));
}
Err(_would_block) => continue,
}
}
}
}

impl AsyncWrite for &TerminalFd {
fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
loop {
let mut guard = ready!(self.0.poll_write_ready(cx))?;
match guard.try_io(|inner| inner.get_ref().write(buf)) {
Ok(result) => break Poll::Ready(result),
Err(_would_block) => continue,
}
}
}

fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}

fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
}