From 4e00fd41265562bd974f0d1f2dbb85b3a1f943a4 Mon Sep 17 00:00:00 2001 From: Lachlan Deakin Date: Sat, 30 Sep 2023 15:10:04 +1000 Subject: [PATCH] Add read only zip store --- CHANGELOG.md | 3 + Cargo.toml | 7 +- TODO.md | 2 +- src/lib.rs | 2 +- src/storage/store.rs | 11 +- src/storage/store/zip.rs | 372 +++++++++++++++++++++++++++++++++++++++ 6 files changed, 391 insertions(+), 6 deletions(-) create mode 100644 src/storage/store/zip.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index e46c22a6..8a682487 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + - Read only `ZipStore` behind `zip` feature + ### Changed - Relax some dependency minimum versions - Add `size_hint()` to some array subset iterators diff --git a/Cargo.toml b/Cargo.toml index 794477ef..3302472f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "zarrs" description = "A library for the Zarr V3 storage format for multidimensional arrays and metadata" -version = "0.3.1" +version = "0.4.0" authors = ["Lachlan Deakin "] edition = "2021" license = "MIT OR Apache-2.0" @@ -11,7 +11,7 @@ categories = ["encoding"] exclude = [".dockerignore", ".github", ".editorconfig", "Dockerfile", "coverage.sh", "TODO.md"] [features] -default = ["transpose", "blosc", "gzip", "sharding", "crc32c", "zstd", "raw_bits", "float16", "bfloat16", "ndarray"] +default = ["transpose", "blosc", "gzip", "sharding", "crc32c", "zstd", "raw_bits", "float16", "bfloat16", "zip", "ndarray"] # Codecs transpose = ["dep:ndarray"] blosc = ["dep:blosc-sys"] @@ -23,6 +23,8 @@ zstd = ["dep:zstd"] raw_bits = [] float16 = ["dep:half"] bfloat16 = ["dep:half"] +# Stores +zip = ["dep:zip"] # Utilities ndarray = ["dep:ndarray"] # Adds ndarray utility functions to Array @@ -48,6 +50,7 @@ serde = { version = "1.0.100", features = ["derive"] } serde_json = { version = "1.0.71", features = ["preserve_order"] } thiserror = "1.0.7" walkdir = "2.3.2" +zip = { version = "0.6", optional = true } zstd = { version = "0.12", optional = true } [dev-dependencies] diff --git a/TODO.md b/TODO.md index 2d258e46..c689685a 100644 --- a/TODO.md +++ b/TODO.md @@ -1,4 +1,4 @@ ## TODO - Review documentation - Increase test coverage -- Additional stores (e.g. zip, http) and URI support [see ZEP0008](https://github.com/zarr-developers/zeps/pull/48) +- Additional stores (e.g. http) and URI support [see ZEP0008](https://github.com/zarr-developers/zeps/pull/48) diff --git a/src/lib.rs b/src/lib.rs index de344d1a..32f7b941 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,7 +16,7 @@ //! - [x] [ZEP0001 - Zarr specification version 3](https://zarr.dev/zeps/draft/ZEP0001.html) //! - [x] [ZEP0002 - Sharding codec](https://zarr.dev/zeps/draft/ZEP0002.html) ([under review](https://github.com/zarr-developers/zarr-specs/issues/254)) //! - [x] [ZEP0003 - Variable chunking](https://zarr.dev/zeps/draft/ZEP0003.html) ([draft](https://github.com/orgs/zarr-developers/discussions/52)) -//! - [x] Stores: [`filesystem`](crate::storage::store::FilesystemStore), [`memory`](crate::storage::store::MemoryStore) +//! - [x] Stores: [`filesystem`](crate::storage::store::FilesystemStore), [`memory`](crate::storage::store::MemoryStore), [`zip`](crate::storage::store::ZipStore) //! - [x] Data types: [core data types](crate::array::data_type::DataType), [`raw bits`](crate::array::data_type::RawBitsDataType), [`float16`](crate::array::data_type::Float16DataType), [`bfloat16`](crate::array::data_type::Bfloat16DataType) [(spec issue)](https://github.com/zarr-developers/zarr-specs/issues/130) //! - [x] Chunk grids: [`regular`](crate::array::chunk_grid::RegularChunkGrid), [`rectangular`](crate::array::chunk_grid::RectangularChunkGrid) ([draft](https://github.com/orgs/zarr-developers/discussions/52)) //! - [x] Chunk key encoding: [`default`](crate::array::chunk_key_encoding::DefaultChunkKeyEncoding), [`v2`](crate::array::chunk_key_encoding::V2ChunkKeyEncoding) diff --git a/src/storage/store.rs b/src/storage/store.rs index 5c7f73c3..30c90b29 100644 --- a/src/storage/store.rs +++ b/src/storage/store.rs @@ -1,4 +1,4 @@ -//! Zarr stores. Includes [filesystem](FilesystemStore) and [memory](MemoryStore) implementations. +//! Zarr stores. Includes [filesystem](FilesystemStore), [memory](MemoryStore), and [zip](ZipStore) (read only) implementations. //! //! All stores must be Send + Sync with internally managed synchronisation. //! @@ -10,10 +10,17 @@ mod memory; mod prefix; // mod store_plugin; -pub use filesystem::{FilesystemStore, FilesystemStoreCreateError}; pub use key::{StoreKey, StoreKeyError, StoreKeys}; pub use memory::MemoryStore; pub use prefix::{StorePrefix, StorePrefixError, StorePrefixes}; + +pub use filesystem::{FilesystemStore, FilesystemStoreCreateError}; + +#[cfg(feature = "zip")] +mod zip; +#[cfg(feature = "zip")] +pub use zip::{ZipStore, ZipStoreCreateError}; + // pub use store_plugin::{StorePlugin, StorePluginCreateError}; // Currently disabled. use std::sync::Arc; diff --git a/src/storage/store/zip.rs b/src/storage/store/zip.rs new file mode 100644 index 00000000..65e934f5 --- /dev/null +++ b/src/storage/store/zip.rs @@ -0,0 +1,372 @@ +//! A zip store. + +use crate::{ + byte_range::ByteRange, + storage::{ + ListableStorageTraits, ReadableStorageTraits, StorageError, StoreKeyRange, + StoreKeysPrefixes, + }, +}; + +use super::{ + ListableStoreExtension, ReadableStoreExtension, StoreExtension, StoreKey, StoreKeys, + StorePrefix, StorePrefixes, +}; + +use itertools::Itertools; +use thiserror::Error; +use zip::{result::ZipError, ZipArchive}; + +use std::{ + fs::File, + io::Read, + path::{Path, PathBuf}, + sync::{Arc, Mutex}, +}; + +// // Register the store. +// inventory::submit! { +// ReadableStorePlugin::new("zip", |uri| Ok(Arc::new(create_store_zip(uri)?))) +// } +// inventory::submit! { +// WritableStorePlugin::new("zip", |uri| Ok(Arc::new(create_store_zip(uri)?))) +// } +// inventory::submit! { +// ListableStorePlugin::new("zip", |uri| Ok(Arc::new(create_store_zip(uri)?))) +// } +// inventory::submit! { +// ReadableWritableStorePlugin::new("zip", |uri| Ok(Arc::new(create_store_zip(uri)?))) +// } + +// #[allow(clippy::similar_names)] +// fn create_store_zip(uri: &str) -> Result { +// let url = url::Url::parse(uri)?; +// let path = std::path::PathBuf::from(url.path()); +// ZipStore::new(path).map_err(|e| StorePluginCreateError::Other(e.to_string())) +// } + +/// A zip store. +/// +/// See . +pub struct ZipStore { + path: PathBuf, + zip_archive: Arc>>, + size: usize, +} + +impl std::fmt::Debug for ZipStore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.path.fmt(f) + } +} + +impl ReadableStoreExtension for ZipStore {} + +impl ListableStoreExtension for ZipStore {} + +impl StoreExtension for ZipStore { + fn uri_scheme(&self) -> Option<&'static str> { + Some("zip") + } +} + +impl ZipStore { + /// Create a new zip store for the zip file at `zip_path`. + /// + /// # Errors + /// + /// Returns a [`ZipStoreCreateError`] if `zip_path` is not valid zip file. + pub fn new>(zip_path: P) -> Result { + let path = zip_path.as_ref().to_path_buf(); + if path.is_dir() { + Err(ZipStoreCreateError::ExistingDir(path)) + } else { + let zip_file = File::open(&path)?; + let size = usize::try_from(zip_file.metadata()?.len()) + .map_err(|_| ZipError::UnsupportedArchive("zip file is too large"))?; + let zip_archive = Arc::new(Mutex::new(ZipArchive::new(zip_file)?)); + Ok(ZipStore { + path, + zip_archive, + size, + }) + } + } + + fn get_impl(&self, key: &StoreKey, byte_range: &ByteRange) -> Result, StorageError> { + let mut zip_archive = self.zip_archive.lock().unwrap(); + let file = zip_archive + .by_name(key.as_str()) + .map_err(|err| StorageError::Other(err.to_string()))?; + let size = usize::try_from(file.size()).map_err(|_| { + StorageError::Other("zip archive internal file larger than usize".to_string()) + })?; + let bytes = file.bytes(); + + let buffer = match byte_range { + ByteRange::FromStart(offset, None) => { + bytes.skip(*offset).collect::, _>>()? + } + ByteRange::FromStart(offset, Some(length)) => bytes + .skip(*offset) + .take(*length) + .collect::, _>>()?, + ByteRange::FromEnd(offset, None) => { + bytes.take(size - offset).collect::, _>>()? + } + ByteRange::FromEnd(offset, Some(length)) => bytes + .skip(size - length - offset) + .take(*length) + .collect::, _>>()?, + }; + + Ok(buffer) + } +} + +impl ReadableStorageTraits for ZipStore { + fn get(&self, key: &StoreKey) -> Result, StorageError> { + self.get_impl(key, &ByteRange::FromStart(0, None)) + } + + fn get_partial_values( + &self, + key_ranges: &[StoreKeyRange], + ) -> Vec, StorageError>> { + let mut out = Vec::with_capacity(key_ranges.len()); + for key_range in key_ranges { + out.push(self.get_impl(&key_range.key, &key_range.byte_range)); + } + out + } + + fn size(&self) -> usize { + self.size + } +} + +impl ListableStorageTraits for ZipStore { + fn list(&self) -> Result { + let zip_archive = self.zip_archive.lock().unwrap(); + Ok(zip_archive + .file_names() + .filter_map(|v| StoreKey::try_from(v).ok()) + .sorted() + .collect()) + } + + fn list_prefix(&self, prefix: &StorePrefix) -> Result { + let mut zip_archive = self.zip_archive.lock().unwrap(); + let file_names: Vec = zip_archive + .file_names() + .map(std::string::ToString::to_string) + .collect(); + Ok(file_names + .into_iter() + .filter_map(|name| { + if name.starts_with(prefix.as_str()) { + if let Ok(file) = zip_archive.by_name(&name) { + if file.is_file() { + let name = name.strip_suffix('/').unwrap_or(&name); + if let Ok(store_key) = StoreKey::try_from(name) { + return Some(store_key); + } + } + } + } + None + }) + .sorted() + .collect()) + } + + fn list_dir(&self, prefix: &StorePrefix) -> Result { + let mut zip_archive = self.zip_archive.lock().unwrap(); + let mut keys: StoreKeys = vec![]; + let mut prefixes: StorePrefixes = vec![]; + let file_names: Vec = zip_archive + .file_names() + .map(std::string::ToString::to_string) + .collect(); + for name in file_names { + if name.starts_with(prefix.as_str()) { + if let Ok(file) = zip_archive.by_name(&name) { + if file.is_file() { + let name = name.strip_suffix('/').unwrap_or(&name); + if let Ok(store_key) = StoreKey::try_from(name) { + keys.push(store_key); + } + } else if file.is_dir() { + if let Ok(store_prefix) = StorePrefix::try_from(name.as_str()) { + prefixes.push(store_prefix); + } + } + } + } + } + keys.sort(); + prefixes.sort(); + + Ok(StoreKeysPrefixes { keys, prefixes }) + } +} + +/// A zip store creation error. +#[derive(Debug, Error)] +pub enum ZipStoreCreateError { + /// An IO error. + #[error(transparent)] + IOError(#[from] std::io::Error), + /// An existing directory. + #[error("{0} is an existing directory, not a zip file")] + ExistingDir(PathBuf), + /// A zip error. + #[error(transparent)] + ZipError(#[from] ZipError), +} + +#[cfg(test)] +mod tests { + use walkdir::WalkDir; + + use crate::storage::{store::FilesystemStore, WritableStorageTraits}; + + use super::*; + use std::{error::Error, io::Write}; + + // https://github.com/zip-rs/zip/blob/master/examples/write_dir.rs + fn zip_dir( + it: &mut dyn Iterator, + prefix: &str, + writer: File, + method: zip::CompressionMethod, + ) -> zip::result::ZipResult<()> { + let mut zip = zip::ZipWriter::new(writer); + let options = zip::write::FileOptions::default().compression_method(method); + let mut buffer = Vec::new(); + for entry in it { + let path = entry.path(); + let name = path.strip_prefix(Path::new(prefix)).unwrap(); + if path.is_file() { + #[allow(deprecated)] + zip.start_file_from_path(name, options)?; + let mut f = File::open(path)?; + f.read_to_end(&mut buffer)?; + zip.write_all(&buffer)?; + buffer.clear(); + } else if !name.as_os_str().is_empty() { + #[allow(deprecated)] + zip.add_directory_from_path(name, options)?; + } + } + zip.finish()?; + Result::Ok(()) + } + + fn zip_write(path: &Path) -> Result<(), Box> { + let tmp_path = tempfile::TempDir::new()?; + let tmp_path = tmp_path.path(); + let store = FilesystemStore::new(tmp_path)?.sorted(); + store.set(&"a/b".try_into()?, &[0, 1, 2, 3])?; + store.set(&"a/c".try_into()?, &[])?; + store.set(&"a/d/e".try_into()?, &[])?; + store.set(&"a/f/g".try_into()?, &[])?; + store.set(&"a/f/h".try_into()?, &[])?; + store.set(&"b/c/d".try_into()?, &[])?; + + let walkdir = WalkDir::new(tmp_path); + + let file = File::create(path).unwrap(); + zip_dir( + &mut walkdir.into_iter().filter_map(|e| e.ok()), + tmp_path.to_str().unwrap(), + file, + zip::CompressionMethod::Stored, + )?; + + Ok(()) + } + + #[test] + fn zip_list() -> Result<(), Box> { + let path = tempfile::TempDir::new()?; + let mut path = path.path().to_path_buf(); + path.push("test.zip"); + zip_write(&path).unwrap(); + + println!("{path:?}"); + + let store = ZipStore::new(path)?; + + assert_eq!( + store.list()?, + &[ + "a/b".try_into()?, + "a/c".try_into()?, + "a/d/e".try_into()?, + "a/f/g".try_into()?, + "a/f/h".try_into()?, + "b/c/d".try_into()? + ] + ); + assert_eq!( + store.list_prefix(&"a/".try_into()?)?, + &[ + "a/b".try_into()?, + "a/c".try_into()?, + "a/d/e".try_into()?, + "a/f/g".try_into()?, + "a/f/h".try_into()?, + ] + ); + assert_eq!( + store.list_prefix(&"a/d/".try_into()?)?, + &["a/d/e".try_into()?] + ); + assert_eq!( + store.list_prefix(&"".try_into()?)?, + &[ + "a/b".try_into()?, + "a/c".try_into()?, + "a/d/e".try_into()?, + "a/f/g".try_into()?, + "a/f/h".try_into()?, + "b/c/d".try_into()? + ] + ); + + let list = store.list_dir(&"".try_into()?)?; + assert_eq!( + list.keys(), + &[ + "a/b".try_into()?, + "a/c".try_into()?, + "a/d/e".try_into()?, + "a/f/g".try_into()?, + "a/f/h".try_into()?, + "b/c/d".try_into()? + ] + ); + assert_eq!( + list.prefixes(), + &[ + "a/".try_into()?, + "a/d/".try_into()?, + "a/f/".try_into()?, + "b/".try_into()?, + "b/c/".try_into()? + ] + ); + + assert!(crate::storage::node_exists(&store, &"/a/b".try_into()?)?); + assert!(crate::storage::node_exists_listable( + &store, + &"/a/b".try_into()? + )?); + + assert_eq!(store.get(&"a/b".try_into()?)?, &[0, 1, 2, 3]); + assert_eq!(store.get(&"a/c".try_into()?)?, Vec::::new().as_slice()); + + Ok(()) + } +}