From 37e03e1c8fdfc0cae50ac08491127e9a6c7ff0bb Mon Sep 17 00:00:00 2001 From: Javier Kohen Date: Tue, 12 Jun 2018 16:18:54 -0400 Subject: [PATCH 1/7] Support command-line arguments in test-in-prod. --- test-in-prod | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test-in-prod b/test-in-prod index 960be6a8..de8a887c 100755 --- a/test-in-prod +++ b/test-in-prod @@ -3,4 +3,5 @@ exec ./stackdriver-prometheus-sidecar \ --stackdriver.project-id=prometheus-to-sd \ --stackdriver.global-label=_kubernetes_cluster_name=prom-test-cluster-1 \ --stackdriver.global-label=_kubernetes_location=us-central1-a \ - --stackdriver.global-label=__meta_kubernetes_namespace=stackdriver + --stackdriver.global-label=__meta_kubernetes_namespace=stackdriver \ + "$@" From 292a1a74ea9679a686537ce87e50d00ba59053d0 Mon Sep 17 00:00:00 2001 From: Javier Kohen Date: Tue, 12 Jun 2018 16:20:49 -0400 Subject: [PATCH 2/7] Use discovered target labels when building samples to send to Stackdriver. --- cmd/stackdriver-prometheus-sidecar/main.go | 16 ++- retrieval/manager.go | 50 ++++++--- retrieval/manager_test.go | 119 ++++++++++++++------- targets/cache.go | 10 +- targets/cache_test.go | 4 +- 5 files changed, 139 insertions(+), 60 deletions(-) diff --git a/cmd/stackdriver-prometheus-sidecar/main.go b/cmd/stackdriver-prometheus-sidecar/main.go index fa001e69..b59e91ce 100644 --- a/cmd/stackdriver-prometheus-sidecar/main.go +++ b/cmd/stackdriver-prometheus-sidecar/main.go @@ -43,6 +43,7 @@ import ( "github.com/Stackdriver/stackdriver-prometheus-sidecar/retrieval" "github.com/Stackdriver/stackdriver-prometheus-sidecar/stackdriver" + "github.com/Stackdriver/stackdriver-prometheus-sidecar/targets" "github.com/prometheus/common/promlog" promlogflag "github.com/prometheus/common/promlog/flag" ) @@ -62,7 +63,7 @@ func main() { globalLabels map[string]string stackdriverAddress *url.URL walDirectory string - prometheusAddress string + prometheusURL *url.URL listenAddress string logLevel promlog.AllowedLevel @@ -91,7 +92,7 @@ func main() { Default("data/wal").StringVar(&cfg.walDirectory) a.Flag("prometheus.api-address", "Address to listen on for UI, API, and telemetry."). - Default("0.0.0.0:9090").StringVar(&cfg.prometheusAddress) + Default("http://127.0.0.1:9090/").URLVar(&cfg.prometheusURL) a.Flag("web.listen-address", "Address to listen on for UI, API, and telemetry."). Default("0.0.0.0:9091").StringVar(&cfg.listenAddress) @@ -106,8 +107,6 @@ func main() { } logger := promlog.New(cfg.logLevel) - cfg.globalLabels["_stackdriver_project_id"] = *projectId - cfg.projectIdResource = fmt.Sprintf("projects/%v", *projectId) // XXX(fabxc): Kubernetes does background logging which we can only customize by modifying // a global variable. @@ -123,6 +122,13 @@ func main() { level.Info(logger).Log("host_details", Uname()) level.Info(logger).Log("fd_limits", FdLimits()) + cfg.globalLabels["_stackdriver_project_id"] = *projectId + cfg.projectIdResource = fmt.Sprintf("projects/%v", *projectId) + targetsURL, err := cfg.prometheusURL.Parse(targets.DefaultApiEndpoint) + if err != nil { + panic(err) + } + // TODO(jkohen): Remove once we have proper translation of all metric // types. Currently Stackdriver fails the entire request if you attempt // to write to the different metric type, which we do fairly often at @@ -171,7 +177,7 @@ func main() { timeout: 10 * time.Second, }, ) - prometheusReader = retrieval.NewPrometheusReader(log.With(logger, "component", "Prometheus reader"), cfg.walDirectory, queueManager) + prometheusReader = retrieval.NewPrometheusReader(log.With(logger, "component", "Prometheus reader"), cfg.walDirectory, targetsURL, queueManager) ) // Exclude kingpin default flags to expose only Prometheus ones. diff --git a/retrieval/manager.go b/retrieval/manager.go index 186ccb95..9a049471 100644 --- a/retrieval/manager.go +++ b/retrieval/manager.go @@ -16,9 +16,11 @@ package retrieval import ( "context" "fmt" + "net/url" "sort" "github.com/Stackdriver/stackdriver-prometheus-sidecar/tail" + "github.com/Stackdriver/stackdriver-prometheus-sidecar/targets" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/gogo/protobuf/proto" @@ -26,21 +28,24 @@ import ( dto "github.com/prometheus/client_model/go" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/tsdb" + tsdblabels "github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/wal" ) // NewPrometheusReader is the PrometheusReader constructor -func NewPrometheusReader(logger log.Logger, walDirectory string, appender Appender) *PrometheusReader { +func NewPrometheusReader(logger log.Logger, walDirectory string, promURL *url.URL, appender Appender) *PrometheusReader { return &PrometheusReader{ appender: appender, logger: logger, walDirectory: walDirectory, + promURL: promURL, } } type PrometheusReader struct { logger log.Logger walDirectory string + promURL *url.URL appender Appender cancelTail context.CancelFunc } @@ -57,6 +62,9 @@ func (r *PrometheusReader) Run() error { seriesCache := newSeriesCache(r.logger, r.walDirectory) go seriesCache.run(ctx) + targetCache := targets.NewCache(r.logger, nil, r.promURL) + go targetCache.Run(ctx) + // NOTE(fabxc): wrap the tailer into a buffered reader once we become concerned // with performance. The WAL reader will do a lot of tiny reads otherwise. // This is also the reason for the series cache dealing with "maxSegment" hints @@ -86,7 +94,7 @@ func (r *PrometheusReader) Run() error { } for len(recordSamples) > 0 { var outputSample *MetricFamily - outputSample, recordSamples, err = buildSample(seriesCache, recordSamples) + outputSample, recordSamples, err = buildSample(ctx, seriesCache, targetCache, recordSamples) if err != nil { level.Warn(r.logger).Log("msg", "Failed to build sample", "err", err) continue @@ -108,7 +116,7 @@ func (r *PrometheusReader) Stop() { // Creates a MetricFamily instance from the head of recordSamples, or error if // that fails. In either case, this function returns the recordSamples items // that weren't consumed. -func buildSample(seriesGetter seriesGetter, recordSamples []tsdb.RefSample) (*MetricFamily, []tsdb.RefSample, error) { +func buildSample(ctx context.Context, seriesGetter seriesGetter, targetGetter targets.Getter, recordSamples []tsdb.RefSample) (*MetricFamily, []tsdb.RefSample, error) { sample := recordSamples[0] lset, ok := seriesGetter.get(sample.Ref) if !ok { @@ -120,6 +128,7 @@ func buildSample(seriesGetter seriesGetter, recordSamples []tsdb.RefSample) (*Me } metric := metricFamily.Metric[0] metric.Label = make([]*dto.LabelPair, 0, len(lset)-1) + // TODO(jkohen): filter `lset` with targets.DropTargetLabels. for _, l := range lset { if l.Name == labels.MetricName { metricFamily.Name = proto.String(l.Value) @@ -136,17 +145,32 @@ func buildSample(seriesGetter seriesGetter, recordSamples []tsdb.RefSample) (*Me metric.TimestampMs = proto.Int64(sample.T) // TODO(jkohen): track reset timestamps. metricResetTimestampMs := []int64{NoTimestamp} - // TODO(jkohen): fill in the discovered labels from the Targets API. - targetLabels := make(labels.Labels, 0, len(lset)) - for _, l := range lset { - if l.Name == labels.MetricName { - continue - } - targetLabels = append(targetLabels, labels.Label(l)) + // Fill in the discovered labels from the Targets API. + target, err := targetGetter.Get(ctx, pkgLabels(lset)) + if err != nil { + return nil, recordSamples[1:], err + } + m, err := NewMetricFamily(metricFamily, metricResetTimestampMs, target.DiscoveredLabels) + return m, recordSamples[1:], err +} + +// TODO(jkohen): We should be able to avoid this conversion. +func tsdbLabels(input labels.Labels) tsdblabels.Labels { + output := make(tsdblabels.Labels, 0, len(input)) + for _, l := range input { + output = append(output, tsdblabels.Label(l)) + } + return output +} + +// TODO(jkohen): We should be able to avoid this conversion. +func pkgLabels(input tsdblabels.Labels) labels.Labels { + output := make(labels.Labels, 0, len(input)) + for _, l := range input { + output = append(output, labels.Label(l)) } // labels.Labels expects the contents to be sorted. We could move to an // interface that doesn't require order, to save some cycles. - sort.Sort(targetLabels) - m, err := NewMetricFamily(metricFamily, metricResetTimestampMs, targetLabels) - return m, recordSamples[1:], err + sort.Sort(output) + return output } diff --git a/retrieval/manager_test.go b/retrieval/manager_test.go index 7a2a3bd7..9ff58b53 100644 --- a/retrieval/manager_test.go +++ b/retrieval/manager_test.go @@ -14,8 +14,11 @@ limitations under the License. package retrieval import ( + "context" + "fmt" "testing" + "github.com/Stackdriver/stackdriver-prometheus-sidecar/targets" promlabels "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/tsdb" "github.com/prometheus/tsdb/labels" @@ -35,49 +38,89 @@ func (g *seriesMap) get(ref uint64) (labels.Labels, bool) { return ls, ok } +// Implements TargetGetter. +// The map key is the value of the first label in the lset given as an input to Get. +type targetMap struct { + m map[string]targets.Target +} + +func newTargetMap() targetMap { + return targetMap{m: make(map[string]targets.Target)} +} + +func (g *targetMap) Get(ctx context.Context, lset promlabels.Labels) (*targets.Target, error) { + key := lset[0].Value + t, ok := g.m[key] + if !ok { + return nil, fmt.Errorf("no target match for label %v", lset[0]) + } + return &t, nil +} + func TestBuildSample(t *testing.T) { + ctx := context.Background() seriesMap := newSeriesMap() + targetMap := newTargetMap() timestamp := int64(1234) value := 2.1 - recordSamples := []tsdb.RefSample{ - {Ref: /*unknown*/ 999, T: timestamp, V: value}, - {Ref: /*unknown*/ 999, T: timestamp, V: value}, - } - sample, recordSamples, err := buildSample(&seriesMap, recordSamples) - if err == nil { - t.Errorf("Expected error, got sample %v", sample) - } - if len(recordSamples) != 1 { - t.Errorf("Expected one leftover sample, got samples %v", recordSamples) - } + t.Run("NoSeries", func(t *testing.T) { + recordSamples := []tsdb.RefSample{ + {Ref: /*unknown*/ 999, T: timestamp, V: value}, + {Ref: /*unknown*/ 999, T: timestamp, V: value}, + } + sample, recordSamples, err := buildSample(ctx, &seriesMap, &targetMap, recordSamples) + if err == nil { + t.Errorf("Expected error, got sample %v", sample) + } + if len(recordSamples) != 1 { + t.Errorf("Expected one leftover sample, got samples %v", recordSamples) + } + }) - ref := uint64(0) - seriesLabels := labels.Labels{{"__name__", "my_metric"}, {"job", "job1"}, {"instance", "i1"}} - seriesMap.m[ref] = seriesLabels - recordSamples = []tsdb.RefSample{{Ref: ref, T: timestamp, V: value}} - sample, recordSamples, err = buildSample(&seriesMap, recordSamples) - if err != nil { - t.Error(err) - } - if len(recordSamples) != 0 { - t.Errorf("Expected all samples to be consumed, got samples %v", recordSamples) - } - if sample == nil { - t.Error("Unexpected nil sample") - } - if sample.GetName() != "my_metric" { - t.Errorf("Expected name 'my_metric', got %v", sample.GetName()) - } - if sample.Metric[0].GetTimestampMs() != timestamp { - t.Errorf("Expected timestamp '%v', got %v", timestamp, sample.Metric[0].GetTimestampMs()) - } - if sample.Metric[0].Untyped.GetValue() != value { - t.Errorf("Expected value '%v', got %v", value, sample.Metric[0].Untyped.GetValue()) - } - targetLabels := promlabels.FromStrings("job", "job1", "instance", "i1") - if !promlabels.Equal(sample.TargetLabels, targetLabels) { - t.Errorf("Expected target labels '%v', got %v", targetLabels, sample.TargetLabels) - } + t.Run("NoTarget", func(t *testing.T) { + ref := uint64(0) + seriesLabels := labels.Labels{{"__name__", "my_metric"}, {"job", "job1"}, {"instance", "i1"}} + seriesMap.m[ref] = seriesLabels + recordSamples := []tsdb.RefSample{{Ref: ref, T: timestamp, V: value}} + sample, recordSamples, err := buildSample(ctx, &seriesMap, &targetMap, recordSamples) + if err == nil { + t.Errorf("Expected error, got sample %v", sample) + } + if len(recordSamples) != 0 { + t.Errorf("Expected all samples to be consumed, got samples %v", recordSamples) + } + }) + + t.Run("Successful", func(t *testing.T) { + ref := uint64(0) + seriesLabels := labels.Labels{{"__name__", "my_metric"}, {"job", "job1"}, {"instance", "i1"}} + seriesMap.m[ref] = seriesLabels + targetMap.m[seriesLabels[0].Value] = targets.Target{DiscoveredLabels: promlabels.Labels{{"dkey", "dvalue"}}} + recordSamples := []tsdb.RefSample{{Ref: ref, T: timestamp, V: value}} + sample, recordSamples, err := buildSample(ctx, &seriesMap, &targetMap, recordSamples) + if err != nil { + t.Error(err) + } + if len(recordSamples) != 0 { + t.Errorf("Expected all samples to be consumed, got samples %v", recordSamples) + } + if sample == nil { + t.Error("Unexpected nil sample") + } + if sample.GetName() != "my_metric" { + t.Errorf("Expected name 'my_metric', got %v", sample.GetName()) + } + if sample.Metric[0].GetTimestampMs() != timestamp { + t.Errorf("Expected timestamp '%v', got %v", timestamp, sample.Metric[0].GetTimestampMs()) + } + if sample.Metric[0].Untyped.GetValue() != value { + t.Errorf("Expected value '%v', got %v", value, sample.Metric[0].Untyped.GetValue()) + } + targetLabels := promlabels.FromStrings("dkey", "dvalue") + if !promlabels.Equal(sample.TargetLabels, targetLabels) { + t.Errorf("Expected target labels '%v', got %v", targetLabels, sample.TargetLabels) + } + }) } diff --git a/targets/cache.go b/targets/cache.go index 45a919a8..8426cb80 100644 --- a/targets/cache.go +++ b/targets/cache.go @@ -27,7 +27,11 @@ import ( "github.com/prometheus/prometheus/pkg/labels" ) -const DefaultTargetsEndpoint = "/api/v1/targets" +type Getter interface { + Get(ctx context.Context, lset labels.Labels) (*Target, error) +} + +const DefaultApiEndpoint = "/api/v1/targets" func cacheKey(job, instance string) string { return job + "\xff" + instance @@ -38,6 +42,7 @@ func cacheKey(job, instance string) string { // unique by job and instance label and an optional but consistent set of additional labels. // It only provides best effort matching for configurations where targets are identified // by a varying set of labels within a job and instance combination. +// Implements TargetGetter. type Cache struct { logger log.Logger client *http.Client @@ -48,7 +53,7 @@ type Cache struct { targets map[string][]*Target } -func NewCache(ctx context.Context, logger log.Logger, client *http.Client, promURL *url.URL) *Cache { +func NewCache(logger log.Logger, client *http.Client, promURL *url.URL) *Cache { if client == nil { client = http.DefaultClient } @@ -56,6 +61,7 @@ func NewCache(ctx context.Context, logger log.Logger, client *http.Client, promU logger = log.NewNopLogger() } return &Cache{ + logger: logger, client: client, url: promURL, targets: map[string][]*Target{}, diff --git a/targets/cache_test.go b/targets/cache_test.go index f55d9b0d..633556ba 100644 --- a/targets/cache_test.go +++ b/targets/cache_test.go @@ -39,7 +39,7 @@ func TestTargetCache_Error(t *testing.T) { if err != nil { t.Fatal(err) } - c := NewCache(ctx, nil, nil, u) + c := NewCache(nil, nil, u) expectedTarget := &Target{ Labels: labels.FromStrings("job", "a", "instance", "c"), @@ -104,7 +104,7 @@ func TestTargetCache_Success(t *testing.T) { if err != nil { t.Fatal(err) } - c := NewCache(ctx, nil, nil, u) + c := NewCache(nil, nil, u) handler = func() []*Target { return []*Target{ From 2e9e79f7d73ea639588b1b8ae59537bb944cbc2b Mon Sep 17 00:00:00 2001 From: Javier Kohen Date: Tue, 12 Jun 2018 17:17:05 -0400 Subject: [PATCH 3/7] Drop target labels from metric label set. --- retrieval/manager.go | 28 ++++++++++------------------ retrieval/manager_test.go | 15 +++++++++++++-- 2 files changed, 23 insertions(+), 20 deletions(-) diff --git a/retrieval/manager.go b/retrieval/manager.go index 9a049471..773a1743 100644 --- a/retrieval/manager.go +++ b/retrieval/manager.go @@ -118,18 +118,24 @@ func (r *PrometheusReader) Stop() { // that weren't consumed. func buildSample(ctx context.Context, seriesGetter seriesGetter, targetGetter targets.Getter, recordSamples []tsdb.RefSample) (*MetricFamily, []tsdb.RefSample, error) { sample := recordSamples[0] - lset, ok := seriesGetter.get(sample.Ref) + tsdblset, ok := seriesGetter.get(sample.Ref) if !ok { return nil, recordSamples[1:], fmt.Errorf("sample=%v", sample) } + lset := pkgLabels(tsdblset) + // Fill in the discovered labels from the Targets API. + target, err := targetGetter.Get(ctx, lset) + if err != nil { + return nil, recordSamples[1:], err + } + metricLabels := targets.DropTargetLabels(lset, target.DiscoveredLabels) // TODO(jkohen): Rebuild histograms and summary from individual time series. metricFamily := &dto.MetricFamily{ Metric: []*dto.Metric{{}}, } metric := metricFamily.Metric[0] - metric.Label = make([]*dto.LabelPair, 0, len(lset)-1) - // TODO(jkohen): filter `lset` with targets.DropTargetLabels. - for _, l := range lset { + metric.Label = make([]*dto.LabelPair, 0, len(metricLabels)-1) + for _, l := range metricLabels { if l.Name == labels.MetricName { metricFamily.Name = proto.String(l.Value) continue @@ -145,24 +151,10 @@ func buildSample(ctx context.Context, seriesGetter seriesGetter, targetGetter ta metric.TimestampMs = proto.Int64(sample.T) // TODO(jkohen): track reset timestamps. metricResetTimestampMs := []int64{NoTimestamp} - // Fill in the discovered labels from the Targets API. - target, err := targetGetter.Get(ctx, pkgLabels(lset)) - if err != nil { - return nil, recordSamples[1:], err - } m, err := NewMetricFamily(metricFamily, metricResetTimestampMs, target.DiscoveredLabels) return m, recordSamples[1:], err } -// TODO(jkohen): We should be able to avoid this conversion. -func tsdbLabels(input labels.Labels) tsdblabels.Labels { - output := make(tsdblabels.Labels, 0, len(input)) - for _, l := range input { - output = append(output, tsdblabels.Label(l)) - } - return output -} - // TODO(jkohen): We should be able to avoid this conversion. func pkgLabels(input tsdblabels.Labels) labels.Labels { output := make(labels.Labels, 0, len(input)) diff --git a/retrieval/manager_test.go b/retrieval/manager_test.go index 9ff58b53..c61ecdf8 100644 --- a/retrieval/manager_test.go +++ b/retrieval/manager_test.go @@ -97,7 +97,14 @@ func TestBuildSample(t *testing.T) { ref := uint64(0) seriesLabels := labels.Labels{{"__name__", "my_metric"}, {"job", "job1"}, {"instance", "i1"}} seriesMap.m[ref] = seriesLabels - targetMap.m[seriesLabels[0].Value] = targets.Target{DiscoveredLabels: promlabels.Labels{{"dkey", "dvalue"}}} + // The discovered labels include a label "job" and no "instance" + // label, which will cause the metric labels to include + // "instance", but not "job". + targetMap.m[seriesLabels[0].Value] = targets.Target{ + DiscoveredLabels: promlabels.Labels{ + {"dkey", "dvalue"}, + {"job", "job1"}, + }} recordSamples := []tsdb.RefSample{{Ref: ref, T: timestamp, V: value}} sample, recordSamples, err := buildSample(ctx, &seriesMap, &targetMap, recordSamples) if err != nil { @@ -118,9 +125,13 @@ func TestBuildSample(t *testing.T) { if sample.Metric[0].Untyped.GetValue() != value { t.Errorf("Expected value '%v', got %v", value, sample.Metric[0].Untyped.GetValue()) } - targetLabels := promlabels.FromStrings("dkey", "dvalue") + targetLabels := promlabels.FromStrings("dkey", "dvalue", "job", "job1") if !promlabels.Equal(sample.TargetLabels, targetLabels) { t.Errorf("Expected target labels '%v', got %v", targetLabels, sample.TargetLabels) } + metricLabels := promlabels.FromStrings("instance", "i1") + if !promlabels.Equal(LabelPairsToLabels(sample.Metric[0].Label), metricLabels) { + t.Errorf("Expected metric labels '%v', got %v", metricLabels, sample.Metric[0].Label) + } }) } From c057c7d83e923576fa1a0c9dce2d2cbd4a7c3241 Mon Sep 17 00:00:00 2001 From: Javier Kohen Date: Wed, 13 Jun 2018 09:10:56 -0400 Subject: [PATCH 4/7] Review cleanups. --- cmd/stackdriver-prometheus-sidecar/main.go | 2 +- retrieval/manager.go | 15 ++++++++------- targets/cache.go | 6 +----- 3 files changed, 10 insertions(+), 13 deletions(-) diff --git a/cmd/stackdriver-prometheus-sidecar/main.go b/cmd/stackdriver-prometheus-sidecar/main.go index b59e91ce..fa3c2dde 100644 --- a/cmd/stackdriver-prometheus-sidecar/main.go +++ b/cmd/stackdriver-prometheus-sidecar/main.go @@ -124,7 +124,7 @@ func main() { cfg.globalLabels["_stackdriver_project_id"] = *projectId cfg.projectIdResource = fmt.Sprintf("projects/%v", *projectId) - targetsURL, err := cfg.prometheusURL.Parse(targets.DefaultApiEndpoint) + targetsURL, err := cfg.prometheusURL.Parse(targets.DefaultAPIEndpoint) if err != nil { panic(err) } diff --git a/retrieval/manager.go b/retrieval/manager.go index 773a1743..c4286f74 100644 --- a/retrieval/manager.go +++ b/retrieval/manager.go @@ -17,13 +17,13 @@ import ( "context" "fmt" "net/url" - "sort" "github.com/Stackdriver/stackdriver-prometheus-sidecar/tail" "github.com/Stackdriver/stackdriver-prometheus-sidecar/targets" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/gogo/protobuf/proto" + "github.com/pkg/errors" dto "github.com/prometheus/client_model/go" "github.com/prometheus/prometheus/pkg/labels" @@ -113,20 +113,24 @@ func (r *PrometheusReader) Stop() { r.cancelTail() } +type Getter interface { + Get(ctx context.Context, lset labels.Labels) (*targets.Target, error) +} + // Creates a MetricFamily instance from the head of recordSamples, or error if // that fails. In either case, this function returns the recordSamples items // that weren't consumed. -func buildSample(ctx context.Context, seriesGetter seriesGetter, targetGetter targets.Getter, recordSamples []tsdb.RefSample) (*MetricFamily, []tsdb.RefSample, error) { +func buildSample(ctx context.Context, seriesGetter seriesGetter, targetGetter Getter, recordSamples []tsdb.RefSample) (*MetricFamily, []tsdb.RefSample, error) { sample := recordSamples[0] tsdblset, ok := seriesGetter.get(sample.Ref) if !ok { - return nil, recordSamples[1:], fmt.Errorf("sample=%v", sample) + return nil, recordSamples[1:], fmt.Errorf("No series matched sample by ref %v", sample) } lset := pkgLabels(tsdblset) // Fill in the discovered labels from the Targets API. target, err := targetGetter.Get(ctx, lset) if err != nil { - return nil, recordSamples[1:], err + return nil, recordSamples[1:], errors.Wrapf(err, "No target matched labels %v", lset) } metricLabels := targets.DropTargetLabels(lset, target.DiscoveredLabels) // TODO(jkohen): Rebuild histograms and summary from individual time series. @@ -161,8 +165,5 @@ func pkgLabels(input tsdblabels.Labels) labels.Labels { for _, l := range input { output = append(output, labels.Label(l)) } - // labels.Labels expects the contents to be sorted. We could move to an - // interface that doesn't require order, to save some cycles. - sort.Sort(output) return output } diff --git a/targets/cache.go b/targets/cache.go index 8426cb80..a9fe8758 100644 --- a/targets/cache.go +++ b/targets/cache.go @@ -27,11 +27,7 @@ import ( "github.com/prometheus/prometheus/pkg/labels" ) -type Getter interface { - Get(ctx context.Context, lset labels.Labels) (*Target, error) -} - -const DefaultApiEndpoint = "/api/v1/targets" +const DefaultAPIEndpoint = "/api/v1/targets" func cacheKey(job, instance string) string { return job + "\xff" + instance From 9e9933145104c95b775a4b8f0be015c0364c2f21 Mon Sep 17 00:00:00 2001 From: Javier Kohen Date: Wed, 13 Jun 2018 10:53:13 -0400 Subject: [PATCH 5/7] Drop Target.Labels instead of DiscoveredLabels. This was the original intention. --- retrieval/manager.go | 2 +- retrieval/manager_test.go | 18 ++++++++++-------- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/retrieval/manager.go b/retrieval/manager.go index c4286f74..ab684993 100644 --- a/retrieval/manager.go +++ b/retrieval/manager.go @@ -132,7 +132,7 @@ func buildSample(ctx context.Context, seriesGetter seriesGetter, targetGetter Ge if err != nil { return nil, recordSamples[1:], errors.Wrapf(err, "No target matched labels %v", lset) } - metricLabels := targets.DropTargetLabels(lset, target.DiscoveredLabels) + metricLabels := targets.DropTargetLabels(lset, target.Labels) // TODO(jkohen): Rebuild histograms and summary from individual time series. metricFamily := &dto.MetricFamily{ Metric: []*dto.Metric{{}}, diff --git a/retrieval/manager_test.go b/retrieval/manager_test.go index c61ecdf8..6b1e19b7 100644 --- a/retrieval/manager_test.go +++ b/retrieval/manager_test.go @@ -95,16 +95,15 @@ func TestBuildSample(t *testing.T) { t.Run("Successful", func(t *testing.T) { ref := uint64(0) - seriesLabels := labels.Labels{{"__name__", "my_metric"}, {"job", "job1"}, {"instance", "i1"}} + seriesLabels := labels.Labels{{"__name__", "my_metric"}, {"job", "job1"}, {"instance", "i1"}, {"mkey", "mvalue"}} seriesMap.m[ref] = seriesLabels // The discovered labels include a label "job" and no "instance" // label, which will cause the metric labels to include // "instance", but not "job". targetMap.m[seriesLabels[0].Value] = targets.Target{ - DiscoveredLabels: promlabels.Labels{ - {"dkey", "dvalue"}, - {"job", "job1"}, - }} + DiscoveredLabels: promlabels.Labels{{"dkey", "dvalue"}}, + Labels: promlabels.Labels{{"job", "job1"}}, + } recordSamples := []tsdb.RefSample{{Ref: ref, T: timestamp, V: value}} sample, recordSamples, err := buildSample(ctx, &seriesMap, &targetMap, recordSamples) if err != nil { @@ -114,7 +113,10 @@ func TestBuildSample(t *testing.T) { t.Errorf("Expected all samples to be consumed, got samples %v", recordSamples) } if sample == nil { - t.Error("Unexpected nil sample") + t.Fatal("Unexpected nil sample") + } + if sample.MetricFamily == nil { + t.Fatalf("Unexpected nil MetricFamily %v", sample) } if sample.GetName() != "my_metric" { t.Errorf("Expected name 'my_metric', got %v", sample.GetName()) @@ -125,11 +127,11 @@ func TestBuildSample(t *testing.T) { if sample.Metric[0].Untyped.GetValue() != value { t.Errorf("Expected value '%v', got %v", value, sample.Metric[0].Untyped.GetValue()) } - targetLabels := promlabels.FromStrings("dkey", "dvalue", "job", "job1") + targetLabels := promlabels.FromStrings("dkey", "dvalue") if !promlabels.Equal(sample.TargetLabels, targetLabels) { t.Errorf("Expected target labels '%v', got %v", targetLabels, sample.TargetLabels) } - metricLabels := promlabels.FromStrings("instance", "i1") + metricLabels := promlabels.FromStrings("instance", "i1", "mkey", "mvalue") if !promlabels.Equal(LabelPairsToLabels(sample.Metric[0].Label), metricLabels) { t.Errorf("Expected metric labels '%v', got %v", metricLabels, sample.Metric[0].Label) } From ee3e3f9c0fe1f1e279b11f769ba40c4447e43811 Mon Sep 17 00:00:00 2001 From: Javier Kohen Date: Wed, 13 Jun 2018 11:01:02 -0400 Subject: [PATCH 6/7] Refactoring from code review. --- cmd/stackdriver-prometheus-sidecar/main.go | 4 +++- retrieval/manager.go | 22 +++++++++------------- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/cmd/stackdriver-prometheus-sidecar/main.go b/cmd/stackdriver-prometheus-sidecar/main.go index fa3c2dde..7cf678e4 100644 --- a/cmd/stackdriver-prometheus-sidecar/main.go +++ b/cmd/stackdriver-prometheus-sidecar/main.go @@ -128,6 +128,8 @@ func main() { if err != nil { panic(err) } + targetCache := targets.NewCache(logger, nil, targetsURL) + go targetCache.Run(context.Background()) // TODO(jkohen): Remove once we have proper translation of all metric // types. Currently Stackdriver fails the entire request if you attempt @@ -177,7 +179,7 @@ func main() { timeout: 10 * time.Second, }, ) - prometheusReader = retrieval.NewPrometheusReader(log.With(logger, "component", "Prometheus reader"), cfg.walDirectory, targetsURL, queueManager) + prometheusReader = retrieval.NewPrometheusReader(log.With(logger, "component", "Prometheus reader"), cfg.walDirectory, targetCache, queueManager) ) // Exclude kingpin default flags to expose only Prometheus ones. diff --git a/retrieval/manager.go b/retrieval/manager.go index ab684993..bf311bc8 100644 --- a/retrieval/manager.go +++ b/retrieval/manager.go @@ -16,7 +16,6 @@ package retrieval import ( "context" "fmt" - "net/url" "github.com/Stackdriver/stackdriver-prometheus-sidecar/tail" "github.com/Stackdriver/stackdriver-prometheus-sidecar/targets" @@ -32,20 +31,24 @@ import ( "github.com/prometheus/tsdb/wal" ) +type TargetGetter interface { + Get(ctx context.Context, lset labels.Labels) (*targets.Target, error) +} + // NewPrometheusReader is the PrometheusReader constructor -func NewPrometheusReader(logger log.Logger, walDirectory string, promURL *url.URL, appender Appender) *PrometheusReader { +func NewPrometheusReader(logger log.Logger, walDirectory string, targetGetter TargetGetter, appender Appender) *PrometheusReader { return &PrometheusReader{ appender: appender, logger: logger, walDirectory: walDirectory, - promURL: promURL, + targetGetter: targetGetter, } } type PrometheusReader struct { logger log.Logger walDirectory string - promURL *url.URL + targetGetter TargetGetter appender Appender cancelTail context.CancelFunc } @@ -62,9 +65,6 @@ func (r *PrometheusReader) Run() error { seriesCache := newSeriesCache(r.logger, r.walDirectory) go seriesCache.run(ctx) - targetCache := targets.NewCache(r.logger, nil, r.promURL) - go targetCache.Run(ctx) - // NOTE(fabxc): wrap the tailer into a buffered reader once we become concerned // with performance. The WAL reader will do a lot of tiny reads otherwise. // This is also the reason for the series cache dealing with "maxSegment" hints @@ -94,7 +94,7 @@ func (r *PrometheusReader) Run() error { } for len(recordSamples) > 0 { var outputSample *MetricFamily - outputSample, recordSamples, err = buildSample(ctx, seriesCache, targetCache, recordSamples) + outputSample, recordSamples, err = buildSample(ctx, seriesCache, r.targetGetter, recordSamples) if err != nil { level.Warn(r.logger).Log("msg", "Failed to build sample", "err", err) continue @@ -113,14 +113,10 @@ func (r *PrometheusReader) Stop() { r.cancelTail() } -type Getter interface { - Get(ctx context.Context, lset labels.Labels) (*targets.Target, error) -} - // Creates a MetricFamily instance from the head of recordSamples, or error if // that fails. In either case, this function returns the recordSamples items // that weren't consumed. -func buildSample(ctx context.Context, seriesGetter seriesGetter, targetGetter Getter, recordSamples []tsdb.RefSample) (*MetricFamily, []tsdb.RefSample, error) { +func buildSample(ctx context.Context, seriesGetter seriesGetter, targetGetter TargetGetter, recordSamples []tsdb.RefSample) (*MetricFamily, []tsdb.RefSample, error) { sample := recordSamples[0] tsdblset, ok := seriesGetter.get(sample.Ref) if !ok { From d81773c30deb3a0d656d7db358b245f562c32284 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Thu, 14 Jun 2018 04:37:30 -0400 Subject: [PATCH 7/7] Run target cache in run.Group Signed-off-by: Fabian Reinartz --- cmd/stackdriver-prometheus-sidecar/main.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/cmd/stackdriver-prometheus-sidecar/main.go b/cmd/stackdriver-prometheus-sidecar/main.go index 7cf678e4..42a16bd5 100644 --- a/cmd/stackdriver-prometheus-sidecar/main.go +++ b/cmd/stackdriver-prometheus-sidecar/main.go @@ -129,7 +129,6 @@ func main() { panic(err) } targetCache := targets.NewCache(logger, nil, targetsURL) - go targetCache.Run(context.Background()) // TODO(jkohen): Remove once we have proper translation of all metric // types. Currently Stackdriver fails the entire request if you attempt @@ -199,6 +198,15 @@ func main() { http.Handle("/metrics", promhttp.Handler()) var g group.Group + { + ctx, cancel := context.WithCancel(context.Background()) + g.Add(func() error { + targetCache.Run(ctx) + return nil + }, func(error) { + cancel() + }) + } { term := make(chan os.Signal) signal.Notify(term, os.Interrupt, syscall.SIGTERM)