Skip to content

Commit

Permalink
support ingest external file (rust-rocksdb#77)
Browse files Browse the repository at this point in the history
  • Loading branch information
hhkbp2 authored and siddontang committed Jan 13, 2017
1 parent 1c921cc commit 20edf91
Show file tree
Hide file tree
Showing 7 changed files with 446 additions and 9 deletions.
153 changes: 152 additions & 1 deletion librocksdb_sys/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ pub enum DBWriteBatch {}
pub enum DBComparator {}
pub enum DBFlushOptions {}
pub enum DBCompactionFilter {}
pub enum EnvOptions {}
pub enum SstFileWriter {}
pub enum IngestExternalFileOptions {}
pub enum DBBackupEngine {}
pub enum DBRestoreOptions {}
pub enum DBSliceTransform {}
Expand Down Expand Up @@ -509,6 +512,54 @@ extern "C" {
ignore_snapshot: bool);
pub fn crocksdb_compactionfilter_destroy(filter: *mut DBCompactionFilter);

// EnvOptions
pub fn crocksdb_envoptions_create() -> *mut EnvOptions;
pub fn crocksdb_envoptions_destroy(opt: *mut EnvOptions);

// IngestExternalFileOptions
pub fn crocksdb_ingestexternalfileoptions_create() -> *mut IngestExternalFileOptions;
pub fn crocksdb_ingestexternalfileoptions_set_move_files(opt: *mut IngestExternalFileOptions,
move_files: bool);
pub fn crocksdb_ingestexternalfileoptions_set_snapshot_consistency(
opt: *mut IngestExternalFileOptions, snapshot_consistency: bool);
pub fn crocksdb_ingestexternalfileoptions_set_allow_global_seqno(
opt: *mut IngestExternalFileOptions, allow_global_seqno: bool);
pub fn crocksdb_ingestexternalfileoptions_set_allow_blocking_flush(
opt: *mut IngestExternalFileOptions, allow_blocking_flush: bool);
pub fn crocksdb_ingestexternalfileoptions_destroy(opt: *mut IngestExternalFileOptions);

// SstFileWriter
pub fn crocksdb_sstfilewriter_create(env: *mut EnvOptions,
io_options: *const DBOptions)
-> *mut SstFileWriter;
pub fn crocksdb_sstfilewriter_create_with_comparator(env: *mut EnvOptions,
io_options: *const DBOptions,
comparator: *const DBComparator)
-> *mut SstFileWriter;
pub fn crocksdb_sstfilewriter_open(writer: *mut SstFileWriter,
name: *const c_char,
err: *mut *mut c_char);
pub fn crocksdb_sstfilewriter_add(writer: *mut SstFileWriter,
key: *const u8,
key_len: size_t,
val: *const u8,
val_len: size_t,
err: *mut *mut c_char);
pub fn crocksdb_sstfilewriter_finish(writer: *mut SstFileWriter, err: *mut *mut c_char);
pub fn crocksdb_sstfilewriter_destroy(writer: *mut SstFileWriter);

pub fn crocksdb_ingest_external_file(db: *mut DBInstance,
file_list: *const *const c_char,
list_len: size_t,
opt: *const IngestExternalFileOptions,
err: *mut *mut c_char);
pub fn crocksdb_ingest_external_file_cf(db: *mut DBInstance,
handle: *const DBCFHandle,
file_list: *const *const c_char,
list_len: size_t,
opt: *const IngestExternalFileOptions,
err: *mut *mut c_char);

// Restore Option
pub fn crocksdb_restore_options_create() -> *mut DBRestoreOptions;
pub fn crocksdb_restore_options_destroy(ropts: *mut DBRestoreOptions);
Expand Down Expand Up @@ -551,8 +602,8 @@ extern "C" {
#[cfg(test)]
mod test {
use libc::{self, c_void};
use std::{ptr, slice, fs};
use std::ffi::{CStr, CString};
use std::ptr;
use super::*;
use tempdir::TempDir;

Expand Down Expand Up @@ -635,4 +686,104 @@ mod test {
assert!(err.is_null());
}
}

unsafe fn check_get(db: *mut DBInstance,
opt: *const DBReadOptions,
key: &[u8],
val: Option<&[u8]>) {
let mut val_len = 0;
let mut err = ptr::null_mut();
let res_ptr = crocksdb_get(db, opt, key.as_ptr(), key.len(), &mut val_len, &mut err);
assert!(err.is_null());
let res = if res_ptr.is_null() {
None
} else {
Some(slice::from_raw_parts(res_ptr, val_len))
};
assert_eq!(res, val);
if !res_ptr.is_null() {
libc::free(res_ptr as *mut libc::c_void);
}
}

#[test]
fn test_ingest_external_file() {
unsafe {
let opts = crocksdb_options_create();
crocksdb_options_set_create_if_missing(opts, true);

let rustpath = TempDir::new("_rust_rocksdb_internaltest").expect("");
let cpath = CString::new(rustpath.path().to_str().unwrap()).unwrap();
let cpath_ptr = cpath.as_ptr();

let mut err = ptr::null_mut();
let db = crocksdb_open(opts, cpath_ptr, &mut err);
assert!(err.is_null(), error_message(err));

let env_opt = crocksdb_envoptions_create();
let io_options = crocksdb_options_create();
let writer = crocksdb_sstfilewriter_create(env_opt, io_options);

let sst_dir = TempDir::new("_rust_rocksdb_internaltest").expect("");
let sst_path = sst_dir.path().join("sstfilename");
let c_sst_path = CString::new(sst_path.to_str().unwrap()).unwrap();
let c_sst_path_ptr = c_sst_path.as_ptr();

crocksdb_sstfilewriter_open(writer, c_sst_path_ptr, &mut err);
assert!(err.is_null(), error_message(err));
crocksdb_sstfilewriter_add(writer, b"sstk1".as_ptr(), 5, b"v1".as_ptr(), 2, &mut err);
assert!(err.is_null(), error_message(err));
crocksdb_sstfilewriter_add(writer, b"sstk2".as_ptr(), 5, b"v2".as_ptr(), 2, &mut err);
assert!(err.is_null(), error_message(err));
crocksdb_sstfilewriter_add(writer, b"sstk3".as_ptr(), 5, b"v3".as_ptr(), 2, &mut err);
assert!(err.is_null(), error_message(err));
crocksdb_sstfilewriter_finish(writer, &mut err);
assert!(err.is_null(), error_message(err));

let ing_opt = crocksdb_ingestexternalfileoptions_create();
let file_list = &[c_sst_path_ptr];
crocksdb_ingest_external_file(db, file_list.as_ptr(), 1, ing_opt, &mut err);
assert!(err.is_null(), error_message(err));
let roptions = crocksdb_readoptions_create();
check_get(db, roptions, b"sstk1", Some(b"v1"));
check_get(db, roptions, b"sstk2", Some(b"v2"));
check_get(db, roptions, b"sstk3", Some(b"v3"));

let snap = crocksdb_create_snapshot(db);

fs::remove_file(sst_path).unwrap();
crocksdb_sstfilewriter_open(writer, c_sst_path_ptr, &mut err);
assert!(err.is_null(), error_message(err));
crocksdb_sstfilewriter_add(writer, "sstk2".as_ptr(), 5, "v4".as_ptr(), 2, &mut err);
assert!(err.is_null(), error_message(err));
crocksdb_sstfilewriter_add(writer, "sstk22".as_ptr(), 6, "v5".as_ptr(), 2, &mut err);
assert!(err.is_null(), error_message(err));
crocksdb_sstfilewriter_add(writer, "sstk3".as_ptr(), 5, "v6".as_ptr(), 2, &mut err);
assert!(err.is_null(), error_message(err));
crocksdb_sstfilewriter_finish(writer, &mut err);
assert!(err.is_null(), error_message(err));

crocksdb_ingest_external_file(db, file_list.as_ptr(), 1, ing_opt, &mut err);
assert!(err.is_null(), error_message(err));
check_get(db, roptions, b"sstk1", Some(b"v1"));
check_get(db, roptions, b"sstk2", Some(b"v4"));
check_get(db, roptions, b"sstk22", Some(b"v5"));
check_get(db, roptions, b"sstk3", Some(b"v6"));

let roptions2 = crocksdb_readoptions_create();
crocksdb_readoptions_set_snapshot(roptions2, snap);
check_get(db, roptions2, b"sstk1", Some(b"v1"));
check_get(db, roptions2, b"sstk2", Some(b"v2"));
check_get(db, roptions2, b"sstk22", None);
check_get(db, roptions2, b"sstk3", Some(b"v3"));
crocksdb_readoptions_destroy(roptions2);

crocksdb_readoptions_destroy(roptions);
crocksdb_release_snapshot(db, snap);
crocksdb_ingestexternalfileoptions_destroy(ing_opt);
crocksdb_sstfilewriter_destroy(writer);
crocksdb_options_destroy(io_options);
crocksdb_envoptions_destroy(env_opt);
}
}
}
69 changes: 69 additions & 0 deletions src/external_file.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

use librocksdb_sys;
use rocksdb_options::{Options, EnvOptions};

use std::ffi::CString;

/// SstFileWriter is used to create sst files that can be added to database later
/// All keys in files generated by SstFileWriter will have sequence number = 0
pub struct SstFileWriter {
inner: *mut librocksdb_sys::SstFileWriter,
}

impl SstFileWriter {
pub fn new(env_opt: &EnvOptions, opt: &Options) -> SstFileWriter {
unsafe {
SstFileWriter {
inner: librocksdb_sys::crocksdb_sstfilewriter_create(env_opt.inner, opt.inner),
}
}
}

/// Prepare SstFileWriter to write into file located at "file_path".
pub fn open(&mut self, name: &str) -> Result<(), String> {
let path = match CString::new(name.to_owned()) {
Err(e) => return Err(format!("invalid path {}: {:?}", name, e)),
Ok(p) => p,
};
unsafe { Ok(ffi_try!(crocksdb_sstfilewriter_open(self.inner, path.as_ptr()))) }
}

/// Add key, value to currently opened file
/// REQUIRES: key is after any previously added key according to comparator.
pub fn add(&mut self, key: &[u8], val: &[u8]) -> Result<(), String> {
unsafe {
ffi_try!(crocksdb_sstfilewriter_add(self.inner,
key.as_ptr(),
key.len(),
val.as_ptr(),
val.len()));
Ok(())
}
}

/// Finalize writing to sst file and close file.
pub fn finish(&mut self) -> Result<(), String> {
unsafe {
ffi_try!(crocksdb_sstfilewriter_finish(self.inner));
Ok(())
}
}
}

impl Drop for SstFileWriter {
fn drop(&mut self) {
unsafe { librocksdb_sys::crocksdb_sstfilewriter_destroy(self.inner) }
}
}
5 changes: 4 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,16 @@ pub mod rocksdb_options;
pub mod merge_operator;
pub mod comparator;
mod compaction_filter;
mod external_file;
mod slice_transform;

pub use compaction_filter::CompactionFilter;
pub use external_file::SstFileWriter;
pub use librocksdb_sys::{DBCompactionStyle, DBCompressionType, DBRecoveryMode, new_bloom_filter,
self as crocksdb_ffi};
pub use merge_operator::MergeOperands;
pub use rocksdb::{DB, DBIterator, DBVector, Kv, SeekKey, Writable, WriteBatch, CFHandle, Range,
BackupEngine};
pub use rocksdb_options::{BlockBasedOptions, Options, ReadOptions, WriteOptions, RestoreOptions};
pub use rocksdb_options::{BlockBasedOptions, Options, ReadOptions, WriteOptions, RestoreOptions,
IngestExternalFileOptions, EnvOptions};
pub use slice_transform::SliceTransform;
75 changes: 68 additions & 7 deletions src/rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@

use crocksdb_ffi::{self, DBWriteBatch, DBCFHandle, DBInstance, DBBackupEngine};
use libc::{self, c_int, c_void, size_t};
use rocksdb_options::{Options, ReadOptions, UnsafeSnap, WriteOptions, FlushOptions, RestoreOptions};
use rocksdb_options::{Options, ReadOptions, UnsafeSnap, WriteOptions, FlushOptions,
RestoreOptions, IngestExternalFileOptions};
use std::{fs, ptr, slice};
use std::collections::BTreeMap;
use std::collections::btree_map::Entry;
use std::ffi::{CStr, CString};
use std::fs;
use std::fmt::{self, Debug, Formatter};
use std::ops::Deref;
use std::path::Path;
use std::ptr;
use std::slice;
use std::str::from_utf8;

const DEFAULT_COLUMN_FAMILY: &'static str = "default";
Expand All @@ -40,6 +40,10 @@ impl Drop for CFHandle {
}
}

fn build_cstring_list(str_list: &[&str]) -> Vec<CString> {
str_list.into_iter().map(|s| CString::new(s.as_bytes()).unwrap()).collect()
}

pub struct DB {
inner: *mut DBInstance,
cfs: BTreeMap<String, CFHandle>,
Expand Down Expand Up @@ -323,9 +327,7 @@ impl DB {

// We need to store our CStrings in an intermediate vector
// so that their pointers remain valid.
let c_cfs: Vec<CString> = cfs_v.iter()
.map(|cf| CString::new(cf.as_bytes()).unwrap())
.collect();
let c_cfs = build_cstring_list(&cfs_v);

let cfnames: Vec<*const _> = c_cfs.iter()
.map(|cf| cf.as_ptr())
Expand Down Expand Up @@ -855,6 +857,42 @@ impl DB {
self.opts.get_statistics()
}

pub fn get_options(&self) -> &Options {
&self.opts
}

pub fn ingest_external_file(&self,
opt: &IngestExternalFileOptions,
files: &[&str])
-> Result<(), String> {
let c_files = build_cstring_list(files);
let c_files_ptrs: Vec<*const _> = c_files.iter().map(|s| s.as_ptr()).collect();
unsafe {
ffi_try!(crocksdb_ingest_external_file(self.inner,
c_files_ptrs.as_ptr(),
c_files.len(),
opt.inner));
}
Ok(())
}

pub fn ingest_external_file_cf(&self,
cf: &CFHandle,
opt: &IngestExternalFileOptions,
files: &[&str])
-> Result<(), String> {
let c_files = build_cstring_list(files);
let c_files_ptrs: Vec<*const _> = c_files.iter().map(|s| s.as_ptr()).collect();
unsafe {
ffi_try!(crocksdb_ingest_external_file_cf(self.inner,
cf.inner,
c_files_ptrs.as_ptr(),
c_files_ptrs.len(),
opt.inner));
}
Ok(())
}

pub fn backup_at(&self, path: &str) -> Result<BackupEngine, String> {
let backup_engine = BackupEngine::open(Options::new(), path).unwrap();
unsafe {
Expand Down Expand Up @@ -1067,6 +1105,29 @@ pub struct DBVector {
len: usize,
}

impl Debug for DBVector {
fn fmt(&self, formatter: &mut Formatter) -> fmt::Result {
unsafe {
write!(formatter,
"{:?}",
slice::from_raw_parts(self.base, self.len))
}
}
}

impl<'a> PartialEq<&'a [u8]> for DBVector {
fn eq(&self, rhs: &&[u8]) -> bool {
if self.len != rhs.len() {
return false;
}
unsafe {
libc::memcmp(self.base as *mut c_void,
rhs.as_ptr() as *mut c_void,
self.len) == 0
}
}
}

impl Deref for DBVector {
type Target = [u8];
fn deref(&self) -> &[u8] {
Expand Down
Loading

0 comments on commit 20edf91

Please sign in to comment.