Skip to content

Commit

Permalink
feat(mqtt): export metrics using Prometheus
Browse files Browse the repository at this point in the history
  • Loading branch information
gsalomao committed Jun 14, 2023
1 parent fdb9ec3 commit 1b51e95
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 42 deletions.
88 changes: 88 additions & 0 deletions internal/mqtt/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright 2023 The MaxMQ Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mqtt

import (
"sync/atomic"

"github.com/mochi-co/mqtt/v2/system"
"github.com/prometheus/client_golang/prometheus"
)

type metric struct {
metricType string
name string
help string
value *int64
}

func registerMetrics(i *system.Info) {
metrics := []metric{
{"counter", "bytes_received", "Total number of bytes received", &i.BytesReceived},
{"counter", "bytes_sent", "Total number of bytes sent", &i.BytesSent},
{"gauge", "clients_connected", "Number of currently connected clients", &i.ClientsConnected},
{"gauge", "clients_disconnected", "Total number of persistent clients", &i.ClientsDisconnected},
{"counter", "clients_maximum", "Maximum number of active clients that have been connected",
&i.ClientsMaximum},
{"gauge", "clients_total",
"Total number of connected and disconnected clients with a persistent session currently connected and registered",
&i.ClientsTotal},
{"counter", "messages_received", "Total number of publish messages received",
&i.MessagesReceived},
{"counter", "messages_sent", "Total number of publish messages sent", &i.MessagesSent},
{"counter", "messages_dropped", "Total number of publish messages dropped",
&i.MessagesDropped},
{"gauge", "retained", "Total number of retained messages active", &i.Retained},
{"gauge", "inflight", "Number of messages currently inflight", &i.Inflight},
{"counter", "inflight_dropped", "Number of inflight messages which were dropped", &i.InflightDropped},
{"gauge", "subscriptions", "Total number of subscriptions active", &i.Subscriptions},
{"counter", "packets_received", "Total number of packets received", &i.PacketsReceived},
{"counter", "packets_sent", "Total number of packets sent", &i.PacketsSent},
}

for _, m := range metrics {
m := m
fn := func() float64 {
return float64(atomic.LoadInt64(m.value))
}

switch m.metricType {
case "counter":
prometheus.MustRegister(
prometheus.NewCounterFunc(
prometheus.CounterOpts{
Namespace: "maxmq",
Subsystem: "mqtt",
Name: m.name,
Help: m.help,
},
fn,
),
)
case "gauge":
prometheus.MustRegister(
prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Namespace: "maxmq",
Subsystem: "mqtt",
Name: m.name,
Help: m.help,
},
fn,
),
)
}
}
}
73 changes: 38 additions & 35 deletions internal/mqtt/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,43 +55,41 @@ func NewServer(opts ...OptionFn) (*Server, error) {
Logger: &log,
}

if s.conf != nil {
var sharedSubAvailable byte
var retainAvailable byte
var wildcardSubAvailable byte
var subIDAvailable byte

if s.conf.SharedSubscriptionAvailable {
sharedSubAvailable = 1
}
if s.conf.RetainAvailable {
retainAvailable = 1
}
if s.conf.WildcardSubscriptionAvailable {
wildcardSubAvailable = 1
}
if s.conf.SubscriptionIDAvailable {
subIDAvailable = 1
}

opt.Capabilities.MaximumMessageExpiryInterval = int64(s.conf.MaxMessageExpiryInterval)
opt.Capabilities.MaximumClientWritesPending = int32(s.conf.MaxOutboundMessages)
opt.Capabilities.MaximumSessionExpiryInterval = uint32(s.conf.MaxSessionExpiryInterval)
opt.Capabilities.MaximumPacketSize = uint32(s.conf.MaxPacketSize)
opt.Capabilities.ReceiveMaximum = uint16(s.conf.ReceiveMaximum)
opt.Capabilities.TopicAliasMaximum = uint16(s.conf.MaxTopicAlias)
opt.Capabilities.SharedSubAvailable = sharedSubAvailable
opt.Capabilities.MinimumProtocolVersion = s.conf.MinProtocolVersion
opt.Capabilities.MaximumQos = s.conf.MaximumQoS
opt.Capabilities.RetainAvailable = retainAvailable
opt.Capabilities.WildcardSubAvailable = wildcardSubAvailable
opt.Capabilities.SubIDAvailable = subIDAvailable

opt.ClientNetWriteBufferSize = s.conf.BufferSize
opt.ClientNetReadBufferSize = s.conf.BufferSize
opt.SysTopicResendInterval = int64(s.conf.SysTopicUpdateInterval)
var sharedSubAvailable byte
var retainAvailable byte
var wildcardSubAvailable byte
var subIDAvailable byte

if s.conf.SharedSubscriptionAvailable {
sharedSubAvailable = 1
}
if s.conf.RetainAvailable {
retainAvailable = 1
}
if s.conf.WildcardSubscriptionAvailable {
wildcardSubAvailable = 1
}
if s.conf.SubscriptionIDAvailable {
subIDAvailable = 1
}

opt.Capabilities.MaximumMessageExpiryInterval = int64(s.conf.MaxMessageExpiryInterval)
opt.Capabilities.MaximumClientWritesPending = int32(s.conf.MaxOutboundMessages)
opt.Capabilities.MaximumSessionExpiryInterval = uint32(s.conf.MaxSessionExpiryInterval)
opt.Capabilities.MaximumPacketSize = uint32(s.conf.MaxPacketSize)
opt.Capabilities.ReceiveMaximum = uint16(s.conf.ReceiveMaximum)
opt.Capabilities.TopicAliasMaximum = uint16(s.conf.MaxTopicAlias)
opt.Capabilities.SharedSubAvailable = sharedSubAvailable
opt.Capabilities.MinimumProtocolVersion = s.conf.MinProtocolVersion
opt.Capabilities.MaximumQos = s.conf.MaximumQoS
opt.Capabilities.RetainAvailable = retainAvailable
opt.Capabilities.WildcardSubAvailable = wildcardSubAvailable
opt.Capabilities.SubIDAvailable = subIDAvailable

opt.ClientNetWriteBufferSize = s.conf.BufferSize
opt.ClientNetReadBufferSize = s.conf.BufferSize
opt.SysTopicResendInterval = int64(s.conf.SysTopicUpdateInterval)

s.mochi = mqtt.New(opt)
_ = s.mochi.AddHook(newLoggingHook(s.log), nil)
_ = s.mochi.AddHook(new(auth.AllowHook), nil)
Expand All @@ -109,7 +107,12 @@ func (s *Server) Start() error {
return err
}

if s.conf.MetricsEnabled {
registerMetrics(s.mochi.Info)
}

_ = s.mochi.Serve()

s.log.Info().Msg("MQTT server started with success")
return nil
}
Expand Down
11 changes: 4 additions & 7 deletions internal/mqtt/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
package mqtt

import (
"bytes"
"io"
"testing"

"github.com/gsalomao/maxmq/internal/logger"
Expand All @@ -30,8 +30,7 @@ func TestServerNewServer(t *testing.T) {
}

func TestServerNewServerWithOptions(t *testing.T) {
out := bytes.NewBufferString("")
log := logger.New(out, nil, logger.Pretty)
log := logger.New(io.Discard, nil, logger.Pretty)

s, err := NewServer(
WithConfig(&Config{
Expand All @@ -48,8 +47,7 @@ func TestServerNewServerWithOptions(t *testing.T) {
}

func TestServerStartAndStop(t *testing.T) {
out := bytes.NewBufferString("")
log := logger.New(out, nil, logger.Pretty)
log := logger.New(io.Discard, nil, logger.Pretty)

s, err := NewServer(WithLogger(log))
require.Nil(t, err)
Expand All @@ -60,8 +58,7 @@ func TestServerStartAndStop(t *testing.T) {
}

func TestServerStartError(t *testing.T) {
out := bytes.NewBufferString("")
log := logger.New(out, nil, logger.Pretty)
log := logger.New(io.Discard, nil, logger.Pretty)

s, err := NewServer(
WithConfig(&Config{
Expand Down

0 comments on commit 1b51e95

Please sign in to comment.