forked from dayorbyte/dispatchd
/
channelMethods.go
57 lines (50 loc) · 1.78 KB
/
channelMethods.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package server
import (
"github.com/ernestrc/dispatchd/amqp"
)
// channelRoute dispatches a channel-class method frame to the handler that
// matches its concrete type and returns that handler's result.
//
// Frames of any other type (including ChannelOpenOk, whose case is disabled
// below) are rejected with hard error 540 carrying the frame's own class and
// method identifiers.
func (channel *Channel) channelRoute(methodFrame amqp.MethodFrame) *amqp.AMQPError {
	switch m := methodFrame.(type) {
	case *amqp.ChannelOpen:
		return channel.channelOpen(m)
	case *amqp.ChannelFlow:
		return channel.channelFlow(m)
	case *amqp.ChannelFlowOk:
		return channel.channelFlowOk(m)
	case *amqp.ChannelClose:
		return channel.channelClose(m)
	case *amqp.ChannelCloseOk:
		return channel.channelCloseOk(m)
	// case *amqp.ChannelOpenOk:
	// 	return channel.channelOpenOk(m)
	default:
		// No handler is registered for this frame type.
		classID, methodID := methodFrame.MethodIdentifier()
		return amqp.NewHardError(540, "Unable to route method frame", classID, methodID)
	}
}
// channelOpen handles a ChannelOpen frame.
//
// If the channel state is already CH_STATE_OPEN, it returns hard error 504
// tagged with the frame's class and method identifiers. Otherwise it sends
// ChannelOpenOk, marks the channel open via setStateOpen, and returns nil.
func (channel *Channel) channelOpen(method *amqp.ChannelOpen) *amqp.AMQPError {
	if channel.state == CH_STATE_OPEN {
		classID, methodID := method.MethodIdentifier()
		return amqp.NewHardError(504, "Channel already open", classID, methodID)
	}

	// The reply is sent before the state transition, as in the original.
	reply := &amqp.ChannelOpenOk{}
	channel.SendMethod(reply)
	channel.setStateOpen()
	return nil
}
// channelFlow handles a ChannelFlow frame: it passes the requested Active
// value to changeFlow, then replies with a ChannelFlowOk whose Active field
// is the channel's flow value read after that call. It always returns nil.
func (channel *Channel) channelFlow(method *amqp.ChannelFlow) *amqp.AMQPError {
	channel.changeFlow(method.Active)

	// Build the reply only after changeFlow so it reports the current flow.
	reply := &amqp.ChannelFlowOk{Active: channel.flow}
	channel.SendMethod(reply)
	return nil
}
// channelFlowOk handles a ChannelFlowOk frame. It is not implemented: every
// call returns hard error 540 tagged with the frame's class and method
// identifiers.
func (channel *Channel) channelFlowOk(method *amqp.ChannelFlowOk) *amqp.AMQPError {
	classID, methodID := method.MethodIdentifier()
	notImplemented := amqp.NewHardError(540, "Not implemented", classID, methodID)
	return notImplemented
}
// channelClose handles a ChannelClose frame: it replies with ChannelCloseOk
// and then calls channel.shutdown(). The contents of the close frame are not
// inspected. It always returns nil.
func (channel *Channel) channelClose(method *amqp.ChannelClose) *amqp.AMQPError {
	// TODO(MAY): Report the class and method that are the reason for the close
	ack := &amqp.ChannelCloseOk{}
	channel.SendMethod(ack)

	channel.shutdown()
	return nil
}
// channelCloseOk handles a ChannelCloseOk frame by calling channel.shutdown().
// The method argument is not inspected, and the result is always nil.
func (channel *Channel) channelCloseOk(method *amqp.ChannelCloseOk) *amqp.AMQPError {
channel.shutdown()
return nil
}