Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 21 additions & 5 deletions cmd/stackdriver-prometheus-sidecar/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (

"github.com/Stackdriver/stackdriver-prometheus-sidecar/retrieval"
"github.com/Stackdriver/stackdriver-prometheus-sidecar/stackdriver"
"github.com/Stackdriver/stackdriver-prometheus-sidecar/targets"
"github.com/prometheus/common/promlog"
promlogflag "github.com/prometheus/common/promlog/flag"
)
Expand All @@ -62,7 +63,7 @@ func main() {
globalLabels map[string]string
stackdriverAddress *url.URL
walDirectory string
prometheusAddress string
prometheusURL *url.URL
listenAddress string

logLevel promlog.AllowedLevel
Expand Down Expand Up @@ -91,7 +92,7 @@ func main() {
Default("data/wal").StringVar(&cfg.walDirectory)

a.Flag("prometheus.api-address", "Address to listen on for UI, API, and telemetry.").
Default("0.0.0.0:9090").StringVar(&cfg.prometheusAddress)
Default("http://127.0.0.1:9090/").URLVar(&cfg.prometheusURL)

a.Flag("web.listen-address", "Address to listen on for UI, API, and telemetry.").
Default("0.0.0.0:9091").StringVar(&cfg.listenAddress)
Expand All @@ -106,8 +107,6 @@ func main() {
}

logger := promlog.New(cfg.logLevel)
cfg.globalLabels["_stackdriver_project_id"] = *projectId
cfg.projectIdResource = fmt.Sprintf("projects/%v", *projectId)

// XXX(fabxc): Kubernetes does background logging which we can only customize by modifying
// a global variable.
Expand All @@ -123,6 +122,14 @@ func main() {
level.Info(logger).Log("host_details", Uname())
level.Info(logger).Log("fd_limits", FdLimits())

cfg.globalLabels["_stackdriver_project_id"] = *projectId
cfg.projectIdResource = fmt.Sprintf("projects/%v", *projectId)
targetsURL, err := cfg.prometheusURL.Parse(targets.DefaultAPIEndpoint)
if err != nil {
panic(err)
}
targetCache := targets.NewCache(logger, nil, targetsURL)

// TODO(jkohen): Remove once we have proper translation of all metric
// types. Currently Stackdriver fails the entire request if you attempt
// to write to the different metric type, which we do fairly often at
Expand Down Expand Up @@ -171,7 +178,7 @@ func main() {
timeout: 10 * time.Second,
},
)
prometheusReader = retrieval.NewPrometheusReader(log.With(logger, "component", "Prometheus reader"), cfg.walDirectory, queueManager)
prometheusReader = retrieval.NewPrometheusReader(log.With(logger, "component", "Prometheus reader"), cfg.walDirectory, targetCache, queueManager)
)

// Exclude kingpin default flags to expose only Prometheus ones.
Expand All @@ -191,6 +198,15 @@ func main() {
http.Handle("/metrics", promhttp.Handler())

var g group.Group
{
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
targetCache.Run(ctx)
return nil
}, func(error) {
cancel()
})
}
{
term := make(chan os.Signal)
signal.Notify(term, os.Interrupt, syscall.SIGTERM)
Expand Down
53 changes: 33 additions & 20 deletions retrieval/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,31 +16,39 @@ package retrieval
import (
"context"
"fmt"
"sort"

"github.com/Stackdriver/stackdriver-prometheus-sidecar/tail"
"github.com/Stackdriver/stackdriver-prometheus-sidecar/targets"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"

dto "github.com/prometheus/client_model/go"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/tsdb"
tsdblabels "github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/wal"
)

type TargetGetter interface {
Get(ctx context.Context, lset labels.Labels) (*targets.Target, error)
}

// NewPrometheusReader is the PrometheusReader constructor
func NewPrometheusReader(logger log.Logger, walDirectory string, appender Appender) *PrometheusReader {
func NewPrometheusReader(logger log.Logger, walDirectory string, targetGetter TargetGetter, appender Appender) *PrometheusReader {
return &PrometheusReader{
appender: appender,
logger: logger,
walDirectory: walDirectory,
targetGetter: targetGetter,
}
}

type PrometheusReader struct {
logger log.Logger
walDirectory string
targetGetter TargetGetter
appender Appender
cancelTail context.CancelFunc
}
Expand Down Expand Up @@ -86,7 +94,7 @@ func (r *PrometheusReader) Run() error {
}
for len(recordSamples) > 0 {
var outputSample *MetricFamily
outputSample, recordSamples, err = buildSample(seriesCache, recordSamples)
outputSample, recordSamples, err = buildSample(ctx, seriesCache, r.targetGetter, recordSamples)
if err != nil {
level.Warn(r.logger).Log("msg", "Failed to build sample", "err", err)
continue
Expand All @@ -108,19 +116,26 @@ func (r *PrometheusReader) Stop() {
// Creates a MetricFamily instance from the head of recordSamples, or error if
// that fails. In either case, this function returns the recordSamples items
// that weren't consumed.
func buildSample(seriesGetter seriesGetter, recordSamples []tsdb.RefSample) (*MetricFamily, []tsdb.RefSample, error) {
func buildSample(ctx context.Context, seriesGetter seriesGetter, targetGetter TargetGetter, recordSamples []tsdb.RefSample) (*MetricFamily, []tsdb.RefSample, error) {
sample := recordSamples[0]
lset, ok := seriesGetter.get(sample.Ref)
tsdblset, ok := seriesGetter.get(sample.Ref)
if !ok {
return nil, recordSamples[1:], fmt.Errorf("sample=%v", sample)
return nil, recordSamples[1:], fmt.Errorf("No series matched sample by ref %v", sample)
}
lset := pkgLabels(tsdblset)
// Fill in the discovered labels from the Targets API.
target, err := targetGetter.Get(ctx, lset)
if err != nil {
return nil, recordSamples[1:], errors.Wrapf(err, "No target matched labels %v", lset)
}
metricLabels := targets.DropTargetLabels(lset, target.Labels)
// TODO(jkohen): Rebuild histograms and summary from individual time series.
metricFamily := &dto.MetricFamily{
Metric: []*dto.Metric{{}},
}
metric := metricFamily.Metric[0]
metric.Label = make([]*dto.LabelPair, 0, len(lset)-1)
for _, l := range lset {
metric.Label = make([]*dto.LabelPair, 0, len(metricLabels)-1)
for _, l := range metricLabels {
if l.Name == labels.MetricName {
metricFamily.Name = proto.String(l.Value)
continue
Expand All @@ -136,17 +151,15 @@ func buildSample(seriesGetter seriesGetter, recordSamples []tsdb.RefSample) (*Me
metric.TimestampMs = proto.Int64(sample.T)
// TODO(jkohen): track reset timestamps.
metricResetTimestampMs := []int64{NoTimestamp}
// TODO(jkohen): fill in the discovered labels from the Targets API.
targetLabels := make(labels.Labels, 0, len(lset))
for _, l := range lset {
if l.Name == labels.MetricName {
continue
}
targetLabels = append(targetLabels, labels.Label(l))
}
// labels.Labels expects the contents to be sorted. We could move to an
// interface that doesn't require order, to save some cycles.
sort.Sort(targetLabels)
m, err := NewMetricFamily(metricFamily, metricResetTimestampMs, targetLabels)
m, err := NewMetricFamily(metricFamily, metricResetTimestampMs, target.DiscoveredLabels)
return m, recordSamples[1:], err
}

// TODO(jkohen): We should be able to avoid this conversion.
func pkgLabels(input tsdblabels.Labels) labels.Labels {
output := make(labels.Labels, 0, len(input))
for _, l := range input {
output = append(output, labels.Label(l))
}
return output
}
132 changes: 94 additions & 38 deletions retrieval/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@ limitations under the License.
package retrieval

import (
"context"
"fmt"
"testing"

"github.com/Stackdriver/stackdriver-prometheus-sidecar/targets"
promlabels "github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/tsdb"
"github.com/prometheus/tsdb/labels"
Expand All @@ -35,49 +38,102 @@ func (g *seriesMap) get(ref uint64) (labels.Labels, bool) {
return ls, ok
}

// Implements TargetGetter.
// The map key is the value of the first label in the lset given as an input to Get.
type targetMap struct {
m map[string]targets.Target
}

func newTargetMap() targetMap {
return targetMap{m: make(map[string]targets.Target)}
}

func (g *targetMap) Get(ctx context.Context, lset promlabels.Labels) (*targets.Target, error) {
key := lset[0].Value
t, ok := g.m[key]
if !ok {
return nil, fmt.Errorf("no target match for label %v", lset[0])
}
return &t, nil
}

func TestBuildSample(t *testing.T) {
ctx := context.Background()
seriesMap := newSeriesMap()
targetMap := newTargetMap()

timestamp := int64(1234)
value := 2.1

recordSamples := []tsdb.RefSample{
{Ref: /*unknown*/ 999, T: timestamp, V: value},
{Ref: /*unknown*/ 999, T: timestamp, V: value},
}
sample, recordSamples, err := buildSample(&seriesMap, recordSamples)
if err == nil {
t.Errorf("Expected error, got sample %v", sample)
}
if len(recordSamples) != 1 {
t.Errorf("Expected one leftover sample, got samples %v", recordSamples)
}
t.Run("NoSeries", func(t *testing.T) {
recordSamples := []tsdb.RefSample{
{Ref: /*unknown*/ 999, T: timestamp, V: value},
{Ref: /*unknown*/ 999, T: timestamp, V: value},
}
sample, recordSamples, err := buildSample(ctx, &seriesMap, &targetMap, recordSamples)
if err == nil {
t.Errorf("Expected error, got sample %v", sample)
}
if len(recordSamples) != 1 {
t.Errorf("Expected one leftover sample, got samples %v", recordSamples)
}
})

ref := uint64(0)
seriesLabels := labels.Labels{{"__name__", "my_metric"}, {"job", "job1"}, {"instance", "i1"}}
seriesMap.m[ref] = seriesLabels
recordSamples = []tsdb.RefSample{{Ref: ref, T: timestamp, V: value}}
sample, recordSamples, err = buildSample(&seriesMap, recordSamples)
if err != nil {
t.Error(err)
}
if len(recordSamples) != 0 {
t.Errorf("Expected all samples to be consumed, got samples %v", recordSamples)
}
if sample == nil {
t.Error("Unexpected nil sample")
}
if sample.GetName() != "my_metric" {
t.Errorf("Expected name 'my_metric', got %v", sample.GetName())
}
if sample.Metric[0].GetTimestampMs() != timestamp {
t.Errorf("Expected timestamp '%v', got %v", timestamp, sample.Metric[0].GetTimestampMs())
}
if sample.Metric[0].Untyped.GetValue() != value {
t.Errorf("Expected value '%v', got %v", value, sample.Metric[0].Untyped.GetValue())
}
targetLabels := promlabels.FromStrings("job", "job1", "instance", "i1")
if !promlabels.Equal(sample.TargetLabels, targetLabels) {
t.Errorf("Expected target labels '%v', got %v", targetLabels, sample.TargetLabels)
}
t.Run("NoTarget", func(t *testing.T) {
ref := uint64(0)
seriesLabels := labels.Labels{{"__name__", "my_metric"}, {"job", "job1"}, {"instance", "i1"}}
seriesMap.m[ref] = seriesLabels
recordSamples := []tsdb.RefSample{{Ref: ref, T: timestamp, V: value}}
sample, recordSamples, err := buildSample(ctx, &seriesMap, &targetMap, recordSamples)
if err == nil {
t.Errorf("Expected error, got sample %v", sample)
}
if len(recordSamples) != 0 {
t.Errorf("Expected all samples to be consumed, got samples %v", recordSamples)
}
})

t.Run("Successful", func(t *testing.T) {
ref := uint64(0)
seriesLabels := labels.Labels{{"__name__", "my_metric"}, {"job", "job1"}, {"instance", "i1"}, {"mkey", "mvalue"}}
seriesMap.m[ref] = seriesLabels
// The discovered labels include a label "job" and no "instance"
// label, which will cause the metric labels to include
// "instance", but not "job".
targetMap.m[seriesLabels[0].Value] = targets.Target{
DiscoveredLabels: promlabels.Labels{{"dkey", "dvalue"}},
Labels: promlabels.Labels{{"job", "job1"}},
}
recordSamples := []tsdb.RefSample{{Ref: ref, T: timestamp, V: value}}
sample, recordSamples, err := buildSample(ctx, &seriesMap, &targetMap, recordSamples)
if err != nil {
t.Error(err)
}
if len(recordSamples) != 0 {
t.Errorf("Expected all samples to be consumed, got samples %v", recordSamples)
}
if sample == nil {
t.Fatal("Unexpected nil sample")
}
if sample.MetricFamily == nil {
t.Fatalf("Unexpected nil MetricFamily %v", sample)
}
if sample.GetName() != "my_metric" {
t.Errorf("Expected name 'my_metric', got %v", sample.GetName())
}
if sample.Metric[0].GetTimestampMs() != timestamp {
t.Errorf("Expected timestamp '%v', got %v", timestamp, sample.Metric[0].GetTimestampMs())
}
if sample.Metric[0].Untyped.GetValue() != value {
t.Errorf("Expected value '%v', got %v", value, sample.Metric[0].Untyped.GetValue())
}
targetLabels := promlabels.FromStrings("dkey", "dvalue")
if !promlabels.Equal(sample.TargetLabels, targetLabels) {
t.Errorf("Expected target labels '%v', got %v", targetLabels, sample.TargetLabels)
}
metricLabels := promlabels.FromStrings("instance", "i1", "mkey", "mvalue")
if !promlabels.Equal(LabelPairsToLabels(sample.Metric[0].Label), metricLabels) {
t.Errorf("Expected metric labels '%v', got %v", metricLabels, sample.Metric[0].Label)
}
})
}
6 changes: 4 additions & 2 deletions targets/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/prometheus/prometheus/pkg/labels"
)

const DefaultTargetsEndpoint = "/api/v1/targets"
const DefaultAPIEndpoint = "/api/v1/targets"

func cacheKey(job, instance string) string {
return job + "\xff" + instance
Expand All @@ -38,6 +38,7 @@ func cacheKey(job, instance string) string {
// unique by job and instance label and an optional but consistent set of additional labels.
// It only provides best effort matching for configurations where targets are identified
// by a varying set of labels within a job and instance combination.
// Implements TargetGetter.
type Cache struct {
logger log.Logger
client *http.Client
Expand All @@ -48,14 +49,15 @@ type Cache struct {
targets map[string][]*Target
}

func NewCache(ctx context.Context, logger log.Logger, client *http.Client, promURL *url.URL) *Cache {
func NewCache(logger log.Logger, client *http.Client, promURL *url.URL) *Cache {
if client == nil {
client = http.DefaultClient
}
if logger == nil {
logger = log.NewNopLogger()
}
return &Cache{
logger: logger,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for catching that and the obsolete arg.

client: client,
url: promURL,
targets: map[string][]*Target{},
Expand Down
4 changes: 2 additions & 2 deletions targets/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestTargetCache_Error(t *testing.T) {
if err != nil {
t.Fatal(err)
}
c := NewCache(ctx, nil, nil, u)
c := NewCache(nil, nil, u)

expectedTarget := &Target{
Labels: labels.FromStrings("job", "a", "instance", "c"),
Expand Down Expand Up @@ -104,7 +104,7 @@ func TestTargetCache_Success(t *testing.T) {
if err != nil {
t.Fatal(err)
}
c := NewCache(ctx, nil, nil, u)
c := NewCache(nil, nil, u)

handler = func() []*Target {
return []*Target{
Expand Down
3 changes: 2 additions & 1 deletion test-in-prod
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ exec ./stackdriver-prometheus-sidecar \
--stackdriver.project-id=prometheus-to-sd \
--stackdriver.global-label=_kubernetes_cluster_name=prom-test-cluster-1 \
--stackdriver.global-label=_kubernetes_location=us-central1-a \
--stackdriver.global-label=__meta_kubernetes_namespace=stackdriver
--stackdriver.global-label=__meta_kubernetes_namespace=stackdriver \
"$@"