diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 2b0a0d44864..41ec68aace3 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -190,6 +190,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Add GCP Redis regions support {pull}33728[33728] - Add namespace metadata to all namespaced kubernetes resources. {pull}33763[33763] - Changed cloudwatch module to call ListMetrics API only once per region, instead of per AWS namespace {pull}34055[34055] +- Handle duplicated TYPE line for prometheus metrics {issue}18813[18813] {pull}33865[33865] *Packetbeat* diff --git a/metricbeat/helper/openmetrics/metric.go b/metricbeat/helper/openmetrics/metric.go index d2083ca4a6c..e35ea5773a1 100644 --- a/metricbeat/helper/openmetrics/metric.go +++ b/metricbeat/helper/openmetrics/metric.go @@ -24,8 +24,10 @@ import ( "strings" "time" - "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/elastic-agent-libs/mapstr" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/metricbeat/helper/prometheus" ) // MetricMap defines the mapping from Openmetrics metric to a Metricbeat field @@ -37,7 +39,7 @@ type MetricMap interface { GetField() string // GetValue returns the resulting value - GetValue(m *OpenMetric) interface{} + GetValue(m *prometheus.OpenMetric) interface{} GetNilValue() interface{} // GetConfiguration returns the configuration for the metric @@ -207,7 +209,7 @@ func (m *commonMetric) GetNilValue() interface{} { } // GetValue returns the resulting value -func (m *commonMetric) GetValue(metric *OpenMetric) interface{} { +func (m *commonMetric) GetValue(metric *prometheus.OpenMetric) interface{} { info := metric.GetInfo() if info != nil { if info.HasValidValue() { @@ -325,7 +327,7 @@ type keywordMetric struct { } // GetValue returns the resulting value -func (m *keywordMetric) GetValue(metric *OpenMetric) interface{} { +func (m *keywordMetric) GetValue(metric *prometheus.OpenMetric) interface{} { if gauge := metric.GetGauge(); gauge != nil && gauge.GetValue() == 1 { return m.keyword } @@ -337,7 +339,7 @@ type booleanMetric struct { } // GetValue returns the resulting value -func (m *booleanMetric) GetValue(metric *OpenMetric) interface{} { +func (m *booleanMetric) GetValue(metric *prometheus.OpenMetric) interface{} { if gauge := metric.GetGauge(); gauge != nil { return gauge.GetValue() == 1 } @@ -350,14 +352,14 @@ type labelMetric struct { } // GetValue returns the resulting value -func (m *labelMetric) GetValue(metric *OpenMetric) interface{} { +func (m *labelMetric) GetValue(metric *prometheus.OpenMetric) interface{} { if gauge := metric.GetGauge(); gauge != nil && gauge.GetValue() == 1 { return getLabel(metric, m.label) } return nil } -func getLabel(metric *OpenMetric, name string) string { +func getLabel(metric *prometheus.OpenMetric, name string) string { for _, label := range metric.GetLabel() { if label.Name == name { return label.Value @@ -371,7 +373,7 @@ type infoMetric struct { } // GetValue returns the resulting value -func (m *infoMetric) GetValue(metric *OpenMetric) interface{} { +func (m *infoMetric) GetValue(metric *prometheus.OpenMetric) interface{} { return "" } diff --git a/metricbeat/helper/openmetrics/openmetrics.go b/metricbeat/helper/openmetrics/openmetrics.go index 2aea45b8e0f..f4714c56f74 100644 --- a/metricbeat/helper/openmetrics/openmetrics.go +++ b/metricbeat/helper/openmetrics/openmetrics.go @@ -21,25 +21,12 @@ import ( "compress/gzip" "fmt" "io" - "io/ioutil" - "math" - "mime" "net/http" "regexp" - "strconv" - "strings" "time" - "github.com/prometheus/common/model" - - "github.com/prometheus/prometheus/pkg/exemplar" - "github.com/prometheus/prometheus/pkg/labels" - "github.com/prometheus/prometheus/pkg/textparse" - "github.com/prometheus/prometheus/pkg/timestamp" - - "github.com/pkg/errors" - "github.com/elastic/beats/v7/metricbeat/helper" + "github.com/elastic/beats/v7/metricbeat/helper/prometheus" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" @@ -47,292 +34,15 @@ import ( const acceptHeader = `application/openmetrics-text; version=1.0.0; charset=utf-8,text/plain` -var errNameLabelMandatory = fmt.Errorf("missing metric name (%s label)", labels.MetricName) - -type Gauge struct { - Value *float64 -} - -func (m *Gauge) GetValue() float64 { - if m != nil && m.Value != nil { - return *m.Value - } - return 0 -} - -type Info struct { - Value *int64 -} - -func (m *Info) GetValue() int64 { - if m != nil && m.Value != nil { - return *m.Value - } - return 0 -} -func (m *Info) HasValidValue() bool { - return m != nil && *m.Value == 1 -} - -type Stateset struct { - Value *int64 -} - -func (m *Stateset) GetValue() int64 { - if m != nil && m.Value != nil { - return *m.Value - } - return 0 -} -func (m *Stateset) HasValidValue() bool { - return m != nil && (*m.Value == 0 || *m.Value == 1) -} - -type Counter struct { - Value *float64 -} - -func (m *Counter) GetValue() float64 { - if m != nil && m.Value != nil { - return *m.Value - } - return 0 -} - -type Quantile struct { - Quantile *float64 - Value *float64 - Exemplar *exemplar.Exemplar -} - -func (m *Quantile) GetQuantile() float64 { - if m != nil && m.Quantile != nil { - return *m.Quantile - } - return 0 -} - -func (m *Quantile) GetValue() float64 { - if m != nil && m.Value != nil { - return *m.Value - } - return 0 -} - -type Summary struct { - SampleCount *uint64 - SampleSum *float64 - Quantile []*Quantile -} - -func (m *Summary) GetSampleCount() uint64 { - if m != nil && m.SampleCount != nil { - return *m.SampleCount - } - return 0 -} - -func (m *Summary) GetSampleSum() float64 { - if m != nil && m.SampleSum != nil { - return *m.SampleSum - } - return 0 -} - -func (m *Summary) GetQuantile() []*Quantile { - if m != nil { - return m.Quantile - } - return nil -} - -type Unknown struct { - Value *float64 -} - -func (m *Unknown) GetValue() float64 { - if m != nil && m.Value != nil { - return *m.Value - } - return 0 -} - -type Bucket struct { - CumulativeCount *uint64 - UpperBound *float64 - Exemplar *exemplar.Exemplar -} - -func (m *Bucket) GetCumulativeCount() uint64 { - if m != nil && m.CumulativeCount != nil { - return *m.CumulativeCount - } - return 0 -} - -func (m *Bucket) GetUpperBound() float64 { - if m != nil && m.UpperBound != nil { - return *m.UpperBound - } - return 0 -} - -type Histogram struct { - SampleCount *uint64 - SampleSum *float64 - Bucket []*Bucket - IsGaugeHistogram bool -} - -func (m *Histogram) GetSampleCount() uint64 { - if m != nil && m.SampleCount != nil { - return *m.SampleCount - } - return 0 -} - -func (m *Histogram) GetSampleSum() float64 { - if m != nil && m.SampleSum != nil { - return *m.SampleSum - } - return 0 -} - -func (m *Histogram) GetBucket() []*Bucket { - if m != nil { - return m.Bucket - } - return nil -} - -type OpenMetric struct { - Label []*labels.Label - Exemplar *exemplar.Exemplar - Name *string - Gauge *Gauge - Counter *Counter - Info *Info - Stateset *Stateset - Summary *Summary - Unknown *Unknown - Histogram *Histogram - TimestampMs *int64 -} - -func (m *OpenMetric) GetName() *string { - if m != nil { - return m.Name - } - return nil -} - -func (m *OpenMetric) GetLabel() []*labels.Label { - if m != nil { - return m.Label - } - return nil -} - -func (m *OpenMetric) GetGauge() *Gauge { - if m != nil { - return m.Gauge - } - return nil -} - -func (m *OpenMetric) GetCounter() *Counter { - if m != nil { - return m.Counter - } - return nil -} - -func (m *OpenMetric) GetInfo() *Info { - if m != nil { - return m.Info - } - return nil -} - -func (m *OpenMetric) GetStateset() *Stateset { - if m != nil { - return m.Stateset - } - return nil -} - -func (m *OpenMetric) GetSummary() *Summary { - if m != nil { - return m.Summary - } - return nil -} - -func (m *OpenMetric) GetUnknown() *Unknown { - if m != nil { - return m.Unknown - } - return nil -} - -func (m *OpenMetric) GetHistogram() *Histogram { - if m != nil && m.Histogram != nil && !m.Histogram.IsGaugeHistogram { - return m.Histogram - } - return nil -} - -func (m *OpenMetric) GetGaugeHistogram() *Histogram { - if m != nil && m.Histogram != nil && m.Histogram.IsGaugeHistogram { - return m.Histogram - } - return nil -} - -func (m *OpenMetric) GetTimestampMs() int64 { - if m != nil && m.TimestampMs != nil { - return *m.TimestampMs - } - return 0 -} - -type OpenMetricFamily struct { - Name *string - Help *string - Type textparse.MetricType - Unit *string - Metric []*OpenMetric -} - -func (m *OpenMetricFamily) GetName() string { - if m != nil && m.Name != nil { - return *m.Name - } - return "" -} -func (m *OpenMetricFamily) GetUnit() string { - if m != nil && *m.Unit != "" { - return *m.Unit - } - return "" -} - -func (m *OpenMetricFamily) GetMetric() []*OpenMetric { - if m != nil { - return m.Metric - } - return nil -} - // OpenMetrics helper retrieves openmetrics formatted metrics // This interface needs to use TextParse type OpenMetrics interface { // GetFamilies requests metric families from openmetrics endpoint and returns them - GetFamilies() ([]*OpenMetricFamily, error) + GetFamilies() ([]*prometheus.MetricFamily, error) GetProcessedMetrics(mapping *MetricsMapping) ([]mapstr.M, error) - ProcessMetrics(families []*OpenMetricFamily, mapping *MetricsMapping) ([]mapstr.M, error) + ProcessMetrics(families []*prometheus.MetricFamily, mapping *MetricsMapping) ([]mapstr.M, error) ReportProcessedMetrics(mapping *MetricsMapping, r mb.ReporterV2) error } @@ -359,7 +69,7 @@ func NewOpenMetricsClient(base mb.BaseMetricSet) (OpenMetrics, error) { } // GetFamilies requests metric families from openmetrics endpoint and returns them -func (p *openmetrics) GetFamilies() ([]*OpenMetricFamily, error) { +func (p *openmetrics) GetFamilies() ([]*prometheus.MetricFamily, error) { var reader io.Reader resp, err := p.FetchResponse() @@ -380,374 +90,28 @@ func (p *openmetrics) GetFamilies() ([]*OpenMetricFamily, error) { } if resp.StatusCode > 399 { - bodyBytes, err := ioutil.ReadAll(reader) + bodyBytes, err := io.ReadAll(reader) if err == nil { p.logger.Debug("error received from openmetrics endpoint: ", string(bodyBytes)) } return nil, fmt.Errorf("unexpected status code %d from server", resp.StatusCode) } - contentType := getContentType(resp.Header) + contentType := prometheus.GetContentType(resp.Header) if contentType == "" { return nil, fmt.Errorf("Invalid format for response of response") } appendTime := time.Now().Round(0) - b, err := ioutil.ReadAll(reader) - families, err := parseMetricFamilies(b, contentType, appendTime) - - return families, nil -} - -const ( - suffixInfo = "_info" - suffixTotal = "_total" - suffixGCount = "_gcount" - suffixGSum = "_gsum" - suffixCount = "_count" - suffixSum = "_sum" - suffixBucket = "_bucket" -) - -func isInfo(name string) bool { - return len(name) > 5 && name[len(name)-5:] == suffixInfo -} - -// Counters have _total suffix -func isTotal(name string) bool { - return len(name) > 6 && name[len(name)-6:] == suffixTotal -} - -func isGCount(name string) bool { - return len(name) > 7 && name[len(name)-7:] == suffixGCount -} - -func isGSum(name string) bool { - return len(name) > 5 && name[len(name)-5:] == suffixGSum -} - -func isCount(name string) bool { - return len(name) > 6 && name[len(name)-6:] == suffixCount -} - -func isSum(name string) bool { - return len(name) > 4 && name[len(name)-4:] == suffixSum -} - -func isBucket(name string) bool { - return len(name) > 7 && name[len(name)-7:] == suffixBucket -} - -func summaryMetricName(name string, s float64, qv string, lbls string, t *int64, summariesByName map[string]map[string]*OpenMetric) (string, *OpenMetric) { - var summary = &Summary{} - var quantile = []*Quantile{} - var quant = &Quantile{} - - switch { - case isCount(name): - u := uint64(s) - summary.SampleCount = &u - name = name[:len(name)-6] - case isSum(name): - summary.SampleSum = &s - name = name[:len(name)-4] - default: - f, err := strconv.ParseFloat(qv, 64) - if err != nil { - f = -1 - } - quant.Quantile = &f - quant.Value = &s - } - - _, k := summariesByName[name] - if !k { - summariesByName[name] = make(map[string]*OpenMetric) - } - metric, ok := summariesByName[name][lbls] - if !ok { - metric = &OpenMetric{} - metric.Name = &name - metric.Summary = summary - metric.Summary.Quantile = quantile - summariesByName[name][lbls] = metric - } - if metric.Summary.SampleSum == nil && summary.SampleSum != nil { - metric.Summary.SampleSum = summary.SampleSum - } else if metric.Summary.SampleCount == nil && summary.SampleCount != nil { - metric.Summary.SampleCount = summary.SampleCount - } else if quant.Quantile != nil { - metric.Summary.Quantile = append(metric.Summary.Quantile, quant) - } - - return name, metric -} - -func histogramMetricName(name string, s float64, qv string, lbls string, t *int64, isGaugeHistogram bool, e *exemplar.Exemplar, histogramsByName map[string]map[string]*OpenMetric) (string, *OpenMetric) { - var histogram = &Histogram{} - var bucket = []*Bucket{} - var bkt = &Bucket{} - - switch { - case isCount(name): - u := uint64(s) - histogram.SampleCount = &u - name = name[:len(name)-6] - case isSum(name): - histogram.SampleSum = &s - name = name[:len(name)-4] - case isGaugeHistogram && isGCount(name): - u := uint64(s) - histogram.SampleCount = &u - name = name[:len(name)-7] - case isGaugeHistogram && isGSum(name): - histogram.SampleSum = &s - name = name[:len(name)-5] - default: - if isBucket(name) { - name = name[:len(name)-7] - } - f, err := strconv.ParseFloat(qv, 64) - if err != nil { - f = math.MaxUint64 - } - cnt := uint64(s) - bkt.UpperBound = &f - bkt.CumulativeCount = &cnt - - if e != nil { - if !e.HasTs { - e.Ts = *t - } - bkt.Exemplar = e - } - } - - _, k := histogramsByName[name] - if !k { - histogramsByName[name] = make(map[string]*OpenMetric) - } - metric, ok := histogramsByName[name][lbls] - if !ok { - metric = &OpenMetric{} - metric.Name = &name - metric.Histogram = histogram - metric.Histogram.Bucket = bucket - histogramsByName[name][lbls] = metric - } - if metric.Histogram.SampleSum == nil && histogram.SampleSum != nil { - metric.Histogram.SampleSum = histogram.SampleSum - } else if metric.Histogram.SampleCount == nil && histogram.SampleCount != nil { - metric.Histogram.SampleCount = histogram.SampleCount - } else if bkt.UpperBound != nil { - metric.Histogram.Bucket = append(metric.Histogram.Bucket, bkt) + b, err := io.ReadAll(reader) + if err != nil { + return nil, fmt.Errorf("failed to read response: %w", err) } - - return name, metric -} - -func parseMetricFamilies(b []byte, contentType string, ts time.Time) ([]*OpenMetricFamily, error) { - var ( - parser = textparse.New(b, contentType) - defTime = timestamp.FromTime(ts) - metricFamiliesByName = map[string]*OpenMetricFamily{} - summariesByName = map[string]map[string]*OpenMetric{} - histogramsByName = map[string]map[string]*OpenMetric{} - fam *OpenMetricFamily - mt = textparse.MetricTypeUnknown - ) - var err error - -loop: - for { - var ( - et textparse.Entry - ok bool - e exemplar.Exemplar - ) - if et, err = parser.Next(); err != nil { - if err == io.EOF { - err = nil - } - break - } - switch et { - case textparse.EntryType: - buf, t := parser.Type() - s := string(buf) - fam, ok = metricFamiliesByName[s] - if !ok { - fam = &OpenMetricFamily{Name: &s, Type: t} - metricFamiliesByName[s] = fam - } - mt = t - continue - case textparse.EntryHelp: - buf, t := parser.Help() - s := string(buf) - h := string(t) - fam, ok = metricFamiliesByName[s] - if !ok { - fam = &OpenMetricFamily{Name: &s, Help: &h, Type: textparse.MetricTypeUnknown} - metricFamiliesByName[s] = fam - } - fam.Help = &h - continue - case textparse.EntryUnit: - buf, t := parser.Unit() - s := string(buf) - u := string(t) - fam, ok = metricFamiliesByName[s] - if !ok { - fam = &OpenMetricFamily{Name: &s, Unit: &u, Type: textparse.MetricTypeUnknown} - metricFamiliesByName[string(buf)] = fam - } - fam.Unit = &u - continue - case textparse.EntryComment: - continue - default: - } - - t := defTime - _, tp, v := parser.Series() - - var ( - lset labels.Labels - mets string - ) - - mets = parser.Metric(&lset) - - if !lset.Has(labels.MetricName) { - err = errNameLabelMandatory - break loop - } - - var lbls strings.Builder - lbls.Grow(len(mets)) - var labelPairs = []*labels.Label{} - for _, l := range lset.Copy() { - if l.Name == labels.MetricName { - continue - } - - if l.Name != model.QuantileLabel && l.Name != labels.BucketLabel { // quantile and le are special labels handled below - - lbls.WriteString(l.Name) - lbls.WriteString(l.Value) - } - n := l.Name - v := l.Value - - labelPairs = append(labelPairs, &labels.Label{ - Name: n, - Value: v, - }) - } - - var metric *OpenMetric - - metricName := lset.Get(labels.MetricName) - var lookupMetricName string - var exm *exemplar.Exemplar - - // Suffixes - https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#suffixes - switch mt { - case textparse.MetricTypeCounter: - var counter = &Counter{Value: &v} - mn := lset.Get(labels.MetricName) - metric = &OpenMetric{Name: &mn, Counter: counter, Label: labelPairs} - if isTotal(metricName) { // Remove suffix _total, get lookup metricname - lookupMetricName = metricName[:len(metricName)-6] - } - break - case textparse.MetricTypeGauge: - var gauge = &Gauge{Value: &v} - metric = &OpenMetric{Name: &metricName, Gauge: gauge, Label: labelPairs} - lookupMetricName = metricName - break - case textparse.MetricTypeInfo: - value := int64(v) - var info = &Info{Value: &value} - metric = &OpenMetric{Name: &metricName, Info: info, Label: labelPairs} - lookupMetricName = metricName - break - case textparse.MetricTypeSummary: - lookupMetricName, metric = summaryMetricName(metricName, v, lset.Get(model.QuantileLabel), lbls.String(), &t, summariesByName) - metric.Label = labelPairs - if !isSum(metricName) { - continue - } - metricName = lookupMetricName - break - case textparse.MetricTypeHistogram: - if hasExemplar := parser.Exemplar(&e); hasExemplar { - exm = &e - } - lookupMetricName, metric = histogramMetricName(metricName, v, lset.Get(labels.BucketLabel), lbls.String(), &t, false, exm, histogramsByName) - metric.Label = labelPairs - if !isSum(metricName) { - continue - } - metricName = lookupMetricName - break - case textparse.MetricTypeGaugeHistogram: - if hasExemplar := parser.Exemplar(&e); hasExemplar { - exm = &e - } - lookupMetricName, metric = histogramMetricName(metricName, v, lset.Get(labels.BucketLabel), lbls.String(), &t, true, exm, histogramsByName) - metric.Label = labelPairs - metric.Histogram.IsGaugeHistogram = true - if !isGSum(metricName) { - continue - } - metricName = lookupMetricName - break - case textparse.MetricTypeStateset: - value := int64(v) - var stateset = &Stateset{Value: &value} - metric = &OpenMetric{Name: &metricName, Stateset: stateset, Label: labelPairs} - lookupMetricName = metricName - break - case textparse.MetricTypeUnknown: - var unknown = &Unknown{Value: &v} - metric = &OpenMetric{Name: &metricName, Unknown: unknown, Label: labelPairs} - lookupMetricName = metricName - break - default: - lookupMetricName = metricName - } - - fam, ok = metricFamiliesByName[lookupMetricName] - if !ok { - fam = &OpenMetricFamily{Type: mt} - metricFamiliesByName[lookupMetricName] = fam - } - - fam.Name = &metricName - - if hasExemplar := parser.Exemplar(&e); hasExemplar && mt != textparse.MetricTypeHistogram && metric != nil { - if !e.HasTs { - e.Ts = t - } - metric.Exemplar = &e - } - - if tp != nil && metric != nil { - t = *tp - metric.TimestampMs = &t - } - - fam.Metric = append(fam.Metric, metric) + families, err := prometheus.ParseMetricFamilies(b, contentType, appendTime) + if err != nil { + return nil, fmt.Errorf("failed to parse families: %w", err) } - families := make([]*OpenMetricFamily, 0, len(metricFamiliesByName)) - for _, v := range metricFamiliesByName { - if v.Metric != nil { - families = append(families, v) - } - } return families, nil } @@ -766,7 +130,7 @@ type MetricsMapping struct { ExtraFields map[string]string } -func (p *openmetrics) ProcessMetrics(families []*OpenMetricFamily, mapping *MetricsMapping) ([]mapstr.M, error) { +func (p *openmetrics) ProcessMetrics(families []*prometheus.MetricFamily, mapping *MetricsMapping) ([]mapstr.M, error) { eventsMap := map[string]mapstr.M{} infoMetrics := []*infoMetricData{} @@ -808,14 +172,14 @@ func (p *openmetrics) ProcessMetrics(families []*OpenMetricFamily, mapping *Metr for k, v := range allLabels { if l, ok := mapping.Labels[k]; ok { if l.IsKey() { - keyLabels.Put(l.GetField(), v) + _, _ = keyLabels.Put(l.GetField(), v) } else { - labels.Put(l.GetField(), v) + _, _ = labels.Put(l.GetField(), v) } } else if storeAllLabels { // if label for this metric is not found at the label mappings but // it is configured to store any labels found, make it so - labels.Put(labelsLocation+"."+k, v) + _, _ = labels.Put(labelsLocation+"."+k, v) } } @@ -825,7 +189,7 @@ func (p *openmetrics) ProcessMetrics(families []*OpenMetricFamily, mapping *Metr // not considering these extra fields to be keylabels as that case // have not appeared yet for k, v := range extraFields { - labels.Put(k, v) + _, _ = labels.Put(k, v) } // Keep a info document if it's an infoMetric @@ -841,7 +205,7 @@ func (p *openmetrics) ProcessMetrics(families []*OpenMetricFamily, mapping *Metr if field != "" { event := getEvent(eventsMap, keyLabels) update := mapstr.M{} - update.Put(field, value) + _, _ = update.Put(field, value) // value may be a mapstr (for histograms and summaries), do a deep update to avoid smashing existing fields event.DeepUpdate(update) @@ -899,7 +263,7 @@ type infoMetricData struct { func (p *openmetrics) ReportProcessedMetrics(mapping *MetricsMapping, r mb.ReporterV2) error { events, err := p.GetProcessedMetrics(mapping) if err != nil { - return errors.Wrap(err, "error getting processed metrics") + return fmt.Errorf("error getting processed metrics: %w", err) } for _, event := range events { r.Event(mb.Event{ @@ -921,11 +285,11 @@ func getEvent(m map[string]mapstr.M, labels mapstr.M) mapstr.M { return res } -func getLabels(metric *OpenMetric) mapstr.M { +func getLabels(metric *prometheus.OpenMetric) mapstr.M { labels := mapstr.M{} for _, label := range metric.GetLabel() { if label.Name != "" && label.Value != "" { - labels.Put(label.Name, label.Value) + _, _ = labels.Put(label.Name, label.Value) } } return labels @@ -939,7 +303,7 @@ func CompilePatternList(patterns *[]string) ([]*regexp.Regexp, error) { for _, pattern := range *patterns { r, err := regexp.Compile(pattern) if err != nil { - return nil, errors.Wrapf(err, "compiling pattern '%s'", pattern) + return nil, fmt.Errorf("failed to compile pattern '%s': %w", pattern, err) } compiledPatterns = append(compiledPatterns, r) } @@ -958,43 +322,3 @@ func MatchMetricFamily(family string, matchMetrics []*regexp.Regexp) bool { } return false } - -const ( - TextVersion = "0.0.4" - OpenMetricsType = `application/openmetrics-text` - - // The Content-Type values for the different wire protocols. - FmtUnknown string = `` - FmtText string = `text/plain; version=` + TextVersion + `; charset=utf-8` -) - -const ( - hdrContentType = "Content-Type" -) - -func getContentType(h http.Header) string { - ct := h.Get(hdrContentType) - - mediatype, params, err := mime.ParseMediaType(ct) - if err != nil { - return FmtUnknown - } - - const textType = "text/plain" - - switch mediatype { - case OpenMetricsType: - if e, ok := params["encoding"]; ok && e != "delimited" { - return FmtUnknown - } - return OpenMetricsType - - case textType: - if v, ok := params["version"]; ok && v != TextVersion { - return FmtUnknown - } - return FmtText - } - - return FmtUnknown -} diff --git a/metricbeat/helper/openmetrics/openmetrics_test.go b/metricbeat/helper/openmetrics/openmetrics_test.go index cf6bd3ac4b9..0cb5bec8bb0 100644 --- a/metricbeat/helper/openmetrics/openmetrics_test.go +++ b/metricbeat/helper/openmetrics/openmetrics_test.go @@ -20,7 +20,7 @@ package openmetrics import ( "bytes" "compress/gzip" - "io/ioutil" + "io" "net/http" "sort" "testing" @@ -186,7 +186,7 @@ var _ = httpfetcher(&mockFetcher{}) func (m mockFetcher) FetchResponse() (*http.Response, error) { body := bytes.NewBuffer(nil) writer := gzip.NewWriter(body) - writer.Write([]byte(m.response)) + _, _ = writer.Write([]byte(m.response)) writer.Close() return &http.Response{ @@ -195,7 +195,7 @@ func (m mockFetcher) FetchResponse() (*http.Response, error) { "Content-Encoding": []string{"gzip"}, "Content-Type": []string{"application/openmetrics-text"}, }, - Body: ioutil.NopCloser(body), + Body: io.NopCloser(body), }, nil } @@ -576,7 +576,7 @@ func TestOpenMetrics(t *testing.T) { for _, test := range tests { t.Run(test.msg, func(t *testing.T) { reporter := &mbtest.CapturingReporterV2{} - p.ReportProcessedMetrics(test.mapping, reporter) + _ = p.ReportProcessedMetrics(test.mapping, reporter) assert.Nil(t, reporter.GetErrors(), test.msg) // Sort slice to avoid randomness res := reporter.GetEvents() @@ -1062,7 +1062,7 @@ func TestOpenMetricsKeyLabels(t *testing.T) { for _, tc := range testCases { r := &mbtest.CapturingReporterV2{} p := &openmetrics{mockFetcher{response: tc.openmetricsResponse}, logp.NewLogger("test")} - p.ReportProcessedMetrics(tc.mapping, r) + _ = p.ReportProcessedMetrics(tc.mapping, r) if !assert.Nil(t, r.GetErrors(), "error reporting/processing metrics, at %q", tc.testName) { continue diff --git a/metricbeat/helper/prometheus/metric.go b/metricbeat/helper/prometheus/metric.go index 815881c5b58..f5c97270938 100644 --- a/metricbeat/helper/prometheus/metric.go +++ b/metricbeat/helper/prometheus/metric.go @@ -26,8 +26,6 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/elastic-agent-libs/mapstr" - - dto "github.com/prometheus/client_model/go" ) // MetricMap defines the mapping from Prometheus metric to a Metricbeat field @@ -39,7 +37,7 @@ type MetricMap interface { GetField() string // GetValue returns the resulting value - GetValue(m *dto.Metric) interface{} + GetValue(m *OpenMetric) interface{} // GetConfiguration returns the configuration for the metric GetConfiguration() Configuration @@ -205,7 +203,7 @@ func (m *commonMetric) GetConfiguration() Configuration { } // GetValue returns the resulting value -func (m *commonMetric) GetValue(metric *dto.Metric) interface{} { +func (m *commonMetric) GetValue(metric *OpenMetric) interface{} { counter := metric.GetCounter() if counter != nil { if !math.IsNaN(counter.GetValue()) && !math.IsInf(counter.GetValue(), 0) { @@ -278,7 +276,7 @@ type keywordMetric struct { } // GetValue returns the resulting value -func (m *keywordMetric) GetValue(metric *dto.Metric) interface{} { +func (m *keywordMetric) GetValue(metric *OpenMetric) interface{} { if gauge := metric.GetGauge(); gauge != nil && gauge.GetValue() == 1 { return m.keyword } @@ -290,7 +288,7 @@ type booleanMetric struct { } // GetValue returns the resulting value -func (m *booleanMetric) GetValue(metric *dto.Metric) interface{} { +func (m *booleanMetric) GetValue(metric *OpenMetric) interface{} { if gauge := metric.GetGauge(); gauge != nil { return gauge.GetValue() == 1 } @@ -303,17 +301,17 @@ type labelMetric struct { } // GetValue returns the resulting value -func (m *labelMetric) GetValue(metric *dto.Metric) interface{} { +func (m *labelMetric) GetValue(metric *OpenMetric) interface{} { if gauge := metric.GetGauge(); gauge != nil && gauge.GetValue() == 1 { return getLabel(metric, m.label) } return nil } -func getLabel(metric *dto.Metric, name string) string { +func getLabel(metric *OpenMetric, name string) string { for _, label := range metric.GetLabel() { - if label.GetName() == name { - return label.GetValue() + if label.Name == name { + return label.Value } } return "" @@ -324,7 +322,7 @@ type infoMetric struct { } // GetValue returns the resulting value -func (m *infoMetric) GetValue(metric *dto.Metric) interface{} { +func (m *infoMetric) GetValue(metric *OpenMetric) interface{} { return "" } diff --git a/metricbeat/helper/prometheus/prometheus.go b/metricbeat/helper/prometheus/prometheus.go index bb8a88b134a..0cfab2dafe2 100644 --- a/metricbeat/helper/prometheus/prometheus.go +++ b/metricbeat/helper/prometheus/prometheus.go @@ -21,13 +21,9 @@ import ( "compress/gzip" "fmt" "io" - "io/ioutil" "net/http" "regexp" - - "github.com/pkg/errors" - dto "github.com/prometheus/client_model/go" - "github.com/prometheus/common/expfmt" + "time" "github.com/elastic/beats/v7/metricbeat/helper" "github.com/elastic/beats/v7/metricbeat/mb" @@ -40,11 +36,11 @@ const acceptHeader = `text/plain;version=0.0.4;q=0.5,*/*;q=0.1` // Prometheus helper retrieves prometheus formatted metrics type Prometheus interface { // GetFamilies requests metric families from prometheus endpoint and returns them - GetFamilies() ([]*dto.MetricFamily, error) + GetFamilies() ([]*MetricFamily, error) GetProcessedMetrics(mapping *MetricsMapping) ([]mapstr.M, error) - ProcessMetrics(families []*dto.MetricFamily, mapping *MetricsMapping) ([]mapstr.M, error) + ProcessMetrics(families []*MetricFamily, mapping *MetricsMapping) ([]mapstr.M, error) ReportProcessedMetrics(mapping *MetricsMapping, r mb.ReporterV2) error } @@ -71,7 +67,7 @@ func NewPrometheusClient(base mb.BaseMetricSet) (Prometheus, error) { } // GetFamilies requests metric families from prometheus endpoint and returns them -func (p *prometheus) GetFamilies() ([]*dto.MetricFamily, error) { +func (p *prometheus) GetFamilies() ([]*MetricFamily, error) { var reader io.Reader resp, err := p.FetchResponse() @@ -92,35 +88,26 @@ func (p *prometheus) GetFamilies() ([]*dto.MetricFamily, error) { } if resp.StatusCode > 399 { - bodyBytes, err := ioutil.ReadAll(reader) + bodyBytes, err := io.ReadAll(reader) if err == nil { p.logger.Debug("error received from prometheus endpoint: ", string(bodyBytes)) } return nil, fmt.Errorf("unexpected status code %d from server", resp.StatusCode) } - format := expfmt.ResponseFormat(resp.Header) - if format == "" { - return nil, fmt.Errorf("Invalid format for response of response") + contentType := GetContentType(resp.Header) + if contentType == "" { + return nil, fmt.Errorf("invalid format for response: %v", resp.Header) } - decoder := expfmt.NewDecoder(reader, format) - if decoder == nil { - return nil, fmt.Errorf("Unable to create decoder to decode response") + appendTime := time.Now().Round(0) + b, err := io.ReadAll(reader) + if err != nil { + return nil, fmt.Errorf("failed to read response: %w", err) } - - families := []*dto.MetricFamily{} - for { - mf := &dto.MetricFamily{} - err = decoder.Decode(mf) - if err != nil { - if err == io.EOF { - break - } - return nil, errors.Wrap(err, "decoding of metric family failed") - } else { - families = append(families, mf) - } + families, err := ParseMetricFamilies(b, contentType, appendTime) + if err != nil { + return nil, fmt.Errorf("failed to parse families: %w", err) } return families, nil @@ -141,7 +128,7 @@ type MetricsMapping struct { ExtraFields map[string]string } -func (p *prometheus) ProcessMetrics(families []*dto.MetricFamily, mapping *MetricsMapping) ([]mapstr.M, error) { +func (p *prometheus) ProcessMetrics(families []*MetricFamily, mapping *MetricsMapping) ([]mapstr.M, error) { eventsMap := map[string]mapstr.M{} infoMetrics := []*infoMetricData{} @@ -183,15 +170,15 @@ func (p *prometheus) ProcessMetrics(families []*dto.MetricFamily, mapping *Metri for k, v := range allLabels { if l, ok := mapping.Labels[k]; ok { if l.IsKey() { - keyLabels.Put(l.GetField(), v) + _, _ = keyLabels.Put(l.GetField(), v) } else { - labels.Put(l.GetField(), v) + _, _ = labels.Put(l.GetField(), v) } } else if storeAllLabels { // if label for this metric is not found at the label mappings but // it is configured to store any labels found, make it so // TODO dedot - labels.Put(labelsLocation+"."+k, v) + _, _ = labels.Put(labelsLocation+"."+k, v) } } @@ -201,7 +188,7 @@ func (p *prometheus) ProcessMetrics(families []*dto.MetricFamily, mapping *Metri // not considering these extra fields to be keylabels as that case // have not appeared yet for k, v := range extraFields { - labels.Put(k, v) + _, _ = labels.Put(k, v) } // Keep a info document if it's an infoMetric @@ -217,7 +204,7 @@ func (p *prometheus) ProcessMetrics(families []*dto.MetricFamily, mapping *Metri if field != "" { event := getEvent(eventsMap, keyLabels) update := mapstr.M{} - update.Put(field, value) + _, _ = update.Put(field, value) // value may be a mapstr (for histograms and summaries), do a deep update to avoid smashing existing fields event.DeepUpdate(update) @@ -275,7 +262,7 @@ type infoMetricData struct { func (p *prometheus) ReportProcessedMetrics(mapping *MetricsMapping, r mb.ReporterV2) error { events, err := p.GetProcessedMetrics(mapping) if err != nil { - return errors.Wrap(err, "error getting processed metrics") + return fmt.Errorf("error getting processed metrics: %w", err) } for _, event := range events { r.Event(mb.Event{ @@ -297,11 +284,11 @@ func getEvent(m map[string]mapstr.M, labels mapstr.M) mapstr.M { return res } -func getLabels(metric *dto.Metric) mapstr.M { +func getLabels(metric *OpenMetric) mapstr.M { labels := mapstr.M{} for _, label := range metric.GetLabel() { - if label.GetName() != "" && label.GetValue() != "" { - labels.Put(label.GetName(), label.GetValue()) + if label.Name != "" && label.Value != "" { + _, _ = labels.Put(label.Name, label.Value) } } return labels @@ -315,7 +302,7 @@ func CompilePatternList(patterns *[]string) ([]*regexp.Regexp, error) { for _, pattern := range *patterns { r, err := regexp.Compile(pattern) if err != nil { - return nil, errors.Wrapf(err, "compiling pattern '%s'", pattern) + return nil, fmt.Errorf("failed compiling pattern '%s': %w", pattern, err) } compiledPatterns = append(compiledPatterns, r) } diff --git a/metricbeat/helper/prometheus/prometheus_test.go b/metricbeat/helper/prometheus/prometheus_test.go index 53baf2a3776..b135f0683d6 100644 --- a/metricbeat/helper/prometheus/prometheus_test.go +++ b/metricbeat/helper/prometheus/prometheus_test.go @@ -20,7 +20,7 @@ package prometheus import ( "bytes" "compress/gzip" - "io/ioutil" + "io" "net/http" "sort" "testing" @@ -188,15 +188,16 @@ var _ = httpfetcher(&mockFetcher{}) func (m mockFetcher) FetchResponse() (*http.Response, error) { body := bytes.NewBuffer(nil) writer := gzip.NewWriter(body) - writer.Write([]byte(m.response)) + _, _ = writer.Write([]byte(m.response)) writer.Close() return &http.Response{ StatusCode: 200, Header: http.Header{ "Content-Encoding": []string{"gzip"}, + "Content-Type": []string{"text/plain; version=0.0.4; charset=utf-8"}, }, - Body: ioutil.NopCloser(body), + Body: io.NopCloser(body), }, nil } @@ -514,7 +515,7 @@ func TestPrometheus(t *testing.T) { for _, test := range tests { t.Run(test.msg, func(t *testing.T) { reporter := &mbtest.CapturingReporterV2{} - p.ReportProcessedMetrics(test.mapping, reporter) + _ = p.ReportProcessedMetrics(test.mapping, reporter) assert.Nil(t, reporter.GetErrors(), test.msg) // Sort slice to avoid randomness res := reporter.GetEvents() @@ -971,7 +972,7 @@ func TestPrometheusKeyLabels(t *testing.T) { for _, tc := range testCases { r := &mbtest.CapturingReporterV2{} p := &prometheus{mockFetcher{response: tc.prometheusResponse}, logp.NewLogger("test")} - p.ReportProcessedMetrics(tc.mapping, r) + _ = p.ReportProcessedMetrics(tc.mapping, r) if !assert.Nil(t, r.GetErrors(), "error reporting/processing metrics, at %q", tc.testName) { continue diff --git a/metricbeat/helper/prometheus/ptest/ptest.go b/metricbeat/helper/prometheus/ptest/ptest.go index 76a1899c457..cc0de3de30c 100644 --- a/metricbeat/helper/prometheus/ptest/ptest.go +++ b/metricbeat/helper/prometheus/ptest/ptest.go @@ -19,7 +19,7 @@ package ptest import ( "encoding/json" - "io/ioutil" + "io" "net/http" "net/http/httptest" "os" @@ -54,13 +54,13 @@ func TestMetricSet(t *testing.T, module, metricset string, cases TestCases) { file, err := os.Open(test.MetricsFile) assert.NoError(t, err, "cannot open test file "+test.MetricsFile) - body, err := ioutil.ReadAll(file) + body, err := io.ReadAll(file) assert.NoError(t, err, "cannot read test file "+test.MetricsFile) server := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(200) - w.Header().Set("Content-Type", "text/plain; charset=ISO-8859-1") - w.Write([]byte(body)) + w.Header().Set("Content-Type", "text/plain; version=0.0.4") + _, _ = w.Write(body) })) server.Start() @@ -83,12 +83,12 @@ func TestMetricSet(t *testing.T, module, metricset string, cases TestCases) { return h1 < h2 }) eventsJSON, _ := json.MarshalIndent(events, "", "\t") - err = ioutil.WriteFile(test.ExpectedFile, eventsJSON, 0644) + err = os.WriteFile(test.ExpectedFile, eventsJSON, 0644) assert.NoError(t, err) } // Read expected events from reference file - expected, err := ioutil.ReadFile(test.ExpectedFile) + expected, err := os.ReadFile(test.ExpectedFile) if err != nil { t.Fatal(err) } diff --git a/metricbeat/helper/prometheus/textparse.go b/metricbeat/helper/prometheus/textparse.go new file mode 100644 index 00000000000..de6cc075c4b --- /dev/null +++ b/metricbeat/helper/prometheus/textparse.go @@ -0,0 +1,683 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package prometheus + +import ( + "math" + "mime" + "net/http" + "strconv" + "strings" + "time" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/exemplar" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/textparse" + "github.com/prometheus/prometheus/pkg/timestamp" +) + +const ( + // The Content-Type values for the different wire protocols + hdrContentType = "Content-Type" + TextVersion = "0.0.4" + OpenMetricsType = `application/openmetrics-text` + FmtUnknown string = `` + ContentTypeTextFormat string = `text/plain; version=` + TextVersion + `; charset=utf-8` +) + +type Gauge struct { + Value *float64 +} + +func (m *Gauge) GetValue() float64 { + if m != nil && m.Value != nil { + return *m.Value + } + return 0 +} + +type Info struct { + Value *int64 +} + +func (m *Info) GetValue() int64 { + if m != nil && m.Value != nil { + return *m.Value + } + return 0 +} +func (m *Info) HasValidValue() bool { + return m != nil && *m.Value == 1 +} + +type Stateset struct { + Value *int64 +} + +func (m *Stateset) GetValue() int64 { + if m != nil && m.Value != nil { + return *m.Value + } + return 0 +} +func (m *Stateset) HasValidValue() bool { + return m != nil && (*m.Value == 0 || *m.Value == 1) +} + +type Counter struct { + Value *float64 +} + +func (m *Counter) GetValue() float64 { + if m != nil && m.Value != nil { + return *m.Value + } + return 0 +} + +type Quantile struct { + Quantile *float64 + Value *float64 + Exemplar *exemplar.Exemplar +} + +func (m *Quantile) GetQuantile() float64 { + if m != nil && m.Quantile != nil { + return *m.Quantile + } + return 0 +} + +func (m *Quantile) GetValue() float64 { + if m != nil && m.Value != nil { + return *m.Value + } + return 0 +} + +type Summary struct { + SampleCount *uint64 + SampleSum *float64 + Quantile []*Quantile +} + +func (m *Summary) GetSampleCount() uint64 { + if m != nil && m.SampleCount != nil { + return *m.SampleCount + } + return 0 +} + +func (m *Summary) GetSampleSum() float64 { + if m != nil && m.SampleSum != nil { + return *m.SampleSum + } + return 0 +} + +func (m *Summary) GetQuantile() []*Quantile { + if m != nil { + return m.Quantile + } + return nil +} + +type Unknown struct { + Value *float64 +} + +func (m *Unknown) GetValue() float64 { + if m != nil && m.Value != nil { + return *m.Value + } + return 0 +} + +type Bucket struct { + CumulativeCount *uint64 + UpperBound *float64 + Exemplar *exemplar.Exemplar +} + +func (m *Bucket) GetCumulativeCount() uint64 { + if m != nil && m.CumulativeCount != nil { + return *m.CumulativeCount + } + return 0 +} + +func (m *Bucket) GetUpperBound() float64 { + if m != nil && m.UpperBound != nil { + return *m.UpperBound + } + return 0 +} + +type Histogram struct { + SampleCount *uint64 + SampleSum *float64 + Bucket []*Bucket + IsGaugeHistogram bool +} + +func (m *Histogram) GetSampleCount() uint64 { + if m != nil && m.SampleCount != nil { + return *m.SampleCount + } + return 0 +} + +func (m *Histogram) GetSampleSum() float64 { + if m != nil && m.SampleSum != nil { + return *m.SampleSum + } + return 0 +} + +func (m *Histogram) GetBucket() []*Bucket { + if m != nil { + return m.Bucket + } + return nil +} + +type OpenMetric struct { + Label []*labels.Label + Exemplar *exemplar.Exemplar + Name *string + Gauge *Gauge + Counter *Counter + Info *Info + Stateset *Stateset + Summary *Summary + Unknown *Unknown + Histogram *Histogram + TimestampMs *int64 +} + +func (m *OpenMetric) GetName() *string { + if m != nil { + return m.Name + } + return nil +} + +func (m *OpenMetric) GetLabel() []*labels.Label { + if m != nil { + return m.Label + } + return nil +} + +func (m *OpenMetric) GetGauge() *Gauge { + if m != nil { + return m.Gauge + } + return nil +} + +func (m *OpenMetric) GetCounter() *Counter { + if m != nil { + return m.Counter + } + return nil +} + +func (m *OpenMetric) GetInfo() *Info { + if m != nil { + return m.Info + } + return nil +} + +func (m *OpenMetric) GetStateset() *Stateset { + if m != nil { + return m.Stateset + } + return nil +} + +func (m *OpenMetric) GetSummary() *Summary { + if m != nil { + return m.Summary + } + return nil +} + +func (m *OpenMetric) GetUnknown() *Unknown { + if m != nil { + return m.Unknown + } + return nil +} + +func (m *OpenMetric) GetHistogram() *Histogram { + if m != nil && m.Histogram != nil && !m.Histogram.IsGaugeHistogram { + return m.Histogram + } + return nil +} + +func (m *OpenMetric) GetGaugeHistogram() *Histogram { + if m != nil && m.Histogram != nil && m.Histogram.IsGaugeHistogram { + return m.Histogram + } + return nil +} + +func (m *OpenMetric) GetTimestampMs() int64 { + if m != nil && m.TimestampMs != nil { + return *m.TimestampMs + } + return 0 +} + +type MetricFamily struct { + Name *string + Help *string + Type textparse.MetricType + Unit *string + Metric []*OpenMetric +} + +func (m *MetricFamily) GetName() string { + if m != nil && m.Name != nil { + return *m.Name + } + return "" +} +func (m *MetricFamily) GetUnit() string { + if m != nil && *m.Unit != "" { + return *m.Unit + } + return "" +} + +func (m *MetricFamily) GetMetric() []*OpenMetric { + if m != nil { + return m.Metric + } + return nil +} + +const ( + suffixTotal = "_total" + suffixGCount = "_gcount" + suffixGSum = "_gsum" + suffixCount = "_count" + suffixSum = "_sum" + suffixBucket = "_bucket" +) + +// Counters have _total suffix +func isTotal(name string) bool { + return strings.HasSuffix(name, suffixTotal) +} + +func isGCount(name string) bool { + return strings.HasSuffix(name, suffixGCount) +} + +func isGSum(name string) bool { + return strings.HasSuffix(name, suffixGSum) +} + +func isCount(name string) bool { + return strings.HasSuffix(name, suffixCount) +} + +func isSum(name string) bool { + return strings.HasSuffix(name, suffixSum) +} + +func isBucket(name string) bool { + return strings.HasSuffix(name, suffixBucket) +} + +func summaryMetricName(name string, s float64, qv string, lbls string, t *int64, summariesByName map[string]map[string]*OpenMetric) (string, *OpenMetric) { + var summary = &Summary{} + var quantile = []*Quantile{} + var quant = &Quantile{} + + switch { + case isCount(name): + u := uint64(s) + summary.SampleCount = &u + name = name[:len(name)-6] + case isSum(name): + summary.SampleSum = &s + name = name[:len(name)-4] + default: + f, err := strconv.ParseFloat(qv, 64) + if err != nil { + f = -1 + } + quant.Quantile = &f + quant.Value = &s + } + + _, k := summariesByName[name] + if !k { + summariesByName[name] = make(map[string]*OpenMetric) + } + metric, ok := summariesByName[name][lbls] + if !ok { + metric = &OpenMetric{} + metric.Name = &name + metric.Summary = summary + metric.Summary.Quantile = quantile + summariesByName[name][lbls] = metric + } + if metric.Summary.SampleSum == nil && summary.SampleSum != nil { + metric.Summary.SampleSum = summary.SampleSum + } else if metric.Summary.SampleCount == nil && summary.SampleCount != nil { + metric.Summary.SampleCount = summary.SampleCount + } else if quant.Quantile != nil { + metric.Summary.Quantile = append(metric.Summary.Quantile, quant) + } + + return name, metric +} + +func histogramMetricName(name string, s float64, qv string, lbls string, t *int64, isGaugeHistogram bool, e *exemplar.Exemplar, histogramsByName map[string]map[string]*OpenMetric) (string, *OpenMetric) { + var histogram = &Histogram{} + var bucket = []*Bucket{} + var bkt = &Bucket{} + + switch { + case isCount(name): + u := uint64(s) + histogram.SampleCount = &u + name = name[:len(name)-6] + case isSum(name): + histogram.SampleSum = &s + name = name[:len(name)-4] + case isGaugeHistogram && isGCount(name): + u := uint64(s) + histogram.SampleCount = &u + name = name[:len(name)-7] + case isGaugeHistogram && isGSum(name): + histogram.SampleSum = &s + name = name[:len(name)-5] + default: + if isBucket(name) { + name = name[:len(name)-7] + } + f, err := strconv.ParseFloat(qv, 64) + if err != nil { + f = math.MaxUint64 + } + cnt := uint64(s) + bkt.UpperBound = &f + bkt.CumulativeCount = &cnt + + if e != nil { + if !e.HasTs { + e.Ts = *t + } + bkt.Exemplar = e + } + } + + _, k := histogramsByName[name] + if !k { + histogramsByName[name] = make(map[string]*OpenMetric) + } + metric, ok := histogramsByName[name][lbls] + if !ok { + metric = &OpenMetric{} + metric.Name = &name + metric.Histogram = histogram + metric.Histogram.Bucket = bucket + histogramsByName[name][lbls] = metric + } + if metric.Histogram.SampleSum == nil && histogram.SampleSum != nil { + metric.Histogram.SampleSum = histogram.SampleSum + } else if metric.Histogram.SampleCount == nil && histogram.SampleCount != nil { + metric.Histogram.SampleCount = histogram.SampleCount + } else if bkt.UpperBound != nil { + metric.Histogram.Bucket = append(metric.Histogram.Bucket, bkt) + } + + return name, metric +} + +func ParseMetricFamilies(b []byte, contentType string, ts time.Time) ([]*MetricFamily, error) { + var ( + parser = textparse.New(b, contentType) + defTime = timestamp.FromTime(ts) + metricFamiliesByName = map[string]*MetricFamily{} + summariesByName = map[string]map[string]*OpenMetric{} + histogramsByName = map[string]map[string]*OpenMetric{} + fam *MetricFamily + mt = textparse.MetricTypeUnknown + ) + var err error + + for { + var ( + et textparse.Entry + ok bool + e exemplar.Exemplar + ) + if et, err = parser.Next(); err != nil { + // TODO: log here + // if errors.Is(err, io.EOF) {} + break + } + switch et { + case textparse.EntryType: + buf, t := parser.Type() + s := string(buf) + _, ok = metricFamiliesByName[s] + if !ok { + fam = &MetricFamily{Name: &s, Type: t} + metricFamiliesByName[s] = fam + } + mt = t + continue + case textparse.EntryHelp: + buf, t := parser.Help() + s := string(buf) + h := string(t) + _, ok = metricFamiliesByName[s] + if !ok { + fam = &MetricFamily{Name: &s, Help: &h, Type: textparse.MetricTypeUnknown} + metricFamiliesByName[s] = fam + } + fam.Help = &h + continue + case textparse.EntryUnit: + buf, t := parser.Unit() + s := string(buf) + u := string(t) + _, ok = metricFamiliesByName[s] + if !ok { + fam = &MetricFamily{Name: &s, Unit: &u, Type: textparse.MetricTypeUnknown} + metricFamiliesByName[string(buf)] = fam + } + fam.Unit = &u + continue + case textparse.EntryComment: + continue + default: + } + + t := defTime + _, tp, v := parser.Series() + + var ( + lset labels.Labels + mets string + ) + + mets = parser.Metric(&lset) + + if !lset.Has(labels.MetricName) { + // missing metric name from labels.MetricName, skip. + break + } + + var lbls strings.Builder + lbls.Grow(len(mets)) + var labelPairs = []*labels.Label{} + for _, l := range lset.Copy() { + if l.Name == labels.MetricName { + continue + } + + if l.Name != model.QuantileLabel && l.Name != labels.BucketLabel { // quantile and le are special labels handled below + + lbls.WriteString(l.Name) + lbls.WriteString(l.Value) + } + n := l.Name + v := l.Value + + labelPairs = append(labelPairs, &labels.Label{ + Name: n, + Value: v, + }) + } + + var metric *OpenMetric + + metricName := lset.Get(labels.MetricName) + var lookupMetricName string + var exm *exemplar.Exemplar + + // Suffixes - https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#suffixes + switch mt { + case textparse.MetricTypeCounter: + var counter = &Counter{Value: &v} + mn := lset.Get(labels.MetricName) + metric = &OpenMetric{Name: &mn, Counter: counter, Label: labelPairs} + if isTotal(metricName) && contentType == OpenMetricsType { // Remove suffix _total, get lookup metricname + lookupMetricName = metricName[:len(metricName)-6] + } else { + lookupMetricName = metricName + } + case textparse.MetricTypeGauge: + var gauge = &Gauge{Value: &v} + metric = &OpenMetric{Name: &metricName, Gauge: gauge, Label: labelPairs} + lookupMetricName = metricName + case textparse.MetricTypeInfo: + value := int64(v) + var info = &Info{Value: &value} + metric = &OpenMetric{Name: &metricName, Info: info, Label: labelPairs} + lookupMetricName = metricName + case textparse.MetricTypeSummary: + lookupMetricName, metric = summaryMetricName(metricName, v, lset.Get(model.QuantileLabel), lbls.String(), &t, summariesByName) + metric.Label = labelPairs + if !isSum(metricName) { + continue + } + metricName = lookupMetricName + case textparse.MetricTypeHistogram: + if hasExemplar := parser.Exemplar(&e); hasExemplar { + exm = &e + } + lookupMetricName, metric = histogramMetricName(metricName, v, lset.Get(labels.BucketLabel), lbls.String(), &t, false, exm, histogramsByName) + metric.Label = labelPairs + if !isSum(metricName) { + continue + } + metricName = lookupMetricName + case textparse.MetricTypeGaugeHistogram: + if hasExemplar := parser.Exemplar(&e); hasExemplar { + exm = &e + } + lookupMetricName, metric = histogramMetricName(metricName, v, lset.Get(labels.BucketLabel), lbls.String(), &t, true, exm, histogramsByName) + metric.Label = labelPairs + metric.Histogram.IsGaugeHistogram = true + if !isGSum(metricName) { + continue + } + metricName = lookupMetricName + case textparse.MetricTypeStateset: + value := int64(v) + var stateset = &Stateset{Value: &value} + metric = &OpenMetric{Name: &metricName, Stateset: stateset, Label: labelPairs} + lookupMetricName = metricName + case textparse.MetricTypeUnknown: + var unknown = &Unknown{Value: &v} + metric = &OpenMetric{Name: &metricName, Unknown: unknown, Label: labelPairs} + lookupMetricName = metricName + default: + lookupMetricName = metricName + } + + fam, ok = metricFamiliesByName[lookupMetricName] + if !ok { + fam = &MetricFamily{Type: mt} + metricFamiliesByName[lookupMetricName] = fam + } + + fam.Name = &metricName + + if hasExemplar := parser.Exemplar(&e); hasExemplar && mt != textparse.MetricTypeHistogram && metric != nil { + if !e.HasTs { + e.Ts = t + } + metric.Exemplar = &e + } + + if tp != nil && metric != nil { + t = *tp + metric.TimestampMs = &t + } + + fam.Metric = append(fam.Metric, metric) + } + + families := make([]*MetricFamily, 0, len(metricFamiliesByName)) + for _, v := range metricFamiliesByName { + if v.Metric != nil { + families = append(families, v) + } + } + return families, nil +} + +func GetContentType(h http.Header) string { + ct := h.Get(hdrContentType) + + mediatype, params, err := mime.ParseMediaType(ct) + if err != nil { + return FmtUnknown + } + + const textType = "text/plain" + + switch mediatype { + case OpenMetricsType: + if e, ok := params["encoding"]; ok && e != "delimited" { + return FmtUnknown + } + return OpenMetricsType + + case textType: + if v, ok := params["version"]; ok && v != TextVersion { + return FmtUnknown + } + return ContentTypeTextFormat + } + + return FmtUnknown +} diff --git a/metricbeat/module/kubernetes/kubernetes.go b/metricbeat/module/kubernetes/kubernetes.go index a5d8f71c821..1cb9ca037f9 100644 --- a/metricbeat/module/kubernetes/kubernetes.go +++ b/metricbeat/module/kubernetes/kubernetes.go @@ -23,7 +23,6 @@ import ( "time" "github.com/mitchellh/hashstructure" - dto "github.com/prometheus/client_model/go" "github.com/elastic/beats/v7/metricbeat/helper" p "github.com/elastic/beats/v7/metricbeat/helper/prometheus" @@ -40,13 +39,13 @@ func init() { type Module interface { mb.Module - GetStateMetricsFamilies(prometheus p.Prometheus) ([]*dto.MetricFamily, error) + GetStateMetricsFamilies(prometheus p.Prometheus) ([]*p.MetricFamily, error) GetKubeletStats(http *helper.HTTP) ([]byte, error) GetMetricsRepo() *util.MetricsRepo } type familiesCache struct { - sharedFamilies []*dto.MetricFamily + sharedFamilies []*p.MetricFamily lastFetchErr error lastFetchTimestamp time.Time } @@ -115,7 +114,7 @@ func ModuleBuilder() func(base mb.BaseModule) (mb.Module, error) { } } -func (m *module) GetStateMetricsFamilies(prometheus p.Prometheus) ([]*dto.MetricFamily, error) { +func (m *module) GetStateMetricsFamilies(prometheus p.Prometheus) ([]*p.MetricFamily, error) { m.kubeStateMetricsCache.lock.Lock() defer m.kubeStateMetricsCache.lock.Unlock() diff --git a/metricbeat/module/openmetrics/collector/_meta/data.json b/metricbeat/module/openmetrics/collector/_meta/data.json index d60063a9157..ebbb4a0efd2 100644 --- a/metricbeat/module/openmetrics/collector/_meta/data.json +++ b/metricbeat/module/openmetrics/collector/_meta/data.json @@ -11,7 +11,6 @@ }, "openmetrics": { "labels": { - "device": "br-3a285aa5e58c", "job": "openmetrics" }, "metrics": { @@ -22,4 +21,4 @@ "address": "127.0.0.1:55555", "type": "openmetrics" } -} +} \ No newline at end of file diff --git a/metricbeat/module/openmetrics/collector/collector.go b/metricbeat/module/openmetrics/collector/collector.go index 2739f8cf310..3382fc5f9d8 100644 --- a/metricbeat/module/openmetrics/collector/collector.go +++ b/metricbeat/module/openmetrics/collector/collector.go @@ -18,13 +18,14 @@ package collector import ( + "fmt" "regexp" - "github.com/pkg/errors" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/textparse" p "github.com/elastic/beats/v7/metricbeat/helper/openmetrics" + "github.com/elastic/beats/v7/metricbeat/helper/prometheus" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/metricbeat/mb/parse" "github.com/elastic/elastic-agent-libs/mapstr" @@ -47,7 +48,6 @@ var ( upMetricType = textparse.MetricTypeGauge upMetricInstanceLabel = "instance" upMetricJobLabel = "job" - upMetricJobValue = "openmetrics" ) func init() { @@ -64,7 +64,7 @@ type OpenMetricsEventsGenerator interface { Start() // converts a OpenMetrics metric family into a list of OpenMetricsEvents - GenerateOpenMetricsEvents(mf *p.OpenMetricFamily) []OpenMetricEvent + GenerateOpenMetricsEvents(mf *prometheus.MetricFamily) []OpenMetricEvent // Stop must be called when the generator won't be used anymore Stop() @@ -118,11 +118,11 @@ func MetricSetBuilder(namespace string, genFactory OpenMetricsEventsGeneratorFac ms.host = ms.Host() ms.excludeMetrics, err = p.CompilePatternList(config.MetricsFilters.ExcludeMetrics) if err != nil { - return nil, errors.Wrapf(err, "unable to compile exclude patterns") + return nil, fmt.Errorf("unable to compile exclude patterns: %w", err) } ms.includeMetrics, err = p.CompilePatternList(config.MetricsFilters.IncludeMetrics) if err != nil { - return nil, errors.Wrapf(err, "unable to compile include patterns") + return nil, fmt.Errorf("unable to compile include patterns: %w", err) } return ms, nil @@ -143,7 +143,7 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { families = append(families, m.upMetricFamily(0.0)) // set the error to report it after sending the up event - err = errors.Wrap(err, "unable to decode response from openmetrics endpoint") + err = fmt.Errorf("unable to decode response from openmetrics endpoint: %w", err) } else { // add up event to the list families = append(families, m.upMetricFamily(1.0)) @@ -170,11 +170,11 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { // Add default instance label if not already there if exists, _ := openMetricEvent.Labels.HasKey(upMetricInstanceLabel); !exists { - openMetricEvent.Labels.Put(upMetricInstanceLabel, m.Host()) + _, _ = openMetricEvent.Labels.Put(upMetricInstanceLabel, m.Host()) } // Add default job label if not already there if exists, _ := openMetricEvent.Labels.HasKey("job"); !exists { - openMetricEvent.Labels.Put("job", m.Module().Name()) + _, _ = openMetricEvent.Labels.Put("job", m.Module().Name()) } // Add labels if len(openMetricEvent.Labels) > 0 { @@ -225,8 +225,8 @@ func (m *MetricSet) Close() error { return nil } -func (m *MetricSet) upMetricFamily(value float64) *p.OpenMetricFamily { - gauge := p.Gauge{ +func (m *MetricSet) upMetricFamily(value float64) *prometheus.MetricFamily { + gauge := prometheus.Gauge{ Value: &value, } label1 := labels.Label{ @@ -237,18 +237,18 @@ func (m *MetricSet) upMetricFamily(value float64) *p.OpenMetricFamily { Name: upMetricJobLabel, Value: m.Module().Name(), } - metric := p.OpenMetric{ + metric := prometheus.OpenMetric{ Gauge: &gauge, Label: []*labels.Label{&label1, &label2}, } - return &p.OpenMetricFamily{ + return &prometheus.MetricFamily{ Name: &upMetricName, - Type: textparse.MetricType(upMetricType), - Metric: []*p.OpenMetric{&metric}, + Type: upMetricType, + Metric: []*prometheus.OpenMetric{&metric}, } } -func (m *MetricSet) skipFamily(family *p.OpenMetricFamily) bool { +func (m *MetricSet) skipFamily(family *prometheus.MetricFamily) bool { if family == nil || family.Name == nil { return false } diff --git a/metricbeat/module/openmetrics/collector/collector_test.go b/metricbeat/module/openmetrics/collector/collector_test.go index 069bd36f8c2..d2f9ab4ff97 100644 --- a/metricbeat/module/openmetrics/collector/collector_test.go +++ b/metricbeat/module/openmetrics/collector/collector_test.go @@ -23,12 +23,13 @@ package collector import ( "testing" - "github.com/golang/protobuf/proto" prometheuslabels "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/textparse" "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/proto" "github.com/elastic/beats/v7/metricbeat/helper/openmetrics" + p "github.com/elastic/beats/v7/metricbeat/helper/prometheus" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/elastic-agent-libs/mapstr" @@ -50,15 +51,15 @@ func TestGetOpenMetricsEventsFromMetricFamily(t *testing.T) { "handler": "query", } tests := []struct { - Family *openmetrics.OpenMetricFamily + Family *p.MetricFamily Event []OpenMetricEvent }{ { - Family: &openmetrics.OpenMetricFamily{ + Family: &p.MetricFamily{ Name: proto.String("http_request_duration_microseconds"), Help: proto.String("foo"), Type: textparse.MetricTypeCounter, - Metric: []*openmetrics.OpenMetric{ + Metric: []*p.OpenMetric{ { Name: proto.String("http_request_duration_microseconds_total"), Label: []*prometheuslabels.Label{ @@ -67,7 +68,7 @@ func TestGetOpenMetricsEventsFromMetricFamily(t *testing.T) { Value: "query", }, }, - Counter: &openmetrics.Counter{ + Counter: &p.Counter{ Value: proto.Float64(10), }, }, @@ -88,13 +89,13 @@ func TestGetOpenMetricsEventsFromMetricFamily(t *testing.T) { }, }, { - Family: &openmetrics.OpenMetricFamily{ + Family: &p.MetricFamily{ Name: proto.String("http_request_duration_microseconds"), Help: proto.String("foo"), Type: textparse.MetricTypeGauge, - Metric: []*openmetrics.OpenMetric{ + Metric: []*p.OpenMetric{ { - Gauge: &openmetrics.Gauge{ + Gauge: &p.Gauge{ Value: proto.Float64(10), }, }, @@ -114,16 +115,16 @@ func TestGetOpenMetricsEventsFromMetricFamily(t *testing.T) { }, }, { - Family: &openmetrics.OpenMetricFamily{ + Family: &p.MetricFamily{ Name: proto.String("http_request_duration_microseconds"), Help: proto.String("foo"), Type: textparse.MetricTypeSummary, - Metric: []*openmetrics.OpenMetric{ + Metric: []*p.OpenMetric{ { - Summary: &openmetrics.Summary{ + Summary: &p.Summary{ SampleCount: proto.Uint64(10), SampleSum: proto.Float64(10), - Quantile: []*openmetrics.Quantile{ + Quantile: []*p.Quantile{ { Quantile: proto.Float64(0.99), Value: proto.Float64(10), @@ -158,16 +159,16 @@ func TestGetOpenMetricsEventsFromMetricFamily(t *testing.T) { }, }, { - Family: &openmetrics.OpenMetricFamily{ + Family: &p.MetricFamily{ Name: proto.String("http_request_duration_microseconds"), Help: proto.String("foo"), Type: textparse.MetricTypeHistogram, - Metric: []*openmetrics.OpenMetric{ + Metric: []*p.OpenMetric{ { - Histogram: &openmetrics.Histogram{ + Histogram: &p.Histogram{ SampleCount: proto.Uint64(10), SampleSum: proto.Float64(10), - Bucket: []*openmetrics.Bucket{ + Bucket: []*p.Bucket{ { UpperBound: proto.Float64(0.99), CumulativeCount: proto.Uint64(10), @@ -201,11 +202,11 @@ func TestGetOpenMetricsEventsFromMetricFamily(t *testing.T) { }, }, { - Family: &openmetrics.OpenMetricFamily{ + Family: &p.MetricFamily{ Name: proto.String("http_request_duration_microseconds"), Help: proto.String("foo"), Type: textparse.MetricTypeUnknown, - Metric: []*openmetrics.OpenMetric{ + Metric: []*p.OpenMetric{ { Label: []*prometheuslabels.Label{ { @@ -213,7 +214,7 @@ func TestGetOpenMetricsEventsFromMetricFamily(t *testing.T) { Value: "query", }, }, - Unknown: &openmetrics.Unknown{ + Unknown: &p.Unknown{ Value: proto.Float64(10), }, }, @@ -242,12 +243,12 @@ func TestGetOpenMetricsEventsFromMetricFamily(t *testing.T) { } func TestSkipMetricFamily(t *testing.T) { - testFamilies := []*openmetrics.OpenMetricFamily{ + testFamilies := []*p.MetricFamily{ { Name: proto.String("http_request_duration_microseconds_a_a_in"), Help: proto.String("foo"), Type: textparse.MetricTypeCounter, - Metric: []*openmetrics.OpenMetric{ + Metric: []*p.OpenMetric{ { Label: []*prometheuslabels.Label{ { @@ -255,7 +256,7 @@ func TestSkipMetricFamily(t *testing.T) { Value: "query", }, }, - Counter: &openmetrics.Counter{ + Counter: &p.Counter{ Value: proto.Float64(10), }, }, @@ -265,7 +266,7 @@ func TestSkipMetricFamily(t *testing.T) { Name: proto.String("http_request_duration_microseconds_a_b_in"), Help: proto.String("foo"), Type: textparse.MetricTypeCounter, - Metric: []*openmetrics.OpenMetric{ + Metric: []*p.OpenMetric{ { Label: []*prometheuslabels.Label{ { @@ -273,7 +274,7 @@ func TestSkipMetricFamily(t *testing.T) { Value: "query", }, }, - Counter: &openmetrics.Counter{ + Counter: &p.Counter{ Value: proto.Float64(10), }, }, @@ -283,9 +284,9 @@ func TestSkipMetricFamily(t *testing.T) { Name: proto.String("http_request_duration_microseconds_b_in"), Help: proto.String("foo"), Type: textparse.MetricTypeGauge, - Metric: []*openmetrics.OpenMetric{ + Metric: []*p.OpenMetric{ { - Gauge: &openmetrics.Gauge{ + Gauge: &p.Gauge{ Value: proto.Float64(10), }, }, @@ -295,12 +296,12 @@ func TestSkipMetricFamily(t *testing.T) { Name: proto.String("http_request_duration_microseconds_c_in"), Help: proto.String("foo"), Type: textparse.MetricTypeSummary, - Metric: []*openmetrics.OpenMetric{ + Metric: []*p.OpenMetric{ { - Summary: &openmetrics.Summary{ + Summary: &p.Summary{ SampleCount: proto.Uint64(10), SampleSum: proto.Float64(10), - Quantile: []*openmetrics.Quantile{ + Quantile: []*p.Quantile{ { Quantile: proto.Float64(0.99), Value: proto.Float64(10), @@ -314,12 +315,12 @@ func TestSkipMetricFamily(t *testing.T) { Name: proto.String("http_request_duration_microseconds_d_in"), Help: proto.String("foo"), Type: textparse.MetricTypeHistogram, - Metric: []*openmetrics.OpenMetric{ + Metric: []*p.OpenMetric{ { - Histogram: &openmetrics.Histogram{ + Histogram: &p.Histogram{ SampleCount: proto.Uint64(10), SampleSum: proto.Float64(10), - Bucket: []*openmetrics.Bucket{ + Bucket: []*p.Bucket{ { UpperBound: proto.Float64(0.99), CumulativeCount: proto.Uint64(10), @@ -333,7 +334,7 @@ func TestSkipMetricFamily(t *testing.T) { Name: proto.String("http_request_duration_microseconds_e_in"), Help: proto.String("foo"), Type: textparse.MetricTypeUnknown, - Metric: []*openmetrics.OpenMetric{ + Metric: []*p.OpenMetric{ { Label: []*prometheuslabels.Label{ { @@ -341,7 +342,7 @@ func TestSkipMetricFamily(t *testing.T) { Value: "query", }, }, - Unknown: &openmetrics.Unknown{ + Unknown: &p.Unknown{ Value: proto.Float64(10), }, }, diff --git a/metricbeat/module/openmetrics/collector/data.go b/metricbeat/module/openmetrics/collector/data.go index 836b6107c88..e07d6f56b4f 100644 --- a/metricbeat/module/openmetrics/collector/data.go +++ b/metricbeat/module/openmetrics/collector/data.go @@ -23,7 +23,7 @@ import ( "github.com/prometheus/prometheus/pkg/textparse" - p "github.com/elastic/beats/v7/metricbeat/helper/openmetrics" + p "github.com/elastic/beats/v7/metricbeat/helper/prometheus" "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/beats/v7/metricbeat/helper/labelhash" @@ -71,10 +71,11 @@ func (p *openmetricEventGenerator) Stop() {} // Default openmetricEventsGenerator stores all OpenMetrics metrics using // only double field type in Elasticsearch. -func (p *openmetricEventGenerator) GenerateOpenMetricsEvents(mf *p.OpenMetricFamily) []OpenMetricEvent { +func (p *openmetricEventGenerator) GenerateOpenMetricsEvents(mf *p.MetricFamily) []OpenMetricEvent { var events []OpenMetricEvent name := *mf.Name + _ = name // skip noisy linter metrics := mf.Metric help := "" unit := "" @@ -88,6 +89,7 @@ func (p *openmetricEventGenerator) GenerateOpenMetricsEvents(mf *p.OpenMetricFam for _, metric := range metrics { labels := mapstr.M{} mn := metric.GetName() + _ = mn // skip noisy linter if len(metric.Label) != 0 { for _, label := range metric.Label { @@ -101,11 +103,11 @@ func (p *openmetricEventGenerator) GenerateOpenMetricsEvents(mf *p.OpenMetricFam if metric.Exemplar != nil { exemplars = mapstr.M{*mn: metric.Exemplar.Value} if metric.Exemplar.HasTs { - exemplars.Put("timestamp", metric.Exemplar.Ts) + _, _ = exemplars.Put("timestamp", metric.Exemplar.Ts) } for _, label := range metric.Exemplar.Labels { if label.Name != "" && label.Value != "" { - exemplars.Put("labels."+label.Name, label.Value) + _, _ = exemplars.Put("labels."+label.Name, label.Value) } } } @@ -215,6 +217,7 @@ func (p *openmetricEventGenerator) GenerateOpenMetricsEvents(mf *p.OpenMetricFam if !math.IsNaN(histogram.GetSampleSum()) && !math.IsInf(histogram.GetSampleSum(), 0) { var sum = "_sum" var count = "_count" + _, _ = sum, count // skip noisy linter var typ = textparse.MetricTypeHistogram if histogram.IsGaugeHistogram { sum = "_gsum" @@ -244,11 +247,11 @@ func (p *openmetricEventGenerator) GenerateOpenMetricsEvents(mf *p.OpenMetricFam if bucket.Exemplar != nil { exemplars = mapstr.M{name: bucket.Exemplar.Value} if bucket.Exemplar.HasTs { - exemplars.Put("timestamp", bucket.Exemplar.Ts) + _, _ = exemplars.Put("timestamp", bucket.Exemplar.Ts) } for _, label := range bucket.Exemplar.Labels { if label.Name != "" && label.Value != "" { - exemplars.Put("labels."+label.Name, label.Value) + _, _ = exemplars.Put("labels."+label.Name, label.Value) } } } diff --git a/metricbeat/module/prometheus/collector/_meta/data.json b/metricbeat/module/prometheus/collector/_meta/data.json index dba9f7771c4..a46b63c74fe 100644 --- a/metricbeat/module/prometheus/collector/_meta/data.json +++ b/metricbeat/module/prometheus/collector/_meta/data.json @@ -11,12 +11,10 @@ }, "prometheus": { "labels": { - "job": "prometheus", - "listener_name": "http" + "job": "prometheus" }, "metrics": { - "net_conntrack_listener_conn_accepted_total": 3, - "net_conntrack_listener_conn_closed_total": 0 + "up": 1 } }, "service": { diff --git a/metricbeat/module/prometheus/collector/_meta/testdata/docs.plain-expected.json b/metricbeat/module/prometheus/collector/_meta/testdata/docs.plain-expected.json index b70af18613e..0abc4c113cc 100644 --- a/metricbeat/module/prometheus/collector/_meta/testdata/docs.plain-expected.json +++ b/metricbeat/module/prometheus/collector/_meta/testdata/docs.plain-expected.json @@ -11,7 +11,7 @@ }, "prometheus": { "labels": { - "instance": "127.0.0.1:55922", + "instance": "127.0.0.1:33379", "job": "prometheus" }, "metrics": { @@ -35,7 +35,7 @@ }, "prometheus": { "labels": { - "instance": "127.0.0.1:55922", + "instance": "127.0.0.1:33379", "job": "prometheus", "listener_name": "http" }, diff --git a/metricbeat/module/prometheus/collector/_meta/testdata/duplicate-metrics.plain b/metricbeat/module/prometheus/collector/_meta/testdata/duplicate-metrics.plain new file mode 100644 index 00000000000..135137f978e --- /dev/null +++ b/metricbeat/module/prometheus/collector/_meta/testdata/duplicate-metrics.plain @@ -0,0 +1,6 @@ +# TYPE base_gc_total_total counter +# HELP base_gc_total_total Displays the total number of collections that have occurred. This attribute lists -1 if the collection count is undefined for this collector. +base_gc_total_total{name="PS MarkSweep"} 4 +# TYPE base_gc_total_total counter +# HELP base_gc_total_total Displays the total number of collections that have occurred. This attribute lists -1 if the collection count is undefined for this collector. +base_gc_total_total{name="PS Scavenge"} 34 diff --git a/metricbeat/module/prometheus/collector/_meta/testdata/duplicate-metrics.plain-expected.json b/metricbeat/module/prometheus/collector/_meta/testdata/duplicate-metrics.plain-expected.json new file mode 100644 index 00000000000..bc547c74444 --- /dev/null +++ b/metricbeat/module/prometheus/collector/_meta/testdata/duplicate-metrics.plain-expected.json @@ -0,0 +1,76 @@ +[ + { + "event": { + "dataset": "prometheus.collector", + "duration": 115000, + "module": "prometheus" + }, + "metricset": { + "name": "collector", + "period": 10000 + }, + "prometheus": { + "labels": { + "instance": "127.0.0.1:44753", + "job": "prometheus", + "name": "PS MarkSweep" + }, + "metrics": { + "base_gc_total_total": 4 + } + }, + "service": { + "address": "127.0.0.1:55555", + "type": "prometheus" + } + }, + { + "event": { + "dataset": "prometheus.collector", + "duration": 115000, + "module": "prometheus" + }, + "metricset": { + "name": "collector", + "period": 10000 + }, + "prometheus": { + "labels": { + "instance": "127.0.0.1:44753", + "job": "prometheus", + "name": "PS Scavenge" + }, + "metrics": { + "base_gc_total_total": 34 + } + }, + "service": { + "address": "127.0.0.1:55555", + "type": "prometheus" + } + }, + { + "event": { + "dataset": "prometheus.collector", + "duration": 115000, + "module": "prometheus" + }, + "metricset": { + "name": "collector", + "period": 10000 + }, + "prometheus": { + "labels": { + "instance": "127.0.0.1:44753", + "job": "prometheus" + }, + "metrics": { + "up": 1 + } + }, + "service": { + "address": "127.0.0.1:55555", + "type": "prometheus" + } + } +] \ No newline at end of file diff --git a/metricbeat/module/prometheus/collector/_meta/testdata/etcd-3.3.10-partial.plain-expected.json b/metricbeat/module/prometheus/collector/_meta/testdata/etcd-3.3.10-partial.plain-expected.json index 58ef5fff2e7..f9384edc0d7 100644 --- a/metricbeat/module/prometheus/collector/_meta/testdata/etcd-3.3.10-partial.plain-expected.json +++ b/metricbeat/module/prometheus/collector/_meta/testdata/etcd-3.3.10-partial.plain-expected.json @@ -11,12 +11,12 @@ }, "prometheus": { "labels": { - "instance": "127.0.0.1:55924", + "instance": "127.0.0.1:42999", "job": "prometheus", - "version": "go1.10.4" + "server_version": "3.3.10" }, "metrics": { - "go_info": 1 + "etcd_server_version": 1 } }, "service": { @@ -36,7 +36,7 @@ }, "prometheus": { "labels": { - "instance": "127.0.0.1:55924", + "instance": "127.0.0.1:42999", "job": "prometheus", "server_go_version": "go1.10.4" }, @@ -61,12 +61,11 @@ }, "prometheus": { "labels": { - "action": "create", - "instance": "127.0.0.1:55924", + "instance": "127.0.0.1:42999", "job": "prometheus" }, "metrics": { - "etcd_debugging_store_writes_total": 1 + "up": 1 } }, "service": { @@ -86,11 +85,12 @@ }, "prometheus": { "labels": { - "instance": "127.0.0.1:55924", + "action": "getRecursive", + "instance": "127.0.0.1:42999", "job": "prometheus" }, "metrics": { - "up": 1 + "etcd_debugging_store_reads_total": 1 } }, "service": { @@ -110,12 +110,12 @@ }, "prometheus": { "labels": { - "instance": "127.0.0.1:55924", - "job": "prometheus", - "server_id": "8e9e05c52164694d" + "action": "create", + "instance": "127.0.0.1:42999", + "job": "prometheus" }, "metrics": { - "etcd_server_id": 1 + "etcd_debugging_store_writes_total": 1 } }, "service": { @@ -135,12 +135,12 @@ }, "prometheus": { "labels": { - "action": "set", - "instance": "127.0.0.1:55924", - "job": "prometheus" + "instance": "127.0.0.1:42999", + "job": "prometheus", + "version": "go1.10.4" }, "metrics": { - "etcd_debugging_store_writes_total": 2 + "go_info": 1 } }, "service": { @@ -160,12 +160,12 @@ }, "prometheus": { "labels": { - "instance": "127.0.0.1:55924", + "instance": "127.0.0.1:42999", "job": "prometheus", - "server_version": "3.3.10" + "server_id": "8e9e05c52164694d" }, "metrics": { - "etcd_server_version": 1 + "etcd_server_id": 1 } }, "service": { @@ -185,12 +185,12 @@ }, "prometheus": { "labels": { - "action": "getRecursive", - "instance": "127.0.0.1:55924", + "action": "set", + "instance": "127.0.0.1:42999", "job": "prometheus" }, "metrics": { - "etcd_debugging_store_reads_total": 1 + "etcd_debugging_store_writes_total": 2 } }, "service": { @@ -210,7 +210,7 @@ }, "prometheus": { "labels": { - "instance": "127.0.0.1:55924", + "instance": "127.0.0.1:42999", "job": "prometheus" }, "metrics": { diff --git a/metricbeat/module/prometheus/collector/_meta/testdata/metrics-with-naninf.plain-expected.json b/metricbeat/module/prometheus/collector/_meta/testdata/metrics-with-naninf.plain-expected.json index 6e1423be845..82430fe6cde 100644 --- a/metricbeat/module/prometheus/collector/_meta/testdata/metrics-with-naninf.plain-expected.json +++ b/metricbeat/module/prometheus/collector/_meta/testdata/metrics-with-naninf.plain-expected.json @@ -11,7 +11,7 @@ }, "prometheus": { "labels": { - "instance": "127.0.0.1:55926", + "instance": "127.0.0.1:41287", "job": "prometheus", "listener_name": "http" }, @@ -36,14 +36,12 @@ }, "prometheus": { "labels": { - "instance": "127.0.0.1:55926", - "job": "prometheus" + "instance": "127.0.0.1:41287", + "job": "prometheus", + "method": "GET" }, "metrics": { - "go_gc_duration_seconds_count": 13118, - "go_gc_duration_seconds_sum": 3.451780079, - "http_request_duration_seconds_count": 3, - "http_request_duration_seconds_sum": 6 + "http_failures": 2 } }, "service": { @@ -63,12 +61,12 @@ }, "prometheus": { "labels": { - "client_id": "consumer4", - "instance": "127.0.0.1:55926", - "job": "prometheus" + "instance": "127.0.0.1:41287", + "job": "prometheus", + "quantile": "0.75" }, "metrics": { - "kafka_consumer_records_lag_records": 5 + "go_gc_duration_seconds": 0.000098154 } }, "service": { @@ -88,12 +86,12 @@ }, "prometheus": { "labels": { - "instance": "127.0.0.1:55926", + "instance": "127.0.0.1:41287", "job": "prometheus", - "quantile": "1" + "le": "2" }, "metrics": { - "go_gc_duration_seconds": 0.011689149 + "http_request_duration_seconds_bucket": 2 } }, "service": { @@ -113,7 +111,7 @@ }, "prometheus": { "labels": { - "instance": "127.0.0.1:55926", + "instance": "127.0.0.1:41287", "job": "prometheus", "le": "5" }, @@ -138,12 +136,14 @@ }, "prometheus": { "labels": { - "instance": "127.0.0.1:55926", - "job": "prometheus", - "le": "1" + "instance": "127.0.0.1:41287", + "job": "prometheus" }, "metrics": { - "http_request_duration_seconds_bucket": 1 + "go_gc_duration_seconds_count": 13118, + "go_gc_duration_seconds_sum": 3.451780079, + "http_request_duration_seconds_count": 3, + "http_request_duration_seconds_sum": 6 } }, "service": { @@ -163,12 +163,11 @@ }, "prometheus": { "labels": { - "instance": "127.0.0.1:55926", - "job": "prometheus", - "le": "2" + "instance": "127.0.0.1:41287", + "job": "prometheus" }, "metrics": { - "http_request_duration_seconds_bucket": 2 + "up": 1 } }, "service": { @@ -188,11 +187,12 @@ }, "prometheus": { "labels": { - "instance": "127.0.0.1:55926", - "job": "prometheus" + "instance": "127.0.0.1:41287", + "job": "prometheus", + "le": "+Inf" }, "metrics": { - "up": 1 + "http_request_duration_seconds_bucket": 3 } }, "service": { @@ -212,12 +212,12 @@ }, "prometheus": { "labels": { - "instance": "127.0.0.1:55926", + "instance": "127.0.0.1:41287", "job": "prometheus", - "le": "+Inf" + "le": "1" }, "metrics": { - "http_request_duration_seconds_bucket": 3 + "http_request_duration_seconds_bucket": 1 } }, "service": { @@ -237,12 +237,12 @@ }, "prometheus": { "labels": { - "instance": "127.0.0.1:55926", - "job": "prometheus", - "method": "GET" + "client_id": "consumer4", + "instance": "127.0.0.1:41287", + "job": "prometheus" }, "metrics": { - "http_failures": 2 + "kafka_consumer_records_lag_records": 5 } }, "service": { @@ -262,12 +262,12 @@ }, "prometheus": { "labels": { - "instance": "127.0.0.1:55926", + "instance": "127.0.0.1:41287", "job": "prometheus", - "quantile": "0.75" + "le": "3" }, "metrics": { - "go_gc_duration_seconds": 0.000098154 + "http_request_duration_seconds_bucket": 3 } }, "service": { @@ -287,12 +287,12 @@ }, "prometheus": { "labels": { - "instance": "127.0.0.1:55926", + "instance": "127.0.0.1:41287", "job": "prometheus", - "le": "3" + "quantile": "1" }, "metrics": { - "http_request_duration_seconds_bucket": 3 + "go_gc_duration_seconds": 0.011689149 } }, "service": { diff --git a/metricbeat/module/prometheus/collector/_meta/testdata/prometheus-2.6.0-partial.plain-expected.json b/metricbeat/module/prometheus/collector/_meta/testdata/prometheus-2.6.0-partial.plain-expected.json index ee0abcba74e..325ad853c95 100644 --- a/metricbeat/module/prometheus/collector/_meta/testdata/prometheus-2.6.0-partial.plain-expected.json +++ b/metricbeat/module/prometheus/collector/_meta/testdata/prometheus-2.6.0-partial.plain-expected.json @@ -11,12 +11,46 @@ }, "prometheus": { "labels": { - "instance": "127.0.0.1:55928", - "job": "prometheus", - "quantile": "0.5" + "instance": "127.0.0.1:39775", + "job": "prometheus" }, "metrics": { - "go_gc_duration_seconds": 0.000060618 + "go_gc_duration_seconds_count": 4, + "go_gc_duration_seconds_sum": 0.004534198, + "go_goroutines": 35, + "go_memstats_alloc_bytes": 10558112, + "go_memstats_alloc_bytes_total": 14087760, + "go_memstats_buck_hash_sys_bytes": 1447018, + "go_memstats_frees_total": 15673, + "go_memstats_gc_cpu_fraction": 0.0008429952574435172, + "go_memstats_gc_sys_bytes": 2379776, + "go_memstats_heap_alloc_bytes": 10558112, + "go_memstats_heap_idle_bytes": 54042624, + "go_memstats_heap_inuse_bytes": 12214272, + "go_memstats_heap_objects": 61771, + "go_memstats_heap_released_bytes": 0, + "go_memstats_heap_sys_bytes": 66256896, + "go_memstats_last_gc_time_seconds": 1553430316.1488917, + "go_memstats_lookups_total": 0, + "go_memstats_mallocs_total": 77444, + "go_memstats_mcache_inuse_bytes": 6912, + "go_memstats_mcache_sys_bytes": 16384, + "go_memstats_mspan_inuse_bytes": 127984, + "go_memstats_mspan_sys_bytes": 131072, + "go_memstats_next_gc_bytes": 18390112, + "go_memstats_other_sys_bytes": 1201294, + "go_memstats_stack_inuse_bytes": 851968, + "go_memstats_stack_sys_bytes": 851968, + "go_memstats_sys_bytes": 72284408, + "go_threads": 14, + "process_cpu_seconds_total": 0.14, + "process_max_fds": 1048576, + "process_open_fds": 13, + "process_resident_memory_bytes": 35934208, + "process_start_time_seconds": 1553430305.4, + "process_virtual_memory_bytes": 150646784, + "process_virtual_memory_max_bytes": -1, + "prometheus_api_remote_read_queries": 0 } }, "service": { @@ -36,11 +70,12 @@ }, "prometheus": { "labels": { - "instance": "127.0.0.1:55928", - "job": "prometheus" + "instance": "127.0.0.1:39775", + "job": "prometheus", + "quantile": "0" }, "metrics": { - "up": 1 + "go_gc_duration_seconds": 0.000038386 } }, "service": { @@ -60,8 +95,8 @@ }, "prometheus": { "labels": { - "dialer_name": "alertmanager", - "instance": "127.0.0.1:55928", + "dialer_name": "default", + "instance": "127.0.0.1:39775", "job": "prometheus" }, "metrics": { @@ -87,12 +122,11 @@ }, "prometheus": { "labels": { - "instance": "127.0.0.1:55928", - "job": "prometheus", - "quantile": "0" + "instance": "127.0.0.1:39775", + "job": "prometheus" }, "metrics": { - "go_gc_duration_seconds": 0.000038386 + "up": 1 } }, "service": { @@ -112,12 +146,12 @@ }, "prometheus": { "labels": { - "instance": "127.0.0.1:55928", + "instance": "127.0.0.1:39775", "job": "prometheus", - "quantile": "1" + "version": "go1.11.3" }, "metrics": { - "go_gc_duration_seconds": 0.004392391 + "go_info": 1 } }, "service": { @@ -137,46 +171,12 @@ }, "prometheus": { "labels": { - "instance": "127.0.0.1:55928", - "job": "prometheus" + "instance": "127.0.0.1:39775", + "job": "prometheus", + "quantile": "0.75" }, "metrics": { - "go_gc_duration_seconds_count": 4, - "go_gc_duration_seconds_sum": 0.004534198, - "go_goroutines": 35, - "go_memstats_alloc_bytes": 10558112, - "go_memstats_alloc_bytes_total": 14087760, - "go_memstats_buck_hash_sys_bytes": 1447018, - "go_memstats_frees_total": 15673, - "go_memstats_gc_cpu_fraction": 0.0008429952574435172, - "go_memstats_gc_sys_bytes": 2379776, - "go_memstats_heap_alloc_bytes": 10558112, - "go_memstats_heap_idle_bytes": 54042624, - "go_memstats_heap_inuse_bytes": 12214272, - "go_memstats_heap_objects": 61771, - "go_memstats_heap_released_bytes": 0, - "go_memstats_heap_sys_bytes": 66256896, - "go_memstats_last_gc_time_seconds": 1553430316.1488917, - "go_memstats_lookups_total": 0, - "go_memstats_mallocs_total": 77444, - "go_memstats_mcache_inuse_bytes": 6912, - "go_memstats_mcache_sys_bytes": 16384, - "go_memstats_mspan_inuse_bytes": 127984, - "go_memstats_mspan_sys_bytes": 131072, - "go_memstats_next_gc_bytes": 18390112, - "go_memstats_other_sys_bytes": 1201294, - "go_memstats_stack_inuse_bytes": 851968, - "go_memstats_stack_sys_bytes": 851968, - "go_memstats_sys_bytes": 72284408, - "go_threads": 14, - "process_cpu_seconds_total": 0.14, - "process_max_fds": 1048576, - "process_open_fds": 13, - "process_resident_memory_bytes": 35934208, - "process_start_time_seconds": 1553430305.4, - "process_virtual_memory_bytes": 150646784, - "process_virtual_memory_max_bytes": -1, - "prometheus_api_remote_read_queries": 0 + "go_gc_duration_seconds": 0.004392391 } }, "service": { @@ -196,14 +196,13 @@ }, "prometheus": { "labels": { - "dialer_name": "prometheus", - "instance": "127.0.0.1:55928", - "job": "prometheus" + "instance": "127.0.0.1:39775", + "job": "prometheus", + "listener_name": "http" }, "metrics": { - "net_conntrack_dialer_conn_attempted_total": 1, - "net_conntrack_dialer_conn_closed_total": 0, - "net_conntrack_dialer_conn_established_total": 1 + "net_conntrack_listener_conn_accepted_total": 3, + "net_conntrack_listener_conn_closed_total": 0 } }, "service": { @@ -223,12 +222,12 @@ }, "prometheus": { "labels": { - "instance": "127.0.0.1:55928", + "instance": "127.0.0.1:39775", "job": "prometheus", - "quantile": "0.25" + "quantile": "1" }, "metrics": { - "go_gc_duration_seconds": 0.000042803 + "go_gc_duration_seconds": 0.004392391 } }, "service": { @@ -248,13 +247,14 @@ }, "prometheus": { "labels": { - "instance": "127.0.0.1:55928", - "job": "prometheus", - "listener_name": "http" + "dialer_name": "prometheus", + "instance": "127.0.0.1:39775", + "job": "prometheus" }, "metrics": { - "net_conntrack_listener_conn_accepted_total": 3, - "net_conntrack_listener_conn_closed_total": 0 + "net_conntrack_dialer_conn_attempted_total": 1, + "net_conntrack_dialer_conn_closed_total": 0, + "net_conntrack_dialer_conn_established_total": 1 } }, "service": { @@ -274,12 +274,12 @@ }, "prometheus": { "labels": { - "instance": "127.0.0.1:55928", + "instance": "127.0.0.1:39775", "job": "prometheus", - "version": "go1.11.3" + "quantile": "0.25" }, "metrics": { - "go_info": 1 + "go_gc_duration_seconds": 0.000042803 } }, "service": { @@ -299,12 +299,14 @@ }, "prometheus": { "labels": { - "instance": "127.0.0.1:55928", - "job": "prometheus", - "quantile": "0.75" + "dialer_name": "alertmanager", + "instance": "127.0.0.1:39775", + "job": "prometheus" }, "metrics": { - "go_gc_duration_seconds": 0.004392391 + "net_conntrack_dialer_conn_attempted_total": 0, + "net_conntrack_dialer_conn_closed_total": 0, + "net_conntrack_dialer_conn_established_total": 0 } }, "service": { @@ -324,14 +326,12 @@ }, "prometheus": { "labels": { - "dialer_name": "default", - "instance": "127.0.0.1:55928", - "job": "prometheus" + "instance": "127.0.0.1:39775", + "job": "prometheus", + "quantile": "0.5" }, "metrics": { - "net_conntrack_dialer_conn_attempted_total": 0, - "net_conntrack_dialer_conn_closed_total": 0, - "net_conntrack_dialer_conn_established_total": 0 + "go_gc_duration_seconds": 0.000060618 } }, "service": { diff --git a/metricbeat/module/prometheus/collector/collector.go b/metricbeat/module/prometheus/collector/collector.go index 69a91a7618a..dd3cfc7be5d 100644 --- a/metricbeat/module/prometheus/collector/collector.go +++ b/metricbeat/module/prometheus/collector/collector.go @@ -18,10 +18,11 @@ package collector import ( + "fmt" "regexp" - "github.com/pkg/errors" - dto "github.com/prometheus/client_model/go" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/textparse" p "github.com/elastic/beats/v7/metricbeat/helper/prometheus" "github.com/elastic/beats/v7/metricbeat/mb" @@ -43,7 +44,7 @@ var ( }.Build() upMetricName = "up" - upMetricType = dto.MetricType_GAUGE + upMetricType = textparse.MetricTypeGauge upMetricInstanceLabel = "instance" upMetricJobLabel = "job" upMetricJobValue = "prometheus" @@ -63,7 +64,7 @@ type PromEventsGenerator interface { Start() // GeneratePromEvents converts a Prometheus metric family into a list of PromEvents - GeneratePromEvents(mf *dto.MetricFamily) []PromEvent + GeneratePromEvents(mf *p.MetricFamily) []PromEvent // Stop must be called when the generator won't be used anymore Stop() @@ -113,11 +114,11 @@ func MetricSetBuilder(namespace string, genFactory PromEventsGeneratorFactory) f ms.host = ms.Host() ms.excludeMetrics, err = p.CompilePatternList(config.MetricsFilters.ExcludeMetrics) if err != nil { - return nil, errors.Wrapf(err, "unable to compile exclude patterns") + return nil, fmt.Errorf("unable to compile exclude patterns: %w", err) } ms.includeMetrics, err = p.CompilePatternList(config.MetricsFilters.IncludeMetrics) if err != nil { - return nil, errors.Wrapf(err, "unable to compile include patterns") + return nil, fmt.Errorf("unable to compile include patterns: %w", err) } return ms, nil @@ -138,7 +139,7 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { families = append(families, m.upMetricFamily(0.0)) // set the error to report it after sending the up event - err = errors.Wrap(err, "unable to decode response from prometheus endpoint") + err = fmt.Errorf("unable to decode response from prometheus endpoint: %w", err) } else { // add up event to the list families = append(families, m.upMetricFamily(1.0)) @@ -157,11 +158,11 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { // Add default instance label if not already there if exists, _ := promEvent.Labels.HasKey(upMetricInstanceLabel); !exists { - promEvent.Labels.Put(upMetricInstanceLabel, m.Host()) + _, _ = promEvent.Labels.Put(upMetricInstanceLabel, m.Host()) } // Add default job label if not already there if exists, _ := promEvent.Labels.HasKey("job"); !exists { - promEvent.Labels.Put("job", m.Module().Name()) + _, _ = promEvent.Labels.Put("job", m.Module().Name()) } // Add labels if len(promEvent.Labels) > 0 { @@ -195,30 +196,30 @@ func (m *MetricSet) Close() error { return nil } -func (m *MetricSet) upMetricFamily(value float64) *dto.MetricFamily { - gauge := dto.Gauge{ +func (m *MetricSet) upMetricFamily(value float64) *p.MetricFamily { + gauge := p.Gauge{ Value: &value, } - label1 := dto.LabelPair{ - Name: &upMetricInstanceLabel, - Value: &m.host, + label1 := labels.Label{ + Name: upMetricInstanceLabel, + Value: m.host, } - label2 := dto.LabelPair{ - Name: &upMetricJobLabel, - Value: &upMetricJobValue, + label2 := labels.Label{ + Name: upMetricJobLabel, + Value: upMetricJobValue, } - metric := dto.Metric{ + metric := p.OpenMetric{ Gauge: &gauge, - Label: []*dto.LabelPair{&label1, &label2}, + Label: []*labels.Label{&label1, &label2}, } - return &dto.MetricFamily{ + return &p.MetricFamily{ Name: &upMetricName, - Type: &upMetricType, - Metric: []*dto.Metric{&metric}, + Type: upMetricType, + Metric: []*p.OpenMetric{&metric}, } } -func (m *MetricSet) skipFamily(family *dto.MetricFamily) bool { +func (m *MetricSet) skipFamily(family *p.MetricFamily) bool { if family == nil { return false } diff --git a/metricbeat/module/prometheus/collector/collector_test.go b/metricbeat/module/prometheus/collector/collector_test.go index 4ccea353559..25648b46840 100644 --- a/metricbeat/module/prometheus/collector/collector_test.go +++ b/metricbeat/module/prometheus/collector/collector_test.go @@ -26,9 +26,10 @@ import ( "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/elastic-agent-libs/mapstr" - "github.com/golang/protobuf/proto" - dto "github.com/prometheus/client_model/go" + pl "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/textparse" "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/proto" p "github.com/elastic/beats/v7/metricbeat/helper/prometheus" mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" @@ -41,23 +42,23 @@ func TestGetPromEventsFromMetricFamily(t *testing.T) { "handler": "query", } tests := []struct { - Family *dto.MetricFamily + Family *p.MetricFamily Event []PromEvent }{ { - Family: &dto.MetricFamily{ + Family: &p.MetricFamily{ Name: proto.String("http_request_duration_microseconds"), Help: proto.String("foo"), - Type: dto.MetricType_COUNTER.Enum(), - Metric: []*dto.Metric{ + Type: textparse.MetricTypeCounter, + Metric: []*p.OpenMetric{ { - Label: []*dto.LabelPair{ + Label: []*pl.Label{ { - Name: proto.String("handler"), - Value: proto.String("query"), + Name: "handler", + Value: "query", }, }, - Counter: &dto.Counter{ + Counter: &p.Counter{ Value: proto.Float64(10), }, }, @@ -75,13 +76,13 @@ func TestGetPromEventsFromMetricFamily(t *testing.T) { }, }, { - Family: &dto.MetricFamily{ + Family: &p.MetricFamily{ Name: proto.String("http_request_duration_microseconds"), Help: proto.String("foo"), - Type: dto.MetricType_GAUGE.Enum(), - Metric: []*dto.Metric{ + Type: textparse.MetricTypeGauge, + Metric: []*p.OpenMetric{ { - Gauge: &dto.Gauge{ + Gauge: &p.Gauge{ Value: proto.Float64(10), }, }, @@ -99,16 +100,16 @@ func TestGetPromEventsFromMetricFamily(t *testing.T) { }, }, { - Family: &dto.MetricFamily{ + Family: &p.MetricFamily{ Name: proto.String("http_request_duration_microseconds"), Help: proto.String("foo"), - Type: dto.MetricType_SUMMARY.Enum(), - Metric: []*dto.Metric{ + Type: textparse.MetricTypeSummary, + Metric: []*p.OpenMetric{ { - Summary: &dto.Summary{ + Summary: &p.Summary{ SampleCount: proto.Uint64(10), SampleSum: proto.Float64(10), - Quantile: []*dto.Quantile{ + Quantile: []*p.Quantile{ { Quantile: proto.Float64(0.99), Value: proto.Float64(10), @@ -141,16 +142,16 @@ func TestGetPromEventsFromMetricFamily(t *testing.T) { }, }, { - Family: &dto.MetricFamily{ + Family: &p.MetricFamily{ Name: proto.String("http_request_duration_microseconds"), Help: proto.String("foo"), - Type: dto.MetricType_HISTOGRAM.Enum(), - Metric: []*dto.Metric{ + Type: textparse.MetricTypeHistogram, + Metric: []*p.OpenMetric{ { - Histogram: &dto.Histogram{ + Histogram: &p.Histogram{ SampleCount: proto.Uint64(10), SampleSum: proto.Float64(10), - Bucket: []*dto.Bucket{ + Bucket: []*p.Bucket{ { UpperBound: proto.Float64(0.99), CumulativeCount: proto.Uint64(10), @@ -181,19 +182,19 @@ func TestGetPromEventsFromMetricFamily(t *testing.T) { }, }, { - Family: &dto.MetricFamily{ + Family: &p.MetricFamily{ Name: proto.String("http_request_duration_microseconds"), Help: proto.String("foo"), - Type: dto.MetricType_UNTYPED.Enum(), - Metric: []*dto.Metric{ + Type: textparse.MetricTypeUnknown, + Metric: []*p.OpenMetric{ { - Label: []*dto.LabelPair{ + Label: []*pl.Label{ { - Name: proto.String("handler"), - Value: proto.String("query"), + Name: "handler", + Value: "query", }, }, - Untyped: &dto.Untyped{ + Unknown: &p.Unknown{ Value: proto.Float64(10), }, }, @@ -220,20 +221,20 @@ func TestGetPromEventsFromMetricFamily(t *testing.T) { } func TestSkipMetricFamily(t *testing.T) { - testFamilies := []*dto.MetricFamily{ + testFamilies := []*p.MetricFamily{ { Name: proto.String("http_request_duration_microseconds_a_a_in"), Help: proto.String("foo"), - Type: dto.MetricType_COUNTER.Enum(), - Metric: []*dto.Metric{ + Type: textparse.MetricTypeCounter, + Metric: []*p.OpenMetric{ { - Label: []*dto.LabelPair{ + Label: []*pl.Label{ { - Name: proto.String("handler"), - Value: proto.String("query"), + Name: "handler", + Value: "query", }, }, - Counter: &dto.Counter{ + Counter: &p.Counter{ Value: proto.Float64(10), }, }, @@ -242,16 +243,16 @@ func TestSkipMetricFamily(t *testing.T) { { Name: proto.String("http_request_duration_microseconds_a_b_in"), Help: proto.String("foo"), - Type: dto.MetricType_COUNTER.Enum(), - Metric: []*dto.Metric{ + Type: textparse.MetricTypeCounter, + Metric: []*p.OpenMetric{ { - Label: []*dto.LabelPair{ + Label: []*pl.Label{ { - Name: proto.String("handler"), - Value: proto.String("query"), + Name: "handler", + Value: "query", }, }, - Counter: &dto.Counter{ + Counter: &p.Counter{ Value: proto.Float64(10), }, }, @@ -260,10 +261,10 @@ func TestSkipMetricFamily(t *testing.T) { { Name: proto.String("http_request_duration_microseconds_b_in"), Help: proto.String("foo"), - Type: dto.MetricType_GAUGE.Enum(), - Metric: []*dto.Metric{ + Type: textparse.MetricTypeGauge, + Metric: []*p.OpenMetric{ { - Gauge: &dto.Gauge{ + Gauge: &p.Gauge{ Value: proto.Float64(10), }, }, @@ -272,13 +273,13 @@ func TestSkipMetricFamily(t *testing.T) { { Name: proto.String("http_request_duration_microseconds_c_in"), Help: proto.String("foo"), - Type: dto.MetricType_SUMMARY.Enum(), - Metric: []*dto.Metric{ + Type: textparse.MetricTypeSummary, + Metric: []*p.OpenMetric{ { - Summary: &dto.Summary{ + Summary: &p.Summary{ SampleCount: proto.Uint64(10), SampleSum: proto.Float64(10), - Quantile: []*dto.Quantile{ + Quantile: []*p.Quantile{ { Quantile: proto.Float64(0.99), Value: proto.Float64(10), @@ -291,13 +292,13 @@ func TestSkipMetricFamily(t *testing.T) { { Name: proto.String("http_request_duration_microseconds_d_in"), Help: proto.String("foo"), - Type: dto.MetricType_HISTOGRAM.Enum(), - Metric: []*dto.Metric{ + Type: textparse.MetricTypeHistogram, + Metric: []*p.OpenMetric{ { - Histogram: &dto.Histogram{ + Histogram: &p.Histogram{ SampleCount: proto.Uint64(10), SampleSum: proto.Float64(10), - Bucket: []*dto.Bucket{ + Bucket: []*p.Bucket{ { UpperBound: proto.Float64(0.99), CumulativeCount: proto.Uint64(10), @@ -310,16 +311,16 @@ func TestSkipMetricFamily(t *testing.T) { { Name: proto.String("http_request_duration_microseconds_e_in"), Help: proto.String("foo"), - Type: dto.MetricType_UNTYPED.Enum(), - Metric: []*dto.Metric{ + Type: textparse.MetricTypeUnknown, + Metric: []*p.OpenMetric{ { - Label: []*dto.LabelPair{ + Label: []*pl.Label{ { - Name: proto.String("handler"), - Value: proto.String("query"), + Name: "handler", + Value: "query", }, }, - Untyped: &dto.Untyped{ + Unknown: &p.Unknown{ Value: proto.Float64(10), }, }, diff --git a/metricbeat/module/prometheus/collector/data.go b/metricbeat/module/prometheus/collector/data.go index aed18a21f5f..b9934bc2262 100644 --- a/metricbeat/module/prometheus/collector/data.go +++ b/metricbeat/module/prometheus/collector/data.go @@ -25,7 +25,7 @@ import ( "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/elastic-agent-libs/mapstr" - dto "github.com/prometheus/client_model/go" + p "github.com/elastic/beats/v7/metricbeat/helper/prometheus" ) // PromEvent stores a set of one or more metrics with the same labels @@ -51,18 +51,19 @@ func (p *promEventGenerator) Stop() {} // GeneratePromEvents DefaultPromEventsGenerator stores all Prometheus metrics using // only double field type in Elasticsearch. -func (p *promEventGenerator) GeneratePromEvents(mf *dto.MetricFamily) []PromEvent { +func (p *promEventGenerator) GeneratePromEvents(mf *p.MetricFamily) []PromEvent { var events []PromEvent name := *mf.Name + _ = name // skip noisy linter metrics := mf.Metric for _, metric := range metrics { labels := mapstr.M{} if len(metric.Label) != 0 { for _, label := range metric.Label { - if label.GetName() != "" && label.GetValue() != "" { - labels[label.GetName()] = label.GetValue() + if label.Name != "" && label.Value != "" { + labels[label.Name] = label.Value } } } @@ -160,7 +161,7 @@ func (p *promEventGenerator) GeneratePromEvents(mf *dto.MetricFamily) []PromEven } } - untyped := metric.GetUntyped() + untyped := metric.GetUnknown() if untyped != nil { if !math.IsNaN(untyped.GetValue()) && !math.IsInf(untyped.GetValue(), 0) { events = append(events, PromEvent{ diff --git a/metricbeat/module/prometheus/query/data.go b/metricbeat/module/prometheus/query/data.go index 1a660984007..298f1efd22d 100644 --- a/metricbeat/module/prometheus/query/data.go +++ b/metricbeat/module/prometheus/query/data.go @@ -24,8 +24,6 @@ import ( "strconv" "time" - "github.com/pkg/errors" - "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/elastic-agent-libs/mapstr" ) @@ -51,12 +49,14 @@ type arrayData struct { // InstantVectorResponse is for "vector" type from Prometheus Query API Request // instantVectorResult format: // [ -// { -// "metric": { "": "", ... }, -// "value": [ , "" ] -// }, -// ... -//] +// +// { +// "metric": { "": "", ... }, +// "value": [ , "" ] +// }, +// ... +// +// ] type InstantVectorResponse struct { Status string `json:"status"` Data instantVectorData `json:"data"` @@ -72,12 +72,14 @@ type instantVectorResult struct { // RangeVectorResponse is for "vector" type from Prometheus Query API Request // rangeVectorResult format: // [ -// { -// "metric": { "": "", ... }, -// "values": [ [ , "" ], ... ] -// }, -// ... -//] +// +// { +// "metric": { "": "", ... }, +// "values": [ [ , "" ], ... ] +// }, +// ... +// +// ] type RangeVectorResponse struct { Status string `json:"status"` Data rangeVectorData `json:"data"` @@ -119,7 +121,7 @@ func parseResponse(body []byte, pathConfig QueryConfig) ([]mb.Event, error) { events = append(events, evnts...) default: msg := fmt.Sprintf("Unknown resultType '%v'", resultType) - return events, errors.New(msg) + return events, fmt.Errorf(msg) } return events, nil } @@ -153,7 +155,7 @@ func getEventsFromMatrix(body []byte, queryName string) ([]mb.Event, error) { }, }) } else { - return []mb.Event{}, errors.New("Could not parse results") + return []mb.Event{}, fmt.Errorf("Could not parse results") } } } @@ -188,7 +190,7 @@ func getEventsFromVector(body []byte, queryName string) ([]mb.Event, error) { }, }) } else { - return []mb.Event{}, errors.New("Could not parse results") + return []mb.Event{}, fmt.Errorf("Could not parse results") } } return events, nil @@ -222,7 +224,7 @@ func getEventFromScalarOrString(body []byte, resultType string, queryName string value, ok := convertedArray.Data.Results[1].(string) if !ok { msg := fmt.Sprintf("Could not parse value of result: %v", convertedArray.Data.Results) - return mb.Event{}, errors.New(msg) + return mb.Event{}, fmt.Errorf(msg) } return mb.Event{ Timestamp: getTimestamp(timestamp), @@ -237,18 +239,18 @@ func getEventFromScalarOrString(body []byte, resultType string, queryName string }, nil } } - return mb.Event{}, errors.New("Could not parse results") + return mb.Event{}, fmt.Errorf("could not parse results") } func getTimestampFromVector(vector []interface{}) (float64, error) { // Example input: [ , "" ] if len(vector) != 2 { - return 0, errors.New("Could not parse results") + return 0, fmt.Errorf("could not parse results") } timestamp, ok := vector[0].(float64) if !ok { msg := fmt.Sprintf("Could not parse timestamp of result: %v", vector) - return 0, errors.New(msg) + return 0, fmt.Errorf(msg) } return timestamp, nil } @@ -256,17 +258,17 @@ func getTimestampFromVector(vector []interface{}) (float64, error) { func getValueFromVector(vector []interface{}) (float64, error) { // Example input: [ , "" ] if len(vector) != 2 { - return 0, errors.New("Could not parse results") + return 0, fmt.Errorf("could not parse results") } value, ok := vector[1].(string) if !ok { msg := fmt.Sprintf("Could not parse value of result: %v", vector) - return 0, errors.New(msg) + return 0, fmt.Errorf(msg) } val, err := strconv.ParseFloat(value, 64) if err != nil { msg := fmt.Sprintf("Could not parse value of result: %v", vector) - return 0, errors.New(msg) + return 0, fmt.Errorf(msg) } return val, nil } @@ -274,10 +276,10 @@ func getValueFromVector(vector []interface{}) (float64, error) { func getResultType(body []byte) (string, error) { response := Response{} if err := json.Unmarshal(body, &response); err != nil { - return "", errors.Wrap(err, "Failed to parse api response") + return "", fmt.Errorf("failed to parse api response: %w", err) } if response.Status == "error" { - return "", errors.Errorf("Failed to query") + return "", fmt.Errorf("failed to query") } return response.Data.ResultType, nil } @@ -285,10 +287,10 @@ func getResultType(body []byte) (string, error) { func convertJSONToArrayResponse(body []byte) (ArrayResponse, error) { arrayBody := ArrayResponse{} if err := json.Unmarshal(body, &arrayBody); err != nil { - return arrayBody, errors.Wrap(err, "Failed to parse api response") + return arrayBody, fmt.Errorf("failed to parse api response: %w", err) } if arrayBody.Status == "error" { - return arrayBody, errors.Errorf("Failed to query") + return arrayBody, fmt.Errorf("failed to query") } return arrayBody, nil } @@ -296,10 +298,10 @@ func convertJSONToArrayResponse(body []byte) (ArrayResponse, error) { func convertJSONToRangeVectorResponse(body []byte) (RangeVectorResponse, error) { mapBody := RangeVectorResponse{} if err := json.Unmarshal(body, &mapBody); err != nil { - return RangeVectorResponse{}, errors.Wrap(err, "Failed to parse api response") + return RangeVectorResponse{}, fmt.Errorf("failed to parse api response: %w", err) } if mapBody.Status == "error" { - return mapBody, errors.Errorf("Failed to query") + return mapBody, fmt.Errorf("failed to query") } return mapBody, nil } @@ -307,10 +309,10 @@ func convertJSONToRangeVectorResponse(body []byte) (RangeVectorResponse, error) func convertJSONToInstantVectorResponse(body []byte) (InstantVectorResponse, error) { mapBody := InstantVectorResponse{} if err := json.Unmarshal(body, &mapBody); err != nil { - return InstantVectorResponse{}, errors.Wrap(err, "Failed to parse api response") + return InstantVectorResponse{}, fmt.Errorf("failed to parse api response: %w", err) } if mapBody.Status == "error" { - return mapBody, errors.Errorf("Failed to query") + return mapBody, fmt.Errorf("failed to query") } return mapBody, nil } diff --git a/metricbeat/module/prometheus/query/query.go b/metricbeat/module/prometheus/query/query.go index 72013c66553..788ca9d50ac 100644 --- a/metricbeat/module/prometheus/query/query.go +++ b/metricbeat/module/prometheus/query/query.go @@ -18,9 +18,8 @@ package query import ( - "io/ioutil" - - "github.com/pkg/errors" + "fmt" + "io" "github.com/elastic/beats/v7/metricbeat/helper" "github.com/elastic/beats/v7/metricbeat/mb" @@ -82,7 +81,7 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { m.http.SetURI(url) response, err := m.http.FetchResponse() if err != nil { - reporter.Error(errors.Wrapf(err, "unable to fetch data from prometheus endpoint: %v", url)) + reporter.Error(fmt.Errorf("unable to fetch data from prometheus endpoint %v: %w", url, err)) continue } defer func() { @@ -91,14 +90,14 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { } }() - body, err := ioutil.ReadAll(response.Body) + body, err := io.ReadAll(response.Body) if err != nil { return err } events, parseErr := parseResponse(body, pathConfig) if parseErr != nil { - reporter.Error(errors.Wrapf(parseErr, "error parsing response from: %v", url)) + reporter.Error(fmt.Errorf("error parsing response from %v: %w", url, parseErr)) continue } for _, e := range events { diff --git a/metricbeat/module/prometheus/query/query_test.go b/metricbeat/module/prometheus/query/query_test.go index 581063d4029..fe85f195a53 100644 --- a/metricbeat/module/prometheus/query/query_test.go +++ b/metricbeat/module/prometheus/query/query_test.go @@ -18,9 +18,9 @@ package query import ( - "io/ioutil" "net/http" "net/http/httptest" + "os" "path/filepath" "testing" @@ -39,11 +39,11 @@ func TestQueryFetchEventContentInstantVector(t *testing.T) { // }, // ... //] - response, _ := ioutil.ReadFile(absPath + "/querymetrics_instant_vector.json") + response, _ := os.ReadFile(absPath + "/querymetrics_instant_vector.json") server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(200) w.Header().Set("Content-Type", "application/json;") - w.Write([]byte(response)) + _, _ = w.Write(response) })) defer server.Close() @@ -65,7 +65,7 @@ func TestQueryFetchEventContentInstantVector(t *testing.T) { reporter := &mbtest.CapturingReporterV2{} metricSet := mbtest.NewReportingMetricSetV2Error(t, config) - metricSet.Fetch(reporter) + _ = metricSet.Fetch(reporter) events := reporter.GetEvents() if len(events) != 2 { @@ -88,11 +88,11 @@ func TestQueryFetchEventContentRangeVector(t *testing.T) { // }, // ... //] - response, _ := ioutil.ReadFile(absPath + "/querymetrics_range_vector.json") + response, _ := os.ReadFile(absPath + "/querymetrics_range_vector.json") server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(200) w.Header().Set("Content-Type", "application/json;") - w.Write([]byte(response)) + _, _ = w.Write(response) })) defer server.Close() @@ -117,7 +117,7 @@ func TestQueryFetchEventContentRangeVector(t *testing.T) { reporter := &mbtest.CapturingReporterV2{} metricSet := mbtest.NewReportingMetricSetV2Error(t, config) - metricSet.Fetch(reporter) + _ = metricSet.Fetch(reporter) events := reporter.GetEvents() if len(events) != 6 { @@ -134,11 +134,11 @@ func TestQueryFetchEventContentScalar(t *testing.T) { // test with response format like: //[ , "" ] - response, _ := ioutil.ReadFile(absPath + "/querymetrics_scalar.json") + response, _ := os.ReadFile(absPath + "/querymetrics_scalar.json") server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(200) w.Header().Set("Content-Type", "application/json;") - w.Write([]byte(response)) + _, _ = w.Write(response) })) defer server.Close() @@ -160,7 +160,7 @@ func TestQueryFetchEventContentScalar(t *testing.T) { reporter := &mbtest.CapturingReporterV2{} metricSet := mbtest.NewReportingMetricSetV2Error(t, config) - metricSet.Fetch(reporter) + _ = metricSet.Fetch(reporter) events := reporter.GetEvents() if len(events) != 1 { @@ -177,11 +177,11 @@ func TestQueryFetchEventContentString(t *testing.T) { // test with response format like: //[ , "" ] - response, _ := ioutil.ReadFile(absPath + "/querymetrics_string.json") + response, _ := os.ReadFile(absPath + "/querymetrics_string.json") server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(200) w.Header().Set("Content-Type", "application/json;") - w.Write([]byte(response)) + _, _ = w.Write(response) })) defer server.Close() @@ -203,7 +203,7 @@ func TestQueryFetchEventContentString(t *testing.T) { reporter := &mbtest.CapturingReporterV2{} metricSet := mbtest.NewReportingMetricSetV2Error(t, config) - metricSet.Fetch(reporter) + _ = metricSet.Fetch(reporter) events := reporter.GetEvents() if len(events) != 1 { diff --git a/metricbeat/module/prometheus/remote_write/remote_write.go b/metricbeat/module/prometheus/remote_write/remote_write.go index 4dca6193761..5ab789aed46 100644 --- a/metricbeat/module/prometheus/remote_write/remote_write.go +++ b/metricbeat/module/prometheus/remote_write/remote_write.go @@ -18,7 +18,7 @@ package remote_write import ( - "io/ioutil" + "io" "net/http" "github.com/gogo/protobuf/proto" @@ -122,7 +122,7 @@ func MetricSetBuilder(genFactory RemoteWriteEventsGeneratorFactory) func(base mb func (m *MetricSet) Run(reporter mb.PushReporterV2) { // Start event watcher - m.server.Start() + _ = m.server.Start() for { select { @@ -149,7 +149,7 @@ func (m *MetricSet) handleFunc(writer http.ResponseWriter, req *http.Request) { m.eventGenStarted = true } - compressed, err := ioutil.ReadAll(req.Body) + compressed, err := io.ReadAll(req.Body) if err != nil { m.Logger().Errorf("Read error %v", err) http.Error(writer, err.Error(), http.StatusInternalServerError) diff --git a/x-pack/metricbeat/module/containerd/containerd.go b/x-pack/metricbeat/module/containerd/containerd.go index 2b0c1fd7e09..f7c5fabf205 100644 --- a/x-pack/metricbeat/module/containerd/containerd.go +++ b/x-pack/metricbeat/module/containerd/containerd.go @@ -5,12 +5,11 @@ package containerd import ( + "fmt" "sync" "time" "github.com/mitchellh/hashstructure" - "github.com/pkg/errors" - dto "github.com/prometheus/client_model/go" p "github.com/elastic/beats/v7/metricbeat/helper/prometheus" "github.com/elastic/beats/v7/metricbeat/mb" @@ -42,11 +41,11 @@ func init() { type Module interface { mb.Module - GetContainerdMetricsFamilies(prometheus p.Prometheus) ([]*dto.MetricFamily, time.Time, error) + GetContainerdMetricsFamilies(prometheus p.Prometheus) ([]*p.MetricFamily, time.Time, error) } type familiesCache struct { - sharedFamilies []*dto.MetricFamily + sharedFamilies []*p.MetricFamily lastFetchErr error lastFetchTimestamp time.Time } @@ -78,7 +77,7 @@ func ModuleBuilder() func(base mb.BaseModule) (mb.Module, error) { return func(base mb.BaseModule) (mb.Module, error) { hash, err := generateCacheHash(base.Config().Hosts) if err != nil { - return nil, errors.Wrap(err, "error generating cache hash for containerdMetricsCache") + return nil, fmt.Errorf("error generating cache hash for containerdMetricsCache: %w", err) } m := module{ BaseModule: base, @@ -89,7 +88,7 @@ func ModuleBuilder() func(base mb.BaseModule) (mb.Module, error) { } } -func (m *module) GetContainerdMetricsFamilies(prometheus p.Prometheus) ([]*dto.MetricFamily, time.Time, error) { +func (m *module) GetContainerdMetricsFamilies(prometheus p.Prometheus) ([]*p.MetricFamily, time.Time, error) { m.containerdMetricsCache.lock.Lock() defer m.containerdMetricsCache.lock.Unlock() diff --git a/x-pack/metricbeat/module/prometheus/collector/data.go b/x-pack/metricbeat/module/prometheus/collector/data.go index 3b7cc8de37b..f05d599f7af 100644 --- a/x-pack/metricbeat/module/prometheus/collector/data.go +++ b/x-pack/metricbeat/module/prometheus/collector/data.go @@ -8,13 +8,13 @@ import ( "math" "strconv" + p "github.com/elastic/beats/v7/metricbeat/helper/prometheus" + "github.com/elastic/beats/v7/libbeat/common/cfgwarn" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/metricbeat/module/prometheus/collector" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" - - dto "github.com/prometheus/client_model/go" ) func promEventsGeneratorFactory(base mb.BaseMetricSet) (collector.PromEventsGenerator, error) { @@ -61,7 +61,7 @@ func (g *typedGenerator) Stop() { // GeneratePromEvents stores all Prometheus metrics using // specific Elasticsearch data types. -func (g *typedGenerator) GeneratePromEvents(mf *dto.MetricFamily) []collector.PromEvent { +func (g *typedGenerator) GeneratePromEvents(mf *p.MetricFamily) []collector.PromEvent { var events []collector.PromEvent name := *mf.Name @@ -71,8 +71,8 @@ func (g *typedGenerator) GeneratePromEvents(mf *dto.MetricFamily) []collector.Pr if len(metric.Label) != 0 { for _, label := range metric.Label { - if label.GetName() != "" && label.GetValue() != "" { - labels[label.GetName()] = label.GetValue() + if label.Name != "" && label.Value != "" { + labels[label.Name] = label.Value } } } @@ -149,7 +149,7 @@ func (g *typedGenerator) GeneratePromEvents(mf *dto.MetricFamily) []collector.Pr */ } - untyped := metric.GetUntyped() + untyped := metric.GetUnknown() if untyped != nil { if !math.IsNaN(untyped.GetValue()) && !math.IsInf(untyped.GetValue(), 0) { events = append(events, collector.PromEvent{ diff --git a/x-pack/metricbeat/module/prometheus/collector/histogram.go b/x-pack/metricbeat/module/prometheus/collector/histogram.go index ebae19c09ca..375e62def93 100644 --- a/x-pack/metricbeat/module/prometheus/collector/histogram.go +++ b/x-pack/metricbeat/module/prometheus/collector/histogram.go @@ -8,7 +8,7 @@ import ( "fmt" "math" - dto "github.com/prometheus/client_model/go" + p "github.com/elastic/beats/v7/metricbeat/helper/prometheus" "github.com/elastic/elastic-agent-libs/mapstr" ) @@ -17,17 +17,17 @@ import ( // // ES histograms look like this: // -// "histogram_field" : { -// "values" : [0.1, 0.2, 0.3, 0.4, 0.5], -// "counts" : [3, 7, 23, 12, 6] -// } +// "histogram_field" : { +// "values" : [0.1, 0.2, 0.3, 0.4, 0.5], +// "counts" : [3, 7, 23, 12, 6] +// } // -// This code takes a Prometheus histogram and tries to accomodate it into an ES histogram by: -// - calculating centroids for each bucket (values) -// - undoing counters accumulation for each bucket (counts) +// This code takes a Prometheus histogram and tries to accommodate it into an ES histogram by: +// - calculating centroids for each bucket (values) +// - undoing counters accumulation for each bucket (counts) // // https://www.elastic.co/guide/en/elasticsearch/reference/master/histogram.html -func PromHistogramToES(cc CounterCache, name string, labels mapstr.M, histogram *dto.Histogram) mapstr.M { +func PromHistogramToES(cc CounterCache, name string, labels mapstr.M, histogram *p.Histogram) mapstr.M { var values []float64 var counts []uint64 diff --git a/x-pack/metricbeat/module/prometheus/collector/histogram_test.go b/x-pack/metricbeat/module/prometheus/collector/histogram_test.go index d85ed673dd1..280ebb1e5ac 100644 --- a/x-pack/metricbeat/module/prometheus/collector/histogram_test.go +++ b/x-pack/metricbeat/module/prometheus/collector/histogram_test.go @@ -12,9 +12,10 @@ import ( "testing" "time" - "github.com/golang/protobuf/proto" - dto "github.com/prometheus/client_model/go" "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/proto" + + p "github.com/elastic/beats/v7/metricbeat/helper/prometheus" "github.com/elastic/elastic-agent-libs/mapstr" ) @@ -23,7 +24,7 @@ import ( // times with the same cache produces each time the expected results. func TestPromHistogramToES(t *testing.T) { type sample struct { - histogram dto.Histogram + histogram p.Histogram expected mapstr.M } @@ -33,10 +34,10 @@ func TestPromHistogramToES(t *testing.T) { "one histogram": { samples: []sample{ { - histogram: dto.Histogram{ + histogram: p.Histogram{ SampleCount: proto.Uint64(10), SampleSum: proto.Float64(10), - Bucket: []*dto.Bucket{ + Bucket: []*p.Bucket{ { UpperBound: proto.Float64(0.99), CumulativeCount: proto.Uint64(10), @@ -53,10 +54,10 @@ func TestPromHistogramToES(t *testing.T) { "two histogram": { samples: []sample{ { - histogram: dto.Histogram{ + histogram: p.Histogram{ SampleCount: proto.Uint64(10), SampleSum: proto.Float64(10), - Bucket: []*dto.Bucket{ + Bucket: []*p.Bucket{ { UpperBound: proto.Float64(0.99), CumulativeCount: proto.Uint64(10), @@ -69,10 +70,10 @@ func TestPromHistogramToES(t *testing.T) { }, }, { - histogram: dto.Histogram{ + histogram: p.Histogram{ SampleCount: proto.Uint64(12), SampleSum: proto.Float64(10.123), - Bucket: []*dto.Bucket{ + Bucket: []*p.Bucket{ { UpperBound: proto.Float64(0.99), CumulativeCount: proto.Uint64(12), @@ -89,10 +90,10 @@ func TestPromHistogramToES(t *testing.T) { "new bucket on the go": { samples: []sample{ { - histogram: dto.Histogram{ + histogram: p.Histogram{ SampleCount: proto.Uint64(10), SampleSum: proto.Float64(10), - Bucket: []*dto.Bucket{ + Bucket: []*p.Bucket{ { UpperBound: proto.Float64(0.99), CumulativeCount: proto.Uint64(10), @@ -105,10 +106,10 @@ func TestPromHistogramToES(t *testing.T) { }, }, { - histogram: dto.Histogram{ + histogram: p.Histogram{ SampleCount: proto.Uint64(13), SampleSum: proto.Float64(15.23), - Bucket: []*dto.Bucket{ + Bucket: []*p.Bucket{ { UpperBound: proto.Float64(0.99), CumulativeCount: proto.Uint64(12), @@ -126,10 +127,10 @@ func TestPromHistogramToES(t *testing.T) { }, }, { - histogram: dto.Histogram{ + histogram: p.Histogram{ SampleCount: proto.Uint64(15), SampleSum: proto.Float64(16.33), - Bucket: []*dto.Bucket{ + Bucket: []*p.Bucket{ { UpperBound: proto.Float64(0.99), CumulativeCount: proto.Uint64(13), @@ -146,10 +147,10 @@ func TestPromHistogramToES(t *testing.T) { }, }, { - histogram: dto.Histogram{ + histogram: p.Histogram{ SampleCount: proto.Uint64(16), SampleSum: proto.Float64(16.33), - Bucket: []*dto.Bucket{ + Bucket: []*p.Bucket{ { UpperBound: proto.Float64(0.99), CumulativeCount: proto.Uint64(13), @@ -170,10 +171,10 @@ func TestPromHistogramToES(t *testing.T) { "new smaller bucket on the go": { samples: []sample{ { - histogram: dto.Histogram{ + histogram: p.Histogram{ SampleCount: proto.Uint64(10), SampleSum: proto.Float64(10), - Bucket: []*dto.Bucket{ + Bucket: []*p.Bucket{ { UpperBound: proto.Float64(0.99), CumulativeCount: proto.Uint64(10), @@ -186,10 +187,10 @@ func TestPromHistogramToES(t *testing.T) { }, }, { - histogram: dto.Histogram{ + histogram: p.Histogram{ SampleCount: proto.Uint64(13), SampleSum: proto.Float64(15.23), - Bucket: []*dto.Bucket{ + Bucket: []*p.Bucket{ // New bucket on the go { UpperBound: proto.Float64(0.09), @@ -207,10 +208,10 @@ func TestPromHistogramToES(t *testing.T) { }, }, { - histogram: dto.Histogram{ + histogram: p.Histogram{ SampleCount: proto.Uint64(15), SampleSum: proto.Float64(16.33), - Bucket: []*dto.Bucket{ + Bucket: []*p.Bucket{ { UpperBound: proto.Float64(0.09), CumulativeCount: proto.Uint64(2), @@ -227,10 +228,10 @@ func TestPromHistogramToES(t *testing.T) { }, }, { - histogram: dto.Histogram{ + histogram: p.Histogram{ SampleCount: proto.Uint64(16), SampleSum: proto.Float64(16.33), - Bucket: []*dto.Bucket{ + Bucket: []*p.Bucket{ { UpperBound: proto.Float64(0.09), CumulativeCount: proto.Uint64(3), @@ -251,10 +252,10 @@ func TestPromHistogramToES(t *testing.T) { "new bucket between two other buckets on the go": { samples: []sample{ { - histogram: dto.Histogram{ + histogram: p.Histogram{ SampleCount: proto.Uint64(10), SampleSum: proto.Float64(10), - Bucket: []*dto.Bucket{ + Bucket: []*p.Bucket{ { UpperBound: proto.Float64(0.09), CumulativeCount: proto.Uint64(0), @@ -271,10 +272,10 @@ func TestPromHistogramToES(t *testing.T) { }, }, { - histogram: dto.Histogram{ + histogram: p.Histogram{ SampleCount: proto.Uint64(13), SampleSum: proto.Float64(15.23), - Bucket: []*dto.Bucket{ + Bucket: []*p.Bucket{ { UpperBound: proto.Float64(0.09), CumulativeCount: proto.Uint64(1), @@ -296,10 +297,10 @@ func TestPromHistogramToES(t *testing.T) { }, }, { - histogram: dto.Histogram{ + histogram: p.Histogram{ SampleCount: proto.Uint64(16), SampleSum: proto.Float64(16.33), - Bucket: []*dto.Bucket{ + Bucket: []*p.Bucket{ { UpperBound: proto.Float64(0.09), CumulativeCount: proto.Uint64(2), @@ -320,10 +321,10 @@ func TestPromHistogramToES(t *testing.T) { }, }, { - histogram: dto.Histogram{ + histogram: p.Histogram{ SampleCount: proto.Uint64(18), SampleSum: proto.Float64(16.33), - Bucket: []*dto.Bucket{ + Bucket: []*p.Bucket{ { UpperBound: proto.Float64(0.09), CumulativeCount: proto.Uint64(3), @@ -348,10 +349,10 @@ func TestPromHistogramToES(t *testing.T) { "wrong buckets": { samples: []sample{ { - histogram: dto.Histogram{ + histogram: p.Histogram{ SampleCount: proto.Uint64(10), SampleSum: proto.Float64(10), - Bucket: []*dto.Bucket{ + Bucket: []*p.Bucket{ { UpperBound: proto.Float64(0.09), CumulativeCount: proto.Uint64(10), @@ -368,10 +369,10 @@ func TestPromHistogramToES(t *testing.T) { }, }, { - histogram: dto.Histogram{ + histogram: p.Histogram{ SampleCount: proto.Uint64(12), SampleSum: proto.Float64(10.45), - Bucket: []*dto.Bucket{ + Bucket: []*p.Bucket{ { UpperBound: proto.Float64(0.09), CumulativeCount: proto.Uint64(12), diff --git a/x-pack/metricbeat/module/prometheus/remote_write/data.go b/x-pack/metricbeat/module/prometheus/remote_write/data.go index ef01f2bcbda..a1bdbd58c20 100644 --- a/x-pack/metricbeat/module/prometheus/remote_write/data.go +++ b/x-pack/metricbeat/module/prometheus/remote_write/data.go @@ -5,15 +5,13 @@ package remote_write import ( + "fmt" "math" "regexp" "strconv" "strings" "time" - "github.com/pkg/errors" - - dto "github.com/prometheus/client_model/go" "github.com/prometheus/common/model" "github.com/elastic/beats/v7/libbeat/common/cfgwarn" @@ -33,7 +31,7 @@ const ( type histogram struct { timestamp time.Time - buckets []*dto.Bucket + buckets []*p.Bucket labels mapstr.M metricName string } @@ -57,11 +55,11 @@ func remoteWriteEventsGeneratorFactory(base mb.BaseMetricSet) (remote_write.Remo g.counterPatterns, err = p.CompilePatternList(config.TypesPatterns.CounterPatterns) if err != nil { - return nil, errors.Wrapf(err, "unable to compile counter patterns") + return nil, fmt.Errorf("unable to compile counter patterns: %w", err) } g.histogramPatterns, err = p.CompilePatternList(config.TypesPatterns.HistogramPatterns) if err != nil { - return nil, errors.Wrapf(err, "unable to compile histogram patterns") + return nil, fmt.Errorf("unable to compile histogram patterns: %w", err) } return &g, nil @@ -124,7 +122,7 @@ func (g remoteWriteTypedGenerator) GenerateEvents(metrics model.Samples) map[str labelsHash := labels.String() + metric.Timestamp.Time().String() labelsClone := labels.Clone() - labelsClone.Delete("le") + _ = labelsClone.Delete("le") if promType == histogramType { labelsHash = labelsClone.String() + metric.Timestamp.Time().String() } @@ -168,7 +166,7 @@ func (g remoteWriteTypedGenerator) GenerateEvents(metrics model.Samples) map[str continue } v := uint64(val) - b := &dto.Bucket{ + b := &p.Bucket{ CumulativeCount: &v, UpperBound: &bucket, } @@ -192,19 +190,6 @@ func (g remoteWriteTypedGenerator) GenerateEvents(metrics model.Samples) map[str return eventList } -// rateCounterUint64 fills a counter value and optionally adds the rate if rate_counters is enabled -func (g *remoteWriteTypedGenerator) rateCounterUint64(name string, labels mapstr.M, value uint64) mapstr.M { - d := mapstr.M{ - "counter": value, - } - - if g.rateCounters { - d["rate"], _ = g.counterCache.RateUint64(name+labels.String(), value) - } - - return d -} - // rateCounterFloat64 fills a counter value and optionally adds the rate if rate_counters is enabled func (g *remoteWriteTypedGenerator) rateCounterFloat64(name string, labels mapstr.M, value float64) mapstr.M { d := mapstr.M{ @@ -235,10 +220,11 @@ func (g *remoteWriteTypedGenerator) processPromHistograms(eventList map[string]m e := eventList[labelsHash] - hist := dto.Histogram{ + hist := p.Histogram{ Bucket: histogram.buckets, } name := strings.TrimSuffix(histogram.metricName, "_bucket") + _ = name // skip noisy linter data := mapstr.M{ name: mapstr.M{ "histogram": collector.PromHistogramToES(g.counterCache, histogram.metricName, histogram.labels, &hist),