diff --git a/crates/cfsctl/src/main.rs b/crates/cfsctl/src/main.rs index 1b575afb..e0ce960d 100644 --- a/crates/cfsctl/src/main.rs +++ b/crates/cfsctl/src/main.rs @@ -48,7 +48,7 @@ pub struct App { enum OciCommand { /// Stores a tar file as a splitstream in the repository. ImportLayer { - sha256: String, + digest: String, name: Option, }, /// Lists the contents of a tar stream @@ -105,7 +105,7 @@ enum Command { Transaction, /// Reconstitutes a split stream and writes it to stdout Cat { - /// the name of the stream to cat, either a sha256 digest or prefixed with 'ref/' + /// the name of the stream to cat, either a content identifier or prefixed with 'ref/' name: String, }, /// Perform garbage collection @@ -122,7 +122,7 @@ enum Command { }, /// Mounts a composefs, possibly enforcing fsverity of the image Mount { - /// the name of the image to mount, either a sha256 digest or prefixed with 'ref/' + /// the name of the image to mount, either an fs-verity hash or prefixed with 'ref/' name: String, /// the mountpoint mountpoint: String, @@ -194,7 +194,7 @@ async fn main() -> Result<()> { } } Command::Cat { name } => { - repo.merge_splitstream(&name, None, &mut std::io::stdout())?; + repo.merge_splitstream(&name, None, None, &mut std::io::stdout())?; } Command::ImportImage { reference } => { let image_id = repo.import_image(&reference, &mut std::io::stdin())?; @@ -202,10 +202,10 @@ async fn main() -> Result<()> { } #[cfg(feature = "oci")] Command::Oci { cmd: oci_cmd } => match oci_cmd { - OciCommand::ImportLayer { name, sha256 } => { + OciCommand::ImportLayer { name, digest } => { let object_id = composefs_oci::import_layer( &Arc::new(repo), - &composefs::util::parse_sha256(sha256)?, + &digest, name.as_deref(), &mut std::io::stdin(), )?; @@ -253,10 +253,10 @@ async fn main() -> Result<()> { println!("{}", image_id.to_id()); } OciCommand::Pull { ref image, name } => { - let (sha256, verity) = + let (digest, verity) = composefs_oci::pull(&Arc::new(repo), image, name.as_deref(), None).await?; - println!("sha256 {}", hex::encode(sha256)); + println!("config {digest}"); println!("verity {}", verity.to_hex()); } OciCommand::Seal { @@ -264,9 +264,9 @@ async fn main() -> Result<()> { ref config_verity, } => { let verity = verity_opt(config_verity)?; - let (sha256, verity) = + let (digest, verity) = composefs_oci::seal(&Arc::new(repo), config_name, verity.as_ref())?; - println!("sha256 {}", hex::encode(sha256)); + println!("config {digest}"); println!("verity {}", verity.to_id()); } OciCommand::Mount { @@ -367,8 +367,8 @@ async fn main() -> Result<()> { } #[cfg(feature = "http")] Command::Fetch { url, name } => { - let (sha256, verity) = composefs_http::download(&url, &name, Arc::new(repo)).await?; - println!("sha256 {}", hex::encode(sha256)); + let (digest, verity) = composefs_http::download(&url, &name, Arc::new(repo)).await?; + println!("content {digest}"); println!("verity {}", verity.to_hex()); } } diff --git a/crates/composefs-http/src/lib.rs b/crates/composefs-http/src/lib.rs index f849efad..25491bd2 100644 --- a/crates/composefs-http/src/lib.rs +++ b/crates/composefs-http/src/lib.rs @@ -7,7 +7,6 @@ use std::{ collections::{HashMap, HashSet}, fs::File, - io::Read, sync::Arc, }; @@ -19,10 +18,7 @@ use sha2::{Digest, Sha256}; use tokio::task::JoinSet; use composefs::{ - fsverity::FsVerityHashValue, - repository::Repository, - splitstream::{DigestMapEntry, SplitStreamReader}, - util::Sha256Digest, + fsverity::FsVerityHashValue, repository::Repository, splitstream::SplitStreamReader, }; struct 
Downloader { @@ -66,17 +62,11 @@ impl Downloader { } } - fn open_splitstream(&self, id: &ObjectID) -> Result> { - SplitStreamReader::new(File::from(self.repo.open_object(id)?)) + fn open_splitstream(&self, id: &ObjectID) -> Result> { + SplitStreamReader::new(File::from(self.repo.open_object(id)?), None) } - fn read_object(&self, id: &ObjectID) -> Result> { - let mut data = vec![]; - File::from(self.repo.open_object(id)?).read_to_end(&mut data)?; - Ok(data) - } - - async fn ensure_stream(self: &Arc, name: &str) -> Result<(Sha256Digest, ObjectID)> { + async fn ensure_stream(self: &Arc, name: &str) -> Result<(String, ObjectID)> { let progress = ProgressBar::new(2); // the first object gets "ensured" twice progress.set_style( ProgressStyle::with_template( @@ -113,8 +103,8 @@ impl Downloader { // this part is fast: it only touches the header let mut reader = self.open_splitstream(&id)?; - for DigestMapEntry { verity, body } in &reader.refs.map { - match splitstreams.insert(verity.clone(), Some(*body)) { + for (body, verity) in reader.iter_named_refs() { + match splitstreams.insert(verity.clone(), Some(body.to_string())) { // This is the (normal) case if we encounter a splitstream we didn't see yet... None => { splitstreams_todo.push(verity.clone()); @@ -125,7 +115,7 @@ impl Downloader { // verify the SHA-256 content hashes later (after we get all the objects) so we // need to make sure that all referents of this stream agree on what that is. Some(Some(previous)) => { - if previous != *body { + if previous != body { bail!( "Splitstream with verity {verity:?} has different body hashes {} and {}", hex::encode(previous), @@ -208,8 +198,8 @@ impl Downloader { for (id, expected_checksum) in splitstreams { let mut reader = self.open_splitstream(&id)?; let mut context = Sha256::new(); - reader.cat(&mut context, |id| self.read_object(id))?; - let measured_checksum: Sha256Digest = context.finalize().into(); + reader.cat(&self.repo, &mut context)?; + let measured_checksum = format!("sha256:{}", hex::encode(context.finalize())); if let Some(expected) = expected_checksum { if measured_checksum != expected { @@ -265,7 +255,7 @@ pub async fn download( url: &str, name: &str, repo: Arc>, -) -> Result<(Sha256Digest, ObjectID)> { +) -> Result<(String, ObjectID)> { let downloader = Arc::new(Downloader { client: Client::new(), repo, diff --git a/crates/composefs-oci/src/image.rs b/crates/composefs-oci/src/image.rs index fc420c78..e0965b62 100644 --- a/crates/composefs-oci/src/image.rs +++ b/crates/composefs-oci/src/image.rs @@ -11,7 +11,7 @@ use std::{ffi::OsStr, os::unix::ffi::OsStrExt, rc::Rc}; use anyhow::{ensure, Context, Result}; -use oci_spec::image::ImageConfiguration; +use sha2::{Digest, Sha256}; use composefs::{ fsverity::FsVerityHashValue, @@ -19,6 +19,7 @@ use composefs::{ tree::{Directory, FileSystem, Inode, Leaf}, }; +use crate::skopeo::TAR_LAYER_CONTENT_TYPE; use crate::tar::{TarEntry, TarItem}; /// Processes a single tar entry and adds it to the filesystem. @@ -84,6 +85,10 @@ pub fn process_entry( /// Creates a filesystem from the given OCI container. No special transformations are performed to /// make the filesystem bootable. +/// +/// If `config_verity` is given it is used to get the OCI config splitstream by its fs-verity ID +/// and the entire process is substantially faster. If it is not given, the config and layers will +/// be hashed to ensure that they match their claimed blob IDs. 
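For illustration, a minimal sketch of how a caller might drive this function (hypothetical; a Sha256HashValue-based repository and the public composefs_oci::image::create_filesystem path are assumptions, not taken from this patch):

    use composefs::{fsverity::Sha256HashValue, repository::Repository};

    fn build_fs(
        repo: &Repository<Sha256HashValue>,
        config_digest: &str,                     // e.g. "sha256:..."
        config_verity: Option<&Sha256HashValue>, // trusted fs-verity ID, if known
    ) -> anyhow::Result<()> {
        // Some(verity): the layer refs in the config splitstream are trusted (fast path).
        // None: every layer stream is re-read and hashed against its claimed diff_id.
        let filesystem = composefs_oci::image::create_filesystem(repo, config_digest, config_verity)?;
        let _ = filesystem; // e.g. turn it into an erofs image afterwards
        Ok(())
    }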
pub fn create_filesystem( repo: &Repository, config_name: &str, @@ -91,14 +96,27 @@ pub fn create_filesystem( ) -> Result> { let mut filesystem = FileSystem::default(); - let mut config_stream = repo.open_stream(config_name, config_verity)?; - let config = ImageConfiguration::from_reader(&mut config_stream)?; + let (config, map) = crate::open_config(repo, config_name, config_verity)?; for diff_id in config.rootfs().diff_ids() { - let layer_sha256 = super::sha256_from_digest(diff_id)?; - let layer_verity = config_stream.lookup(&layer_sha256)?; + let layer_verity = map + .get(diff_id.as_str()) + .context("OCI config splitstream missing named ref to layer {diff_id}")?; + + if config_verity.is_none() { + // We don't have any proof that the named references in the config splitstream are + // trustworthy. We have no choice but to perform expensive validation of the layer + // stream. + let mut layer_stream = + repo.open_stream("", Some(layer_verity), Some(TAR_LAYER_CONTENT_TYPE))?; + let mut context = Sha256::new(); + layer_stream.cat(repo, &mut context)?; + let content_hash = format!("sha256:{}", hex::encode(context.finalize())); + ensure!(content_hash == *diff_id, "Layer has incorrect checksum"); + } - let mut layer_stream = repo.open_stream(&hex::encode(layer_sha256), Some(layer_verity))?; + let mut layer_stream = + repo.open_stream("", Some(layer_verity), Some(TAR_LAYER_CONTENT_TYPE))?; while let Some(entry) = crate::tar::get_entry(&mut layer_stream)? { process_entry(&mut filesystem, entry)?; } diff --git a/crates/composefs-oci/src/lib.rs b/crates/composefs-oci/src/lib.rs index 079defea..a0a0638f 100644 --- a/crates/composefs-oci/src/lib.rs +++ b/crates/composefs-oci/src/lib.rs @@ -18,32 +18,22 @@ use std::{collections::HashMap, io::Read, sync::Arc}; use anyhow::{bail, ensure, Context, Result}; use containers_image_proxy::ImageProxyConfig; -use oci_spec::image::{Descriptor, ImageConfiguration}; +use oci_spec::image::ImageConfiguration; use sha2::{Digest, Sha256}; -use composefs::{ - fsverity::FsVerityHashValue, - repository::Repository, - splitstream::DigestMap, - util::{parse_sha256, Sha256Digest}, -}; +use composefs::{fsverity::FsVerityHashValue, repository::Repository}; +use crate::skopeo::{OCI_CONFIG_CONTENT_TYPE, TAR_LAYER_CONTENT_TYPE}; use crate::tar::get_entry; -type ContentAndVerity = (Sha256Digest, ObjectID); +type ContentAndVerity = (String, ObjectID); -pub(crate) fn sha256_from_descriptor(descriptor: &Descriptor) -> Result { - let Some(digest) = descriptor.as_digest_sha256() else { - bail!("Descriptor in oci config is not sha256"); - }; - Ok(parse_sha256(digest)?) +fn layer_identifier(diff_id: &str) -> String { + format!("oci-layer-{diff_id}") } -pub(crate) fn sha256_from_digest(digest: &str) -> Result { - match digest.strip_prefix("sha256:") { - Some(rest) => Ok(parse_sha256(rest)?), - None => bail!("Manifest has non-sha256 digest"), - } +fn config_identifier(config: &str) -> String { + format!("oci-config-{config}") } /// Imports a container layer from a tar stream into the repository. @@ -54,11 +44,16 @@ pub(crate) fn sha256_from_digest(digest: &str) -> Result { /// Returns the fs-verity hash value of the stored split stream. 
pub fn import_layer( repo: &Arc>, - sha256: &Sha256Digest, + diff_id: &str, name: Option<&str>, tar_stream: &mut impl Read, ) -> Result { - repo.ensure_stream(sha256, |writer| tar::split(tar_stream, writer), name) + repo.ensure_stream( + &layer_identifier(diff_id), + TAR_LAYER_CONTENT_TYPE, + |writer| tar::split(tar_stream, writer), + name, + ) } /// Lists the contents of a container layer stored in the repository. @@ -67,9 +62,13 @@ pub fn import_layer( /// in composefs dumpfile format. pub fn ls_layer( repo: &Repository, - name: &str, + diff_id: &str, ) -> Result<()> { - let mut split_stream = repo.open_stream(name, None)?; + let mut split_stream = repo.open_stream( + &layer_identifier(diff_id), + None, + Some(TAR_LAYER_CONTENT_TYPE), + )?; while let Some(entry) = get_entry(&mut split_stream)? { println!("{entry}"); @@ -85,75 +84,52 @@ pub async fn pull( imgref: &str, reference: Option<&str>, img_proxy_config: Option, -) -> Result<(Sha256Digest, ObjectID)> { +) -> Result<(String, ObjectID)> { skopeo::pull(repo, imgref, reference, img_proxy_config).await } -/// Opens and parses a container configuration, following all layer references. +fn hash(bytes: &[u8]) -> String { + let mut context = Sha256::new(); + context.update(bytes); + format!("sha256:{}", hex::encode(context.finalize())) +} + +/// Opens and parses a container configuration. /// /// Reads the OCI image configuration from the repository and returns both the parsed /// configuration and a digest map containing fs-verity hashes for all referenced layers. -/// This performs a "deep" open that validates all layer references exist. /// /// If verity is provided, it's used directly. Otherwise, the name must be a sha256 digest -/// and the corresponding verity hash will be looked up (which is more expensive). +/// and the corresponding verity hash will be looked up (which is more expensive) and the content +/// will be hashed and compared to the provided digest. +/// +/// Returns the parsed image configuration and the map of layer references. /// -/// Returns the parsed image configuration and the digest map of layer references. +/// Note: if the verity value is known and trusted then the layer fs-verity values can also be +/// trusted. If not, then you can use the layer map to find objects that are ostensibly the layers +/// in question, but you'll have to verity their content hashes yourself. pub fn open_config( repo: &Repository, - name: &str, + config_digest: &str, verity: Option<&ObjectID>, -) -> Result<(ImageConfiguration, DigestMap)> { - let id = match verity { - Some(id) => id, - None => { - // take the expensive route - let sha256 = parse_sha256(name) - .context("Containers must be referred to by sha256 if verity is missing")?; - &repo - .check_stream(&sha256)? - .with_context(|| format!("Object {name} is unknown to us"))? - } +) -> Result<(ImageConfiguration, HashMap, ObjectID>)> { + let mut stream = repo.open_stream( + &config_identifier(config_digest), + verity, + Some(OCI_CONFIG_CONTENT_TYPE), + )?; + + let config = if verity.is_none() { + // No verity means we need to verify the content hash + let mut data = vec![]; + stream.read_to_end(&mut data)?; + ensure!(config_digest == hash(&data), "Data integrity issue"); + ImageConfiguration::from_reader(&data[..])? + } else { + ImageConfiguration::from_reader(&mut stream)? 
}; - let mut stream = repo.open_stream(name, Some(id))?; - let config = ImageConfiguration::from_reader(&mut stream)?; - Ok((config, stream.refs)) -} - -fn hash(bytes: &[u8]) -> Sha256Digest { - let mut context = Sha256::new(); - context.update(bytes); - context.finalize().into() -} -/// Opens and parses a container configuration without following layer references. -/// -/// Reads only the OCI image configuration itself from the repository without validating -/// that all referenced layers exist. This is faster than `open_config` when you only need -/// the configuration metadata. -/// -/// If verity is not provided, manually verifies the content digest matches the expected hash. -/// -/// Returns the parsed image configuration. -pub fn open_config_shallow( - repo: &Repository, - name: &str, - verity: Option<&ObjectID>, -) -> Result { - match verity { - // with verity deep opens are just as fast as shallow ones - Some(id) => Ok(open_config(repo, name, Some(id))?.0), - None => { - // we need to manually check the content digest - let expected_hash = parse_sha256(name) - .context("Containers must be referred to by sha256 if verity is missing")?; - let mut stream = repo.open_stream(name, None)?; - let mut raw_config = vec![]; - stream.read_to_end(&mut raw_config)?; - ensure!(hash(&raw_config) == expected_hash, "Data integrity issue"); - Ok(ImageConfiguration::from_reader(&mut raw_config.as_slice())?) - } - } + Ok((config, stream.into_named_refs())) } /// Writes a container configuration to the repository. @@ -165,15 +141,18 @@ pub fn open_config_shallow( pub fn write_config( repo: &Arc>, config: &ImageConfiguration, - refs: DigestMap, + refs: HashMap, ObjectID>, ) -> Result> { let json = config.to_string()?; let json_bytes = json.as_bytes(); - let sha256 = hash(json_bytes); - let mut stream = repo.create_stream(Some(sha256), Some(refs)); + let config_digest = hash(json_bytes); + let mut stream = repo.create_stream(OCI_CONFIG_CONTENT_TYPE); + for (name, value) in &refs { + stream.add_named_stream_ref(name, value) + } stream.write_inline(json_bytes); - let id = repo.write_stream(stream, None)?; - Ok((sha256, id)) + let id = repo.write_stream(stream, &config_identifier(&config_digest), None)?; + Ok((config_digest, id)) } /// Seals a container by computing its filesystem fs-verity hash and adding it to the config. 
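As a hedged illustration of the trust model described in open_config()'s doc comment (hypothetical caller; a Sha256HashValue-based repository is assumed):

    use composefs::{fsverity::Sha256HashValue, repository::Repository};

    fn list_layers(repo: &Repository<Sha256HashValue>, config_digest: &str) -> anyhow::Result<()> {
        // With verity == None the config bytes are re-hashed against `config_digest`,
        // but the layer fs-verity IDs in the returned map remain *unverified* claims.
        let (config, layer_map) = composefs_oci::open_config(repo, config_digest, None)?;
        for diff_id in config.rootfs().diff_ids() {
            match layer_map.get(diff_id.as_str()) {
                Some(verity) => println!("{diff_id} -> {}", verity.to_hex()),
                None => anyhow::bail!("config has no named ref for layer {diff_id}"),
            }
        }
        Ok(())
    }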
@@ -211,7 +190,7 @@ pub fn mount( mountpoint: &str, verity: Option<&ObjectID>, ) -> Result<()> { - let config = open_config_shallow(repo, name, verity)?; + let (config, _map) = open_config(repo, name, verity)?; let Some(id) = config.get_config_annotation("containers.composefs.fsverity") else { bail!("Can only mount sealed containers"); }; @@ -255,14 +234,14 @@ mod test { let layer = example_layer(); let mut context = Sha256::new(); context.update(&layer); - let layer_id: [u8; 32] = context.finalize().into(); + let layer_id = format!("sha256:{}", hex::encode(context.finalize())); let repo_dir = tempdir(); let repo = Arc::new(Repository::::open_path(CWD, &repo_dir).unwrap()); let id = import_layer(&repo, &layer_id, Some("name"), &mut layer.as_slice()).unwrap(); let mut dump = String::new(); - let mut split_stream = repo.open_stream("refs/name", Some(&id)).unwrap(); + let mut split_stream = repo.open_stream("refs/name", Some(&id), None).unwrap(); while let Some(entry) = tar::get_entry(&mut split_stream).unwrap() { writeln!(dump, "{entry}").unwrap(); } diff --git a/crates/composefs-oci/src/skopeo.rs b/crates/composefs-oci/src/skopeo.rs index de40e994..b7ee7d22 100644 --- a/crates/composefs-oci/src/skopeo.rs +++ b/crates/composefs-oci/src/skopeo.rs @@ -20,11 +20,13 @@ use oci_spec::image::{Descriptor, ImageConfiguration, ImageManifest, MediaType}; use rustix::process::geteuid; use tokio::{io::AsyncReadExt, sync::Semaphore}; -use composefs::{ - fsverity::FsVerityHashValue, repository::Repository, splitstream::DigestMap, util::Sha256Digest, -}; +use composefs::{fsverity::FsVerityHashValue, repository::Repository}; -use crate::{sha256_from_descriptor, sha256_from_digest, tar::split_async, ContentAndVerity}; +use crate::{config_identifier, layer_identifier, tar::split_async, ContentAndVerity}; + +// These are randomly generated UUID-like content types +pub(crate) const TAR_LAYER_CONTENT_TYPE: u64 = 0x2a037edfcae1ffea; +pub(crate) const OCI_CONFIG_CONTENT_TYPE: u64 = 0x44218c839727a80b; struct ImageOp { repo: Arc>, @@ -48,6 +50,13 @@ impl ImageOp { None }; + // See https://github.com/containers/skopeo/issues/2750 + let imgref = if let Some(hash) = imgref.strip_prefix("containers-storage:sha256:") { + &format!("containers-storage:{hash}") // yay temporary lifetime extension! + } else { + imgref + }; + let config = match img_proxy_config { Some(mut conf) => { if conf.skopeo_cmd.is_none() { @@ -77,18 +86,15 @@ impl ImageOp { }) } - pub async fn ensure_layer( - &self, - layer_sha256: Sha256Digest, - descriptor: &Descriptor, - ) -> Result { + pub async fn ensure_layer(&self, diff_id: &str, descriptor: &Descriptor) -> Result { // We need to use the per_manifest descriptor to download the compressed layer but it gets // stored in the repository via the per_config descriptor. Our return value is the // fsverity digest for the corresponding splitstream. + let content_id = layer_identifier(diff_id); - if let Some(layer_id) = self.repo.check_stream(&layer_sha256)? { + if let Some(layer_id) = self.repo.has_stream(&content_id)? { self.progress - .println(format!("Already have layer {}", hex::encode(layer_sha256)))?; + .println(format!("Already have layer {diff_id}"))?; Ok(layer_id) } else { // Otherwise, we need to fetch it... 
@@ -102,10 +108,9 @@ impl ImageOp { .unwrap() .progress_chars("##-")); let progress = bar.wrap_async_read(blob_reader); - self.progress - .println(format!("Fetching layer {}", hex::encode(layer_sha256)))?; + self.progress.println(format!("Fetching layer {diff_id}"))?; - let mut splitstream = self.repo.create_stream(Some(layer_sha256), None); + let mut splitstream = self.repo.create_stream(TAR_LAYER_CONTENT_TYPE); match descriptor.media_type() { MediaType::ImageLayer => { split_async(progress, &mut splitstream).await?; @@ -116,17 +121,15 @@ impl ImageOp { MediaType::ImageLayerZstd => { split_async(ZstdDecoder::new(progress), &mut splitstream).await?; } - other => bail!("Unsupported layer media type {:?}", other), + other => bail!("Unsupported layer media type {other:?}"), }; - let layer_id = self.repo.write_stream(splitstream, None)?; - // We intentionally explicitly ignore this, even though we're supposed to check it. - // See https://github.com/containers/containers-image-proxy-rs/issues/80 for discussion - // about why. Note: we only care about the uncompressed layer tar, and we checksum it - // ourselves. - drop(driver); + // skopeo is doing data checksums for us to make sure the content we received is equal + // to the claimed diff_id. We trust it, but we need to check it by awaiting the driver. + driver.await?; - Ok(layer_id) + // Now we know that the content is what we expected. Write it. + self.repo.write_stream(splitstream, &content_id, None) } } @@ -135,20 +138,20 @@ impl ImageOp { manifest_layers: &[Descriptor], descriptor: &Descriptor, ) -> Result> { - let config_sha256 = sha256_from_descriptor(descriptor)?; - if let Some(config_id) = self.repo.check_stream(&config_sha256)? { + let config_digest: &str = descriptor.digest().as_ref(); + let content_id = config_identifier(config_digest); + + if let Some(config_id) = self.repo.has_stream(&content_id)? { // We already got this config? Nice. - self.progress.println(format!( - "Already have container config {}", - hex::encode(config_sha256) - ))?; - Ok((config_sha256, config_id)) + self.progress + .println(format!("Already have container config {config_digest}"))?; + Ok((config_digest.to_string(), config_id)) } else { // We need to add the config to the repo. We need to parse the config and make sure we // have all of the layers first. // self.progress - .println(format!("Fetching config {}", hex::encode(config_sha256)))?; + .println(format!("Fetching config {config_digest}"))?; let (mut config, driver) = self.proxy.get_descriptor(&self.img, descriptor).await?; let config = async move { @@ -171,30 +174,29 @@ impl ImageOp { let sem = Arc::new(Semaphore::new(threads.into())); let mut entries = vec![]; for (mld, diff_id) in layers { + let diff_id_ = diff_id.clone(); let self_ = Arc::clone(self); let permit = Arc::clone(&sem).acquire_owned().await?; - let layer_sha256 = sha256_from_digest(diff_id)?; let descriptor = mld.clone(); let future = tokio::spawn(async move { let _permit = permit; - self_.ensure_layer(layer_sha256, &descriptor).await + self_.ensure_layer(&diff_id_, &descriptor).await }); - entries.push((layer_sha256, future)); + entries.push((diff_id, future)); } + let mut splitstream = self.repo.create_stream(OCI_CONFIG_CONTENT_TYPE); + // Collect the results. 
- let mut config_maps = DigestMap::new(); - for (layer_sha256, future) in entries { - config_maps.insert(&layer_sha256, &future.await??); + for (diff_id, future) in entries { + splitstream.add_named_stream_ref(diff_id, &future.await??); } - let mut splitstream = self - .repo - .create_stream(Some(config_sha256), Some(config_maps)); + // NB: We trust that skopeo has verified that raw_config has the correct digest splitstream.write_inline(&raw_config); - let config_id = self.repo.write_stream(splitstream, None)?; - Ok((config_sha256, config_id)) + let config_id = self.repo.write_stream(splitstream, &content_id, None)?; + Ok((content_id, config_id)) } } @@ -223,7 +225,7 @@ pub async fn pull( imgref: &str, reference: Option<&str>, img_proxy_config: Option, -) -> Result<(Sha256Digest, ObjectID)> { +) -> Result<(String, ObjectID)> { let op = Arc::new(ImageOp::new(repo, imgref, img_proxy_config).await?); let (sha256, id) = op .pull() @@ -231,7 +233,7 @@ pub async fn pull( .with_context(|| format!("Unable to pull container image {imgref}"))?; if let Some(name) = reference { - repo.name_stream(sha256, name)?; + repo.name_stream(&sha256, name)?; } Ok((sha256, id)) } diff --git a/crates/composefs-oci/src/tar.rs b/crates/composefs-oci/src/tar.rs index 09eb6dce..9df92232 100644 --- a/crates/composefs-oci/src/tar.rs +++ b/crates/composefs-oci/src/tar.rs @@ -73,9 +73,9 @@ pub fn split( tar_stream.read_exact(&mut buffer)?; if header.entry_type() == EntryType::Regular && actual_size > INLINE_CONTENT_MAX { - // non-empty regular file: store the data in the object store - let padding = buffer.split_off(actual_size); - writer.write_external(&buffer, padding)?; + // non-empty regular file: store the data external and the trailing padding inline + writer.write_external(&buffer[..actual_size])?; + writer.write_inline(&buffer[actual_size..]); } else { // else: store the data inline in the split stream writer.write_inline(&buffer); @@ -112,7 +112,8 @@ pub async fn split_async( if header.entry_type() == EntryType::Regular && actual_size > INLINE_CONTENT_MAX { // non-empty regular file: store the data in the object store let padding = buffer.split_off(actual_size); - writer.write_external_async(buffer, padding).await?; + writer.write_external_async(buffer).await?; + writer.write_inline(&padding); } else { // else: store the data inline in the split stream writer.write_inline(&buffer); @@ -173,11 +174,7 @@ fn path_from_tar(pax: Option>, gnu: Vec, short: &[u8]) -> PathBuf } // Drop trailing '/' characters in case of directories. - // https://github.com/rust-lang/rust/issues/122741 - // path.pop_if(|x| x == &b'/'); - if path.last() == Some(&b'/') { - path.pop(); // this is Vec, so that's a single char. - } + path.pop_if(|x| x == &b'/'); PathBuf::from(OsString::from_vec(path)) } @@ -199,17 +196,23 @@ fn symlink_target_from_tar(pax: Option>, gnu: Vec, short: &[u8]) - /// extended attributes. Returns `None` when the end of the archive is reached. /// /// Returns the parsed tar entry, or `None` if the end of the stream is reached. -pub fn get_entry( - reader: &mut SplitStreamReader, +pub fn get_entry( + reader: &mut SplitStreamReader, ) -> Result>> { + // We don't have a way to drive the standard tar crate that lets us feed it random bits of + // header data while continuing to handle the external references as references. That means we + // have to do the header interpretation ourselves, including handling of PAX/GNU extensions for + // xattrs and long filenames. 
+ // + // We try to use as much of the tar crate as possible to help us with this. let mut gnu_longlink: Vec = vec![]; let mut gnu_longname: Vec = vec![]; let mut pax_longlink: Option> = None; let mut pax_longname: Option> = None; let mut xattrs = BTreeMap::new(); + let mut buf = [0u8; 512]; loop { - let mut buf = [0u8; 512]; if !reader.read_inline_exact(&mut buf)? || buf == [0u8; 512] { return Ok(None); } @@ -232,11 +235,6 @@ pub fn get_entry( SplitStreamData::Inline(content) => match header.entry_type() { EntryType::GNULongLink => { gnu_longlink.extend(content); - - // NOTE: We use a custom tar parser since splitstreams are not actual tar archives - // The `tar` crate does have a higher level `path` function that would do this for us. - // See: https://github.com/alexcrichton/tar-rs/blob/a1c3036af48fa02437909112239f0632e4cfcfae/src/header.rs#L1532 - // Similar operation is performed for GNULongName gnu_longlink.pop_if(|x| *x == b'\0'); continue; @@ -321,6 +319,8 @@ pub fn get_entry( #[cfg(test)] mod tests { + use crate::TAR_LAYER_CONTENT_TYPE; + use super::*; use composefs::{ fsverity::Sha256HashValue, generic_tree::LeafContent, repository::Repository, @@ -377,13 +377,15 @@ mod tests { fn read_all_via_splitstream(tar_data: Vec) -> Result>> { let mut tar_cursor = Cursor::new(tar_data); let repo = create_test_repository()?; - let mut writer = repo.create_stream(None, None); + let mut writer = repo.create_stream(TAR_LAYER_CONTENT_TYPE); split(&mut tar_cursor, &mut writer)?; let object_id = writer.done()?; - let mut reader: SplitStreamReader = - SplitStreamReader::new(repo.open_object(&object_id)?.into())?; + let mut reader: SplitStreamReader = SplitStreamReader::new( + repo.open_object(&object_id)?.into(), + Some(TAR_LAYER_CONTENT_TYPE), + )?; let mut entries = Vec::new(); while let Some(entry) = get_entry(&mut reader)? 
{ @@ -402,13 +404,16 @@ mod tests { let mut tar_cursor = Cursor::new(tar_data); let repo = create_test_repository().unwrap(); - let mut writer = repo.create_stream(None, None); + let mut writer = repo.create_stream(TAR_LAYER_CONTENT_TYPE); split(&mut tar_cursor, &mut writer).unwrap(); let object_id = writer.done().unwrap(); - let mut reader: SplitStreamReader = - SplitStreamReader::new(repo.open_object(&object_id).unwrap().into()).unwrap(); + let mut reader: SplitStreamReader = SplitStreamReader::new( + repo.open_object(&object_id).unwrap().into(), + Some(TAR_LAYER_CONTENT_TYPE), + ) + .unwrap(); assert!(get_entry(&mut reader).unwrap().is_none()); } @@ -428,13 +433,16 @@ mod tests { let mut tar_cursor = Cursor::new(tar_data); let repo = create_test_repository().unwrap(); - let mut writer = repo.create_stream(None, None); + let mut writer = repo.create_stream(TAR_LAYER_CONTENT_TYPE); split(&mut tar_cursor, &mut writer).unwrap(); let object_id = writer.done().unwrap(); - let mut reader: SplitStreamReader = - SplitStreamReader::new(repo.open_object(&object_id).unwrap().into()).unwrap(); + let mut reader: SplitStreamReader = SplitStreamReader::new( + repo.open_object(&object_id).unwrap().into(), + Some(TAR_LAYER_CONTENT_TYPE), + ) + .unwrap(); // Should have exactly one entry let entry = get_entry(&mut reader) @@ -483,13 +491,16 @@ mod tests { let mut tar_cursor = Cursor::new(tar_data); let repo = create_test_repository().unwrap(); - let mut writer = repo.create_stream(None, None); + let mut writer = repo.create_stream(TAR_LAYER_CONTENT_TYPE); split(&mut tar_cursor, &mut writer).unwrap(); let object_id = writer.done().unwrap(); - let mut reader: SplitStreamReader = - SplitStreamReader::new(repo.open_object(&object_id).unwrap().into()).unwrap(); + let mut reader: SplitStreamReader = SplitStreamReader::new( + repo.open_object(&object_id).unwrap().into(), + Some(TAR_LAYER_CONTENT_TYPE), + ) + .unwrap(); let mut entries = Vec::new(); while let Some(entry) = get_entry(&mut reader).unwrap() { @@ -547,13 +558,16 @@ mod tests { // Split the tar let mut tar_cursor = Cursor::new(original_tar.clone()); let repo = create_test_repository().unwrap(); - let mut writer = repo.create_stream(None, None); + let mut writer = repo.create_stream(TAR_LAYER_CONTENT_TYPE); split(&mut tar_cursor, &mut writer).unwrap(); let object_id = writer.done().unwrap(); // Read back entries and compare with original headers - let mut reader: SplitStreamReader = - SplitStreamReader::new(repo.open_object(&object_id).unwrap().into()).unwrap(); + let mut reader: SplitStreamReader = SplitStreamReader::new( + repo.open_object(&object_id).unwrap().into(), + Some(TAR_LAYER_CONTENT_TYPE), + ) + .unwrap(); let mut entries = Vec::new(); while let Some(entry) = get_entry(&mut reader).unwrap() { @@ -696,26 +710,22 @@ mod tests { assert_eq!( header.mode().unwrap(), stat.st_mode, - "{}: mode mismatch", - msg_prefix + "{msg_prefix}: mode mismatch" ); assert_eq!( header.uid().unwrap() as u32, stat.st_uid, - "{}: uid mismatch", - msg_prefix + "{msg_prefix}: uid mismatch" ); assert_eq!( header.gid().unwrap() as u32, stat.st_gid, - "{}: gid mismatch", - msg_prefix + "{msg_prefix}: gid mismatch" ); assert_eq!( header.mtime().unwrap() as i64, stat.st_mtim_sec, - "{}: mtime mismatch", - msg_prefix + "{msg_prefix}: mtime mismatch" ); } } diff --git a/crates/composefs/src/dumpfile.rs b/crates/composefs/src/dumpfile.rs index f8e4b3a5..e963643e 100644 --- a/crates/composefs/src/dumpfile.rs +++ b/crates/composefs/src/dumpfile.rs @@ -339,7 +339,7 @@ pub fn 
add_entry_to_filesystem( let parent = path.parent().unwrap_or_else(|| Path::new("/")); let filename = path .file_name() - .ok_or_else(|| anyhow::anyhow!("Path has no filename: {:?}", path))?; + .ok_or_else(|| anyhow::anyhow!("Path has no filename: {path:?}"))?; // Get or create parent directory let parent_dir = if parent == Path::new("/") { @@ -347,7 +347,7 @@ pub fn add_entry_to_filesystem( } else { fs.root .get_directory_mut(parent.as_os_str()) - .with_context(|| format!("Parent directory not found: {:?}", parent))? + .with_context(|| format!("Parent directory not found: {parent:?}"))? }; // Convert the entry to an inode @@ -360,7 +360,7 @@ pub fn add_entry_to_filesystem( // Look up the target in our hardlinks map and clone the Rc let target_leaf = hardlinks .get(target.as_ref()) - .ok_or_else(|| anyhow::anyhow!("Hardlink target not found: {:?}", target))? + .ok_or_else(|| anyhow::anyhow!("Hardlink target not found: {target:?}"))? .clone(); Inode::Leaf(target_leaf) } @@ -456,8 +456,8 @@ pub fn dumpfile_to_filesystem( if line.trim().is_empty() { continue; } - let entry = Entry::parse(line) - .with_context(|| format!("Failed to parse dumpfile line: {}", line))?; + let entry = + Entry::parse(line).with_context(|| format!("Failed to parse dumpfile line: {line}"))?; add_entry_to_filesystem(&mut fs, entry, &mut hardlinks)?; } diff --git a/crates/composefs/src/dumpfile_parse.rs b/crates/composefs/src/dumpfile_parse.rs index 0f58641e..89f321aa 100644 --- a/crates/composefs/src/dumpfile_parse.rs +++ b/crates/composefs/src/dumpfile_parse.rs @@ -328,20 +328,12 @@ impl<'k> Xattr<'k> { let key = unescape_to_osstr(key)?; let keylen = key.as_bytes().len(); if keylen > XATTR_NAME_MAX { - anyhow::bail!( - "xattr name too long; max={} found={}", - XATTR_NAME_MAX, - keylen - ); + anyhow::bail!("xattr name too long; max={XATTR_NAME_MAX} found={keylen}"); } let value = unescape(value)?; let valuelen = value.len(); if valuelen > XATTR_SIZE_MAX { - anyhow::bail!( - "xattr value too long; max={} found={}", - XATTR_SIZE_MAX, - keylen - ); + anyhow::bail!("xattr value too long; max={XATTR_SIZE_MAX} found={keylen}"); } Ok(Self { key, value }) } @@ -440,7 +432,7 @@ impl<'p> Entry<'p> { unescape_to_path(payload.ok_or_else(|| anyhow!("Missing payload"))?)?; let targetlen = target.as_os_str().as_bytes().len(); if targetlen > PATH_MAX as usize { - anyhow::bail!("Target length too large {}", targetlen); + anyhow::bail!("Target length too large {targetlen}"); } Item::Symlink { nlink, target } } diff --git a/crates/composefs/src/erofs/reader.rs b/crates/composefs/src/erofs/reader.rs index e4c2e8bd..b33ca2e5 100644 --- a/crates/composefs/src/erofs/reader.rs +++ b/crates/composefs/src/erofs/reader.rs @@ -694,8 +694,7 @@ mod tests { assert_eq!( found_names, expected_sorted, - "Directory entries mismatch for nid {}", - nid + "Directory entries mismatch for nid {nid}" ); } @@ -788,8 +787,7 @@ mod tests { // Add many files to force directory blocks for i in 0..100 { dumpfile.push_str(&format!( - "/bigdir/file{:03} 5 100644 1 0 0 0 1000.0 - hello -\n", - i + "/bigdir/file{i:03} 5 100644 1 0 0 0 1000.0 - hello -\n" )); } @@ -824,7 +822,7 @@ mod tests { // Build expected names let mut expected: Vec = vec![".".to_string(), "..".to_string()]; for i in 0..100 { - expected.push(format!("file{:03}", i)); + expected.push(format!("file{i:03}")); } let expected_refs: Vec<&str> = expected.iter().map(|s| s.as_str()).collect(); diff --git a/crates/composefs/src/fsverity/mod.rs b/crates/composefs/src/fsverity/mod.rs index 3b86a22b..0a533917 
100644 --- a/crates/composefs/src/fsverity/mod.rs +++ b/crates/composefs/src/fsverity/mod.rs @@ -311,7 +311,7 @@ mod tests { use super::*; - static TEMPDIR: Lazy = Lazy::new(|| tempdir()); + static TEMPDIR: Lazy = Lazy::new(tempdir); static TD_FD: Lazy = Lazy::new(|| File::open(TEMPDIR.path()).unwrap()); fn tempfile() -> File { diff --git a/crates/composefs/src/repository.rs b/crates/composefs/src/repository.rs index 35a06a98..03e5fb26 100644 --- a/crates/composefs/src/repository.rs +++ b/crates/composefs/src/repository.rs @@ -18,12 +18,11 @@ use anyhow::{bail, ensure, Context, Result}; use once_cell::sync::OnceCell; use rustix::{ fs::{ - fdatasync, flock, linkat, mkdirat, open, openat, readlinkat, AtFlags, Dir, FileType, + flock, linkat, mkdirat, open, openat, readlinkat, statat, syncfs, AtFlags, Dir, FileType, FlockOperation, Mode, OFlags, CWD, }, io::{Errno, Result as ErrnoResult}, }; -use sha2::{Digest, Sha256}; use crate::{ fsverity::{ @@ -31,8 +30,8 @@ use crate::{ CompareVerityError, EnableVerityError, FsVerityHashValue, MeasureVerityError, }, mount::{composefs_fsmount, mount_at}, - splitstream::{DigestMap, SplitStreamReader, SplitStreamWriter}, - util::{proc_self_fd, replace_symlinkat, ErrnoFilter, Sha256Digest}, + splitstream::{SplitStreamReader, SplitStreamWriter}, + util::{proc_self_fd, replace_symlinkat, ErrnoFilter}, }; /// Call openat() on the named subdirectory of "dirfd", possibly creating it first. @@ -129,12 +128,18 @@ impl Repository { /// /// Same as `ensure_object` but runs the operation on a blocking thread pool /// to avoid blocking async tasks. Returns the fsverity digest of the object. + /// + /// For performance reasons, this function does *not* call fsync() or similar. After you're + /// done with everything, call `Repository::sync_async()`. pub async fn ensure_object_async(self: &Arc, data: Vec) -> Result { let self_ = Arc::clone(self); tokio::task::spawn_blocking(move || self_.ensure_object(&data)).await? } /// Given a blob of data, store it in the repository. + /// + /// For performance reasons, this function does *not* call fsync() or similar. After you're + /// done with everything, call `Repository::sync()`. pub fn ensure_object(&self, data: &[u8]) -> Result { let dirfd = self.objects_dir()?; let id: ObjectID = compute_verity(data); @@ -180,14 +185,15 @@ impl Repository { let fd = ensure_dir_and_openat(dirfd, &id.to_object_dir(), OFlags::RDWR | OFlags::TMPFILE)?; let mut file = File::from(fd); file.write_all(data)?; - fdatasync(&file)?; - // We can't enable verity with an open writable fd, so re-open and close the old one. let ro_fd = open( proc_self_fd(&file), OFlags::RDONLY | OFlags::CLOEXEC, Mode::empty(), )?; + // NB: We should do fdatasync() or fsync() here, but doing this for each file forces the + // creation of a massive number of journal commits and is a performance disaster. We need + // to coordinate this at a higher level. See .write_stream(). drop(file); let ro_fd = match enable_verity_maybe_copy::(dirfd, ro_fd.as_fd()) { @@ -249,32 +255,25 @@ impl Repository { /// Creates a SplitStreamWriter for writing a split stream. /// You should write the data to the returned object and then pass it to .store_stream() to /// store the result. 
- pub fn create_stream( - self: &Arc, - sha256: Option, - maps: Option>, - ) -> SplitStreamWriter { - SplitStreamWriter::new(self, maps, sha256) + pub fn create_stream(self: &Arc, content_type: u64) -> SplitStreamWriter { + SplitStreamWriter::new(self, content_type) } fn format_object_path(id: &ObjectID) -> String { format!("objects/{}", id.to_object_pathname()) } + fn format_stream_path(content_identifier: &str) -> String { + format!("streams/{content_identifier}") + } + /// Check if the provided splitstream is present in the repository; /// if so, return its fsverity digest. - pub fn has_stream(&self, sha256: &Sha256Digest) -> Result> { - let stream_path = format!("streams/{}", hex::encode(sha256)); + pub fn has_stream(&self, content_identifier: &str) -> Result> { + let stream_path = Self::format_stream_path(content_identifier); match readlinkat(&self.repository, &stream_path, []) { Ok(target) => { - // NB: This is kinda unsafe: we depend that the symlink didn't get corrupted - // we could also measure the verity of the destination object, but it doesn't - // improve anything, since we don't know if it was the original one. - // - // One thing we *could* do here is to iterate the entire file and verify the sha256 - // content hash. That would allow us to reestablish a solid link between - // content-sha256 and verity digest. let bytes = target.as_bytes(); ensure!( bytes.starts_with(b"../"), @@ -287,59 +286,36 @@ impl Repository { } } - /// Similar to [`Self::has_stream`] but performs more expensive verification. - pub fn check_stream(&self, sha256: &Sha256Digest) -> Result> { - let stream_path = format!("streams/{}", hex::encode(sha256)); - match self.openat(&stream_path, OFlags::RDONLY) { - Ok(stream) => { - let path = readlinkat(&self.repository, stream_path, [])?; - let measured_verity = match measure_verity(&stream) { - Ok(found) => found, - Err( - MeasureVerityError::VerityMissing - | MeasureVerityError::FilesystemNotSupported, - ) if self.insecure => FsVerityHashValue::from_object_pathname(path.to_bytes())?, - Err(other) => Err(other)?, - }; - let mut context = Sha256::new(); - let mut split_stream = SplitStreamReader::new(File::from(stream))?; - - // check the verity of all linked streams - for entry in &split_stream.refs.map { - if self.check_stream(&entry.body)?.as_ref() != Some(&entry.verity) { - bail!("reference mismatch"); - } - } - - // check this stream - split_stream.cat(&mut context, |id| -> Result> { - let mut data = vec![]; - File::from(self.open_object(id)?).read_to_end(&mut data)?; - Ok(data) - })?; - if *sha256 != Into::<[u8; 32]>::into(context.finalize()) { - bail!("Content didn't match!"); - } - - Ok(Some(measured_verity)) - } - Err(Errno::NOENT) => Ok(None), - Err(err) => Err(err)?, - } - } - - /// Write the given splitstream to the repository with the - /// provided name. + /// Write the given splitstream to the repository with the provided content identifier and + /// optional reference name. + /// + /// This call contains an internal barrier that guarantees that, in event of a crash, either: + /// - the named stream (by `content_identifier`) will not be available; or + /// - the stream and all of its linked data will be available + /// + /// In other words: it will not be possible to boot a system which contained a stream named + /// `content_identifier` but is missing linked streams or objects from that stream. 
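A minimal sketch (not part of this patch) of the write path that this barrier protects; the content type, content identifier and reference name below are made up:

    use std::sync::Arc;
    use composefs::{fsverity::Sha256HashValue, repository::Repository};

    // Arbitrary application-chosen content-type magic (hypothetical).
    const EXAMPLE_CONTENT_TYPE: u64 = 0x1122334455667788;

    fn store_example(repo: &Arc<Repository<Sha256HashValue>>) -> anyhow::Result<()> {
        let mut writer = repo.create_stream(EXAMPLE_CONTENT_TYPE);
        writer.write_inline(b"small metadata stays inline");
        writer.write_external(&[0u8; 8192])?; // large chunks land in the object store
        // write_stream() syncs the repository before the "streams/<id>" symlink
        // appears, then optionally adds the "streams/refs/<name>" reference.
        let _object_id = repo.write_stream(writer, "example-content-id", Some("example"))?;
        Ok(())
    }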
pub fn write_stream( &self, writer: SplitStreamWriter, + content_identifier: &str, reference: Option<&str>, ) -> Result { - let Some((.., ref sha256)) = writer.sha256 else { - bail!("Writer doesn't have sha256 enabled"); - }; - let stream_path = format!("streams/{}", hex::encode(sha256)); let object_id = writer.done()?; + + // Right now we have: + // - all of the linked external objects and streams; and + // - the binary data of this splitstream itself + // + // in the filesystem but but not yet guaranteed to be synced to disk. This is OK because + // nobody knows that the binary data of the splitstream is a splitstream yet: it could just + // as well be a random data file contained in an OS image or something. + // + // We need to make sure that all of that makes it to the disk before the splitstream is + // visible as a splitstream. + self.sync()?; + + let stream_path = Self::format_stream_path(content_identifier); let object_path = Self::format_object_path(&object_id); self.symlink(&stream_path, &object_path)?; @@ -351,22 +327,33 @@ impl Repository { Ok(object_id) } + /// Check if a splitstream with a given name exists in the "refs" in the repository. + pub fn has_named_stream(&self, name: &str) -> Result { + let stream_path = format!("streams/refs/{name}"); + + Ok(statat(&self.repository, &stream_path, AtFlags::empty()) + .filter_errno(Errno::NOENT) + .context("Looking for stream {name} in repository")? + .map(|s| FileType::from_raw_mode(s.st_mode).is_symlink()) + .unwrap_or(false)) + } + /// Assign the given name to a stream. The stream must already exist. After this operation it /// will be possible to refer to the stream by its new name 'refs/{name}'. - pub fn name_stream(&self, sha256: Sha256Digest, name: &str) -> Result<()> { - let stream_path = format!("streams/{}", hex::encode(sha256)); + pub fn name_stream(&self, content_identifier: &str, name: &str) -> Result<()> { + let stream_path = Self::format_stream_path(content_identifier); let reference_path = format!("streams/refs/{name}"); self.symlink(&reference_path, &stream_path)?; Ok(()) } - /// Ensures that the stream with a given SHA256 digest exists in the repository. + /// Ensures that the stream with a given content identifier digest exists in the repository. /// - /// This tries to find the stream by the `sha256` digest of its contents. If the stream is - /// already in the repository, the object ID (fs-verity digest) is read from the symlink. If - /// the stream is not already in the repository, a `SplitStreamWriter` is created and passed to - /// `callback`. On return, the object ID of the stream will be calculated and it will be - /// written to disk (if it wasn't already created by someone else in the meantime). + /// This tries to find the stream by the content identifier. If the stream is already in the + /// repository, the object ID (fs-verity digest) is read from the symlink. If the stream is + /// not already in the repository, a `SplitStreamWriter` is created and passed to `callback`. + /// On return, the object ID of the stream will be calculated and it will be written to disk + /// (if it wasn't already created by someone else in the meantime). /// /// In both cases, if `reference` is provided, it is used to provide a fixed name for the /// object. Any object that doesn't have a fixed reference to it is subject to garbage @@ -376,22 +363,19 @@ impl Repository { /// ID will be used when referring to the stream from other linked streams. 
pub fn ensure_stream( self: &Arc, - sha256: &Sha256Digest, + content_identifier: &str, + content_type: u64, callback: impl FnOnce(&mut SplitStreamWriter) -> Result<()>, reference: Option<&str>, ) -> Result { - let stream_path = format!("streams/{}", hex::encode(sha256)); + let stream_path = Self::format_stream_path(content_identifier); - let object_id = match self.has_stream(sha256)? { + let object_id = match self.has_stream(content_identifier)? { Some(id) => id, None => { - let mut writer = self.create_stream(Some(*sha256), None); + let mut writer = self.create_stream(content_type); callback(&mut writer)?; - let object_id = writer.done()?; - - let object_path = Self::format_object_path(&object_id); - self.symlink(&stream_path, &object_path)?; - object_id + self.write_stream(writer, content_identifier, reference)? } }; @@ -406,20 +390,20 @@ impl Repository { /// Open a splitstream with the given name. pub fn open_stream( &self, - name: &str, + content_identifier: &str, verity: Option<&ObjectID>, - ) -> Result> { - let filename = format!("streams/{name}"); - + expected_content_type: Option, + ) -> Result> { let file = File::from(if let Some(verity_hash) = verity { - self.open_with_verity(&filename, verity_hash) - .with_context(|| format!("Opening ref 'streams/{name}'"))? + self.open_object(verity_hash) + .with_context(|| format!("Opening object '{verity_hash:?}'"))? } else { + let filename = Self::format_stream_path(content_identifier); self.openat(&filename, OFlags::RDONLY) - .with_context(|| format!("Opening ref 'streams/{name}'"))? + .with_context(|| format!("Opening ref '{filename}'"))? }); - SplitStreamReader::new(file) + SplitStreamReader::new(file, expected_content_type) } /// Given an object identifier (a digest), return a read-only file descriptor @@ -428,6 +412,13 @@ impl Repository { self.open_with_verity(&Self::format_object_path(id), id) } + /// Read the contents of an object into a Vec + pub fn read_object(&self, id: &ObjectID) -> Result> { + let mut data = vec![]; + File::from(self.open_object(id)?).read_to_end(&mut data)?; + Ok(data) + } + /// Merges a splitstream into a single continuous stream. /// /// Opens the named splitstream, resolves all object references, and writes @@ -435,18 +426,14 @@ impl Repository { /// the splitstream's fsverity digest matches the expected value. pub fn merge_splitstream( &self, - name: &str, + content_identifier: &str, verity: Option<&ObjectID>, - stream: &mut impl Write, + expected_content_type: Option, + output: &mut impl Write, ) -> Result<()> { - let mut split_stream = self.open_stream(name, verity)?; - split_stream.cat(stream, |id| -> Result> { - let mut data = vec![]; - File::from(self.open_object(id)?).read_to_end(&mut data)?; - Ok(data) - })?; - - Ok(()) + let mut split_stream = + self.open_stream(content_identifier, verity, expected_content_type)?; + split_stream.cat(self, output) } /// Write `data into the repository as an image with the given `name`. @@ -665,6 +652,24 @@ impl Repository { Ok(crate::erofs::reader::collect_objects(&data)?) } + /// Makes sure all content is written to the repository. + /// + /// This is currently just syncfs() on the repository's root directory because we don't have + /// any better options at present. This blocks until the data is written out. + pub fn sync(&self) -> Result<()> { + syncfs(&self.repository)?; + Ok(()) + } + + /// Makes sure all content is written to the repository. 
+ /// + /// This is currently just syncfs() on the repository's root directory because we don't have + /// any better options at present. This won't return until the data is written out. + pub async fn sync_async(self: &Arc) -> Result<()> { + let self_ = Arc::clone(self); + tokio::task::spawn_blocking(move || self_.sync()).await? + } + /// Perform a garbage collection operation. /// /// # Locking @@ -681,16 +686,18 @@ impl Repository { objects.extend(self.objects_for_image(&object.to_hex())?); } + /* TODO for object in self.gc_category("streams")? { println!("{object:?} lives as a stream"); objects.insert(object.clone()); - let mut split_stream = self.open_stream(&object.to_hex(), None)?; + let mut split_stream = self.open_stream(&object.to_hex(), None, None)?; split_stream.get_object_refs(|id| { println!(" with {id:?}"); objects.insert(id.clone()); })?; } + */ for first_byte in 0x0..=0xff { let dirfd = match self.openat( diff --git a/crates/composefs/src/splitstream.rs b/crates/composefs/src/splitstream.rs index 313638cb..43fe7e93 100644 --- a/crates/composefs/src/splitstream.rs +++ b/crates/composefs/src/splitstream.rs @@ -6,88 +6,143 @@ /* Implementation of the Split Stream file format * - * See doc/splitstream.md + * NB: This format is documented in `docs/splitstream.md`. Please keep the docs up to date!! */ use std::{ - io::{BufReader, Read, Write}, + collections::{BTreeMap, HashMap}, + fs::File, + hash::Hash, + io::{BufRead, BufReader, Read, Seek, SeekFrom, Take, Write}, + mem::size_of, + mem::MaybeUninit, + ops::Range, sync::Arc, }; -use anyhow::{bail, Result}; -use sha2::{Digest, Sha256}; -use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout}; +use anyhow::{bail, ensure, Context, Error, Result}; +use rustix::{ + buffer::spare_capacity, + io::{pread, read}, +}; +use zerocopy::{ + little_endian::{I64, U16, U64}, + FromBytes, Immutable, IntoBytes, KnownLayout, +}; use zstd::stream::{read::Decoder, write::Encoder}; -use crate::{ - fsverity::FsVerityHashValue, - repository::Repository, - util::{read_exactish, Sha256Digest}, -}; +use crate::{fsverity::FsVerityHashValue, repository::Repository, util::read_exactish}; + +const SPLITSTREAM_MAGIC: [u8; 11] = *b"SplitStream"; +const LG_BLOCKSIZE: u8 = 12; // TODO: hard-coded 4k. make this generic later... -/// A single entry in the digest map, mapping content SHA256 hash to fs-verity object ID. +// Nearly everything in the file is located at an offset indicated by a FileRange. +#[derive(Debug, Clone, Copy, FromBytes, Immutable, IntoBytes, KnownLayout)] +struct FileRange { + start: U64, + end: U64, +} + +// The only exception is the header: it is a fixed sized and comes at the start (offset 0). #[derive(Debug, FromBytes, Immutable, IntoBytes, KnownLayout)] -#[repr(C)] -pub struct DigestMapEntry { - /// SHA256 hash of the content body - pub body: Sha256Digest, - /// fs-verity object identifier - pub verity: ObjectID, +struct SplitstreamHeader { + pub magic: [u8; 11], // Contains SPLITSTREAM_MAGIC + pub version: u8, // must always be 0 + pub _flags: U16, // is currently always 0 (but ignored) + pub algorithm: u8, // kernel fs-verity algorithm identifier (1 = sha256, 2 = sha512) + pub lg_blocksize: u8, // log2 of the fs-verity block size (12 = 4k, 16 = 64k) + pub info: FileRange, // can be used to expand/move the info section in the future } -/// A map of content digests to object IDs, maintained in sorted order for binary search. 
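To make the fixed layout above concrete, a hedged sketch that decodes the 32-byte header by hand (illustration only; the real reader goes through the zerocopy structs):

    // magic[11] | version u8 | flags u16le | algorithm u8 | lg_blocksize u8 | info: 2 x u64le
    fn parse_header(buf: &[u8; 32]) -> anyhow::Result<(u8, u8, std::ops::Range<u64>)> {
        anyhow::ensure!(&buf[0..11] == b"SplitStream", "bad magic");
        anyhow::ensure!(buf[11] == 0, "unsupported version");
        let algorithm = buf[14];    // kernel fs-verity algorithm: 1 = sha256, 2 = sha512
        let lg_blocksize = buf[15]; // e.g. 12 for 4k blocks
        let info_start = u64::from_le_bytes(buf[16..24].try_into().unwrap());
        let info_end = u64::from_le_bytes(buf[24..32].try_into().unwrap());
        Ok((algorithm, lg_blocksize, info_start..info_end))
    }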
-#[derive(Debug)] -pub struct DigestMap { - /// Vector of digest map entries, kept sorted by body hash - pub map: Vec>, +// The info block can be located anywhere, indicated by the "info" FileRange in the header. +#[derive(Debug, FromBytes, Immutable, IntoBytes, KnownLayout)] +struct SplitstreamInfo { + pub stream_refs: FileRange, // location of the stream references array + pub object_refs: FileRange, // location of the object references array + pub stream: FileRange, // location of the zstd-compressed stream within the file + pub named_refs: FileRange, // location of the compressed named references + pub content_type: U64, // user can put whatever magic identifier they want there + pub stream_size: U64, // total uncompressed size of inline chunks and external chunks } -impl Default for DigestMap { - fn default() -> Self { - Self::new() +impl FileRange { + fn len(&self) -> u64 { + self.end.get().saturating_sub(self.start.get()) } } -impl DigestMap { - /// Creates a new empty digest map. - pub fn new() -> Self { - DigestMap { map: vec![] } +impl From> for FileRange { + fn from(value: Range) -> Self { + Self { + start: U64::from(value.start), + end: U64::from(value.end), + } } +} - /// Looks up an object ID by its content SHA256 hash. - /// - /// Returns the object ID if found, or None if not present in the map. - pub fn lookup(&self, body: &Sha256Digest) -> Option<&ObjectID> { - match self.map.binary_search_by_key(body, |e| e.body) { - Ok(idx) => Some(&self.map[idx].verity), - Err(..) => None, - } +fn read_range(file: &mut File, range: FileRange) -> Result> { + let size: usize = (range.len().try_into()) + .context("Unable to allocate buffer for implausibly large splitstream section")?; + let mut buffer = Vec::with_capacity(size); + if size > 0 { + pread(file, spare_capacity(&mut buffer), range.start.get()) + .context("Unable to read section from splitstream file")?; } + ensure!( + buffer.len() == size, + "Incomplete read from splitstream file" + ); + Ok(buffer) +} - /// Inserts a new digest mapping, maintaining sorted order. - /// - /// If the body hash already exists, asserts that the verity ID matches. - pub fn insert(&mut self, body: &Sha256Digest, verity: &ObjectID) { - match self.map.binary_search_by_key(body, |e| e.body) { - Ok(idx) => assert_eq!(self.map[idx].verity, *verity), // or else, bad things... - Err(idx) => self.map.insert( - idx, - DigestMapEntry { - body: *body, - verity: verity.clone(), - }, - ), +/// An array of objects with the following properties: +/// - each item appears only once +/// - efficient insertion and lookup of indexes of existing items +/// - insertion order is maintained, indexes are stable across modification +/// - can do .as_bytes() for items that are IntoBytes + Immutable +struct UniqueVec { + items: Vec, + index: HashMap, +} + +impl UniqueVec { + fn as_bytes(&self) -> &[u8] { + self.items.as_bytes() + } +} + +impl UniqueVec { + fn new() -> Self { + Self { + items: Vec::new(), + index: HashMap::new(), } } + + fn get(&self, item: &T) -> Option { + self.index.get(item).copied() + } + + fn ensure(&mut self, item: &T) -> usize { + self.get(item).unwrap_or_else(|| { + let idx = self.items.len(); + self.index.insert(item.clone(), idx); + self.items.push(item.clone()); + idx + }) + } } /// Writer for creating split stream format files with inline content and external object references. 
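A quick sketch of the index semantics that UniqueVec (above) provides; illustrative only, since the type is private to this module:

    fn unique_vec_demo() {
        let mut refs: UniqueVec<u32> = UniqueVec::new();
        assert_eq!(refs.ensure(&7), 0); // first insertion gets index 0
        assert_eq!(refs.ensure(&9), 1);
        assert_eq!(refs.ensure(&7), 0); // repeated items keep their original, stable index
        assert_eq!(refs.get(&9), Some(1));
        assert_eq!(refs.get(&5), None); // never inserted
    }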
-pub struct SplitStreamWriter { - repo: Arc>, - inline_content: Vec, +pub struct SplitStreamWriter { + repo: Arc>, + stream_refs: UniqueVec, + object_refs: UniqueVec, + named_refs: BTreeMap, usize>, // index into stream_refs + inline_buffer: Vec, + total_size: u64, writer: Encoder<'static, Vec>, - /// Optional SHA256 hasher and expected digest for validation - pub sha256: Option<(Sha256, Sha256Digest)>, + content_type: u64, } impl std::fmt::Debug for SplitStreamWriter { @@ -95,105 +150,135 @@ impl std::fmt::Debug for SplitStreamWriter SplitStreamWriter { - /// Creates a new split stream writer. - /// - /// The writer is initialized with optional digest map references and an optional - /// expected SHA256 hash for validation when the stream is finalized. - pub fn new( - repo: &Arc>, - refs: Option>, - sha256: Option, - ) -> Self { + /// Create a new split stream writer. + pub fn new(repo: &Arc>, content_type: u64) -> Self { // SAFETY: we surely can't get an error writing the header to a Vec - let mut writer = Encoder::new(vec![], 0).unwrap(); - - match refs { - Some(DigestMap { map }) => { - writer.write_all(&(map.len() as u64).to_le_bytes()).unwrap(); - writer.write_all(map.as_bytes()).unwrap(); - } - None => { - writer.write_all(&0u64.to_le_bytes()).unwrap(); - } - } + let writer = Encoder::new(vec![], 0).unwrap(); Self { repo: Arc::clone(repo), - inline_content: vec![], + content_type, + inline_buffer: vec![], + stream_refs: UniqueVec::new(), + object_refs: UniqueVec::new(), + named_refs: Default::default(), + total_size: 0, writer, - sha256: sha256.map(|x| (Sha256::new(), x)), } } - fn write_fragment(writer: &mut impl Write, size: usize, data: &[u8]) -> Result<()> { - writer.write_all(&(size as u64).to_le_bytes())?; - Ok(writer.write_all(data)?) + /// Add an externally-referenced object. + /// + /// This establishes a link to an object (ie: raw data file) from this stream. The link is + /// given a unique index number, which is returned. Once assigned, this index won't change. + /// The same index can be used to find the linked object when reading the file back. + /// + /// This is the primary mechanism by which splitstreams reference split external content. + /// + /// You usually won't need to call this yourself: if you want to add split external content to + /// the stream, call `.write_external()` or `._write_external_async()`. + pub fn add_object_ref(&mut self, verity: &ObjectID) -> usize { + self.object_refs.ensure(verity) + } + + /// Find the index of a previously referenced object. + /// + /// Finds the previously-assigned index for a linked object, or None if the object wasn't + /// previously linked. + pub fn lookup_object_ref(&self, verity: &ObjectID) -> Option { + self.object_refs.get(verity) + } + + /// Add an externally-referenced stream. + /// + /// This establishes a link to another stream from this stream. The link is given a unique + /// index number, which is returned. Once assigned, this index won't change. The same index + /// can be used to find the linked stream when reading the file back. + /// + /// This link is considered when performing garbage collection: the linked stream will be kept + /// alive by this stream. + pub fn add_stream_ref(&mut self, verity: &ObjectID) -> usize { + self.stream_refs.ensure(verity) + } + + /// Add an externally-referenced stream with the given name. + /// + /// The name has no meaning beyond the scope of this file: it is meant to be used to link to + /// associated data when reading the file back again. 
For example, for OCI config files, this + /// might refer to a layer splitstream via its DiffId. + /// + /// This establishes a link between the two splitstreams and is considered when performing + /// garbage collection: the named stream will be kept alive by this stream. + pub fn add_named_stream_ref(&mut self, name: &str, verity: &ObjectID) { + let idx = self.add_stream_ref(verity); + self.named_refs.insert(Box::from(name), idx); } - /// flush any buffered inline data, taking new_value as the new value of the buffer - fn flush_inline(&mut self, new_value: Vec) -> Result<()> { - if !self.inline_content.is_empty() { - Self::write_fragment( - &mut self.writer, - self.inline_content.len(), - &self.inline_content, - )?; - self.inline_content = new_value; + // flush any buffered inline data + fn flush_inline(&mut self) -> Result<()> { + let size = self.inline_buffer.len(); + if size > 0 { + // Inline chunk: stored as negative LE i64 number of bytes (non-zero!) + // SAFETY: naive - fails on -i64::MIN but we know size was unsigned + let instruction = -i64::try_from(size).expect("implausibly large inline chunk"); + self.writer.write_all(I64::new(instruction).as_bytes())?; + self.writer.write_all(&self.inline_buffer)?; + self.inline_buffer.clear(); } Ok(()) } - /// really, "add inline content to the buffer" - /// you need to call .flush_inline() later + /// Write inline data to the stream. pub fn write_inline(&mut self, data: &[u8]) { - if let Some((ref mut sha256, ..)) = self.sha256 { - sha256.update(data); - } - self.inline_content.extend(data); + // SAFETY: We'd have to write a lot of data to get here... + self.total_size += data.len() as u64; + self.inline_buffer.extend(data); } - /// write a reference to external data to the stream. If the external data had padding in the - /// stream which is not stored in the object then pass it here as well and it will be stored - /// inline after the reference. - fn write_reference(&mut self, reference: &ObjectID, padding: Vec) -> Result<()> { - // Flush the inline data before we store the external reference. Any padding from the - // external data becomes the start of a new inline block. - self.flush_inline(padding)?; + // common part of .write_external() and .write_external_async() + fn write_reference(&mut self, id: ObjectID) -> Result<()> { + // Flush any buffered inline data before we store the external reference. + self.flush_inline()?; - Self::write_fragment(&mut self.writer, 0, reference.as_bytes()) + // External chunk: non-negative LE i64 index into object_refs array + let index = self.add_object_ref(&id); + let instruction = i64::try_from(index).expect("implausibly large external index"); + self.writer.write_all(I64::from(instruction).as_bytes())?; + Ok(()) } - /// Writes data as an external object reference with optional padding. + /// Write externally-split data to the stream. /// /// The data is stored in the repository and a reference is written to the stream. - /// Any padding bytes are stored inline after the reference. - pub fn write_external(&mut self, data: &[u8], padding: Vec) -> Result<()> { - if let Some((ref mut sha256, ..)) = self.sha256 { - sha256.update(data); - sha256.update(&padding); - } + pub fn write_external(&mut self, data: &[u8]) -> Result<()> { + self.total_size += data.len() as u64; let id = self.repo.ensure_object(data)?; - self.write_reference(&id, padding) + self.write_reference(id) } - /// Asynchronously writes data as an external object reference with optional padding. 
+ /// Asynchronously write externally-split data to the stream. /// /// The data is stored in the repository asynchronously and a reference is written to the stream. - /// Any padding bytes are stored inline after the reference. - pub async fn write_external_async(&mut self, data: Vec, padding: Vec) -> Result<()> { - if let Some((ref mut sha256, ..)) = self.sha256 { - sha256.update(&data); - sha256.update(&padding); - } + pub async fn write_external_async(&mut self, data: Vec) -> Result<()> { + self.total_size += data.len() as u64; let id = self.repo.ensure_object_async(data).await?; - self.write_reference(&id, padding) + self.write_reference(id) + } + + fn write_named_refs(named_refs: BTreeMap, usize>) -> Result> { + let mut encoder = Encoder::new(vec![], 0)?; + + for (name, idx) in &named_refs { + write!(&mut encoder, "{idx}:{name}\0")?; + } + + Ok(encoder.finish()?) } /// Finalizes the split stream and returns its object ID. @@ -201,15 +286,85 @@ impl SplitStreamWriter { /// Flushes any remaining inline content, validates the SHA256 hash if provided, /// and stores the compressed stream in the repository. pub fn done(mut self) -> Result { - self.flush_inline(vec![])?; - - if let Some((context, expected)) = self.sha256 { - if Into::::into(context.finalize()) != expected { - bail!("Content doesn't have expected SHA256 hash value!"); + self.flush_inline()?; + let stream = self.writer.finish()?; + + // Pre-compute the file layout + let header_start = 0u64; + let header_end = header_start + size_of::() as u64; + + let info_start = header_end; + let info_end = info_start + size_of::() as u64; + assert_eq!(info_start % 8, 0); + + let stream_refs_size = self.stream_refs.as_bytes().len(); + let stream_refs_start = info_end; + let stream_refs_end = stream_refs_start + stream_refs_size as u64; + assert_eq!(stream_refs_start % 8, 0); + + let object_refs_size = self.object_refs.as_bytes().len(); + let object_refs_start = stream_refs_end; + let object_refs_end = object_refs_start + object_refs_size as u64; + assert_eq!(object_refs_start % 8, 0); + + let named_refs = + Self::write_named_refs(self.named_refs).context("Formatting named references")?; + let named_refs_start = object_refs_end; + let named_refs_end = named_refs_start + named_refs.len() as u64; + assert_eq!(named_refs_start % 8, 0); + + let stream_start = named_refs_end; + let stream_end = stream_start + stream.len() as u64; + + // Write the file out into a Vec, checking the layout on the way + let mut buf = vec![]; + + assert_eq!(buf.len() as u64, header_start); + buf.extend_from_slice( + SplitstreamHeader { + magic: SPLITSTREAM_MAGIC, + version: 0, + _flags: U16::ZERO, + algorithm: ObjectID::ALGORITHM, + lg_blocksize: LG_BLOCKSIZE, + info: (info_start..info_end).into(), } - } + .as_bytes(), + ); + assert_eq!(buf.len() as u64, header_end); + + assert_eq!(buf.len() as u64, info_start); + buf.extend_from_slice( + SplitstreamInfo { + stream_refs: (stream_refs_start..stream_refs_end).into(), + object_refs: (object_refs_start..object_refs_end).into(), + stream: (stream_start..stream_end).into(), + named_refs: (named_refs_start..named_refs_end).into(), + content_type: self.content_type.into(), + stream_size: self.total_size.into(), + } + .as_bytes(), + ); + assert_eq!(buf.len() as u64, info_end); + + assert_eq!(buf.len() as u64, stream_refs_start); + buf.extend_from_slice(self.stream_refs.as_bytes()); + assert_eq!(buf.len() as u64, stream_refs_end); + + assert_eq!(buf.len() as u64, object_refs_start); + 
buf.extend_from_slice(self.object_refs.as_bytes()); + assert_eq!(buf.len() as u64, object_refs_end); - self.repo.ensure_object(&self.writer.finish()?) + assert_eq!(buf.len() as u64, named_refs_start); + buf.extend_from_slice(&named_refs); + assert_eq!(buf.len() as u64, named_refs_end); + + assert_eq!(buf.len() as u64, stream_start); + buf.extend_from_slice(&stream); + assert_eq!(buf.len() as u64, stream_end); + + // Store the Vec into the repository + self.repo.ensure_object(&buf) } } @@ -223,32 +378,27 @@ pub enum SplitStreamData { } /// Reader for parsing split stream format files with inline content and external object references. -pub struct SplitStreamReader { - decoder: Decoder<'static, BufReader>, - /// Digest map containing content hash to object ID mappings - pub refs: DigestMap, +pub struct SplitStreamReader { + decoder: Decoder<'static, BufReader>>, inline_bytes: usize, + /// The content_type ID given when the splitstream was constructed + pub content_type: u64, + /// The total size of the original/merged stream, in bytes + pub total_size: u64, + object_refs: Vec, + named_refs: HashMap, ObjectID>, } -impl std::fmt::Debug for SplitStreamReader { +impl std::fmt::Debug for SplitStreamReader { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { // decoder doesn't impl Debug f.debug_struct("SplitStreamReader") - .field("refs", &self.refs) + .field("refs", &self.object_refs) .field("inline_bytes", &self.inline_bytes) .finish() } } -fn read_u64_le(reader: &mut R) -> Result> { - let mut buf = [0u8; 8]; - if read_exactish(reader, &mut buf)? { - Ok(Some(u64::from_le_bytes(buf) as usize)) - } else { - Ok(None) - } -} - /// Using the provided [`vec`] as a buffer, read exactly [`size`] /// bytes of content from [`reader`] into it. Any existing content /// in [`vec`] will be discarded; however its capacity will be reused, @@ -265,33 +415,123 @@ enum ChunkType { External(ObjectID), } -impl SplitStreamReader { +impl SplitStreamReader { /// Creates a new split stream reader from the provided reader. /// /// Reads the digest map header from the stream during initialization. - pub fn new(reader: R) -> Result { - let mut decoder = Decoder::new(reader)?; - - let n_map_entries = { - let mut buf = [0u8; 8]; - decoder.read_exact(&mut buf)?; - u64::from_le_bytes(buf) - } as usize; - - let mut refs = DigestMap:: { - map: Vec::with_capacity(n_map_entries), - }; - for _ in 0..n_map_entries { - refs.map.push(DigestMapEntry::read_from_io(&mut decoder)?); + pub fn new(mut file: File, expected_content_type: Option) -> Result { + let header = SplitstreamHeader::read_from_io(&mut file) + .map_err(|e| Error::msg(format!("Error reading splitstream header: {e:?}")))?; + + if header.magic != SPLITSTREAM_MAGIC { + bail!("Invalid splitstream header magic value"); + } + + if header.version != 0 { + bail!("Invalid splitstream version {}", header.version); } + if header.algorithm != ObjectID::ALGORITHM { + bail!("Invalid splitstream fs-verity algorithm type"); + } + + if header.lg_blocksize != LG_BLOCKSIZE { + bail!("Invalid splitstream fs-verity block size"); + } + + let info_bytes = read_range(&mut file, header.info)?; + // NB: We imagine that `info` might grow in the future, so for forward-compatibility we + // allow that it is larger than we expect it to be. If we ever expand the info section + // then we will also need to come up with a mechanism for a smaller info section for + // backwards-compatibility. 
+ let (info, _) = SplitstreamInfo::ref_from_prefix(&info_bytes) + .map_err(|e| Error::msg(format!("Error reading splitstream metadata: {e:?}")))?; + + let content_type: u64 = info.content_type.into(); + if let Some(expected) = expected_content_type { + ensure!(content_type == expected, "Invalid splitstream content type"); + } + + let total_size: u64 = info.stream_size.into(); + + let stream_refs_bytes = read_range(&mut file, info.stream_refs)?; + let stream_refs = <[ObjectID]>::ref_from_bytes(&stream_refs_bytes) + .map_err(|e| Error::msg(format!("Error reading splitstream references: {e:?}")))?; + + let object_refs_bytes = read_range(&mut file, info.object_refs)?; + let object_refs = <[ObjectID]>::ref_from_bytes(&object_refs_bytes) + .map_err(|e| Error::msg(format!("Error reading object references: {e:?}")))?; + + let named_refs_bytes = read_range(&mut file, info.named_refs)?; + let named_refs = Self::read_named_references(&named_refs_bytes, stream_refs) + .map_err(|e| Error::msg(format!("Error reading splitstream mappings: {e:?}")))?; + + file.seek(SeekFrom::Start(info.stream.start.get())) + .context("Unable to seek to start of splitstream content")?; + let decoder = Decoder::new(file.take(info.stream.len())) + .context("Unable to decode zstd-compressed content in splitstream")?; + Ok(Self { decoder, - refs, inline_bytes: 0, + content_type, + total_size, + object_refs: object_refs.to_vec(), + named_refs, }) } + fn read_named_references( + section: &[u8], + references: &[ObjectId], + ) -> Result, ObjectId>> { + let mut map = HashMap::new(); + let mut buffer = vec![]; + + let mut reader = BufReader::new( + Decoder::new(section).context("Creating zstd decoder for named references section")?, + ); + + loop { + reader + .read_until(b'\0', &mut buffer) + .context("Reading named references section")?; + + let Some(item) = buffer.strip_suffix(b"\0") else { + ensure!( + buffer.is_empty(), + "Trailing junk in named references section" + ); + return Ok(map); + }; + + let (idx_str, name) = std::str::from_utf8(item) + .context("Reading named references section")? + .split_once(":") + .context("Named reference doesn't contain a colon")?; + + let idx: usize = idx_str + .parse() + .context("Named reference contains a non-integer index")?; + let object_id = references + .get(idx) + .context("Named reference out of bounds")?; + + map.insert(Box::from(name), object_id.clone()); + buffer.clear(); + } + } + + /// Iterate the list of named references defined on this split stream. + pub fn iter_named_refs(&self) -> impl Iterator { + self.named_refs.iter().map(|(name, id)| (name.as_ref(), id)) + } + + /// Steal the "named refs" table from this splitstream, destructing it in the process. + pub fn into_named_refs(self) -> HashMap, ObjectID> { + self.named_refs + } + fn ensure_chunk( &mut self, eof_ok: bool, @@ -299,22 +539,27 @@ impl SplitStreamReader { expected_bytes: usize, ) -> Result> { if self.inline_bytes == 0 { - match read_u64_le(&mut self.decoder)? { - None => { - if !eof_ok { - bail!("Unexpected EOF when parsing splitstream"); - } - return Ok(ChunkType::Eof); - } - Some(0) => { - if !ext_ok { - bail!("Unexpected external reference when parsing splitstream"); - } - let id = ObjectID::read_from_io(&mut self.decoder)?; - return Ok(ChunkType::External(id)); + let mut value = I64::ZERO; + + if !read_exactish(&mut self.decoder, value.as_mut_bytes())? 
{ + ensure!(eof_ok, "Unexpected EOF in splitstream"); + return Ok(ChunkType::Eof); + } + + // Negative values: (non-empty) inline data + // Non-negative values: index into object_refs array + match value.get() { + n if n < 0i64 => { + self.inline_bytes = (n.unsigned_abs().try_into()) + .context("Splitstream inline section is too large")?; } - Some(size) => { - self.inline_bytes = size; + n => { + ensure!(ext_ok, "Unexpected external reference in splitstream"); + let idx = usize::try_from(n) + .context("Splitstream external reference is too large")?; + let id: &ObjectID = (self.object_refs.get(idx)) + .context("Splitstream external reference is out of range")?; + return Ok(ChunkType::External(id.clone())); } } } @@ -330,8 +575,9 @@ impl SplitStreamReader { /// Assumes that the data cannot be split across chunks pub fn read_inline_exact(&mut self, buffer: &mut [u8]) -> Result { if let ChunkType::Inline = self.ensure_chunk(true, false, buffer.len())? { - self.decoder.read_exact(buffer)?; + // SAFETY: ensure_chunk() already verified the number of bytes for us self.inline_bytes -= buffer.len(); + self.decoder.read_exact(buffer)?; Ok(true) } else { Ok(false) @@ -376,11 +622,7 @@ impl SplitStreamReader { /// /// Inline content is written directly, while external references are resolved /// using the provided load_data callback function. - pub fn cat( - &mut self, - output: &mut impl Write, - mut load_data: impl FnMut(&ObjectID) -> Result>, - ) -> Result<()> { + pub fn cat(&mut self, repo: &Repository, output: &mut impl Write) -> Result<()> { let mut buffer = vec![]; loop { @@ -392,7 +634,16 @@ impl SplitStreamReader { output.write_all(&buffer)?; } ChunkType::External(ref id) => { - output.write_all(&load_data(id)?)?; + let mut buffer = [MaybeUninit::::uninit(); 1024 * 1024]; + let fd = repo.open_object(id)?; + + loop { + let (result, _) = read(&fd, &mut buffer)?; + if result.is_empty() { + break; + } + output.write_all(result)?; + } } } } @@ -402,45 +653,21 @@ impl SplitStreamReader { /// /// This includes both references from the digest map and external references in the stream. pub fn get_object_refs(&mut self, mut callback: impl FnMut(&ObjectID)) -> Result<()> { - let mut buffer = vec![]; - - for entry in &self.refs.map { - callback(&entry.verity); - } - - loop { - match self.ensure_chunk(true, true, 0)? { - ChunkType::Eof => break Ok(()), - ChunkType::Inline => { - read_into_vec(&mut self.decoder, &mut buffer, self.inline_bytes)?; - self.inline_bytes = 0; - } - ChunkType::External(ref id) => { - callback(id); - } - } - } - } - - /// Calls the callback for each content hash in the digest map. - pub fn get_stream_refs(&mut self, mut callback: impl FnMut(&Sha256Digest)) { - for entry in &self.refs.map { - callback(&entry.body); + for entry in &self.object_refs { + callback(entry); } + Ok(()) } - /// Looks up an object ID by content hash in the digest map. + /// Looks up a named reference /// - /// Returns an error if the reference is not found. 
- pub fn lookup(&self, body: &Sha256Digest) -> Result<&ObjectID> { - match self.refs.lookup(body) { - Some(id) => Ok(id), - None => bail!("Reference is not found in splitstream"), - } + /// Returns None if no such reference exists + pub fn lookup_named_ref(&self, name: &str) -> Option<&ObjectID> { + self.named_refs.get(name) } } -impl Read for SplitStreamReader { +impl Read for SplitStreamReader { fn read(&mut self, data: &mut [u8]) -> std::io::Result { match self.ensure_chunk(true, false, 1) { Ok(ChunkType::Eof) => Ok(0), diff --git a/crates/composefs/src/test.rs b/crates/composefs/src/test.rs index 20a83766..af04a924 100644 --- a/crates/composefs/src/test.rs +++ b/crates/composefs/src/test.rs @@ -1,12 +1,8 @@ //! Tests -use std::{ - ffi::OsString, - fs::{create_dir_all, File}, - path::PathBuf, -}; +use std::{ffi::OsString, fs::create_dir_all, path::PathBuf}; -use tempfile::{tempfile_in, TempDir}; +use tempfile::TempDir; use once_cell::sync::Lazy; @@ -31,7 +27,7 @@ pub fn tempdir() -> TempDir { TempDir::with_prefix_in("composefs-test-", TMPDIR.as_os_str()).unwrap() } -/// Allocate a temporary file -pub(crate) fn tempfile() -> File { - tempfile_in(TMPDIR.as_os_str()).unwrap() +#[cfg(test)] +pub(crate) fn tempfile() -> std::fs::File { + tempfile::tempfile_in(TMPDIR.as_os_str()).unwrap() } diff --git a/doc/splitstream.md b/doc/splitstream.md index 62df66e5..f81623c7 100644 --- a/doc/splitstream.md +++ b/doc/splitstream.md @@ -1,86 +1,158 @@ # Split Stream Split Stream is a trivial way of storing file formats (like tar) with the "data -blocks" stored in the composefs object tree with the goal that it's possible to +blocks" stored in the composefs object store with the goal that it's possible to bit-for-bit recreate the entire file. It's something like the idea behind [tar-split](https://github.com/vbatts/tar-split), with some important differences: + - it's a binary format + + - it's based on storing external objects content-addressed in the composefs + object store via their fs-verity digest + - although it's designed with `tar` files in mind, it's not specific to `tar`, or even to the idea of an archive file: any file format can be stored as a splitstream, and it might make sense to do so for any file format that contains large chunks of embedded data - - it's based on storing external objects in the composefs object store + - in addition to the ability to split out chunks of file content (like files + in a `.tar`) to separate files, it is also possible to refer to external + file content, or even other splitstreams, without directly embedding that + content in the referent, which can be useful for cross-document references + (such as between OCI manifests, configs, and layers) - - it's based on a trivial binary format + - the splitstream file itself is stored in the same content-addressed object + store by its own fs-verity hash -It is expected that the splitstream will be compressed before being stored on -disk. In composefs, this is done using zstd. The main reason for this is -that, after removing the actual file data, the remaining `tar` metadata -contains a very large amount of padding and empty space and compresses -extremely well. +Splitstream compresses inline file content before it is stored to disk using +zstd. The main reason for this is that, after removing the actual file data, +the remaining `tar` metadata contains a very large amount of padding and empty +space and compresses extremely well. ## File format -The file format consists of a header, plus a number of data blocks. 
+What follows is a non-normative documentation of the file format. The actual +definition of the format is "what composefs-rs reads and writes", but this +document may be useful to try to understand that format. If you'd like to +implement the format, please get in touch. -### Mappings +The format is implemented in +[crates/composefs/src/splitstream.rs](crates/composefs/src/splitstream.rs) and +the structs from that file are copy-pasted here. Please try to keep things +roughly in sync when making changes to either side. -The file starts with a single u64 le integer which is the number of mapping -structures present in the file. A mapping is a relationship between a file -identified by its sha256 content hash and the fsverity hash of that same file. -These entries are encoded simply as the sha256 hash value (32 bytes) plus the -fsverity hash value (32 bytes) combined together into a single 64 byte record. +### File ranges ("sections") -For example, if we had a file that mapped `1234..` to `abcd..` and `5678..` to -`efab..`, the header would look like: +The file format consists of a fixed-sized header at the start of the file plus +a number of sections located at arbitrary locations inside of the file. All of +these sections are referred to by a 64-bit `[start..end)` range expressed in +terms of overall byte offsets within the complete file. +```rust +struct FileRange { + start: U64, + end: U64, +} ``` - 64bit 32 bytes 32 bytes + 32 bytes + 32 bytes - +--------+----------+----------+----------+---------+ - | 2 | 1234 | abcd | 5678 | efab | - +--------+----------+----------+----------+---------+ + +All integers are little-endian. + +### Header + +The file starts with a simple fixed-size header. + +```rust +const SPLITSTREAM_MAGIC: [u8; 11] = *b"SplitStream"; + +struct SplitstreamHeader { + pub magic: [u8; 11], // Contains SPLITSTREAM_MAGIC + pub version: u8, // must always be 0 + pub _flags: U16, // is currently always 0 (but ignored) + pub algorithm: u8, // kernel fs-verity algorithm identifier (1 = sha256, 2 = sha512) + pub lg_blocksize: u8, // log2 of the fs-verity block size (12 = 4k, 16 = 64k) + pub info: FileRange, // can be used to expand/move the info section in the future +} ``` -The mappings in the header are always sorted by their sha256 content hash -values. +In addition to magic values and identifiers for the fs-verity algorithm in use, +the header is used to find the location and size of the info section. Future +expansions to the file format are imagined to occur by expanding the size of +the info section: if the section is larger than expected, the additional bytes +will be ignored by the implementation. + +### Info section + +```rust +struct SplitstreamInfo { + pub stream_refs: FileRange, // location of the stream references array + pub object_refs: FileRange, // location of the object references array + pub stream: FileRange, // location of the zstd-compressed stream within the file + pub named_refs: FileRange, // location of the compressed named references + pub content_type: U64, // user can put whatever magic identifier they want there + pub stream_size: U64, // total uncompressed size of inline chunks and external chunks +} +``` -The mappings serve two purposes: +The `content_type` is just an arbitrary identifier that can be used by users of +the file format to prevent casual user error when opening a file by its hash +value (to prevent showing `.tar` data as if it were json, for example). 
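+
+As a purely illustrative (non-normative) sketch, a third-party reader could
+locate the info section with plain `std`, assuming the packed little-endian
+layout implied by the structs above (a 32-byte header: the 11-byte magic,
+`version`, `_flags`, `algorithm`, `lg_blocksize`, then the 16-byte `info`
+range):
+
+```rust
+use std::io::Read;
+
+// Sketch only: offsets follow the SplitstreamHeader layout shown above.
+fn read_info_range(f: &mut std::fs::File) -> std::io::Result<(u64, u64)> {
+    let mut hdr = [0u8; 32];
+    f.read_exact(&mut hdr)?;
+    assert_eq!(hdr[0..11], *b"SplitStream"); // magic
+    assert_eq!(hdr[11], 0); // version
+    // hdr[12..14] is _flags (ignored); hdr[14] is algorithm; hdr[15] is lg_blocksize
+    let start = u64::from_le_bytes(hdr[16..24].try_into().unwrap());
+    let end = u64::from_le_bytes(hdr[24..32].try_into().unwrap());
+    Ok((start, end)) // byte range of the info section within the file
+}
+```
+
+The `SplitstreamInfo` fields can be read from that range in the same way: the
+four `FileRange`s (eight little-endian 64-bit values) followed by
+`content_type` and `stream_size`.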
- - in the case of splitstreams which refer to other splitstreams without - directly embedding the content of the other stream, this provides a - mechanism to find out which other streams are referenced. This is used for - garbage collection. +The `stream_size` is the total size of the original file. - - for the same usecase, it provides a mechanism to be able to verify the - content of the referred splitstream (by checking its fsverity digest) before - starting to iterate it +### Stream and object refs sections -### Data blocks +All referred streams and objects in the file are stored as two separate flat +uncompressed arrays of binary fs-verity hash values. Each of these arrays is +referred to from the info section (via `stream_refs` and `object_refs`). -After the header comes a number of data blocks. Each block starts with a u64 -le "size" field followed by some amount of data. +The number of items in the array is determined by the size of the section +divided by the size of the fs-verity hash value (determined by the algorithm +identifier in the header). -``` - 64bit variable-sized - +--------+---------------.... - | size | data... - +--------+---------------.... -``` +The values are not in any particular order, but implementations should produce +a deterministic output. For example, the objects reference array produced by +the current implementation has the external objects sorted by first-appearance +within the stream. + +The main motivation for storing the references uncompressed, in binary, and in +a flat array is to make determining the references contained within a +splitstream as simple as possible to improve the efficiency of garbage +collection on large repositories. + +### The stream + +The main content of the splitstream is stored in the `stream` section +referenced from the info section. The entire section is zstd compressed. + +Within the compressed stream, the splitstream is formed from a number of +"chunks". Each chunk starts with a single 64-bit little endian value. If that +number is negative, it refers to an "inline" chunk, and that (absolute) number +of bytes of data immediately follow it. If the number is non-negative then it +is an index into the object refs array for an "external" chunk. + +Zero is a non-negative value, so it's an object reference. It's not possible +to have a zero-byte inline chunk. This also means that the high/sign bit +determines which case (inline vs. external) we have and there are an equal +number of both cases. + +The stream is reassembled by iterating over the chunks and concatenating the +result. For inline chunks, the inline data is taken directly from the +splitstream. For external chunks, the content of the external file is used. + +The stream is over when there are no more chunks. -There are two kinds of blocks: +### Named references - - "Inline" blocks (`size != 0`): in this case the length of the data is equal - to the size. This is "inline data" and is usually used for the metadata - and padding present in the source file. The Split Stream format itself - doesn't have any padding, which implies that the size fields after the - first may be unaligned. This decision was taken to keep the format simple, - and because the data is compressed before being stored, which removes the - main advantages of aligned data. +It's possible to have named references to other streams. These are stored in +the `named_refs` section referred to from the info section. - - "External" blocks (`size == 0`): in this case the length of the data is 32 - bytes. 
This is the binary form of a sha256 hash value and is a reference - to an object in the composefs repository (by its fs-verity digest). +This section is also zstd-compressed, and is a number of nul-terminated text +records (including a terminator after the last record). Each record has the +form `n:name` where `n` is a non-negative integer index into the stream refs +array and `name` is an arbitrary name. The entries are currently sorted by +name (by the writer implementation) but the order is not important to the +reader. Whether or not this list is "officially" sorted or not may be pinned +down at some future point if a need should arise. -That's it, really. There's no header. The stream is over when there are no -more blocks. +An example of the decompressed content of the section might be something like +`"0:first,\01:second\0"`. diff --git a/examples/bls/build b/examples/bls/build index 0e1eb2da..1582daf1 100755 --- a/examples/bls/build +++ b/examples/bls/build @@ -52,9 +52,9 @@ podman build \ -f "${containerfile}" \ . -BASE_ID="$(sed s/sha256:// tmp/base.iid)" +BASE_ID="$(cat tmp/base.iid)" -${CFSCTL} oci pull containers-storage:${BASE_ID} +${CFSCTL} oci pull "containers-storage:${BASE_ID}" if [ "${FS_VERITY_MODE:-repart}" = "none" ]; then CFSCTL="$CFSCTL --insecure" diff --git a/examples/common/install-patched-tools b/examples/common/install-patched-tools index 7ee40bc5..0e1516d6 100755 --- a/examples/common/install-patched-tools +++ b/examples/common/install-patched-tools @@ -4,9 +4,10 @@ set -eux install_path="$1" -git clone -b repart-verity https://github.com/allisonkarlitskaya/systemd +git clone https://github.com/systemd/systemd ( cd systemd + git checkout v258 meson setup _build ninja -C _build systemd-repart mkdir -p "${install_path}/src/shared" @@ -17,7 +18,7 @@ git clone -b repart-verity https://github.com/allisonkarlitskaya/systemd git clone https://github.com/tytso/e2fsprogs ( cd e2fsprogs - git checkout dd0c4efa173203484f0cd612f97eb19181240a33 + git checkout v1.47.3 ./configure --disable-fuse2fs make -j$(nproc) cp misc/mke2fs "${install_path}/mkfs.ext4" diff --git a/examples/uki/build b/examples/uki/build index 97fdfec6..5482b8d1 100755 --- a/examples/uki/build +++ b/examples/uki/build @@ -39,7 +39,7 @@ ${PODMAN_BUILD} \ -f "${containerfile}" \ . -BASE_ID="$(sed s/sha256:// tmp/base.iid)" +BASE_ID="$(cat tmp/base.iid)" ${CFSCTL} oci pull containers-storage:"${BASE_ID}" BASE_IMAGE_FSVERITY="$(${CFSCTL} oci compute-id --bootable "${BASE_ID}")" @@ -51,7 +51,7 @@ ${PODMAN_BUILD} \ -f "${containerfile}" \ . -FINAL_ID="$(sed s/sha256:// tmp/final.iid)" +FINAL_ID="$(cat tmp/final.iid)" ${CFSCTL} oci pull containers-storage:"${FINAL_ID}" ${CFSCTL} oci prepare-boot "${FINAL_ID}" --bootdir tmp/efi diff --git a/examples/unified-secureboot/build b/examples/unified-secureboot/build index 6e345276..70f86dcb 100755 --- a/examples/unified-secureboot/build +++ b/examples/unified-secureboot/build @@ -47,7 +47,7 @@ podman build \ --secret=id=cert,src=secureboot/db.crt \ . -IMAGE_ID="$(sed s/sha256:// tmp/iid)" +IMAGE_ID="$(cat tmp/iid)" ${CFSCTL} oci pull containers-storage:"${IMAGE_ID}" ${CFSCTL} oci prepare-boot "${IMAGE_ID}" --bootdir tmp/efi diff --git a/examples/unified/build b/examples/unified/build index c66b2248..884f3945 100755 --- a/examples/unified/build +++ b/examples/unified/build @@ -29,7 +29,7 @@ podman build \ -v "${PWD}/tmp/internal-sysroot:/tmp/sysroot:z,U" \ . 
-IMAGE_ID="$(sed s/sha256:// tmp/iid)" +IMAGE_ID="$(cat tmp/iid)" ${CFSCTL} oci pull containers-storage:"${IMAGE_ID}" ${CFSCTL} oci prepare-boot "${IMAGE_ID}" --bootdir tmp/efi