From 3de4bc3d150c4fc888daf492853baba232da56f6 Mon Sep 17 00:00:00 2001 From: Colin Walters Date: Tue, 19 Sep 2023 13:29:42 -0400 Subject: [PATCH] Port fully to rustix This drops out our `nix` dependency entirely. We use io-lifetimes here and this also leaves the only `unsafe` code to be the bit that sets up `prctl`. Signed-off-by: Colin Walters --- Cargo.toml | 2 +- src/imageproxy.rs | 91 ++++++++++++++++++++--------------------------- 2 files changed, 39 insertions(+), 54 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e719af4..eacf040 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,7 @@ nix = "0.26" oci-spec = "0.5.5" once_cell = "1.9.0" libc = "0.2" -rustix = { version = "0.37", features = ["process"] } +rustix = { version = "0.38", features = ["process", "net"] } serde = { features = ["derive"], version = "1.0.125" } serde_json = "1.0.64" semver = "1.0.4" diff --git a/src/imageproxy.rs b/src/imageproxy.rs index 8d02717..a34dc99 100644 --- a/src/imageproxy.rs +++ b/src/imageproxy.rs @@ -8,13 +8,12 @@ use anyhow::{anyhow, Context, Result}; use cap_std_ext::prelude::CapStdExtCommandExt; use cap_std_ext::{cap_std, cap_tempfile}; use futures_util::Future; -use nix::sys::socket::{self as nixsocket, ControlMessageOwned}; use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; use std::fs::File; use std::ops::Range; -use std::os::unix::io::AsRawFd; -use std::os::unix::prelude::{CommandExt, FromRawFd, RawFd}; +use std::os::fd::OwnedFd; +use std::os::unix::prelude::CommandExt; use std::path::PathBuf; use std::pin::Pin; use std::process::{Command, Stdio}; @@ -86,7 +85,7 @@ type ChildFuture = Pin< /// Manage a child process proxy to fetch container images. pub struct ImageProxy { - sockfd: Arc>, + sockfd: Arc>, childwait: Arc>, protover: semver::Version, } @@ -101,30 +100,6 @@ impl std::fmt::Debug for ImageProxy { #[derive(Debug, PartialEq, Eq)] pub struct OpenedImage(u32); -#[allow(unsafe_code)] -fn new_seqpacket_pair() -> Result<(File, File)> { - let (mysock, theirsock) = nixsocket::socketpair( - nixsocket::AddressFamily::Unix, - nixsocket::SockType::SeqPacket, - None, - nixsocket::SockFlag::SOCK_CLOEXEC, - )?; - // Convert to owned values - let mysock = unsafe { std::fs::File::from_raw_fd(mysock) }; - let theirsock = unsafe { std::fs::File::from_raw_fd(theirsock) }; - Ok((mysock, theirsock)) -} - -#[allow(unsafe_code)] -fn file_from_scm_rights(cmsg: ControlMessageOwned) -> Option { - if let nixsocket::ControlMessageOwned::ScmRights(fds) = cmsg { - fds.first() - .map(|&fd| unsafe { std::fs::File::from_raw_fd(fd) }) - } else { - None - } -} - /// Configuration for the proxy. #[derive(Debug, Default)] pub struct ImageProxyConfig { @@ -265,7 +240,12 @@ impl ImageProxy { #[instrument] pub async fn new_with_config(config: ImageProxyConfig) -> Result { let mut c = Command::try_from(config)?; - let (mysock, theirsock) = new_seqpacket_pair()?; + let (mysock, theirsock) = rustix::net::socketpair( + rustix::net::AddressFamily::UNIX, + rustix::net::SocketType::SEQPACKET, + rustix::net::SocketFlags::CLOEXEC, + None, + )?; c.stdin(Stdio::from(theirsock)); let child = c.spawn().context("Failed to spawn skopeo")?; tracing::debug!("Spawned skopeo pid={:?}", child.id()); @@ -301,36 +281,39 @@ impl ImageProxy { } async fn impl_request_raw( - sockfd: Arc>, + sockfd: Arc>, req: Request, - ) -> Result<(T, Option<(File, u32)>)> { + ) -> Result<(T, Option<(OwnedFd, u32)>)> { tracing::trace!("sending request {}", req.method.as_str()); // TODO: Investigate https://crates.io/crates/uds for SOCK_SEQPACKET tokio let r = tokio::task::spawn_blocking(move || { let sockfd = sockfd.lock().unwrap(); let sendbuf = serde_json::to_vec(&req)?; - nixsocket::send(sockfd.as_raw_fd(), &sendbuf, nixsocket::MsgFlags::empty())?; + let sockfd = &*sockfd; + rustix::net::send(sockfd, &sendbuf, rustix::net::SendFlags::empty())?; drop(sendbuf); let mut buf = [0u8; MAX_MSG_SIZE]; - let mut cmsg_buffer = nix::cmsg_space!([RawFd; 1]); + let mut cmsg_space = vec![0; rustix::cmsg_space!(ScmRights(1))]; + let mut cmsg_buffer = rustix::net::RecvAncillaryBuffer::new(&mut cmsg_space); let iov = std::io::IoSliceMut::new(buf.as_mut()); let mut iov = [iov]; - let r = nixsocket::recvmsg::<()>( - sockfd.as_raw_fd(), + let nread = rustix::net::recvmsg( + sockfd, &mut iov, - Some(&mut cmsg_buffer), - nixsocket::MsgFlags::MSG_CMSG_CLOEXEC, - )?; - // SAFETY: We provided a buffer - let iov = r.iovs().next().unwrap(); - let mut fdret: Option = None; - for cmsg in r.cmsgs() { - if let Some(f) = file_from_scm_rights(cmsg) { - fdret = Some(f); - break; - } - } - let reply: Reply = serde_json::from_slice(iov).context("Deserializing reply")?; + &mut cmsg_buffer, + rustix::net::RecvFlags::CMSG_CLOEXEC, + )? + .bytes; + let fdret = cmsg_buffer + .drain() + .filter_map(|m| match m { + rustix::net::RecvAncillaryMessage::ScmRights(f) => Some(f), + _ => None, + }) + .flatten() + .next(); + let buf = &buf[..nread]; + let reply: Reply = serde_json::from_slice(buf).context("Deserializing reply")?; if !reply.success { return Err(anyhow!("remote error: {}", reply.error)); } @@ -361,7 +344,7 @@ impl ImageProxy { &self, method: &str, args: T, - ) -> Result<(R, Option<(File, u32)>)> + ) -> Result<(R, Option<(OwnedFd, u32)>)> where T: IntoIterator, I: Into, @@ -420,9 +403,10 @@ impl ImageProxy { Ok(r) } - async fn read_all_fd(&self, fd: Option<(File, u32)>) -> Result> { + async fn read_all_fd(&self, fd: Option<(OwnedFd, u32)>) -> Result> { let (fd, pipeid) = fd.ok_or_else(|| anyhow!("Missing fd from reply"))?; - let mut fd = tokio::io::BufReader::new(tokio::fs::File::from_std(fd)); + let fd = tokio::fs::File::from_std(std::fs::File::from(fd)); + let mut fd = tokio::io::BufReader::new(fd); let mut r = Vec::new(); let reader = fd.read_to_end(&mut r); let (nbytes, finish) = tokio::join!(reader, self.finish_pipe(pipeid)); @@ -491,7 +475,8 @@ impl ImageProxy { vec![img.0.into(), digest.to_string().into(), size.into()]; let (_bloblen, fd) = self.impl_request::("GetBlob", args).await?; let (fd, pipeid) = fd.ok_or_else(|| anyhow!("Missing fd from reply"))?; - let fd = tokio::io::BufReader::new(tokio::fs::File::from_std(fd)); + let fd = tokio::fs::File::from_std(std::fs::File::from(fd)); + let fd = tokio::io::BufReader::new(fd); let finish = Box::pin(self.finish_pipe(pipeid)); Ok((fd, finish)) } @@ -519,7 +504,7 @@ impl ImageProxy { let sendbuf = serde_json::to_vec(&req)?; // SAFETY: Only panics if a worker thread already panic'd let sockfd = Arc::try_unwrap(self.sockfd).unwrap().into_inner().unwrap(); - nixsocket::send(sockfd.as_raw_fd(), &sendbuf, nixsocket::MsgFlags::empty())?; + rustix::net::send(sockfd, &sendbuf, rustix::net::SendFlags::empty())?; drop(sendbuf); tracing::debug!("sent shutdown request"); let mut childwait = self.childwait.lock().await;