Skip to content

Commit

Permalink
Create msgpack.Stream abstraction; split C++/JS.
Browse files Browse the repository at this point in the history
- Provide the external API to the addon as lib/msgpack.js, which builds
  off of the rudimentary facilities provided by the bindings. This
  requires installation to install both build/default/mpBindings.node
  and lib/msgpack.js.
- Create msgpack.Stream abstraction which wraps a net.Stream and emits
  'msg' events when a full message is received. Provides a 'sendmsg'
  method to transparently pack JavaScript objects.
- Add tests for msgpack.Stream.
  • Loading branch information
pgriess committed May 27, 2010
1 parent f22f530 commit 976b512
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 1 deletion.
71 changes: 71 additions & 0 deletions lib/msgpack.js
@@ -0,0 +1,71 @@
// Wrap a nicer JavaScript API that wraps the direct MessagePack bindings.

var buffer = require('buffer');
var events = require('events');
var mpBindings = require('mpBindings');
var sys = require('sys');

var debug = ('NODE_DEBUG' in process.env) ?
function(m) { sys.error.apply(this, arguments); } :
function() { };

var pack = mpBindings.pack;
var unpack = mpBindings.unpack;

exports.pack = pack;
exports.unpack = unpack;

var Stream = function(s) {
var self = this;

events.EventEmitter.call(self);

// Buffer of incomplete stream data
self.buf = null;

// Send a message down the stream
self.send = function(m) {
s.write(pack(m));
};

// Listen for data from the underlying stream, consuming it and emitting
// 'msg' events as we find whole messages.
s.addListener('data', function(d) {
debug('received ' + d.length + ' bytes');

// Make sure that self.buf reflects the entirety of the unread stream
// of bytes; it needs to be a single buffer
if (self.buf) {
var b = new buffer.Buffer(self.buf.length + d.length);
self.buf.copy(b, 0, 0, self.buf.length);
d.copy(b, self.buf.length, 0, d.length);

self.buf = b;
} else {
self.buf = d;
}

// Consume messages from the stream, one by one
while (self.buf && self.buf.length > 0) {
var msg = unpack(self.buf);
if (!msg) {
break;
}

self.emit('msg', msg);
if (unpack.bytes_remaining > 0) {
self.buf = self.buf.slice(
self.buf.length - unpack.bytes_remaining,
self.buf.length
);
} else {
self.buf = null;
}
}

debug(((self.buf) ? self.buf.length : 0) + ' bytes remaining');
});
};

sys.inherits(Stream, events.EventEmitter);
exports.Stream = Stream;
2 changes: 1 addition & 1 deletion src/wscript
Expand Up @@ -11,7 +11,7 @@ def configure(ctx):

def build(ctx):
t = ctx.new_task_gen('cxx', 'shlib', 'node_addon')
t.target = 'msgpack'
t.target = 'mpBindings'
t.source = 'msgpack.cc'
t.includes = ['../deps/msgpack/dist/include']
t.cxxflags = ['-g', '-Wall']
Expand Down
66 changes: 66 additions & 0 deletions test/test-stream-disjoint.js
@@ -0,0 +1,66 @@
// Verify that a msgpack.Stream can handle partial messages arriving.

var assert = require('assert');
var buffer = require('buffer');
var msgpack = require('msgpack');
var net = require('net');
var netBindings = process.binding('net');
var sys = require('sys');

var MSGS = [
[1, 2, 3],
{'a' : 1, 'b' : 2},
{'test' : [1, 'a', 3]}
];

// Write a buffer to a stream with a delay
var writemsg = function(s, buf, delay) {
setTimeout(function() {
sys.debug(sys.inspect(buf));
s.write(buf);
},
delay
);
};

var fds = netBindings.socketpair();

var is = new net.Stream(fds[0]);
var ims = new msgpack.Stream(is);
var os = new net.Stream(fds[1]);

var msgsReceived = 0;
ims.addListener('msg', function(m) {
sys.debug('received msg: ' + sys.inspect(m));

assert.deepEqual(m, MSGS[msgsReceived]);

if (++msgsReceived == MSGS.length) {
is.end();
os.end();
}
});
is.resume();

// Serialize the messages into a single large buffer
var buf = new buffer.Buffer(0);
MSGS.forEach(function (m) {
var b = msgpack.pack(m);
var bb = new buffer.Buffer(buf.length + b.length);
buf.copy(bb, 0, 0, buf.length);
b.copy(bb, buf.length, 0, b.length);

buf = bb;
});

// Slice our single large buffer into 3 pieces and send them all off
// separately.
var nwrites = 3;
var sz = Math.ceil(buf.length / nwrites);
for (var i = 0; i < nwrites; i++) {
writemsg(
os,
buf.slice(i * sz, Math.min((i + 1) * sz, buf.length)),
(i + 1) * 1000
);
}
34 changes: 34 additions & 0 deletions test/test-stream.js
@@ -0,0 +1,34 @@
// Verify that msgpack.Stream can pass multiple messages around as expected.

var assert = require('assert');
var msgpack = require('msgpack');
var net = require('net');
var netBindings = process.binding('net');
var sys = require('sys');

var MSGS = [
[1, 2, 3],
{'a' : 1, 'b' : 2},
{'test' : [1, 'a', 3]}
];
var fds = netBindings.socketpair();

var is = new net.Stream(fds[0]);
var ims = new msgpack.Stream(is);
var os = new net.Stream(fds[1]);
var oms = new msgpack.Stream(os);

var msgsReceived = 0;
ims.addListener('msg', function(m) {
assert.deepEqual(m, MSGS[msgsReceived]);

if (++msgsReceived == MSGS.length) {
is.end();
os.end();
}
});
is.resume();

MSGS.forEach(function (m) {
oms.send(m);
});

0 comments on commit 976b512

Please sign in to comment.