Skip to content

Commit

Permalink
bug/4 introducing a segment manager
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Sep 29, 2016
1 parent 61ae98e commit 533061d
Show file tree
Hide file tree
Showing 6 changed files with 239 additions and 163 deletions.
49 changes: 10 additions & 39 deletions src/core/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ use core::SegmentReader;
use super::pool::Pool;
use super::pool::LeasedItem;
use core::SegmentMeta;
use indexer::SegmentRegister;
use indexer::SegmentUpdate;

use indexer::SegmentManager;

const NUM_SEARCHERS: usize = 12;

Expand Down Expand Up @@ -67,9 +65,7 @@ fn load_metas(directory: &Directory) -> Result<IndexMeta> {

/// Tantivy's Search Index
pub struct Index {

pub committed_segments: Arc<SegmentRegister>,
pub uncommitted_segments: Arc<SegmentRegister>,
pub segment_manager: Arc<SegmentManager>,

directory: Box<Directory>,
schema: Schema,
Expand Down Expand Up @@ -112,10 +108,10 @@ impl Index {
fn create_from_metas(directory: Box<Directory>, metas: IndexMeta) -> Result<Index> {
let schema = metas.schema.clone();
let docstamp = metas.docstamp;
let committed_segments = SegmentRegister::from(metas.committed_segments);
let committed_segments = metas.committed_segments;
// TODO log something if uncommitted is not empty.
let index = Index {
committed_segments: Arc::new(committed_segments),
uncommitted_segments: Arc::new(SegmentRegister::default()),
segment_manager: Arc::new(SegmentManager::from_segments(committed_segments)),
directory: directory,
schema: schema,
searcher_pool: Arc::new(Pool::new()),
Expand Down Expand Up @@ -173,34 +169,12 @@ impl Index {
pub fn commit(&mut self,
docstamp: u64) -> Result<()> {
self.docstamp = docstamp;
let segment_metas = try!(self.uncommitted_segments.segment_metas());
for segment_meta in segment_metas {
let segment_update = SegmentUpdate::NewSegment(segment_meta.clone());
try!(self.committed_segments.segment_update(segment_update));
}
try!(self.uncommitted_segments.clear());
try!(self.segment_manager.commit());
try!(self.save_metas());
try!(self.load_searchers());
Ok(())
}


/// Applies `segment_update` to the uncommitted segments, then calls
/// `save_metas` and `load_searchers`.
///
/// Each step is wrapped in `try!`: the first error encountered is returned
/// immediately and the remaining steps are skipped.
pub fn update_uncommited_segments(&mut self, segment_update: SegmentUpdate) -> Result<()> {
try!(self.uncommitted_segments.segment_update(segment_update));
try!(self.save_metas());
try!(self.load_searchers());
Ok(())
}

/// Applies `segment_update` to the committed segments, then calls
/// `save_metas` and `load_searchers`.
///
/// Typically used to exchange a set of `SegmentId`s for the `SegmentId`
/// of a merged segment (a `SegmentUpdate::EndMerge` update).
///
/// Each step is wrapped in `try!`: the first error encountered is returned
/// immediately and the remaining steps are skipped.
pub fn update_commited_segments(&mut self, segment_update: SegmentUpdate) -> Result<()> {
try!(self.committed_segments.segment_update(segment_update));
try!(self.save_metas());
try!(self.load_searchers());
Ok(())
}


/// Returns the list of segments that are searchable
pub fn searchable_segments(&self,) -> Result<Vec<Segment>> {
let segment_ids = try!(self.searchable_segment_ids());
Expand Down Expand Up @@ -232,8 +206,7 @@ impl Index {

/// Returns the list of segment ids that are searchable.
fn searchable_segment_ids(&self,) -> Result<Vec<SegmentId>> {
self.committed_segments
.segment_ids()
self.segment_manager.committed_segments()
}

/// Creates a new segment.
Expand All @@ -242,8 +215,7 @@ impl Index {
}

fn create_metas(&self,) -> Result<IndexMeta> {
let committed_segments = try!(self.committed_segments.segment_metas());
let uncommitted_segments = try!(self.uncommitted_segments.segment_metas());
let (committed_segments, uncommitted_segments) = try!(self.segment_manager.segment_metas());
Ok(IndexMeta {
committed_segments: committed_segments,
uncommitted_segments: uncommitted_segments,
Expand Down Expand Up @@ -316,8 +288,7 @@ impl fmt::Debug for Index {
impl Clone for Index {
fn clone(&self,) -> Index {
Index {
committed_segments: self.committed_segments.clone(),
uncommitted_segments: self.uncommitted_segments.clone(),
segment_manager: self.segment_manager.clone(),

directory: self.directory.box_clone(),
schema: self.schema.clone(),
Expand Down
91 changes: 39 additions & 52 deletions src/indexer/index_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,12 @@ use std::clone::Clone;
use std::io;
use std::thread;
use indexer::merger::IndexMerger;
use indexer::SegmentUpdate;
use core::SegmentId;
use datastruct::stacker::Heap;
use std::mem::swap;
use chan;
use std::sync::Arc;
use core::SegmentMeta;
use indexer::SegmentRegister;

use indexer::SegmentAppender;
use Result;
use Error;

Expand Down Expand Up @@ -48,26 +45,14 @@ type NewSegmentReceiver = chan::Receiver<Result<SegmentMeta>>;
pub struct IndexWriter {
index: Index,
heap_size_in_bytes_per_thread: usize,
workers_join_handle: Vec<JoinHandle<()>>,
segment_ready_sender: NewSegmentSender,
segment_ready_consumer: JoinHandle<Result<()>>,
workers_join_handle: Vec<JoinHandle<Result<()>>>,
segment_appender: SegmentAppender,
document_receiver: DocumentReceiver,
document_sender: DocumentSender,
num_threads: usize,
docstamp: u64,
}

/// Spawns a thread that consumes `segment_queue` and records every segment
/// it receives in `segment_register` as a `SegmentUpdate::NewSegment`.
///
/// The returned `JoinHandle` yields:
/// - `Ok(())` once iteration over `segment_queue` ends
///   (presumably when every sender has been dropped — confirm against `chan`);
/// - the first error encountered otherwise, either an `Err` received from
///   the queue or a failure of `segment_update`. In that case the thread
///   stops and any remaining queued items are not processed.
fn create_segment_consumer(
segment_queue: chan::Receiver<Result<SegmentMeta>>,
segment_register: Arc<SegmentRegister>) -> JoinHandle<Result<()>> {
thread::spawn(move || {
for segment_meta_res in segment_queue {
// An `Err` sent by an indexing worker aborts the consumer thread.
let segment_meta = try!(segment_meta_res);
try!(segment_register.segment_update(SegmentUpdate::NewSegment(segment_meta)));
}
Ok(())
})
}

fn index_documents(heap: &mut Heap,
segment: Segment,
Expand Down Expand Up @@ -96,11 +81,11 @@ impl IndexWriter {
fn add_indexing_worker(&mut self,) -> Result<()> {
let index = self.index.clone();
let schema = self.index.schema();
let segment_ready_sender_clone = self.segment_ready_sender.clone();
let segment_appender_clone = self.segment_appender.clone();
let document_receiver_clone = self.document_receiver.clone();

let mut heap = Heap::with_capacity(self.heap_size_in_bytes_per_thread);
let join_handle: JoinHandle<()> = thread::spawn(move || {
let join_handle: JoinHandle<Result<()>> = thread::spawn(move || {
loop {
let segment = index.new_segment();
let segment_id = segment.id();
Expand All @@ -112,15 +97,21 @@ impl IndexWriter {
// creating a new segment's files
// if no documents are available.
if document_iterator.peek().is_some() {
let index_result = index_documents(&mut heap, segment, &schema, &mut document_iterator)
let segment_meta_result = index_documents(&mut heap, segment, &schema, &mut document_iterator)
.map(|num_docs| SegmentMeta {
segment_id: segment_id,
num_docs: num_docs,
});
segment_ready_sender_clone.send(index_result);
let segment_meta = try!(segment_meta_result);
if !try!(segment_appender_clone.add_segment(segment_meta)) {
return Ok(());
}
}
else {
return;
// No more documents.
// Happens when there is a commit, or if the `IndexWriter`
// was dropped.
return Ok(());
}
}
});
Expand All @@ -139,13 +130,11 @@ impl IndexWriter {
panic!(format!("The heap size per thread needs to be at least {}.", HEAP_SIZE_LIMIT));
}
let (document_sender, document_receiver): (DocumentSender, DocumentReceiver) = chan::sync(PIPELINE_MAX_SIZE_IN_DOCS);
let (segment_ready_sender, segment_ready_receiver): (NewSegmentSender, NewSegmentReceiver) = chan::async();
let segment_ready_consumer = create_segment_consumer(segment_ready_receiver, index.uncommitted_segments.clone());
let segment_appender = SegmentAppender::for_manager(index.segment_manager.clone());
let mut index_writer = IndexWriter {
heap_size_in_bytes_per_thread: heap_size_in_bytes_per_thread,
index: index.clone(),
segment_ready_consumer: segment_ready_consumer,
segment_ready_sender: segment_ready_sender,
segment_appender: segment_appender,
document_receiver: document_receiver,
document_sender: document_sender,
workers_join_handle: Vec::new(),
Expand All @@ -165,20 +154,22 @@ impl IndexWriter {

/// Merges a given list of segments
pub fn merge(&mut self, segments: &[Segment]) -> Result<()> {

// TODO fix commit or uncommited?
let schema = self.index.schema();
// An IndexMerger is like a "view" of our merged segments.
let merger = try!(IndexMerger::open(schema, segments));
let mut merged_segment = self.index.new_segment();
// ... we just serialize this index merger in our new segment
// to merge the two segments.
let segment_serializer = try!(SegmentSerializer::for_segment(&mut merged_segment));
let num_docs = try!(merger.write(segment_serializer));
let merged_segment_ids: Vec<SegmentId> = segments.iter().map(|segment| segment.id()).collect();
let segment_meta = SegmentMeta {
segment_id: merged_segment.id(),
num_docs: num_docs,
};
let segment_update = SegmentUpdate::EndMerge(merged_segment_ids, segment_meta);
try!(self.index.update_commited_segments(segment_update));
try!(self.index.segment_manager.end_merge(&merged_segment_ids, &segment_meta));
try!(self.index.load_searchers());
Ok(())
}

Expand All @@ -190,15 +181,12 @@ impl IndexWriter {
/// when no documents are remaining.
///
/// Returns the former document receiver.
fn recreate_channels(&mut self,) -> (DocumentReceiver, JoinHandle<Result<()>>) {
fn recreate_channel(&mut self,) -> DocumentReceiver {
let (mut document_sender, mut document_receiver): (DocumentSender, DocumentReceiver) = chan::sync(PIPELINE_MAX_SIZE_IN_DOCS);
let (mut segment_ready_sender, segment_ready_receiver): (NewSegmentSender, NewSegmentReceiver) = chan::async();
swap(&mut self.document_sender, &mut document_sender);
swap(&mut self.document_receiver, &mut document_receiver);
swap(&mut self.segment_ready_sender, &mut segment_ready_sender);
let mut segment_ready_consumer = create_segment_consumer(segment_ready_receiver, self.index.uncommitted_segments.clone());
swap(&mut self.segment_ready_consumer, &mut segment_ready_consumer);
(document_receiver, segment_ready_consumer)
self.segment_appender = SegmentAppender::for_manager(self.index.segment_manager.clone());
document_receiver
}

// TODO what if we merge uncommitted segments
Expand All @@ -216,26 +204,31 @@ impl IndexWriter {
/// The docstamp at the last commit is returned.
pub fn rollback(&mut self,) -> Result<u64> {

// the current generation is killed...
// if some pending segments are still in the pipe,
// they won't be added to our index.
self.index.segment_manager.kill_generation();

// we cannot drop segment ready receiver yet
// as it would block the workers.
let (document_receiver, mut _segment_ready_consumer) = self.recreate_channels();
let document_receiver = self.recreate_channel();

// TODO cancel the segment_ready_consumer as well.

// consumes the document receiver pipeline
// workers don't need to index the pending documents.
for _ in document_receiver {};


let mut former_workers_join_handle = Vec::new();
swap(&mut former_workers_join_handle, &mut self.workers_join_handle);

// wait for all the workers to finish their work
// (it should be fast since we consumed all pending documents)
for worker_handle in former_workers_join_handle {
try!(worker_handle
.join()
.map_err(|e| Error::ErrorInThread(format!("{:?}", e)))
);
try!(try!(
worker_handle
.join()
.map_err(|e| Error::ErrorInThread(format!("{:?}", e)))
));
// add a new worker for the next generation.
try!(self.add_indexing_worker());
}
Expand All @@ -262,9 +255,9 @@ impl IndexWriter {
///
pub fn commit(&mut self,) -> Result<u64> {

let (document_receiver, segment_ready_receiver) = self.recreate_channels();
drop(document_receiver);

// this will drop the current channel
self.recreate_channel();
// Docstamp of the last document in this commit.
let commit_docstamp = self.docstamp;

Expand All @@ -280,12 +273,6 @@ impl IndexWriter {
try!(self.add_indexing_worker());
}

let segment_metas_result = try!(
segment_ready_receiver
.join()
.map_err(|_| super::super::error::Error::ErrorInThread(String::from("Joining receiver thread failed while committing.")))
);
try!(segment_metas_result);
try!(self.index.commit(commit_docstamp));
Ok(commit_docstamp)
}
Expand Down
2 changes: 1 addition & 1 deletion src/indexer/merge_policy.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use core::SegmentId;
use indexer::SegmentRegister;
use indexer::segment_register::SegmentRegister;

pub struct MergeCandidate {
segments: Vec<SegmentId>,
Expand Down
4 changes: 2 additions & 2 deletions src/indexer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ pub mod merger;
mod merge_policy;
mod segment_register;
mod segment_writer;
mod segment_manager;

pub use self::segment_serializer::SegmentSerializer;
pub use self::segment_writer::SegmentWriter;
pub use self::index_writer::IndexWriter;
pub use self::merge_policy::MergePolicy;
pub use self::segment_register::SegmentRegister;
pub use self::segment_register::SegmentUpdate;
pub use self::segment_manager::{SegmentManager, SegmentAppender};

0 comments on commit 533061d

Please sign in to comment.