Skip to content

Commit

Permalink
Merge de74fd2 into 2f7bf36
Browse files Browse the repository at this point in the history
  • Loading branch information
sreenathkrishnan committed Jul 15, 2019
2 parents 2f7bf36 + de74fd2 commit 9199ad7
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 13 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
target
Cargo.lock
test.shardio
35 changes: 22 additions & 13 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,18 @@
//! let mut writer: ShardWriter<DataStruct> =
//! ShardWriter::new(filename, 64, 256, 1<<16)?;
//!
//! // Get a handle to send data to the file
//! let mut sender = writer.get_sender();
//! // get_sender() immutably borrows the writer and all senders need to be dropped
//! // before we can call finish() on the writer (which is recommended to catch errors)
//! // Hence declaring senders in an inner scope to make sure they are dropped and the
//! // borrow checker does not complain.
//! {
//! // Get a handle to send data to the file
//! let mut sender = writer.get_sender();
//!
//! // Generate some test data
//! for i in 0..(2 << 16) {
//! sender.send(DataStruct { a: (i%25) as u64, b: (i%100) as u32 });
//! // Generate some test data
//! for i in 0..(2 << 16) {
//! sender.send(DataStruct { a: (i%25) as u64, b: (i%100) as u32 });
//! }
//! }
//!
//! // Write errors are accessible by calling the finish() method
Expand Down Expand Up @@ -324,7 +330,7 @@ struct ShardWriterHelper<T> {
impl<T, S> ShardWriter<T, S>
where
T: 'static + Send + Serialize,
S: SortKey<T>,
S: SortKey<T> + Send,
<S as SortKey<T>>::Key: 'static + Send + Ord + Serialize + Clone,
{
/// Create a writer for storing data items of type `T`.
Expand Down Expand Up @@ -385,7 +391,7 @@ where
}

/// Get a `ShardSender`. It can be sent to another thread that is generating data.
pub fn get_sender(&self) -> ShardSender<T> {
pub fn get_sender(&self) -> ShardSender<T, S> {
ShardSender::new(self)
}

Expand Down Expand Up @@ -652,21 +658,23 @@ where
/// A handle that is used to send data to a `ShardWriter`. Each thread that is producing data
/// needs it's own ShardSender. A `ShardSender` can be obtained with the `get_sender` method of
/// `ShardWriter`. ShardSender implement clone.
pub struct ShardSender<T: Send> {
pub struct ShardSender<'a, T: Send, S: Send> {
tx: Sender<Option<Vec<T>>>,
buffer: Vec<T>,
buf_size: usize,
writer: &'a ShardWriter<T, S>,
}

impl<T: Send> ShardSender<T> {
fn new<S>(writer: &ShardWriter<T, S>) -> ShardSender<T> {
impl<'a, T: Send, S: Send> ShardSender<'a, T, S> {
fn new(writer: &ShardWriter<T, S>) -> ShardSender<T, S> {
let new_tx = writer.helper.tx.clone();
let buffer = Vec::with_capacity(writer.sender_buffer_size);

ShardSender {
tx: new_tx,
buffer,
buf_size: writer.sender_buffer_size,
writer
}
}

Expand Down Expand Up @@ -702,7 +710,7 @@ impl<T: Send> ShardSender<T> {
}
}

impl<T: Send + Serialize> Clone for ShardSender<T> {
impl<T: Send + Serialize, S: Send> Clone for ShardSender<'_, T, S> {
fn clone(&self) -> Self {
let new_tx = self.tx.clone();
let buffer = Vec::with_capacity(self.buf_size);
Expand All @@ -711,11 +719,12 @@ impl<T: Send + Serialize> Clone for ShardSender<T> {
tx: new_tx,
buffer,
buf_size: self.buf_size,
writer: self.writer,
}
}
}

impl<T: Send> Drop for ShardSender<T> {
impl<T: Send, S: Send> Drop for ShardSender<'_, T, S> {
fn drop(&mut self) {
self.finished();
}
Expand Down Expand Up @@ -1521,7 +1530,7 @@ mod shard_tests {
fn test_multi_slice<T, S>(items: MultiSlice<T>, disk_chunk_size: usize, producer_chunk_size: usize, buffer_size: usize) -> Result<Vec<T>, Error>
where
T: 'static + Serialize + DeserializeOwned + Clone + Send,
S: SortKey<T>,
S: SortKey<T> + Send,
<S as SortKey<T>>::Key: 'static + Send + Serialize + DeserializeOwned,
{

Expand Down

0 comments on commit 9199ad7

Please sign in to comment.