Skip to content

Commit

Permalink
Merge pull request #1655 from mgjm-fork/fix-early-stdio-close
Browse files Browse the repository at this point in the history
Fix early stdio close due to multiple ownership
  • Loading branch information
openshift-merge-robot committed Aug 29, 2023
2 parents 4010dda + 3cc76bf commit b5bdcb8
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 50 deletions.
11 changes: 0 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion conmon-rs/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ strum = { version = "0.25.0", features = ["derive"] }
tempfile = "3.8.0"
tokio = { version = "1.32.0", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt", "rt-multi-thread", "signal", "time"] }
tokio-eventfd = "0.2.1"
tokio-fd = "0.3.0"
tokio-seqpacket = "0.7.0"
tokio-util = { version = "0.7.8", features = ["compat"] }
tracing = "0.1.37"
Expand Down
10 changes: 3 additions & 7 deletions conmon-rs/server/src/container_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,13 @@ use nix::errno::Errno;
use std::{
fmt,
marker::Unpin,
os::unix::io::{FromRawFd, RawFd},
path::{Path, PathBuf},
sync::Arc,
};
use strum::AsRefStr;
use tempfile::Builder;
use tokio::{
fs::File,
io::{AsyncRead, AsyncReadExt, AsyncWriteExt},
io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
select,
sync::{
mpsc::{UnboundedReceiver, UnboundedSender},
Expand Down Expand Up @@ -310,12 +308,10 @@ impl ContainerIO {
}

pub async fn read_loop_stdin(
fd: RawFd,
mut writer: impl AsyncWrite + Unpin,
mut attach: SharedContainerAttach,
token: CancellationToken,
) -> Result<()> {
let mut writer = unsafe { File::from_raw_fd(fd) };

loop {
// While we're not processing input from a caller, and logically should be able to
// catch a Message::Done here, it doesn't quite work that way.
Expand Down Expand Up @@ -346,7 +342,7 @@ impl ContainerIO {
}
}

async fn handle_stdin_data(data: &[u8], writer: &mut File) -> Result<()> {
async fn handle_stdin_data(data: &[u8], mut writer: impl AsyncWrite + Unpin) -> Result<()> {
debug!("Got {} attach bytes", data.len());

writer
Expand Down
5 changes: 1 addition & 4 deletions conmon-rs/server/src/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use crate::{
};
use anyhow::Result;
use getset::Getters;
use std::os::unix::io::AsRawFd;
use tokio::{
process::{ChildStderr, ChildStdin, ChildStdout},
sync::mpsc::{self, UnboundedReceiver, UnboundedSender},
Expand Down Expand Up @@ -68,9 +67,7 @@ impl Streams {
if let Some(stdin) = stdin {
task::spawn(
async move {
if let Err(e) =
ContainerIO::read_loop_stdin(stdin.as_raw_fd(), attach, token).await
{
if let Err(e) = ContainerIO::read_loop_stdin(stdin, attach, token).await {
error!("Stdin read loop failure: {:#}", e);
}
}
Expand Down
121 changes: 94 additions & 27 deletions conmon-rs/server/src/terminal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,32 @@ use crate::{
container_log::SharedContainerLog,
listener::{DefaultListener, Listener},
};
use anyhow::{bail, format_err, Context, Result};
use anyhow::{format_err, Context as _, Result};
use getset::{Getters, MutGetters, Setters};
use libc::{self, winsize, TIOCSWINSZ};
use nix::sys::termios::{self, OutputFlags, SetArg};
use nix::{
fcntl::{self, FcntlArg, OFlag},
sys::termios::{self, OutputFlags, SetArg},
};
use sendfd::RecvWithFd;
use std::{
convert::TryFrom,
io::{Error as IOError, ErrorKind},
os::unix::{fs::PermissionsExt, io::RawFd},
io::{self, ErrorKind, Read, Write},
os::{
fd::{AsRawFd, FromRawFd, OwnedFd},
unix::{fs::PermissionsExt, io::RawFd},
},
path::PathBuf,
sync::mpsc::Sender as StdSender,
pin::Pin,
sync::{mpsc::Sender as StdSender, Arc, Weak},
task::{ready, Context, Poll},
};
use tokio::{
fs,
io::{AsyncWriteExt, Interest},
io::{unix::AsyncFd, AsyncRead, AsyncWrite, AsyncWriteExt, Interest, ReadBuf},
net::UnixStream,
sync::mpsc::{self, Receiver, Sender, UnboundedReceiver},
task,
};
use tokio_fd::AsyncFd;
use tokio_util::sync::CancellationToken;
use tracing::{debug, debug_span, error, trace, Instrument};

Expand All @@ -34,13 +40,13 @@ pub struct Terminal {
#[getset(get = "pub")]
path: PathBuf,

connected_rx: Receiver<RawFd>,
connected_rx: Receiver<OwnedFd>,

#[getset(get = "pub", get_mut = "pub")]
message_rx: Option<UnboundedReceiver<Message>>,

#[getset(get, set)]
tty: Option<RawFd>,
tty: Option<Weak<TerminalFd>>,

logger: SharedContainerLog,
attach: SharedContainerAttach,
Expand All @@ -55,7 +61,7 @@ struct Config {
ready_tx: StdSender<()>,

#[get]
connected_tx: Sender<RawFd>,
connected_tx: Sender<OwnedFd>,
}

impl Terminal {
Expand Down Expand Up @@ -102,24 +108,24 @@ impl Terminal {
.recv()
.await
.context("receive connected channel")?;
self.set_tty(fd.into());
let fd = Arc::new(TerminalFd::new(fd)?);
self.set_tty(Arc::downgrade(&fd).into());

debug!("Changing terminal settings");
let mut term = termios::tcgetattr(fd)?;
let mut term = termios::tcgetattr(fd.as_raw_fd())?;
term.output_flags |= OutputFlags::ONLCR;
termios::tcsetattr(fd, SetArg::TCSANOW, &term)?;

let stdio = AsyncFd::try_from(fd)?;
termios::tcsetattr(fd.as_raw_fd(), SetArg::TCSANOW, &term)?;

let attach_clone = self.attach.clone();
let logger_clone = self.logger.clone();
let (message_tx, message_rx) = mpsc::unbounded_channel();
self.message_rx = Some(message_rx);

task::spawn(
task::spawn({
let fd = fd.clone();
async move {
if let Err(e) = ContainerIO::read_loop(
stdio,
&*fd,
Pipe::StdOut,
logger_clone,
message_tx,
Expand All @@ -131,14 +137,14 @@ impl Terminal {
}
Ok::<_, anyhow::Error>(())
}
.instrument(debug_span!("read_loop")),
);
.instrument(debug_span!("read_loop"))
});

if stdin {
let attach_clone = self.attach.clone();
task::spawn(
async move {
if let Err(e) = ContainerIO::read_loop_stdin(fd, attach_clone, token).await {
if let Err(e) = ContainerIO::read_loop_stdin(&*fd, attach_clone, token).await {
error!("Stdin read loop failure: {:#}", e);
}
}
Expand All @@ -158,15 +164,16 @@ impl Terminal {
ws_xpixel: 0,
ws_ypixel: 0,
};
let tty = self.tty().as_ref().and_then(Weak::upgrade);
match unsafe {
libc::ioctl(
self.tty().context("terminal not connected")?,
tty.context("terminal not connected")?.as_raw_fd(),
TIOCSWINSZ,
&ws,
)
} {
0 => Ok(()),
_ => Err(IOError::last_os_error().into()),
_ => Err(io::Error::last_os_error().into()),
}
}

Expand Down Expand Up @@ -202,6 +209,9 @@ impl Terminal {

match stream.recv_with_fd(&mut data_buffer, &mut fd_buffer) {
Ok((_, fd_read)) => {
// take ownership of the received file descriptor (prevents fd leak in case of error)
let fd = (fd_read != 0).then(|| unsafe { OwnedFd::from_raw_fd(fd_buffer[0]) });

// Allow only one single read
let path = config.path();
debug!("Removing socket path {}", path.display());
Expand All @@ -210,13 +220,13 @@ impl Terminal {
debug!("Shutting down receiver stream");
stream.shutdown().await?;

if fd_read == 0 {
if fd.is_none() {
error!("No file descriptor received");
bail!("got no file descriptor");
}

let fd = fd.context("got no file descriptor")?;

debug!("Received terminal file descriptor");
let fd = fd_buffer[0];

config
.connected_tx
Expand Down Expand Up @@ -278,7 +288,7 @@ mod tests {
match stream.send_with_fd(b"test", &[res.master]) {
Ok(_) => break,
Err(ref e) if e.kind() == ErrorKind::WouldBlock => continue,
Err(e) => bail!(e),
Err(e) => anyhow::bail!(e),
}
}
}
Expand All @@ -293,3 +303,60 @@ mod tests {
Ok(())
}
}

#[derive(Debug)]
struct TerminalFd(AsyncFd<std::fs::File>);

impl TerminalFd {
fn new(fd: OwnedFd) -> io::Result<Self> {
let flags = fcntl::fcntl(fd.as_raw_fd(), FcntlArg::F_GETFL)?;
let flags = OFlag::from_bits_truncate(flags) | OFlag::O_NONBLOCK;
fcntl::fcntl(fd.as_raw_fd(), FcntlArg::F_SETFL(flags))?;
AsyncFd::new(fd.into()).map(Self)
}
}

impl AsRawFd for TerminalFd {
fn as_raw_fd(&self) -> RawFd {
self.0.as_raw_fd()
}
}

impl AsyncRead for &TerminalFd {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context,
buf: &mut ReadBuf,
) -> Poll<io::Result<()>> {
loop {
let mut guard = ready!(self.0.poll_read_ready(cx))?;
match guard.try_io(|inner| inner.get_ref().read(buf.initialize_unfilled())) {
Ok(n) => {
buf.advance(n?);
break Poll::Ready(Ok(()));
}
Err(_would_block) => continue,
}
}
}
}

impl AsyncWrite for &TerminalFd {
fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
loop {
let mut guard = ready!(self.0.poll_write_ready(cx))?;
match guard.try_io(|inner| inner.get_ref().write(buf)) {
Ok(result) => break Poll::Ready(result),
Err(_would_block) => continue,
}
}
}

fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}

fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
}

0 comments on commit b5bdcb8

Please sign in to comment.