Skip to content
This repository has been archived by the owner on Jan 26, 2018. It is now read-only.

Commit

Permalink
better 'none' compression handling on produce
Browse files Browse the repository at this point in the history
  • Loading branch information
dannycoates committed Nov 18, 2012
1 parent 92cac95 commit 2b9022a
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 13 deletions.
1 change: 1 addition & 0 deletions client/client.js
Expand Up @@ -64,6 +64,7 @@ module.exports = function (
return cb(err) return cb(err)
} }
if (!written) { if (!written) {
logger.info('connection', 'wait')
this.ready = false this.ready = false
} }
this.receiver.push(request, cb) this.receiver.push(request, cb)
Expand Down
9 changes: 5 additions & 4 deletions client/produce-request.js
Expand Up @@ -56,7 +56,8 @@ module.exports = function (
if (err) { if (err) {
return cb(err) return cb(err)
} }
if (buffer.length > this.topic.maxMessageSize) { if (this.topic.compression !== Message.compression.NONE &&
buffer.length > this.topic.maxMessageSize) {
return cb(new ProduceError("message too big", buffer.length)) return cb(new ProduceError("message too big", buffer.length))
} }
var header = new RequestHeader( var header = new RequestHeader(
Expand All @@ -66,11 +67,11 @@ module.exports = function (
this.partitionId this.partitionId
) )
try { try {
header.serialize(stream) var written = header.serialize(stream)
var mlen = new Buffer(4) var mlen = new Buffer(4)
mlen.writeUInt32BE(buffer.length, 0) mlen.writeUInt32BE(buffer.length, 0)
stream.write(mlen) written = stream.write(mlen) && written
var written = stream.write(buffer) written = stream.write(buffer) && written
} }
catch (e) { catch (e) {
err = e err = e
Expand Down
2 changes: 1 addition & 1 deletion index.js
Expand Up @@ -34,7 +34,7 @@ module.exports = function (options) {
var Broker = require('./broker')(logger, inherits, EventEmitter, Client) var Broker = require('./broker')(logger, inherits, EventEmitter, Client)
var BrokerPool = require('./broker-pool')(logger, inherits, EventEmitter) var BrokerPool = require('./broker-pool')(logger, inherits, EventEmitter)
var Partition = require('./partition')(logger, inherits, EventEmitter, Broker) var Partition = require('./partition')(logger, inherits, EventEmitter, Broker)
var PartitionSet = require('./partition-set')(logger, inherits, EventEmitter, Partition) var PartitionSet = require('./partition-set')(logger, inherits, EventEmitter)
var MessageBuffer = require('./message-buffer')(inherits, EventEmitter) var MessageBuffer = require('./message-buffer')(inherits, EventEmitter)
var Topic = require('./topic')(logger, inherits, Stream, MessageBuffer, Partition, PartitionSet) var Topic = require('./topic')(logger, inherits, Stream, MessageBuffer, Partition, PartitionSet)
var StaticConnector = require('./static-connector')(logger, inherits, EventEmitter, Broker) var StaticConnector = require('./static-connector')(logger, inherits, EventEmitter, Broker)
Expand Down
9 changes: 3 additions & 6 deletions partition-set.js
@@ -1,8 +1,7 @@
module.exports = function ( module.exports = function (
logger, logger,
inherits, inherits,
EventEmitter, EventEmitter) {
Partition) {


function PartitionSet() { function PartitionSet() {
this.partitionsByName = {} this.partitionsByName = {}
Expand Down Expand Up @@ -69,11 +68,11 @@ module.exports = function (
} }


PartitionSet.prototype.nextWritable = function () { PartitionSet.prototype.nextWritable = function () {
var partition = Partition.nil var partition = null
for (var i = 0; i < this.partitions.length; i++) { for (var i = 0; i < this.partitions.length; i++) {
partition = this.next() partition = this.next()
if (partition.isWritable() && partition.isReady()) { if (partition.isWritable() && partition.isReady()) {
break; return partition
} }
} }
return partition return partition
Expand Down Expand Up @@ -107,8 +106,6 @@ module.exports = function (
this.readable().forEach(stopPartition) this.readable().forEach(stopPartition)
} }


PartitionSet.nil = new PartitionSet()

function readableChanged(partition) { function readableChanged(partition) {
if (partition.isReadable()) { if (partition.isReadable()) {
this.readables[partition.name] = partition this.readables[partition.name] = partition
Expand Down
8 changes: 6 additions & 2 deletions topic.js
Expand Up @@ -166,16 +166,17 @@ module.exports = function (
} }
} }


// Readable Stream

Topic.prototype.error = function (err) { Topic.prototype.error = function (err) {
if (!this.paused) { if (!this.paused) {
this.pause() this.pause()
} }
logger.info('topic', this.name, 'error', err.message) logger.info('topic', this.name, 'error', err.message)
this.emit('error', err) this.emit('error', err)
return false
} }


// Readable Stream

Topic.prototype.pause = function () { Topic.prototype.pause = function () {
logger.info('pause', this.name) logger.info('pause', this.name)
this.paused = true this.paused = true
Expand Down Expand Up @@ -206,6 +207,9 @@ module.exports = function (
encoding = encoding || 'utf8' encoding = encoding || 'utf8'
data = new Buffer(data, encoding) data = new Buffer(data, encoding)
} }
if (data.length > this.maxMessageSize) {
return this.error(new Error("message too big"))
}
return this.produceBuffer.push(data) return this.produceBuffer.push(data)
} }


Expand Down

0 comments on commit 2b9022a

Please sign in to comment.