Skip to content

Commit

Permalink
Merge 64d34a3 into 112e126
Browse files Browse the repository at this point in the history
  • Loading branch information
BryanDonovan committed Feb 22, 2015
2 parents 112e126 + 64d34a3 commit c4ff606
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 9 deletions.
7 changes: 5 additions & 2 deletions Consumer.js
Expand Up @@ -47,6 +47,9 @@ Consumer.prototype._unsetRequestMode = function(){

Consumer.prototype.connect = function(cb){
var that = this;
if (this.port <= 0 || this.port >= 65536) {
return cb(new Error('Port should be > 0 and < 65536'));
}
this.socket = net.createConnection({port : this.port, host : this.host}, function(){
cb();
});
Expand Down Expand Up @@ -153,7 +156,7 @@ Consumer.prototype.handleOffsetsData = function(cb){
return cb(null, offsets);
};

Consumer.prototype.sendConsumeRequest = function(cb){
Consumer.prototype.sendConsumeRequest = function(cb){
if (this.offset === null || this.offset === undefined || !this.offset.eq) {
return cb("offset was " + this.offset);
}
Expand Down Expand Up @@ -205,7 +208,7 @@ Consumer.prototype.getOffsets = function(cb){
var that = this;
var request = new OffsetsRequest(this.topic, this.partition, -1, this.MAX_OFFSETS);
this.socket.write(request.toBytes());

};

Consumer.prototype.getLatestOffset = function(cb){
Expand Down
4 changes: 2 additions & 2 deletions package.json
Expand Up @@ -25,11 +25,11 @@
],
"dependencies": {
"underscore": "1.3.3",
"buffermaker": "1.0.0",
"buffermaker": "1.2.0",
"binary": "0.3.0",
"crc32": "0.2.2",
"buffer-crc32": "0.2.1",
"bignum": "0.6.2",
"bignum": "0.9.2",
"readable-stream": "1.0.26"
},
"devDependencies": {
Expand Down
12 changes: 7 additions & 5 deletions test/Consumer.js
@@ -1,3 +1,4 @@
var util = require('util');
var should = require('should');
var net = require('net');
var bignum = require('bignum');
Expand Down Expand Up @@ -160,7 +161,7 @@ describe("Consumer", function(){
setTimeout(function() { connectionListener(); }, 10);
return socket;
});
var consumer = new Consumer({topic: "test", port : -1, offset : 1});
var consumer = new Consumer({topic: "test", port : 1, offset : 1});
consumer._setRequestMode("fetch");
consumer.onFetch(function(err) {
should.exist(err, 'we should emit an error for oversized messages');
Expand All @@ -173,16 +174,17 @@ describe("Consumer", function(){
done();
});
consumer.connect(function(err) {
should.not.exist(err, 'should not throw an error here');
should.not.exist(err, 'should not throw an error here: ' + util.inspect(err));
});
});
});

describe("invalid port", function(){
it("calls back with a socket error", function(done){
it("calls back with an error", function(done){

var consumer = new Consumer({topic: "test", port : -1});
consumer.connect(function(err){
err.code.should.equal('ECONNREFUSED');
err.message.should.match(/port/i);
done();
});

Expand Down Expand Up @@ -287,7 +289,7 @@ describe("Consumer", function(){
var emptyBuffer = new Buffer([]);
consumer.responseBuffer.should.eql(emptyBuffer);
should.not.exist(consumer.requestMode);
consumer.offset.eq(bignum(12 +
consumer.offset.eq(bignum(12 +
messages[0].toBytes().length +
messages[1].toBytes().length )).should.equal(true);
done();
Expand Down

0 comments on commit c4ff606

Please sign in to comment.