Skip to content

Add snapshot and checkpoint #216

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions rdb/checkpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package rdb

// #include <stdint.h>
// #include <stdlib.h>
// #include "rdbc.h"
import "C"
import (
"errors"
"unsafe"
)

// Checkpoint can be used to create openable snapshots.
type Checkpoint struct {
c *C.rdb_checkpoint_t
cDb *C.rdb_t
}

// Destroy removes the snapshot from the database's list of snapshots.
func (s *Checkpoint) Destroy() {
C.rdb_destroy_checkpoint(s.c)
s.c, s.cDb = nil, nil
}

// NewCheckpoint creates a new snapshot of the database.
func (db *DB) NewCheckpoint() (*Checkpoint, error) {
var cErr *C.char
cCheck := C.rdb_create_checkpoint(db.c, &cErr)
if cErr != nil {
defer C.free(unsafe.Pointer(cErr))
return nil, errors.New(C.GoString(cErr))
}
return &Checkpoint{
c: cCheck,
cDb: db.c,
}, nil
}

// Save builds openable snapshot of RocksDB on disk.
// CAUTION: checkpointDir should not already exist. If so, nothing will happen.
func (s *Checkpoint) Save(checkpointDir string) error {
var (
cErr *C.char
cDir = C.CString(checkpointDir)
)
defer C.free(unsafe.Pointer(cDir))
C.rdb_open_checkpoint(s.c, cDir, &cErr)
if cErr != nil {
defer C.free(unsafe.Pointer(cErr))
return errors.New(C.GoString(cErr))
}
return nil
}
9 changes: 9 additions & 0 deletions rdb/options_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,12 @@ func (opts *ReadOptions) Destroy() {
func (opts *ReadOptions) SetFillCache(value bool) {
C.rdb_readoptions_set_fill_cache(opts.c, boolToChar(value))
}

// SetSnapshot updates the default read options to use the given snapshot.
func (opts *ReadOptions) SetSnapshot(snapshot *Snapshot) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported method ReadOptions.SetSnapshot should have comment or be unexported

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported method ReadOptions.SetSnapshot should have comment or be unexported

if snapshot == nil {
C.rdb_readoptions_set_snapshot(opts.c, nil)
return
}
C.rdb_readoptions_set_snapshot(opts.c, snapshot.c)
}
48 changes: 48 additions & 0 deletions rdb/rdbc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@
#include "rocksdb/filter_policy.h"
#include "rocksdb/iterator.h"
#include "rocksdb/options.h"
#include "rocksdb/snapshot.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "rocksdb/write_batch.h"
#include "rocksdb/utilities/checkpoint.h"

#include "rdbc.h"
#include "_cgo_export.h"
Expand All @@ -31,6 +33,8 @@ using rocksdb::NewBloomFilterPolicy;
using rocksdb::Cache;
using rocksdb::NewLRUCache;
using rocksdb::BlockBasedTableOptions;
using rocksdb::Snapshot;
using rocksdb::Checkpoint;

struct rdb_t { DB* rep; };
struct rdb_options_t { Options rep; };
Expand All @@ -43,6 +47,8 @@ struct rdb_writebatch_t { WriteBatch rep; };
struct rdb_iterator_t { Iterator* rep; };
struct rdb_cache_t { std::shared_ptr<Cache> rep; };
struct rdb_block_based_table_options_t { BlockBasedTableOptions rep; };
struct rdb_snapshot_t { const Snapshot* rep; };
struct rdb_checkpoint_t { Checkpoint* rep; };

bool SaveError(char** errptr, const Status& s) {
assert(errptr != nullptr);
Expand Down Expand Up @@ -227,6 +233,12 @@ void rdb_readoptions_set_fill_cache(
opt->rep.fill_cache = v;
}

void rdb_readoptions_set_snapshot(
rdb_readoptions_t* opt,
const rdb_snapshot_t* snap) {
opt->rep.snapshot = (snap ? snap->rep : nullptr);
}

//////////////////////////// rdb_writeoptions_t
rdb_writeoptions_t* rdb_writeoptions_create() {
return new rdb_writeoptions_t;
Expand Down Expand Up @@ -484,3 +496,39 @@ void rdb_block_based_options_set_whole_key_filtering(
rdb_block_based_table_options_t* options, unsigned char v) {
options->rep.whole_key_filtering = v;
}

//////////////////////////// rdb_snapshot_t
const rdb_snapshot_t* rdb_create_snapshot(rdb_t* db) {
rdb_snapshot_t* result = new rdb_snapshot_t;
result->rep = db->rep->GetSnapshot();
return result;
}

void rdb_release_snapshot(
rdb_t* db,
const rdb_snapshot_t* snapshot) {
db->rep->ReleaseSnapshot(snapshot->rep);
delete snapshot;
}

//////////////////////////// rdb_checkpoint_t
rdb_checkpoint_t* rdb_create_checkpoint(rdb_t* db, char** errptr) {
Checkpoint* checkpoint;
if (SaveError(errptr, Checkpoint::Create(db->rep, &checkpoint))) {
return nullptr;
}
rdb_checkpoint_t* result = new rdb_checkpoint_t;
result->rep = checkpoint;
return result;
}

void rdb_open_checkpoint(
rdb_checkpoint_t* checkpoint,
const char* checkpoint_dir,
char** errptr) {
SaveError(errptr, checkpoint->rep->CreateCheckpoint(std::string(checkpoint_dir)));
}

void rdb_destroy_checkpoint(rdb_checkpoint_t* checkpoint) {
delete checkpoint->rep;
}
23 changes: 21 additions & 2 deletions rdb/rdbc.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ typedef struct rdb_iterator_t rdb_iterator_t;
typedef struct rdb_filterpolicy_t rdb_filterpolicy_t;
typedef struct rdb_cache_t rdb_cache_t;
typedef struct rdb_block_based_table_options_t rdb_block_based_table_options_t;
typedef struct rdb_snapshot_t rdb_snapshot_t;
typedef struct rdb_checkpoint_t rdb_checkpoint_t;

//////////////////////////// rdb_t
rdb_t* rdb_open(
Expand Down Expand Up @@ -52,8 +54,7 @@ char* rdb_property_value(

//////////////////////////// rdb_writebatch_t
rdb_writebatch_t* rdb_writebatch_create();
rdb_writebatch_t* rdb_writebatch_create_from(const char* rep,
size_t size);
rdb_writebatch_t* rdb_writebatch_create_from(const char* rep, size_t size);
void rdb_writebatch_destroy(rdb_writebatch_t* b);
void rdb_writebatch_clear(rdb_writebatch_t* b);
int rdb_writebatch_count(rdb_writebatch_t* b);
Expand Down Expand Up @@ -83,6 +84,9 @@ rdb_readoptions_t* rdb_readoptions_create();
void rdb_readoptions_destroy(rdb_readoptions_t* opt);
void rdb_readoptions_set_fill_cache(
rdb_readoptions_t* opt, unsigned char v);
void rdb_readoptions_set_snapshot(
rdb_readoptions_t* opt,
const rdb_snapshot_t* snap);

//////////////////////////// rdb_writeoptions_t
rdb_writeoptions_t* rdb_writeoptions_create();
Expand Down Expand Up @@ -152,6 +156,21 @@ void rdb_block_based_options_set_block_cache_compressed(
void rdb_block_based_options_set_whole_key_filtering(
rdb_block_based_table_options_t* options, unsigned char v);

//////////////////////////// rdb_snapshot_t
const rdb_snapshot_t* rdb_create_snapshot(
rdb_t* db);
void rdb_release_snapshot(
rdb_t* db,
const rdb_snapshot_t* snapshot);

//////////////////////////// rdb_checkpoint_t
rdb_checkpoint_t* rdb_create_checkpoint(rdb_t* db, char** errptr);
void rdb_open_checkpoint(
rdb_checkpoint_t* checkpoint,
const char* checkpoint_dir,
char** errptr);
void rdb_destroy_checkpoint(rdb_checkpoint_t* checkpoint);

#ifdef __cplusplus
} /* end extern "C" */
#endif
Expand Down
29 changes: 29 additions & 0 deletions rdb/snapshot.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package rdb

// #include <stdint.h>
// #include <stdlib.h>
// #include "rdbc.h"
import "C"

// Snapshot provides a consistent view of read operations in a DB.
type Snapshot struct {
c *C.rdb_snapshot_t
cDb *C.rdb_t
}

// NewNativeSnapshot creates a Snapshot object.
func NewNativeSnapshot(c *C.rdb_snapshot_t, cDb *C.rdb_t) *Snapshot {
return &Snapshot{c, cDb}
}

// Release removes the snapshot from the database's list of snapshots.
func (s *Snapshot) Release() {
C.rdb_release_snapshot(s.cDb, s.c)
s.c, s.cDb = nil, nil
}

// NewSnapshot creates a new snapshot of the database.
func (db *DB) NewSnapshot() *Snapshot {
cSnap := C.rdb_create_snapshot(db.c)
return NewNativeSnapshot(cSnap, db.c)
}
30 changes: 18 additions & 12 deletions store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,11 @@ func (s *Store) Get(key []byte) ([]byte, error) {
return valSlice.Data(), nil
}

func (s *Store) SetOne(k []byte, val []byte) error {
return s.db.Put(s.wopt, k, val)
}
// SetOne adds a key-value to data store.
func (s *Store) SetOne(k []byte, val []byte) error { return s.db.Put(s.wopt, k, val) }

func (s *Store) Delete(k []byte) error {
return s.db.Delete(s.wopt, k)
}
// Delete deletes a key from data store.
func (s *Store) Delete(k []byte) error { return s.db.Delete(s.wopt, k) }

// NewIterator initializes a new iterator and returns it.
func (s *Store) NewIterator() *rdb.Iterator {
Expand All @@ -108,24 +106,23 @@ func (s *Store) NewIterator() *rdb.Iterator {
return s.db.NewIterator(ro)
}

func (s *Store) Close() {
s.db.Close()
}
// Close closes our data store.
func (s *Store) Close() { s.db.Close() }

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported method Store.Close should have comment or be unexported

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported method Store.Close should have comment or be unexported

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported method Store.Close should have comment or be unexported


// Memtable returns the memtable size.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment on exported method Store.MemtableSize should be of the form "MemtableSize ..."

func (s *Store) MemtableSize() uint64 {
memTableSize, _ := strconv.ParseUint(s.db.GetProperty("rocksdb.cur-size-all-mem-tables"), 10, 64)
return memTableSize
}

// IndexFilterblockSize returns the filter block size.
func (s *Store) IndexFilterblockSize() uint64 {
blockSize, _ := strconv.ParseUint(s.db.GetProperty("rocksdb.estimate-table-readers-mem"), 10, 64)
return blockSize
}

// NewWriteBatch creates a new WriteBatch object and returns a pointer to it.
func (s *Store) NewWriteBatch() *rdb.WriteBatch {
return rdb.NewWriteBatch()
}
func (s *Store) NewWriteBatch() *rdb.WriteBatch { return rdb.NewWriteBatch() }

// WriteBatch does a batch write to RocksDB from the data in WriteBatch object.
func (s *Store) WriteBatch(wb *rdb.WriteBatch) error {
Expand All @@ -134,3 +131,12 @@ func (s *Store) WriteBatch(wb *rdb.WriteBatch) error {
}
return nil
}

// NewCheckpoint creates new checkpoint from current store.
func (s *Store) NewCheckpoint() (*rdb.Checkpoint, error) { return s.db.NewCheckpoint() }

// NewSnapshot creates new snapshot from current store.
func (s *Store) NewSnapshot() *rdb.Snapshot { return s.db.NewSnapshot() }

// SetSnapshot updates default read options to use the given snapshot.
func (s *Store) SetSnapshot(snapshot *rdb.Snapshot) { s.ropt.SetSnapshot(snapshot) }
Loading