Skip to content

Commit

Permalink
Updated to use the Streaming msgpack API
Browse files Browse the repository at this point in the history
Updated C++ code and JS, however its segfaulting after 2 messages in getResults()
  • Loading branch information
aikar committed Apr 7, 2011
1 parent eec5703 commit 0a4e4a4
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 209 deletions.
117 changes: 23 additions & 94 deletions lib/wormhole.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,105 +19,34 @@
// THE SOFTWARE.

(function() {

var mpBinding = require('./mpBindings');
var whBinding = require('./whBindings');

var binding = require('./whBindings'),
Wormhole = binding.wormhole,
pack = binding.pack;
var dump = require('sys').inspect;

var wormholeParser = function (stream, callback, msgEvent, clientName) {
clientName = clientName || "default";
msgEvent = msgEvent || '__msg';

var self = this;

if (callback) self.addListener(msgEvent, callback);

var buffer = null,
len = null;

stream.on('data', function (data) {
//console.log("[" + clientName + "] got data", dump(data));
var newBuffer;
if (data && data.length) {
if (buffer) {
newBuffer = new Buffer(buffer.length + data.length);
buffer.copy(newBuffer);
data.copy(newBuffer, buffer.length);
buffer = newBuffer;
//console.log('had a buffer, appended', dump(buffer));
} else {
//console.log('buffer is now data');
buffer = data;
}
// 6 byte head then body
if (!len && buffer.length >= 2) {

var retlen = whBinding.getlength(buffer);
if (retlen) {
//console.log("got retlen", retlen);
var i = retlen[0];
len = retlen[1];

// if the frame header was not found at start of buffer
// readjust buffer to the header
if (i) {
//console.log("readjusting buffer");
buffer = buffer.slice(i);
}

if (len) {
//console.log("hasLen = true");
buffer = buffer.slice(6);
}
} else {
//console.log("destroying buffer cause getlength returned null");
buffer = null;
}
} else if (!len && buffer[0] != 0xFF) {
// on the extreme chance that we get a 1 byte buffer that is the
// start of the frame header...
//console.log("destroying buffer cause data had no frame header")
buffer = null;
}

// If through all that we have a length...
if (buffer && len && len <= buffer.length) {
//console.log('unpacking buffer', dump(buffer));
var dataObj = mpBinding.unpack(buffer);
//console.log("unpacked as", dataObj);
self.emit(msgEvent, dataObj);

if (len == buffer.length) {
//console.log("emptying buffer");
buffer = null;
} else {

buffer = buffer.slice(len);
//console.log('buffer left', dump(buffer));
}
len = 0;
}
}
});
}

require('sys').inherits(wormholeParser, require('events').EventEmitter);

// Define exports
module.exports = function(client, callback, eventName, clientName) {
module.exports = function(client, callback) {
client.origwrite = client.write;
client.write = function(data) {
var buffer = mpBinding.pack(data);
var header = whBinding.buildheader(buffer.length);
//console.log("sending:", header, buffer);
client.origwrite(header);
client.origwrite(buffer);
return [header, buffer];
var packed = pack(data);
return client.origwrite(packed);
};
return new wormholeParser(client, callback, eventName, clientName);
if (typeof callback == 'function') {
var wormhole = new Wormhole();
var result;
client.on('data', function (data) {
wormhole.unpack(data);
do {
// Segfaults here?
result = wormhole.getResult();
if (typeof result !== 'undefined') {
callback(result);
} else {
break;
}
} while (true);
});
}
};
module.exports.mp = mpBinding;
module.exports.wh = whBinding;

})();
188 changes: 74 additions & 114 deletions src/wormhole.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,131 +21,91 @@
#include <v8.h>
#include <node.h>
#include <node_buffer.h>
#include <node-msgpack.h>
#include <msgpack.hpp>
#include <endian.h>
#include <node-msgpack.h>

using namespace v8;
using namespace node;





msgpack::unpacker pac;

// feeds the buffer.
pac.reserve_buffer(buffer.size());
memcpy(pac.buffer(), buffer.data(), buffer.size());
pac.buffer_consumed(buffer.size());

// now starts streaming deserialization.
msgpack::unpacked result;
while(pac.next(&result)) {
std::cout << result.get() << std::endl;
static v8::Persistent<v8::FunctionTemplate> constructor_template;
class Wormhole : public ObjectWrap {
public:
// Actual Constructor when object is built in Wormhole::New
Wormhole(Handle<Object> wrapper) : ObjectWrap() {
Wrap(wrapper);
msgpack_unpacker_init(&unpacker, MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
msgpack_unpacked_init(&result);
}
~Wormhole() {
msgpack_unpacker_destroy(&unpacker);
msgpack_unpacked_destroy(&result);
}
// get a new Wormhole() and wrap it inside of this
static Handle<Value> New(const Arguments &args) {
HandleScope scope;
Wormhole* wormhole;
wormhole = new Wormhole(args.This());
return args.This();
}












































/**
* Scans a buffer looking for the header start symbol and then array or null
* array(headstart, length)
* or array(headstart, null) if found head but not enough data for length (rare)
*
* Null = bad data (header was long enough but contained no start symbols)
*/
static Handle<Value> wh_getlength(const Arguments &args) {
HandleScope scope;

Local<Object> buf = args[0]->ToObject();
uint32_t len = 0;

Local<Array> result = Array::New(2);

size_t buflen = Buffer::Length(buf);

// the js side should enforce the 2 min
if (buflen >= 2) {
//Wormhole.prototype.unpack();
static Handle<Value> Unpack(const Arguments &args) {
HandleScope scope;
Wormhole* wormhole = ObjectWrap::Unwrap<Wormhole>(args.This());

msgpack_unpacker* unpacker = &wormhole->unpacker;
if (args.Length() <= 0 || !Buffer::HasInstance(args[0])) {
return ThrowException(Exception::TypeError(
String::New("First argument must be a Buffer")));
}
Local<Object> buf = args[0]->ToObject();
char* data = Buffer::Data(buf);
for (size_t i = 0; i < buflen - 1; i++) {
if ((unsigned char) data[i] == 0xFF &&
(unsigned char) data[i+1] == 0x0F) {
result->Set(Number::New(0), Uint32::New(i));
if (i+6 <= buflen) {
len = le32toh(*(uint32_t*)(data + i + 2));
result->Set(Number::New(1), Uint32::New(len));
} else {
result->Set(Number::New(1), Null());
}
return scope.Close(result);
}
size_t len = Buffer::Length(buf);

if(!msgpack_unpacker_reserve_buffer(unpacker, len)) {
return ThrowException(Exception::Error(
String::New("Could not reserve buffer")));
}
if (msgpack_unpacker_buffer_capacity(unpacker) < len) {
return ThrowException(Exception::Error(
String::New("buffer capacity is less than required length")));
}
memcpy(msgpack_unpacker_buffer(unpacker), data, len);
msgpack_unpacker_buffer_consumed(unpacker, len);
return scope.Close(True());
}
return scope.Close(Null());
}
/**
* Builds header with 0xFF0x0F32bitlen
*/
static Handle<Value> wh_buildheader(const Arguments &args) {
HandleScope scope;
static Handle<Value> GetResult(const Arguments &args) {
HandleScope scope;
Wormhole* wormhole = ObjectWrap::Unwrap<Wormhole>(args.This());

msgpack_unpacker* unpacker = &wormhole->unpacker;
msgpack_unpacked* result = &wormhole->result;

if (msgpack_unpacker_next(unpacker, result)) {
return scope.Close(msgpack_to_v8(&(result->data)));
}
return scope.Close(Undefined());
}

msgpack_unpacked result;
msgpack_unpacker unpacker;
private:

};

uint32_t len = args[0]->Uint32Value();

Buffer* bp = Buffer::New(6);
char* buf = Buffer::Data(bp->handle_);
// 0xFF 0x0F <size> header
*(buf) = 0xFF;
*(buf+1) = 0x0F;
*(uint32_t*)(buf+2) = htole32(len);
return scope.Close(bp->handle_);
}
/**
* Exports the functions
*/
extern "C" void init(Handle<Object> target) {
extern "C" void
init(Handle<Object> target) {
HandleScope scope;

Local<FunctionTemplate> t = FunctionTemplate::New(Wormhole::New);
constructor_template = Persistent<FunctionTemplate>::New(t);
constructor_template->InstanceTemplate()->SetInternalFieldCount(1);
constructor_template->SetClassName(String::NewSymbol("Wormhole"));
NODE_SET_PROTOTYPE_METHOD(constructor_template, "unpack", Wormhole::Unpack);
NODE_SET_PROTOTYPE_METHOD(constructor_template, "getResult", Wormhole::GetResult);
target->Set( String::NewSymbol("wormhole"), constructor_template->GetFunction() );
NODE_SET_METHOD(target, "pack", pack);
};


NODE_SET_METHOD(target, "getlength", wh_getlength);
NODE_SET_METHOD(target, "buildheader", wh_buildheader);
}
45 changes: 44 additions & 1 deletion support/node-msgpack.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,16 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
================================================================================
*/

#include <v8.h>
#include <node.h>
#include <node_buffer.h>
#include <msgpack.h>
#include <math.h>
#include <vector>

using namespace v8;
using namespace node;

// An exception class that wraps a textual message
class MsgpackException {
public:
Expand Down Expand Up @@ -149,7 +159,7 @@ class MsgpackCycle {
static void
v8_to_msgpack(Handle<Value> v8obj, msgpack_object *mo, msgpack_zone *mz,
MsgpackCycle *mc) {
mc->check(v8obj);
//mc->check(v8obj);

if (v8obj->IsUndefined() || v8obj->IsNull()) {
mo->type = MSGPACK_OBJECT_NIL;
Expand Down Expand Up @@ -269,3 +279,36 @@ msgpack_to_v8(msgpack_object *mo) {
throw MsgpackException("Encountered unknown MesssagePack object type");
}
}


static Handle<Value>
pack(const Arguments &args) {
HandleScope scope;

msgpack_packer pk;
MsgpackZone mz;
MsgpackSbuffer sb;
MsgpackCycle mc;

msgpack_packer_init(&pk, &sb._sbuf, msgpack_sbuffer_write);

for (int i = 0; i < args.Length(); i++) {
msgpack_object mo;

try {
v8_to_msgpack(args[0], &mo, &mz._mz, &mc);
} catch (MsgpackException e) {
return ThrowException(e.getThrownException());
}

if (msgpack_pack_object(&pk, mo)) {
return ThrowException(Exception::Error(
String::New("Error serializaing object")));
}
}

Buffer *bp = Buffer::New(sb._sbuf.size);
memcpy(Buffer::Data(bp->handle_), sb._sbuf.data, sb._sbuf.size);

return scope.Close(bp->handle_);
}

0 comments on commit 0a4e4a4

Please sign in to comment.