Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 18 additions & 8 deletions src/imageproxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::fs::File;
use std::os::unix::io::AsRawFd;
use std::os::unix::prelude::{FromRawFd, RawFd};
use std::pin::Pin;
use std::process::Stdio;
use std::process::{ExitStatus, Stdio};
use std::sync::{Arc, Mutex};
use tokio::io::{AsyncBufRead, AsyncReadExt};

Expand Down Expand Up @@ -69,9 +69,9 @@ type JoinFuture<T> = Pin<Box<dyn Future<Output = Result<Result<T>>>>>;

/// Manage a child process proxy to fetch container images.
pub struct ImageProxy {
proc: tokio::process::Child,
sockfd: Arc<Mutex<File>>,
stderr: JoinFuture<String>,
procwait: Pin<Box<dyn Future<Output = Result<ExitStatus>>>>,
}

impl std::fmt::Debug for ImageProxy {
Expand Down Expand Up @@ -130,12 +130,22 @@ impl ImageProxy {
.map_err(anyhow::Error::msg)
.boxed();

let mut procwait = Box::pin(async move { proc.wait().map_err(anyhow::Error::msg).await });

let sockfd = Arc::new(Mutex::new(mysock));

// Verify semantic version
let (protover, _) =
Self::impl_request_raw::<String>(Arc::clone(&sockfd), Request::new_bare("Initialize"))
.await?;
let protoreq =
Self::impl_request_raw::<String>(Arc::clone(&sockfd), Request::new_bare("Initialize"));
let protover = tokio::select! {
r = protoreq => {
r?.0
}
r = &mut procwait => {
let errmsg = stderr.await??;
return Err(anyhow!("skopeo exited unexpectedly (no support for `experimental-image-proxy`?): {}\n{}", r?, errmsg));
}
};
let protover = semver::Version::parse(protover.as_str())?;
let supported = &*SUPPORTED_PROTO_VERSION;
if !supported.matches(&protover) {
Expand All @@ -147,9 +157,9 @@ impl ImageProxy {
}

let r = Self {
proc,
stderr,
sockfd,
procwait,
};
Ok(r)
}
Expand Down Expand Up @@ -276,14 +286,14 @@ impl ImageProxy {
}

/// Close the connection and wait for the child process to exit successfully.
pub async fn finalize(mut self) -> Result<()> {
pub async fn finalize(self) -> Result<()> {
let req = Request::new_bare("Shutdown");
let sendbuf = serde_json::to_vec(&req)?;
// SAFETY: Only panics if a worker thread already panic'd
let sockfd = Arc::try_unwrap(self.sockfd).unwrap().into_inner().unwrap();
nixsocket::send(sockfd.as_raw_fd(), &sendbuf, nixsocket::MsgFlags::empty())?;
drop(sendbuf);
let status = self.proc.wait().await?;
let status = self.procwait.await?;
if !status.success() {
if let Some(stderr) = self.stderr.await.map(|v| v.ok()).ok().flatten() {
anyhow::bail!("proxy failed: {}\n{}", status, stderr)
Expand Down