From e574e91de01bfa529ff5bf29b800e6a169230ddd Mon Sep 17 00:00:00 2001 From: Cosmin Rentea Date: Mon, 20 Mar 2017 17:30:31 +0200 Subject: [PATCH 1/2] restoring order, not important anyway --- server/sms/sms_gateway.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/sms/sms_gateway.go b/server/sms/sms_gateway.go index ffd80660..cc29ffec 100644 --- a/server/sms/sms_gateway.go +++ b/server/sms/sms_gateway.go @@ -192,9 +192,9 @@ func (g *gateway) send(receivedMsg *protocol.Message) error { pNexmoResponseErrors.Inc() return err } - g.SetLastSentID(receivedMsg.ID) mTotalSentMessages.Add(1) pSent.Inc() + g.SetLastSentID(receivedMsg.ID) return nil } From b16b7050f999d9e1b330cf04ade372ce5e342694 Mon Sep 17 00:00:00 2001 From: Cosmin Rentea Date: Mon, 20 Mar 2017 17:49:59 +0200 Subject: [PATCH 2/2] defining counters and gauges for the router metrics, and using them where the old metrics were used --- server/router/route.go | 4 + server/router/router.go | 19 +++++ server/router/router_prometheus.go | 120 +++++++++++++++++++++++++++++ 3 files changed, 143 insertions(+) create mode 100644 server/router/router_prometheus.go diff --git a/server/router/route.go b/server/router/route.go index 31845a88..430c9187 100644 --- a/server/router/route.go +++ b/server/router/route.go @@ -80,12 +80,14 @@ func (r *Route) Deliver(msg *protocol.Message, isFromStore bool) error { if r.isInvalid() { loggerMessage.Error("Cannot deliver because route is invalid") mTotalDeliverMessageErrors.Add(1) + pDeliverMessageErrors.Inc() return ErrInvalidRoute } if !r.messageFilter(msg) { loggerMessage.Debug("Message filter didn't match route") mTotalNotMatchedByFilters.Add(1) + pNotMatchedByFilters.Inc() return nil } // not an infinite queue @@ -97,6 +99,7 @@ func (r *Route) Deliver(msg *protocol.Message, isFromStore bool) error { loggerMessage.Error("Closing route because queue is full") r.Close() mTotalDeliverMessageErrors.Add(1) + pDeliverMessageErrors.Inc() return ErrQueueFull } } @@ -276,6 +279,7 @@ func (r *Route) consume() { if r.isInvalid() { r.logger.Debug("Stopping to consume because route is invalid.") mTotalDeliverMessageErrors.Add(1) + pDeliverMessageErrors.Inc() return } diff --git a/server/router/router.go b/server/router/router.go index 2de270e0..c9daf9c6 100644 --- a/server/router/router.go +++ b/server/router/router.go @@ -158,6 +158,8 @@ func (router *router) HandleMessage(message *protocol.Message) error { "path": message.Path}).Debug("HandleMessage") mTotalMessagesIncoming.Add(1) + pMessagesIncoming.Inc() + if err := router.isStopping(); err != nil { logger.WithField("error", err.Error()).Error("Router is stopping") return err @@ -169,13 +171,16 @@ func (router *router) HandleMessage(message *protocol.Message) error { } mTotalMessagesIncomingBytes.Add(int64(len(message.Bytes()))) + pMessagesIncomingBytes.Add(float64(len(message.Bytes()))) size, err := router.messageStore.StoreMessage(message, nodeID) if err != nil { logger.WithField("error", err.Error()).Error("Error storing message") mTotalMessageStoreErrors.Add(1) + pMessageStoreErrors.Inc() return err } mTotalMessagesStoredBytes.Add(int64(size)) + pMessagesStoredBytes.Add(float64(size)) router.handleOverloadedChannel() @@ -235,6 +240,7 @@ func (router *router) GetSubscribers(topicPath string) ([]byte, error) { func (router *router) subscribe(r *Route) { logger.WithField("route", r).Debug("Internal subscribe") mTotalSubscriptionAttempts.Add(1) + pSubscriptionAttempts.Inc() routePath := r.Path slice, present := router.routes[routePath] @@ -247,37 +253,47 @@ func (router *router) subscribe(r *Route) { slice = make([]*Route, 0, 1) router.routes[routePath] = slice mCurrentRoutes.Add(1) + pRoutes.Inc() } router.routes[routePath] = append(slice, r) if removed { mTotalDuplicateSubscriptionsAttempts.Add(1) + pDuplicateSubscriptionAttempts.Inc() } else { mTotalSubscriptions.Add(1) + pTotalSubscriptions.Inc() mCurrentSubscriptions.Add(1) + pSubscriptions.Inc() } } func (router *router) unsubscribe(r *Route) { logger.WithField("route", r).Debug("Internal unsubscribe") mTotalUnsubscriptionAttempts.Add(1) + pUnsubscriptionAttempts.Inc() routePath := r.Path slice, present := router.routes[routePath] if !present { mTotalInvalidTopicOnUnsubscriptionAttempts.Add(1) + pInvalidTopicOnUnsubscriptionAttempts.Inc() return } var removed bool router.routes[routePath], removed = removeIfMatching(slice, r) if removed { mTotalUnsubscriptions.Add(1) + pTotalUnsubscriptions.Inc() mCurrentSubscriptions.Add(-1) + pSubscriptions.Dec() } else { mTotalInvalidUnsubscriptionAttempts.Add(1) + pInvalidUnsubscriptionAttempts.Inc() } if len(router.routes[routePath]) == 0 { delete(router.routes, routePath) mCurrentRoutes.Add(-1) + pRoutes.Dec() } } @@ -322,6 +338,7 @@ func (router *router) handleMessage(message *protocol.Message) { }) flog.Debug("Called routeMessage for data") mTotalMessagesRouted.Add(1) + pMessagesRouted.Inc() matched := false for path, pathRoutes := range router.routes { @@ -339,6 +356,7 @@ func (router *router) handleMessage(message *protocol.Message) { if !matched { flog.Debug("No route matched.") mTotalMessagesNotMatchingTopic.Add(1) + pMessagesNotMatchingTopic.Inc() } } @@ -361,6 +379,7 @@ func (router *router) handleOverloadedChannel() { "maxCapacity": cap(router.handleC), }).Warn("handleC channel is almost full") mTotalOverloadedHandleChannel.Add(1) + pOverloadedHandleChannel.Inc() } } diff --git a/server/router/router_prometheus.go b/server/router/router_prometheus.go new file mode 100644 index 00000000..91d9e8e5 --- /dev/null +++ b/server/router/router_prometheus.go @@ -0,0 +1,120 @@ +package router + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +var ( + pSubscriptionAttempts = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "router_subscription_attempts", + Help: "Number of subscription attempts in router", + }) + + pDuplicateSubscriptionAttempts = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "router_duplicate_subscription_attempts", + Help: "Number of duplicate subscription attempts in router", + }) + + pTotalSubscriptions = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "router_total_subscriptions", + Help: "Total number of subscriptions in router", + }) + + pUnsubscriptionAttempts = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "router_total_unsubscription_attempts", + Help: "Number of unsubscriptions attempts in router", + }) + + pInvalidTopicOnUnsubscriptionAttempts = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "router_total_unsubscription_attempts_invalid_topic", + Help: "Number of unsubscriptions attempts having invalid topic in router", + }) + + pInvalidUnsubscriptionAttempts = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "router_total_unsubscription_attempts_invalid", + Help: "Number of invalid subscription attempts in router", + }) + + pTotalUnsubscriptions = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "router_total_unsubscriptions", + Help: "Total number of unsubscriptions in router", + }) + + pSubscriptions = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "router_current_subscriptions", + Help: "Number of active subscriptions in router", + }) + + pRoutes = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "router_current_routes", + Help: "Number of active routes in router", + }) + + pMessagesIncoming = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "router_messages_incoming", + Help: "Number of incoming messages in router", + }) + + pMessagesIncomingBytes = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "router_messages_bytes_incoming", + Help: "Number of bytes in incoming messages in router", + }) + + pMessagesStoredBytes = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "router_messages_bytes_stored", + Help: "Number of bytes stored from incoming messages in router", + }) + + pMessagesRouted = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "router_messages_routed", + Help: "Number of messages routed in router", + }) + + pOverloadedHandleChannel = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "router_overloaded_handle_channel", + Help: "Number of overloaded handle-channel occurrences in router", + }) + + pMessagesNotMatchingTopic = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "router_messages_not_matching_topic", + Help: "Number of messages not matching a topic in router", + }) + + pMessageStoreErrors = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "router_errors_message_store", + Help: "Number of errors related to the message-store in the router", + }) + + pDeliverMessageErrors = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "router_errors_deliver_message", + Help: "Number of errors when trying to deliver the message in router", + }) + + pNotMatchedByFilters = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "router_not_matched_by_filters", + Help: "Number of messages not matched by any filters in the router", + }) +) + +func init() { + prometheus.MustRegister( + pSubscriptionAttempts, + pDuplicateSubscriptionAttempts, + pTotalSubscriptions, + pUnsubscriptionAttempts, + pInvalidTopicOnUnsubscriptionAttempts, + pInvalidUnsubscriptionAttempts, + pTotalUnsubscriptions, + pSubscriptions, + pRoutes, + pMessagesIncoming, + pMessagesIncomingBytes, + pMessagesStoredBytes, + pMessagesRouted, + pOverloadedHandleChannel, + pMessagesNotMatchingTopic, + pMessageStoreErrors, + pDeliverMessageErrors, + pNotMatchedByFilters, + ) +}