diff --git a/client/graphite/client.go b/client/graphite/client.go index a81f77a3..bd3b9f2f 100644 --- a/client/graphite/client.go +++ b/client/graphite/client.go @@ -42,6 +42,7 @@ type Client struct { readTimeout time.Duration readDelay time.Duration ignoredSamples prometheus.Counter + format Format carbonCon net.Conn carbonLastReconnectTime time.Time @@ -63,10 +64,22 @@ func NewClient(cfg *config.Config, logger log.Logger) *Client { "PathsCachePurgeInterval", cfg.Graphite.Write.PathsCachePurgeInterval, "msg", "Paths cache initialized") } + + // Which format are we using to write points? + format := FormatCarbon + if cfg.Graphite.EnableTags { + if cfg.Graphite.UseOpenMetricsFormat { + format = FormatCarbonOpenMetrics + } else { + format = FormatCarbonTags + } + } + return &Client{ logger: logger, cfg: &cfg.Graphite, writeTimeout: cfg.Write.Timeout, + format: format, readTimeout: cfg.Read.Timeout, readDelay: cfg.Read.Delay, ignoredSamples: prometheus.NewCounter( diff --git a/client/graphite/config/cli.go b/client/graphite/config/cli.go index 82e02040..1eb4ff49 100644 --- a/client/graphite/config/cli.go +++ b/client/graphite/config/cli.go @@ -33,4 +33,8 @@ func AddCommandLine(app *kingpin.Application, cfg *Config) { app.Flag("graphite.write.paths-cache-purge-interval", "Duration between purges for expired items in the paths cache."). DurationVar(&cfg.Write.PathsCachePurgeInterval) + + app.Flag("graphite.enable-tags", + "Use Graphite tags."). + BoolVar(&cfg.EnableTags) } diff --git a/client/graphite/config/config.go b/client/graphite/config/config.go index 1a6dde12..4ea939e8 100644 --- a/client/graphite/config/config.go +++ b/client/graphite/config/config.go @@ -29,7 +29,9 @@ import ( // DefaultConfig is the default graphite configuration. var DefaultConfig = Config{ - DefaultPrefix: "", + DefaultPrefix: "", + EnableTags: false, + UseOpenMetricsFormat: false, Write: WriteConfig{ CarbonAddress: "", CarbonTransport: "tcp", @@ -45,9 +47,11 @@ var DefaultConfig = Config{ // Config is the graphite configuration. type Config struct { - Write WriteConfig `yaml:"write,omitempty" json:"write,omitempty"` - Read ReadConfig `yaml:"read,omitempty" json:"read,omitempty"` - DefaultPrefix string `yaml:"default_prefix,omitempty" json:"default_prefix,omitempty"` + Write WriteConfig `yaml:"write,omitempty" json:"write,omitempty"` + Read ReadConfig `yaml:"read,omitempty" json:"read,omitempty"` + DefaultPrefix string `yaml:"default_prefix,omitempty" json:"default_prefix,omitempty"` + EnableTags bool `yaml:"enable_tags,omitempty" json:"enable_tags,omitempty"` + UseOpenMetricsFormat bool `yaml:"openmetrics,omitempty" json:"openmetrics,omitempty"` // Catches all undefined fields and must be empty after parsing. XXX map[string]interface{} `yaml:",inline" json:"-"` diff --git a/client/graphite/config/config_test.go b/client/graphite/config/config_test.go index b93e37b0..e07c0d2b 100644 --- a/client/graphite/config/config_test.go +++ b/client/graphite/config/config_test.go @@ -27,7 +27,9 @@ import ( var ( expectedConf = &Config{ - DefaultPrefix: "test.prefix.", + DefaultPrefix: "test.prefix.", + EnableTags: true, + UseOpenMetricsFormat: true, Read: ReadConfig{ URL: "greatGraphiteWebURL", }, diff --git a/client/graphite/config/testdata/graphite.good.yml b/client/graphite/config/testdata/graphite.good.yml index 1fe100a8..0430efa3 100644 --- a/client/graphite/config/testdata/graphite.good.yml +++ b/client/graphite/config/testdata/graphite.good.yml @@ -1,4 +1,6 @@ default_prefix: test.prefix. +enable_tags: true +openmetrics: true read: url: greatGraphiteWebURL write: @@ -26,4 +28,4 @@ write: continue: true - match: owner: team-Z - continue: false + continue: false \ No newline at end of file diff --git a/client/graphite/http.go b/client/graphite/http.go index 5da7ddb8..57cd1d36 100644 --- a/client/graphite/http.go +++ b/client/graphite/http.go @@ -34,6 +34,7 @@ type ExpandResponse struct { type RenderResponse struct { Target string `yaml:"target,omitempty" json:"target,omitempty"` Datapoints []*Datapoint `yaml:"datapoints,omitempty" json:"datapoints,omitempty"` + // FIXME: handle tags } // Datapoint pairs a timestamp to a value. diff --git a/client/graphite/read.go b/client/graphite/read.go index 287c8a27..e174cad9 100644 --- a/client/graphite/read.go +++ b/client/graphite/read.go @@ -27,6 +27,8 @@ import ( pmetric "github.com/prometheus/prometheus/storage/metric" "golang.org/x/net/context" + "net/url" + "strings" ) func (c *Client) queryToTargets(ctx context.Context, query *prompb.Query) ([]string, error) { @@ -75,6 +77,35 @@ func (c *Client) queryToTargets(ctx context.Context, query *prompb.Query) ([]str return targets, err } +func (c *Client) queryToTargetswithTags(ctx context.Context, query *prompb.Query) ([]string, error) { + tagSet := []string{} + + for _, m := range query.Matchers { + var name string + if m.Name == model.MetricNameLabel { + name = "name" + } else { + name = m.Name + } + + switch m.Type { + case prompb.LabelMatcher_EQ: + tagSet = append(tagSet, "\""+name+"="+m.Value+"\"") + case prompb.LabelMatcher_NEQ: + tagSet = append(tagSet, "\""+name+"!="+m.Value+"\"") + case prompb.LabelMatcher_RE: + tagSet = append(tagSet, "\""+name+"=~^("+m.Value+")$\"") + case prompb.LabelMatcher_NRE: + tagSet = append(tagSet, "\""+name+"!=~^("+m.Value+")$\"") + default: + return nil, fmt.Errorf("unknown match type %v", m.Type) + } + } + + targets := []string{"seriesByTag(" + strings.Join(tagSet, ",") + ")"} + return targets, nil +} + func (c *Client) filterTargets(query *prompb.Query, targets []string) ([]string, error) { // Filter out targets that do not match the query's label matcher var results []string @@ -146,6 +177,7 @@ func (c *Client) targetToTimeseries(ctx context.Context, target string, from str renderResponse := renderResponses[0] ts := &prompb.TimeSeries{} + // FIXME: handle tags ts.Labels, err = metricLabelsFromPath(renderResponse.Target, c.cfg.DefaultPrefix) if err != nil { level.Warn(c.logger).Log( @@ -185,13 +217,26 @@ func (c *Client) handleReadQuery(ctx context.Context, query *prompb.Query) (*pro fromStr := strconv.Itoa(from) untilStr := strconv.Itoa(until) - targets, err := c.queryToTargets(ctx, query) + targets := []string{} + var err error + + if c.cfg.EnableTags { + targets, err = c.queryToTargetsWithTags(ctx, query) + } else { + // If we don't have tags we try to emulate then with normal paths. + targets, err = c.queryToTargets(ctx, query) + } if err != nil { return nil, err } c.fetchData(ctx, queryResult, targets, fromStr, untilStr) return queryResult, nil + +} + +func (c *Client) fetchDataWithTags(ctx context.Context, queryResult *prompb.QueryResult, targets []string, fromStr string, untilStr string) { + } func (c *Client) fetchData(ctx context.Context, queryResult *prompb.QueryResult, targets []string, fromStr string, untilStr string) { @@ -200,6 +245,7 @@ func (c *Client) fetchData(ctx context.Context, queryResult *prompb.QueryResult, wg := sync.WaitGroup{} + // TODO: Send multiple targets per query, Graphite supports that. // Start only a few workers to avoid killing graphite. for i := 0; i < maxFetchWorkers; i++ { wg.Add(1) diff --git a/client/graphite/read_test.go b/client/graphite/read_test.go index 96cb762a..c179d4c4 100644 --- a/client/graphite/read_test.go +++ b/client/graphite/read_test.go @@ -79,6 +79,8 @@ func TestQueryToTargets(t *testing.T) { if !reflect.DeepEqual(expectedTargets, actualTargets) { t.Errorf("Expected %s, got %s", expectedTargets, actualTargets) } + + // FIXME: Test with Tags } func TestInvalideQueryToTargets(t *testing.T) { @@ -116,4 +118,6 @@ func TestTargetToTimeseries(t *testing.T) { if !reflect.DeepEqual(expectedTs, actualTs) { t.Errorf("Expected %s, got %s", expectedTs, actualTs) } + + // FIXME: Test with Tags } diff --git a/client/graphite/rules.go b/client/graphite/rules.go index b63ed63f..7cc34f1c 100644 --- a/client/graphite/rules.go +++ b/client/graphite/rules.go @@ -29,6 +29,14 @@ import ( "github.com/patrickmn/go-cache" ) +type Format int + +const ( + FormatCarbon Format = 1 + FormatCarbonTags = 2 + FormatCarbonOpenMetrics = 3 +) + var ( pathsCache *cache.Cache pathsCacheEnabled = false @@ -66,7 +74,7 @@ func match(m model.Metric, match config.LabelSet, matchRE config.LabelSetRE) boo return true } -func pathsFromMetric(m model.Metric, prefix string, rules []*config.Rule, templateData map[string]interface{}) []string { +func pathsFromMetric(m model.Metric, format Format, prefix string, rules []*config.Rule, templateData map[string]interface{}) []string { if pathsCacheEnabled { cachedPaths, cached := pathsCache.Get(m.Fingerprint().String()) if cached { @@ -76,7 +84,7 @@ func pathsFromMetric(m model.Metric, prefix string, rules []*config.Rule, templa paths, skipped := templatedPaths(m, rules, templateData) // if it doesn't match any rule, use default path if len(paths) == 0 && !skipped { - paths = append(paths, defaultPath(m, prefix)) + paths = append(paths, defaultPath(m, format, prefix)) } if pathsCacheEnabled { pathsCache.Set(m.Fingerprint().String(), paths, cache.DefaultExpiration) @@ -108,8 +116,9 @@ func templatedPaths(m model.Metric, rules []*config.Rule, templateData map[strin return paths, false } -func defaultPath(m model.Metric, prefix string) string { +func defaultPath(m model.Metric, format Format, prefix string) string { var buffer bytes.Buffer + var lbuffer bytes.Buffer buffer.WriteString(prefix) buffer.WriteString(utils.Escape(string(m[model.MetricNameLabel]))) @@ -121,18 +130,42 @@ func defaultPath(m model.Metric, prefix string) string { } sort.Sort(labels) - // For each label, in order, add ".