bug/4 Fixed race condition in pool.
fulmicoton committed Oct 15, 2016
1 parent 0f246ba commit 21192db
Showing 3 changed files with 111 additions and 62 deletions.
32 changes: 19 additions & 13 deletions src/core/pool.rs
@@ -12,40 +12,46 @@ pub struct GenerationItem<T> {
 
 pub struct Pool<T> {
     queue: Arc<MsQueue<GenerationItem<T>>>,
-    generation: AtomicUsize,
+    last_published_generation: AtomicUsize,
+    next_generation: AtomicUsize,
 }
 
 impl<T> Pool<T> {
 
     pub fn new() -> Pool<T> {
         Pool {
             queue: Arc::new(MsQueue::new()),
-            generation: AtomicUsize::default(),
+            last_published_generation: AtomicUsize::default(),
+            next_generation: AtomicUsize::default(),
         }
     }
 
     pub fn publish_new_generation(&self, items: Vec<T>) {
-        let next_generation = self.generation() + 1;
+        let next_generation = self.next_generation.fetch_add(1, Ordering::SeqCst);
         for item in items {
             let gen_item = GenerationItem {
                 item: item,
-                generation: next_generation,
+                generation: next_generation + 1,
             };
             self.queue.push(gen_item);
         }
 
+        let mut expected_current_generation = next_generation;
+        loop {
+            let current_generation = self.last_published_generation.compare_and_swap(expected_current_generation, next_generation + 1, Ordering::SeqCst);
+            if current_generation >= expected_current_generation {
+                break;
+            }
+            expected_current_generation = current_generation;
+        }
-        self.inc_generation();
     }
 
     fn generation(&self,) -> usize {
-        self.generation.load(Ordering::Acquire)
-    }
-
-    pub fn inc_generation(&self,) {
-        self.generation.fetch_add(1, Ordering::SeqCst);
+        self.last_published_generation.load(Ordering::Acquire)
     }
 
     pub fn acquire(&self,) -> LeasedItem<T> {
-        let generation = self.generation.load(Ordering::Acquire);
+        let generation = self.generation();
         loop {
             let gen_item = self.queue.pop();
             if gen_item.generation >= generation {
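
The race this commit targets: two threads calling publish_new_generation concurrently could read the same generation, tag their items identically, and bump the counter twice. The fix splits the state in two counters: next_generation hands each publisher a unique generation via fetch_add, and last_published_generation is only ever moved forward through a compare-and-swap loop. Below is a minimal, self-contained sketch of that publication pattern; it is an editor's illustration using the same compare_and_swap idiom as the diff, not the Pool<T> code itself, and GenerationCounter with its fields are hypothetical names.

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

// Hypothetical, simplified counter mirroring the scheme in the diff above.
struct GenerationCounter {
    last_published: AtomicUsize,
    next: AtomicUsize,
}

impl GenerationCounter {
    fn new() -> GenerationCounter {
        GenerationCounter {
            last_published: AtomicUsize::new(0),
            next: AtomicUsize::new(0),
        }
    }

    // Reserve a unique generation and publish it, never moving
    // `last_published` backwards even if publishers race each other.
    fn publish(&self) -> usize {
        let reserved = self.next.fetch_add(1, Ordering::SeqCst);
        let published = reserved + 1;
        let mut expected = reserved;
        loop {
            let current = self.last_published
                .compare_and_swap(expected, published, Ordering::SeqCst);
            if current >= expected {
                break;
            }
            expected = current;
        }
        published
    }

    fn current(&self) -> usize {
        self.last_published.load(Ordering::Acquire)
    }
}

fn main() {
    let counter = Arc::new(GenerationCounter::new());
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let counter = counter.clone();
            thread::spawn(move || counter.publish())
        })
        .collect();
    let mut published: Vec<usize> = handles.into_iter()
        .map(|handle| handle.join().unwrap())
        .collect();
    published.sort();
    published.dedup();
    // Every publisher got its own generation, and the published
    // counter only ever moved forward.
    assert_eq!(published.len(), 4);
    assert!(counter.current() >= 1);
    println!("last published generation: {}", counter.current());
}
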
2 changes: 1 addition & 1 deletion src/indexer/index_writer.rs
@@ -246,7 +246,7 @@ impl IndexWriter {
 
         let segment_updater = SegmentUpdater::new(index.clone());
 
-        let segment_update_sender = segment_updater.update_channel().expect("This should never happen"); // TODO remove expect
+        let segment_update_sender = segment_updater.update_channel();
 
         let segment_update_thread = segment_updater.start();

139 changes: 91 additions & 48 deletions src/indexer/segment_updater.rs
@@ -22,11 +22,35 @@ pub type SegmentUpdateReceiver = chan::Receiver<SegmentUpdate>;
 
 #[derive(Debug)]
 pub enum SegmentUpdate {
+
+    /// New segment added.
+    /// Created by the indexing worker thread
     AddSegment(SegmentMeta),
+
+    /// A merge is ended.
+    /// Remove the merged segment and record the new
+    /// large merged segment.
     EndMerge(Vec<SegmentId>, SegmentMeta),
+
+    /// Happens when rollback is called.
+    /// The current generation of segments is cancelled.
     CancelGeneration,
+
+    /// Starts a new generation... This
+    /// happens at the end of Rollback.
     NewGeneration,
+
+    /// Just dropping the Segment updater object
+    /// is safe, but some merge might be happening in
+    /// the background and the user may want to wait for these
+    /// threads to terminate.
+    ///
+    /// When receiving the Terminate signal, the segment updater stops
+    /// receiving segment updates and just waits for the merging threads
+    /// to terminate.
     Terminate,
+
+    /// Commit marks uncommmitted segments as committed.
     Commit(u64),
 }
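
As a side note on how these variants are meant to be used (an editor's sketch, not code from this diff): the sender returned by update_channel() is how other threads talk to the updater. The snippet below assumes a segment_updater plus a freshly serialized segment, its num_docs, and a docstamp are already in scope.

// Hedged usage sketch; `segment_updater`, `segment`, `num_docs` and
// `docstamp` are assumed to exist in the surrounding code.
let segment_update_sender = segment_updater.update_channel();

// An indexing worker registers a freshly written segment.
segment_update_sender.send(SegmentUpdate::AddSegment(SegmentMeta {
    segment_id: segment.id(),
    num_docs: num_docs,
}));

// A rollback cancels the running generation, then opens a new one.
segment_update_sender.send(SegmentUpdate::CancelGeneration);
segment_update_sender.send(SegmentUpdate::NewGeneration);

// A commit marks everything up to `docstamp` as committed.
segment_update_sender.send(SegmentUpdate::Commit(docstamp));

// On shutdown, Terminate makes the updater drain and wait for merges.
segment_update_sender.send(SegmentUpdate::Terminate);
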

@@ -42,10 +66,12 @@ pub enum SegmentUpdate {
 pub struct SegmentUpdater {
     index: Index,
     is_cancelled_generation: bool,
+    has_received_terminate_command: bool,
     segment_update_receiver: SegmentUpdateReceiver,
-    option_segment_update_sender: Option<SegmentUpdateSender>,
+    segment_update_sender: SegmentUpdateSender,
     segment_manager_arc: Arc<SegmentManager>,
     merge_policy: Box<MergePolicy>,
+    living_threads: LivingCounterLatch,
 }


Expand All @@ -57,17 +83,53 @@ impl SegmentUpdater {
SegmentUpdater {
index: index,
is_cancelled_generation: false,
option_segment_update_sender: Some(segment_update_sender),
has_received_terminate_command: false,
segment_update_sender: segment_update_sender,
segment_update_receiver: segment_update_receiver,
segment_manager_arc: segment_manager_arc,
merge_policy: Box::new(SimpleMergePolicy::default()), // TODO make that configurable
living_threads: LivingCounterLatch::default(),
}
}

pub fn update_channel(&self,) -> Option<SegmentUpdateSender> {
self.option_segment_update_sender.clone()
pub fn update_channel(&self,) -> SegmentUpdateSender {
self.segment_update_sender.clone()
}

fn start_merges(&self,) {

let merge_candidates = self.consider_merge_options();

for MergeCandidate(segment_ids) in merge_candidates {
self.segment_manager().start_merge(&segment_ids);
let living_threads_clone = self.living_threads.clone();
let index_clone = self.index.clone();
let segment_update_sender_clone = self.segment_update_sender.clone();
thread::Builder::new().name(format!("merge_thread_{:?}", segment_ids[0])).spawn(move || {
info!("Start merge: {:?}", segment_ids);
let schema = index_clone.schema();
let segments: Vec<Segment> = segment_ids
.iter()
.map(|&segment_id| index_clone.segment(segment_id))
.collect();
// An IndexMerger is like a "view" of our merged segments.
// TODO unwrap
let merger: IndexMerger = IndexMerger::open(schema, &segments[..]).unwrap();
let mut merged_segment = index_clone.new_segment();
// ... we just serialize this index merger in our new segment
// to merge the two segments.
let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment).unwrap();
let num_docs = merger.write(segment_serializer).unwrap();
let segment_meta = SegmentMeta {
segment_id: merged_segment.id(),
num_docs: num_docs,
};
let segment_update = SegmentUpdate::EndMerge(segment_ids, segment_meta);
segment_update_sender_clone.send(segment_update);
drop(living_threads_clone);
}).expect("Failed to spawn merge thread");
}
}

fn consider_merge_options(&self,) -> Vec<MergeCandidate> {
let segment_manager = self.segment_manager();
Expand All @@ -94,75 +156,56 @@ impl SegmentUpdater {
.expect("Failed to start segment updater thread.")
}

fn still_has_work(&self,) -> bool {
if self.has_received_terminate_command {
!self.living_threads.is_zero()
}
else {
true
}
}

fn process(mut self,) {

let segment_manager = self.segment_manager_arc.clone();

let mut segment_updates_it = self.segment_update_receiver.clone().into_iter();

let living_threads = LivingCounterLatch::default();

let segment_updates = self.segment_update_receiver.clone();
for segment_update in segment_updates {
while self.still_has_work() {
let segment_update = segment_updates_it.next().expect("");
// we check the generation number as if it was
// dirty-bit. If the value is different
// to our generation, then the segment_manager has
// been update updated and we need to
// - save meta.json
// - update the searchers
// - consider possible segment merge
// been update updated.
let generation_before_update = segment_manager.generation();

self.process_one(segment_update);

if generation_before_update != segment_manager.generation() {

// saving the meta file.
// The segment manager has changed, we need to
// - save meta.json
save_metas(
&*segment_manager,
self.index.schema(),
self.index.docstamp(),
self.index.directory_mut()).expect("Could not save metas.");


// - update the searchers

// update the searchers so that they eventually will
// use the new segments.
// TODO eventually have this work through watching meta.json
// so that an external process stays up to date as well.
self.index.load_searchers().expect("Could not load new searchers.");

if let Some(ref segment_update_sender) = self.option_segment_update_sender {
for MergeCandidate(segment_ids) in self.consider_merge_options() {
segment_manager.start_merge(&segment_ids);
let living_threads_clone = living_threads.clone();
let index_clone = self.index.clone();
let segment_update_sender_clone = segment_update_sender.clone();
thread::Builder::new().name(format!("merge_thread_{:?}", segment_ids[0])).spawn(move || {
info!("Start merge: {:?}", segment_ids);
let schema = index_clone.schema();
let segments: Vec<Segment> = segment_ids
.iter()
.map(|&segment_id| index_clone.segment(segment_id))
.collect();
// An IndexMerger is like a "view" of our merged segments.
// TODO unwrap
let merger: IndexMerger = IndexMerger::open(schema, &segments[..]).unwrap();
let mut merged_segment = index_clone.new_segment();
// ... we just serialize this index merger in our new segment
// to merge the two segments.
let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment).unwrap();
let num_docs = merger.write(segment_serializer).unwrap();
let segment_meta = SegmentMeta {
segment_id: merged_segment.id(),
num_docs: num_docs,
};
let segment_update = SegmentUpdate::EndMerge(segment_ids, segment_meta);
segment_update_sender_clone.send(segment_update);
drop(living_threads_clone);
}).expect("Failed to spawn merge thread");
}
}

// - start merges if required
self.start_merges();
}
}
}
}


// Process a single segment update.
@@ -207,7 +250,7 @@ impl SegmentUpdater {
                 self.segment_manager().commit(docstamp);
             }
             SegmentUpdate::Terminate => {
-                self.option_segment_update_sender = None;
+                self.has_received_terminate_command = true;
             }
         }
     }
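
LivingCounterLatch itself is not touched by this commit, but the new still_has_work() logic depends on its semantics: every merge thread holds a clone, dropping that clone signals completion, and is_zero() reports whether any clones are still alive. A hypothetical minimal version, guessed from that usage alone (the real type in the repository may differ), could look like:

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

// Guessed, minimal stand-in for LivingCounterLatch, based only on how it
// is used in the diff above: clones are handed to worker threads, and the
// latch reads as "zero" once every clone has been dropped.
pub struct LivingCounterLatch {
    living: Arc<AtomicUsize>,
    is_clone: bool,
}

impl Default for LivingCounterLatch {
    fn default() -> LivingCounterLatch {
        LivingCounterLatch {
            living: Arc::new(AtomicUsize::new(0)),
            is_clone: false,
        }
    }
}

impl Clone for LivingCounterLatch {
    fn clone(&self) -> LivingCounterLatch {
        // Each clone counts as one living worker thread.
        self.living.fetch_add(1, Ordering::SeqCst);
        LivingCounterLatch {
            living: self.living.clone(),
            is_clone: true,
        }
    }
}

impl Drop for LivingCounterLatch {
    fn drop(&mut self) {
        // Only clones decrement, so dropping the original handle held by
        // the SegmentUpdater does not underflow the counter.
        if self.is_clone {
            self.living.fetch_sub(1, Ordering::SeqCst);
        }
    }
}

impl LivingCounterLatch {
    pub fn is_zero(&self) -> bool {
        self.living.load(Ordering::SeqCst) == 0
    }
}

With these semantics, still_has_work() stays true until Terminate has been received and every merge thread has dropped its clone of the latch.
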
