diff --git a/Cargo.toml b/Cargo.toml
index 8373a4f7c..f687f8873 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "shardio"
-version = "0.5.3"
+version = "0.6.0"
 authors = ["Patrick Marks ", "Sreenath Krishnan ", "Lance Hepler "]
 edition = "2018"
 license = "MIT"
@@ -16,8 +16,7 @@ readme = "README.md"
 full-test = []
 
 [dependencies]
-serde = "1"
-serde_derive = "1"
+serde = { version = "1.0", features = ["derive"] }
 bincode = "1.1"
 byteorder = "1.3.0"
 libc = "0.2"
diff --git a/src/helper.rs b/src/helper.rs
index 11e735c94..721314a92 100644
--- a/src/helper.rs
+++ b/src/helper.rs
@@ -69,7 +69,6 @@ pub struct ThreadProxyWriter<T: Write + Send> {
 }
 
 impl<T: 'static + Write + Send> ThreadProxyWriter<T> {
-    /// Create a new `ThreadProxyWriter` that will write to `writer` on a newly created thread
     pub fn new(mut writer: T, buffer_size: usize) -> ThreadProxyWriter<T> {
         let (tx, rx) = bounded::<Option<Vec<u8>>>(10);
 
diff --git a/src/lib.rs b/src/lib.rs
index a73fea5e6..29e5980fd 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -4,12 +4,13 @@
 //! Data is written to sorted chunks. When reading, shardio will merge the data on the fly into a single sorted view. You can
 //! also process disjoint subsets of sorted data.
 //!
-//! ```rust
-//! #[macro_use]
-//! extern crate serde_derive;
+//! Shardio storage is intended for transient use within a single processing task, not for archival purposes.
+//! You can't read shardio files created by binaries for a different Rust version,
+//! or where type definitions have changed. It is recommended
+//! to only interact with a shardio file from a single Rust binary.
 //!
-//! extern crate shardio;
-//! extern crate failure;
+//! ```rust
+//! use serde::{Serialize, Deserialize};
 //! use shardio::*;
 //! use std::fs::File;
 //! use failure::Error;
@@ -41,6 +42,9 @@
 //!             sender.send(DataStruct { a: (i%25) as u64, b: (i%100) as u32 });
 //!         }
 //!
+//!         // done sending items
+//!         sender.finished();
+//!
 //!         // Write errors are accessible by calling the finish() method
 //!         writer.finish()?;
 //!     }
@@ -72,37 +76,32 @@
 //!     }
 //! ```
 
+#![deny(warnings)]
 #![deny(missing_docs)]
 
-use crossbeam_channel;
-use lz4;
-
-#[macro_use]
-extern crate serde_derive;
-use serde::{Serialize, de::DeserializeOwned};
-
+use std::any::{Any, TypeId};
 use std::borrow::Cow;
+use std::collections::hash_map::DefaultHasher;
 use std::collections::BTreeSet;
 use std::fs::File;
+use std::hash::Hash;
+use std::hash::Hasher;
 use std::io::{self, Seek, SeekFrom};
+use std::marker::PhantomData;
 use std::os::unix::io::{AsRawFd, RawFd};
-
-use crossbeam_channel::{bounded, Receiver, Sender};
 use std::path::Path;
-
-use min_max_heap::MinMaxHeap;
-use std::marker::PhantomData;
 use std::thread;
 use std::thread::JoinHandle;
-
 use bincode::{deserialize_from, serialize_into};
 use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
-
-use libc::{c_void, off_t, pread, pwrite, size_t, ssize_t};
-
+use crossbeam_channel;
+use crossbeam_channel::{bounded, Receiver, Sender};
 use failure::{format_err, Error};
-
+use libc::{c_void, off_t, pread, pwrite, size_t, ssize_t};
+use lz4;
+use min_max_heap::MinMaxHeap;
+use serde::{de::DeserializeOwned, Deserialize, Serialize};
 
 /// Represent a range of key space
 pub mod range;
@@ -274,7 +273,6 @@ impl FileManager {
 /// }
 /// ```
 pub trait SortKey<T> {
-
     /// The type of the key that will be sorted.
     type Key: Ord + Clone;
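For context on the `SortKey` hunk above: a custom sort order is a unit struct implementing this trait. The following is a minimal sketch, assuming the `Cow`-returning `sort_key` signature shown in the crate docs; `DataStruct` and `FieldSort` are illustrative names, not part of this diff:

```rust
use std::borrow::Cow;

use serde::{Deserialize, Serialize};
use shardio::SortKey;

#[derive(Clone, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)]
struct DataStruct {
    a: u64,
    b: u32,
}

/// Sort `DataStruct` items by field `b` only, rather than by the whole struct.
struct FieldSort;

impl SortKey<DataStruct> for FieldSort {
    type Key = u32;

    fn sort_key(v: &DataStruct) -> Cow<u32> {
        Cow::Owned(v.b)
    }
}
```

A writer parameterized as `ShardWriter<DataStruct, FieldSort>` would then store items sorted by `b`.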
@@ -302,7 +300,8 @@ where
 /// flush remaining items to disk, write an index of the chunk data and close the file.
 ///
 /// The `get_sender()` method returns a `ShardSender` that must be used to send items to the writer.
-/// You must close each ShardSender by calling its `finish()` method, or data may be lost.
+/// You must close each `ShardSender` by dropping it or calling its `finish()` method, or data may be lost.
+/// The `ShardSender` must be dropped/finished prior to calling `ShardWriter::finish` or dropping the `ShardWriter`.
 ///
 /// # Sorting
 /// Items are sorted according to the `Ord` implementation of type `S::Key`. Type `S`, implementing the `SortKey` trait
@@ -324,7 +323,7 @@ struct ShardWriterHelper {
 impl<T, S> ShardWriter<T, S>
 where
     T: 'static + Send + Serialize,
-    S: SortKey<T>,
+    S: 'static + SortKey<T>,
     <S as SortKey<T>>::Key: 'static + Send + Ord + Serialize + Clone,
 {
     /// Create a writer for storing data items of type `T`.
@@ -525,6 +524,20 @@ where
     }
 }
 
+/// Get a hash of the data and sort types -- used
+/// to validate that we're reading the correct types.
+fn get_type_hash<T: Any, S: Any>() -> u64 {
+    let mut hasher = DefaultHasher::new();
+
+    let typeid = TypeId::of::<T>();
+    typeid.hash(&mut hasher);
+
+    let typeid = TypeId::of::<S>();
+    typeid.hash(&mut hasher);
+
+    hasher.finish()
+}
+
 /// Sort buffered items, break large buffer into chunks.
 /// Serialize, compress and write each chunk. The
 /// file manager maintains the index data.
@@ -544,8 +557,8 @@ where
 impl<T, S> ShardWriterThread<T, S>
 where
-    T: Send + Serialize,
-    S: SortKey<T>,
+    T: Any + Send + Serialize,
+    S: Any + SortKey<T>,
     <S as SortKey<T>>::Key: Ord + Clone + Serialize,
 {
     fn new(
@@ -624,6 +637,8 @@ where
     fn write_index_block(&mut self) -> Result<(), Error> {
         let mut buf = Vec::new();
 
+        // write the type hash to allow checking at read time
+        serialize_into(&mut buf, &get_type_hash::<T, S>())?;
         serialize_into(&mut buf, &self.writer.regions)?;
 
         let index_block_position = self.writer.cursor;
@@ -692,7 +707,8 @@ impl ShardSender {
     }
 
     /// Signal that you've finished sending items to this `ShardSender`. `finished` will be called
-    /// if the `ShardSender` is dropped.
+    /// if the `ShardSender` is dropped. You must call `finished()` or drop the `ShardSender`
+    /// prior to calling `ShardWriter::finish` or dropping the `ShardWriter`, or a panic will occur.
     pub fn finished(&mut self) {
         if !self.buffer.is_empty() {
             let mut send_buf = Vec::new();
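The type hash produced by `get_type_hash` above and checked in `ShardReaderSingle::open` in the next hunk can be illustrated standalone. This sketch mirrors the hashing scheme from the diff (the names here are hypothetical copies, not the crate's API), and shows why the docs now warn against archival use: `TypeId` values are not guaranteed stable across compiler versions or type-definition changes, so the hash only matches within one build of one binary.

```rust
use std::any::{Any, TypeId};
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Hash the TypeIds of the item type T and the sort type S together,
// mirroring get_type_hash in the hunk above.
fn type_hash<T: Any, S: Any>() -> u64 {
    let mut hasher = DefaultHasher::new();
    TypeId::of::<T>().hash(&mut hasher);
    TypeId::of::<S>().hash(&mut hasher);
    hasher.finish()
}

fn main() {
    // Stable within a single build of a single binary...
    assert_eq!(type_hash::<u32, u64>(), type_hash::<u32, u64>());

    // ...but a reader instantiated with different types computes a different
    // hash, so the assert_eq! in open() fails fast instead of letting bincode
    // deserialize garbage.
    assert_ne!(type_hash::<u32, u64>(), type_hash::<u64, u32>());
}
```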
@@ -733,35 +749,36 @@ where
 impl<T, S> ShardReaderSingle<T, S>
 where
-    T: DeserializeOwned,
-    S: SortKey<T>,
+    T: Any + DeserializeOwned,
+    S: Any + SortKey<T>,
     <S as SortKey<T>>::Key: Clone + Ord + DeserializeOwned,
 {
-    /// Open a shard file that stores `T` items.
+    /// Open a shard file that stores `T` items. Will panic if `T` and `S` don't have the same
+    /// `TypeId` as those used when the file was written. This means that you can't read
+    /// shardio files created by binaries for a different Rust version, or with different
+    /// definitions of `T` and `S`.
     fn open<P: AsRef<Path>>(path: P) -> Result<ShardReaderSingle<T, S>, Error> {
-        let mut f = File::open(path).unwrap();
-
-        let mut index = Self::read_index_block(&mut f)?;
-        index.sort();
+        let mut file = File::open(path).unwrap();
 
-        Ok(ShardReaderSingle {
-            file: f,
-            index,
-            p1: PhantomData,
-        })
-    }
-
-    /// Read shard index
-    fn read_index_block(
-        file: &mut File,
-    ) -> Result<Vec<ShardRecord<<S as SortKey<T>>::Key>>, Error> {
         let _ = file.seek(SeekFrom::End(-24))?;
         let _num_shards = file.read_u64::<BigEndian>()? as usize;
         let index_block_position = file.read_u64::<BigEndian>()?;
         let _ = file.read_u64::<BigEndian>()?;
         file.seek(SeekFrom::Start(index_block_position as u64))?;
-        Ok(deserialize_from(file)?)
+
+        let type_hash: u64 = deserialize_from(&mut file)?;
+
+        // validate that we're using the correct types to deserialize.
+        assert_eq!(type_hash, get_type_hash::<T, S>());
+
+        let mut index: Vec<ShardRecord<<S as SortKey<T>>::Key>> = deserialize_from(&mut file)?;
+        index.sort();
+
+        Ok(ShardReaderSingle {
+            file,
+            index,
+            p1: PhantomData,
+        })
     }
 
     fn get_decoder(buffer: &mut Vec<u8>) -> lz4::Decoder<&[u8]> {
@@ -960,8 +977,8 @@ where
 impl<'a, T, S> RangeIter<'a, T, S>
 where
-    T: DeserializeOwned,
-    S: SortKey<T>,
+    T: Any + DeserializeOwned,
+    S: Any + SortKey<T>,
     <S as SortKey<T>>::Key: 'a + Clone + Ord + DeserializeOwned,
 {
     fn new(
@@ -1037,8 +1054,8 @@ fn transpose<T, E>(v: Option<Result<T, E>>) -> Result<Option<T>, E> {
 impl<'a, T, S> Iterator for RangeIter<'a, T, S>
 where
-    T: DeserializeOwned,
-    S: SortKey<T>,
+    T: Any + DeserializeOwned,
+    S: Any + SortKey<T>,
     <S as SortKey<T>>::Key: Clone + Ord + DeserializeOwned,
 {
     type Item = Result<T, Error>;
@@ -1195,9 +1212,9 @@ where
 impl<T, S> ShardReader<T, S>
 where
-    T: DeserializeOwned,
+    T: Any + DeserializeOwned,
     <S as SortKey<T>>::Key: Clone + Ord + DeserializeOwned,
-    S: SortKey<T>,
+    S: Any + SortKey<T>,
 {
     /// Open a single shard file into a reader
     pub fn open<P: AsRef<Path>>(shard_file: P) -> Result<ShardReader<T, S>, Error> {
@@ -1316,16 +1333,16 @@ where
 #[cfg(test)]
 mod shard_tests {
     use super::*;
+    use is_sorted::IsSorted;
+    use pretty_assertions::assert_eq;
+    use quickcheck::{Arbitrary, Gen, QuickCheck, StdThreadGen};
+    use rand::Rng;
     use std::collections::HashSet;
     use std::fmt::Debug;
     use std::hash::Hash;
     use std::iter::{repeat, FromIterator};
     use std::u8;
     use tempfile;
-    use pretty_assertions::assert_eq;
-    use quickcheck::{QuickCheck, Arbitrary, Gen, StdThreadGen};
-    use rand::Rng;
-    use is_sorted::IsSorted;
 
     #[derive(Copy, Clone, Eq, PartialEq, Serialize, Deserialize, Debug, PartialOrd, Ord, Hash)]
     struct T1 {
@@ -1337,8 +1354,8 @@ impl Arbitrary for T1 {
         fn arbitrary<G: Gen>(g: &mut G) -> T1 {
-            T1 {
-                a: g.gen(),
+            T1 {
+                a: g.gen(),
                 b: g.gen(),
                 c: g.gen(),
                 d: g.gen(),
             }
         }
@@ -1508,7 +1525,7 @@
 
         let mut data = Vec::new();
 
-        for _ in 0 .. slices {
+        for _ in 0..slices {
             let slice = Vec::<T>::arbitrary(g);
             data.push(slice);
         }
@@ -1517,26 +1534,29 @@
 
             MultiSlice(data)
         }
     }
 
-
-    fn test_multi_slice<T, S>(items: MultiSlice<T>, disk_chunk_size: usize, producer_chunk_size: usize, buffer_size: usize) -> Result<Vec<T>, Error>
+    fn test_multi_slice<T, S>(
+        items: MultiSlice<T>,
+        disk_chunk_size: usize,
+        producer_chunk_size: usize,
+        buffer_size: usize,
+    ) -> Result<Vec<T>, Error>
     where
         T: 'static + Serialize + DeserializeOwned + Clone + Send,
-        S: SortKey<T>,
+        S: 'static + SortKey<T>,
         <S as SortKey<T>>::Key: 'static + Send + Serialize + DeserializeOwned,
     {
-
         let mut files = Vec::new();
 
         for item_chunk in &items.0 {
             let tmp = tempfile::NamedTempFile::new()?;
 
             let writer: ShardWriter<T, S> = ShardWriter::new(
-                tmp.path(),
-                producer_chunk_size,
-                disk_chunk_size,
-                buffer_size)?;
-
-
+                tmp.path(),
+                producer_chunk_size,
+                disk_chunk_size,
+                buffer_size,
+            )?;
+
             let mut sender = writer.get_sender();
             for item in item_chunk {
                 sender.send(item.clone())?;
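The test helper above drains a reader with `reader.iter()` (continued below). For the "disjoint subsets" use case from the crate docs, reads go through a key range instead. A sketch, assuming shardio's `Range` constructors (`Range::starts_at`), the `iter_range` method, and the `DefaultSort` default sort parameter; `DataStruct` and `read_tail` are illustrative and not part of this diff:

```rust
use failure::Error;
use serde::{Deserialize, Serialize};
use shardio::range::Range;
use shardio::ShardReader;

#[derive(Clone, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)]
struct DataStruct {
    a: u64,
    b: u32,
}

fn read_tail(filename: &str) -> Result<Vec<DataStruct>, Error> {
    // Panics if the file was written with different types, per the
    // type-hash assert added in this diff.
    let reader = ShardReader::<DataStruct>::open(filename)?;

    // Every item whose sort key is >= the given key; the iterator yields
    // Result<DataStruct, Error> items, per the Iterator impl above.
    let range = Range::starts_at(DataStruct { a: 10, b: 0 });
    reader.iter_range(&range)?.collect()
}
```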
@@ -1545,7 +1565,7 @@
             }
             files.push(tmp);
         }
 
-        let reader = ShardReader::<T,S>::open_set(&files)?;
+        let reader = ShardReader::<T, S>::open_set(&files)?;
 
         let mut out_items = Vec::new();
         for r in reader.iter()?
@@ -1555,27 +1575,29 @@
         {
             out_items.push(r?);
         }
 
         Ok(out_items)
     }
 
-
     #[test]
     fn multi_slice_correctness_quickcheck() {
-
         fn check_t1(v: MultiSlice<T1>) -> bool {
-            let sorted = test_multi_slice::<T1, FieldDSort>(v.clone(), 1024, 1<<17, 16).unwrap();
+            let sorted = test_multi_slice::<T1, FieldDSort>(v.clone(), 1024, 1 << 17, 16).unwrap();
 
             let mut vall = Vec::new();
             for chunk in v.0 {
                 vall.extend(chunk);
             }
 
-            if sorted.len() != vall.len() { return false; }
-            if !set_compare(&sorted, &vall) { return false; }
+            if sorted.len() != vall.len() {
+                return false;
+            }
+            if !set_compare(&sorted, &vall) {
+                return false;
+            }
 
             IsSorted::is_sorted_by_key(&mut sorted.iter(), |x| FieldDSort::sort_key(x).into_owned())
         }
 
-        QuickCheck::with_gen(StdThreadGen::new(500000)).tests(4).quickcheck(check_t1 as fn(MultiSlice<T1>) -> bool);
+        QuickCheck::with_gen(StdThreadGen::new(500000))
+            .tests(4)
+            .quickcheck(check_t1 as fn(MultiSlice<T1>) -> bool);
     }
 
-
     fn check_round_trip(
         disk_chunk_size: usize,
diff --git a/src/range.rs b/src/range.rs
index 5322735e4..cf85dd981 100644
--- a/src/range.rs
+++ b/src/range.rs
@@ -1,6 +1,7 @@
 // Copyright (c) 2018 10x Genomics, Inc. All rights reserved.
 
 use crate::ShardRecord;
+use serde::{Deserialize, Serialize};
 
 #[derive(Debug, PartialEq, Eq)]
 pub(crate) enum Rorder {
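Finally, the dependency change this patch starts with: with `serde = { version = "1.0", features = ["derive"] }`, the derive macros are importable from `serde` itself, so each module (as in `src/range.rs` above) writes `use serde::{Deserialize, Serialize};` instead of relying on a crate-wide `#[macro_use] extern crate serde_derive;`. A minimal, self-contained round trip through bincode 1.x (the serializer this crate uses) confirming the pattern; the `Rec` type is illustrative:

```rust
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, PartialEq, Debug)]
struct Rec {
    key: u64,
    payload: Vec<u8>,
}

fn main() -> Result<(), bincode::Error> {
    let v = Rec { key: 7, payload: vec![1, 2, 3] };

    // bincode 1.x free functions, matching the serialize_into /
    // deserialize_from calls used in lib.rs.
    let bytes = bincode::serialize(&v)?;
    let back: Rec = bincode::deserialize(&bytes)?;

    assert_eq!(v, back);
    Ok(())
}
```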