
Commit

Working but dirty...
fulmicoton committed Aug 29, 2016
1 parent 7f52da2 commit 93c984c
Showing 5 changed files with 26 additions and 35 deletions.
15 changes: 6 additions & 9 deletions src/indexer/index_writer.rs
@@ -26,7 +26,6 @@ pub struct IndexWriter {
segment_ready_receiver: chan::Receiver<Result<(SegmentId, usize)>>,
document_receiver: chan::Receiver<Document>,
document_sender: chan::Sender<Document>,
target_num_docs: usize,
num_threads: usize,
docstamp: u64,

@@ -42,6 +41,11 @@ fn index_documents(block_store: &mut BlockStore,
let mut segment_writer = try!(SegmentWriter::for_segment(block_store, segment, &schema));
for doc in document_iterator {
try!(segment_writer.add_document(&doc, &schema));
if segment_writer.is_buffer_full() {
println!("no more space committing.");
println!("seg max doc {}", segment_writer.max_doc());
break;
}
}
let num_docs = segment_writer.max_doc() as usize;
try!(segment_writer.finalize());
@@ -55,25 +59,19 @@ impl IndexWriter {
/// Spawns a new worker thread for indexing.
/// The thread consumes documents from the pipeline.
///
/// When target_num_docs is reached, or when the channel
/// is closed, the worker flushes its current segment to disc,
/// and sends its segment_id through the channel.
///
fn add_indexing_worker(&mut self,) -> Result<()> {
let index = self.index.clone();
let schema = self.index.schema();
let segment_ready_sender_clone = self.segment_ready_sender.clone();
let document_receiver_clone = self.document_receiver.clone();
let target_num_docs = self.target_num_docs;
let join_handle: JoinHandle<()> = thread::spawn(move || {
let mut block_store = BlockStore::allocate(500_000);
let mut block_store = BlockStore::allocate(1_000_000);
loop {
let segment = index.new_segment();
let segment_id = segment.id();
let mut document_iterator = document_receiver_clone
.clone()
.into_iter()
.take(target_num_docs)
.peekable();
// the peeking here is to avoid
// creating a new segment's files
@@ -105,7 +103,6 @@ impl IndexWriter {
segment_ready_sender: segment_ready_sender,
document_receiver: document_receiver,
document_sender: document_sender,
target_num_docs: 100_000,
workers_join_handle: Vec::new(),
num_threads: num_threads,
docstamp: try!(index.docstamp()),
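For context, a minimal sketch of the worker-loop shape this file moves toward: documents are pulled from a channel, and the current segment is cut short as soon as the in-memory buffer reports it is full. This sketch assumes a plain std::sync::mpsc channel instead of the chan crate, and ToyWriter is a made-up stand-in for SegmentWriter.

use std::sync::mpsc;
use std::thread;

// Toy stand-in for SegmentWriter: pretends the buffer fills after 3 docs.
struct ToyWriter { max_doc: u32 }

impl ToyWriter {
    fn new() -> ToyWriter { ToyWriter { max_doc: 0 } }
    fn add_document(&mut self, _doc: &str) { self.max_doc += 1; }
    fn is_buffer_full(&self) -> bool { self.max_doc >= 3 }
}

fn main() {
    let (sender, receiver) = mpsc::channel::<String>();
    let handle = thread::spawn(move || {
        let mut doc_iter = receiver.into_iter().peekable();
        loop {
            // Only start a new "segment" if at least one document is pending.
            if doc_iter.peek().is_none() { break; }
            let mut writer = ToyWriter::new();
            for doc in &mut doc_iter {
                writer.add_document(&doc);
                if writer.is_buffer_full() {
                    // Stop filling this segment; flush it and start a new one.
                    break;
                }
            }
            println!("flushed segment with {} docs", writer.max_doc);
        }
    });
    for i in 0..7 {
        sender.send(format!("doc {}", i)).unwrap();
    }
    drop(sender); // closing the channel lets the worker exit
    handle.join().unwrap();
}
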
7 changes: 6 additions & 1 deletion src/indexer/segment_writer.rs
@@ -63,6 +63,7 @@ fn posting_from_field_entry(field_entry: &FieldEntry) -> Box<PostingsWriter> {
}



impl<'a> SegmentWriter<'a> {

pub fn for_segment(block_store: &'a mut BlockStore, mut segment: Segment, schema: &Schema) -> Result<SegmentWriter<'a>> {
@@ -103,7 +104,11 @@ impl<'a> SegmentWriter<'a> {
segment_info,
self.segment_serializer)
}


pub fn is_buffer_full(&self,) -> bool {
self.block_store.num_free_blocks() < 1000
}

pub fn add_document(&mut self, doc: &Document, schema: &Schema) -> io::Result<()> {
let doc_id = self.max_doc;
for (field, field_values) in doc.get_sorted_fields() {
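A rough illustration of the headroom check introduced by is_buffer_full above. BlockStoreStub and its numbers are invented for the example; only the num_free_blocks() < 1000 shape follows the diff.

// Made-up stand-in for BlockStore: blocks are handed out by bumping free_block_id.
struct BlockStoreStub {
    num_blocks: usize,    // total blocks allocated up front
    free_block_id: usize, // index of the next unused block
}

impl BlockStoreStub {
    fn num_free_blocks(&self) -> usize {
        self.num_blocks - self.free_block_id
    }
}

// Mirrors SegmentWriter::is_buffer_full: stop accepting documents once fewer
// than 1000 blocks remain, so the current document can still be finished safely.
fn is_buffer_full(store: &BlockStoreStub) -> bool {
    store.num_free_blocks() < 1000
}

fn main() {
    let store = BlockStoreStub { num_blocks: 1_000_000, free_block_id: 999_500 };
    assert!(is_buffer_full(&store)); // only 500 free blocks left
}
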
17 changes: 14 additions & 3 deletions src/postings/block_store.rs
@@ -36,6 +36,10 @@ impl BlockStore {
}
}

pub fn num_free_blocks(&self) -> usize {
self.blocks.len() - self.free_block_id
}

pub fn new_list(&mut self) -> u32 {
let res = self.lists.len() as u32;
let new_block_id = self.new_block().unwrap();
@@ -128,11 +132,18 @@ impl<'a> Iterator for BlockIterator<'a> {
None
}
else {
let res = self.current_block.data[self.cursor % (BLOCK_SIZE as usize)];
self.cursor += 1;
if self.cursor % (BLOCK_SIZE as usize) == 0 {
self.current_block = &self.blocks[self.current_block.next as usize];
if self.cursor != 0 {
if self.current_block.next != u32::max_value() {
self.current_block = &self.blocks[self.current_block.next as usize];
}
else {
panic!("Block linked list ended prematurely.");
}
}
}
let res = self.current_block.data[self.cursor % (BLOCK_SIZE as usize)];
self.cursor += 1;
Some(res)
}

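The iterator hunk above defers following the next pointer until another element is actually requested, instead of hopping to the next block eagerly right after reading the last word of the current one. A self-contained sketch of that access pattern, with a toy BLOCK_SIZE and Block layout that only approximate the real ones:

const BLOCK_SIZE: usize = 4; // toy size, much smaller than the real one

struct Block {
    data: [u32; BLOCK_SIZE],
    next: u32, // index of the next block, or u32::max_value() for the tail
}

struct BlockIter<'a> {
    blocks: &'a [Block],
    current_block: &'a Block,
    cursor: usize, // number of elements yielded so far
    len: usize,    // total number of elements in the list
}

impl<'a> Iterator for BlockIter<'a> {
    type Item = u32;

    fn next(&mut self) -> Option<u32> {
        if self.cursor >= self.len {
            return None;
        }
        // Follow the `next` pointer only when an element from the next block is
        // actually requested, never eagerly after reading a block's last word.
        if self.cursor % BLOCK_SIZE == 0 && self.cursor != 0 {
            if self.current_block.next == u32::max_value() {
                panic!("Block linked list ended prematurely.");
            }
            self.current_block = &self.blocks[self.current_block.next as usize];
        }
        let res = self.current_block.data[self.cursor % BLOCK_SIZE];
        self.cursor += 1;
        Some(res)
    }
}

fn main() {
    let blocks = vec![
        Block { data: [0, 1, 2, 3], next: 1 },
        Block { data: [4, 5, 6, 0], next: u32::max_value() },
    ];
    let iter = BlockIter { blocks: &blocks, current_block: &blocks[0], cursor: 0, len: 7 };
    assert_eq!(iter.collect::<Vec<u32>>(), vec![0, 1, 2, 3, 4, 5, 6]);
}
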
20 changes: 0 additions & 20 deletions src/postings/mod.rs
@@ -167,23 +167,3 @@ mod tests {
}

}



// #[cfg(test)]
// mod tests {

// use super::*;
// use test::Bencher;

//
// #[bench]
// fn bench_single_intersection(b: &mut Bencher) {
// b.iter(|| {
// let docs = VecPostings::new((0..1_000_000).collect());
// let intersection = IntersectionDocSet::from_postings(vec!(docs));
// intersection.count()
// });
// }
// }
//
2 changes: 0 additions & 2 deletions src/postings/postings_writer.rs
@@ -23,11 +23,9 @@ pub struct SpecializedPostingsWriter<Rec: Recorder + 'static> {

fn get_or_create_recorder<'a, Rec: Recorder>(term: Term, term_index: &'a mut HashMap<Term, Rec>, block_store: &mut BlockStore) -> &'a mut Rec {
if term_index.contains_key(&term) {
println!("recorder here");
term_index.get_mut(&term).unwrap()
}
else {
println!("adding recorder");
let recorder = Rec::new(block_store);
term_index
.entry(term)
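The function touched here keeps its get-or-create shape: look the term up first, and only construct the recorder (which needs the block store) before inserting through the entry API. A toy version of that pattern, with Store and Recorder invented for the example:

use std::collections::HashMap;

// Toy "block store": constructing a recorder reserves a block id from it.
struct Store { next_id: u32 }

struct Recorder { first_block: u32 }

impl Recorder {
    fn new(store: &mut Store) -> Recorder {
        let id = store.next_id;
        store.next_id += 1;
        Recorder { first_block: id }
    }
}

// Same shape as get_or_create_recorder: check first, construct the value
// (which needs `store`) outside the entry call, then insert it.
fn get_or_create<'a>(term: String,
                     index: &'a mut HashMap<String, Recorder>,
                     store: &mut Store) -> &'a mut Recorder {
    if index.contains_key(&term) {
        index.get_mut(&term).unwrap()
    } else {
        let recorder = Recorder::new(store);
        index.entry(term).or_insert(recorder)
    }
}

fn main() {
    let mut index = HashMap::new();
    let mut store = Store { next_id: 0 };
    get_or_create("hello".to_string(), &mut index, &mut store);
    let r = get_or_create("hello".to_string(), &mut index, &mut store);
    assert_eq!(r.first_block, 0);
    assert_eq!(store.next_id, 1); // only one recorder was ever constructed
}
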
