/
main.go
282 lines (248 loc) · 8.2 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
package main
/*
Copyright (c) IBM Corporation 2016, 2021
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
Contributors:
Mark Taylor - Initial Contribution
*/
import (
"context"
"crypto/tls"
"net/http"
"os"
"strings"
"sync"
"time"
"github.com/ibm-messaging/mq-golang/v5/ibmmq"
"github.com/ibm-messaging/mq-golang/v5/mqmetric"
cf "github.com/ibm-messaging/mq-metric-samples/v5/pkg/config"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
)
// Build metadata printed at startup by cf.PrintInfo. Nothing in this file
// assigns these; presumably they are injected at link time with
// -ldflags "-X main.BuildStamp=..." — confirm against the build scripts.
var (
	BuildStamp    string
	GitCommit     string
	BuildPlatform string
)

// Runtime state shared between main, startServer and stopServer.
var (
	// discoverConfig is refilled from the configuration before every
	// metric discovery.
	discoverConfig mqmetric.DiscoverConfig

	// usingTLS records whether startServer chose HTTPS.
	usingTLS bool

	// server is the metrics HTTP server created by startServer.
	server *http.Server

	// startChannel is signalled by main once the first collector has been
	// registered; startServer waits on it before listening.
	startChannel = make(chan bool)

	// collector is the Prometheus collector currently registered; it is
	// replaced after every reconnection.
	collector prometheus.Collector

	// mutex is held by main while it (re)connects and rebuilds the
	// collector. NOTE(review): other files in the package presumably take
	// it too — confirm.
	mutex sync.RWMutex

	// retryCount counts reconnection attempts since the last successful
	// connection. Might use this with a maxRetry to force a quit out of
	// collector.
	retryCount int
)
// main runs the IBM MQ metrics exporter for Prometheus.
//
// After loading the configuration it starts the HTTP server (startServer) in
// its own goroutine. It then loops until the collector is told to end. On each
// pass, if the queue manager is not marked as connected, it reconnects,
// rediscovers the available metrics and registers a fresh collector.
//
// Exit codes:
//   - 72: no queue manager name was configured.
//   - 30: the queue manager is a standby instance.
//   - 10: the run ends with an unresolved configuration or connection error.
//   - 0: otherwise.
func main() {
	cf.PrintInfo("IBM MQ metrics exporter for Prometheus monitoring", BuildStamp, GitCommit, BuildPlatform)

	err := initConfig()
	if err == nil && config.cf.QMgrName == "" {
		log.Errorln("Must provide a queue manager name to connect to.")
		os.Exit(72) // Same as strmqm "queue manager name error"
	}

	if err != nil {
		log.Error(err)
	} else {
		setConnectedOnce(false)
		setConnectedQMgr(false)
		setCollectorEnd(false)
		setFirstCollection(false)
		setCollectorSilent(false)

		// Start the webserver in a separate goroutine. It blocks on
		// startChannel until the first collector has been registered below.
		go startServer()

		// This is the main loop that tries to keep the collector connected to
		// a queue manager even after a failure.
		for !isCollectorEnd() {
			log.Debugf("In main loop: qMgrConnected=%v", isConnectedQMgr())
			err = nil // Start clean on each loop

			// The callback will set the connected flag to false if there's an
			// error while processing the messages; until then, just wait.
			if isConnectedQMgr() {
				log.Debug("Sleeping a bit while connected")
				time.Sleep(config.reconnectIntervalDuration)
				continue
			}

			retry := false
			mutex.Lock()
			if err = connectAndDiscover(); err == nil {
				publishCollector()
			} else if !isConnectedOnce() || !config.keepRunning {
				// If we've never successfully connected, then exit instead
				// of retrying as it probably means a config error
				log.Errorf("Connection to %s has failed. %v", config.cf.QMgrName, err)
				setCollectorEnd(true)
			} else {
				log.Debug("Sleeping a bit after a failure")
				retryCount++
				retry = true
			}
			mutex.Unlock()

			// Wait outside the lock. Holding mutex for the whole reconnect
			// interval would stall every other user of it (presumably the
			// exporter's scrape path) until the next attempt.
			if retry {
				time.Sleep(config.reconnectIntervalDuration)
			}
		}

		// os.Exit does not run deferred calls, so close the connection
		// explicitly. EndConnection is also called before the very first
		// connect, so it is safe even if we never connected.
		mutex.Lock()
		mqmetric.EndConnection()
		mutex.Unlock()
	}

	log.Info("Done.")
	if err != nil {
		os.Exit(10)
	}
	os.Exit(0)
}

// connectAndDiscover sets up the collector's view of the queue manager.
//
// It (re)connects to config.cf.QMgrName, subscribes to the metrics that the
// queue manager publishes, refreshes channel attributes and verifies the
// configuration. A nil result means the Prometheus collector can be rebuilt.
// main calls it with mutex held. It exits the process with code 30 when the
// target is a standby queue manager instance.
func connectAndDiscover() error {
	// Drop any previous connection before creating a new one.
	mqmetric.EndConnection()

	// If we're going to manage reconnection from this collector, then turn
	// off the MQ client automatic option.
	config.cf.CC.SingleConnect = config.keepRunning

	err := mqmetric.InitConnection(config.cf.QMgrName, config.cf.ReplyQ, config.cf.ReplyQ2, &config.cf.CC)
	if err == nil {
		log.Infoln("Connected to queue manager " + config.cf.QMgrName)
	} else if mqe, ok := err.(mqmetric.MQMetricError); ok {
		switch {
		case mqe.MQReturn.MQRC == ibmmq.MQRC_STANDBY_Q_MGR:
			log.Errorln(err)
			os.Exit(30) // This is the same as the strmqm return code for "active instance running elsewhere"
		case mqe.MQReturn.MQCC == ibmmq.MQCC_WARNING:
			// A warning still counts as a successful connection; the
			// warning itself is reported.
			log.Infoln("Connected to queue manager " + config.cf.QMgrName)
			log.Errorln(err)
			err = nil
		}
	}
	if err != nil {
		return err
	}
	retryCount = 0

	// What metrics can the queue manager provide? Find out, and subscribe.
	// Wildcarded queue names are used as-is in the subscriptions unless a
	// MetaPrefix is configured, in which case they must be expanded.
	mqmetric.SetLocale(config.cf.Locale)
	discoverConfig.MonitoredQueues.ObjectNames = config.cf.MonitoredQueues
	discoverConfig.MonitoredQueues.SubscriptionSelector = strings.ToUpper(config.cf.QueueSubscriptionSelector)
	discoverConfig.MonitoredQueues.UseWildcard = config.cf.MetaPrefix == ""
	discoverConfig.MetaPrefix = config.cf.MetaPrefix
	if err = mqmetric.DiscoverAndSubscribe(discoverConfig); err != nil {
		return err
	}

	// Refreshing channel attributes is best-effort: its error is logged,
	// not propagated.
	rerr := mqmetric.RediscoverAttributes(ibmmq.MQOT_CHANNEL, config.cf.MonitoredChannels)
	log.Debugf("Returned from RediscoverAttributes with error %v", rerr)

	compCode, err := mqmetric.VerifyConfig()
	// We could choose to fail after a warning, but instead will continue for now
	if compCode == ibmmq.MQCC_WARNING {
		log.Println(err)
		return nil
	}
	return err
}

// publishCollector registers a fresh collector with Prometheus.
//
// It allocates the gauges for every discovered resource and swaps a new
// collector into the Prometheus registry in place of any previous one. It then
// marks the queue manager as connected. After the first successful connection
// it also releases startServer via startChannel. main calls it with mutex held.
func publishCollector() {
	allocateAllGauges()
	if collector != nil {
		setCollectorSilent(true)
		prometheus.Unregister(collector)
		setCollectorSilent(false)
	}
	collector = newExporter()
	setFirstCollection(true)
	prometheus.MustRegister(collector)
	setConnectedQMgr(true)
	if !isConnectedOnce() {
		// startServer is blocked on this channel until the gauges exist.
		startChannel <- true
		setConnectedOnce(true)
	}
}
// startServer runs the HTTP(S) endpoint that Prometheus scrapes.
//
// It first blocks until main signals on startChannel that the gauges have been
// set up. It then registers config.httpMetricPath and a landing page on
// http.DefaultServeMux and serves them on httpListenHost:httpListenPort.
// HTTPS is used when either config.httpsCertFile or config.httpsKeyFile is
// set. A TLS keypair that cannot be loaded is fatal, as is any serve error
// other than a graceful shutdown.
func startServer() {
	// Upper bound on how long a client may take to send its request headers,
	// so slow or idle connections cannot tie up the server indefinitely.
	const readHeaderTimeout = 10 * time.Second

	// Need to wait until signalled by the main thread that it's setup the gauges
	log.Debug("HTTP server - waiting until MQ connection ready")
	<-startChannel

	http.Handle(config.httpMetricPath, promhttp.Handler())
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		w.Write(landingPage())
	})

	address := config.httpListenHost + ":" + config.httpListenPort
	usingTLS = config.httpsKeyFile != "" || config.httpsCertFile != ""

	var tlsConfig *tls.Config
	if usingTLS {
		// TLS has been enabled for the collector (which is acting as a TLS Server)
		// So we setup the TLS configuration from the keystores and let Prometheus
		// contact us over the https protocol.
		cert, err := tls.LoadX509KeyPair(config.httpsCertFile, config.httpsKeyFile)
		if err != nil {
			log.Fatal(err)
		}
		tlsConfig = &tls.Config{
			Certificates: []tls.Certificate{cert},
			MinVersion:   tls.VersionTLS12,
		}
	}

	// More fields could be added here for further control of the connection
	server = &http.Server{
		Addr:              address,
		Handler:           nil, // nil selects http.DefaultServeMux, where the handlers above live
		TLSConfig:         tlsConfig,
		ReadHeaderTimeout: readHeaderTimeout,
	}

	// And now we can start the protocol-appropriate server
	var err error
	if usingTLS {
		log.Infoln("Listening on https address", address)
		err = server.ListenAndServeTLS("", "")
	} else {
		log.Infoln("Listening on http address", address)
		err = server.ListenAndServe()
	}

	// http.ErrServerClosed is the normal result of server.Shutdown (see
	// stopServer), not a failure. log.Fatalf terminates the process, so no
	// cleanup placed after it would ever run.
	if err != nil && err != http.ErrServerClosed {
		log.Fatalf("Metrics Error: Failed to handle metrics request: %v", err)
	}
}
// stopServer gracefully shuts down the metrics HTTP server and then tells
// main's loop to end.
//
// Shutdown gets up to five seconds to complete, and a shutdown failure is
// logged rather than propagated. If startServer has not yet created the
// server, which only happens after the first successful queue manager
// connection, there is nothing to shut down. Only the collector is flagged
// to end in that case.
func stopServer() {
	if server != nil {
		timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		if err := server.Shutdown(timeout); err != nil {
			log.Errorf("Failed to shutdown metrics server: %v", err)
		}
	}
	setCollectorEnd(true)
}
// landingPage returns the minimal HTML page shown to someone who simply
// browses to the exporter's port. Its only content is a title, a heading and
// a single link to the metrics endpoint (config.httpMetricPath). The path is
// inserted verbatim between the fixed head and tail of the page.
func landingPage() []byte {
	const (
		pageHead = "<html>\n" +
			"<head><title>IBM MQ metrics exporter for Prometheus</title></head>\n" +
			"<body>\n" +
			"<h1>IBM MQ metrics exporter for Prometheus</h1>\n" +
			"<p><a href='"
		pageTail = "'>Metrics</a></p>\n" +
			"</body>\n" +
			"</html>\n"
	)
	page := pageHead + config.httpMetricPath + pageTail
	return []byte(page)
}