Skip to content

Commit

Permalink
bug/4 Removed race condition in SegmentUpdater
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Oct 16, 2016
1 parent e5b51e5 commit 1ada311
Show file tree
Hide file tree
Showing 12 changed files with 147 additions and 172 deletions.
71 changes: 0 additions & 71 deletions src/common/living_counter_latch.rs

This file was deleted.

2 changes: 0 additions & 2 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
mod serialize;
mod timer;
mod vint;
mod living_counter_latch;

pub use self::serialize::BinarySerializable;
pub use self::timer::Timing;
pub use self::timer::TimerTree;
pub use self::timer::OpenTimer;
pub use self::vint::VInt;
pub use self::living_counter_latch::LivingCounterLatch;
use std::io;


Expand Down
4 changes: 2 additions & 2 deletions src/core/index_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ impl IndexMeta {
#[derive(Clone, Debug, RustcDecodable,RustcEncodable)]
pub struct SegmentMeta {
pub segment_id: SegmentId,
pub num_docs: usize,
pub num_docs: u32,
}

#[cfg(test)]
impl SegmentMeta {
pub fn new(segment_id: SegmentId, num_docs: usize) -> SegmentMeta {
pub fn new(segment_id: SegmentId, num_docs: u32) -> SegmentMeta {
SegmentMeta {
segment_id: segment_id,
num_docs: num_docs,
Expand Down
29 changes: 18 additions & 11 deletions src/core/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub struct GenerationItem<T> {

pub struct Pool<T> {
queue: Arc<MsQueue<GenerationItem<T>>>,
last_published_generation: AtomicUsize,
freshest_generation: AtomicUsize,
next_generation: AtomicUsize,
}

Expand All @@ -21,33 +21,40 @@ impl<T> Pool<T> {
pub fn new() -> Pool<T> {
Pool {
queue: Arc::new(MsQueue::new()),
last_published_generation: AtomicUsize::default(),
freshest_generation: AtomicUsize::default(),
next_generation: AtomicUsize::default(),
}
}

pub fn publish_new_generation(&self, items: Vec<T>) {
let next_generation = self.next_generation.fetch_add(1, Ordering::SeqCst);
let next_generation = self.next_generation.fetch_add(1, Ordering::SeqCst) + 1;
for item in items {
let gen_item = GenerationItem {
item: item,
generation: next_generation + 1,
generation: next_generation,
};
self.queue.push(gen_item);
}

let mut expected_current_generation = next_generation;
self.advertise_generation(next_generation);
}

/// At the exit of this method,
/// - freshest_generation has a value greater or equal than generation
/// - freshest_generation has a value that has been advertised
/// - freshest_generation has
fn advertise_generation(&self, generation: usize) {
// not optimal at all but the easiest to read proof.
loop {
let current_generation = self.last_published_generation.compare_and_swap(expected_current_generation, next_generation + 1, Ordering::SeqCst);
if current_generation >= expected_current_generation {
let former_generation = self.freshest_generation.load(Ordering::Acquire);
if former_generation >= generation {
break;
}
expected_current_generation = current_generation;
}
self.freshest_generation.compare_and_swap(former_generation, generation, Ordering::SeqCst);
}
}

fn generation(&self,) -> usize {
self.last_published_generation.load(Ordering::Acquire)
self.freshest_generation.load(Ordering::Acquire)
}

pub fn acquire(&self,) -> LeasedItem<T> {
Expand Down
7 changes: 6 additions & 1 deletion src/core/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,12 @@ impl Segment {
}

pub trait SerializableSegment {
fn write(&self, serializer: SegmentSerializer) -> Result<usize>;
/// Writes a view of a segment by pushing information
/// to the `SegmentSerializer`.
///
/// # Returns
/// The number of documents in the segment.
fn write(&self, serializer: SegmentSerializer) -> Result<u32>;
}

#[derive(Clone,Debug,RustcDecodable,RustcEncodable)]
Expand Down
4 changes: 2 additions & 2 deletions src/core/segment_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl SegmentId {
}

pub fn short_uuid_string(&self,) -> String {
String::from(&self.0.to_simple_string()[..8])
(&self.0.to_simple_string()[..8]).to_string()
}

pub fn uuid_string(&self,) -> String {
Expand Down Expand Up @@ -84,4 +84,4 @@ impl Ord for SegmentId {
fn cmp(&self, other: &Self) -> Ordering {
self.0.as_bytes().cmp(other.0.as_bytes())
}
}
}
8 changes: 5 additions & 3 deletions src/datastruct/stacker/heap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,14 @@ struct InnerHeap {

/// initializing a long Vec<u8> is crazy slow in
/// debug mode.
/// We use this unsafe trick to make unit test
/// way faster.
fn allocate_fast(num_bytes: usize) -> Vec<u8> {
let mut v = Vec::with_capacity(num_bytes);
let mut buffer = Vec::with_capacity(num_bytes);
unsafe {
v.set_len(num_bytes);
buffer.set_len(num_bytes);
}
v
buffer
}

impl InnerHeap {
Expand Down
4 changes: 2 additions & 2 deletions src/indexer/directory_lock.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use Directory;
use std::path::Path;
use error::Result;
use directory::error::OpenWriteError;

pub const LOCKFILE_NAME: &'static str = ".tantivy-indexer.lock";

Expand All @@ -15,7 +15,7 @@ pub struct DirectoryLock {
}

impl DirectoryLock {
pub fn lock(mut directory: Box<Directory>) -> Result<DirectoryLock> {
pub fn lock(mut directory: Box<Directory>) -> Result<DirectoryLock, OpenWriteError> {
let lockfile_path = Path::new(LOCKFILE_NAME);
try!(directory.open_write(lockfile_path));
Ok(DirectoryLock { directory: directory })
Expand Down
32 changes: 16 additions & 16 deletions src/indexer/index_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,11 @@ pub fn save_metas(segment_manager: &SegmentManager,
/// Each indexing thread builds its own independant `Segment`, via
/// a `SegmentWriter` object.
pub struct IndexWriter {
// the lock is just used to bind the

// the lock is just used to bind the
// lifetime of the lock with that of the IndexWriter.
_directory_lock: DirectoryLock,

_directory_lock: DirectoryLock,
index: Index,
heap_size_in_bytes_per_thread: usize,

Expand Down Expand Up @@ -131,7 +132,7 @@ fn index_documents(heap: &mut Heap,
break;
}
}
let num_docs = segment_writer.max_doc() as usize;
let num_docs = segment_writer.max_doc();
let segment_meta = SegmentMeta {
segment_id: segment_id,
num_docs: num_docs,
Expand All @@ -149,11 +150,11 @@ impl IndexWriter {
pub fn wait_merging_threads(mut self) -> Result<()> {

self.segment_update_sender.send(SegmentUpdate::Terminate);

// this will stop the indexing thread,
// dropping the last reference to the segment_update_sender.
drop(self.document_sender);

let mut v = Vec::new();
mem::swap(&mut v, &mut self.workers_join_handle);
for join_handle in v {
Expand Down Expand Up @@ -245,19 +246,18 @@ impl IndexWriter {
panic!(format!("The heap size per thread needs to be at least {}.",
HEAP_SIZE_LIMIT));
}

let directory_lock = try!(DirectoryLock::lock(index.directory().box_clone()));

let (document_sender, document_receiver): (DocumentSender, DocumentReceiver) =
chan::sync(PIPELINE_MAX_SIZE_IN_DOCS);

let segment_updater = SegmentUpdater::new(index.clone());
let segment_update_sender = segment_updater.update_channel();
let segment_update_thread = segment_updater.start();


let (segment_update_sender, segment_update_thread) = SegmentUpdater::start_updater(index.clone());

let mut index_writer = IndexWriter {

_directory_lock: directory_lock,

heap_size_in_bytes_per_thread: heap_size_in_bytes_per_thread,
index: index.clone(),

Expand Down Expand Up @@ -514,12 +514,12 @@ mod tests {
let index = Index::create_in_ram(schema_builder.build());
{
let _index_writer = index.writer(40_000_000).unwrap();
// the lock should be released when the
// the lock should be released when the
// index_writer leaves the scope.
}
let _index_writer_two = index.writer(40_000_000).unwrap();
}

#[test]
fn test_commit_and_rollback() {
let mut schema_builder = schema::SchemaBuilder::default();
Expand Down
4 changes: 2 additions & 2 deletions src/indexer/merger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,14 +258,14 @@ impl IndexMerger {
}

impl SerializableSegment for IndexMerger {
fn write(&self, mut serializer: SegmentSerializer) -> Result<usize> {
fn write(&self, mut serializer: SegmentSerializer) -> Result<u32> {
try!(self.write_postings(serializer.get_postings_serializer()));
try!(self.write_fieldnorms(serializer.get_fieldnorms_serializer()));
try!(self.write_fast_fields(serializer.get_fast_field_serializer()));
try!(self.write_storable_fields(serializer.get_store_writer()));
try!(serializer.write_segment_info(&self.segment_info));
try!(serializer.close());
Ok(self.segment_info.max_doc as usize)
Ok(self.segment_info.max_doc)
}
}

Expand Down

0 comments on commit 1ada311

Please sign in to comment.