Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
8 changes: 4 additions & 4 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 14 additions & 26 deletions internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ package metrics
import (
"fmt"
"net/http"
"sync"
"time"

"github.com/ibm-messaging/mq-container/internal/logger"
"github.com/ibm-messaging/mq-golang/mqmetric"
"github.com/prometheus/client_golang/prometheus"
)

Expand All @@ -35,7 +35,6 @@ const (

// GatherMetrics gathers metrics for the queue manager
func GatherMetrics(qmName string, log *logger.Logger) {

for i := 0; i <= retryCount; i++ {
err := startMetricsGathering(qmName, log)
if err != nil {
Expand All @@ -52,36 +51,20 @@ func GatherMetrics(qmName string, log *logger.Logger) {

// startMetricsGathering starts gathering metrics for the queue manager
func startMetricsGathering(qmName string, log *logger.Logger) error {
var wg sync.WaitGroup

defer func() {
if r := recover(); r != nil {
log.Errorf("Metrics Error: %v", r)
}
}()

log.Println("Starting metrics gathering")

// Set connection configuration
var connConfig mqmetric.ConnectionConfig
connConfig.ClientMode = false
connConfig.UserId = ""
connConfig.Password = ""

// Connect to the queue manager - open the command and dynamic reply queues
err := mqmetric.InitConnectionStats(qmName, "SYSTEM.DEFAULT.MODEL.QUEUE", "", &connConfig)
if err != nil {
return fmt.Errorf("Failed to connect to queue manager %s: %v", qmName, err)
}
defer mqmetric.EndConnection()

// Discover available metrics for the queue manager and subscribe to them
err = mqmetric.DiscoverAndSubscribe("", true, "")
if err != nil {
return fmt.Errorf("Failed to discover and subscribe to metrics: %v", err)
}

// Start processing metrics
go processMetrics(log)
wg.Add(1)
go processMetrics(log, qmName, &wg)

// Wait for metrics to be ready before starting the prometheus handler
wg.Wait()

// Register metrics
prometheus.MustRegister(newExporter(qmName))
Expand All @@ -92,6 +75,11 @@ func startMetricsGathering(qmName string, log *logger.Logger) error {
w.WriteHeader(200)
w.Write([]byte("Status: METRICS ACTIVE"))
})
err = http.ListenAndServe(":"+defaultPort, nil)
return fmt.Errorf("Failed to handle metrics request: %v", err)

err := http.ListenAndServe(":"+defaultPort, nil)
if err != nil {
return fmt.Errorf("Failed to handle metrics request: %v", err)
}

return nil
}
74 changes: 59 additions & 15 deletions internal/metrics/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ limitations under the License.
package metrics

import (
"fmt"
"strings"
"sync"
"time"

"github.com/ibm-messaging/mq-container/internal/logger"
Expand All @@ -42,26 +44,68 @@ type metricData struct {
values map[string]float64
}

// processMetrics processes publications of metric data and handles describe/collect requests
func processMetrics(log *logger.Logger) {
var keepRunning = true
var first = true

func doConnect(qmName string) error {
// Set connection configuration
var connConfig mqmetric.ConnectionConfig
connConfig.ClientMode = false
connConfig.UserId = ""
connConfig.Password = ""

// Connect to the queue manager - open the command and dynamic reply queues
err := mqmetric.InitConnectionStats(qmName, "SYSTEM.DEFAULT.MODEL.QUEUE", "", &connConfig)
if err != nil {
return fmt.Errorf("Failed to connect to queue manager %s: %v", qmName, err)
}

// Discover available metrics for the queue manager and subscribe to them
err = mqmetric.DiscoverAndSubscribe("", true, "")
if err != nil {
return fmt.Errorf("Failed to discover and subscribe to metrics: %v", err)
}

return nil
}

// Initialise metrics
metrics := initialiseMetrics()
// processMetrics processes publications of metric data and handles describe/collect requests
func processMetrics(log *logger.Logger, qmName string, wg *sync.WaitGroup) {
var err error
var metrics map[string]*metricData

for keepRunning {
err = doConnect(qmName)
if err == nil {
if first {
first = false
wg.Done()
}
metrics = initialiseMetrics()
}

for {
// Process publications of metric data
mqmetric.ProcessPublications()
// now loop until something goes wrong
for err == nil {
// Process publications of metric data
err = mqmetric.ProcessPublications()

// Handle describe/collect requests
select {
case collect := <-requestChannel:
if collect {
updateMetrics(metrics)
// Handle describe/collect requests
select {
case collect := <-requestChannel:
if collect {
updateMetrics(metrics)
}
responseChannel <- metrics
case <-time.After(requestTimeout * time.Second):
log.Debugf("Metrics: No requests received within timeout period (%d seconds)", requestTimeout)
}
responseChannel <- metrics
case <-time.After(requestTimeout * time.Second):
log.Debugf("Metrics: No requests received within timeout period (%d seconds)", requestTimeout)
}

// Close the connection
mqmetric.EndConnection()

//If we're told to keep runnign sleep for a bit before trying again
time.Sleep(10 * time.Second)
}
}

Expand Down
90 changes: 52 additions & 38 deletions test/docker/docker_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,11 @@ limitations under the License.
package main

import (
"archive/tar"
"bufio"
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"path/filepath"
"regexp"
"strconv"
Expand Down Expand Up @@ -74,16 +72,31 @@ func TestLicenseView(t *testing.T) {
}
}

// TestGoldenPath starts a queue manager successfully
func TestGoldenPath(t *testing.T) {
// TestGoldenPath starts a queue manager successfully when metrics are enabled
func TestGoldenPathWithMetrics(t *testing.T) {
t.Parallel()
goldenPath(t, true)
}

// TestGoldenPath starts a queue manager successfully when metrics are disabled
func TestGoldenPathNoMetrics(t *testing.T) {
t.Parallel()
goldenPath(t, false)
}

// Actual test function for TestGoldenPathNoMetrics & TestGoldenPathWithMetrics
func goldenPath(t *testing.T, metric bool) {
cli, err := client.NewEnvClient()
if err != nil {
t.Fatal(err)
}
containerConfig := container.Config{
Env: []string{"LICENSE=accept", "MQ_QMGR_NAME=qm1"},
}
if metric {
containerConfig.Env = append(containerConfig.Env, "MQ_ENABLE_METRICS=true")
}

id := runContainer(t, cli, &containerConfig)
defer cleanContainer(t, cli, id)
waitForReady(t, cli, id)
Expand Down Expand Up @@ -143,9 +156,21 @@ func TestNoQueueManagerNameInvalidHostname(t *testing.T) {
}

// TestWithVolume runs a container with a Docker volume, then removes that
// container and starts a new one with same volume.
func TestWithVolume(t *testing.T) {
// container and starts a new one with same volume. With metrics enabled
func TestWithVolumeAndMetrics(t *testing.T) {
t.Parallel()
withVolume(t, true)
}

// TestWithVolume runs a container with a Docker volume, then removes that
// container and starts a new one with same volume. With metrics disabled
func TestWithVolumeNoMetrics(t *testing.T) {
t.Parallel()
withVolume(t, false)
}

// Actual test function for TestWithVolumeNoMetrics & TestWithVolumeAndMetrics
func withVolume(t *testing.T, metric bool) {
cli, err := client.NewEnvClient()
if err != nil {
t.Fatal(err)
Expand All @@ -156,6 +181,10 @@ func TestWithVolume(t *testing.T) {
Image: imageName(),
Env: []string{"LICENSE=accept", "MQ_QMGR_NAME=qm1"},
}
if metric {
containerConfig.Env = append(containerConfig.Env, "MQ_ENABLE_METRICS=true")
}

hostConfig := container.HostConfig{
Binds: []string{
coverageBind(t),
Expand Down Expand Up @@ -423,37 +452,6 @@ func TestReadiness(t *testing.T) {
}
}

func countLines(t *testing.T, r io.Reader) int {
scanner := bufio.NewScanner(r)
count := 0
for scanner.Scan() {
count++
}
err := scanner.Err()
if err != nil {
t.Fatal(err)
}
return count
}

func countTarLines(t *testing.T, b []byte) int {
r := bytes.NewReader(b)
tr := tar.NewReader(r)
total := 0
for {
_, err := tr.Next()
if err == io.EOF {
// End of TAR
break
}
if err != nil {
t.Fatal(err)
}
total += countLines(t, tr)
}
return total
}

func TestErrorLogRotation(t *testing.T) {
t.Parallel()
cli, err := client.NewEnvClient()
Expand Down Expand Up @@ -518,8 +516,20 @@ func TestErrorLogRotation(t *testing.T) {
}
}

func TestJSONLogFormat(t *testing.T) {
// Tests the log comes out in JSON format when JSON format is enabled. With metrics enabled
func TestJSONLogFormatWithMetrics(t *testing.T) {
t.Parallel()
jsonLogFormat(t, true)
}

// Tests the log comes out in JSON format when JSON format is enabled. With metrics disabled
func TestJSONLogFormatNoMetrics(t *testing.T) {
t.Parallel()
jsonLogFormat(t, false)
}

// Actual test function for TestJSONLogFormatWithMetrics & TestJSONLogFormatNoMetrics
func jsonLogFormat(t *testing.T, metric bool) {
cli, err := client.NewEnvClient()
if err != nil {
t.Fatal(err)
Expand All @@ -530,6 +540,10 @@ func TestJSONLogFormat(t *testing.T) {
"LOG_FORMAT=json",
},
}
if metric {
containerConfig.Env = append(containerConfig.Env, "MQ_ENABLE_METRICS=true")
}

id := runContainer(t, cli, &containerConfig)
defer cleanContainer(t, cli, id)
waitForReady(t, cli, id)
Expand Down
31 changes: 31 additions & 0 deletions test/docker/docker_api_test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,3 +571,34 @@ func getWebPort(t *testing.T, cli *client.Client, ID string) string {
}
return i.NetworkSettings.Ports["9443/tcp"][0].HostPort
}

func countLines(t *testing.T, r io.Reader) int {
scanner := bufio.NewScanner(r)
count := 0
for scanner.Scan() {
count++
}
err := scanner.Err()
if err != nil {
t.Fatal(err)
}
return count
}

func countTarLines(t *testing.T, b []byte) int {
r := bytes.NewReader(b)
tr := tar.NewReader(r)
total := 0
for {
_, err := tr.Next()
if err == io.EOF {
// End of TAR
break
}
if err != nil {
t.Fatal(err)
}
total += countLines(t, tr)
}
return total
}
Loading