Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ license = "MIT OR Apache-2.0"
name = "containers-image-proxy"
readme = "README.md"
repository = "https://github.com/containers/containers-image-proxy-rs"
version = "0.6.0"
version = "0.7.0"
rust-version = "1.70.0"

[dependencies]
anyhow = "1.0"
fn-error-context = "0.2.0"
futures-util = "0.3.13"
# NOTE when bumping this in a semver-incompatible way, because we re-export it you
Expand All @@ -19,12 +18,14 @@ rustix = { version = "0.38", features = ["process", "net"] }
serde = { features = ["derive"], version = "1.0.125" }
serde_json = "1.0.64"
semver = "1.0.4"
thiserror = "1"
tokio = { features = ["fs", "io-util", "macros", "process", "rt", "sync"], version = "1" }
tracing = "0.1"
# We support versions 2, 3 and 4
cap-std-ext = ">= 2.0, <= 4.0"

[dev-dependencies]
anyhow = "1.0"
bytes = "1.5"
clap = { version = "4.4", features = ["derive"] }

Expand Down
129 changes: 95 additions & 34 deletions src/imageproxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
//!
//! More information: <https://github.com/containers/skopeo/pull/1476>

use anyhow::{anyhow, Context, Result};
use cap_std_ext::prelude::CapStdExtCommandExt;
use cap_std_ext::{cap_std, cap_tempfile};
use futures_util::Future;
Expand All @@ -18,11 +17,59 @@ use std::path::PathBuf;
use std::pin::Pin;
use std::process::{Command, Stdio};
use std::sync::{Arc, Mutex, OnceLock};
use thiserror::Error;
use tokio::io::{AsyncBufRead, AsyncReadExt};
use tokio::sync::Mutex as AsyncMutex;
use tokio::task::JoinError;
use tracing::instrument;

/// Errors returned by this crate.
#[derive(Error, Debug)]
#[non_exhaustive]
pub enum Error {
#[error("i/o error")]
/// An input/output error
Io(#[from] std::io::Error),
#[error("serialization error")]
/// Returned when serialization or deserialization fails
SerDe(#[from] serde_json::Error),
/// The proxy failed to initiate a request
#[error("failed to invoke method {method}: {error}")]
RequestInitiationFailure { method: Box<str>, error: Box<str> },
/// An error returned from the remote proxy
#[error("proxy request returned error")]
RequestReturned(Box<str>),
#[error("semantic version error")]
SemanticVersion(#[from] semver::Error),
#[error("proxy too old (requested={requested_version} found={found_version}) error")]
/// The proxy doesn't support the requested semantic version
ProxyTooOld {
requested_version: Box<str>,
found_version: Box<str>,
},
#[error("configuration error")]
/// Conflicting or missing configuration
Configuration(Box<str>),
#[error("error")]
/// An unknown other error
Other(Box<str>),
}

impl Error {
pub(crate) fn new_other(e: impl Into<Box<str>>) -> Self {
Self::Other(e.into())
}
}

impl From<rustix::io::Errno> for Error {
fn from(value: rustix::io::Errno) -> Self {
Self::Io(value.into())
}
}

/// The error type returned from this crate.
pub type Result<T> = std::result::Result<T, Error>;

/// Re-export because we use this in our public APIs
pub use oci_spec;

Expand Down Expand Up @@ -152,14 +199,14 @@ pub struct ImageProxyConfig {
}

impl TryFrom<ImageProxyConfig> for Command {
type Error = anyhow::Error;
type Error = Error;

fn try_from(config: ImageProxyConfig) -> Result<Self, Self::Error> {
fn try_from(config: ImageProxyConfig) -> Result<Self> {
let mut allocated_fds = RESERVED_FD_RANGE.clone();
let mut alloc_fd = || {
allocated_fds
.next()
.ok_or_else(|| anyhow::anyhow!("Ran out of reserved file descriptors for child"))
allocated_fds.next().ok_or_else(|| {
Error::Other("Ran out of reserved file descriptors for child".into())
})
};

// By default, we set up pdeathsig to "lifecycle bind" the child process to us.
Expand All @@ -186,7 +233,9 @@ impl TryFrom<ImageProxyConfig> for Command {
.count();
if auth_option_count > 1 {
// This is a programmer error really
anyhow::bail!("Conflicting authentication options");
return Err(Error::Configuration(
"Conflicting authentication options".into(),
));
}
if let Some(authfile) = config.authfile {
c.arg("--authfile");
Expand All @@ -197,11 +246,13 @@ impl TryFrom<ImageProxyConfig> for Command {
// the file is only readable to privileged code.
let target_fd = alloc_fd()?;
let tmpd = &cap_std::fs::Dir::open_ambient_dir("/tmp", cap_std::ambient_authority())?;
let mut tempfile = cap_tempfile::TempFile::new_anonymous(tmpd)
.context("Creating temporary file for auth data")
.map(std::io::BufWriter::new)?;
let mut tempfile =
cap_tempfile::TempFile::new_anonymous(tmpd).map(std::io::BufWriter::new)?;
std::io::copy(&mut auth_data, &mut tempfile)?;
let tempfile = tempfile.into_inner()?.into_std();
let tempfile = tempfile
.into_inner()
.map_err(|e| e.into_error())?
.into_std();
let fd = std::sync::Arc::new(tempfile.into());
c.take_fd_n(fd, target_fd);
c.arg("--authfile");
Expand Down Expand Up @@ -261,7 +312,7 @@ impl ImageProxy {
None,
)?;
c.stdin(Stdio::from(theirsock));
let child = c.spawn().context("Failed to spawn skopeo")?;
let child = c.spawn()?;
tracing::debug!("Spawned skopeo pid={:?}", child.id());
// Here we use std sync API via thread because tokio installs
// a SIGCHLD handler which can conflict with e.g. the glib one
Expand All @@ -283,11 +334,10 @@ impl ImageProxy {
// Previously we had a feature to opt-in to requiring newer versions using `if cfg!()`.
let supported = base_proto_version();
if !supported.matches(&protover) {
return Err(anyhow!(
"Unsupported protocol version {} (compatible: {})",
protover,
supported
));
return Err(Error::ProxyTooOld {
requested_version: protover.to_string().into(),
found_version: supported.to_string().into(),
});
}
r.protover = protover;

Expand Down Expand Up @@ -327,28 +377,32 @@ impl ImageProxy {
.flatten()
.next();
let buf = &buf[..nread];
let reply: Reply = serde_json::from_slice(buf).context("Deserializing reply")?;
let reply: Reply = serde_json::from_slice(buf)?;
if !reply.success {
return Err(anyhow!("remote error: {}", reply.error));
return Err(Error::RequestInitiationFailure {
method: req.method.clone().into(),
error: reply.error.into(),
});
}
let fdret = match (fdret, reply.pipeid) {
(Some(fd), n) => {
if n == 0 {
return Err(anyhow!("got fd but no pipeid"));
return Err(Error::Other("got fd but no pipeid".into()));
}
Some((fd, n))
}
(None, n) => {
if n != 0 {
return Err(anyhow!("got no fd with pipeid {}", n));
return Err(Error::Other(format!("got no fd with pipeid {}", n).into()));
}
None
}
};
let reply = serde_json::from_value(reply.value).context("Deserializing value")?;
let reply = serde_json::from_value(reply.value)?;
Ok((reply, fdret))
})
.await??;
.await
.map_err(|e| Error::Other(e.to_string().into()))??;
tracing::trace!("completed request");
Ok(r)
}
Expand All @@ -367,12 +421,15 @@ impl ImageProxy {
let mut childwait = self.childwait.lock().await;
tokio::select! {
r = req => {
Ok(r.with_context(|| format!("Failed to invoke skopeo proxy method {method}"))?)
r.map_err(|e| Error::RequestInitiationFailure {
method: method.to_string().into(),
error: e.to_string().into()
})
}
r = childwait.as_mut() => {
let r = r??;
let r = r.map_err(|e| Error::Other(e.to_string().into()))??;
let stderr = String::from_utf8_lossy(&r.stderr);
Err(anyhow::anyhow!("skopeo proxy unexpectedly exited during request method {}: {}\n{}", method, r.status, stderr))
Err(Error::Other(format!("skopeo proxy unexpectedly exited during request method {}: {}\n{}", method, r.status, stderr).into()))
}
}
}
Expand All @@ -382,7 +439,7 @@ impl ImageProxy {
tracing::debug!("closing pipe");
let (r, fd) = self.impl_request("FinishPipe", [pipeid]).await?;
if fd.is_some() {
return Err(anyhow!("Unexpected fd in finish_pipe reply"));
return Err(Error::Other("Unexpected fd in finish_pipe reply".into()));
}
Ok(r)
}
Expand Down Expand Up @@ -417,7 +474,7 @@ impl ImageProxy {
}

async fn read_all_fd(&self, fd: Option<(OwnedFd, u32)>) -> Result<Vec<u8>> {
let (fd, pipeid) = fd.ok_or_else(|| anyhow!("Missing fd from reply"))?;
let (fd, pipeid) = fd.ok_or_else(|| Error::Other("Missing fd from reply".into()))?;
let fd = tokio::fs::File::from_std(std::fs::File::from(fd));
let mut fd = tokio::io::BufReader::new(fd);
let mut r = Vec::new();
Expand All @@ -443,8 +500,7 @@ impl ImageProxy {
img: &OpenedImage,
) -> Result<(String, oci_spec::image::ImageManifest)> {
let (digest, raw) = self.fetch_manifest_raw_oci(img).await?;
let manifest =
serde_json::from_slice(&raw).context("Deserializing manifest from skopeo")?;
let manifest = serde_json::from_slice(&raw)?;
Ok((digest, manifest))
}

Expand All @@ -464,7 +520,7 @@ impl ImageProxy {
img: &OpenedImage,
) -> Result<oci_spec::image::ImageConfiguration> {
let raw = self.fetch_config_raw(img).await?;
serde_json::from_slice(&raw).context("Deserializing config from skopeo")
serde_json::from_slice(&raw).map_err(Into::into)
}

/// Fetch a blob identified by e.g. `sha256:<digest>`.
Expand All @@ -487,7 +543,7 @@ impl ImageProxy {
let args: Vec<serde_json::Value> =
vec![img.0.into(), digest.to_string().into(), size.into()];
let (_bloblen, fd) = self.impl_request::<i64, _, _>("GetBlob", args).await?;
let (fd, pipeid) = fd.ok_or_else(|| anyhow!("Missing fd from reply"))?;
let (fd, pipeid) = fd.ok_or_else(|| Error::new_other("Missing fd from reply"))?;
let fd = tokio::fs::File::from_std(std::fs::File::from(fd));
let fd = tokio::io::BufReader::new(fd);
let finish = Box::pin(self.finish_pipe(pipeid));
Expand Down Expand Up @@ -535,10 +591,15 @@ impl ImageProxy {
drop(sendbuf);
tracing::debug!("sent shutdown request");
let mut childwait = self.childwait.lock().await;
let output = childwait.as_mut().await??;
let output = childwait
.as_mut()
.await
.map_err(|e| Error::new_other(e.to_string()))??;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
anyhow::bail!("proxy failed: {}\n{}", output.status, stderr)
return Err(Error::RequestReturned(
format!("proxy failed: {}\n{}", output.status, stderr).into(),
));
}
tracing::debug!("proxy exited successfully");
Ok(())
Expand Down