Skip to content

Commit

Permalink
Add option to enforce strict monotonicy in metrics.
Browse files Browse the repository at this point in the history
This change adds the new option `force_monotonicy` to metric
configurations. It is intended for almost-but-not-really monotinic
sources, such as counters which reset when the sensor is restarted.

When this option is set to `true`, the source metric value is regularly
written to disk. This allows us to detect and compensate counter resets
even between restarts. When a reset is detected, the last value before
the reset becomes the new offset, which is added to the metric value
going forth.  The result is a strictly monotonic time series, like an
ever increasing counter.
  • Loading branch information
Christian Schneider authored and Christian Schneider committed Feb 5, 2024
1 parent 5c1917f commit 4943c97
Show file tree
Hide file tree
Showing 6 changed files with 261 additions and 11 deletions.
14 changes: 14 additions & 0 deletions Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ cache:
# Set the timeout to -1 to disable the deletion of metrics from the cache. The exporter presents the ingest timestamp
# to prometheus.
timeout: 24h
# Path to the directory to keep the state for monotonic metrics.
state_directory: "/var/lib/mqtt2prometheus"
json_parsing:
# Separator. Used to split path to elements when accessing json fields.
# You can access json fields with dots in it. F.E. {"key.name": {"nested": "value"}}
Expand Down Expand Up @@ -248,6 +250,18 @@ metrics:
# Metric value to use if a match cannot be found in the map above.
# If not specified, parsing error will occur.
error_value: 1
# The name of the metric in prometheus
- prom_name: total_energy
# The name of the metric in a MQTT JSON message
mqtt_name: aenergy.total
# Regular expression to only match sensors with the given name pattern
sensor_name_filter: "^shellyplus1pm-.*$"
# The prometheus help text for this metric
help: Total energy used
# The prometheus type for this metric. Valid values are: "gauge" and "counter"
type: counter
# This setting requires an almost monotonic counter as the source. When monotonicy is enforced, the metric value is regularly written to disk. Thus, resets in the source counter can be detected and corrected by adding an offset as if the reset did not happen. The result is a strict monotonic increasing time series, like an ever growing counter.
force_monotonicy: true

```

Expand Down
2 changes: 1 addition & 1 deletion cmd/mqtt2prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func setupGoKitLogger(l *zap.Logger) log.Logger {
}

func setupExtractor(cfg config.Config) (metrics.Extractor, error) {
parser := metrics.NewParser(cfg.Metrics, cfg.JsonParsing.Separator)
parser := metrics.NewParser(cfg.Metrics, cfg.JsonParsing.Separator, cfg.Cache.StateDir)
if cfg.MQTT.ObjectPerTopicConfig != nil {
switch cfg.MQTT.ObjectPerTopicConfig.Encoding {
case config.EncodingJSON:
Expand Down
24 changes: 22 additions & 2 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package config
import (
"fmt"
"io/ioutil"
"os"
"regexp"
"time"

Expand All @@ -26,7 +27,8 @@ var MQTTConfigDefaults = MQTTConfig{
}

var CacheConfigDefaults = CacheConfig{
Timeout: 2 * time.Minute,
Timeout: 2 * time.Minute,
StateDir: "/var/lib/mqtt2prometheus",
}

var JsonParsingConfigDefaults = JsonParsingConfig{
Expand Down Expand Up @@ -94,7 +96,8 @@ type Config struct {
}

type CacheConfig struct {
Timeout time.Duration `yaml:"timeout"`
Timeout time.Duration `yaml:"timeout"`
StateDir string `yaml:"state_directory"`
}

type JsonParsingConfig struct {
Expand Down Expand Up @@ -135,6 +138,7 @@ type MetricConfig struct {
Help string `yaml:"help"`
ValueType string `yaml:"type"`
OmitTimestamp bool `yaml:"omit_timestamp"`
ForceMonotonicy bool `yaml:"force_monotonicy"`
ConstantLabels map[string]string `yaml:"const_labels"`
StringValueMapping *StringValueMappingConfig `yaml:"string_value_mapping"`
MQTTValueScale float64 `yaml:"mqtt_value_scale"`
Expand Down Expand Up @@ -179,6 +183,9 @@ func LoadConfig(configFile string) (Config, error) {
if cfg.Cache == nil {
cfg.Cache = &CacheConfigDefaults
}
if cfg.Cache.StateDir == "" {
cfg.Cache.StateDir = CacheConfigDefaults.StateDir
}
if cfg.JsonParsing == nil {
cfg.JsonParsing = &JsonParsingConfigDefaults
}
Expand Down Expand Up @@ -217,5 +224,18 @@ func LoadConfig(configFile string) (Config, error) {
}
}

// If any metric forces monotonicy, we need a state directory.
forcesMonotonicy := false
for _, m := range cfg.Metrics {
if m.ForceMonotonicy {
forcesMonotonicy = true
}
}
if forcesMonotonicy {
if err := os.MkdirAll(cfg.Cache.StateDir, 0755); err != nil {
return Config{}, err
}
}

return cfg, nil
}
17 changes: 15 additions & 2 deletions pkg/metrics/extractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,24 @@ package metrics

import (
"fmt"
"regexp"

"github.com/hikhvar/mqtt2prometheus/pkg/config"
gojsonq "github.com/thedevsaddam/gojsonq/v2"
)

type Extractor func(topic string, payload []byte, deviceID string) (MetricCollection, error)

// metricID returns a deterministic identifier per metic config which is safe to use in a file path.
func metricID(topic, metric, deviceID, promName string) string {
re := regexp.MustCompile(`[^a-zA-Z0-9]`)
deviceID = re.ReplaceAllString(deviceID, "_")
topic = re.ReplaceAllString(topic, "_")
metric = re.ReplaceAllString(metric, "_")
promName = re.ReplaceAllString(promName, "_")
return fmt.Sprintf("%s-%s-%s-%s", deviceID, topic, metric, promName)
}

func NewJSONObjectExtractor(p Parser) Extractor {
return func(topic string, payload []byte, deviceID string) (MetricCollection, error) {
var mc MetricCollection
Expand All @@ -27,7 +38,8 @@ func NewJSONObjectExtractor(p Parser) Extractor {
continue
}

m, err := p.parseMetric(config, rawValue)
id := metricID(topic, path, deviceID, config.PrometheusName)
m, err := p.parseMetric(config, id, rawValue)
if err != nil {
return nil, fmt.Errorf("failed to parse valid metric value: %w", err)
}
Expand Down Expand Up @@ -63,7 +75,8 @@ func NewMetricPerTopicExtractor(p Parser, metricNameRegex *config.Regexp) Extrac
rawValue = string(payload)
}

m, err := p.parseMetric(config, rawValue)
id := metricID(topic, metricName, deviceID, config.PrometheusName)
m, err := p.parseMetric(config, id, rawValue)
if err != nil {
return nil, fmt.Errorf("failed to parse metric: %w", err)
}
Expand Down
94 changes: 92 additions & 2 deletions pkg/metrics/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,44 @@ package metrics

import (
"fmt"
"os"
"strconv"
"strings"
"time"

"github.com/hikhvar/mqtt2prometheus/pkg/config"
"gopkg.in/yaml.v2"
)

// monotonicState holds the runtime information to realize a monotonic increasing value.
type monotonicState struct {
// Basline value to add to each parsed metric value to maintain monotonicy
Offset float64 `yaml:"value_offset"`
// Last value that was parsed before the offset was added
LastRawValue float64 `yaml:"last_raw_value"`
}

// metricState holds runtime information per metric configuration.
type metricState struct {
monotonic monotonicState
// The last time the state file was written
lastWritten time.Time
}

type Parser struct {
separator string
// Maps the mqtt metric name to a list of configs
// The first that matches SensorNameFilter will be used
metricConfigs map[string][]config.MetricConfig
// Directory holding state files
stateDir string
// Per-metric state
states map[string]*metricState
}

var now = time.Now

func NewParser(metrics []config.MetricConfig, separator string) Parser {
func NewParser(metrics []config.MetricConfig, separator, stateDir string) Parser {
cfgs := make(map[string][]config.MetricConfig)
for i := range metrics {
key := metrics[i].MQTTName
Expand All @@ -26,6 +48,8 @@ func NewParser(metrics []config.MetricConfig, separator string) Parser {
return Parser{
separator: separator,
metricConfigs: cfgs,
stateDir: strings.TrimRight(stateDir, "/"),
states: make(map[string]*metricState),
}
}

Expand All @@ -47,7 +71,7 @@ func (p *Parser) findMetricConfig(metric string, deviceID string) (config.Metric

// parseMetric parses the given value according to the given deviceID and metricPath. The config allows to
// parse a metric value according to the device ID.
func (p *Parser) parseMetric(cfg config.MetricConfig, value interface{}) (Metric, error) {
func (p *Parser) parseMetric(cfg config.MetricConfig, metricID string, value interface{}) (Metric, error) {
var metricValue float64

if boolValue, ok := value.(bool); ok {
Expand Down Expand Up @@ -87,6 +111,22 @@ func (p *Parser) parseMetric(cfg config.MetricConfig, value interface{}) (Metric
return Metric{}, fmt.Errorf("got data with unexpectd type: %T ('%s')", value, value)
}

if cfg.ForceMonotonicy {
ms, err := p.getMetricState(metricID)
if err != nil {
return Metric{}, err
}
// When the source metric is reset, the last adjusted value becomes the new offset.
if metricValue < ms.monotonic.LastRawValue {
ms.monotonic.Offset += ms.monotonic.LastRawValue
// Trigger flushing the new state to disk.
ms.lastWritten = time.Time{}
}

ms.monotonic.LastRawValue = metricValue
metricValue += ms.monotonic.Offset
}

if cfg.MQTTValueScale != 0 {
metricValue = metricValue * cfg.MQTTValueScale
}
Expand All @@ -103,3 +143,53 @@ func (p *Parser) parseMetric(cfg config.MetricConfig, value interface{}) (Metric
IngestTime: ingestTime,
}, nil
}

func (p *Parser) stateFileName(metricID string) string {
return fmt.Sprintf("%s/%s.yaml", p.stateDir, metricID)
}

// readMetricState parses the metric state from the configured path.
// If the file does not exist, an empty state is returned.
func (p *Parser) readMetricState(metricID string) (*metricState, error) {
data, err := os.ReadFile(p.stateFileName(metricID))
state := &metricState{}
if err != nil {
// The file does not exist for new metrics.
if os.IsNotExist(err) {
return state, nil
}
return state, err
}
err = yaml.UnmarshalStrict(data, &state.monotonic)
state.lastWritten = now()
return state, err
}

// writeMetricState writes back the metric's current state to the configured path.
func (p *Parser) writeMetricState(metricID string, state *metricState) error {
out, err := yaml.Marshal(state.monotonic)
if err != nil {
return err
}
return os.WriteFile(p.stateFileName(metricID), out, 0644)
}

// getMetricState returns the state of the given metric.
// The state is read from and written back to disk as needed.
func (p *Parser) getMetricState(metricID string) (*metricState, error) {
var err error
state, found := p.states[metricID]
if !found {
if state, err = p.readMetricState(metricID); err != nil {
return nil, err
}
p.states[metricID] = state
}
// Write the state back to disc every minute.
if now().Sub(state.lastWritten) >= time.Minute {
if err = p.writeMetricState(metricID, state); err == nil {
state.lastWritten = now()
}
}
return state, err
}

0 comments on commit 4943c97

Please sign in to comment.