Skip to content

Commit

Permalink
Send method and header in one go
Browse files Browse the repository at this point in the history
To avoid the scenario in which the message publish method is sent, but
attempting to send the header fails, I now encode the method and the
header before sending both. Encoding is the second most likely cause
of problems; the first is trying to send something other than a byte
buffer, so I also check the `content` argument.

This changes the connection API such that `sendContent` only sends
content, instead of the header *then* content, and there's an
additional method `sendMethodAndProperties`.

The original way seemed correct in that it resulted in fewer methods
(just sendMethod and send[HeaderAnd]Content); however, it's more er,
correct, to factor the methods such that the channel can't be in an
invalid state in-between method calls.

(Side note: this may also open the way for e.g., sendStream, since the
'whole' content doesn't have to be given at once any longer).
  • Loading branch information
squaremo committed Sep 5, 2013
1 parent eac43bb commit ed3f93d
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 40 deletions.
11 changes: 7 additions & 4 deletions lib/channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,13 @@ C.sendOrEnqueue = function(method, fields, reply) {
};

C.sendMessage = function(fields, properties, content) {
this.sendImmediately(defs.BasicPublish, fields);
return this.connection.sendContent(this.ch,
defs.BasicProperties, properties,
content);
if (!Buffer.isBuffer(content))
throw new TypeError('content is not a buffer');

this.connection.sendMethodAndProperties(
this.ch, defs.BasicPublish, fields,
defs.BasicProperties, properties, content.length);
return this.connection.sendContent(this.ch, content);
};

// Internal, synchronously resolved RPC; the return value is resolved
Expand Down
19 changes: 14 additions & 5 deletions lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -439,18 +439,27 @@ C.sendMethod = function(channel, Method, fields) {
return buffer.write(frame);
};

C.sendMethodAndProperties = function(channel,
Method, fields,
Properties, props,
size) {
var mframe = encodeMethod(Method, channel, fields);
var pframe = encodeProperties(Properties, channel, size, props);
var buffer = this.channels[channel].buffer;
this.sentSinceLastCheck = true;
buffer.write(mframe);
return buffer.write(pframe);
};

var FRAME_OVERHEAD = frame.FRAME_OVERHEAD;
var makeBodyFrame = frame.makeBodyFrame;

C.sendContent = function(channel, Properties, fields, body) {
C.sendContent = function(channel, body) {
var writeResult = true;
var buffer = this.channels[channel].buffer;
var headerFrame = encodeProperties(
Properties, channel, body.length, fields);
// I'll send the headers regardless
buffer.write(headerFrame);

var maxBody = this.frameMax - FRAME_OVERHEAD;

for (var offset = 0; offset < body.length; offset += maxBody) {
var end = offset + maxBody;
var slice = (end > body.length) ? body.slice(offset) : body.slice(offset, end);
Expand Down
84 changes: 55 additions & 29 deletions test/channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,7 @@ test("delivery", channelTest(
});
},
function(send, await, done, ch) {
send(defs.BasicDeliver, DELIVER_FIELDS, ch);
send(false, {}, ch, new Buffer('barfoo'));
send(defs.BasicDeliver, DELIVER_FIELDS, ch, new Buffer('barfoo'));
}));

test("zero byte msg = no content body frames", channelTest(
Expand All @@ -252,8 +251,7 @@ test("zero byte msg = no content body frames", channelTest(
});
},
function(send, await, done, ch) {
send(defs.BasicDeliver, DELIVER_FIELDS, ch);
send(false, {}, ch, new Buffer(""));
send(defs.BasicDeliver, DELIVER_FIELDS, ch, new Buffer(''));
}));

test("bad delivery", channelTest(
Expand All @@ -273,6 +271,35 @@ test("bad delivery", channelTest(
});
}));

test("bad content send", channelTest(
function(ch, done) {
ch.open();
assert.throws(function() {
ch.sendMessage({routingKey: 'foo',
exchange: 'amq.direct'},
{}, null);
});
done();
},
function(send, await, done, ch) {
// nothing gets sent ...
}));

test("bad properties send", channelTest(
function(ch, done) {
ch.open();
assert.throws(function() {
ch.sendMessage({routingKey: 'foo',
exchange: 'amq.direct'},
{contentEncoding: 7},
new Buffer('foobar'));
});
done();
},
function(send, await, done, ch) {
// nothing gets sent ...
}));

test("bad consumer", channelTest(
function(ch, done) {
errorAndClose = latch(2, done);
Expand All @@ -284,8 +311,29 @@ test("bad consumer", channelTest(
ch.open();
},
function(send, await, done, ch) {
send(defs.BasicDeliver, DELIVER_FIELDS, ch);
send(false, {}, ch, new Buffer('barfoo'));
send(defs.BasicDeliver, DELIVER_FIELDS, ch, new Buffer('barfoo'));
return await(defs.ChannelClose)()
.then(function() {
send(defs.ChannelCloseOk, {}, ch);
});
}));

test("bad send in consumer", channelTest(
function(ch, done) {
var errorAndClose = latch(2, done);
ch.on('close', succeed(errorAndClose));
ch.on('error', succeed(errorAndClose));

ch.on('delivery', function() {
ch.sendMessage({routingKey: 'foo',
exchange: 'amq.direct'},
{}, null);
});

ch.open();
},
function(send, await, done, ch) {
send(defs.BasicDeliver, DELIVER_FIELDS, ch, new Buffer('barfoo'));
return await(defs.ChannelClose)()
.then(function() {
send(defs.ChannelCloseOk, {}, ch);
Expand All @@ -301,8 +349,7 @@ test("return", channelTest(
ch.open();
},
function(send, await, done, ch) {
send(defs.BasicReturn, DELIVER_FIELDS, ch);
send(null, {}, ch, new Buffer('barfoo'));
send(defs.BasicReturn, DELIVER_FIELDS, ch, new Buffer('barfoo'));
}));

function confirmTest(variety, Method) {
Expand All @@ -325,25 +372,4 @@ function confirmTest(variety, Method) {
confirmTest("ack", defs.BasicAck);
confirmTest("nack", defs.BasicNack);

test("interleaved RPC/delivery", channelTest(
function(ch, done) {
var both = latch(2, done);
ch.on('delivery', succeed(both));

ch.open().then(function() {
ch.rpc(defs.BasicQos,
{ global: false, prefetchSize: 0, prefetchCount: 7},
defs.BasicQosOk)
.then(succeed(both), fail(both));
}, fail(both));
},
function(send, await, done, ch) {
return await(defs.BasicQos)()
.then(function() {
send(defs.BasicDeliver, DELIVER_FIELDS, ch);
send(defs.BasicQosOk, {}, ch);
send(false, {}, ch, new Buffer('boofar'));
});
}));

});
7 changes: 5 additions & 2 deletions test/mocknet.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,11 @@ function runServer(socket, run) {

function send(id, fields, channel, content) {
channel = channel || 0;
if (!id && content) {
frames.sendContent(channel, defs.BasicProperties, fields, content);
if (content) {
frames.sendMethodAndProperties(channel, id, fields,
defs.BasicProperties, fields,
content.length);
frames.sendContent(channel, content);
}
else {
frames.sendMethod(channel, id, fields);
Expand Down

0 comments on commit ed3f93d

Please sign in to comment.