Skip to content

Commit

Permalink
basic respect of channel max and frame max
Browse files Browse the repository at this point in the history
  • Loading branch information
David Barshow committed May 14, 2015
1 parent 10f43f3 commit bfc9fd1
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 14 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{ "name" : "amqp-coffee"
, "description" : "AMQP driver for node"
, "keywords" : [ "amqp" ]
, "version" : "0.1.21"
, "version" : "0.1.22"
, "author" : { "name" : "David Barshow" },
"licenses": [
{
Expand Down
6 changes: 4 additions & 2 deletions src/lib/AMQPParser.coffee
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# debug = require('debug')('amqp:AMQPParser')
{EventEmitter} = require('events')

{ Indicators, FrameType, MaxFrameBuffer } = require('./config').constants
{ Indicators, FrameType } = require('./config').constants
{ methodTable, classes, methods } = require('./config').protocol
debug = require('./config').debug('amqp:AMQPParser')

Expand All @@ -11,6 +11,8 @@ debug = require('./config').debug('amqp:AMQP

class AMQPParser extends EventEmitter
constructor: (version, type, connection) ->
@connection = connection

# send the start of the handshake....
connection.write("AMQP" + String.fromCharCode(0,0,9,1));

Expand Down Expand Up @@ -49,7 +51,7 @@ class AMQPParser extends EventEmitter
@frameChannel = parseIntFromBuffer(@frameHeader,2)
@frameSize = parseIntFromBuffer(@frameHeader,4)

if @frameSize > MaxFrameBuffer
if @frameSize > @connection.frameMax
return @error "Oversize frame #{@frameSize}"

# # setup our frameBuffer
Expand Down
2 changes: 2 additions & 0 deletions src/lib/Channel.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class Channel extends EventEmitter

@waitForMethod(methods.channelOpenOk, cb) if cb?
@connection._sendMethod(@channel, methods.channelOpen, {})
@connection.channelCount++

if @transactional then @temporaryChannel()
else
Expand Down Expand Up @@ -92,6 +93,7 @@ class Channel extends EventEmitter
@channelTracker = null

if @state is 'open'
@connection.channelCount--
@state = 'closed'
@connection._sendMethod @channel, methods.channelClose, {
replyText : 'Goodbye'
Expand Down
4 changes: 3 additions & 1 deletion src/lib/ChannelManager.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ class ChannelManager
cb(null, pool[i].channel)

temporaryChannel: (cb)=>
if @tempChannel? then return cb null, @tempChannel
if @tempChannel?
cb?(null, @tempChannel)
return @tempChannel

channel = @nextChannelNumber()

Expand Down
23 changes: 17 additions & 6 deletions src/lib/Connection.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ async = require('async')

defaults = require('./defaults')
{ methodTable, classes, methods } = require('./config').protocol
{ MaxFrameBuffer, FrameType, HeartbeatFrame, EndFrame } = require('./config').constants
{ FrameType, HeartbeatFrame, EndFrame } = require('./config').constants
{ serializeInt, serializeFields } = require('./serializationHelpers')

Queue = require('./Queue')
Expand Down Expand Up @@ -52,7 +52,11 @@ class Connection extends EventEmitter
@queues = {}
@exchanges = {}

@sendBuffer = new Buffer(MaxFrameBuffer)
# connection tuning paramaters
@channelMax = @connectionOptions.channelMax
@frameMax = @connectionOptions.frameMax

This comment has been minimized.

Copy link
@carlhoerberg

carlhoerberg May 15, 2015

Normally you take the lowest of the two (servers wish and the clients wish) (except if it's zero, which means max), but this is totally fine too.


@sendBuffer = new Buffer(@frameMax)

@channelManager = new ChannelManager(@)

Expand Down Expand Up @@ -409,7 +413,7 @@ class Connection extends EventEmitter
offset = 0
while offset < body.length

length = Math.min((body.length - offset), MaxFrameBuffer)
length = Math.min((body.length - offset), @frameMax)
h = new Buffer(7)
h.used = 0

Expand All @@ -423,7 +427,7 @@ class Connection extends EventEmitter
@connection.write(EndFrame)
@_resetSendHeartbeatTimer()

offset += MaxFrameBuffer
offset += @frameMax

cb?()
return true
Expand Down Expand Up @@ -479,9 +483,16 @@ class Connection extends EventEmitter
locale: 'en_US'
}
when methods.connectionTune
if args.channelMax? and args.channelMax isnt 0 and args.channelMax < @channelMax or @channelMax is 0
@channelMax = args.channelMax

if args.frameMax? and args.frameMax < @frameMax
@frameMax = args.frameMax
@sendBuffer = new Buffer(@frameMax)

@_sendMethod 0, methods.connectionTuneOk, {
channelMax: 0
frameMax: MaxFrameBuffer
channelMax: @channelMax
frameMax: @frameMax
heartbeat: @connectionOptions.heartbeat / 1000
}

Expand Down
6 changes: 4 additions & 2 deletions src/lib/Consumer.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ defaults = require('./defaults')
{BSON} = require('bson').BSONPure

{ methodTable, classes, methods } = require('./config').protocol
{ MaxFrameSize } = require('./config').constants
{ MaxEmptyFrameSize } = require('./config').constants

class Consumer extends Channel

Expand Down Expand Up @@ -181,7 +181,8 @@ class Consumer extends Channel
@incomingMessage = _.extend @incomingMessage, {weight, properties, size}

# if we're only expecting one packet lets just copy the buffer when we get it
if size > MaxFrameSize
# otherwise lets create a new incoming data buffer and pre alloc the space
if size > @connection.frameMax - MaxEmptyFrameSize
@incomingMessage.data = new Buffer(size)
@incomingMessage.data.used = 0

Expand All @@ -207,6 +208,7 @@ class Consumer extends Channel
try
return JSON.parse message.raw.toString()
catch e
console.error e
return message.raw
}

Expand Down
3 changes: 1 addition & 2 deletions src/lib/constants.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ module.exports = {
CLOSED: 'closed'
OPENING: 'opening'

MaxFrameBuffer : 131072
MaxFrameSize : 131072
MaxEmptyFrameSize : 8

AMQPTypes: Object.freeze({
Expand Down Expand Up @@ -42,4 +42,3 @@ module.exports = {
EndFrame : new Buffer([206])
}

module.exports.MaxFrameSize = module.exports.MaxFrameBuffer - module.exports.MaxEmptyFrameSize
4 changes: 4 additions & 0 deletions src/lib/defaults.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ try
catch e
clientVersion = '0.0.1'

{ MaxFrameSize } = require('./constants')

os = require('os')

module.exports =
Expand All @@ -22,6 +24,8 @@ module.exports =
reconnectDelayTime: 1000 # in ms
hostRandom: false
connectTimeout: 30000 # in ms
channelMax: 0 # unlimited
frameMax: MaxFrameSize
clientProperties:
version: clientVersion
platform: os.hostname() + '-node-' + process.version
Expand Down

0 comments on commit bfc9fd1

Please sign in to comment.