Skip to content

Commit

Permalink
[WIP] Support for Tags
Browse files Browse the repository at this point in the history
Write:
- Support tags, in carbon and openmetrics format
- TODO: Support for upcomming Carbon native Prometheus support: graphite-project/carbon#735

Read:
- Support tags (TODO: add tests)
- TODO: Support for upcomming Graphite native Prometheus support: graphite-project/graphite-web#2195

Some of this code comes from https://github.com/prometheus/prometheus/pull/3533/files
  • Loading branch information
Corentin Chary committed Jan 15, 2018
1 parent f7bc7d0 commit 77efea1
Show file tree
Hide file tree
Showing 10 changed files with 121 additions and 37 deletions.
13 changes: 13 additions & 0 deletions client/graphite/client.go
Expand Up @@ -42,6 +42,7 @@ type Client struct {
readTimeout time.Duration
readDelay time.Duration
ignoredSamples prometheus.Counter
format Format

carbonCon net.Conn
carbonLastReconnectTime time.Time
Expand All @@ -63,10 +64,22 @@ func NewClient(cfg *config.Config, logger log.Logger) *Client {
"PathsCachePurgeInterval", cfg.Graphite.Write.PathsCachePurgeInterval,
"msg", "Paths cache initialized")
}

// Which format are we using to write points?
format := FormatCarbon
if cfg.Graphite.EnableTags {
if cfg.Graphite.UseOpenMetricsFormat {
format = FormatCarbonOpenMetrics
} else {
format = FormatCarbonTags
}
}

return &Client{
logger: logger,
cfg: &cfg.Graphite,
writeTimeout: cfg.Write.Timeout,
format: format,
readTimeout: cfg.Read.Timeout,
readDelay: cfg.Read.Delay,
ignoredSamples: prometheus.NewCounter(
Expand Down
16 changes: 8 additions & 8 deletions client/graphite/config/config.go
Expand Up @@ -29,9 +29,9 @@ import (

// DefaultConfig is the default graphite configuration.
var DefaultConfig = Config{
DefaultPrefix: "",
EnableTags: false,
UseOpenMetricsFormat: true,
DefaultPrefix: "",
EnableTags: false,
UseOpenMetricsFormat: false,
Write: WriteConfig{
CarbonAddress: "",
CarbonTransport: "tcp",
Expand All @@ -47,11 +47,11 @@ var DefaultConfig = Config{

// Config is the graphite configuration.
type Config struct {
Write WriteConfig `yaml:"write,omitempty" json:"write,omitempty"`
Read ReadConfig `yaml:"read,omitempty" json:"read,omitempty"`
DefaultPrefix string `yaml:"default_prefix,omitempty" json:"default_prefix,omitempty"`
EnableTags bool `yaml:"enable_tags,omitempty" json:"enable_tags,omitempty"`
UseOpenMetricsFormat bool `yaml:"openmetrics,omitempty" json:"openmetrics,omitempty"`
Write WriteConfig `yaml:"write,omitempty" json:"write,omitempty"`
Read ReadConfig `yaml:"read,omitempty" json:"read,omitempty"`
DefaultPrefix string `yaml:"default_prefix,omitempty" json:"default_prefix,omitempty"`
EnableTags bool `yaml:"enable_tags,omitempty" json:"enable_tags,omitempty"`
UseOpenMetricsFormat bool `yaml:"openmetrics,omitempty" json:"openmetrics,omitempty"`

// Catches all undefined fields and must be empty after parsing.
XXX map[string]interface{} `yaml:",inline" json:"-"`
Expand Down
4 changes: 3 additions & 1 deletion client/graphite/config/config_test.go
Expand Up @@ -27,7 +27,9 @@ import (

var (
expectedConf = &Config{
DefaultPrefix: "test.prefix.",
DefaultPrefix: "test.prefix.",
EnableTags: true,
UseOpenMetricsFormat: true,
Read: ReadConfig{
URL: "greatGraphiteWebURL",
},
Expand Down
4 changes: 3 additions & 1 deletion client/graphite/config/testdata/graphite.good.yml
@@ -1,4 +1,6 @@
default_prefix: test.prefix.
enable_tags: true
openmetrics: true
read:
url: greatGraphiteWebURL
write:
Expand Down Expand Up @@ -26,4 +28,4 @@ write:
continue: true
- match:
owner: team-Z
continue: false
continue: false
1 change: 1 addition & 0 deletions client/graphite/http.go
Expand Up @@ -34,6 +34,7 @@ type ExpandResponse struct {
type RenderResponse struct {
Target string `yaml:"target,omitempty" json:"target,omitempty"`
Datapoints []*Datapoint `yaml:"datapoints,omitempty" json:"datapoints,omitempty"`
// FIXME: handle tags
}

// Datapoint pairs a timestamp to a value.
Expand Down
48 changes: 47 additions & 1 deletion client/graphite/read.go
Expand Up @@ -27,6 +27,8 @@ import (
pmetric "github.com/prometheus/prometheus/storage/metric"

"golang.org/x/net/context"
"net/url"
"strings"
)

func (c *Client) queryToTargets(ctx context.Context, query *prompb.Query) ([]string, error) {
Expand Down Expand Up @@ -75,6 +77,35 @@ func (c *Client) queryToTargets(ctx context.Context, query *prompb.Query) ([]str
return targets, err
}

func (c *Client) queryToTargetswithTags(ctx context.Context, query *prompb.Query) ([]string, error) {
tagSet := []string{}

for _, m := range query.Matchers {
var name string
if m.Name == model.MetricNameLabel {
name = "name"
} else {
name = m.Name
}

switch m.Type {
case prompb.LabelMatcher_EQ:
tagSet = append(tagSet, "\""+name+"="+m.Value+"\"")
case prompb.LabelMatcher_NEQ:
tagSet = append(tagSet, "\""+name+"!="+m.Value+"\"")
case prompb.LabelMatcher_RE:
tagSet = append(tagSet, "\""+name+"=~^("+m.Value+")$\"")
case prompb.LabelMatcher_NRE:
tagSet = append(tagSet, "\""+name+"!=~^("+m.Value+")$\"")
default:
return nil, fmt.Errorf("unknown match type %v", m.Type)
}
}

targets := []string{"seriesByTag(" + strings.Join(tagSet, ",") + ")"}
return targets, nil
}

func (c *Client) filterTargets(query *prompb.Query, targets []string) ([]string, error) {
// Filter out targets that do not match the query's label matcher
var results []string
Expand Down Expand Up @@ -146,6 +177,7 @@ func (c *Client) targetToTimeseries(ctx context.Context, target string, from str
renderResponse := renderResponses[0]

ts := &prompb.TimeSeries{}
// FIXME: handle tags
ts.Labels, err = metricLabelsFromPath(renderResponse.Target, c.cfg.DefaultPrefix)
if err != nil {
level.Warn(c.logger).Log(
Expand Down Expand Up @@ -185,13 +217,26 @@ func (c *Client) handleReadQuery(ctx context.Context, query *prompb.Query) (*pro
fromStr := strconv.Itoa(from)
untilStr := strconv.Itoa(until)

targets, err := c.queryToTargets(ctx, query)
targets := []string{}
var err error

if c.cfg.EnableTags {
targets, err = c.queryToTargetsWithTags(ctx, query)
} else {
// If we don't have tags we try to emulate then with normal paths.
targets, err = c.queryToTargets(ctx, query)
}
if err != nil {
return nil, err
}

c.fetchData(ctx, queryResult, targets, fromStr, untilStr)
return queryResult, nil

}

func (c *Client) fetchDataWithTags(ctx context.Context, queryResult *prompb.QueryResult, targets []string, fromStr string, untilStr string) {

}

func (c *Client) fetchData(ctx context.Context, queryResult *prompb.QueryResult, targets []string, fromStr string, untilStr string) {
Expand All @@ -200,6 +245,7 @@ func (c *Client) fetchData(ctx context.Context, queryResult *prompb.QueryResult,

wg := sync.WaitGroup{}

// TODO: Send multiple targets per query, Graphite supports that.
// Start only a few workers to avoid killing graphite.
for i := 0; i < maxFetchWorkers; i++ {
wg.Add(1)
Expand Down
4 changes: 4 additions & 0 deletions client/graphite/read_test.go
Expand Up @@ -79,6 +79,8 @@ func TestQueryToTargets(t *testing.T) {
if !reflect.DeepEqual(expectedTargets, actualTargets) {
t.Errorf("Expected %s, got %s", expectedTargets, actualTargets)
}

// FIXME: Test with Tags
}

func TestInvalideQueryToTargets(t *testing.T) {
Expand Down Expand Up @@ -116,4 +118,6 @@ func TestTargetToTimeseries(t *testing.T) {
if !reflect.DeepEqual(expectedTs, actualTs) {
t.Errorf("Expected %s, got %s", expectedTs, actualTs)
}

// FIXME: Test with Tags
}
26 changes: 10 additions & 16 deletions client/graphite/rules.go
Expand Up @@ -32,9 +32,9 @@ import (
type Format int

const (
FormatCarbon Format = 1
FormatCarbonTags = 2
FormatCarbonOpenMetrics = 3
FormatCarbon Format = 1
FormatCarbonTags = 2
FormatCarbonOpenMetrics = 3
)

var (
Expand Down Expand Up @@ -120,8 +120,6 @@ func defaultPath(m model.Metric, format Format, prefix string) string {
var buffer bytes.Buffer
var lbuffer bytes.Buffer

tags := format == FormatCarbonOpenMetrics || format == FormatCarbonOpenMetrics

buffer.WriteString(prefix)
buffer.WriteString(utils.Escape(string(m[model.MetricNameLabel])))

Expand All @@ -130,12 +128,9 @@ func defaultPath(m model.Metric, format Format, prefix string) string {
for l := range m {
labels = append(labels, l)
}
if tags {
// Only sort if we don't support tags
sort.Sort(labels)
}
sort.Sort(labels)

first := false
first := true
for _, l := range labels {
if l == model.MetricNameLabel || len(l) == 0 {
continue
Expand All @@ -144,31 +139,30 @@ func defaultPath(m model.Metric, format Format, prefix string) string {
k := string(l)
v := utils.Escape(string(m[l]))


if format == FormatCarbonOpenMetrics {
// https://github.com/RichiH/OpenMetrics/blob/master/metric_exposition_format.md
if !first {
lbuffer.WriteString(",")
}
lbuffer.WriteString(fmt.Sprintf("%s=%s", k, v))
} else if format == FormatCarbon {
lbuffer.WriteString(fmt.Sprintf("%s=\"%s\"", k, v))
} else if format == FormatCarbonTags {
// See http://graphite.readthedocs.io/en/latest/tags.html
lbuffer.WriteString(fmt.Sprintf(";%s=%s", k, v))
} else {
// For each label, in order, add ".<label>.<value>".
// Since we use '.' instead of '=' to separate label and values
// it means that we can't have an '.' in the metric name. Fortunately
// this is prohibited in prometheus metrics.
lbuffer.WriteString(fmt.Sprintf(".%s.%s", k, v)
lbuffer.WriteString(fmt.Sprintf(".%s.%s", k, v))
}
first = false
}

if (lbuffer.Len() > 0) {
if lbuffer.Len() > 0 {
if format == FormatCarbonOpenMetrics {
buffer.WriteRune('{')
buffer.Write(lbuffer.Bytes())
buffer.WriteRune('{')
buffer.WriteRune('}')
} else {
buffer.Write(lbuffer.Bytes())
}
Expand Down
40 changes: 31 additions & 9 deletions client/graphite/rules_test.go
Expand Up @@ -64,14 +64,36 @@ func loadTestConfig(s string) *config.Config {
}

func TestDefaultPathsFromMetric(t *testing.T) {
expected := make([]string, 0)
expected = append(expected, "prefix."+
expected := "prefix."+
"test:metric"+
".many_chars.abc!ABC:012-3!45%C3%B667~89%2E%2F\\(\\)\\{\\}\\,%3D%2E\\\"\\\\"+
".owner.team-X"+
".testlabel.test:value")
actual := pathsFromMetric(metric, "prefix.", nil, nil)
if len(actual) != 1 || expected[0] != actual[0] {
".testlabel.test:value"
actual := pathsFromMetric(metric, FormatCarbon, "prefix.", nil, nil)
if len(actual) != 1 || expected != actual[0] {
t.Errorf("Expected %s, got %s", expected, actual)
}

expected = "prefix."+
"test:metric"+
";many_chars=abc!ABC:012-3!45%C3%B667~89%2E%2F\\(\\)\\{\\}\\,%3D%2E\\\"\\\\"+
";owner=team-X"+
";testlabel=test:value"


actual = pathsFromMetric(metric, FormatCarbonTags, "prefix.", nil, nil)
if len(actual) != 1 || expected != actual[0] {
t.Errorf("Expected %s, got %s", expected, actual)
}

expected = "prefix."+
"test:metric{"+
"many_chars=\"abc!ABC:012-3!45%C3%B667~89%2E%2F\\(\\)\\{\\}\\,%3D%2E\\\"\\\\\""+
",owner=\"team-X\""+
",testlabel=\"test:value\""+
"}"
actual = pathsFromMetric(metric, FormatCarbonOpenMetrics, "prefix.", nil, nil)
if len(actual) != 1 || expected != actual[0] {
t.Errorf("Expected %s, got %s", expected, actual)
}
}
Expand All @@ -89,7 +111,7 @@ func TestUnmatchedMetricPathsFromMetric(t *testing.T) {
".owner.team-Y"+
".testlabel.test:value"+
".testlabel2.test:value2")
actual := pathsFromMetric(unmatchedMetric, "prefix.", testConfig.Write.Rules, testConfig.Write.TemplateData)
actual := pathsFromMetric(unmatchedMetric, FormatCarbon, "prefix.", testConfig.Write.Rules, testConfig.Write.TemplateData)
if len(actual) != 1 || expected[0] != actual[0] {
t.Errorf("Expected %s, got %s", expected, actual)
}
Expand All @@ -98,7 +120,7 @@ func TestUnmatchedMetricPathsFromMetric(t *testing.T) {
func TestTemplatedPathsFromMetric(t *testing.T) {
expected := make([]string, 0)
expected = append(expected, "tmpl_1.data%2Efoo.team-X")
actual := pathsFromMetric(metric, "", testConfig.Write.Rules, testConfig.Write.TemplateData)
actual := pathsFromMetric(metric, FormatCarbon, "", testConfig.Write.Rules, testConfig.Write.TemplateData)
if len(actual) != 1 || expected[0] != actual[0] {
t.Errorf("Expected %s, got %s", expected, actual)
}
Expand All @@ -114,7 +136,7 @@ func TestMultiTemplatedPathsFromMetric(t *testing.T) {
expected := make([]string, 0)
expected = append(expected, "tmpl_1.data%2Efoo.team-X")
expected = append(expected, "tmpl_2.team-X.data.foo")
actual := pathsFromMetric(multiMatchMetric, "", testConfig.Write.Rules, testConfig.Write.TemplateData)
actual := pathsFromMetric(multiMatchMetric, FormatCarbon, "", testConfig.Write.Rules, testConfig.Write.TemplateData)
if len(actual) != 2 || expected[0] != actual[0] || expected[1] != actual[1] {
t.Errorf("Expected %s, got %s", expected, actual)
}
Expand All @@ -129,7 +151,7 @@ func TestSkipedTemplatedPathsFromMetric(t *testing.T) {
}
t.Log(testConfig.Write.Rules[2])
expected := make([]string, 0)
actual := pathsFromMetric(skipedMetric, "", testConfig.Write.Rules, testConfig.Write.TemplateData)
actual := pathsFromMetric(skipedMetric, FormatCarbon, "", testConfig.Write.Rules, testConfig.Write.TemplateData)
if len(actual) != 0 {
t.Errorf("Expected %s, got %s", expected, actual)
}
Expand Down
2 changes: 1 addition & 1 deletion client/graphite/write.go
Expand Up @@ -83,7 +83,7 @@ func (c *Client) Write(samples model.Samples) error {

var buf bytes.Buffer
for _, s := range samples {
paths := pathsFromMetric(s.Metric, c.cfg.DefaultPrefix, c.cfg.Write.Rules, c.cfg.Write.TemplateData)
paths := pathsFromMetric(s.Metric, c.format, c.cfg.DefaultPrefix, c.cfg.Write.Rules, c.cfg.Write.TemplateData)
for _, k := range paths {
if str := c.prepareDataPoint(k, s); str != "" {
fmt.Fprint(&buf, str)
Expand Down

0 comments on commit 77efea1

Please sign in to comment.