forked from ghetzel/qcat
-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
302 lines (274 loc) · 8.16 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
package main
import (
"encoding/json"
"fmt"
"os"
"github.com/urfave/cli"
"github.com/PerformLine/go-stockutil/log"
"github.com/PerformLine/go-stockutil/stringutil"
"github.com/PerformLine/go-stockutil/typeutil"
"github.com/PerformLine/qcat"
)
func createAmqpClient(c *cli.Context) (*qcat.AMQP, error) {
if len(c.Args()) > 0 {
if client, err := qcat.NewAMQP(c.Args()[0]); err == nil {
client.ConnectTimeout = c.Duration(`connect-timeout`)
client.Autodelete = c.Bool(`autodelete`)
client.Durable = c.Bool(`durable`)
client.Exclusive = c.Bool(`exclusive`)
client.Immediate = c.Bool(`immediate`)
client.Mandatory = c.Bool(`mandatory`)
client.ID = c.String(`consumer`)
client.QueueName = c.String(`queue`)
client.ExchangeName = c.String(`exchange`)
client.RoutingKey = c.String(`routing-key`)
client.Prefetch = c.Int(`prefetch`)
client.HeartbeatInterval = c.Duration(`heartbeat`)
for _, property := range c.StringSlice(`property`) {
key, value := stringutil.SplitPair(property, `=`)
client.ClientProperties[key] = stringutil.Autotype(value)
}
log.Debugf("Connecting to %s:%d vhost=%s queue=%s", client.Host, client.Port, client.Vhost, client.QueueName)
if err := client.Connect(); err == nil {
return client, nil
} else {
return nil, fmt.Errorf("Error connecting to consumer: %v", err)
}
} else {
return nil, fmt.Errorf("Error initializing consumer: %v", err)
}
} else {
return nil, fmt.Errorf("Must provide an AMQP connection URI as an argument")
}
}
func FlagsForConsumers() []cli.Flag {
return []cli.Flag{
cli.StringFlag{
Name: `consumer, C`,
Usage: `The consumer name to report to the broker`,
},
cli.StringFlag{
Name: `queue, Q`,
Usage: `The name of the queue to bind to`,
Value: qcat.DefaultQueueName,
},
cli.IntFlag{
Name: `prefetch, p`,
Usage: `The number of items to prefetch from the queue`,
Value: 1,
},
cli.BoolFlag{
Name: `raw`,
Usage: `Dump the whole recevied messsage.`,
},
}
}
func FlagsForPublishers() []cli.Flag {
return []cli.Flag{
cli.StringFlag{
Name: `exchange, e`,
Usage: `The name of the exchange to bind to`,
},
cli.StringFlag{
Name: `routing-key, r`,
Usage: `The routing key to use when publishing messages`,
Value: qcat.DefaultQueueName,
},
cli.StringFlag{
Name: `content-type`,
Usage: `The Content-Type header to include with published messages`,
},
cli.StringFlag{
Name: `content-encoding`,
Usage: `The Content-Encoding header to include with published messages`,
},
cli.DurationFlag{
Name: `ttl, t`,
Usage: `The maximum amount of time the message will live in a queue before being automatically deleted`,
},
cli.IntFlag{
Name: `priority`,
Usage: `The priority level (0-9) that will be supplied with the published message`,
},
cli.BoolFlag{
Name: `mandatory, M`,
Usage: `Messages are undeliverable when the mandatory flag is true and no queue is bound that matches the routing key`,
},
cli.BoolFlag{
Name: `immediate, I`,
Usage: `Messages are undeliverable when the immediate flag is true and no consumer on the matched queue is ready to accept the delivery`,
},
cli.BoolFlag{
Name: `persistent, P`,
Usage: `Persistent messages are written to disk such that in the event of a broker crash the message is not lost`,
},
cli.StringSliceFlag{
Name: `header, H`,
Usage: `A key=value pair that will be set as a message header.`,
},
}
}
func FlagsCommon() []cli.Flag {
return []cli.Flag{
cli.BoolFlag{
Name: `durable, D`,
Usage: `Durable queues will survive server restarts and remain when there are no remaining consumers or bindings`,
},
cli.BoolFlag{
Name: `autodelete, A`,
Usage: `Auto-deleted queues will be automatically removed when all clients disconnect`,
},
cli.BoolFlag{
Name: `exclusive, E`,
Usage: `Exclusive queues are only accessible by the connection that declares them and will be deleted when the connection closes`,
},
cli.DurationFlag{
Name: `heartbeat`,
Usage: `Specify on what interval to send heartbeat pings.`,
},
cli.StringSliceFlag{
Name: `property, c`,
Usage: `Specify a client property to send to the server as a key=value pair.`,
},
cli.DurationFlag{
Name: `connect-timeout, T`,
Usage: `How long to wait before timing out a connection attempt.`,
Value: qcat.DefaultConnectTimeout,
},
}
}
func headerFromContext(c *cli.Context) qcat.MessageHeader {
header := qcat.MessageHeader{}
if c.IsSet(`persistent`) {
if c.Bool(`persistent`) {
header.DeliveryMode = qcat.Persistent
} else {
header.DeliveryMode = qcat.Transient
}
}
if c.Duration(`ttl`) > 0 {
header.Expiration = c.Duration(`ttl`)
}
if c.IsSet(`priority`) {
header.Priority = c.Int(`priority`)
}
if c.IsSet(`content-type`) {
header.ContentType = c.String(`content-type`)
}
if c.IsSet(`content-encoding`) {
header.ContentEncoding = c.String(`content-encoding`)
}
for _, pair := range c.StringSlice(`header`) {
if header.Headers == nil {
header.Headers = make(map[string]interface{})
}
if k, v := stringutil.SplitPair(pair, `=`); v != `` {
header.Headers[k] = typeutil.Auto(v)
}
}
return header
}
func main() {
app := cli.NewApp()
app.Name = `qcat`
app.Usage = `utility for publishing and consuming data from an AMQP message broker`
app.Version = qcat.Version
app.EnableBashCompletion = true
app.Before = func(c *cli.Context) error {
log.SetLevelString(c.String(`log-level`))
return nil
}
app.Flags = []cli.Flag{
cli.StringFlag{
Name: `log-level, L`,
Usage: `Level of log output verbosity`,
Value: `info`,
EnvVar: `LOGLEVEL`,
},
}
app.Commands = []cli.Command{
{
Name: `publish`,
Usage: `Connect to an AMQP message broker and submit messages read from standard input`,
Flags: append(FlagsCommon(), FlagsForPublishers()...),
ArgsUsage: `AMQP_URI`,
Action: func(c *cli.Context) {
if client, err := createAmqpClient(c); err == nil {
header := headerFromContext(c)
if err := client.PublishLines(os.Stdin, header); err != nil {
log.Fatalf("Error publishing: %v", err)
}
} else {
log.Fatalf("%v", err)
}
},
}, {
Name: `consume`,
Usage: `Connect to an AMQP message broker and print messages to standard output`,
Flags: append(FlagsCommon(), FlagsForConsumers()...),
ArgsUsage: `AMQP_URI`,
Action: func(c *cli.Context) {
if client, err := createAmqpClient(c); err == nil {
if err := client.Subscribe(); err == nil {
for {
select {
case message := <-client.Receive():
if c.Bool(`raw`) {
if data, err := json.Marshal(message); err == nil {
fmt.Println(string(data))
} else {
log.Fatalf("malformed message: %v", err)
}
} else {
var line string
if err := message.Decode(&line); err == nil {
fmt.Println(line)
} else {
log.Fatalf("failed to decode message: %v", err)
}
}
case err := <-client.Err():
log.Fatal(err)
}
}
} else {
log.Fatalf("Error subscribing: %v", err)
}
} else {
log.Fatalf("%v", err)
}
},
}, {
Name: `serve`,
Usage: `Start an HTTP server for receiving and consuming messages from an AMQP message broker`,
ArgsUsage: `AMQP_URI`,
Flags: append([]cli.Flag{
cli.StringFlag{
Name: `address, a`,
Usage: `The address to listen on`,
Value: qcat.DefaultServerAddress,
},
}, append(FlagsCommon(), append(FlagsForPublishers(), FlagsForConsumers()...)...)...),
Action: func(c *cli.Context) {
if client, err := createAmqpClient(c); err == nil {
server := qcat.NewHttpServer(client)
server.BaseHeader = headerFromContext(c)
if err := server.ListenAndServe(c.String(`address`)); err != nil {
log.Fatalf("%v", err)
}
} else {
log.Fatalf("%v", err)
}
},
}, {
Name: `version`,
Usage: `Output the current version and exit`,
Action: func(c *cli.Context) {
fmt.Println(app.Version)
},
},
}
// load plugin subcommands
// app.Commands = append(app.Commands, api.Register()...)
app.Run(os.Args)
}