This repository has been archived by the owner on Jun 25, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 13
/
metric_sink.go
165 lines (154 loc) · 6.1 KB
/
metric_sink.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
/*
* Copyright 2017 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package nozzle
import (
"bytes"
"fmt"
"regexp"
"time"
"code.cloudfoundry.org/lager"
"github.com/cloudfoundry-community/stackdriver-tools/src/stackdriver-nozzle/messages"
"github.com/cloudfoundry-community/stackdriver-tools/src/stackdriver-nozzle/stackdriver"
"github.com/cloudfoundry/sonde-go/events"
)
// NewLogSink returns a Sink that can receive sonde Events, translate them and send them to a stackdriver.MetricAdapter
func NewMetricSink(logger lager.Logger, pathPrefix string, labelMaker LabelMaker, metricAdapter stackdriver.MetricAdapter, ct *CounterTracker, unitParser UnitParser, runtimeMetricRegex string) (Sink, error) {
r, err := regexp.Compile(runtimeMetricRegex)
if err != nil {
return nil, fmt.Errorf("cannot compile runtime metric regex: %v", err)
}
return &metricSink{
pathPrefix: pathPrefix,
labelMaker: labelMaker,
metricAdapter: metricAdapter,
unitParser: unitParser,
counterTracker: ct,
logger: logger,
runtimeMetricRe: r,
}, nil
}
type metricSink struct {
pathPrefix string
labelMaker LabelMaker
metricAdapter stackdriver.MetricAdapter
unitParser UnitParser
counterTracker *CounterTracker
logger lager.Logger
runtimeMetricRe *regexp.Regexp
}
// isRuntimeMetric determines whether a given metric is a runtime metric.
// "Runtime metrics" are the ones that are exported by multiple processes (with different values of the 'origin' label).
// By default 'origin' label value gets prepended to metric name, however for runtime metrics we instead add it as a metric label.
// As the result, instead of creating a separate copy of each runtime metric per origin, we have a single metric with origin available as a label.
// This allows aggregating values of these metrics across origins, and also helps us stay below the Stackdriver limit for the number of custom metrics.
func (ms *metricSink) isRuntimeMetric(envelope *events.Envelope) bool {
return envelope.GetEventType() == events.Envelope_ValueMetric && ms.runtimeMetricRe.MatchString(envelope.GetValueMetric().GetName())
}
func (ms *metricSink) getPrefix(envelope *events.Envelope) string {
buf := bytes.Buffer{}
if ms.pathPrefix != "" {
buf.WriteString(ms.pathPrefix)
buf.WriteString("/")
}
// Non-runtime metrics get origin prepended to metric name.
if !ms.isRuntimeMetric(envelope) && envelope.GetOrigin() != "" {
buf.WriteString(envelope.GetOrigin())
buf.WriteString(".")
}
return buf.String()
}
func (ms *metricSink) Receive(envelope *events.Envelope) {
labels := ms.labelMaker.MetricLabels(envelope, ms.isRuntimeMetric(envelope))
metricPrefix := ms.getPrefix(envelope)
eventType := envelope.GetEventType()
timestamp := time.Duration(envelope.GetTimestamp())
eventTime := time.Unix(
int64(timestamp/time.Second),
int64(timestamp%time.Second),
)
var metrics []*messages.Metric
switch envelope.GetEventType() {
case events.Envelope_ValueMetric:
valueMetric := envelope.GetValueMetric()
metrics = []*messages.Metric{{
Name: metricPrefix + valueMetric.GetName(),
Labels: labels,
Type: eventType,
Value: valueMetric.GetValue(),
EventTime: eventTime,
StartTime: eventTime,
Unit: ms.unitParser.Parse(valueMetric.GetUnit()),
}}
case events.Envelope_ContainerMetric:
containerMetric := envelope.GetContainerMetric()
metrics = []*messages.Metric{
{Name: metricPrefix + "diskBytesQuota", Value: float64(containerMetric.GetDiskBytesQuota())},
{Name: metricPrefix + "cpuPercentage", Value: containerMetric.GetCpuPercentage()},
{Name: metricPrefix + "diskBytes", Value: float64(containerMetric.GetDiskBytes())},
{Name: metricPrefix + "memoryBytes", Value: float64(containerMetric.GetMemoryBytes())},
{Name: metricPrefix + "memoryBytesQuota", Value: float64(containerMetric.GetMemoryBytesQuota())},
}
for _, metric := range metrics {
metric.Labels = labels
metric.Type = eventType
metric.EventTime = eventTime
metric.StartTime = eventTime
}
case events.Envelope_CounterEvent:
counterEvent := envelope.GetCounterEvent()
if ms.counterTracker == nil {
// When there is no counter tracker, report CounterEvent metrics as two gauges: 'delta' and 'total'.
metrics = []*messages.Metric{
{
Name: fmt.Sprintf("%s%v.delta", metricPrefix, counterEvent.GetName()),
Labels: labels,
Type: events.Envelope_ValueMetric,
Value: float64(counterEvent.GetDelta()),
EventTime: eventTime,
StartTime: eventTime,
},
{
Name: fmt.Sprintf("%s%v.total", metricPrefix, counterEvent.GetName()),
Labels: labels,
Type: events.Envelope_ValueMetric,
Value: float64(counterEvent.GetTotal()),
EventTime: eventTime,
StartTime: eventTime,
},
}
} else {
// Create a partial metric struct (lacking IntValue and StartTime) to allow determining metric.Hash (used as
// the counter name) based on metric name and labels.
metric := &messages.Metric{
Name: metricPrefix + counterEvent.GetName(),
Labels: labels,
Type: eventType,
EventTime: eventTime,
}
total, st := ms.counterTracker.Update(metric.Hash(), counterEvent.GetTotal(), eventTime)
// Stackdriver expects non-zero time intervals, so only add a metric if event time is older than start time.
if eventTime.After(st) {
metric.StartTime = st
metric.IntValue = total
metrics = append(metrics, metric)
}
}
default:
ms.logger.Error("metricSink.Receive", fmt.Errorf("unknown event type: %v", envelope.EventType))
return
}
ms.metricAdapter.PostMetrics(metrics)
}