Skip to content

Commit

Permalink
bug/4
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Oct 4, 2016
1 parent cd04643 commit 9cc1e47
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 41 deletions.
2 changes: 1 addition & 1 deletion src/core/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ impl Index {
/// Return a segment object given a segment_id
///
/// The segment may or may not exist.
fn segment(&self, segment_id: SegmentId) -> Segment {
pub fn segment(&self, segment_id: SegmentId) -> Segment {
create_segment(self.clone(), segment_id)
}

Expand Down
88 changes: 53 additions & 35 deletions src/indexer/index_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use core::SerializableSegment;
use core::Segment;
use std::thread::JoinHandle;
use indexer::SegmentWriter;
use indexer::MergeCandidate;
use std::clone::Clone;
use std::io;
use indexer::MergePolicy;
Expand Down Expand Up @@ -57,8 +58,6 @@ pub struct IndexWriter {

num_threads: usize,
docstamp: u64,

merge_policy: Box<MergePolicy>,
}


Expand Down Expand Up @@ -92,7 +91,6 @@ fn index_documents(heap: &mut Heap,
#[derive(Debug)]
pub enum SegmentUpdate {
AddSegment(SegmentMeta),
StartMerge(Vec<SegmentId>),
EndMerge(Vec<SegmentId>, SegmentMeta),
CancelGeneration,
NewGeneration,
Expand All @@ -106,7 +104,8 @@ fn process_segment_update(
index: &Index,
segment_manager: &SegmentManager,
segment_update: SegmentUpdate,
is_cancelled_generation: &mut bool) -> Result<bool> {
is_cancelled_generation: &mut bool) -> bool {
info!("Segment update: {:?}", segment_update);
match segment_update {
SegmentUpdate::AddSegment(segment_meta) => {
if !*is_cancelled_generation {
Expand All @@ -115,54 +114,41 @@ fn process_segment_update(
else {
index.delete_segment(segment_meta.segment_id);
}
Ok(true)
},
SegmentUpdate::StartMerge(segment_ids) => {
if !*is_cancelled_generation {
segment_manager.start_merge(&segment_ids);
// TODO spawn a segment merge thread
}
Ok(false)
true
},
SegmentUpdate::EndMerge(segment_ids, segment_meta) => {
segment_manager.end_merge(&segment_ids, &segment_meta);
for segment_id in segment_ids {
index.delete_segment(segment_id);
}
Ok(true)
true
},
SegmentUpdate::CancelGeneration => {
*is_cancelled_generation = true;
Ok(false)
false
},
SegmentUpdate::NewGeneration => {
*is_cancelled_generation = false;
Ok(false)
false
}
}
}

fn consider_merge_options(index: &mut Index, merge_policy: &MergePolicy) {
fn consider_merge_options(index: &Index, merge_policy: &MergePolicy) -> Vec<MergeCandidate> {
let segment_manager = get_segment_manager(index);
let (committed_segments, uncommitted_segments) = get_segment_ready_for_commit(&*segment_manager);
// committed segments cannot be merged with uncommitted_segments.
let merge_candidates_committed = merge_policy.compute_merge_candidates(&committed_segments);
let merge_candidates_uncommitted = merge_policy.compute_merge_candidates(&uncommitted_segments);
merge_candidates_committed.into_iter().chain(merge_candidates_uncommitted)
.map(|merge_candidate| {
println!("{:?}", merge_candidate);
});
let mut merge_candidates = merge_policy.compute_merge_candidates(&committed_segments);
merge_candidates.extend_from_slice(&merge_policy.compute_merge_candidates(&uncommitted_segments)[..]);
merge_candidates
}

fn on_segment_change(index: &mut Index,
merge_policy: &MergePolicy) -> Result<()> {
fn on_segment_change(index: &mut Index) -> Result<()> {
// saving the meta file.
try!(index.save_metas());
// update the searchers so that they will eventually
// use the new segments.
try!(index.load_searchers());
// consider merge options.
consider_merge_options(index, merge_policy);
Ok(())
}

Expand All @@ -175,26 +161,55 @@ fn on_segment_change(index: &mut Index,
// trivial.
fn process_segment_updates(mut index: Index,
segment_manager: &SegmentManager,
segment_update_receiver: SegmentUpdateReceiver) -> Result<()> {
segment_update_receiver: SegmentUpdateReceiver,
segment_update_sender: SegmentUpdateSender) {
let mut segment_update_it = segment_update_receiver.into_iter();
let mut is_cancelled_generation = false;
let merge_policy = index.get_merge_policy();
loop {
if let Some(segment_update) = segment_update_it.next() {
let has_changed = try!(
process_segment_update(
let has_changed = process_segment_update(
&index,
segment_manager,
segment_update,
&mut is_cancelled_generation)
);
&mut is_cancelled_generation);
if has_changed {
on_segment_change(&mut index, &*merge_policy);
on_segment_change(&mut index);

let segment_manager = get_segment_manager(&index);

for MergeCandidate(segment_ids) in consider_merge_options(&index, &*merge_policy) {
segment_manager.start_merge(&segment_ids);
let index_clone = index.clone();
let segment_update_sender_clone = segment_update_sender.clone();
thread::spawn(move || {
info!("Start merge: {:?}", segment_ids);
let schema = index_clone.schema();
let segments: Vec<Segment> = segment_ids
.iter()
.map(|&segment_id| index_clone.segment(segment_id))
.collect();
// An IndexMerger is like a "view" of our merged segments.
// TODO unwrap
let merger: IndexMerger = IndexMerger::open(schema, &segments[..]).unwrap();
let mut merged_segment = index_clone.new_segment();
// ... we just serialize this index merger in our new segment
// to merge the two segments.
let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment).unwrap();
let num_docs = merger.write(segment_serializer).unwrap();
let segment_meta = SegmentMeta {
segment_id: merged_segment.id(),
num_docs: num_docs,
};
let segment_update = SegmentUpdate::EndMerge(segment_ids, segment_meta);
segment_update_sender_clone.send(segment_update);
});
}
}
}
else {
// somehow, the channel was dropped.
return Ok(());
return;
}
}
}
Expand Down Expand Up @@ -267,8 +282,9 @@ impl IndexWriter {
let segment_manager = get_segment_manager(index);

let index_clone = index.clone();
let segment_update_sender_clone = segment_update_sender.clone();
thread::spawn(move || {
process_segment_updates(index_clone, &*segment_manager, segment_update_receiver)
process_segment_updates(index_clone, &*segment_manager, segment_update_receiver, segment_update_sender_clone)
});

let mut index_writer = IndexWriter {
Expand All @@ -283,7 +299,6 @@ impl IndexWriter {
workers_join_handle: Vec::new(),
num_threads: num_threads,

merge_policy: index.get_merge_policy(),
docstamp: index.docstamp(),
};
try!(index_writer.start_workers());
Expand Down Expand Up @@ -383,6 +398,9 @@ impl IndexWriter {

let rollbacked_segments = get_segment_manager(&self.index).rollback();
for segment_id in rollbacked_segments {

// TODO all deletes must happen after saving
// meta.json
self.index.delete_segment(segment_id);
}
try!(self.on_change());
Expand Down
2 changes: 1 addition & 1 deletion src/indexer/merge_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use core::SegmentId;
use core::SegmentMeta;


#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct MergeCandidate(pub Vec<SegmentId>);

pub trait MergePolicy {
Expand Down
2 changes: 1 addition & 1 deletion src/indexer/merger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ mod tests {
}
}
{
let segments = index.searchable_segments().unwrap();
let segments = index.searchable_segments();
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
index_writer.merge(&segments).unwrap();
}
Expand Down
2 changes: 0 additions & 2 deletions src/indexer/segment_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ impl Default for SegmentRegisters {
}




/// The segment manager stores the list of segments
/// as well as their state.
///
Expand Down
2 changes: 1 addition & 1 deletion src/indexer/simple_merge_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ impl MergePolicy for SimpleMergePolicy {
let num_packs = segments.len() / PACK_LEN;
(0..num_packs)
.map(|i| {
let segment_ids = segments[i..i*PACK_LEN]
let segment_ids = segments[i*PACK_LEN..(i+1)*PACK_LEN]
.iter()
.map(|segment_meta| segment_meta.segment_id)
.collect();
Expand Down

0 comments on commit 9cc1e47

Please sign in to comment.