Skip to content

Commit

Permalink
[WIP] Support for Tags
Browse files Browse the repository at this point in the history
Write:
- Support tags, in carbon and openmetrics format

Read:
- Support tags

TODO in a later patch:
- Support Graphite native Prometheus support: graphite-project/graphite-web#2195
- Support for upcoming Carbon native Prometheus support: graphite-project/carbon#735
- For both of these, just make sure the prefix is handled correctly.

Some of this code comes from https://github.com/prometheus/prometheus/pull/3533/files
  • Loading branch information
Corentin Chary committed Jan 17, 2018
1 parent 3ab6cd5 commit 0e18318
Show file tree
Hide file tree
Showing 12 changed files with 226 additions and 53 deletions.
13 changes: 13 additions & 0 deletions client/graphite/client.go
Expand Up @@ -42,6 +42,7 @@ type Client struct {
readTimeout time.Duration
readDelay time.Duration
ignoredSamples prometheus.Counter
format Format

carbonCon net.Conn
carbonLastReconnectTime time.Time
Expand All @@ -63,10 +64,22 @@ func NewClient(cfg *config.Config, logger log.Logger) *Client {
"PathsCachePurgeInterval", cfg.Graphite.Write.PathsCachePurgeInterval,
"msg", "Paths cache initialized")
}

// Which format are we using to write points?
format := FormatCarbon
if cfg.Graphite.EnableTags {
if cfg.Graphite.UseOpenMetricsFormat {
format = FormatCarbonOpenMetrics
} else {
format = FormatCarbonTags
}
}

return &Client{
logger: logger,
cfg: &cfg.Graphite,
writeTimeout: cfg.Write.Timeout,
format: format,
readTimeout: cfg.Read.Timeout,
readDelay: cfg.Read.Delay,
ignoredSamples: prometheus.NewCounter(
Expand Down
4 changes: 4 additions & 0 deletions client/graphite/config/cli.go
Expand Up @@ -33,4 +33,8 @@ func AddCommandLine(app *kingpin.Application, cfg *Config) {
app.Flag("graphite.write.paths-cache-purge-interval",
"Duration between purges for expired items in the paths cache.").
DurationVar(&cfg.Write.PathsCachePurgeInterval)

app.Flag("graphite.enable-tags",
"Use Graphite tags.").
BoolVar(&cfg.EnableTags)
}
12 changes: 8 additions & 4 deletions client/graphite/config/config.go
Expand Up @@ -29,7 +29,9 @@ import (

// DefaultConfig is the default graphite configuration.
var DefaultConfig = Config{
DefaultPrefix: "",
DefaultPrefix: "",
EnableTags: false,
UseOpenMetricsFormat: false,
Write: WriteConfig{
CarbonAddress: "",
CarbonTransport: "tcp",
Expand All @@ -45,9 +47,11 @@ var DefaultConfig = Config{

// Config is the graphite configuration.
type Config struct {
Write WriteConfig `yaml:"write,omitempty" json:"write,omitempty"`
Read ReadConfig `yaml:"read,omitempty" json:"read,omitempty"`
DefaultPrefix string `yaml:"default_prefix,omitempty" json:"default_prefix,omitempty"`
Write WriteConfig `yaml:"write,omitempty" json:"write,omitempty"`
Read ReadConfig `yaml:"read,omitempty" json:"read,omitempty"`
DefaultPrefix string `yaml:"default_prefix,omitempty" json:"default_prefix,omitempty"`
EnableTags bool `yaml:"enable_tags,omitempty" json:"enable_tags,omitempty"`
UseOpenMetricsFormat bool `yaml:"openmetrics,omitempty" json:"openmetrics,omitempty"`

// Catches all undefined fields and must be empty after parsing.
XXX map[string]interface{} `yaml:",inline" json:"-"`
Expand Down
4 changes: 3 additions & 1 deletion client/graphite/config/config_test.go
Expand Up @@ -27,7 +27,9 @@ import (

var (
expectedConf = &Config{
DefaultPrefix: "test.prefix.",
DefaultPrefix: "test.prefix.",
EnableTags: true,
UseOpenMetricsFormat: true,
Read: ReadConfig{
URL: "greatGraphiteWebURL",
},
Expand Down
4 changes: 3 additions & 1 deletion client/graphite/config/testdata/graphite.good.yml
@@ -1,4 +1,6 @@
default_prefix: test.prefix.
enable_tags: true
openmetrics: true
read:
url: greatGraphiteWebURL
write:
Expand Down Expand Up @@ -26,4 +28,4 @@ write:
continue: true
- match:
owner: team-Z
continue: false
continue: false
4 changes: 4 additions & 0 deletions client/graphite/http.go
Expand Up @@ -34,6 +34,7 @@ type ExpandResponse struct {
type RenderResponse struct {
Target string `yaml:"target,omitempty" json:"target,omitempty"`
Datapoints []*Datapoint `yaml:"datapoints,omitempty" json:"datapoints,omitempty"`
Tags Tags `yaml:"tags,omitempty" json:"tags,omitempty"`
}

// Datapoint pairs a timestamp to a value.
Expand All @@ -42,6 +43,9 @@ type Datapoint struct {
Timestamp int64
}

// Tags maps Graphite tag names to their values, as returned by the
// render endpoint's JSON response for a series.
type Tags map[string]string

// UnmarshalJSON unmarshals a Datapoint from json
func (d *Datapoint) UnmarshalJSON(b []byte) error {
var x []*interface{}
Expand Down
114 changes: 91 additions & 23 deletions client/graphite/read.go
Expand Up @@ -27,6 +27,8 @@ import (
pmetric "github.com/prometheus/prometheus/storage/metric"

"golang.org/x/net/context"
"strings"
"github.com/gogo/protobuf/proto"
)

func (c *Client) queryToTargets(ctx context.Context, query *prompb.Query) ([]string, error) {
Expand Down Expand Up @@ -75,6 +77,35 @@ func (c *Client) queryToTargets(ctx context.Context, query *prompb.Query) ([]str
return targets, err
}

// queryToTargetsWithTags translates a Prometheus remote-read query into a
// single Graphite seriesByTag() target. Each label matcher becomes one
// quoted tag expression; regex matchers are anchored with ^(...)$ to match
// Prometheus' full-string regex semantics.
func (c *Client) queryToTargetsWithTags(ctx context.Context, query *prompb.Query) ([]string, error) {
	exprs := make([]string, 0, len(query.Matchers))

	for _, m := range query.Matchers {
		// Graphite stores the metric name under the special "name" tag.
		name := m.Name
		if name == model.MetricNameLabel {
			name = "name"
		}

		var expr string
		switch m.Type {
		case prompb.LabelMatcher_EQ:
			expr = name + "=" + m.Value
		case prompb.LabelMatcher_NEQ:
			expr = name + "!=" + m.Value
		case prompb.LabelMatcher_RE:
			expr = name + "=~^(" + m.Value + ")$"
		case prompb.LabelMatcher_NRE:
			expr = name + "!=~^(" + m.Value + ")$"
		default:
			return nil, fmt.Errorf("unknown match type %v", m.Type)
		}
		exprs = append(exprs, "\""+expr+"\"")
	}

	return []string{"seriesByTag(" + strings.Join(exprs, ",") + ")"}, nil
}

func (c *Client) filterTargets(query *prompb.Query, targets []string) ([]string, error) {
// Filter out targets that do not match the query's label matcher
var results []string
Expand Down Expand Up @@ -119,7 +150,7 @@ func (c *Client) filterTargets(query *prompb.Query, targets []string) ([]string,
return results, nil
}

func (c *Client) targetToTimeseries(ctx context.Context, target string, from string, until string) (*prompb.TimeSeries, error) {
func (c *Client) targetToTimeseries(ctx context.Context, target string, from string, until string) ([]*prompb.TimeSeries, error) {
renderURL, err := prepareURL(c.cfg.Read.URL, renderEndpoint, map[string]string{"format": "json", "from": from, "until": until, "target": target})
if err != nil {
level.Warn(c.logger).Log(
Expand All @@ -143,23 +174,32 @@ func (c *Client) targetToTimeseries(ctx context.Context, target string, from str
"msg", "Error parsing render endpoint response body")
return nil, err
}
renderResponse := renderResponses[0]

ts := &prompb.TimeSeries{}
ts.Labels, err = metricLabelsFromPath(renderResponse.Target, c.cfg.DefaultPrefix)
if err != nil {
level.Warn(c.logger).Log(
"path", renderResponse.Target, "prefix", c.cfg.DefaultPrefix, "err", err)
return nil, err
}
for _, datapoint := range renderResponse.Datapoints {
timstampMs := datapoint.Timestamp * 1000
if datapoint.Value == nil {
continue
ret := make([]*prompb.TimeSeries, len(renderResponses))
for _, renderResponse := range renderResponses {
ts := &prompb.TimeSeries{}

if c.cfg.EnableTags {
ts.Labels, err = metricLabelsFromTags(renderResponse.Tags, c.cfg.DefaultPrefix)
} else {
ts.Labels, err = metricLabelsFromPath(renderResponse.Target, c.cfg.DefaultPrefix)
}

if err != nil {
level.Warn(c.logger).Log(
"path", renderResponse.Target, "prefix", c.cfg.DefaultPrefix, "err", err)
return nil, err
}
for _, datapoint := range renderResponse.Datapoints {
timstampMs := datapoint.Timestamp * 1000
if datapoint.Value == nil {
continue
}
ts.Samples = append(ts.Samples, &prompb.Sample{Value: *datapoint.Value, Timestamp: timstampMs})
}
ts.Samples = append(ts.Samples, &prompb.Sample{Value: *datapoint.Value, Timestamp: timstampMs})
ret = append(ret, ts)
}
return ts, nil
return ret, nil
}

func min(a, b int) int {
Expand All @@ -185,13 +225,24 @@ func (c *Client) handleReadQuery(ctx context.Context, query *prompb.Query) (*pro
fromStr := strconv.Itoa(from)
untilStr := strconv.Itoa(until)

targets, err := c.queryToTargets(ctx, query)
targets := []string{}
var err error

if c.cfg.EnableTags {
targets, err = c.queryToTargetsWithTags(ctx, query)
} else {
// If we don't have tags we try to emulate them with normal paths.
targets, err = c.queryToTargets(ctx, query)
}
if err != nil {
return nil, err
}

level.Debug(c.logger).Log(
"targets", targets, "from", fromStr, "until", untilStr, "msg", "Fetching data")
c.fetchData(ctx, queryResult, targets, fromStr, untilStr)
return queryResult, nil

}

func (c *Client) fetchData(ctx context.Context, queryResult *prompb.QueryResult, targets []string, fromStr string, untilStr string) {
Expand All @@ -200,6 +251,7 @@ func (c *Client) fetchData(ctx context.Context, queryResult *prompb.QueryResult,

wg := sync.WaitGroup{}

// TODO: Send multiple targets per query, Graphite supports that.
// Start only a few workers to avoid killing graphite.
for i := 0; i < maxFetchWorkers; i++ {
wg.Add(1)
Expand All @@ -214,23 +266,39 @@ func (c *Client) fetchData(ctx context.Context, queryResult *prompb.QueryResult,
if err != nil {
level.Warn(c.logger).Log("target", target, "err", err, "msg", "Error fetching and parsing target datapoints")
} else {
output <- ts
level.Debug(c.logger).Log("reading responses")
for _, t := range ts {
output <- t
}
}
}
}(fromStr, untilStr, ctx)
}

// Feed the inut.
// Feed the input.
for _, target := range targets {
input <- target
}
close(input)

wg.Wait()
close(output)

for ts := range output {
queryResult.Timeseries = append(queryResult.Timeseries, ts)
// Close the output as soon as all jobs are done.
go func() {
wg.Wait()
close(output)
}()

// Read output until channel is closed.
for {
done := false
select {
case ts := <-output:
queryResult.Timeseries = append(queryResult.Timeseries, ts)
default:
done = true
}
if done {
break
}
}
}

Expand Down
4 changes: 4 additions & 0 deletions client/graphite/read_test.go
Expand Up @@ -79,6 +79,8 @@ func TestQueryToTargets(t *testing.T) {
if !reflect.DeepEqual(expectedTargets, actualTargets) {
t.Errorf("Expected %s, got %s", expectedTargets, actualTargets)
}

// FIXME: Test with Tags
}

func TestInvalideQueryToTargets(t *testing.T) {
Expand Down Expand Up @@ -116,4 +118,6 @@ func TestTargetToTimeseries(t *testing.T) {
if !reflect.DeepEqual(expectedTs, actualTs) {
t.Errorf("Expected %s, got %s", expectedTs, actualTs)
}

// FIXME: Test with Tags
}

0 comments on commit 0e18318

Please sign in to comment.