Skip to content

Commit

Permalink
[WIP] Support for Tags
Browse files Browse the repository at this point in the history
Write:
- Support tags, in carbon and openmetrics format

Read:
- Support tags

TODO in a later patch:
- Support Graphite native Prometheus support: graphite-project/graphite-web#2195
- Support for upcomming Carbon native Prometheus support: graphite-project/carbon#735
- For both of these, just make sure the prefix is handled correctly.

Some of this code comes from https://github.com/prometheus/prometheus/pull/3533/files
  • Loading branch information
Corentin Chary committed Jan 17, 2018
1 parent 3ab6cd5 commit a3e502f
Show file tree
Hide file tree
Showing 11 changed files with 159 additions and 28 deletions.
13 changes: 13 additions & 0 deletions client/graphite/client.go
Expand Up @@ -42,6 +42,7 @@ type Client struct {
readTimeout time.Duration
readDelay time.Duration
ignoredSamples prometheus.Counter
format Format

carbonCon net.Conn
carbonLastReconnectTime time.Time
Expand All @@ -63,10 +64,22 @@ func NewClient(cfg *config.Config, logger log.Logger) *Client {
"PathsCachePurgeInterval", cfg.Graphite.Write.PathsCachePurgeInterval,
"msg", "Paths cache initialized")
}

// Which format are we using to write points?
format := FormatCarbon
if cfg.Graphite.EnableTags {
if cfg.Graphite.UseOpenMetricsFormat {
format = FormatCarbonOpenMetrics
} else {
format = FormatCarbonTags
}
}

return &Client{
logger: logger,
cfg: &cfg.Graphite,
writeTimeout: cfg.Write.Timeout,
format: format,
readTimeout: cfg.Read.Timeout,
readDelay: cfg.Read.Delay,
ignoredSamples: prometheus.NewCounter(
Expand Down
4 changes: 4 additions & 0 deletions client/graphite/config/cli.go
Expand Up @@ -33,4 +33,8 @@ func AddCommandLine(app *kingpin.Application, cfg *Config) {
app.Flag("graphite.write.paths-cache-purge-interval",
"Duration between purges for expired items in the paths cache.").
DurationVar(&cfg.Write.PathsCachePurgeInterval)

app.Flag("graphite.enable-tags",
"Use Graphite tags.").
BoolVar(&cfg.EnableTags)
}
12 changes: 8 additions & 4 deletions client/graphite/config/config.go
Expand Up @@ -29,7 +29,9 @@ import (

// DefaultConfig is the default graphite configuration.
var DefaultConfig = Config{
DefaultPrefix: "",
DefaultPrefix: "",
EnableTags: false,
UseOpenMetricsFormat: false,
Write: WriteConfig{
CarbonAddress: "",
CarbonTransport: "tcp",
Expand All @@ -45,9 +47,11 @@ var DefaultConfig = Config{

// Config is the graphite configuration.
type Config struct {
Write WriteConfig `yaml:"write,omitempty" json:"write,omitempty"`
Read ReadConfig `yaml:"read,omitempty" json:"read,omitempty"`
DefaultPrefix string `yaml:"default_prefix,omitempty" json:"default_prefix,omitempty"`
Write WriteConfig `yaml:"write,omitempty" json:"write,omitempty"`
Read ReadConfig `yaml:"read,omitempty" json:"read,omitempty"`
DefaultPrefix string `yaml:"default_prefix,omitempty" json:"default_prefix,omitempty"`
EnableTags bool `yaml:"enable_tags,omitempty" json:"enable_tags,omitempty"`
UseOpenMetricsFormat bool `yaml:"openmetrics,omitempty" json:"openmetrics,omitempty"`

// Catches all undefined fields and must be empty after parsing.
XXX map[string]interface{} `yaml:",inline" json:"-"`
Expand Down
4 changes: 3 additions & 1 deletion client/graphite/config/config_test.go
Expand Up @@ -27,7 +27,9 @@ import (

var (
expectedConf = &Config{
DefaultPrefix: "test.prefix.",
DefaultPrefix: "test.prefix.",
EnableTags: true,
UseOpenMetricsFormat: true,
Read: ReadConfig{
URL: "greatGraphiteWebURL",
},
Expand Down
4 changes: 3 additions & 1 deletion client/graphite/config/testdata/graphite.good.yml
@@ -1,4 +1,6 @@
default_prefix: test.prefix.
enable_tags: true
openmetrics: true
read:
url: greatGraphiteWebURL
write:
Expand Down Expand Up @@ -26,4 +28,4 @@ write:
continue: true
- match:
owner: team-Z
continue: false
continue: false
1 change: 1 addition & 0 deletions client/graphite/http.go
Expand Up @@ -34,6 +34,7 @@ type ExpandResponse struct {
type RenderResponse struct {
Target string `yaml:"target,omitempty" json:"target,omitempty"`
Datapoints []*Datapoint `yaml:"datapoints,omitempty" json:"datapoints,omitempty"`
// FIXME: handle tags
}

// Datapoint pairs a timestamp to a value.
Expand Down
48 changes: 47 additions & 1 deletion client/graphite/read.go
Expand Up @@ -27,6 +27,8 @@ import (
pmetric "github.com/prometheus/prometheus/storage/metric"

"golang.org/x/net/context"
"net/url"
"strings"
)

func (c *Client) queryToTargets(ctx context.Context, query *prompb.Query) ([]string, error) {
Expand Down Expand Up @@ -75,6 +77,35 @@ func (c *Client) queryToTargets(ctx context.Context, query *prompb.Query) ([]str
return targets, err
}

func (c *Client) queryToTargetswithTags(ctx context.Context, query *prompb.Query) ([]string, error) {
tagSet := []string{}

for _, m := range query.Matchers {
var name string
if m.Name == model.MetricNameLabel {
name = "name"
} else {
name = m.Name
}

switch m.Type {
case prompb.LabelMatcher_EQ:
tagSet = append(tagSet, "\""+name+"="+m.Value+"\"")
case prompb.LabelMatcher_NEQ:
tagSet = append(tagSet, "\""+name+"!="+m.Value+"\"")
case prompb.LabelMatcher_RE:
tagSet = append(tagSet, "\""+name+"=~^("+m.Value+")$\"")
case prompb.LabelMatcher_NRE:
tagSet = append(tagSet, "\""+name+"!=~^("+m.Value+")$\"")
default:
return nil, fmt.Errorf("unknown match type %v", m.Type)
}
}

targets := []string{"seriesByTag(" + strings.Join(tagSet, ",") + ")"}
return targets, nil
}

func (c *Client) filterTargets(query *prompb.Query, targets []string) ([]string, error) {
// Filter out targets that do not match the query's label matcher
var results []string
Expand Down Expand Up @@ -146,6 +177,7 @@ func (c *Client) targetToTimeseries(ctx context.Context, target string, from str
renderResponse := renderResponses[0]

ts := &prompb.TimeSeries{}
// FIXME: handle tags
ts.Labels, err = metricLabelsFromPath(renderResponse.Target, c.cfg.DefaultPrefix)
if err != nil {
level.Warn(c.logger).Log(
Expand Down Expand Up @@ -185,13 +217,26 @@ func (c *Client) handleReadQuery(ctx context.Context, query *prompb.Query) (*pro
fromStr := strconv.Itoa(from)
untilStr := strconv.Itoa(until)

targets, err := c.queryToTargets(ctx, query)
targets := []string{}
var err error

if c.cfg.EnableTags {
targets, err = c.queryToTargetsWithTags(ctx, query)
} else {
// If we don't have tags we try to emulate then with normal paths.
targets, err = c.queryToTargets(ctx, query)
}
if err != nil {
return nil, err
}

c.fetchData(ctx, queryResult, targets, fromStr, untilStr)
return queryResult, nil

}

func (c *Client) fetchDataWithTags(ctx context.Context, queryResult *prompb.QueryResult, targets []string, fromStr string, untilStr string) {

}

func (c *Client) fetchData(ctx context.Context, queryResult *prompb.QueryResult, targets []string, fromStr string, untilStr string) {
Expand All @@ -200,6 +245,7 @@ func (c *Client) fetchData(ctx context.Context, queryResult *prompb.QueryResult,

wg := sync.WaitGroup{}

// TODO: Send multiple targets per query, Graphite supports that.
// Start only a few workers to avoid killing graphite.
for i := 0; i < maxFetchWorkers; i++ {
wg.Add(1)
Expand Down
4 changes: 4 additions & 0 deletions client/graphite/read_test.go
Expand Up @@ -79,6 +79,8 @@ func TestQueryToTargets(t *testing.T) {
if !reflect.DeepEqual(expectedTargets, actualTargets) {
t.Errorf("Expected %s, got %s", expectedTargets, actualTargets)
}

// FIXME: Test with Tags
}

func TestInvalideQueryToTargets(t *testing.T) {
Expand Down Expand Up @@ -116,4 +118,6 @@ func TestTargetToTimeseries(t *testing.T) {
if !reflect.DeepEqual(expectedTs, actualTs) {
t.Errorf("Expected %s, got %s", expectedTs, actualTs)
}

// FIXME: Test with Tags
}
55 changes: 44 additions & 11 deletions client/graphite/rules.go
Expand Up @@ -29,6 +29,14 @@ import (
"github.com/patrickmn/go-cache"
)

type Format int

const (
FormatCarbon Format = 1
FormatCarbonTags = 2
FormatCarbonOpenMetrics = 3
)

var (
pathsCache *cache.Cache
pathsCacheEnabled = false
Expand Down Expand Up @@ -66,7 +74,7 @@ func match(m model.Metric, match config.LabelSet, matchRE config.LabelSetRE) boo
return true
}

func pathsFromMetric(m model.Metric, prefix string, rules []*config.Rule, templateData map[string]interface{}) []string {
func pathsFromMetric(m model.Metric, format Format, prefix string, rules []*config.Rule, templateData map[string]interface{}) []string {
if pathsCacheEnabled {
cachedPaths, cached := pathsCache.Get(m.Fingerprint().String())
if cached {
Expand All @@ -76,7 +84,7 @@ func pathsFromMetric(m model.Metric, prefix string, rules []*config.Rule, templa
paths, skipped := templatedPaths(m, rules, templateData)
// if it doesn't match any rule, use default path
if len(paths) == 0 && !skipped {
paths = append(paths, defaultPath(m, prefix))
paths = append(paths, defaultPath(m, format, prefix))
}
if pathsCacheEnabled {
pathsCache.Set(m.Fingerprint().String(), paths, cache.DefaultExpiration)
Expand Down Expand Up @@ -108,8 +116,9 @@ func templatedPaths(m model.Metric, rules []*config.Rule, templateData map[strin
return paths, false
}

func defaultPath(m model.Metric, prefix string) string {
func defaultPath(m model.Metric, format Format, prefix string) string {
var buffer bytes.Buffer
var lbuffer bytes.Buffer

buffer.WriteString(prefix)
buffer.WriteString(utils.Escape(string(m[model.MetricNameLabel])))
Expand All @@ -121,18 +130,42 @@ func defaultPath(m model.Metric, prefix string) string {
}
sort.Sort(labels)

// For each label, in order, add ".<label>.<value>".
first := true
for _, l := range labels {
v := m[l]

if l == model.MetricNameLabel || len(l) == 0 {
continue
}
// Since we use '.' instead of '=' to separate label and values
// it means that we can't have an '.' in the metric name. Fortunately
// this is prohibited in prometheus metrics.
buffer.WriteString(fmt.Sprintf(
".%s.%s", string(l), utils.Escape(string(v))))

k := string(l)
v := utils.Escape(string(m[l]))

if format == FormatCarbonOpenMetrics {
// https://github.com/RichiH/OpenMetrics/blob/master/metric_exposition_format.md
if !first {
lbuffer.WriteString(",")
}
lbuffer.WriteString(fmt.Sprintf("%s=\"%s\"", k, v))
} else if format == FormatCarbonTags {
// See http://graphite.readthedocs.io/en/latest/tags.html
lbuffer.WriteString(fmt.Sprintf(";%s=%s", k, v))
} else {
// For each label, in order, add ".<label>.<value>".
// Since we use '.' instead of '=' to separate label and values
// it means that we can't have an '.' in the metric name. Fortunately
// this is prohibited in prometheus metrics.
lbuffer.WriteString(fmt.Sprintf(".%s.%s", k, v))
}
first = false
}

if lbuffer.Len() > 0 {
if format == FormatCarbonOpenMetrics {
buffer.WriteRune('{')
buffer.Write(lbuffer.Bytes())
buffer.WriteRune('}')
} else {
buffer.Write(lbuffer.Bytes())
}
}
return buffer.String()
}
Expand Down
40 changes: 31 additions & 9 deletions client/graphite/rules_test.go
Expand Up @@ -64,14 +64,36 @@ func loadTestConfig(s string) *config.Config {
}

func TestDefaultPathsFromMetric(t *testing.T) {
expected := make([]string, 0)
expected = append(expected, "prefix."+
expected := "prefix."+
"test:metric"+
".many_chars.abc!ABC:012-3!45%C3%B667~89%2E%2F\\(\\)\\{\\}\\,%3D%2E\\\"\\\\"+
".owner.team-X"+
".testlabel.test:value")
actual := pathsFromMetric(metric, "prefix.", nil, nil)
if len(actual) != 1 || expected[0] != actual[0] {
".testlabel.test:value"
actual := pathsFromMetric(metric, FormatCarbon, "prefix.", nil, nil)
if len(actual) != 1 || expected != actual[0] {
t.Errorf("Expected %s, got %s", expected, actual)
}

expected = "prefix."+
"test:metric"+
";many_chars=abc!ABC:012-3!45%C3%B667~89%2E%2F\\(\\)\\{\\}\\,%3D%2E\\\"\\\\"+
";owner=team-X"+
";testlabel=test:value"


actual = pathsFromMetric(metric, FormatCarbonTags, "prefix.", nil, nil)
if len(actual) != 1 || expected != actual[0] {
t.Errorf("Expected %s, got %s", expected, actual)
}

expected = "prefix."+
"test:metric{"+
"many_chars=\"abc!ABC:012-3!45%C3%B667~89%2E%2F\\(\\)\\{\\}\\,%3D%2E\\\"\\\\\""+
",owner=\"team-X\""+
",testlabel=\"test:value\""+
"}"
actual = pathsFromMetric(metric, FormatCarbonOpenMetrics, "prefix.", nil, nil)
if len(actual) != 1 || expected != actual[0] {
t.Errorf("Expected %s, got %s", expected, actual)
}
}
Expand All @@ -89,7 +111,7 @@ func TestUnmatchedMetricPathsFromMetric(t *testing.T) {
".owner.team-Y"+
".testlabel.test:value"+
".testlabel2.test:value2")
actual := pathsFromMetric(unmatchedMetric, "prefix.", testConfig.Write.Rules, testConfig.Write.TemplateData)
actual := pathsFromMetric(unmatchedMetric, FormatCarbon, "prefix.", testConfig.Write.Rules, testConfig.Write.TemplateData)
if len(actual) != 1 || expected[0] != actual[0] {
t.Errorf("Expected %s, got %s", expected, actual)
}
Expand All @@ -98,7 +120,7 @@ func TestUnmatchedMetricPathsFromMetric(t *testing.T) {
func TestTemplatedPathsFromMetric(t *testing.T) {
expected := make([]string, 0)
expected = append(expected, "tmpl_1.data%2Efoo.team-X")
actual := pathsFromMetric(metric, "", testConfig.Write.Rules, testConfig.Write.TemplateData)
actual := pathsFromMetric(metric, FormatCarbon, "", testConfig.Write.Rules, testConfig.Write.TemplateData)
if len(actual) != 1 || expected[0] != actual[0] {
t.Errorf("Expected %s, got %s", expected, actual)
}
Expand All @@ -114,7 +136,7 @@ func TestMultiTemplatedPathsFromMetric(t *testing.T) {
expected := make([]string, 0)
expected = append(expected, "tmpl_1.data%2Efoo.team-X")
expected = append(expected, "tmpl_2.team-X.data.foo")
actual := pathsFromMetric(multiMatchMetric, "", testConfig.Write.Rules, testConfig.Write.TemplateData)
actual := pathsFromMetric(multiMatchMetric, FormatCarbon, "", testConfig.Write.Rules, testConfig.Write.TemplateData)
if len(actual) != 2 || expected[0] != actual[0] || expected[1] != actual[1] {
t.Errorf("Expected %s, got %s", expected, actual)
}
Expand All @@ -129,7 +151,7 @@ func TestSkipedTemplatedPathsFromMetric(t *testing.T) {
}
t.Log(testConfig.Write.Rules[2])
expected := make([]string, 0)
actual := pathsFromMetric(skipedMetric, "", testConfig.Write.Rules, testConfig.Write.TemplateData)
actual := pathsFromMetric(skipedMetric, FormatCarbon, "", testConfig.Write.Rules, testConfig.Write.TemplateData)
if len(actual) != 0 {
t.Errorf("Expected %s, got %s", expected, actual)
}
Expand Down
2 changes: 1 addition & 1 deletion client/graphite/write.go
Expand Up @@ -83,7 +83,7 @@ func (c *Client) Write(samples model.Samples) error {

var buf bytes.Buffer
for _, s := range samples {
paths := pathsFromMetric(s.Metric, c.cfg.DefaultPrefix, c.cfg.Write.Rules, c.cfg.Write.TemplateData)
paths := pathsFromMetric(s.Metric, c.format, c.cfg.DefaultPrefix, c.cfg.Write.Rules, c.cfg.Write.TemplateData)
for _, k := range paths {
if str := c.prepareDataPoint(k, s); str != "" {
fmt.Fprint(&buf, str)
Expand Down

0 comments on commit a3e502f

Please sign in to comment.