Add a Counter Aggregator #119

Merged · 4 commits · May 9, 2019
29 changes: 28 additions & 1 deletion README.md
@@ -58,7 +58,7 @@ This drops all series which do not have a `job` label `k8s` and all metrics that

For equality filter on metric name you can use the simpler notation, e.g. `--include='metric_name{label="foo"}'`.

The flag may be repeated to provide several sets of filters, in which case the metric will be forwarded if it matches at least one of them.
The flag may be repeated to provide several sets of filters, in which case the metric will be forwarded if it matches at least one of them. Note that inclusion filters apply only to Prometheus metrics proxied directly; they do not apply to [aggregated counters](#counter-aggregator).

#### File

@@ -77,6 +77,33 @@ static_metadata:
# - ...
```

#### Counter Aggregator

Counter Aggregator is an advanced feature of the sidecar that can be used to export a sum of multiple Prometheus counters to Stackdriver as a single CUMULATIVE metric.

You might find this useful if you have Prometheus counter metrics with high-cardinality labels (or simply counters exported by a large number of targets) that would be too expensive to export to Stackdriver individually, but you still want a cumulative metric that contains the sum of those counters.

Aggregated counters are configured in the `aggregated_counters` block of the configuration file. For example:

```yaml
aggregated_counters:
- metric: network_transmit_bytes
help: total number of bytes sent over eth0
filters:
- 'node_network_transmit_bytes_total{device="eth0"}'
- 'node_network_transmit_bytes{device="eth0"}'
```

In this example, the sidecar will export a new counter, `network_transmit_bytes`, corresponding to the total number of bytes transmitted over the `eth0` interface across all machines monitored by Prometheus. The Counter Aggregator keeps track of all counters matching the filters and correctly handles counter resets. Like all internal metrics exported by the sidecar, the aggregated counter is exported using OpenCensus and will be available in Stackdriver as a custom metric (`custom.googleapis.com/opencensus/network_transmit_bytes`).

The `filters` field expects a list of [Prometheus instant vector selectors](https://prometheus.io/docs/prometheus/latest/querying/basics/#instant-vector-selectors). A time series is included in the aggregated counter if it matches at least one of the specified selectors.
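
As a rough illustration of what such a selector means, the sketch below (hypothetical, not part of the sidecar) parses a selector with `promql.ParseMetricSelector`, the same call the sidecar's config parser uses, and checks it against an example series; the series labels are made up for illustration.

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/promql"
)

func main() {
	// Parse an instant vector selector into a set of label matchers.
	matchers, err := promql.ParseMetricSelector(`node_network_transmit_bytes_total{device="eth0"}`)
	if err != nil {
		panic(err)
	}

	// A hypothetical time series, as the sidecar might see it.
	series := labels.FromStrings(
		"__name__", "node_network_transmit_bytes_total",
		"device", "eth0",
		"instance", "node-1:9100",
	)

	// The series contributes to the aggregated counter only if every
	// matcher in the selector matches the corresponding label value.
	matched := true
	for _, m := range matchers {
		if !m.Matches(series.Get(m.Name)) {
			matched = false
			break
		}
	}
	fmt.Println("series matches selector:", matched)
}
```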

##### Counter aggregator and inclusion filters

Note that metrics matching one of an aggregated counter's filters are still exported to Stackdriver by default, unless you have inclusion filters (see `--include`) configured that prevent those metrics from being exported. Conversely, using `--include` to keep a metric from being exported to Stackdriver does not prevent that metric from being counted by aggregated counters.

When using the Counter Aggregator, you will usually want to configure a restrictive inclusion filter to prevent the raw metrics from being exported to Stackdriver.

## Compatibility

The matrix below lists the versions of Prometheus Server and other dependencies that have been qualified to work with releases of `stackdriver-prometheus-sidecar`.
59 changes: 53 additions & 6 deletions cmd/stackdriver-prometheus-sidecar/main.go
@@ -163,6 +163,12 @@ type fileConfig struct {
Type string `json:"type"`
Help string `json:"help"`
} `json:"static_metadata"`

AggregatedCounters []struct {
Metric string `json:"metric"`
Filters []string `json:"filters"`
Help string `json:"help"`
} `json:"aggregated_counters"`
}

func main() {
@@ -184,6 +190,7 @@ func main() {
listenAddress string
filters []string
filtersets []string
aggregations retrieval.CounterAggregatorConfig
metricRenames map[string]string
staticMetadata []scrape.MetricMetadata
monitoringBackends []string
@@ -254,12 +261,25 @@ func main() {

logger := promlog.New(cfg.logLevel)
if cfg.configFilename != "" {
cfg.metricRenames, cfg.staticMetadata, err = parseConfigFile(cfg.configFilename)
cfg.metricRenames, cfg.staticMetadata, cfg.aggregations, err = parseConfigFile(cfg.configFilename)
if err != nil {
msg := fmt.Sprintf("Parse config file %s", cfg.configFilename)
level.Error(logger).Log("msg", msg, "err", err)
os.Exit(2)
}

// Enable Stackdriver monitoring backend if counter aggregator configuration is present.
if len(cfg.aggregations) > 0 {
sdEnabled := false
for _, backend := range cfg.monitoringBackends {
if backend == "stackdriver" {
sdEnabled = true
}
}
if !sdEnabled {
cfg.monitoringBackends = append(cfg.monitoringBackends, "stackdriver")
}
}
}

level.Info(logger).Log("msg", "Starting Stackdriver Prometheus sidecar", "version", version.Info())
@@ -368,6 +388,16 @@ func main() {
level.Error(logger).Log("msg", "Creating queue manager failed", "err", err)
os.Exit(1)
}

counterAggregator, err := retrieval.NewCounterAggregator(
log.With(logger, "component", "counter_aggregator"),
&cfg.aggregations)
if err != nil {
level.Error(logger).Log("msg", "Creating counter aggregator failed", "err", err)
os.Exit(1)
}
defer counterAggregator.Close()

prometheusReader := retrieval.NewPrometheusReader(
log.With(logger, "component", "Prometheus reader"),
cfg.walDirectory,
@@ -379,6 +409,7 @@
queueManager,
cfg.metricsPrefix,
cfg.useGkeResource,
counterAggregator,
)

// Exclude kingpin default flags to expose only Prometheus ones.
@@ -611,14 +642,14 @@ func fillMetadata(staticConfig *map[string]string) {
}
}

func parseConfigFile(filename string) (map[string]string, []scrape.MetricMetadata, error) {
func parseConfigFile(filename string) (map[string]string, []scrape.MetricMetadata, retrieval.CounterAggregatorConfig, error) {
b, err := ioutil.ReadFile(filename)
if err != nil {
return nil, nil, errors.Wrap(err, "reading file")
return nil, nil, nil, errors.Wrap(err, "reading file")
}
var fc fileConfig
if err := yaml.Unmarshal(b, &fc); err != nil {
return nil, nil, errors.Wrap(err, "invalid YAML")
return nil, nil, nil, errors.Wrap(err, "invalid YAML")
}
renameMapping := map[string]string{}
for _, r := range fc.MetricRenames {
@@ -633,13 +664,29 @@ func parseConfigFile(filename string) (map[string]string, []scrape.MetricMetadat
case textparse.MetricTypeCounter, textparse.MetricTypeGauge, textparse.MetricTypeHistogram,
textparse.MetricTypeSummary, textparse.MetricTypeUnknown:
default:
return nil, nil, errors.Errorf("invalid metric type %q", sm.Type)
return nil, nil, nil, errors.Errorf("invalid metric type %q", sm.Type)
}
staticMetadata = append(staticMetadata, scrape.MetricMetadata{
Metric: sm.Metric,
Type: textparse.MetricType(sm.Type),
Help: sm.Help,
})
}
return renameMapping, staticMetadata, nil

aggregations := make(retrieval.CounterAggregatorConfig)
for _, c := range fc.AggregatedCounters {
if _, ok := aggregations[c.Metric]; ok {
return nil, nil, nil, errors.Errorf("duplicate counter aggregator metric %s", c.Metric)
}
a := &retrieval.CounterAggregatorMetricConfig{Help: c.Help}
for _, f := range c.Filters {
matcher, err := promql.ParseMetricSelector(f)
if err != nil {
return nil, nil, nil, errors.Errorf("cannot parse metric selector '%s': %q", f, err)
}
a.Matchers = append(a.Matchers, matcher)
}
aggregations[c.Metric] = a
}
return renameMapping, staticMetadata, aggregations, nil
}
159 changes: 159 additions & 0 deletions retrieval/aggregator.go
@@ -0,0 +1,159 @@
// Copyright 2019 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package retrieval

import (
"context"
"math"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
promlabels "github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/tsdb/labels"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
)

// CounterAggregator provides the 'aggregated counters' feature of the sidecar.
// It can be used to export a sum of multiple counters from Prometheus to
// Stackdriver as a single cumulative metric.
//
// Each aggregated counter is associated with a single OpenCensus counter that
// can then be exported to Stackdriver (as a CUMULATIVE metric) or exposed to
// Prometheus via the standard `/metrics` endpoint. Regular flushing of counter
// values is implemented by OpenCensus.
type CounterAggregator struct {
logger log.Logger
counters []*aggregatedCounter
statsRecord func(context.Context, ...stats.Measurement) // used in testing.
}

// aggregatedCounter is where CounterAggregator keeps internal state about each
// exported metric: OpenCensus measure and view as well as a list of Matchers that
// define which Prometheus metrics will get aggregated.
type aggregatedCounter struct {
measure *stats.Float64Measure
view *view.View
matchers [][]*promlabels.Matcher
}

// CounterAggregatorConfig contains configuration for CounterAggregator. Keys of the map
// are metric names that will be exported by counter aggregator.
type CounterAggregatorConfig map[string]*CounterAggregatorMetricConfig

// CounterAggregatorMetricConfig provides configuration of a single aggregated counter.
// Matchers specify what Prometheus metrics (which are expected to be counter metrics) will
// be re-aggregated. Help provides a description for the exported metric.
type CounterAggregatorMetricConfig struct {
Matchers [][]*promlabels.Matcher
Help string
}

// counterTracker keeps track of a single time series that has at least one aggregated
// counter associated with it (i.e. there is at least one aggregated counter that has
// Matchers covering this time series). Last timestamp and value are tracked
// to detect counter resets.
type counterTracker struct {
lastTimestamp int64
lastValue float64
measures []*stats.Float64Measure
ca *CounterAggregator
}

// NewCounterAggregator creates a counter aggregator.
func NewCounterAggregator(logger log.Logger, config *CounterAggregatorConfig) (*CounterAggregator, error) {
aggregator := &CounterAggregator{logger: logger, statsRecord: stats.Record}
for metric, cfg := range *config {
measure := stats.Float64(metric, cfg.Help, stats.UnitDimensionless)
v := &view.View{
Name: metric,
Description: cfg.Help,
Measure: measure,
Aggregation: view.Sum(),
}
if err := view.Register(v); err != nil {
return nil, err
}
aggregator.counters = append(aggregator.counters, &aggregatedCounter{
measure: measure,
view: v,
matchers: cfg.Matchers,
})
}
return aggregator, nil
}

// Close must be called when CounterAggregator is no longer needed.
func (c *CounterAggregator) Close() {
for _, counter := range c.counters {
view.Unregister(counter.view)
}
}

// getTracker returns a counterTracker for a specific time series defined by labelset.
// If `nil` is returned, it means that there are no aggregated counters that need to
// be incremented for this time series.
func (c *CounterAggregator) getTracker(lset labels.Labels) *counterTracker {
var measures []*stats.Float64Measure
for _, counter := range c.counters {
if matchFiltersets(lset, counter.matchers) {
measures = append(measures, counter.measure)
}
}
if len(measures) == 0 {
return nil
}
return &counterTracker{measures: measures, ca: c}
}

// newPoint gets called on each new sample (timestamp, value) for time series that need to feed
// values into aggregated counters.
func (t *counterTracker) newPoint(ctx context.Context, lset labels.Labels, ts int64, v float64) {
if math.IsNaN(v) {
level.Debug(t.ca.logger).Log("msg", "got NaN value", "labels", lset, "last ts", t.lastTimestamp, "ts", t, "lastValue", t.lastValue)
return
}
// Ignore measurements that are earlier than last seen timestamp, since they are already covered by
// later values. Samples are coming from TSDB in order, so this is unlikely to happen.
if ts < t.lastTimestamp {
level.Debug(t.ca.logger).Log("msg", "out of order timestamp", "labels", lset, "last ts", t.lastTimestamp, "ts", ts)
return
}
// Use the first value we see as the starting point for the counter.
if t.lastTimestamp == 0 {
level.Debug(t.ca.logger).Log("msg", "first point", "labels", lset)
t.lastTimestamp = ts
t.lastValue = v
return
}
var delta float64
if v < t.lastValue {
// Counter was reset.
delta = v
level.Debug(t.ca.logger).Log("msg", "counter reset", "labels", lset, "value", v, "lastValue", t.lastValue, "delta", delta)
} else {
delta = v - t.lastValue
level.Debug(t.ca.logger).Log("msg", "got delta", "labels", lset, "value", v, "lastValue", t.lastValue, "delta", delta)
}
t.lastTimestamp = ts
t.lastValue = v
if delta == 0 {
return
}
ms := make([]stats.Measurement, len(t.measures))
for i, measure := range t.measures {
ms[i] = measure.M(delta)
}
t.ca.statsRecord(ctx, ms...)
}
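
For context, here is a hypothetical usage sketch (written as if it lived inside the `retrieval` package, for example as a test) showing how the pieces above fit together. It uses only the types and functions defined in this file plus `promql.ParseMetricSelector`, as in `parseConfigFile`; the metric and label names are illustrative, and it is not part of this PR.

```go
package retrieval

import (
	"context"
	"testing"

	"github.com/go-kit/kit/log"
	promlabels "github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/promql"
	"github.com/prometheus/tsdb/labels"
)

// TestCounterAggregatorSketch is an illustrative sketch, not part of this PR.
func TestCounterAggregatorSketch(t *testing.T) {
	// Build matchers from an instant vector selector, as parseConfigFile does.
	matchers, err := promql.ParseMetricSelector(`node_network_transmit_bytes_total{device="eth0"}`)
	if err != nil {
		t.Fatal(err)
	}
	config := CounterAggregatorConfig{
		"network_transmit_bytes": &CounterAggregatorMetricConfig{
			Matchers: [][]*promlabels.Matcher{matchers},
			Help:     "total number of bytes sent over eth0",
		},
	}
	ca, err := NewCounterAggregator(log.NewNopLogger(), &config)
	if err != nil {
		t.Fatal(err)
	}
	defer ca.Close()

	// A time series matching the filters gets a tracker; a non-matching one
	// would return nil and can be skipped by the reader.
	series := labels.FromStrings("__name__", "node_network_transmit_bytes_total", "device", "eth0")
	tracker := ca.getTracker(series)
	if tracker == nil {
		t.Fatal("expected series to match the aggregated counter filters")
	}

	ctx := context.Background()
	tracker.newPoint(ctx, series, 1000, 10) // first sample: establishes the baseline, records nothing
	tracker.newPoint(ctx, series, 2000, 25) // records a delta of 15
	tracker.newPoint(ctx, series, 3000, 5)  // value dropped: treated as a counter reset, records 5
}
```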