From 272935ca70beb1f084b3f064096bdce06eba753f Mon Sep 17 00:00:00 2001 From: Patrick Marks Date: Sun, 26 Jan 2020 16:35:23 -0800 Subject: [PATCH 1/7] 'sender pays' threading model - sending thread is used to compress & write data, no threads are created. --- src/lib.rs | 744 ++++++++++++++++++++++++++++++----------------------- 1 file changed, 426 insertions(+), 318 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 30ad0cf35..febcf1d60 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -71,10 +71,9 @@ //! } //! ``` -#![deny(warnings)] -#![deny(missing_docs)] +///#![deny(warnings)] +///#![deny(missing_docs)] -use crossbeam_channel; use lz4; use serde::{de::DeserializeOwned, Deserialize, Serialize}; @@ -85,17 +84,19 @@ use std::fs::File; use std::io::{self, Seek, SeekFrom}; use std::os::unix::fs::FileExt; -use crossbeam_channel::{bounded, Receiver, Sender}; +use std::sync::Arc; use std::path::Path; use min_max_heap::MinMaxHeap; use std::marker::PhantomData; use std::thread; -use std::thread::JoinHandle; use bincode::{deserialize_from, serialize_into}; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; +use std::sync::atomic::AtomicBool; + +use std::sync::Mutex; use failure::{format_err, Error}; /// Represent a range of key space @@ -118,54 +119,6 @@ struct ShardRecord { len_items: usize, } -/// Log of shard chunks written into an output file. -struct FileManager { - // Current start position of next chunk - cursor: usize, - // Record of chunks written - regions: Vec>, - // File handle - file: File, -} - -impl FileManager { - pub fn new>(path: P) -> Result, Error> { - let file = File::create(path)?; - - Ok(FileManager { - cursor: 4096, - regions: Vec::new(), - file, - }) - - // FIXME: write a magic string to the start of the file, - // and maybe some metadata about the types being stored and the - // compression scheme. - // FIXME: write to a TempFile that will be destroyed unless - // writing completes successfully. - } - - // Write a chunk to the file. Chunk contains `n_items` items covering [range.0, range.1]. Compressed data is in `data` - fn write_block(&mut self, range: (K, K), n_items: usize, data: &[u8]) -> Result { - assert!(n_items > 0); - - let cur_offset = self.cursor; - let reg = ShardRecord { - offset: self.cursor, - start_key: range.0, - end_key: range.1, - len_bytes: data.len(), - len_items: n_items, - }; - - self.regions.push(reg); - self.cursor += data.len(); - let l = self.file.write_at(data, cur_offset as u64)?; - - Ok(l) - } -} - /// Specify a key function from data items of type `T` to a sort key of type `Key`. /// Impelment this trait to create a custom sort order. /// The function `sort_key` returns a `Cow` so that we abstract over Owned or @@ -230,18 +183,16 @@ where /// Items are sorted according to the `Ord` implementation of type `S::Key`. Type `S`, implementing the `SortKey` trait /// maps items of type `T` to their sort key of type `S::Key`. By default the sort key is the data item itself, and the /// the `DefaultSort` implementation of `SortKey` is the identity function. 
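The `SortKey`/`DefaultSort` machinery described in the doc comment above is easiest to see with a concrete key type. The sketch below sorts the `T1` record used in this patch's test module by its `d` field. It is a minimal illustration only: the key struct name `FieldDSort` is assumed here, and the trait signature (borrowing the item and returning a `Cow` of the key) is inferred from the fragments visible in this diff rather than copied from the crate.

```rust
use shardio::SortKey;
use serde::{Deserialize, Serialize};
use std::borrow::Cow;

#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Debug)]
struct T1 {
    a: u64,
    b: u32,
    c: u16,
    d: u8,
}

// Hypothetical key type: order T1 records by `d` only
// (mirrors the field the round-trip tests sort by).
struct FieldDSort;

impl SortKey<T1> for FieldDSort {
    type Key = u8;

    // `sort_key` returns a Cow so a key can be borrowed from the item or
    // built on the fly; the key here is a tiny Copy type, so Owned is fine.
    fn sort_key(item: &T1) -> Cow<u8> {
        Cow::Owned(item.d)
    }
}
```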
-pub struct ShardWriter { - helper: ShardWriterHelper, - sender_buffer_size: usize, +pub struct ShardWriter +where + T: 'static + Send + Serialize, + S: SortKey, + >::Key: 'static + Send + Ord + Serialize + Clone, +{ + inner: Option>>, sort: PhantomData, - closed: bool, } -struct ShardWriterHelper { - tx: Sender>>, - buffer_thread: Option>>, - writer_thread: Option>>, -} impl ShardWriter where @@ -263,264 +214,293 @@ where disk_chunk_size: usize, item_buffer_size: usize, ) -> Result, Error> { - let (send, recv) = bounded(4); - - // Ping-pong of the 2 item buffer between the thread the accumulates the items, - // and the thread that does the IO. Need 2 slots to avoid a deadlock. - let (to_writer_send, to_writer_recv) = bounded(2); - let (to_buffer_send, to_buffer_recv) = bounded(2); - - // Divide buffer size by 2 -- the get swaped between buffer thread and writer thread - let mut sbt = ShardBufferThread::::new( - sender_buffer_size, - item_buffer_size >> 1, - recv, - to_writer_send, - to_buffer_recv, - ); - let p2 = path.as_ref().to_owned(); - - let buffer_thread = thread::spawn(move || sbt.process()); - - let writer = FileManager::<>::Key>::new(p2)?; - let writer_thread = thread::spawn(move || { - let mut swt = ShardWriterThread::<_, S>::new( - disk_chunk_size, - writer, - to_writer_recv, - to_buffer_send, - ); - swt.process() - }); + assert!(disk_chunk_size >= 1); + assert!(item_buffer_size >= 1); + assert!(sender_buffer_size >= 1); + let inner = ShardWriterInner::new(disk_chunk_size, item_buffer_size, sender_buffer_size, path)?; + Ok(ShardWriter { - helper: ShardWriterHelper { - tx: send, - // These need to be options to work correctly with drop(). - buffer_thread: Some(buffer_thread), - writer_thread: Some(writer_thread), - }, - sender_buffer_size, - closed: false, + inner: Some(Arc::new(inner)), sort: PhantomData, }) } /// Get a `ShardSender`. It can be sent to another thread that is generating data. - pub fn get_sender(&self) -> ShardSender { - ShardSender::new(self) + pub fn get_sender(&self) -> ShardSender { + ShardSender::new(self.inner.as_ref().unwrap().clone()) } /// Call finish if you want to detect errors in the writer IO. pub fn finish(&mut self) -> Result { - if self.closed { - return Err(format_err!("ShardWriter closed twice")); + if let Some(inner_arc) = self.inner.take() { + match Arc::try_unwrap(inner_arc) { + Ok(inner) => inner.close(), + Err(_) => panic!("ShardSenders are still active. 
They must all be out of scope before ShardWriter is closed"), + } + } else { + Ok(0) } + } +} - self.closed = true; - // Signal the buffer thread to shut down - let _ = self.helper.tx.send(None); - - // Wait for the buffer thread to finish - self.helper.buffer_thread.take().map(|x| x.join()); - // Wait for the writer thread to finish & propagate any errors - let join_handle = self - .helper - .writer_thread - .take() - .expect("called finish twice"); - convert_thread_panic(join_handle.join()) +impl Drop for ShardWriter +where + S: SortKey, + >::Key: 'static + Send + Ord + Serialize + Clone, + T: Send + Serialize, +{ + fn drop(&mut self) { + let _e = self.finish(); } } -fn convert_thread_panic(thread_result: thread::Result>) -> Result { - match thread_result { - Ok(v) => v, - Err(e) => { - if let Some(str_slice) = e.downcast_ref::<&'static str>() { - Err(format_err!("thread panic: {}", str_slice)) - } else if let Some(string) = e.downcast_ref::() { - Err(format_err!("thread panic: {}", string)) - } else { - Err(format_err!("thread panic: Box")) - } - } - } + +struct BufferStateMachine { + sender_buffer_size: usize, + buffer_state: Mutex>, + handler: Mutex, + closed: AtomicBool, } -impl Drop for ShardWriter { - fn drop(&mut self) { - if !self.closed { - let _ = self.helper.tx.send(None); - self.helper.buffer_thread.take().map(|x| x.join()); - self.helper.writer_thread.take().map(|x| x.join()); - } - } +#[derive(PartialEq, Eq)] +enum BufStates { + FillAndWait(Vec, Vec), + FillAndBusy(Vec), + BothBusy, + Dummy, } -/// Manage receive and buffer items from ShardSenders -/// Two buffers will be created & passed back and forth -/// with the ShardWriter thread. -struct ShardBufferThread -where - T: Send, -{ - chunk_size: usize, - buffer_size: usize, - rx: Receiver>>, - buf_tx: Sender<(Vec, bool)>, - buf_rx: Receiver>, - second_buf: bool, +use BufStates::*; + + +enum BufAddOutcome { + Done, + Retry, + Process(Vec), } -impl ShardBufferThread -where - T: Send, -{ - fn new( - chunk_size: usize, - buffer_size: usize, - rx: Receiver>>, - buf_tx: Sender<(Vec, bool)>, - buf_rx: Receiver>, - ) -> ShardBufferThread { - ShardBufferThread { - chunk_size, - buffer_size, - rx, - buf_rx, - buf_tx, - second_buf: false, +use BufAddOutcome::*; + + +use std::fmt; + +impl fmt::Debug for BufStates { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + BufStates::FillAndWait(a,b) => write!(f, "FillAndWait({}, {})", a.len(), b.len()), + BufStates::FillAndBusy(a) => write!(f, "FillAndBusy({})", a.len()), + BufStates::BothBusy => write!(f, "BothBusy"), + BufStates::Dummy => write!(f, "Dummy"), } } +} + +impl> BufferStateMachine { - // Add new items to the buffer, send full buffers to the - // writer thread. 
- fn process(&mut self) -> Result<(), Error> { - let mut buf = Vec::with_capacity(self.buffer_size); + fn add_items(&self, items: &mut Vec) -> Result<(), Error> { + + if self.closed.load(std::sync::atomic::Ordering::Relaxed) { + panic!("tried to add items to BufHandler after it was closed"); + } loop { - match self.rx.recv() { - Ok(Some(v)) => { - buf.extend(v); - if buf.len() + self.chunk_size > self.buffer_size { - buf = self.send_buf(buf, false)?; + use std::ops::DerefMut; + let mut buffer_state = self.buffer_state.lock().unwrap(); + let mut current_state = Dummy; + std::mem::swap(buffer_state.deref_mut(), &mut current_state); + + let (mut new_state, outcome) = match current_state { + FillAndWait(mut f, w) => { + f.extend(items.drain(..)); + if f.len() + self.sender_buffer_size >= f.capacity() { + (FillAndBusy(w), Process(f)) + } else { + (FillAndWait(f, w), Done) } - } - Ok(None) => { - self.send_buf(buf, true)?; - break; - } - Err(_) => break, + }, + FillAndBusy(mut f) => { + f.extend(items.drain(..)); + if f.len() + self.sender_buffer_size >= f.capacity() { + (BothBusy, Process(f)) + } else { + (FillAndBusy(f), Done) + } + }, + BothBusy => (BothBusy, Retry), + Dummy => unreachable!(), + }; + + // fill in the new state. + std::mem::swap(buffer_state.deref_mut(), &mut new_state); + + // drop the mutex + drop(buffer_state); + + match outcome { + Process(mut buf_to_process) => { + // process the buffer & return it to the pool. + self.process_buffer(&mut buf_to_process)?; + self.return_buffer(buf_to_process); + break + }, + Done => break, + Retry => { thread::yield_now(); continue; }, } } Ok(()) } - fn send_buf(&mut self, buf: Vec, done: bool) -> Result, Error> { - let r = self.buf_rx.try_recv(); - let sent = self.buf_tx.send((buf, done)); - if sent.is_err() { - return Err(format_err!( - "writer thread shut down, see ShardWriterThread.finish() for error" - )); - } + pub fn close(self) -> Result { + + self.closed.store(true, std::sync::atomic::Ordering::SeqCst); + let mut bufs_processed = 0; + + loop { + if bufs_processed == 2 { + break; + } + + use std::ops::DerefMut; + let mut buffer_state = self.buffer_state.lock().unwrap(); + let mut current_state = BufStates::Dummy; + std::mem::swap(buffer_state.deref_mut(), &mut current_state); + + let (mut new_state, to_process) = match current_state { + BufStates::FillAndWait(f, w) => { + (BufStates::FillAndBusy(w), Some(f)) + }, + BufStates::FillAndBusy(f) => { + (BufStates::BothBusy, Some(f)) + }, + // if both buffers are busy, we yield and try again. + BufStates::BothBusy => { thread::yield_now(); continue; }, + BufStates::Dummy => unreachable!(), + }; - match r { - Ok(r) => return Ok(r), - Err(crossbeam_channel::TryRecvError::Empty) => (), - Err(v) => return Err(v.into()), + // fill in the new state. 
+ std::mem::swap(buffer_state.deref_mut(), &mut new_state); + + if let Some(mut buf) = to_process { + bufs_processed += 1; + self.process_buffer(&mut buf)?; + } else { + unreachable!(); + } }; - if !self.second_buf { - self.second_buf = true; - Ok(Vec::with_capacity(self.buffer_size)) - } else if !done { - let return_buf = self.buf_rx.recv()?; - Ok(return_buf) - } else { - assert!(done, "had to make 3rd buffer when not done"); - Ok(Vec::new()) - } + Ok(self.handler.into_inner().unwrap()) + } + + fn return_buffer(&self, buf: Vec) { + use std::ops::DerefMut; + let mut buffer_state = self.buffer_state.lock().unwrap(); + let mut current_state = BufStates::Dummy; + std::mem::swap(buffer_state.deref_mut(), &mut current_state); + + let mut new_state = match current_state { + BufStates::FillAndWait(_, _) => panic!("there are 3 buffers in use!!"), + BufStates::FillAndBusy(f) => BufStates::FillAndWait(f, buf), + BufStates::BothBusy => BufStates::FillAndBusy(buf), + BufStates::Dummy => unreachable!(), + }; + + std::mem::swap(buffer_state.deref_mut(), &mut new_state); + } + + pub fn process_buffer(&self, buf: &mut Vec) -> Result<(), Error> { + // prepare the buffer for process - this doesn't require + // a lock on handler + H::prepare_buf(buf); + + // process the buf -- this requires the handler mutex + let mut handler = self.handler.lock().unwrap(); + handler.process_buf(buf)?; + buf.clear(); + Ok(()) } } -/// Sort buffered items, break large buffer into chunks. -/// Serialize, compress and write the each chunk. The -/// file manager maintains the index data. -struct ShardWriterThread + +trait BufHandler { + fn prepare_buf(v: &mut Vec); + fn process_buf(&mut self, v: &mut Vec) -> Result<(), Error>; +} + +struct SortAndWriteHandler where T: Send + Serialize, S: SortKey, - >::Key: Ord + Clone + Serialize, + >::Key: Ord + Clone + Serialize, { + // Current start position of next chunk + cursor: usize, + // Record of chunks written + regions: Vec>::Key>>, + // File handle + file: File, + // Size of disk chunks chunk_size: usize, - writer: FileManager<>::Key>, - buf_rx: Receiver<(Vec, bool)>, - buf_tx: Sender>, + // Buffers for use when writing serialize_buffer: Vec, compress_buffer: Vec, } -impl ShardWriterThread +impl BufHandler for SortAndWriteHandler where T: Send + Serialize, S: SortKey, - >::Key: Ord + Clone + Serialize, + >::Key: Ord + Clone + Serialize, { - fn new( - chunk_size: usize, - writer: FileManager<>::Key>, - buf_rx: Receiver<(Vec, bool)>, - buf_tx: Sender>, - ) -> ShardWriterThread { - ShardWriterThread { - chunk_size, - writer, - buf_rx, - buf_tx, - serialize_buffer: Vec::new(), - compress_buffer: Vec::new(), - } + fn prepare_buf(buf: &mut Vec) { + buf.sort_by(|x, y| S::sort_key(x).cmp(&S::sort_key(y))); } - fn process(&mut self) -> Result { - let mut n_items = 0; + fn process_buf(&mut self, buf: &mut Vec) -> Result<(), Error> { + // Write out the buffer chunks + for c in buf.chunks(self.chunk_size) { + self.write_chunk(c)?; + } - loop { - // Get the next buffer to process - let (mut buf, done) = self.buf_rx.recv()?; - n_items += buf.len(); + Ok(()) + } +} - // Sort by sort key - buf.sort_by(|x, y| S::sort_key(x).cmp(&S::sort_key(y))); +impl SortAndWriteHandler +where + T: Send + Serialize, + S: SortKey, + >::Key: Ord + Clone + Serialize, +{ - // Write out the buffer chunks - for c in buf.chunks(self.chunk_size) { - self.write_chunk(c)?; - } + pub fn new>(chunk_size: usize, path: P) -> Result, Error> { - // Done with all the items - buf.clear(); + let file = File::create(path)?; - // 
Send the buffer back to be reused - if done { - break; - } else { - // The receiver may have hung up, don't worry. - let _ = self.buf_tx.send(buf); - } - } + Ok(SortAndWriteHandler { + cursor: 4096, + regions: Vec::new(), + file, + serialize_buffer: Vec::new(), + compress_buffer: Vec::new(), + chunk_size, + }) - self.write_index_block()?; - Ok(n_items) + // FIXME: write a magic string to the start of the file, + // and maybe some metadata about the types being stored and the + // compression scheme. + // FIXME: write to a TempFile that will be destroyed unless + // writing completes successfully. } fn write_chunk(&mut self, items: &[T]) -> Result { self.serialize_buffer.clear(); self.compress_buffer.clear(); + + assert!(items.len() > 0); + let bounds = ( S::sort_key(&items[0]).into_owned(), S::sort_key(&items[items.len() - 1]).into_owned(), @@ -541,55 +521,138 @@ where result?; } - self.writer - .write_block(bounds.clone(), items.len(), &self.compress_buffer) + let cur_offset = self.cursor; + let reg = ShardRecord { + offset: self.cursor, + start_key: bounds.0, + end_key: bounds.1, + len_bytes: self.compress_buffer.len(), + len_items: items.len(), + }; + + self.regions.push(reg); + self.cursor += self.compress_buffer.len(); + let l = self.file.write_at(&self.compress_buffer, cur_offset as u64)?; + + Ok(l) } /// Write out the shard positioning data - fn write_index_block(&mut self) -> Result<(), Error> { + fn write_index(&mut self) -> Result<(), Error> { let mut buf = Vec::new(); - serialize_into(&mut buf, &self.writer.regions)?; + serialize_into(&mut buf, &self.regions)?; - let index_block_position = self.writer.cursor; + let index_block_position = self.cursor; let index_block_size = buf.len(); - self.writer - .file - .write_at(buf.as_slice(), index_block_position as u64)?; + self.file.write_at(buf.as_slice(), index_block_position as u64)?; - self.writer.file.seek(SeekFrom::Start( + self.file.seek(SeekFrom::Start( (index_block_position + index_block_size) as u64, ))?; - self.writer.file.write_u64::(0 as u64)?; - self.writer - .file - .write_u64::(index_block_position as u64)?; - self.writer - .file - .write_u64::(index_block_size as u64)?; + self.file.write_u64::(0 as u64)?; + self.file.write_u64::(index_block_position as u64)?; + self.file.write_u64::(index_block_size as u64)?; Ok(()) } + +} + +/// Sort buffered items, break large buffer into chunks. +/// Serialize, compress and write the each chunk. The +/// file manager maintains the index data. 
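For orientation, the on-disk layout produced by `write_chunk` and `write_index` above is: a reserved 4096-byte header region (see the FIXME about a future magic string), the compressed data chunks, a bincode-serialized `Vec` of `ShardRecord` entries, and a trailing 24-byte footer of three big-endian `u64`s — a zero placeholder, the index position, and the index length. The sketch below shows how that footer can be walked back to recover the chunk index. It is written against the writer code above rather than copied from the crate's reader, and it assumes `ShardRecord` (an internal type) derives `Deserialize`.

```rust
use byteorder::{BigEndian, ReadBytesExt};
use std::fs::File;
use std::io::{Read, Seek, SeekFrom};

// Recover the chunk index that `write_index` appends to the file.
// Layout assumed from the writer code above:
//   [4096-byte header][compressed chunks][bincode Vec<ShardRecord<K>>]
//   [u64 0][u64 index_pos][u64 index_len]   (all three big-endian)
fn read_chunk_index<K: serde::de::DeserializeOwned>(
    mut f: File,
) -> Result<Vec<ShardRecord<K>>, failure::Error> {
    // The footer is the last 24 bytes of the file.
    f.seek(SeekFrom::End(-24))?;
    let _reserved = f.read_u64::<BigEndian>()?; // currently always written as 0
    let index_pos = f.read_u64::<BigEndian>()?;
    let index_len = f.read_u64::<BigEndian>()?;

    // Read and deserialize the Vec<ShardRecord<K>> describing every chunk.
    f.seek(SeekFrom::Start(index_pos))?;
    let mut buf = vec![0u8; index_len as usize];
    f.read_exact(&mut buf)?;
    Ok(bincode::deserialize(&buf)?)
}
```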
+struct ShardWriterInner +where + T: Send + Serialize, + S: SortKey, + >::Key: Ord + Clone + Serialize, +{ + sender_buffer_size: usize, + state_machine: BufferStateMachine>, + closed: AtomicBool, +} + +impl ShardWriterInner +where + T: Send + Serialize, + S: SortKey, + >::Key: Ord + Clone + Serialize, +{ + fn new( + chunk_size: usize, + buf_size: usize, + sender_buffer_size: usize, + path: impl AsRef, + ) -> Result, Error> { + + let handler = SortAndWriteHandler::new(chunk_size, path)?; + + let bufs = BufStates::FillAndWait( + Vec::with_capacity(buf_size / 2), + Vec::with_capacity(buf_size / 2), + ); + + let state_machine = BufferStateMachine { + buffer_state: Mutex::new(bufs), + handler: Mutex::new(handler), + closed: AtomicBool::new(false), + sender_buffer_size: sender_buffer_size, + }; + + Ok(ShardWriterInner { + sender_buffer_size, + state_machine, + closed: std::sync::atomic::AtomicBool::from(false), + }) + } + + fn send_items(&self, items: &mut Vec) -> Result<(), Error> { + if !self.closed.load(std::sync::atomic::Ordering::Relaxed) { + self.state_machine.add_items(items) + } else { + let msg = format_err!("tried to send items after sender was closed"); + Err(msg) + } + } + + fn close(self) -> Result { + let mut handler = self.state_machine.close()?; + let nitems = handler.regions.iter().map(|r| r.len_items).sum(); + handler.write_index()?; + Ok(nitems) + } } /// A handle that is used to send data to a `ShardWriter`. Each thread that is producing data /// needs it's own ShardSender. A `ShardSender` can be obtained with the `get_sender` method of /// `ShardWriter`. ShardSender implement clone. -pub struct ShardSender { - tx: Sender>>, +pub struct ShardSender +where + T: Send + Serialize, + S: SortKey, + >::Key: Ord + Clone + Serialize, +{ + writer: Option>>, buffer: Vec, buf_size: usize, } -impl ShardSender { - fn new(writer: &ShardWriter) -> ShardSender { - let new_tx = writer.helper.tx.clone(); +impl ShardSender +where + T: Send + Serialize, + S: SortKey, + >::Key: Ord + Clone + Serialize, +{ + fn new(writer: Arc>) -> ShardSender { + let buffer = Vec::with_capacity(writer.sender_buffer_size); + let buf_size = writer.sender_buffer_size; ShardSender { - tx: new_tx, + writer: Some(writer), buffer, - buf_size: writer.sender_buffer_size, + buf_size, } } @@ -601,14 +664,11 @@ impl ShardSender { }; if send { - let mut send_buf = Vec::with_capacity(self.buf_size); - std::mem::swap(&mut send_buf, &mut self.buffer); - let e = self.tx.send(Some(send_buf)); - if e.is_err() { - return Err(format_err!( - "ShardWriter failed. See ShardWriter.finish() for underlying error" - )); - } + self + .writer + .as_ref() + .expect("ShardSender is already closed") + .send_items(&mut self.buffer)?; } Ok(()) @@ -617,34 +677,49 @@ impl ShardSender { /// Signal that you've finished sending items to this `ShardSender`. `finished` will called /// if the `ShardSender` is dropped. You must call `finished()` or drop the `ShardSender` /// prior to calling `ShardWriter::finish` or dropping the ShardWriter, or you will get a panic. 
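The ordering contract in the comment above follows from the new ownership model: every `ShardSender` holds an `Arc` to the writer internals, and `ShardWriter::finish` can only proceed once it holds the last reference (it uses `Arc::try_unwrap` and panics otherwise). A minimal sketch of the intended call order, reusing the `T1` test record; the path and buffer sizes are placeholder values:

```rust
use shardio::ShardWriter;

fn write_items(items: Vec<T1>) -> Result<usize, failure::Error> {
    // Arguments: path, sender_buffer_size, disk_chunk_size, item_buffer_size.
    let mut writer: ShardWriter<T1> = ShardWriter::new("items.shardio", 64, 1024, 1 << 16)?;

    {
        let mut sender = writer.get_sender();
        for item in items {
            sender.send(item)?;
        }
        // Explicit finished() surfaces any error from the final flush;
        // dropping the sender would also flush, but swallow the error.
        sender.finished()?;
    } // the sender's Arc is released here

    // No sender still holds the Arc, so finish() can unwrap it, flush the
    // remaining buffers, write the index, and report the item count.
    writer.finish()
}
```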
- pub fn finished(&mut self) { - if !self.buffer.is_empty() { - let mut send_buf = Vec::new(); - std::mem::swap(&mut send_buf, &mut self.buffer); - self.tx.send(Some(send_buf)).unwrap(); + pub fn finished(&mut self) -> Result<(), Error> { + + // send any remaining items and drop the Arc + if let Some(inner) = self.writer.take() { + if self.buffer.len() > 0 { + inner.send_items(&mut self.buffer)?; + } } + + Ok(()) } } -impl Clone for ShardSender { +impl Clone for ShardSender +where + T: Send + Serialize, + S: SortKey, + >::Key: Ord + Clone + Serialize, +{ fn clone(&self) -> Self { - let new_tx = self.tx.clone(); let buffer = Vec::with_capacity(self.buf_size); ShardSender { - tx: new_tx, + writer: self.writer.clone(), buffer, buf_size: self.buf_size, } } } -impl Drop for ShardSender { + +impl> Drop for ShardSender +where + T: Send + Serialize, + S: SortKey, + >::Key: Ord + Clone + Serialize, +{ fn drop(&mut self) { - self.finished(); + let _e = self.finished(); } } + /// Read a shardio file struct ShardReaderSingle where @@ -1291,7 +1366,9 @@ mod shard_tests { #[test] fn test_shard_round_trip() { // Test different buffering configurations - check_round_trip(10, 20, 0, 1 << 8); + check_round_trip(5, 3, 12, 1 << 9); + check_round_trip(5, 4, 12, 1 << 9); + check_round_trip(5, 5, 13, 1 << 9); check_round_trip(10, 20, 40, 1 << 8); check_round_trip(1024, 16, 2 << 14, 1 << 16); check_round_trip(4096, 8, 2048, 1 << 16); @@ -1310,7 +1387,7 @@ mod shard_tests { #[test] fn test_shard_round_trip_sort_key() -> Result<(), Error> { // Test different buffering configurations - check_round_trip_sort_key(10, 20, 0, 256, true)?; + check_round_trip_sort_key(10, 20, 1, 256, true)?; check_round_trip_sort_key(10, 20, 40, 256, true)?; check_round_trip_sort_key(1024, 16, 2 << 14, 1 << 16, true)?; check_round_trip_sort_key(4096, 8, 2048, 1 << 16, true)?; @@ -1488,6 +1565,50 @@ mod shard_tests { .unwrap(); } + struct ThreadSender { + t: PhantomData, + s: PhantomData, + } + + impl ThreadSender + where + T: 'static + Send + Serialize + Clone, + S: 'static + SortKey, + >::Key: Ord + Clone + Serialize + Send, + { + fn send_from_threads(items: &[T], sender: ShardSender, n: usize) -> Result<(), Error> { + + let barrier = Arc::new(std::sync::Barrier::new(n+1)); + let mut handles = Vec::new(); + + for _chunk in items.chunks(items.len() / n) { + let mut s = sender.clone(); + let chunk: Vec = _chunk.iter().cloned().collect(); + let barrier = barrier.clone(); + + let h = std::thread::spawn( + move || -> Result<(), Error> + { + barrier.wait(); + for c in chunk { + s.send(c)?; + } + Ok(()) + } + ); + + handles.push(h); + } + + for h in handles { + h.join().unwrap()?; + } + + Ok(()) + } + } + + fn check_round_trip_opt( disk_chunk_size: usize, producer_chunk_size: usize, @@ -1504,7 +1625,7 @@ mod shard_tests { // Write and close file let true_items = { - let manager: ShardWriter = ShardWriter::new( + let mut writer: ShardWriter = ShardWriter::new( tmp.path(), producer_chunk_size, disk_chunk_size, @@ -1512,15 +1633,9 @@ mod shard_tests { )?; let mut true_items = rand_items(n_items); - // Sender must be closed - { - for chunk in true_items.chunks(n_items / 8) { - let mut sender = manager.get_sender(); - for item in chunk { - sender.send(*item)?; - } - } - } + ThreadSender::send_from_threads(&true_items, writer.get_sender(), 4)?; + + writer.finish()?; true_items.sort(); true_items }; @@ -1540,7 +1655,7 @@ mod shard_tests { assert_eq!(&true_items, &all_items); } - for rc in [1, 3, 8, 15, 27].iter() { + for rc in [1].iter() { //, 3, 8, 
15, 27].iter() { // Open finished file & test chunked reads let set_reader = ShardReader::::open(&tmp.path())?; let mut all_items_chunks = Vec::new(); @@ -1585,7 +1700,7 @@ mod shard_tests { // Write and close file let true_items = { - let mut manager: ShardWriter = ShardWriter::new( + let mut writer: ShardWriter = ShardWriter::new( tmp.path(), producer_chunk_size, disk_chunk_size, @@ -1593,16 +1708,9 @@ mod shard_tests { )?; let mut true_items = rand_items(n_items); - // Sender must be closed - { - for chunk in true_items.chunks(n_items / 8) { - let mut sender = manager.get_sender(); - for item in chunk { - sender.send(*item)?; - } - } - } - manager.finish()?; + ThreadSender::send_from_threads(&true_items, writer.get_sender(), 4)?; + + writer.finish()?; true_items.sort_by_key(|x| x.d); true_items From 0ce3a4709d63e710813486fa832c0140376df100 Mon Sep 17 00:00:00 2001 From: Patrick Marks Date: Sun, 26 Jan 2020 16:36:18 -0800 Subject: [PATCH 2/7] fmt --- src/lib.rs | 139 ++++++++++++++++++++++++----------------------------- 1 file changed, 64 insertions(+), 75 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index febcf1d60..e2a2f2ce0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -73,7 +73,6 @@ ///#![deny(warnings)] ///#![deny(missing_docs)] - use lz4; use serde::{de::DeserializeOwned, Deserialize, Serialize}; @@ -84,8 +83,8 @@ use std::fs::File; use std::io::{self, Seek, SeekFrom}; use std::os::unix::fs::FileExt; -use std::sync::Arc; use std::path::Path; +use std::sync::Arc; use min_max_heap::MinMaxHeap; use std::marker::PhantomData; @@ -96,8 +95,8 @@ use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; use std::sync::atomic::AtomicBool; -use std::sync::Mutex; use failure::{format_err, Error}; +use std::sync::Mutex; /// Represent a range of key space pub mod range; @@ -183,7 +182,7 @@ where /// Items are sorted according to the `Ord` implementation of type `S::Key`. Type `S`, implementing the `SortKey` trait /// maps items of type `T` to their sort key of type `S::Key`. By default the sort key is the data item itself, and the /// the `DefaultSort` implementation of `SortKey` is the identity function. 
-pub struct ShardWriter +pub struct ShardWriter where T: 'static + Send + Serialize, S: SortKey, @@ -193,7 +192,6 @@ where sort: PhantomData, } - impl ShardWriter where T: 'static + Send + Serialize, @@ -214,12 +212,12 @@ where disk_chunk_size: usize, item_buffer_size: usize, ) -> Result, Error> { - assert!(disk_chunk_size >= 1); assert!(item_buffer_size >= 1); assert!(sender_buffer_size >= 1); - let inner = ShardWriterInner::new(disk_chunk_size, item_buffer_size, sender_buffer_size, path)?; - + let inner = + ShardWriterInner::new(disk_chunk_size, item_buffer_size, sender_buffer_size, path)?; + Ok(ShardWriter { inner: Some(Arc::new(inner)), sort: PhantomData, @@ -244,9 +242,8 @@ where } } - -impl Drop for ShardWriter -where +impl Drop for ShardWriter +where S: SortKey, >::Key: 'static + Send + Ord + Serialize + Clone, T: Send + Serialize, @@ -256,7 +253,6 @@ where } } - struct BufferStateMachine { sender_buffer_size: usize, buffer_state: Mutex>, @@ -274,7 +270,6 @@ enum BufStates { use BufStates::*; - enum BufAddOutcome { Done, Retry, @@ -283,13 +278,12 @@ enum BufAddOutcome { use BufAddOutcome::*; - use std::fmt; impl fmt::Debug for BufStates { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - BufStates::FillAndWait(a,b) => write!(f, "FillAndWait({}, {})", a.len(), b.len()), + BufStates::FillAndWait(a, b) => write!(f, "FillAndWait({}, {})", a.len(), b.len()), BufStates::FillAndBusy(a) => write!(f, "FillAndBusy({})", a.len()), BufStates::BothBusy => write!(f, "BothBusy"), BufStates::Dummy => write!(f, "Dummy"), @@ -298,9 +292,7 @@ impl fmt::Debug for BufStates { } impl> BufferStateMachine { - fn add_items(&self, items: &mut Vec) -> Result<(), Error> { - if self.closed.load(std::sync::atomic::Ordering::Relaxed) { panic!("tried to add items to BufHandler after it was closed"); } @@ -319,7 +311,7 @@ impl> BufferStateMachine { } else { (FillAndWait(f, w), Done) } - }, + } FillAndBusy(mut f) => { f.extend(items.drain(..)); if f.len() + self.sender_buffer_size >= f.capacity() { @@ -327,7 +319,7 @@ impl> BufferStateMachine { } else { (FillAndBusy(f), Done) } - }, + } BothBusy => (BothBusy, Retry), Dummy => unreachable!(), }; @@ -343,10 +335,13 @@ impl> BufferStateMachine { // process the buffer & return it to the pool. self.process_buffer(&mut buf_to_process)?; self.return_buffer(buf_to_process); - break - }, + break; + } Done => break, - Retry => { thread::yield_now(); continue; }, + Retry => { + thread::yield_now(); + continue; + } } } @@ -354,7 +349,6 @@ impl> BufferStateMachine { } pub fn close(self) -> Result { - self.closed.store(true, std::sync::atomic::Ordering::SeqCst); let mut bufs_processed = 0; @@ -369,14 +363,13 @@ impl> BufferStateMachine { std::mem::swap(buffer_state.deref_mut(), &mut current_state); let (mut new_state, to_process) = match current_state { - BufStates::FillAndWait(f, w) => { - (BufStates::FillAndBusy(w), Some(f)) - }, - BufStates::FillAndBusy(f) => { - (BufStates::BothBusy, Some(f)) - }, + BufStates::FillAndWait(f, w) => (BufStates::FillAndBusy(w), Some(f)), + BufStates::FillAndBusy(f) => (BufStates::BothBusy, Some(f)), // if both buffers are busy, we yield and try again. 
- BufStates::BothBusy => { thread::yield_now(); continue; }, + BufStates::BothBusy => { + thread::yield_now(); + continue; + } BufStates::Dummy => unreachable!(), }; @@ -389,7 +382,7 @@ impl> BufferStateMachine { } else { unreachable!(); } - }; + } Ok(self.handler.into_inner().unwrap()) } @@ -423,17 +416,16 @@ impl> BufferStateMachine { } } - trait BufHandler { fn prepare_buf(v: &mut Vec); fn process_buf(&mut self, v: &mut Vec) -> Result<(), Error>; } -struct SortAndWriteHandler +struct SortAndWriteHandler where T: Send + Serialize, S: SortKey, - >::Key: Ord + Clone + Serialize, + >::Key: Ord + Clone + Serialize, { // Current start position of next chunk cursor: usize, @@ -448,11 +440,11 @@ where compress_buffer: Vec, } -impl BufHandler for SortAndWriteHandler +impl BufHandler for SortAndWriteHandler where T: Send + Serialize, S: SortKey, - >::Key: Ord + Clone + Serialize, + >::Key: Ord + Clone + Serialize, { fn prepare_buf(buf: &mut Vec) { buf.sort_by(|x, y| S::sort_key(x).cmp(&S::sort_key(y))); @@ -468,15 +460,16 @@ where } } -impl SortAndWriteHandler +impl SortAndWriteHandler where T: Send + Serialize, S: SortKey, - >::Key: Ord + Clone + Serialize, + >::Key: Ord + Clone + Serialize, { - - pub fn new>(chunk_size: usize, path: P) -> Result, Error> { - + pub fn new>( + chunk_size: usize, + path: P, + ) -> Result, Error> { let file = File::create(path)?; Ok(SortAndWriteHandler { @@ -532,7 +525,9 @@ where self.regions.push(reg); self.cursor += self.compress_buffer.len(); - let l = self.file.write_at(&self.compress_buffer, cur_offset as u64)?; + let l = self + .file + .write_at(&self.compress_buffer, cur_offset as u64)?; Ok(l) } @@ -546,17 +541,18 @@ where let index_block_position = self.cursor; let index_block_size = buf.len(); - self.file.write_at(buf.as_slice(), index_block_position as u64)?; + self.file + .write_at(buf.as_slice(), index_block_position as u64)?; self.file.seek(SeekFrom::Start( (index_block_position + index_block_size) as u64, ))?; self.file.write_u64::(0 as u64)?; - self.file.write_u64::(index_block_position as u64)?; + self.file + .write_u64::(index_block_position as u64)?; self.file.write_u64::(index_block_size as u64)?; Ok(()) } - } /// Sort buffered items, break large buffer into chunks. @@ -569,7 +565,7 @@ where >::Key: Ord + Clone + Serialize, { sender_buffer_size: usize, - state_machine: BufferStateMachine>, + state_machine: BufferStateMachine>, closed: AtomicBool, } @@ -585,7 +581,6 @@ where sender_buffer_size: usize, path: impl AsRef, ) -> Result, Error> { - let handler = SortAndWriteHandler::new(chunk_size, path)?; let bufs = BufStates::FillAndWait( @@ -627,13 +622,13 @@ where /// A handle that is used to send data to a `ShardWriter`. Each thread that is producing data /// needs it's own ShardSender. A `ShardSender` can be obtained with the `get_sender` method of /// `ShardWriter`. ShardSender implement clone. 
-pub struct ShardSender +pub struct ShardSender where T: Send + Serialize, S: SortKey, >::Key: Ord + Clone + Serialize, { - writer: Option>>, + writer: Option>>, buffer: Vec, buf_size: usize, } @@ -645,7 +640,6 @@ where >::Key: Ord + Clone + Serialize, { fn new(writer: Arc>) -> ShardSender { - let buffer = Vec::with_capacity(writer.sender_buffer_size); let buf_size = writer.sender_buffer_size; @@ -664,11 +658,10 @@ where }; if send { - self - .writer - .as_ref() - .expect("ShardSender is already closed") - .send_items(&mut self.buffer)?; + self.writer + .as_ref() + .expect("ShardSender is already closed") + .send_items(&mut self.buffer)?; } Ok(()) @@ -678,7 +671,6 @@ where /// if the `ShardSender` is dropped. You must call `finished()` or drop the `ShardSender` /// prior to calling `ShardWriter::finish` or dropping the ShardWriter, or you will get a panic. pub fn finished(&mut self) -> Result<(), Error> { - // send any remaining items and drop the Arc if let Some(inner) = self.writer.take() { if self.buffer.len() > 0 { @@ -707,7 +699,6 @@ where } } - impl> Drop for ShardSender where T: Send + Serialize, @@ -719,7 +710,6 @@ where } } - /// Read a shardio file struct ShardReaderSingle where @@ -1570,15 +1560,18 @@ mod shard_tests { s: PhantomData, } - impl ThreadSender - where + impl ThreadSender + where T: 'static + Send + Serialize + Clone, S: 'static + SortKey, >::Key: Ord + Clone + Serialize + Send, { - fn send_from_threads(items: &[T], sender: ShardSender, n: usize) -> Result<(), Error> { - - let barrier = Arc::new(std::sync::Barrier::new(n+1)); + fn send_from_threads( + items: &[T], + sender: ShardSender, + n: usize, + ) -> Result<(), Error> { + let barrier = Arc::new(std::sync::Barrier::new(n + 1)); let mut handles = Vec::new(); for _chunk in items.chunks(items.len() / n) { @@ -1586,21 +1579,18 @@ mod shard_tests { let chunk: Vec = _chunk.iter().cloned().collect(); let barrier = barrier.clone(); - let h = std::thread::spawn( - move || -> Result<(), Error> - { - barrier.wait(); - for c in chunk { - s.send(c)?; - } - Ok(()) + let h = std::thread::spawn(move || -> Result<(), Error> { + barrier.wait(); + for c in chunk { + s.send(c)?; } - ); + Ok(()) + }); handles.push(h); } - for h in handles { + for h in handles { h.join().unwrap()?; } @@ -1608,7 +1598,6 @@ mod shard_tests { } } - fn check_round_trip_opt( disk_chunk_size: usize, producer_chunk_size: usize, @@ -1655,7 +1644,7 @@ mod shard_tests { assert_eq!(&true_items, &all_items); } - for rc in [1].iter() { //, 3, 8, 15, 27].iter() { + for rc in [1, 3, 8, 15, 27].iter() { // Open finished file & test chunked reads let set_reader = ShardReader::::open(&tmp.path())?; let mut all_items_chunks = Vec::new(); From f862c1d719e036341cbf02b183d5e817ab63b114 Mon Sep 17 00:00:00 2001 From: Patrick Marks Date: Sun, 26 Jan 2020 20:19:58 -0800 Subject: [PATCH 3/7] fix up tests --- Cargo.toml | 4 +-- src/lib.rs | 103 +++++++++++++++++++++++++++++++---------------------- 2 files changed, 63 insertions(+), 44 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index abbc977b0..56e6f6c12 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,10 +20,10 @@ bincode = "1.1" byteorder = "1.3.0" lz4 = "1" failure = "0.1.5" -crossbeam-channel = "0.3.8" failure_derive = "0.1" min-max-heap = "1.2.2" serde = { version = "1.0", features = ["derive"] } +crossbeam-channel = "0.4.0" [dev-dependencies] tempfile = "3.0" @@ -32,7 +32,7 @@ fxhash = "0.2.1" lazy_static = "1.2" pretty_assertions = "0.5.1" quickcheck = "0.8.5" -rand = "0.6.5" +rand = "0.7" is_sorted = "0.1" 
[[bench]] diff --git a/src/lib.rs b/src/lib.rs index e2a2f2ce0..d7e55da93 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -71,8 +71,8 @@ //! } //! ``` -///#![deny(warnings)] -///#![deny(missing_docs)] +#![deny(warnings)] +#![deny(missing_docs)] use lz4; use serde::{de::DeserializeOwned, Deserialize, Serialize}; @@ -234,7 +234,13 @@ where if let Some(inner_arc) = self.inner.take() { match Arc::try_unwrap(inner_arc) { Ok(inner) => inner.close(), - Err(_) => panic!("ShardSenders are still active. They must all be out of scope before ShardWriter is closed"), + Err(_) => { + if !std::thread::panicking() { + panic!("ShardSenders are still active. They must all be out of scope before ShardWriter is closed"); + } else { + return Ok(0); + } + } } } else { Ok(0) @@ -1274,7 +1280,6 @@ mod shard_tests { use is_sorted::IsSorted; use pretty_assertions::assert_eq; use quickcheck::{Arbitrary, Gen, QuickCheck, StdThreadGen}; - use rand::Rng; use std::collections::HashSet; use std::fmt::Debug; use std::hash::Hash; @@ -1293,10 +1298,10 @@ mod shard_tests { impl Arbitrary for T1 { fn arbitrary(g: &mut G) -> T1 { T1 { - a: g.gen(), - b: g.gen(), - c: g.gen(), - d: g.gen(), + a: u64::arbitrary(g), + b: u32::arbitrary(g), + c: u16::arbitrary(g), + d: u8::arbitrary(g), } } } @@ -1309,22 +1314,23 @@ mod shard_tests { } } - fn rand_items(n: usize) -> Vec { + fn rand_items(n: usize, g: &mut impl Gen) -> Vec { let mut items = Vec::new(); - - for i in 0..n { - let tt = T1 { - a: (((i / 2) + (i * 10)) % 3 + (i * 5) % 2) as u64, - b: i as u32, - c: (i * 2) as u16, - d: ((i % 5) * 10 + (if i % 10 > 7 { i / 10 } else { 0 })) as u8, - }; + for _ in 0..n { + let tt = T1::arbitrary(g); items.push(tt); } - items } + fn rand_item_chunks(n_chunks: usize, items_per_chunk: usize, g: &mut impl Gen) -> Vec> { + let mut chunks = Vec::new(); + for _ in 0..n_chunks { + chunks.push(rand_items(items_per_chunk, g)); + } + chunks + } + // Some tests are configured to only run with the "full-test" feature enabled. // They are too slow to run in debug mode, so you should use release mode. 
#[cfg(feature = "full-test")] @@ -1408,7 +1414,11 @@ mod shard_tests { // Write and close file let true_items = { let manager: ShardWriter = ShardWriter::new(tmp.path(), 16, 64, 1 << 10).unwrap(); - let true_items = repeat(rand_items(1)[0]).take(n_items).collect::>(); + + let mut g = StdThreadGen::new(10); + let true_items = repeat(rand_items(1, &mut g)[0]) + .take(n_items) + .collect::>(); // Sender must be closed { @@ -1567,22 +1577,23 @@ mod shard_tests { >::Key: Ord + Clone + Serialize + Send, { fn send_from_threads( - items: &[T], + chunks: Vec>, sender: ShardSender, - n: usize, - ) -> Result<(), Error> { - let barrier = Arc::new(std::sync::Barrier::new(n + 1)); + ) -> Result, Error> { + let barrier = Arc::new(std::sync::Barrier::new(chunks.len())); let mut handles = Vec::new(); - for _chunk in items.chunks(items.len() / n) { + let mut all_items = Vec::new(); + + for chunk in chunks { let mut s = sender.clone(); - let chunk: Vec = _chunk.iter().cloned().collect(); - let barrier = barrier.clone(); + all_items.extend(chunk.iter().cloned()); + let b = barrier.clone(); let h = std::thread::spawn(move || -> Result<(), Error> { - barrier.wait(); - for c in chunk { - s.send(c)?; + b.wait(); + for item in chunk { + s.send(item)?; } Ok(()) }); @@ -1594,7 +1605,7 @@ mod shard_tests { h.join().unwrap()?; } - Ok(()) + Ok(all_items) } } @@ -1620,9 +1631,10 @@ mod shard_tests { disk_chunk_size, buffer_size, )?; - let mut true_items = rand_items(n_items); - ThreadSender::send_from_threads(&true_items, writer.get_sender(), 4)?; + let mut g = StdThreadGen::new(10); + let send_chunks = rand_item_chunks(4, n_items / 4, &mut g); + let mut true_items = ThreadSender::send_from_threads(send_chunks, writer.get_sender())?; writer.finish()?; true_items.sort(); @@ -1695,9 +1707,10 @@ mod shard_tests { disk_chunk_size, buffer_size, )?; - let mut true_items = rand_items(n_items); - ThreadSender::send_from_threads(&true_items, writer.get_sender(), 4)?; + let mut g = StdThreadGen::new(10); + let send_chunks = rand_item_chunks(4, n_items / 4, &mut g); + let mut true_items = ThreadSender::send_from_threads(send_chunks, writer.get_sender())?; writer.finish()?; @@ -1784,7 +1797,9 @@ mod shard_tests { disk_chunk_size, buffer_size, )?; - let mut true_items = rand_items(n_items); + + let mut g = StdThreadGen::new(10); + let mut true_items = rand_items(n_items, &mut g); // Sender must be closed { @@ -1808,16 +1823,20 @@ mod shard_tests { file.set_len(3 * sz / 4).unwrap(); // make sure we get a read err due to the truncated file. - let mut got_err = false; - let iter = reader.iter_range(&Range::all())?; - for i in iter { + let iter = reader.iter_range(&Range::all()); + + // Could get the error here + if iter.is_err() { + return Ok(()); + } + + for i in iter? 
{ + // or could get the error here if i.is_err() { - got_err = true; - break; + return Ok(()); } } - assert!(got_err); - Ok(()) + Err(format_err!("didn't shardio IO error correctly")) } } From 3ef14194d0b0e4a9a9dc5963e4c196e67d9b9f5e Mon Sep 17 00:00:00 2001 From: Patrick Marks Date: Sun, 26 Jan 2020 20:34:57 -0800 Subject: [PATCH 4/7] fix up tests and document --- src/lib.rs | 44 +++++++++++++++++++++++++++++++++++++------- 1 file changed, 37 insertions(+), 7 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index d7e55da93..1600a43d6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -82,6 +82,7 @@ use std::collections::BTreeSet; use std::fs::File; use std::io::{self, Seek, SeekFrom}; use std::os::unix::fs::FileExt; +use std::ops::DerefMut; use std::path::Path; use std::sync::Arc; @@ -259,6 +260,8 @@ where } } +/// Coordinate a pair of buffers that need to added to from multiple threads and processed/flushed when full. +/// Coordinates acces to `buffer_state`, passing full buffers to `handler` when they are full. struct BufferStateMachine { sender_buffer_size: usize, buffer_state: Mutex>, @@ -266,23 +269,34 @@ struct BufferStateMachine { closed: AtomicBool, } +/// There are always two item buffers held by a ShardWriter. A buffer can be in the process of being filled, +/// waiting for use, or being processed & written to disk. In the first two states, the buffer will be held +/// by this enum. In the processing state it will be held on the stack of the thread processing the buffer. #[derive(PartialEq, Eq)] enum BufStates { + /// Fill the first buffer, using the second as backup FillAndWait(Vec, Vec), + /// Fill the buffer, the other buffer is being written out FillAndBusy(Vec), + /// Both buffers are busy, try again later BothBusy, + /// Placeholder variable, should never be observed. Dummy, } -use BufStates::*; - +/// Outcome of trying to add items to the buffer. enum BufAddOutcome { + /// Items added successfully Done, + /// No free buffers, operation must be re-tried Retry, + /// Items added, but a buffer was filled and must be processed. + /// Buffer to be processed is attached. Process(Vec), } use BufAddOutcome::*; +use BufStates::*; use std::fmt; @@ -298,17 +312,22 @@ impl fmt::Debug for BufStates { } impl> BufferStateMachine { + + /// Move `items` into the buffer & it. If a buffer is filled by this operation + /// then the calling thread will be used to process the full buffer. fn add_items(&self, items: &mut Vec) -> Result<(), Error> { if self.closed.load(std::sync::atomic::Ordering::Relaxed) { panic!("tried to add items to BufHandler after it was closed"); } loop { - use std::ops::DerefMut; + + /// get mutex on buffers let mut buffer_state = self.buffer_state.lock().unwrap(); let mut current_state = Dummy; std::mem::swap(buffer_state.deref_mut(), &mut current_state); + /// determine new state and any follow-up work let (mut new_state, outcome) = match current_state { FillAndWait(mut f, w) => { f.extend(items.drain(..)); @@ -330,30 +349,33 @@ impl> BufferStateMachine { Dummy => unreachable!(), }; - // fill in the new state. + // Fill in the new state. std::mem::swap(buffer_state.deref_mut(), &mut new_state); // drop the mutex drop(buffer_state); + // outcome could be to process a full vec, retry, or be done. match outcome { Process(mut buf_to_process) => { // process the buffer & return it to the pool. 
self.process_buffer(&mut buf_to_process)?; self.return_buffer(buf_to_process); break; - } - Done => break, + }, Retry => { thread::yield_now(); continue; - } + }, + Done => break, } } Ok(()) } + /// shut down the buffer machinery, processing any remaining items. + /// calls to `add_items` after `close` will panic. pub fn close(self) -> Result { self.closed.store(true, std::sync::atomic::Ordering::SeqCst); let mut bufs_processed = 0; @@ -393,6 +415,7 @@ impl> BufferStateMachine { Ok(self.handler.into_inner().unwrap()) } + /// put a buffer back into service after processing fn return_buffer(&self, buf: Vec) { use std::ops::DerefMut; let mut buffer_state = self.buffer_state.lock().unwrap(); @@ -409,6 +432,7 @@ impl> BufferStateMachine { std::mem::swap(buffer_state.deref_mut(), &mut new_state); } + /// process `buf` pub fn process_buffer(&self, buf: &mut Vec) -> Result<(), Error> { // prepare the buffer for process - this doesn't require // a lock on handler @@ -422,6 +446,9 @@ impl> BufferStateMachine { } } +/// Two-part handler for a buffer needing processing. +/// `prepare_buf` is generic and needs no state. Used for sorting. +/// `process_buf` needs exclusive access to the handler, so can do IO. trait BufHandler { fn prepare_buf(v: &mut Vec); fn process_buf(&mut self, v: &mut Vec) -> Result<(), Error>; @@ -452,10 +479,13 @@ where S: SortKey, >::Key: Ord + Clone + Serialize, { + + /// Sort items according to `S`. fn prepare_buf(buf: &mut Vec) { buf.sort_by(|x, y| S::sort_key(x).cmp(&S::sort_key(y))); } + /// Write items to disk chunks fn process_buf(&mut self, buf: &mut Vec) -> Result<(), Error> { // Write out the buffer chunks for c in buf.chunks(self.chunk_size) { From b000e30d67415f19a61dd6da787ceb0d34404861 Mon Sep 17 00:00:00 2001 From: Patrick Marks Date: Sun, 26 Jan 2020 20:36:49 -0800 Subject: [PATCH 5/7] fix test & document --- src/lib.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 1600a43d6..732afba2c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -322,12 +322,12 @@ impl> BufferStateMachine { loop { - /// get mutex on buffers + // get mutex on buffers let mut buffer_state = self.buffer_state.lock().unwrap(); let mut current_state = Dummy; std::mem::swap(buffer_state.deref_mut(), &mut current_state); - /// determine new state and any follow-up work + // determine new state and any follow-up work let (mut new_state, outcome) = match current_state { FillAndWait(mut f, w) => { f.extend(items.drain(..)); @@ -385,7 +385,6 @@ impl> BufferStateMachine { break; } - use std::ops::DerefMut; let mut buffer_state = self.buffer_state.lock().unwrap(); let mut current_state = BufStates::Dummy; std::mem::swap(buffer_state.deref_mut(), &mut current_state); @@ -417,7 +416,7 @@ impl> BufferStateMachine { /// put a buffer back into service after processing fn return_buffer(&self, buf: Vec) { - use std::ops::DerefMut; + let mut buffer_state = self.buffer_state.lock().unwrap(); let mut current_state = BufStates::Dummy; std::mem::swap(buffer_state.deref_mut(), &mut current_state); From 8902665c45c56098afa3ee7c19b301e0d9f655e8 Mon Sep 17 00:00:00 2001 From: Patrick Marks Date: Sun, 26 Jan 2020 21:05:05 -0800 Subject: [PATCH 6/7] old ver for now --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 56e6f6c12..e8f18a818 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "shardio" -version = "0.5.5" +version = "0.5.4" authors = ["Patrick Marks ", "Sreenath Krishnan ", "Lance 
Hepler "] edition = "2018" license = "MIT" From cd964419095287c2edbf1a4755cfbf691680a5db Mon Sep 17 00:00:00 2001 From: Patrick Marks Date: Mon, 27 Jan 2020 15:00:14 -0800 Subject: [PATCH 7/7] assert that mem buffer >= sender buffer --- src/lib.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 732afba2c..1135b2564 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -216,6 +216,8 @@ where assert!(disk_chunk_size >= 1); assert!(item_buffer_size >= 1); assert!(sender_buffer_size >= 1); + assert!(item_buffer_size >= sender_buffer_size); + let inner = ShardWriterInner::new(disk_chunk_size, item_buffer_size, sender_buffer_size, path)?; @@ -364,6 +366,7 @@ impl> BufferStateMachine { break; }, Retry => { + // take a break before trying again. thread::yield_now(); continue; }, @@ -1412,7 +1415,8 @@ mod shard_tests { #[test] fn test_shard_round_trip_sort_key() -> Result<(), Error> { // Test different buffering configurations - check_round_trip_sort_key(10, 20, 1, 256, true)?; + check_round_trip_sort_key(10, 10, 10, 256, true)?; + check_round_trip_sort_key(10, 2, 3, 512, true)?; check_round_trip_sort_key(10, 20, 40, 256, true)?; check_round_trip_sort_key(1024, 16, 2 << 14, 1 << 16, true)?; check_round_trip_sort_key(4096, 8, 2048, 1 << 16, true)?; @@ -1557,7 +1561,7 @@ mod shard_tests { #[test] fn multi_slice_correctness_quickcheck() { fn check_t1(v: MultiSlice) -> bool { - let sorted = test_multi_slice::(v.clone(), 1024, 1 << 17, 16).unwrap(); + let sorted = test_multi_slice::(v.clone(), 1024, 16, 1 << 17).unwrap(); let mut vall = Vec::new(); for chunk in v.0 {