-
Notifications
You must be signed in to change notification settings - Fork 4.9k
/
amqp_structs.go
225 lines (192 loc) · 4.62 KB
/
amqp_structs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
package amqp
import (
"time"
"github.com/elastic/beats/libbeat/common"
)
type amqpMethod func(*amqpMessage, []byte) (bool, bool)
const (
transactionsHashSize = 2 ^ 16
transactionTimeout = 10 * 1e9
)
//layout used when a timestamp must be parsed
const (
amqpTimeLayout = "January _2 15:04:05 2006"
)
//Frame types and codes
type frameType byte
const (
methodType frameType = 1
headerType frameType = 2
bodyType frameType = 3
heartbeatType frameType = 8
)
const (
frameEndOctet byte = 206
)
//Codes for MethodMap
type codeClass uint16
const (
connectionCode codeClass = 10
channelCode codeClass = 20
exchangeCode codeClass = 40
queueCode codeClass = 50
basicCode codeClass = 60
txCode codeClass = 90
)
type codeMethod uint16
const (
connectionStart codeMethod = 10
connectionStartOk codeMethod = 11
connectionTune codeMethod = 30
connectionTuneOk codeMethod = 31
connectionOpen codeMethod = 40
connectionOpenOk codeMethod = 41
connectionClose codeMethod = 50
connectionCloseOk codeMethod = 51
)
const (
channelOpen codeMethod = 10
channelOpenOk codeMethod = 11
channelFlow codeMethod = 20
channelFlowOk codeMethod = 21
channelClose codeMethod = 40
channelCloseOk codeMethod = 41
)
const (
exchangeDeclare codeMethod = 10
exchangeDeclareOk codeMethod = 11
exchangeDelete codeMethod = 20
exchangeDeleteOk codeMethod = 21
exchangeBind codeMethod = 30
exchangeBindOk codeMethod = 31
exchangeUnbind codeMethod = 40
exchangeUnbindOk codeMethod = 51
)
const (
queueDeclare codeMethod = 10
queueDeclareOk codeMethod = 11
queueBind codeMethod = 20
queueBindOk codeMethod = 21
queuePurge codeMethod = 30
queuePurgeOk codeMethod = 31
queueDelete codeMethod = 40
queueDeleteOk codeMethod = 41
queueUnbind codeMethod = 50
queueUnbindOk codeMethod = 51
)
const (
basicQos codeMethod = 10
basicQosOk codeMethod = 11
basicConsume codeMethod = 20
basicConsumeOk codeMethod = 21
basicCancel codeMethod = 30
basicCancelOk codeMethod = 31
basicPublish codeMethod = 40
basicReturn codeMethod = 50
basicDeliver codeMethod = 60
basicGet codeMethod = 70
basicGetOk codeMethod = 71
basicGetEmpty codeMethod = 72
basicAck codeMethod = 80
basicReject codeMethod = 90
basicRecover codeMethod = 110
basicRecoverOk codeMethod = 111
basicNack codeMethod = 120
)
const (
txSelect codeMethod = 10
txSelectOk codeMethod = 11
txCommit codeMethod = 20
txCommitOk codeMethod = 21
txRollback codeMethod = 30
txRollbackOk codeMethod = 31
)
//Message properties codes for byte prop1 in getMessageProperties
const (
expirationProp byte = 1
replyToProp byte = 2
correlationIDProp byte = 4
priorityProp byte = 8
deliveryModeProp byte = 16
headersProp byte = 32
contentEncodingProp byte = 64
contentTypeProp byte = 128
)
//Message properties codes for byte prop2 in getMessageProperties
const (
appIDProp byte = 8
userIDProp byte = 16
typeProp byte = 32
timestampProp byte = 64
messageIDProp byte = 128
)
//table types
const (
boolean = 't'
shortShortInt = 'b'
shortShortUint = 'B'
shortInt = 'U'
shortUint = 'u'
longInt = 'I'
longUint = 'i'
longLongInt = 'L'
longLongUint = 'l'
float = 'f'
double = 'd'
decimal = 'D'
shortString = 's'
longString = 'S'
fieldArray = 'A'
timestamp = 'T'
fieldTable = 'F'
noField = 'V'
byteArray = 'x' //rabbitMQ specific field
)
type amqpPrivateData struct {
data [2]*amqpStream
}
type amqpFrame struct {
Type frameType
// channel uint16 (frame channel is currently ignored)
size uint32
content []byte
}
type amqpMessage struct {
ts time.Time
tcpTuple common.TCPTuple
cmdlineTuple *common.CmdlineTuple
method string
isRequest bool
request string
direction uint8
parseArguments bool
//mapstr containing all the options for the methods and header fields
fields common.MapStr
body []byte
bodySize uint64
notes []string
}
// represent a stream of data to be parsed
type amqpStream struct {
data []byte
parseOffset int
message *amqpMessage
}
// contains the result of parsing
type amqpTransaction struct {
tuple common.TCPTuple
src common.Endpoint
dst common.Endpoint
ts time.Time
method string
request string
response string
responseTime int32
body []byte
bytesOut uint64
bytesIn uint64
toString bool
notes []string
amqp common.MapStr
timer *time.Timer
}