Skip to content

Commit

Permalink
Merge pull request #14 from cosminrentea/feature/prometheus-router
Browse files Browse the repository at this point in the history
prometheus - router
  • Loading branch information
cosminrentea committed Mar 21, 2017
2 parents a74bed7 + 9dd9adc commit ccca983
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 1 deletion.
4 changes: 4 additions & 0 deletions server/router/route.go
Expand Up @@ -80,12 +80,14 @@ func (r *Route) Deliver(msg *protocol.Message, isFromStore bool) error {
if r.isInvalid() {
loggerMessage.Error("Cannot deliver because route is invalid")
mTotalDeliverMessageErrors.Add(1)
pDeliverMessageErrors.Inc()
return ErrInvalidRoute
}

if !r.messageFilter(msg) {
loggerMessage.Debug("Message filter didn't match route")
mTotalNotMatchedByFilters.Add(1)
pNotMatchedByFilters.Inc()
return nil
}
// not an infinite queue
Expand All @@ -97,6 +99,7 @@ func (r *Route) Deliver(msg *protocol.Message, isFromStore bool) error {
loggerMessage.Error("Closing route because queue is full")
r.Close()
mTotalDeliverMessageErrors.Add(1)
pDeliverMessageErrors.Inc()
return ErrQueueFull
}
}
Expand Down Expand Up @@ -276,6 +279,7 @@ func (r *Route) consume() {
if r.isInvalid() {
r.logger.Debug("Stopping to consume because route is invalid.")
mTotalDeliverMessageErrors.Add(1)
pDeliverMessageErrors.Inc()
return
}

Expand Down
19 changes: 19 additions & 0 deletions server/router/router.go
Expand Up @@ -158,6 +158,8 @@ func (router *router) HandleMessage(message *protocol.Message) error {
"path": message.Path}).Debug("HandleMessage")

mTotalMessagesIncoming.Add(1)
pMessagesIncoming.Inc()

if err := router.isStopping(); err != nil {
logger.WithField("error", err.Error()).Error("Router is stopping")
return err
Expand All @@ -169,13 +171,16 @@ func (router *router) HandleMessage(message *protocol.Message) error {
}

mTotalMessagesIncomingBytes.Add(int64(len(message.Encode())))
pMessagesIncomingBytes.Add(float64(len(message.Encode())))
size, err := router.messageStore.StoreMessage(message, nodeID)
if err != nil {
logger.WithField("error", err.Error()).Error("Error storing message")
mTotalMessageStoreErrors.Add(1)
pMessageStoreErrors.Inc()
return err
}
mTotalMessagesStoredBytes.Add(int64(size))
pMessagesStoredBytes.Add(float64(size))

router.handleOverloadedChannel()

Expand Down Expand Up @@ -235,6 +240,7 @@ func (router *router) GetSubscribers(topicPath string) ([]byte, error) {
func (router *router) subscribe(r *Route) {
logger.WithField("route", r).Debug("Internal subscribe")
mTotalSubscriptionAttempts.Add(1)
pSubscriptionAttempts.Inc()

routePath := r.Path
slice, present := router.routes[routePath]
Expand All @@ -247,37 +253,47 @@ func (router *router) subscribe(r *Route) {
slice = make([]*Route, 0, 1)
router.routes[routePath] = slice
mCurrentRoutes.Add(1)
pRoutes.Inc()
}
router.routes[routePath] = append(slice, r)
if removed {
mTotalDuplicateSubscriptionsAttempts.Add(1)
pDuplicateSubscriptionAttempts.Inc()
} else {
mTotalSubscriptions.Add(1)
pTotalSubscriptions.Inc()
mCurrentSubscriptions.Add(1)
pSubscriptions.Inc()
}
}

func (router *router) unsubscribe(r *Route) {
logger.WithField("route", r).Debug("Internal unsubscribe")
mTotalUnsubscriptionAttempts.Add(1)
pUnsubscriptionAttempts.Inc()

routePath := r.Path
slice, present := router.routes[routePath]
if !present {
mTotalInvalidTopicOnUnsubscriptionAttempts.Add(1)
pInvalidTopicOnUnsubscriptionAttempts.Inc()
return
}
var removed bool
router.routes[routePath], removed = removeIfMatching(slice, r)
if removed {
mTotalUnsubscriptions.Add(1)
pTotalUnsubscriptions.Inc()
mCurrentSubscriptions.Add(-1)
pSubscriptions.Dec()
} else {
mTotalInvalidUnsubscriptionAttempts.Add(1)
pInvalidUnsubscriptionAttempts.Inc()
}
if len(router.routes[routePath]) == 0 {
delete(router.routes, routePath)
mCurrentRoutes.Add(-1)
pRoutes.Dec()
}
}

Expand Down Expand Up @@ -322,6 +338,7 @@ func (router *router) handleMessage(message *protocol.Message) {
})
flog.Debug("Called routeMessage for data")
mTotalMessagesRouted.Add(1)
pMessagesRouted.Inc()

matched := false
for path, pathRoutes := range router.routes {
Expand All @@ -339,6 +356,7 @@ func (router *router) handleMessage(message *protocol.Message) {
if !matched {
flog.Debug("No route matched.")
mTotalMessagesNotMatchingTopic.Add(1)
pMessagesNotMatchingTopic.Inc()
}
}

Expand All @@ -361,6 +379,7 @@ func (router *router) handleOverloadedChannel() {
"maxCapacity": cap(router.handleC),
}).Warn("handleC channel is almost full")
mTotalOverloadedHandleChannel.Add(1)
pOverloadedHandleChannel.Inc()
}
}

Expand Down
120 changes: 120 additions & 0 deletions server/router/router_prometheus.go
@@ -0,0 +1,120 @@
package router

import (
"github.com/prometheus/client_golang/prometheus"
)

var (
pSubscriptionAttempts = prometheus.NewCounter(prometheus.CounterOpts{
Name: "router_subscription_attempts",
Help: "Number of subscription attempts in router",
})

pDuplicateSubscriptionAttempts = prometheus.NewCounter(prometheus.CounterOpts{
Name: "router_duplicate_subscription_attempts",
Help: "Number of duplicate subscription attempts in router",
})

pTotalSubscriptions = prometheus.NewCounter(prometheus.CounterOpts{
Name: "router_total_subscriptions",
Help: "Total number of subscriptions in router",
})

pUnsubscriptionAttempts = prometheus.NewCounter(prometheus.CounterOpts{
Name: "router_total_unsubscription_attempts",
Help: "Number of unsubscriptions attempts in router",
})

pInvalidTopicOnUnsubscriptionAttempts = prometheus.NewCounter(prometheus.CounterOpts{
Name: "router_total_unsubscription_attempts_invalid_topic",
Help: "Number of unsubscriptions attempts having invalid topic in router",
})

pInvalidUnsubscriptionAttempts = prometheus.NewCounter(prometheus.CounterOpts{
Name: "router_total_unsubscription_attempts_invalid",
Help: "Number of invalid subscription attempts in router",
})

pTotalUnsubscriptions = prometheus.NewCounter(prometheus.CounterOpts{
Name: "router_total_unsubscriptions",
Help: "Total number of unsubscriptions in router",
})

pSubscriptions = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "router_current_subscriptions",
Help: "Number of active subscriptions in router",
})

pRoutes = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "router_current_routes",
Help: "Number of active routes in router",
})

pMessagesIncoming = prometheus.NewCounter(prometheus.CounterOpts{
Name: "router_messages_incoming",
Help: "Number of incoming messages in router",
})

pMessagesIncomingBytes = prometheus.NewCounter(prometheus.CounterOpts{
Name: "router_messages_bytes_incoming",
Help: "Number of bytes in incoming messages in router",
})

pMessagesStoredBytes = prometheus.NewCounter(prometheus.CounterOpts{
Name: "router_messages_bytes_stored",
Help: "Number of bytes stored from incoming messages in router",
})

pMessagesRouted = prometheus.NewCounter(prometheus.CounterOpts{
Name: "router_messages_routed",
Help: "Number of messages routed in router",
})

pOverloadedHandleChannel = prometheus.NewCounter(prometheus.CounterOpts{
Name: "router_overloaded_handle_channel",
Help: "Number of overloaded handle-channel occurrences in router",
})

pMessagesNotMatchingTopic = prometheus.NewCounter(prometheus.CounterOpts{
Name: "router_messages_not_matching_topic",
Help: "Number of messages not matching a topic in router",
})

pMessageStoreErrors = prometheus.NewCounter(prometheus.CounterOpts{
Name: "router_errors_message_store",
Help: "Number of errors related to the message-store in the router",
})

pDeliverMessageErrors = prometheus.NewCounter(prometheus.CounterOpts{
Name: "router_errors_deliver_message",
Help: "Number of errors when trying to deliver the message in router",
})

pNotMatchedByFilters = prometheus.NewCounter(prometheus.CounterOpts{
Name: "router_not_matched_by_filters",
Help: "Number of messages not matched by any filters in the router",
})
)

func init() {
prometheus.MustRegister(
pSubscriptionAttempts,
pDuplicateSubscriptionAttempts,
pTotalSubscriptions,
pUnsubscriptionAttempts,
pInvalidTopicOnUnsubscriptionAttempts,
pInvalidUnsubscriptionAttempts,
pTotalUnsubscriptions,
pSubscriptions,
pRoutes,
pMessagesIncoming,
pMessagesIncomingBytes,
pMessagesStoredBytes,
pMessagesRouted,
pOverloadedHandleChannel,
pMessagesNotMatchingTopic,
pMessageStoreErrors,
pDeliverMessageErrors,
pNotMatchedByFilters,
)
}
2 changes: 1 addition & 1 deletion server/sms/sms_gateway.go
Expand Up @@ -192,9 +192,9 @@ func (g *gateway) send(receivedMsg *protocol.Message) error {
pNexmoResponseErrors.Inc()
return err
}
g.SetLastSentID(receivedMsg.ID)
mTotalSentMessages.Add(1)
pSent.Inc()
g.SetLastSentID(receivedMsg.ID)
return nil
}

Expand Down

0 comments on commit ccca983

Please sign in to comment.