diff --git a/Cargo.toml b/Cargo.toml index abbc977b0..e8f18a818 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "shardio" -version = "0.5.4" +version = "0.5.5" authors = ["Patrick Marks ", "Sreenath Krishnan ", "Lance Hepler "] edition = "2018" license = "MIT" @@ -20,10 +20,10 @@ bincode = "1.1" byteorder = "1.3.0" lz4 = "1" failure = "0.1.5" -crossbeam-channel = "0.3.8" failure_derive = "0.1" min-max-heap = "1.2.2" serde = { version = "1.0", features = ["derive"] } +crossbeam-channel = "0.4.0" [dev-dependencies] tempfile = "3.0" @@ -32,7 +32,7 @@ fxhash = "0.2.1" lazy_static = "1.2" pretty_assertions = "0.5.1" quickcheck = "0.8.5" -rand = "0.6.5" +rand = "0.7" is_sorted = "0.1" [[bench]] diff --git a/src/lib.rs b/src/lib.rs index 30ad0cf35..1135b2564 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -73,8 +73,6 @@ #![deny(warnings)] #![deny(missing_docs)] - -use crossbeam_channel; use lz4; use serde::{de::DeserializeOwned, Deserialize, Serialize}; @@ -84,19 +82,22 @@ use std::collections::BTreeSet; use std::fs::File; use std::io::{self, Seek, SeekFrom}; use std::os::unix::fs::FileExt; +use std::ops::DerefMut; -use crossbeam_channel::{bounded, Receiver, Sender}; use std::path::Path; +use std::sync::Arc; use min_max_heap::MinMaxHeap; use std::marker::PhantomData; use std::thread; -use std::thread::JoinHandle; use bincode::{deserialize_from, serialize_into}; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; +use std::sync::atomic::AtomicBool; + use failure::{format_err, Error}; +use std::sync::Mutex; /// Represent a range of key space pub mod range; @@ -118,54 +119,6 @@ struct ShardRecord { len_items: usize, } -/// Log of shard chunks written into an output file. -struct FileManager { - // Current start position of next chunk - cursor: usize, - // Record of chunks written - regions: Vec>, - // File handle - file: File, -} - -impl FileManager { - pub fn new>(path: P) -> Result, Error> { - let file = File::create(path)?; - - Ok(FileManager { - cursor: 4096, - regions: Vec::new(), - file, - }) - - // FIXME: write a magic string to the start of the file, - // and maybe some metadata about the types being stored and the - // compression scheme. - // FIXME: write to a TempFile that will be destroyed unless - // writing completes successfully. - } - - // Write a chunk to the file. Chunk contains `n_items` items covering [range.0, range.1]. Compressed data is in `data` - fn write_block(&mut self, range: (K, K), n_items: usize, data: &[u8]) -> Result { - assert!(n_items > 0); - - let cur_offset = self.cursor; - let reg = ShardRecord { - offset: self.cursor, - start_key: range.0, - end_key: range.1, - len_bytes: data.len(), - len_items: n_items, - }; - - self.regions.push(reg); - self.cursor += data.len(); - let l = self.file.write_at(data, cur_offset as u64)?; - - Ok(l) - } -} - /// Specify a key function from data items of type `T` to a sort key of type `Key`. /// Impelment this trait to create a custom sort order. /// The function `sort_key` returns a `Cow` so that we abstract over Owned or @@ -230,17 +183,14 @@ where /// Items are sorted according to the `Ord` implementation of type `S::Key`. Type `S`, implementing the `SortKey` trait /// maps items of type `T` to their sort key of type `S::Key`. By default the sort key is the data item itself, and the /// the `DefaultSort` implementation of `SortKey` is the identity function.
-pub struct ShardWriter { - helper: ShardWriterHelper, - sender_buffer_size: usize, +pub struct ShardWriter +where + T: 'static + Send + Serialize, + S: SortKey, + >::Key: 'static + Send + Ord + Serialize + Clone, +{ + inner: Option>>, sort: PhantomData, - closed: bool, -} - -struct ShardWriterHelper { - tx: Sender>>, - buffer_thread: Option>>, - writer_thread: Option>>, } impl ShardWriter @@ -263,264 +213,325 @@ where disk_chunk_size: usize, item_buffer_size: usize, ) -> Result, Error> { - let (send, recv) = bounded(4); - - // Ping-pong of the 2 item buffer between the thread the accumulates the items, - // and the thread that does the IO. Need 2 slots to avoid a deadlock. - let (to_writer_send, to_writer_recv) = bounded(2); - let (to_buffer_send, to_buffer_recv) = bounded(2); - - // Divide buffer size by 2 -- the get swaped between buffer thread and writer thread - let mut sbt = ShardBufferThread::::new( - sender_buffer_size, - item_buffer_size >> 1, - recv, - to_writer_send, - to_buffer_recv, - ); - let p2 = path.as_ref().to_owned(); - - let buffer_thread = thread::spawn(move || sbt.process()); + assert!(disk_chunk_size >= 1); + assert!(item_buffer_size >= 1); + assert!(sender_buffer_size >= 1); + assert!(item_buffer_size >= sender_buffer_size); - let writer = FileManager::<>::Key>::new(p2)?; - let writer_thread = thread::spawn(move || { - let mut swt = ShardWriterThread::<_, S>::new( - disk_chunk_size, - writer, - to_writer_recv, - to_buffer_send, - ); - swt.process() - }); + let inner = + ShardWriterInner::new(disk_chunk_size, item_buffer_size, sender_buffer_size, path)?; Ok(ShardWriter { - helper: ShardWriterHelper { - tx: send, - // These need to be options to work correctly with drop(). - buffer_thread: Some(buffer_thread), - writer_thread: Some(writer_thread), - }, - sender_buffer_size, - closed: false, + inner: Some(Arc::new(inner)), sort: PhantomData, }) } /// Get a `ShardSender`. It can be sent to another thread that is generating data. - pub fn get_sender(&self) -> ShardSender { - ShardSender::new(self) + pub fn get_sender(&self) -> ShardSender { + ShardSender::new(self.inner.as_ref().unwrap().clone()) } /// Call finish if you want to detect errors in the writer IO. pub fn finish(&mut self) -> Result { - if self.closed { - return Err(format_err!("ShardWriter closed twice")); - } - - self.closed = true; - // Signal the buffer thread to shut down - let _ = self.helper.tx.send(None); - - // Wait for the buffer thread to finish - self.helper.buffer_thread.take().map(|x| x.join()); - - // Wait for the writer thread to finish & propagate any errors - let join_handle = self - .helper - .writer_thread - .take() - .expect("called finish twice"); - convert_thread_panic(join_handle.join()) - } -} - -fn convert_thread_panic(thread_result: thread::Result>) -> Result { - match thread_result { - Ok(v) => v, - Err(e) => { - if let Some(str_slice) = e.downcast_ref::<&'static str>() { - Err(format_err!("thread panic: {}", str_slice)) - } else if let Some(string) = e.downcast_ref::() { - Err(format_err!("thread panic: {}", string)) - } else { - Err(format_err!("thread panic: Box")) + if let Some(inner_arc) = self.inner.take() { + match Arc::try_unwrap(inner_arc) { + Ok(inner) => inner.close(), + Err(_) => { + if !std::thread::panicking() { + panic!("ShardSenders are still active. 
They must all be out of scope before ShardWriter is closed"); + } else { + return Ok(0); + } + } } + } else { + Ok(0) } } } -impl Drop for ShardWriter { +impl Drop for ShardWriter +where + S: SortKey, + >::Key: 'static + Send + Ord + Serialize + Clone, + T: Send + Serialize, +{ fn drop(&mut self) { - if !self.closed { - let _ = self.helper.tx.send(None); - self.helper.buffer_thread.take().map(|x| x.join()); - self.helper.writer_thread.take().map(|x| x.join()); - } + let _e = self.finish(); } } -/// Manage receive and buffer items from ShardSenders -/// Two buffers will be created & passed back and forth -/// with the ShardWriter thread. -struct ShardBufferThread -where - T: Send, -{ - chunk_size: usize, - buffer_size: usize, - rx: Receiver>>, - buf_tx: Sender<(Vec, bool)>, - buf_rx: Receiver>, - second_buf: bool, +/// Coordinate a pair of buffers that are added to from multiple threads and processed/flushed when full. +/// Coordinates access to `buffer_state`, passing buffers to `handler` when they fill up. +struct BufferStateMachine { + sender_buffer_size: usize, + buffer_state: Mutex>, + handler: Mutex, + closed: AtomicBool, } -impl ShardBufferThread -where - T: Send, -{ - fn new( - chunk_size: usize, - buffer_size: usize, - rx: Receiver>>, - buf_tx: Sender<(Vec, bool)>, - buf_rx: Receiver>, - ) -> ShardBufferThread { - ShardBufferThread { - chunk_size, - buffer_size, - rx, - buf_rx, - buf_tx, - second_buf: false, +/// There are always two item buffers held by a ShardWriter. A buffer can be in the process of being filled, +/// waiting for use, or being processed & written to disk. In the first two states, the buffer will be held +/// by this enum. In the processing state it will be held on the stack of the thread processing the buffer. +#[derive(PartialEq, Eq)] +enum BufStates { + /// Fill the first buffer, using the second as backup + FillAndWait(Vec, Vec), + /// Fill the buffer, the other buffer is being written out + FillAndBusy(Vec), + /// Both buffers are busy, try again later + BothBusy, + /// Placeholder variant, should never be observed. + Dummy, +} + +/// Outcome of trying to add items to the buffer. +enum BufAddOutcome { + /// Items added successfully + Done, + /// No free buffers, operation must be re-tried + Retry, + /// Items added, but a buffer was filled and must be processed. + /// Buffer to be processed is attached. + Process(Vec), +} + +use BufAddOutcome::*; +use BufStates::*; + +use std::fmt; + +impl fmt::Debug for BufStates { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + BufStates::FillAndWait(a, b) => write!(f, "FillAndWait({}, {})", a.len(), b.len()), + BufStates::FillAndBusy(a) => write!(f, "FillAndBusy({})", a.len()), + BufStates::BothBusy => write!(f, "BothBusy"), + BufStates::Dummy => write!(f, "Dummy"), } } +} - // Add new items to the buffer, send full buffers to the - // writer thread. - fn process(&mut self) -> Result<(), Error> { - let mut buf = Vec::with_capacity(self.buffer_size); +impl> BufferStateMachine { + + /// Move `items` into the buffer. If a buffer is filled by this operation, + /// then the calling thread will be used to process the full buffer.
+ fn add_items(&self, items: &mut Vec) -> Result<(), Error> { + if self.closed.load(std::sync::atomic::Ordering::Relaxed) { + panic!("tried to add items to BufHandler after it was closed"); + } loop { - match self.rx.recv() { - Ok(Some(v)) => { - buf.extend(v); - if buf.len() + self.chunk_size > self.buffer_size { - buf = self.send_buf(buf, false)?; + + // get mutex on buffers + let mut buffer_state = self.buffer_state.lock().unwrap(); + let mut current_state = Dummy; + std::mem::swap(buffer_state.deref_mut(), &mut current_state); + + // determine new state and any follow-up work + let (mut new_state, outcome) = match current_state { + FillAndWait(mut f, w) => { + f.extend(items.drain(..)); + if f.len() + self.sender_buffer_size >= f.capacity() { + (FillAndBusy(w), Process(f)) + } else { + (FillAndWait(f, w), Done) } } - Ok(None) => { - self.send_buf(buf, true)?; - break; + FillAndBusy(mut f) => { + f.extend(items.drain(..)); + if f.len() + self.sender_buffer_size >= f.capacity() { + (BothBusy, Process(f)) + } else { + (FillAndBusy(f), Done) + } } - Err(_) => break, + BothBusy => (BothBusy, Retry), + Dummy => unreachable!(), + }; + + // Fill in the new state. + std::mem::swap(buffer_state.deref_mut(), &mut new_state); + + // drop the mutex + drop(buffer_state); + + // outcome could be to process a full vec, retry, or be done. + match outcome { + Process(mut buf_to_process) => { + // process the buffer & return it to the pool. + self.process_buffer(&mut buf_to_process)?; + self.return_buffer(buf_to_process); + break; + }, + Retry => { + // take a break before trying again. + thread::yield_now(); + continue; + }, + Done => break, } } Ok(()) } - fn send_buf(&mut self, buf: Vec, done: bool) -> Result, Error> { - let r = self.buf_rx.try_recv(); - let sent = self.buf_tx.send((buf, done)); - if sent.is_err() { - return Err(format_err!( - "writer thread shut down, see ShardWriterThread.finish() for error" - )); + /// shut down the buffer machinery, processing any remaining items. + /// calls to `add_items` after `close` will panic. + pub fn close(self) -> Result { + self.closed.store(true, std::sync::atomic::Ordering::SeqCst); + let mut bufs_processed = 0; + + loop { + if bufs_processed == 2 { + break; + } + + let mut buffer_state = self.buffer_state.lock().unwrap(); + let mut current_state = BufStates::Dummy; + std::mem::swap(buffer_state.deref_mut(), &mut current_state); + + let (mut new_state, to_process) = match current_state { + BufStates::FillAndWait(f, w) => (BufStates::FillAndBusy(w), Some(f)), + BufStates::FillAndBusy(f) => (BufStates::BothBusy, Some(f)), + // if both buffers are busy, we yield and try again. + BufStates::BothBusy => { + thread::yield_now(); + continue; + } + BufStates::Dummy => unreachable!(), + }; + + // fill in the new state. 
+ std::mem::swap(buffer_state.deref_mut(), &mut new_state); + + if let Some(mut buf) = to_process { + bufs_processed += 1; + self.process_buffer(&mut buf)?; + } else { + unreachable!(); + } } - match r { - Ok(r) => return Ok(r), - Err(crossbeam_channel::TryRecvError::Empty) => (), - Err(v) => return Err(v.into()), + Ok(self.handler.into_inner().unwrap()) + } + + /// put a buffer back into service after processing + fn return_buffer(&self, buf: Vec) { + + let mut buffer_state = self.buffer_state.lock().unwrap(); + let mut current_state = BufStates::Dummy; + std::mem::swap(buffer_state.deref_mut(), &mut current_state); + + let mut new_state = match current_state { + BufStates::FillAndWait(_, _) => panic!("there are 3 buffers in use!!"), + BufStates::FillAndBusy(f) => BufStates::FillAndWait(f, buf), + BufStates::BothBusy => BufStates::FillAndBusy(buf), + BufStates::Dummy => unreachable!(), }; - if !self.second_buf { - self.second_buf = true; - Ok(Vec::with_capacity(self.buffer_size)) - } else if !done { - let return_buf = self.buf_rx.recv()?; - Ok(return_buf) - } else { - assert!(done, "had to make 3rd buffer when not done"); - Ok(Vec::new()) - } + std::mem::swap(buffer_state.deref_mut(), &mut new_state); + } + + /// process `buf` + pub fn process_buffer(&self, buf: &mut Vec) -> Result<(), Error> { + // prepare the buffer for process - this doesn't require + // a lock on handler + H::prepare_buf(buf); + + // process the buf -- this requires the handler mutex + let mut handler = self.handler.lock().unwrap(); + handler.process_buf(buf)?; + buf.clear(); + Ok(()) } } -/// Sort buffered items, break large buffer into chunks. -/// Serialize, compress and write the each chunk. The -/// file manager maintains the index data. -struct ShardWriterThread +/// Two-part handler for a buffer needing processing. +/// `prepare_buf` is generic and needs no state. Used for sorting. +/// `process_buf` needs exclusive access to the handler, so can do IO. +trait BufHandler { + fn prepare_buf(v: &mut Vec); + fn process_buf(&mut self, v: &mut Vec) -> Result<(), Error>; +} + +struct SortAndWriteHandler where T: Send + Serialize, S: SortKey, >::Key: Ord + Clone + Serialize, { + // Current start position of next chunk + cursor: usize, + // Record of chunks written + regions: Vec>::Key>>, + // File handle + file: File, + // Size of disk chunks chunk_size: usize, - writer: FileManager<>::Key>, - buf_rx: Receiver<(Vec, bool)>, - buf_tx: Sender>, + // Buffers for use when writing serialize_buffer: Vec, compress_buffer: Vec, } -impl ShardWriterThread +impl BufHandler for SortAndWriteHandler where T: Send + Serialize, S: SortKey, >::Key: Ord + Clone + Serialize, { - fn new( - chunk_size: usize, - writer: FileManager<>::Key>, - buf_rx: Receiver<(Vec, bool)>, - buf_tx: Sender>, - ) -> ShardWriterThread { - ShardWriterThread { - chunk_size, - writer, - buf_rx, - buf_tx, - serialize_buffer: Vec::new(), - compress_buffer: Vec::new(), - } - } - fn process(&mut self) -> Result { - let mut n_items = 0; - - loop { - // Get the next buffer to process - let (mut buf, done) = self.buf_rx.recv()?; - n_items += buf.len(); + /// Sort items according to `S`. 
+ fn prepare_buf(buf: &mut Vec) { + buf.sort_by(|x, y| S::sort_key(x).cmp(&S::sort_key(y))); + } - // Sort by sort key - buf.sort_by(|x, y| S::sort_key(x).cmp(&S::sort_key(y))); + /// Write items to disk chunks + fn process_buf(&mut self, buf: &mut Vec) -> Result<(), Error> { + // Write out the buffer chunks + for c in buf.chunks(self.chunk_size) { + self.write_chunk(c)?; + } - // Write out the buffer chunks - for c in buf.chunks(self.chunk_size) { - self.write_chunk(c)?; - } + Ok(()) + } +} - // Done with all the items - buf.clear(); +impl SortAndWriteHandler +where + T: Send + Serialize, + S: SortKey, + >::Key: Ord + Clone + Serialize, +{ + pub fn new>( + chunk_size: usize, + path: P, + ) -> Result, Error> { + let file = File::create(path)?; - // Send the buffer back to be reused - if done { - break; - } else { - // The receiver may have hung up, don't worry. - let _ = self.buf_tx.send(buf); - } - } + Ok(SortAndWriteHandler { + cursor: 4096, + regions: Vec::new(), + file, + serialize_buffer: Vec::new(), + compress_buffer: Vec::new(), + chunk_size, + }) - self.write_index_block()?; - Ok(n_items) + // FIXME: write a magic string to the start of the file, + // and maybe some metadata about the types being stored and the + // compression scheme. + // FIXME: write to a TempFile that will be destroyed unless + // writing completes successfully. } fn write_chunk(&mut self, items: &[T]) -> Result { self.serialize_buffer.clear(); self.compress_buffer.clear(); + + assert!(items.len() > 0); + let bounds = ( S::sort_key(&items[0]).into_owned(), S::sort_key(&items[items.len() - 1]).into_owned(), @@ -541,55 +552,139 @@ where result?; } - self.writer - .write_block(bounds.clone(), items.len(), &self.compress_buffer) + let cur_offset = self.cursor; + let reg = ShardRecord { + offset: self.cursor, + start_key: bounds.0, + end_key: bounds.1, + len_bytes: self.compress_buffer.len(), + len_items: items.len(), + }; + + self.regions.push(reg); + self.cursor += self.compress_buffer.len(); + let l = self + .file + .write_at(&self.compress_buffer, cur_offset as u64)?; + + Ok(l) } /// Write out the shard positioning data - fn write_index_block(&mut self) -> Result<(), Error> { + fn write_index(&mut self) -> Result<(), Error> { let mut buf = Vec::new(); - serialize_into(&mut buf, &self.writer.regions)?; + serialize_into(&mut buf, &self.regions)?; - let index_block_position = self.writer.cursor; + let index_block_position = self.cursor; let index_block_size = buf.len(); - self.writer - .file + self.file .write_at(buf.as_slice(), index_block_position as u64)?; - self.writer.file.seek(SeekFrom::Start( + self.file.seek(SeekFrom::Start( (index_block_position + index_block_size) as u64, ))?; - self.writer.file.write_u64::(0 as u64)?; - self.writer - .file + self.file.write_u64::(0 as u64)?; + self.file .write_u64::(index_block_position as u64)?; - self.writer - .file - .write_u64::(index_block_size as u64)?; + self.file.write_u64::(index_block_size as u64)?; Ok(()) } } +/// Sort buffered items, break large buffer into chunks. +/// Serialize, compress and write each chunk. The +/// handler maintains the index data.
+struct ShardWriterInner +where + T: Send + Serialize, + S: SortKey, + >::Key: Ord + Clone + Serialize, +{ + sender_buffer_size: usize, + state_machine: BufferStateMachine>, + closed: AtomicBool, +} + +impl ShardWriterInner +where + T: Send + Serialize, + S: SortKey, + >::Key: Ord + Clone + Serialize, +{ + fn new( + chunk_size: usize, + buf_size: usize, + sender_buffer_size: usize, + path: impl AsRef, + ) -> Result, Error> { + let handler = SortAndWriteHandler::new(chunk_size, path)?; + + let bufs = BufStates::FillAndWait( + Vec::with_capacity(buf_size / 2), + Vec::with_capacity(buf_size / 2), + ); + + let state_machine = BufferStateMachine { + buffer_state: Mutex::new(bufs), + handler: Mutex::new(handler), + closed: AtomicBool::new(false), + sender_buffer_size: sender_buffer_size, + }; + + Ok(ShardWriterInner { + sender_buffer_size, + state_machine, + closed: std::sync::atomic::AtomicBool::from(false), + }) + } + + fn send_items(&self, items: &mut Vec) -> Result<(), Error> { + if !self.closed.load(std::sync::atomic::Ordering::Relaxed) { + self.state_machine.add_items(items) + } else { + let msg = format_err!("tried to send items after sender was closed"); + Err(msg) + } + } + + fn close(self) -> Result { + let mut handler = self.state_machine.close()?; + let nitems = handler.regions.iter().map(|r| r.len_items).sum(); + handler.write_index()?; + Ok(nitems) + } +} + /// A handle that is used to send data to a `ShardWriter`. Each thread that is producing data /// needs it's own ShardSender. A `ShardSender` can be obtained with the `get_sender` method of /// `ShardWriter`. ShardSender implement clone. -pub struct ShardSender { - tx: Sender>>, +pub struct ShardSender +where + T: Send + Serialize, + S: SortKey, + >::Key: Ord + Clone + Serialize, +{ + writer: Option>>, buffer: Vec, buf_size: usize, } -impl ShardSender { - fn new(writer: &ShardWriter) -> ShardSender { - let new_tx = writer.helper.tx.clone(); +impl ShardSender +where + T: Send + Serialize, + S: SortKey, + >::Key: Ord + Clone + Serialize, +{ + fn new(writer: Arc>) -> ShardSender { let buffer = Vec::with_capacity(writer.sender_buffer_size); + let buf_size = writer.sender_buffer_size; ShardSender { - tx: new_tx, + writer: Some(writer), buffer, - buf_size: writer.sender_buffer_size, + buf_size, } } @@ -601,14 +696,10 @@ impl ShardSender { }; if send { - let mut send_buf = Vec::with_capacity(self.buf_size); - std::mem::swap(&mut send_buf, &mut self.buffer); - let e = self.tx.send(Some(send_buf)); - if e.is_err() { - return Err(format_err!( - "ShardWriter failed. See ShardWriter.finish() for underlying error" - )); - } + self.writer + .as_ref() + .expect("ShardSender is already closed") + .send_items(&mut self.buffer)?; } Ok(()) @@ -617,31 +708,43 @@ impl ShardSender { /// Signal that you've finished sending items to this `ShardSender`. `finished` will called /// if the `ShardSender` is dropped. You must call `finished()` or drop the `ShardSender` /// prior to calling `ShardWriter::finish` or dropping the ShardWriter, or you will get a panic. 
- pub fn finished(&mut self) { - if !self.buffer.is_empty() { - let mut send_buf = Vec::new(); - std::mem::swap(&mut send_buf, &mut self.buffer); - self.tx.send(Some(send_buf)).unwrap(); + pub fn finished(&mut self) -> Result<(), Error> { + // send any remaining items and drop the Arc + if let Some(inner) = self.writer.take() { + if self.buffer.len() > 0 { + inner.send_items(&mut self.buffer)?; + } } + + Ok(()) } } -impl Clone for ShardSender { +impl Clone for ShardSender +where + T: Send + Serialize, + S: SortKey, + >::Key: Ord + Clone + Serialize, +{ fn clone(&self) -> Self { - let new_tx = self.tx.clone(); let buffer = Vec::with_capacity(self.buf_size); ShardSender { - tx: new_tx, + writer: self.writer.clone(), buffer, buf_size: self.buf_size, } } } -impl Drop for ShardSender { +impl> Drop for ShardSender +where + T: Send + Serialize, + S: SortKey, + >::Key: Ord + Clone + Serialize, +{ fn drop(&mut self) { - self.finished(); + let _e = self.finished(); } } @@ -1209,7 +1312,6 @@ mod shard_tests { use is_sorted::IsSorted; use pretty_assertions::assert_eq; use quickcheck::{Arbitrary, Gen, QuickCheck, StdThreadGen}; - use rand::Rng; use std::collections::HashSet; use std::fmt::Debug; use std::hash::Hash; @@ -1228,10 +1330,10 @@ mod shard_tests { impl Arbitrary for T1 { fn arbitrary(g: &mut G) -> T1 { T1 { - a: g.gen(), - b: g.gen(), - c: g.gen(), - d: g.gen(), + a: u64::arbitrary(g), + b: u32::arbitrary(g), + c: u16::arbitrary(g), + d: u8::arbitrary(g), } } } @@ -1244,22 +1346,23 @@ mod shard_tests { } } - fn rand_items(n: usize) -> Vec { + fn rand_items(n: usize, g: &mut impl Gen) -> Vec { let mut items = Vec::new(); - - for i in 0..n { - let tt = T1 { - a: (((i / 2) + (i * 10)) % 3 + (i * 5) % 2) as u64, - b: i as u32, - c: (i * 2) as u16, - d: ((i % 5) * 10 + (if i % 10 > 7 { i / 10 } else { 0 })) as u8, - }; + for _ in 0..n { + let tt = T1::arbitrary(g); items.push(tt); } - items } + fn rand_item_chunks(n_chunks: usize, items_per_chunk: usize, g: &mut impl Gen) -> Vec> { + let mut chunks = Vec::new(); + for _ in 0..n_chunks { + chunks.push(rand_items(items_per_chunk, g)); + } + chunks + } + // Some tests are configured to only run with the "full-test" feature enabled. // They are too slow to run in debug mode, so you should use release mode. 
#[cfg(feature = "full-test")] @@ -1291,7 +1394,9 @@ mod shard_tests { #[test] fn test_shard_round_trip() { // Test different buffering configurations - check_round_trip(10, 20, 0, 1 << 8); + check_round_trip(5, 3, 12, 1 << 9); + check_round_trip(5, 4, 12, 1 << 9); + check_round_trip(5, 5, 13, 1 << 9); check_round_trip(10, 20, 40, 1 << 8); check_round_trip(1024, 16, 2 << 14, 1 << 16); check_round_trip(4096, 8, 2048, 1 << 16); @@ -1310,7 +1415,8 @@ mod shard_tests { #[test] fn test_shard_round_trip_sort_key() -> Result<(), Error> { // Test different buffering configurations - check_round_trip_sort_key(10, 20, 0, 256, true)?; + check_round_trip_sort_key(10, 10, 10, 256, true)?; + check_round_trip_sort_key(10, 2, 3, 512, true)?; check_round_trip_sort_key(10, 20, 40, 256, true)?; check_round_trip_sort_key(1024, 16, 2 << 14, 1 << 16, true)?; check_round_trip_sort_key(4096, 8, 2048, 1 << 16, true)?; @@ -1341,7 +1447,11 @@ mod shard_tests { // Write and close file let true_items = { let manager: ShardWriter = ShardWriter::new(tmp.path(), 16, 64, 1 << 10).unwrap(); - let true_items = repeat(rand_items(1)[0]).take(n_items).collect::>(); + + let mut g = StdThreadGen::new(10); + let true_items = repeat(rand_items(1, &mut g)[0]) + .take(n_items) + .collect::>(); // Sender must be closed { @@ -1451,7 +1561,7 @@ mod shard_tests { #[test] fn multi_slice_correctness_quickcheck() { fn check_t1(v: MultiSlice) -> bool { - let sorted = test_multi_slice::(v.clone(), 1024, 1 << 17, 16).unwrap(); + let sorted = test_multi_slice::(v.clone(), 1024, 16, 1 << 17).unwrap(); let mut vall = Vec::new(); for chunk in v.0 { @@ -1488,6 +1598,50 @@ mod shard_tests { .unwrap(); } + struct ThreadSender { + t: PhantomData, + s: PhantomData, + } + + impl ThreadSender + where + T: 'static + Send + Serialize + Clone, + S: 'static + SortKey, + >::Key: Ord + Clone + Serialize + Send, + { + fn send_from_threads( + chunks: Vec>, + sender: ShardSender, + ) -> Result, Error> { + let barrier = Arc::new(std::sync::Barrier::new(chunks.len())); + let mut handles = Vec::new(); + + let mut all_items = Vec::new(); + + for chunk in chunks { + let mut s = sender.clone(); + all_items.extend(chunk.iter().cloned()); + let b = barrier.clone(); + + let h = std::thread::spawn(move || -> Result<(), Error> { + b.wait(); + for item in chunk { + s.send(item)?; + } + Ok(()) + }); + + handles.push(h); + } + + for h in handles { + h.join().unwrap()?; + } + + Ok(all_items) + } + } + fn check_round_trip_opt( disk_chunk_size: usize, producer_chunk_size: usize, @@ -1504,23 +1658,18 @@ mod shard_tests { // Write and close file let true_items = { - let manager: ShardWriter = ShardWriter::new( + let mut writer: ShardWriter = ShardWriter::new( tmp.path(), producer_chunk_size, disk_chunk_size, buffer_size, )?; - let mut true_items = rand_items(n_items); - // Sender must be closed - { - for chunk in true_items.chunks(n_items / 8) { - let mut sender = manager.get_sender(); - for item in chunk { - sender.send(*item)?; - } - } - } + let mut g = StdThreadGen::new(10); + let send_chunks = rand_item_chunks(4, n_items / 4, &mut g); + let mut true_items = ThreadSender::send_from_threads(send_chunks, writer.get_sender())?; + + writer.finish()?; true_items.sort(); true_items }; @@ -1585,24 +1734,18 @@ mod shard_tests { // Write and close file let true_items = { - let mut manager: ShardWriter = ShardWriter::new( + let mut writer: ShardWriter = ShardWriter::new( tmp.path(), producer_chunk_size, disk_chunk_size, buffer_size, )?; - let mut true_items = rand_items(n_items); - // 
Sender must be closed - { - for chunk in true_items.chunks(n_items / 8) { - let mut sender = manager.get_sender(); - for item in chunk { - sender.send(*item)?; - } - } - } - manager.finish()?; + let mut g = StdThreadGen::new(10); + let send_chunks = rand_item_chunks(4, n_items / 4, &mut g); + let mut true_items = ThreadSender::send_from_threads(send_chunks, writer.get_sender())?; + + writer.finish()?; true_items.sort_by_key(|x| x.d); true_items @@ -1687,7 +1830,9 @@ disk_chunk_size, buffer_size, )?; - let mut true_items = rand_items(n_items); + + let mut g = StdThreadGen::new(10); + let mut true_items = rand_items(n_items, &mut g); // Sender must be closed { @@ -1711,16 +1856,20 @@ file.set_len(3 * sz / 4).unwrap(); // make sure we get a read err due to the truncated file. - let mut got_err = false; - let iter = reader.iter_range(&Range::all())?; - for i in iter { + let iter = reader.iter_range(&Range::all()); + + // Could get the error here + if iter.is_err() { + return Ok(()); + } + + for i in iter? { + // or could get the error here if i.is_err() { - got_err = true; - break; + return Ok(()); } } - assert!(got_err); - Ok(()) + Err(format_err!("didn't get the expected shardio IO error")) } }
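Reviewer note: the sketch below is not part of the patch. It is a minimal round-trip example of how the reworked API appears to be used after this change, based on the signatures and tests in the diff: `ShardWriter::new(path, sender_buffer_size, disk_chunk_size, item_buffer_size)`, `ShardSender::finished()` now returning `Result`, and `ShardWriter::finish()` returning the item count. The output path and the function name are illustrative; the read path (`ShardReader::open`, `iter_range`, `Range::all`) is assumed unchanged by this diff.

use failure::Error;
use shardio::range::Range;
use shardio::{ShardReader, ShardWriter};

fn round_trip_example() -> Result<(), Error> {
    // Illustrative output path, not taken from the patch.
    let path = "example_items.shardio";

    // Per-sender buffer of 16 items, disk chunks of 64 items, and a 1024-item
    // main buffer that the writer splits into its two swap buffers.
    let mut writer: ShardWriter<u32> = ShardWriter::new(path, 16, 64, 1 << 10)?;

    {
        // Senders now hold an Arc to the writer internals, so every sender must be
        // finished/dropped before `finish()` is called, or `finish()` will panic.
        let mut sender = writer.get_sender();
        for i in 0..1000u32 {
            sender.send(i)?;
        }
        sender.finished()?; // `finished()` returns Result<(), Error> in this patch
    }

    // Writes the index block and returns the number of items written.
    let n_written = writer.finish()?;
    assert_eq!(n_written, 1000);

    // Read everything back in sorted order; the iterator yields Result items.
    let reader = ShardReader::<u32>::open(path)?;
    let items: Vec<u32> = reader.iter_range(&Range::all())?.collect::<Result<_, _>>()?;
    assert_eq!(items.len(), 1000);
    Ok(())
}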