Skip to content

Commit

Permalink
Implementing Put, Del, and Write async methods
Browse files Browse the repository at this point in the history
  • Loading branch information
carter-thaxton committed Jun 7, 2011
1 parent f538728 commit 42228bd
Show file tree
Hide file tree
Showing 5 changed files with 226 additions and 72 deletions.
26 changes: 18 additions & 8 deletions demo/async.js
Expand Up @@ -11,19 +11,29 @@ var db = new DB();
console.log("Opening...");
db.open(path, {create_if_missing: true, paranoid_checks: true}, function(err) {
if (err) throw err;
else console.log("ok");
console.log("ok");

// Putting TODO: async
// Putting
console.log("\nPutting...");
var key = new Buffer("Hello");
var value = new Buffer("World");
var status = db.put({}, key, value);
console.log(status);

console.log("\nClosing...")
db.close(function(err) {
db.put(key, value, function(err) {
if (err) throw err;
console.log('ok');
console.log("ok");

// Deleting
console.log("\nDeleting...");
db.del(key, function(err) {
if (err) throw err;
console.log("ok");

// Closing
console.log("\nClosing...")
db.close(function(err) {
if (err) throw err;
console.log('ok');
});
});
});
});

2 changes: 1 addition & 1 deletion leveldb.js
Expand Up @@ -82,7 +82,7 @@ db.put(key, value, options, function (err) { /*...*/ });
db.del(key, options, function (err) { /*...*/ });

// Write a batch of updates in one call
db.write(updates, options, function (err) { /*...*/ });
db.write(writeBatch, options, function (err) { /*...*/ });

// Getting a value from the database
db.get(key, options, function (err, value) { /*...*/ });
Expand Down
244 changes: 186 additions & 58 deletions src/DB.cc
Expand Up @@ -3,6 +3,7 @@

#include <node_buffer.h>
#include "helpers.h"
#include <iostream>

using namespace node_leveldb;

Expand Down Expand Up @@ -108,7 +109,7 @@ int DB::EIO_Open(eio_req *req) {
DB *self = params->self;

// Close old DB, if open() is called more than once
if (self->db) {
if (self->db != NULL) {
delete self->db;
self->db = NULL;
}
Expand All @@ -135,6 +136,7 @@ int DB::EIO_AfterOpen(eio_req *req) {
Handle<Value> DB::Close(const Arguments& args) {
HandleScope scope;

// Get this and arguments
DB* self = ObjectWrap::Unwrap<DB>(args.This());

// Optional callback
Expand All @@ -157,7 +159,7 @@ int DB::EIO_Close(eio_req *req) {
Params *params = static_cast<Params*>(req->data);
DB *self = params->self;

if (self->db) {
if (self->db != NULL) {
delete self->db;
self->db = NULL;
}
Expand All @@ -175,76 +177,183 @@ int DB::EIO_AfterClose(eio_req *req) {


//
// DestroyDB
// Put
//

Handle<Value> DB::DestroyDB(const Arguments& args) {
Handle<Value> DB::Put(const Arguments& args) {
HandleScope scope;


// Get this and arguments
DB* self = ObjectWrap::Unwrap<DB>(args.This());
if (self->db == NULL) {
return ThrowException(Exception::Error(String::New("DB has not been opened")));
}

// Check args
if (!(args.Length() == 2 && args[0]->IsString() && args[1]->IsObject())) {
return ThrowException(Exception::TypeError(String::New("Invalid arguments: Expected (String, Object)")));
if (args.Length() < 2 || !Buffer::HasInstance(args[0]) || !Buffer::HasInstance(args[1])) {
return ThrowException(Exception::TypeError(String::New("Invalid arguments: Expected (Buffer, Buffer)")));
}

String::Utf8Value name(args[0]);
leveldb::Options options = JsToOptions(args[1]);

return processStatus(leveldb::DestroyDB(*name, options));
}
leveldb::Slice key = JsToSlice(args[0]);
leveldb::Slice value = JsToSlice(args[1]);
leveldb::WriteBatch *writeBatch = new leveldb::WriteBatch();
writeBatch->Put(key, value);

Handle<Value> DB::RepairDB(const Arguments& args) {
HandleScope scope;
int pos = 2;

// Check args
if (!(args.Length() == 2 && args[0]->IsString() && args[1]->IsObject())) {
return ThrowException(Exception::TypeError(String::New("Invalid arguments: Expected (String, Object)")));
// Optional write options
leveldb::WriteOptions options = leveldb::WriteOptions();
if (pos < args.Length() && args[pos]->IsObject() && !args[pos]->IsFunction()) {
options = JsToWriteOptions(args[pos]);
pos++;
}

String::Utf8Value name(args[0]);
leveldb::Options options = JsToOptions(args[1]);
// Optional callback
Local<Function> callback;
if (pos < args.Length() && args[pos]->IsFunction()) {
callback = Local<Function>::Cast(args[pos]);
pos++;
}

// Pass parameters to async function
WriteParams *params = new WriteParams(self, writeBatch, options, callback);
params->disposeWriteBatch = true;

// Use Write to implement Put
EIO_BeforeWrite(params);

return processStatus(leveldb::RepairDB(*name, options));
return args.This();
}

Handle<Value> DB::Put(const Arguments& args) {
HandleScope scope;

// Check args
if (!(args.Length() == 3 && args[0]->IsObject() && Buffer::HasInstance(args[1]) && Buffer::HasInstance(args[2]))) {
return ThrowException(Exception::TypeError(String::New("Invalid arguments: Expected (Object, Buffer, Buffer)")));
}
//
// Del
//

Handle<Value> DB::Del(const Arguments& args) {
HandleScope scope;

// Get this and arguments
DB* self = ObjectWrap::Unwrap<DB>(args.This());
if (self->db == NULL) {
return ThrowException(Exception::Error(String::New("DB has not been opened")));
}

leveldb::WriteOptions options = JsToWriteOptions(args[0]);
leveldb::Slice key = JsToSlice(args[1]);
leveldb::Slice value = JsToSlice(args[2]);
// Check args
if (args.Length() < 1 || !Buffer::HasInstance(args[0])) {
return ThrowException(Exception::TypeError(String::New("Invalid arguments: Expected (Buffer)")));
}

return processStatus(self->db->Put(options, key, value));
}
leveldb::Slice key = JsToSlice(args[0]);

leveldb::WriteBatch *writeBatch = new leveldb::WriteBatch();
writeBatch->Delete(key);

Handle<Value> DB::Del(const Arguments& args) {
HandleScope scope;
int pos = 1;

// Check args
if (!(args.Length() == 2 && args[0]->IsObject() && Buffer::HasInstance(args[1]))) {
return ThrowException(Exception::TypeError(String::New("Invalid arguments: Expected (Object, Buffer)")));
// Optional write options
leveldb::WriteOptions options = leveldb::WriteOptions();
if (pos < args.Length() && args[pos]->IsObject() && !args[pos]->IsFunction()) {
options = JsToWriteOptions(args[pos]);
pos++;
}

// Optional callback
Local<Function> callback;
if (pos < args.Length() && args[pos]->IsFunction()) {
callback = Local<Function>::Cast(args[pos]);
pos++;
}

// Pass parameters to async function
WriteParams *params = new WriteParams(self, writeBatch, options, callback);
params->disposeWriteBatch = true;

// Use Write to implement Del
EIO_BeforeWrite(params);

return args.This();
}


//
// Write
//

Handle<Value> DB::Write(const Arguments& args) {
HandleScope scope;

// Get this and arguments
DB* self = ObjectWrap::Unwrap<DB>(args.This());
if (self->db == NULL) {
return ThrowException(Exception::Error(String::New("DB has not been opened")));
}

leveldb::WriteOptions options = JsToWriteOptions(args[0]);
leveldb::Slice key = JsToSlice(args[1]);
// Required WriteBatch
if (args.Length() < 1 || !args[0]->IsObject()) {
return ThrowException(Exception::TypeError(String::New("DB.write() expects a WriteBatch object")));
}
Local<Object> writeBatchObject = Object::Cast(*args[0]);
WriteBatch* writeBatchWrapper = ObjectWrap::Unwrap<WriteBatch>(writeBatchObject);
leveldb::WriteBatch* writeBatch = writeBatchWrapper->wb;

int pos = 1;

// Optional write options
leveldb::WriteOptions options = leveldb::WriteOptions();
if (pos < args.Length() && args[pos]->IsObject() && !args[pos]->IsFunction()) {
options = JsToWriteOptions(args[pos]);
pos++;
}

// Optional callback
Local<Function> callback;
if (pos < args.Length() && args[pos]->IsFunction()) {
callback = Local<Function>::Cast(args[pos]);
pos++;
}

// Pass parameters to async function
WriteParams *params = new WriteParams(self, writeBatch, options, callback);
EIO_BeforeWrite(params);

return args.This();
}

void DB::EIO_BeforeWrite(WriteParams *params) {
eio_custom(EIO_Write, EIO_PRI_DEFAULT, EIO_AfterWrite, params);
}

int DB::EIO_Write(eio_req *req) {
WriteParams *params = static_cast<WriteParams*>(req->data);
DB *self = params->self;

// Do the actual work
if (self->db != NULL) {
params->status = self->db->Write(params->options, params->writeBatch);
}

return 0;
}

return processStatus(self->db->Delete(options, key));
int DB::EIO_AfterWrite(eio_req *req) {
WriteParams *params = static_cast<WriteParams*>(req->data);
params->Callback();

if (params->disposeWriteBatch) {
delete params->writeBatch;
}

delete params;
return 0;
}


//
// Get
//

Handle<Value> DB::Get(const Arguments& args) {
HandleScope scope;

Expand Down Expand Up @@ -277,25 +386,6 @@ Handle<Value> DB::Get(const Arguments& args) {
return scope.Close(actualBuffer);
}

Handle<Value> DB::Write(const Arguments& args) {
HandleScope scope;

if (!(args.Length() == 2 && args[0]->IsObject() && args[1]->IsObject())) {
return ThrowException(Exception::TypeError(String::New("Invalid arguments: Expected (Object, WriteBatch)")));
}

DB* self = ObjectWrap::Unwrap<DB>(args.This());
if (self->db == NULL) {
return ThrowException(Exception::Error(String::New("DB has not been opened")));
}

leveldb::WriteOptions options = JsToWriteOptions(args[0]);
Local<Object> wbObject = Object::Cast(*args[1]);

WriteBatch* wb = ObjectWrap::Unwrap<WriteBatch>(wbObject);

return processStatus(self->db->Write(options, wb->wb));
}

Handle<Value> DB::NewIterator(const Arguments& args) {
HandleScope scope;
Expand Down Expand Up @@ -323,6 +413,44 @@ Handle<Value> DB::GetApproximateSizes(const Arguments& args) {
}


//
// DestroyDB
//

Handle<Value> DB::DestroyDB(const Arguments& args) {
HandleScope scope;

// Check args
if (!(args.Length() == 2 && args[0]->IsString() && args[1]->IsObject())) {
return ThrowException(Exception::TypeError(String::New("Invalid arguments: Expected (String, Object)")));
}

String::Utf8Value name(args[0]);
leveldb::Options options = JsToOptions(args[1]);

return processStatus(leveldb::DestroyDB(*name, options));
}


//
// RepairDB
//

Handle<Value> DB::RepairDB(const Arguments& args) {
HandleScope scope;

// Check args
if (!(args.Length() == 2 && args[0]->IsString() && args[1]->IsObject())) {
return ThrowException(Exception::TypeError(String::New("Invalid arguments: Expected (String, Object)")));
}

String::Utf8Value name(args[0]);
leveldb::Options options = JsToOptions(args[1]);

return processStatus(leveldb::RepairDB(*name, options));
}


//
// Implementation of Params, which are passed from JS thread to EIO thread and back again
//
Expand Down

0 comments on commit 42228bd

Please sign in to comment.