Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
cenkalti committed Dec 22, 2014
1 parent 4a455b5 commit 016bf41
Showing 1 changed file with 22 additions and 28 deletions.
50 changes: 22 additions & 28 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,7 @@ type Channel struct {
// State machine that manages frame order, must only be mutated by the connection
recv func(*Channel, frame) error

// State that manages the send behavior after before and after shutdown, must
// only be mutated in shutdown()
send func(*Channel, message) error
shutdowned bool

// Current state for frame re-assembly, only mutated from recv
message messageWithContent
Expand All @@ -88,7 +86,6 @@ func newChannel(c *Connection, id uint16) *Channel {
rpc: make(chan message),
consumers: makeConsumers(),
recv: (*Channel).recvMethod,
send: (*Channel).sendOpen,
errors: make(chan *Error, 1),
}
}
Expand All @@ -105,7 +102,7 @@ func (me *Channel) shutdown(e *Error) {
}
}

me.send = (*Channel).sendClosed
me.shutdowned = true

// Notify RPC if we're selecting
if e != nil {
Expand Down Expand Up @@ -161,7 +158,7 @@ func (me *Channel) open() (err error) {
// Performs a request/response call for when the message is not NoWait and is
// specified as Synchronous.
func (me *Channel) call(req message, res ...message) error {
if err := me.send(me, req); err != nil {
if err := me.send(req); err != nil {
return err
}

Expand Down Expand Up @@ -194,25 +191,22 @@ func (me *Channel) call(req message, res ...message) error {
return nil
}

func (me *Channel) sendClosed(msg message) (err error) {
func (me *Channel) send(msg message) (err error) {
me.sendM.Lock()
defer me.sendM.Unlock()

// After a 'channel.close' is sent or received the only valid response is
// channel.close-ok
if _, ok := msg.(*channelCloseOk); ok {
return me.connection.send(&methodFrame{
ChannelId: me.id,
Method: msg,
})
}

return ErrClosed
}
if me.shutdowned {
// After a 'channel.close' is sent or received the only valid response is
// channel.close-ok
if _, ok := msg.(*channelCloseOk); ok {
return me.connection.send(&methodFrame{
ChannelId: me.id,
Method: msg,
})
}

func (me *Channel) sendOpen(msg message) (err error) {
me.sendM.Lock()
defer me.sendM.Unlock()
return ErrClosed
}

if content, ok := msg.(messageWithContent); ok {
props, body := content.getContent()
Expand Down Expand Up @@ -263,19 +257,19 @@ func (me *Channel) dispatch(msg message) {
switch m := msg.(type) {
case *channelClose:
me.shutdown(newError(m.ReplyCode, m.ReplyText))
me.send(me, &channelCloseOk{})
me.send(&channelCloseOk{})

case *channelFlow:
for _, c := range me.flows {
c <- m.Active
}
me.send(me, &channelFlowOk{Active: m.Active})
me.send(&channelFlowOk{Active: m.Active})

case *basicCancel:
for _, c := range me.cancels {
c <- m.ConsumerTag
}
me.send(me, &basicCancelOk{ConsumerTag: m.ConsumerTag})
me.send(&basicCancelOk{ConsumerTag: m.ConsumerTag})

case *basicReturn:
ret := newReturn(*m)
Expand Down Expand Up @@ -1332,7 +1326,7 @@ func (me *Channel) Publish(exchange, key string, mandatory, immediate bool, msg
me.m.Lock()
defer me.m.Unlock()

if err := me.send(me, &basicPublish{
if err := me.send(&basicPublish{
Exchange: exchange,
RoutingKey: key,
Mandatory: mandatory,
Expand Down Expand Up @@ -1553,7 +1547,7 @@ is true.
See also Delivery.Ack
*/
func (me *Channel) Ack(tag uint64, multiple bool) error {
return me.send(me, &basicAck{
return me.send(&basicAck{
DeliveryTag: tag,
Multiple: multiple,
})
Expand All @@ -1567,7 +1561,7 @@ it must be redelivered or dropped.
See also Delivery.Nack
*/
func (me *Channel) Nack(tag uint64, multiple bool, requeue bool) error {
return me.send(me, &basicNack{
return me.send(&basicNack{
DeliveryTag: tag,
Multiple: multiple,
Requeue: requeue,
Expand All @@ -1582,7 +1576,7 @@ multiple messages, reducing the amount of protocol messages to exchange.
See also Delivery.Reject
*/
func (me *Channel) Reject(tag uint64, requeue bool) error {
return me.send(me, &basicReject{
return me.send(&basicReject{
DeliveryTag: tag,
Requeue: requeue,
})
Expand Down

0 comments on commit 016bf41

Please sign in to comment.