-
Notifications
You must be signed in to change notification settings - Fork 12
/
rollup.go
75 lines (61 loc) · 1.67 KB
/
rollup.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package rollup
import (
"encoding/csv"
"strings"
"github.com/cloudfoundry/metric-store-release/src/pkg/logger"
"github.com/cloudfoundry/metric-store-release/src/pkg/rpc"
)
// GorouterHttpMetricName is the metric name "http".
// NOTE(review): presumably the name under which gorouter HTTP metrics arrive;
// the consumers of this constant are not visible in this file — confirm.
const GorouterHttpMetricName = "http"
// PointsBatch groups a slice of points together with a size value for the
// batch. It is the element type returned by Rollup.Rollup.
type PointsBatch struct {
// Points holds the points that make up this batch.
Points []*rpc.Point
// Size is an integer size associated with the batch.
// NOTE(review): the unit (bytes vs. number of points) is not visible in
// this file — confirm against the code that fills it in.
Size int
}
// Rollup is implemented by types that accept recorded values and later emit
// them as batches of points.
// NOTE(review): the method descriptions below are inferred from the signatures
// only; no implementation is visible in this file — confirm against them.
type Rollup interface {
// Record registers value for the given source ID and tag set.
Record(sourceId string, tags map[string]string, value int64)
// Rollup returns point batches for the given timestamp.
// NOTE(review): the unit of timestamp is not shown here — confirm.
Rollup(timestamp int64) []*PointsBatch
}
// keyFromTags builds the rollup key for a source and its tags.
//
// The key is a single CSV record: sourceId comes first, followed by the value
// of each tag named in rollupTags, in that order. A tag that is absent from
// tags contributes an empty field. The returned string includes the record
// terminator written by csv.Writer.
func keyFromTags(rollupTags []string, sourceId string, tags map[string]string) string {
	fields := make([]string, len(rollupTags)+1)
	fields[0] = sourceId
	for i, name := range rollupTags {
		fields[i+1] = tags[name]
	}

	var encoded strings.Builder
	w := csv.NewWriter(&encoded)
	// The write error is deliberately discarded: the destination is an
	// in-memory strings.Builder and the writer uses its default delimiter.
	_ = w.Write(fields)
	w.Flush()

	return encoded.String()
}
// labelsFromKey decodes a rollup key, in the format keyFromTags writes, back
// into a label map.
//
// The key is parsed as a single CSV record whose first field is the source ID
// and whose remaining fields are the values of rollupTags, in order. Empty tag
// values are left out of the result; "source_id" and "node_index" are always
// set, the latter to nodeIndex.
//
// It returns a nil map and a non-nil error when the key cannot be decoded or
// when it does not contain exactly len(rollupTags)+1 fields
// (csv.ErrFieldCount). Both cases are also logged through log.
func labelsFromKey(key, nodeIndex string, rollupTags []string, log *logger.Logger) (map[string]string, error) {
	keyParts, err := csv.NewReader(strings.NewReader(key)).Read()
	if err != nil {
		log.Error(
			"skipping rollup metric",
			err,
			logger.String("reason", "failed to decode"),
			logger.String("key", key),
		)
		return nil, err
	}

	// If the field count is off, there is probably some garbage in one of the
	// tags, so the metric is skipped.
	if len(keyParts) != len(rollupTags)+1 {
		log.Info(
			"skipping rollup metric",
			logger.String("reason", "wrong number of parts"),
			logger.String("key", key),
			logger.Count(len(keyParts)),
		)
		// err is nil on this path, so returning it would hand callers
		// (nil, nil) and they would not skip. Return an explicit error.
		return nil, csv.ErrFieldCount
	}

	// Room for every rollup tag plus source_id and node_index.
	labels := make(map[string]string, len(rollupTags)+2)
	for index, tagName := range rollupTags {
		if value := keyParts[index+1]; value != "" {
			labels[tagName] = value
		}
	}
	// Set last so these two always win over a rollup tag of the same name.
	labels["source_id"] = keyParts[0]
	labels["node_index"] = nodeIndex

	return labels, nil
}