Skip to content

Commit

Permalink
Removing data copy in the RAMDirectory
Browse files Browse the repository at this point in the history
The fst crate recently added support for sliced `Arc<Vec<u8>>`.
This called for a rewrite of tantivy's RAMDirectory.
Previously every single read was copying data.

In addition:
- RAMDirectory's Write object panics if it has not been flushed
right before the destruction of the object.
- In the same spirit, the postings serializer panics if someone
opens a term without closing the previous one.

Closes #16
  • Loading branch information
fulmicoton committed Aug 18, 2016
1 parent cecc9f9 commit 0972a1c
Show file tree
Hide file tree
Showing 12 changed files with 212 additions and 71 deletions.
10 changes: 5 additions & 5 deletions src/core/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl fmt::Debug for Index {
}
}

type DirectoryPtr = Box<Directory>;
pub type DirectoryPtr = Box<Directory>;

#[derive(Clone)]
pub struct Index {
Expand Down Expand Up @@ -127,7 +127,7 @@ impl Index {
pub fn publish_segment(&mut self, segment: &Segment) -> Result<()> {
{
let mut meta_write = self.metas.write().unwrap();
meta_write.segments.push(segment.segment_id.clone());
meta_write.segments.push(segment.segment_id);
}
self.save_metas()
}
Expand All @@ -148,7 +148,6 @@ impl Index {
}
meta_write.segments.push(merged_segment.id());
}
// TODO use logs
self.save_metas()
}

Expand Down Expand Up @@ -201,10 +200,11 @@ impl Index {
pub fn save_metas(&mut self,) -> Result<()> {
let mut w = Vec::new();
{
let metas_lock = self.metas.read().unwrap() ;
let metas_lock = self.metas.read().unwrap();
try!(write!(&mut w, "{}\n", json::as_pretty_json(&*metas_lock)));
};
try!(self.rw_directory())
try!(self
.rw_directory())
.atomic_write(&META_FILEPATH, &w[..])
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/core/segment_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl SegmentReader {

let positions_data = segment
.open_read(SegmentComponent::POSITIONS)
.unwrap_or(ReadOnlySource::Anonymous(Vec::new()));
.unwrap_or(ReadOnlySource::empty());

let schema = segment.schema();
Ok(SegmentReader {
Expand Down
3 changes: 2 additions & 1 deletion src/datastruct/fstmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub struct FstMap<V: BinarySerializable> {

fn open_fst_index(source: ReadOnlySource) -> io::Result<fst::Map> {
Ok(fst::Map::from(match source {
ReadOnlySource::Anonymous(data) => try!(Fst::from_bytes(data).map_err(convert_fst_error)),
ReadOnlySource::Anonymous(data) => try!(Fst::from_shared_bytes(data.data, data.start, data.len).map_err(convert_fst_error)),
ReadOnlySource::Mmap(mmap_readonly) => try!(Fst::from_mmap(mmap_readonly).map_err(convert_fst_error)),
}))
}
Expand Down Expand Up @@ -141,6 +141,7 @@ mod tests {
assert_eq!(keys.next().unwrap(), "abc".as_bytes());
assert_eq!(keys.next().unwrap(), "abcd".as_bytes());
assert_eq!(keys.next(), None);

}

}
34 changes: 32 additions & 2 deletions src/directory/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@ mod mmap_directory;
mod ram_directory;
mod directory;
mod read_only_source;
mod shared_vec_slice;

use std::io::{Seek, Write};
use std::io;

use std::path::PathBuf;
pub use self::shared_vec_slice::SharedVecSlice;

pub use self::read_only_source::ReadOnlySource;
pub use self::directory::Directory;
pub use self::ram_directory::RAMDirectory;
pub use self::mmap_directory::MmapDirectory;
pub use self::ram_directory::SharedVec;


////////////////////////////////////////
Expand All @@ -35,6 +37,7 @@ mod tests {

use super::*;
use std::path::Path;
use std::io::SeekFrom;

#[test]
fn test_ram_directory() {
Expand All @@ -48,12 +51,13 @@ mod tests {
test_directory(&mut mmap_directory);
}

fn test_directory(directory: &mut Directory) {
fn test_directory_simple(directory: &mut Directory) {
{
let mut write_file = directory.open_write(Path::new("toto")).unwrap();
write_file.write_all(&[4]).unwrap();
write_file.write_all(&[3]).unwrap();
write_file.write_all(&[7,3,5]).unwrap();
write_file.flush().unwrap();
}
let read_file = directory.open_read(Path::new("toto")).unwrap();
let data: &[u8] = &*read_file;
Expand All @@ -65,4 +69,30 @@ mod tests {
assert_eq!(data[4], 5);
}


/// Checks that seeking back to the start of a file and writing again
/// overwrites the leading bytes while leaving the remaining bytes untouched.
fn test_directory_seek(directory: &mut Directory) {
    let path = Path::new("toto_seek");
    {
        let mut writer = directory.open_write(path).unwrap();
        // Same three initial writes as before: [4], [3], then [7, 3, 5].
        for chunk in [&[4u8][..], &[3u8][..], &[7u8, 3, 5][..]].iter() {
            writer.write_all(chunk).unwrap();
        }
        // Rewind and overwrite the first two bytes.
        writer.seek(SeekFrom::Start(0)).unwrap();
        writer.write_all(&[3, 1]).unwrap();
        writer.flush().unwrap();
    }
    let source = directory.open_read(path).unwrap();
    let bytes: &[u8] = &*source;
    assert_eq!(bytes.len(), 5);
    for (position, &expected) in [3u8, 1, 7, 3, 5].iter().enumerate() {
        assert_eq!(bytes[position], expected);
    }
}

// Runs the shared directory checks (plain write/read, then seek-and-overwrite)
// against the given directory implementation.
fn test_directory(directory: &mut Directory) {
test_directory_simple(directory);
test_directory_seek(directory);
}

}
107 changes: 76 additions & 31 deletions src/directory/ram_directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,40 +8,81 @@ use std::path::{Path, PathBuf};
use directory::OpenError;
use directory::WritePtr;
use std::result;
use super::SharedVecSlice;
use Result;

#[derive(Clone)]
pub struct SharedVec(Arc<RwLock<Cursor<Vec<u8>>>>);


pub struct RAMDirectory {
fs: HashMap<PathBuf, SharedVec>,
/// In-memory writer for a single file of the RAM directory.
///
/// Written bytes are accumulated in `data`; `flush` hands the content of
/// the buffer to `shared_directory` under `path`.
struct VecWriter {
/// Path under which the data is stored in `shared_directory` on flush.
path: PathBuf,
/// Handle to the directory's shared path-to-content map.
shared_directory: InnerDirectory,
/// Seekable buffer receiving the written bytes.
data: Cursor<Vec<u8>>,
/// Set to `false` by every write and to `true` by `flush`;
/// checked when the writer is dropped.
is_flushed: bool,
}

impl SharedVec {
pub fn new() -> SharedVec {
SharedVec(Arc::new( RwLock::new(Cursor::new(Vec::new())) ))

impl Drop for VecWriter {
fn drop(&mut self) {
if !self.is_flushed {
panic!("You forgot to flush {:?} before its writter got Drop. Do not rely on drop.", self.path)
}
}

pub fn copy_vec(&self,) -> Vec<u8> {
self.0.read().unwrap().clone().into_inner()
}

/// Seeking only moves the position inside the writer's in-memory buffer.
impl Seek for VecWriter {
    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
        let buffer = &mut self.data;
        buffer.seek(pos)
    }
}

impl Write for SharedVec {
impl Write for VecWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
try!(self.0.write().unwrap().write(buf));
self.is_flushed = false;
try!(self.data.write(buf));
Ok(buf.len())
}

fn flush(&mut self) -> io::Result<()> {
Ok(())
self.is_flushed = true;
self.shared_directory.write(self.path.clone(), self.data.get_ref())
}
}

impl Seek for SharedVec {
fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
self.0.write().unwrap().seek(pos)
#[derive(Clone)]
struct InnerDirectory(Arc<RwLock<HashMap<PathBuf, Arc<Vec<u8>>>>>);

impl InnerDirectory {

fn new() -> InnerDirectory {
InnerDirectory(Arc::new(RwLock::new(HashMap::new())))
}

fn write(&self, path: PathBuf, data: &Vec<u8>) -> io::Result<()> {
let mut map = try!(
self.0
.write()
.map_err(|_| io::Error::new(io::ErrorKind::Other, format!("Failed to lock the directory, when trying to write {:?}", path)))
);
map.insert(path, Arc::new(data.clone()));
Ok(())
}

fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenError> {
self.0
.read()
.map_err(|_| {
let io_err = io::Error::new(io::ErrorKind::Other, format!("Failed to read lock for the directory, when trying to read {:?}", path));
OpenError::IOError(io_err)
})
.and_then(|readable_map| {
readable_map
.get(path)
.ok_or_else(|| OpenError::FileDoesNotExist(PathBuf::from(path)))
.map(|data| {
ReadOnlySource::Anonymous(SharedVecSlice::new(data.clone()))
})
})
}

}

impl fmt::Debug for RAMDirectory {
Expand All @@ -53,33 +94,37 @@ impl fmt::Debug for RAMDirectory {
impl RAMDirectory {
pub fn create() -> RAMDirectory {
RAMDirectory {
fs: HashMap::new()
fs: InnerDirectory::new()
}
}
}


/// Directory implementation that keeps its files in memory.
pub struct RAMDirectory {
/// Shared path-to-content map; writers created by `open_write` hold a
/// clone of this handle.
fs: InnerDirectory,
}

impl Directory for RAMDirectory {
fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenError> {
match self.fs.get(path) {
Some(ref data) => {
let data_copy = data.copy_vec();
Ok(ReadOnlySource::Anonymous(data_copy))
},
None => {
Err(OpenError::FileDoesNotExist(PathBuf::from(path)))
}
}
self.fs.open_read(path)
}

fn open_write(&mut self, path: &Path) -> Result<WritePtr> {
let full_path = PathBuf::from(&path);
let data = SharedVec::new();
self.fs.insert(full_path, data.clone());
Ok(Box::new(data))
let mut vec_writer = VecWriter {
path: PathBuf::from(path),
data: Cursor::new(Vec::new()),
shared_directory: self.fs.clone(),
is_flushed: false,
};
// force the creation of the file to mimick the MMap directory.
try!(vec_writer.flush());
Ok(Box::new(vec_writer))
}

fn atomic_write(&mut self, path: &Path, data: &[u8]) -> Result<()> {
let mut write = try!(self.open_write(path));
try!(write.write_all(data));
try!(write.flush());
Ok(())
}

Expand Down
14 changes: 10 additions & 4 deletions src/directory/read_only_source.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use fst::raw::MmapReadOnly;
use std::ops::Deref;
use std::io::Cursor;
use super::SharedVecSlice;

////////////////////////////////////////
// Read only source.


pub enum ReadOnlySource {
Mmap(MmapReadOnly),
Anonymous(Vec<u8>),
Anonymous(SharedVecSlice),
}

impl Deref for ReadOnlySource {
Expand All @@ -25,12 +26,18 @@ impl ReadOnlySource {
self.as_slice().len()
}

/// Returns an `Anonymous` source backed by an empty `SharedVecSlice`.
pub fn empty() -> ReadOnlySource {
ReadOnlySource::Anonymous(SharedVecSlice::empty())
}

pub fn as_slice(&self,) -> &[u8] {
match *self {
ReadOnlySource::Mmap(ref mmap_read_only) => unsafe {
mmap_read_only.as_slice()
},
ReadOnlySource::Anonymous(ref shared_vec) => shared_vec.as_slice(),
ReadOnlySource::Anonymous(ref shared_vec) => {
shared_vec.as_slice()
},
}
}

Expand All @@ -45,8 +52,7 @@ impl ReadOnlySource {
ReadOnlySource::Mmap(sliced_mmap)
}
ReadOnlySource::Anonymous(ref shared_vec) => {
let sliced_data: Vec<u8> = Vec::from(&shared_vec[from_offset..to_offset]);
ReadOnlySource::Anonymous(sliced_data)
ReadOnlySource::Anonymous(shared_vec.slice(from_offset, to_offset))
},
}
}
Expand Down
36 changes: 36 additions & 0 deletions src/directory/shared_vec_slice.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use std::sync::Arc;

/// A read-only view over a window of a shared byte buffer.
///
/// The bytes live in an `Arc<Vec<u8>>`: cloning a view or taking a sub-view
/// only clones the `Arc`, the underlying bytes are never copied.
#[derive(Clone)]
pub struct SharedVecSlice {
    /// The whole shared buffer; the view may cover only a part of it.
    pub data: Arc<Vec<u8>>,
    /// Offset of the first byte of the view within `data`.
    pub start: usize,
    /// Number of bytes covered by the view.
    pub len: usize
}

impl SharedVecSlice {

    /// Creates a view over a new, empty buffer.
    pub fn empty() -> SharedVecSlice {
        SharedVecSlice::new(Arc::new(Vec::new()))
    }

    /// Creates a view covering the whole of `data`.
    pub fn new(data: Arc<Vec<u8>>) -> SharedVecSlice {
        let data_len = data.len();
        SharedVecSlice {
            data: data,
            start: 0,
            len: data_len,
        }
    }

    /// Returns the bytes covered by this view.
    pub fn as_slice(&self,) -> &[u8] {
        &self.data[self.start..self.start + self.len]
    }

    /// Returns the sub-view `[from_offset, to_offset)` of this view.
    ///
    /// Both offsets are relative to the start of this view, not to the start
    /// of the underlying buffer.
    ///
    /// # Panics
    ///
    /// Panics if `from_offset > to_offset` or if `to_offset` is greater than
    /// the length of this view. Without these checks a sub-view could
    /// silently reach outside the window of its parent, or the length
    /// computation could underflow.
    pub fn slice(&self, from_offset: usize, to_offset: usize) -> SharedVecSlice {
        assert!(
            from_offset <= to_offset,
            "slice start ({}) is greater than slice end ({})",
            from_offset,
            to_offset
        );
        assert!(
            to_offset <= self.len,
            "slice end ({}) is out of range for a slice of length {}",
            to_offset,
            self.len
        );
        SharedVecSlice {
            data: self.data.clone(),
            start: self.start + from_offset,
            len: to_offset - from_offset,
        }
    }
}
1 change: 1 addition & 0 deletions src/fastfield/serializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ impl FastFieldSerializer {
self.written_size += try!(self.fields.serialize(&mut self.write));
try!(self.write.seek(SeekFrom::Start(0)));
try!((header_offset as u32).serialize(&mut self.write));
try!(self.write.flush());
Ok(self.written_size)
}
}
7 changes: 4 additions & 3 deletions src/postings/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,12 @@ mod tests {
let mut posting_serializer = PostingsSerializer::open(&segment).unwrap();
let term = Term::from_field_text(text_field, "abc");
posting_serializer.new_term(&term, 3).unwrap();
for _ in 0..3 {
let a = vec!(1,2,3,2);
posting_serializer.write_doc(0, 2, &a).unwrap();
for doc_id in 0u32..3u32 {
let positions = vec!(1,2,3,2);
posting_serializer.write_doc(doc_id, 2, &positions).unwrap();
}
posting_serializer.close_term().unwrap();
posting_serializer.close().unwrap();
let read = segment.open_read(SegmentComponent::POSITIONS).unwrap();
assert_eq!(read.len(), 13);
}
Expand Down

0 comments on commit 0972a1c

Please sign in to comment.