diff --git a/cmd/stackdriver-prometheus-sidecar/main.go b/cmd/stackdriver-prometheus-sidecar/main.go
index fa001e69..42a16bd5 100644
--- a/cmd/stackdriver-prometheus-sidecar/main.go
+++ b/cmd/stackdriver-prometheus-sidecar/main.go
@@ -43,6 +43,7 @@ import (
 
 	"github.com/Stackdriver/stackdriver-prometheus-sidecar/retrieval"
 	"github.com/Stackdriver/stackdriver-prometheus-sidecar/stackdriver"
+	"github.com/Stackdriver/stackdriver-prometheus-sidecar/targets"
 	"github.com/prometheus/common/promlog"
 	promlogflag "github.com/prometheus/common/promlog/flag"
 )
@@ -62,7 +63,7 @@ func main() {
 		globalLabels       map[string]string
 		stackdriverAddress *url.URL
 		walDirectory       string
-		prometheusAddress  string
+		prometheusURL      *url.URL
 		listenAddress      string
 		logLevel           promlog.AllowedLevel
 
@@ -91,7 +92,7 @@ func main() {
 		Default("data/wal").StringVar(&cfg.walDirectory)
 
 	a.Flag("prometheus.api-address", "Address to listen on for UI, API, and telemetry.").
-		Default("0.0.0.0:9090").StringVar(&cfg.prometheusAddress)
+		Default("http://127.0.0.1:9090/").URLVar(&cfg.prometheusURL)
 
 	a.Flag("web.listen-address", "Address to listen on for UI, API, and telemetry.").
 		Default("0.0.0.0:9091").StringVar(&cfg.listenAddress)
@@ -106,8 +107,6 @@ func main() {
 	}
 	logger := promlog.New(cfg.logLevel)
 
-	cfg.globalLabels["_stackdriver_project_id"] = *projectId
-	cfg.projectIdResource = fmt.Sprintf("projects/%v", *projectId)
 
 	// XXX(fabxc): Kubernetes does background logging which we can only customize by modifying
 	// a global variable.
@@ -123,6 +122,14 @@ func main() {
 	level.Info(logger).Log("host_details", Uname())
 	level.Info(logger).Log("fd_limits", FdLimits())
 
+	cfg.globalLabels["_stackdriver_project_id"] = *projectId
+	cfg.projectIdResource = fmt.Sprintf("projects/%v", *projectId)
+	targetsURL, err := cfg.prometheusURL.Parse(targets.DefaultAPIEndpoint)
+	if err != nil {
+		panic(err)
+	}
+	targetCache := targets.NewCache(logger, nil, targetsURL)
+
 	// TODO(jkohen): Remove once we have proper translation of all metric
 	// types. Currently Stackdriver fails the entire request if you attempt
 	// to write to the different metric type, which we do fairly often at
@@ -171,7 +178,7 @@ func main() {
 				timeout: 10 * time.Second,
 			},
 		)
-		prometheusReader = retrieval.NewPrometheusReader(log.With(logger, "component", "Prometheus reader"), cfg.walDirectory, queueManager)
+		prometheusReader = retrieval.NewPrometheusReader(log.With(logger, "component", "Prometheus reader"), cfg.walDirectory, targetCache, queueManager)
 	)
 
 	// Exclude kingpin default flags to expose only Prometheus ones.
@@ -191,6 +198,15 @@ func main() {
 	http.Handle("/metrics", promhttp.Handler())
 
 	var g group.Group
+	{
+		ctx, cancel := context.WithCancel(context.Background())
+		g.Add(func() error {
+			targetCache.Run(ctx)
+			return nil
+		}, func(error) {
+			cancel()
+		})
+	}
 	{
 		term := make(chan os.Signal)
 		signal.Notify(term, os.Interrupt, syscall.SIGTERM)
diff --git a/retrieval/manager.go b/retrieval/manager.go
index 186ccb95..bf311bc8 100644
--- a/retrieval/manager.go
+++ b/retrieval/manager.go
@@ -16,31 +16,39 @@ package retrieval
 
 import (
 	"context"
 	"fmt"
-	"sort"
 
 	"github.com/Stackdriver/stackdriver-prometheus-sidecar/tail"
+	"github.com/Stackdriver/stackdriver-prometheus-sidecar/targets"
 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
 	"github.com/gogo/protobuf/proto"
+	"github.com/pkg/errors"
 	dto "github.com/prometheus/client_model/go"
 	"github.com/prometheus/prometheus/pkg/labels"
 	"github.com/prometheus/tsdb"
+	tsdblabels "github.com/prometheus/tsdb/labels"
 	"github.com/prometheus/tsdb/wal"
 )
 
+type TargetGetter interface {
+	Get(ctx context.Context, lset labels.Labels) (*targets.Target, error)
+}
+
 // NewPrometheusReader is the PrometheusReader constructor
-func NewPrometheusReader(logger log.Logger, walDirectory string, appender Appender) *PrometheusReader {
+func NewPrometheusReader(logger log.Logger, walDirectory string, targetGetter TargetGetter, appender Appender) *PrometheusReader {
 	return &PrometheusReader{
 		appender:     appender,
 		logger:       logger,
 		walDirectory: walDirectory,
+		targetGetter: targetGetter,
 	}
 }
 
 type PrometheusReader struct {
 	logger       log.Logger
 	walDirectory string
+	targetGetter TargetGetter
 	appender     Appender
 	cancelTail   context.CancelFunc
 }
@@ -86,7 +94,7 @@ func (r *PrometheusReader) Run() error {
 			}
 			for len(recordSamples) > 0 {
 				var outputSample *MetricFamily
-				outputSample, recordSamples, err = buildSample(seriesCache, recordSamples)
+				outputSample, recordSamples, err = buildSample(ctx, seriesCache, r.targetGetter, recordSamples)
 				if err != nil {
 					level.Warn(r.logger).Log("msg", "Failed to build sample", "err", err)
 					continue
@@ -108,19 +116,26 @@ func (r *PrometheusReader) Stop() {
 // Creates a MetricFamily instance from the head of recordSamples, or error if
 // that fails. In either case, this function returns the recordSamples items
 // that weren't consumed.
-func buildSample(seriesGetter seriesGetter, recordSamples []tsdb.RefSample) (*MetricFamily, []tsdb.RefSample, error) {
+func buildSample(ctx context.Context, seriesGetter seriesGetter, targetGetter TargetGetter, recordSamples []tsdb.RefSample) (*MetricFamily, []tsdb.RefSample, error) {
 	sample := recordSamples[0]
-	lset, ok := seriesGetter.get(sample.Ref)
+	tsdblset, ok := seriesGetter.get(sample.Ref)
 	if !ok {
-		return nil, recordSamples[1:], fmt.Errorf("sample=%v", sample)
+		return nil, recordSamples[1:], fmt.Errorf("No series matched sample by ref %v", sample)
+	}
+	lset := pkgLabels(tsdblset)
+	// Fill in the discovered labels from the Targets API.
+	target, err := targetGetter.Get(ctx, lset)
+	if err != nil {
+		return nil, recordSamples[1:], errors.Wrapf(err, "No target matched labels %v", lset)
 	}
+	metricLabels := targets.DropTargetLabels(lset, target.Labels)
 	// TODO(jkohen): Rebuild histograms and summary from individual time series.
 	metricFamily := &dto.MetricFamily{
 		Metric: []*dto.Metric{{}},
 	}
 	metric := metricFamily.Metric[0]
-	metric.Label = make([]*dto.LabelPair, 0, len(lset)-1)
-	for _, l := range lset {
+	metric.Label = make([]*dto.LabelPair, 0, len(metricLabels)-1)
+	for _, l := range metricLabels {
 		if l.Name == labels.MetricName {
 			metricFamily.Name = proto.String(l.Value)
 			continue
@@ -136,17 +151,15 @@ func buildSample(seriesGetter seriesGetter, recordSamples []tsdb.RefSample) (*Me
 	metric.TimestampMs = proto.Int64(sample.T)
 	// TODO(jkohen): track reset timestamps.
 	metricResetTimestampMs := []int64{NoTimestamp}
-	// TODO(jkohen): fill in the discovered labels from the Targets API.
-	targetLabels := make(labels.Labels, 0, len(lset))
-	for _, l := range lset {
-		if l.Name == labels.MetricName {
-			continue
-		}
-		targetLabels = append(targetLabels, labels.Label(l))
-	}
-	// labels.Labels expects the contents to be sorted. We could move to an
-	// interface that doesn't require order, to save some cycles.
-	sort.Sort(targetLabels)
-	m, err := NewMetricFamily(metricFamily, metricResetTimestampMs, targetLabels)
+	m, err := NewMetricFamily(metricFamily, metricResetTimestampMs, target.DiscoveredLabels)
 	return m, recordSamples[1:], err
 }
+
+// TODO(jkohen): We should be able to avoid this conversion.
+func pkgLabels(input tsdblabels.Labels) labels.Labels {
+	output := make(labels.Labels, 0, len(input))
+	for _, l := range input {
+		output = append(output, labels.Label(l))
+	}
+	return output
+}
diff --git a/retrieval/manager_test.go b/retrieval/manager_test.go
index 7a2a3bd7..6b1e19b7 100644
--- a/retrieval/manager_test.go
+++ b/retrieval/manager_test.go
@@ -14,8 +14,11 @@ limitations under the License.
 package retrieval
 
 import (
+	"context"
+	"fmt"
 	"testing"
 
+	"github.com/Stackdriver/stackdriver-prometheus-sidecar/targets"
 	promlabels "github.com/prometheus/prometheus/pkg/labels"
 	"github.com/prometheus/tsdb"
 	"github.com/prometheus/tsdb/labels"
@@ -35,49 +38,102 @@ func (g *seriesMap) get(ref uint64) (labels.Labels, bool) {
 	return ls, ok
 }
 
+// Implements TargetGetter.
+// The map key is the value of the first label in the lset given as an input to Get.
+type targetMap struct {
+	m map[string]targets.Target
+}
+
+func newTargetMap() targetMap {
+	return targetMap{m: make(map[string]targets.Target)}
+}
+
+func (g *targetMap) Get(ctx context.Context, lset promlabels.Labels) (*targets.Target, error) {
+	key := lset[0].Value
+	t, ok := g.m[key]
+	if !ok {
+		return nil, fmt.Errorf("no target match for label %v", lset[0])
+	}
+	return &t, nil
+}
+
 func TestBuildSample(t *testing.T) {
+	ctx := context.Background()
 	seriesMap := newSeriesMap()
+	targetMap := newTargetMap()
 	timestamp := int64(1234)
 	value := 2.1
-	recordSamples := []tsdb.RefSample{
-		{Ref: /*unknown*/ 999, T: timestamp, V: value},
-		{Ref: /*unknown*/ 999, T: timestamp, V: value},
-	}
-	sample, recordSamples, err := buildSample(&seriesMap, recordSamples)
-	if err == nil {
-		t.Errorf("Expected error, got sample %v", sample)
-	}
-	if len(recordSamples) != 1 {
-		t.Errorf("Expected one leftover sample, got samples %v", recordSamples)
-	}
+	t.Run("NoSeries", func(t *testing.T) {
+		recordSamples := []tsdb.RefSample{
+			{Ref: /*unknown*/ 999, T: timestamp, V: value},
+			{Ref: /*unknown*/ 999, T: timestamp, V: value},
+		}
+		sample, recordSamples, err := buildSample(ctx, &seriesMap, &targetMap, recordSamples)
+		if err == nil {
+			t.Errorf("Expected error, got sample %v", sample)
+		}
+		if len(recordSamples) != 1 {
+			t.Errorf("Expected one leftover sample, got samples %v", recordSamples)
+		}
+	})
 
-	ref := uint64(0)
-	seriesLabels := labels.Labels{{"__name__", "my_metric"}, {"job", "job1"}, {"instance", "i1"}}
-	seriesMap.m[ref] = seriesLabels
-	recordSamples = []tsdb.RefSample{{Ref: ref, T: timestamp, V: value}}
-	sample, recordSamples, err = buildSample(&seriesMap, recordSamples)
-	if err != nil {
-		t.Error(err)
-	}
-	if len(recordSamples) != 0 {
-		t.Errorf("Expected all samples to be consumed, got samples %v", recordSamples)
-	}
-	if sample == nil {
-		t.Error("Unexpected nil sample")
-	}
-	if sample.GetName() != "my_metric" {
-		t.Errorf("Expected name 'my_metric', got %v", sample.GetName())
-	}
-	if sample.Metric[0].GetTimestampMs() != timestamp {
-		t.Errorf("Expected timestamp '%v', got %v", timestamp, sample.Metric[0].GetTimestampMs())
-	}
-	if sample.Metric[0].Untyped.GetValue() != value {
-		t.Errorf("Expected value '%v', got %v", value, sample.Metric[0].Untyped.GetValue())
-	}
-	targetLabels := promlabels.FromStrings("job", "job1", "instance", "i1")
-	if !promlabels.Equal(sample.TargetLabels, targetLabels) {
-		t.Errorf("Expected target labels '%v', got %v", targetLabels, sample.TargetLabels)
-	}
+	t.Run("NoTarget", func(t *testing.T) {
+		ref := uint64(0)
+		seriesLabels := labels.Labels{{"__name__", "my_metric"}, {"job", "job1"}, {"instance", "i1"}}
+		seriesMap.m[ref] = seriesLabels
+		recordSamples := []tsdb.RefSample{{Ref: ref, T: timestamp, V: value}}
+		sample, recordSamples, err := buildSample(ctx, &seriesMap, &targetMap, recordSamples)
+		if err == nil {
+			t.Errorf("Expected error, got sample %v", sample)
+		}
+		if len(recordSamples) != 0 {
+			t.Errorf("Expected all samples to be consumed, got samples %v", recordSamples)
+		}
+	})
+
+	t.Run("Successful", func(t *testing.T) {
+		ref := uint64(0)
+		seriesLabels := labels.Labels{{"__name__", "my_metric"}, {"job", "job1"}, {"instance", "i1"}, {"mkey", "mvalue"}}
+		seriesMap.m[ref] = seriesLabels
+		// The discovered labels include a label "job" and no "instance"
+		// label, which will cause the metric labels to include
+		// "instance", but not "job".
+		targetMap.m[seriesLabels[0].Value] = targets.Target{
+			DiscoveredLabels: promlabels.Labels{{"dkey", "dvalue"}},
+			Labels:           promlabels.Labels{{"job", "job1"}},
+		}
+		recordSamples := []tsdb.RefSample{{Ref: ref, T: timestamp, V: value}}
+		sample, recordSamples, err := buildSample(ctx, &seriesMap, &targetMap, recordSamples)
+		if err != nil {
+			t.Error(err)
+		}
+		if len(recordSamples) != 0 {
+			t.Errorf("Expected all samples to be consumed, got samples %v", recordSamples)
+		}
+		if sample == nil {
+			t.Fatal("Unexpected nil sample")
+		}
+		if sample.MetricFamily == nil {
+			t.Fatalf("Unexpected nil MetricFamily %v", sample)
+		}
+		if sample.GetName() != "my_metric" {
+			t.Errorf("Expected name 'my_metric', got %v", sample.GetName())
+		}
+		if sample.Metric[0].GetTimestampMs() != timestamp {
+			t.Errorf("Expected timestamp '%v', got %v", timestamp, sample.Metric[0].GetTimestampMs())
+		}
+		if sample.Metric[0].Untyped.GetValue() != value {
+			t.Errorf("Expected value '%v', got %v", value, sample.Metric[0].Untyped.GetValue())
+		}
+		targetLabels := promlabels.FromStrings("dkey", "dvalue")
+		if !promlabels.Equal(sample.TargetLabels, targetLabels) {
+			t.Errorf("Expected target labels '%v', got %v", targetLabels, sample.TargetLabels)
+		}
+		metricLabels := promlabels.FromStrings("instance", "i1", "mkey", "mvalue")
+		if !promlabels.Equal(LabelPairsToLabels(sample.Metric[0].Label), metricLabels) {
+			t.Errorf("Expected metric labels '%v', got %v", metricLabels, sample.Metric[0].Label)
+		}
+	})
 }
diff --git a/targets/cache.go b/targets/cache.go
index 45a919a8..a9fe8758 100644
--- a/targets/cache.go
+++ b/targets/cache.go
@@ -27,7 +27,7 @@ import (
 	"github.com/prometheus/prometheus/pkg/labels"
 )
 
-const DefaultTargetsEndpoint = "/api/v1/targets"
+const DefaultAPIEndpoint = "/api/v1/targets"
 
 func cacheKey(job, instance string) string {
 	return job + "\xff" + instance
 }
@@ -38,6 +38,7 @@ func cacheKey(job, instance string) string {
 // unique by job and instance label and an optional but consistent set of additional labels.
 // It only provides best effort matching for configurations where targets are identified
 // by a varying set of labels within a job and instance combination.
+// Implements TargetGetter.
 type Cache struct {
 	logger  log.Logger
 	client  *http.Client
@@ -48,7 +49,7 @@ type Cache struct {
 	targets map[string][]*Target
 }
 
-func NewCache(ctx context.Context, logger log.Logger, client *http.Client, promURL *url.URL) *Cache {
+func NewCache(logger log.Logger, client *http.Client, promURL *url.URL) *Cache {
 	if client == nil {
 		client = http.DefaultClient
 	}
@@ -56,6 +57,7 @@ func NewCache(ctx context.Context, logger log.Logger, client *http.Client, promU
 		logger = log.NewNopLogger()
 	}
 	return &Cache{
+		logger:  logger,
 		client:  client,
 		url:     promURL,
 		targets: map[string][]*Target{},
diff --git a/targets/cache_test.go b/targets/cache_test.go
index f55d9b0d..633556ba 100644
--- a/targets/cache_test.go
+++ b/targets/cache_test.go
@@ -39,7 +39,7 @@ func TestTargetCache_Error(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	c := NewCache(ctx, nil, nil, u)
+	c := NewCache(nil, nil, u)
 
 	expectedTarget := &Target{
 		Labels: labels.FromStrings("job", "a", "instance", "c"),
@@ -104,7 +104,7 @@ func TestTargetCache_Success(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	c := NewCache(ctx, nil, nil, u)
+	c := NewCache(nil, nil, u)
 
 	handler = func() []*Target {
 		return []*Target{
diff --git a/test-in-prod b/test-in-prod
index 960be6a8..de8a887c 100755
--- a/test-in-prod
+++ b/test-in-prod
@@ -3,4 +3,5 @@ exec ./stackdriver-prometheus-sidecar \
   --stackdriver.project-id=prometheus-to-sd \
   --stackdriver.global-label=_kubernetes_cluster_name=prom-test-cluster-1 \
   --stackdriver.global-label=_kubernetes_location=us-central1-a \
-  --stackdriver.global-label=__meta_kubernetes_namespace=stackdriver
+  --stackdriver.global-label=__meta_kubernetes_namespace=stackdriver \
+  "$@"
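
For reviewers, a self-contained sketch (not part of the patch) of how the pieces introduced above fit together: it wires a target cache to a Prometheus server and resolves one series' label set, using only the calls the patch adds (NewCache, Run, Get, DropTargetLabels, DefaultAPIEndpoint). The base URL and label values are placeholders.

package main

import (
	"context"
	"fmt"
	"net/url"

	"github.com/Stackdriver/stackdriver-prometheus-sidecar/targets"
	"github.com/go-kit/kit/log"
	"github.com/prometheus/prometheus/pkg/labels"
)

func main() {
	// Placeholder base URL; main.go gets this from --prometheus.api-address.
	promURL, err := url.Parse("http://127.0.0.1:9090/")
	if err != nil {
		panic(err)
	}
	// Resolve the targets API endpoint relative to the base URL, as main.go does.
	targetsURL, err := promURL.Parse(targets.DefaultAPIEndpoint)
	if err != nil {
		panic(err)
	}

	// A nil client makes NewCache fall back to http.DefaultClient.
	cache := targets.NewCache(log.NewNopLogger(), nil, targetsURL)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	// Run keeps the cache fresh; main.go runs this under its run group.
	go cache.Run(ctx)

	// Look up the target for a series, then drop the target labels so only
	// the metric's own labels remain, mirroring what buildSample does.
	lset := labels.FromStrings("__name__", "up", "job", "job1", "instance", "i1")
	target, err := cache.Get(ctx, lset)
	if err != nil {
		fmt.Println("no target matched:", err)
		return
	}
	fmt.Println("discovered labels:", target.DiscoveredLabels)
	fmt.Println("metric labels:", targets.DropTargetLabels(lset, target.Labels))
}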