Skip to content

Commit

Permalink
[dev.multiple-integrations] Introduce autoscraper (#1195)
Browse files Browse the repository at this point in the history
* pkg/integrations/v2: introduce self-scraping

* linting
  • Loading branch information
rfratto committed Dec 18, 2021
1 parent 7810ec3 commit f61d7ee
Show file tree
Hide file tree
Showing 21 changed files with 789 additions and 222 deletions.
45 changes: 45 additions & 0 deletions pkg/config/integrations_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package config

import (
"flag"
"testing"

"github.com/stretchr/testify/require"

_ "github.com/grafana/agent/pkg/integrations/install" // Install integrations for tests
)

func TestIntegrations_v1(t *testing.T) {
cfg := `
metrics:
wal_directory: /tmp/wal
integrations:
agent:
enabled: true`

fs := flag.NewFlagSet("test", flag.ExitOnError)
c, err := load(fs, []string{"-config.file", "test"}, func(_ string, _ bool, c *Config) error {
return LoadBytes([]byte(cfg), false, c)
})
require.NoError(t, err)
require.NotNil(t, c.Integrations.configV1)
}

func TestIntegrations_v2(t *testing.T) {
cfg := `
metrics:
wal_directory: /tmp/wal
integrations:
agent:
autoscrape:
enabled: false`

fs := flag.NewFlagSet("test", flag.ExitOnError)
c, err := load(fs, []string{"-config.file", "test", "-enable-features=integrations-next"}, func(_ string, _ bool, c *Config) error {
return LoadBytes([]byte(cfg), false, c)
})
require.NoError(t, err)
require.NotNil(t, c.Integrations.configV2)
}
4 changes: 2 additions & 2 deletions pkg/integrations/install/shims.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package install

import (
v1 "github.com/grafana/agent/pkg/integrations"
"github.com/grafana/agent/pkg/integrations/config"
v2 "github.com/grafana/agent/pkg/integrations/v2"
"github.com/grafana/agent/pkg/integrations/v2/common"
metricsutils "github.com/grafana/agent/pkg/integrations/v2/metricsutils"
)

Expand All @@ -25,7 +25,7 @@ func init() {
}
}
if !found {
v2.RegisterLegacy(v1Integration, v2.TypeSingleton, func(cfg v1.Config, common config.Common) v2.UpgradedConfig {
v2.RegisterLegacy(v1Integration, v2.TypeSingleton, func(cfg v1.Config, common common.MetricsConfig) v2.UpgradedConfig {
return metricsutils.CreateShim(cfg, common)
})
}
Expand Down
18 changes: 11 additions & 7 deletions pkg/integrations/v2/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,36 @@ package agent
import (
"github.com/go-kit/log"
"github.com/grafana/agent/pkg/integrations/v2"
"github.com/grafana/agent/pkg/integrations/v2/common"
"github.com/grafana/agent/pkg/integrations/v2/metricsutils"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

// Config controls the Agent integration.
type Config struct {
metricsutils.CommonConfig `yaml:",inline"`
Common common.MetricsConfig `yaml:",inline"`
}

// Name returns the name of the integration that this config represents.
func (c *Config) Name() string { return "agent" }

// ApplyDefaults applies runtime-specific defaults to c.
func (c *Config) ApplyDefaults(globals integrations.Globals) error {
c.Common.ApplyDefaults(globals.SubsystemOpts.Metrics.Autoscrape)
return nil
}

// Identifier uniquely identifies this instance of Config.
func (c *Config) Identifier(globals integrations.Globals) (string, error) {
if c.InstanceKey != nil {
return *c.InstanceKey, nil
if c.Common.InstanceKey != nil {
return *c.Common.InstanceKey, nil
}
return globals.AgentIdentifier, nil
}

// MetricsConfig implements metricsutils.MetricsConfig.
func (c *Config) MetricsConfig() metricsutils.CommonConfig { return c.CommonConfig }

// NewIntegration converts this config into an instance of an integration.
func (c *Config) NewIntegration(l log.Logger, globals integrations.Globals) (integrations.Integration, error) {
return metricsutils.NewMetricsHandlerIntegration(l, c, globals, promhttp.Handler())
return metricsutils.NewMetricsHandlerIntegration(l, c, c.Common, globals, promhttp.Handler())
}

func init() {
Expand Down
32 changes: 32 additions & 0 deletions pkg/integrations/v2/autoscrape/appender.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package autoscrape

import (
"fmt"

"github.com/prometheus/prometheus/pkg/exemplar"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
)

// failedAppender is used as the appender when an instance couldn't be found.
// Every operation fails with an error identifying the missing instance.
type failedAppender struct {
	instanceName string
}

var _ storage.Appender = (*failedAppender)(nil)

// errNoSuchInstance builds the error returned by every failedAppender
// method, deduplicating the previously repeated fmt.Errorf calls.
func (fa *failedAppender) errNoSuchInstance() error {
	return fmt.Errorf("no such instance %s", fa.instanceName)
}

func (fa *failedAppender) Append(ref uint64, l labels.Labels, t int64, v float64) (uint64, error) {
	return 0, fa.errNoSuchInstance()
}

func (fa *failedAppender) Commit() error {
	return fa.errNoSuchInstance()
}

func (fa *failedAppender) Rollback() error {
	return fa.errNoSuchInstance()
}

func (fa *failedAppender) AppendExemplar(ref uint64, l labels.Labels, e exemplar.Exemplar) (uint64, error) {
	return 0, fa.errNoSuchInstance()
}
251 changes: 251 additions & 0 deletions pkg/integrations/v2/autoscrape/autoscrape.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
// Package autoscrape implements a scraper for integrations.
package autoscrape

import (
"context"
"sync"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/agent/pkg/metrics"
"github.com/grafana/agent/pkg/metrics/instance"
"github.com/prometheus/common/model"
prom_config "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/storage"
)

// DefaultGlobal holds default values for Global.
// The explicit `Global` type on the var was redundant (staticcheck ST1023)
// and has been dropped; the composite literal already fixes the type.
var DefaultGlobal = Global{
	Enable:          true,
	MetricsInstance: "default",
}

// Global holds default settings for metrics integrations that support
// autoscraping. Integrations may override these settings via their own
// autoscrape Config.
type Global struct {
	Enable          bool           `yaml:"enable,omitempty"`           // Whether self-scraping should be enabled.
	MetricsInstance string         `yaml:"metrics_instance,omitempty"` // Metrics instance name to send metrics to.
	ScrapeInterval  model.Duration `yaml:"scrape_interval,omitempty"`  // Self-scraping frequency.
	ScrapeTimeout   model.Duration `yaml:"scrape_timeout,omitempty"`   // Self-scraping timeout.
}

// UnmarshalYAML implements yaml.Unmarshaler. It seeds g with DefaultGlobal
// so that any field not present in the YAML keeps its default value.
func (g *Global) UnmarshalYAML(f func(interface{}) error) error {
	*g = DefaultGlobal
	// The local type alias strips the UnmarshalYAML method so decoding into
	// (*global)(g) below does not recurse back into this function.
	type global Global
	return f((*global)(g))
}

// Config configures autoscrape for an individual integration, overriding
// the defaults from Global.
type Config struct {
	Enable          *bool          `yaml:"enable,omitempty"`           // Whether self-scraping should be enabled. Pointer so "unset" falls back to the global default.
	MetricsInstance string         `yaml:"metrics_instance,omitempty"` // Metrics instance name to send metrics to.
	ScrapeInterval  model.Duration `yaml:"scrape_interval,omitempty"`  // Self-scraping frequency.
	ScrapeTimeout   model.Duration `yaml:"scrape_timeout,omitempty"`   // Self-scraping timeout.

	RelabelConfigs       []*relabel.Config `yaml:"relabel_configs,omitempty"`        // Relabel the autoscrape job.
	MetricRelabelConfigs []*relabel.Config `yaml:"metric_relabel_configs,omitempty"` // Relabel individual autoscrape metrics.
}

// ScrapeConfig binds a Prometheus scrape config with the name of the metrics
// instance that scraped metrics should be sent to.
type ScrapeConfig struct {
	Instance string                   // Metrics instance that receives the scraped metrics.
	Config   prom_config.ScrapeConfig // The Prometheus scrape job itself.
}

// Scraper is a metrics autoscraper. It shards scrape jobs by target metrics
// instance and runs one instanceScraper per instance.
type Scraper struct {
	ctx    context.Context    // Parent context for all instanceScrapers.
	cancel context.CancelFunc // Cancels ctx; called by Stop.

	log log.Logger
	im  instance.Manager // Used to look up instances metrics are delivered to.

	scrapersMut sync.RWMutex                // Guards scrapers.
	scrapers    map[string]*instanceScraper // Keyed by target instance name.
}

// NewScraper creates a new autoscraper. Scraper will run until Stop is
// called. Instances to send scraped metrics to will be looked up via im.
func NewScraper(l log.Logger, im instance.Manager) *Scraper {
	logger := log.With(l, "component", "autoscraper")
	ctx, cancel := context.WithCancel(context.Background())

	return &Scraper{
		ctx:    ctx,
		cancel: cancel,

		log:      logger,
		im:       im,
		scrapers: make(map[string]*instanceScraper),
	}
}

// ApplyConfig will apply the given jobs. An error will be returned for any
// jobs that failed to be applied.
func (s *Scraper) ApplyConfig(jobs []*ScrapeConfig) error {
	s.scrapersMut.Lock()
	defer s.scrapersMut.Unlock()

	var firstError error
	saveError := func(e error) {
		if firstError == nil {
			firstError = e
		}
	}

	// Shard our jobs by target instance.
	shardedJobs := map[string][]*prom_config.ScrapeConfig{}
	for _, j := range jobs {
		// Skip (and report) jobs whose target instance doesn't exist.
		_, err := s.im.GetInstance(j.Instance)
		if err != nil {
			level.Error(s.log).Log("msg", "cannot autoscrape integration", "name", j.Config.JobName, "err", err)
			saveError(err)
			continue
		}

		shardedJobs[j.Instance] = append(shardedJobs[j.Instance], &j.Config)
	}

	// Then pass the jobs to instanceScraper, creating them if we need to.
	// NOTE: loop variables are named instName/instJobs rather than
	// instance/jobs to avoid shadowing the imported instance package and the
	// jobs parameter.
	for instName, instJobs := range shardedJobs {
		is, ok := s.scrapers[instName]
		if !ok {
			is = newInstanceScraper(s.ctx, s.log, s.im, instName)
			s.scrapers[instName] = is
		}
		if err := is.ApplyConfig(instJobs); err != nil {
			// Not logging here; is.ApplyConfig already logged the errors.
			saveError(err)
		}
	}

	// Garbage collect: if there's a key in s.scrapers that wasn't in
	// shardedJobs, stop that unused scraper.
	for instName, is := range s.scrapers {
		if _, current := shardedJobs[instName]; !current {
			is.Stop()
			delete(s.scrapers, instName)
		}
	}

	return firstError
}

// TargetsActive returns the set of active scrape targets for all target
// instances.
func (s *Scraper) TargetsActive() map[string]metrics.TargetSet {
	s.scrapersMut.RLock()
	defer s.scrapersMut.RUnlock()

	// Collect the active targets of every per-instance scrape manager,
	// keyed by instance name.
	result := make(map[string]metrics.TargetSet, len(s.scrapers))
	for name, scraper := range s.scrapers {
		result[name] = scraper.sm.TargetsActive()
	}
	return result
}

// Stop stops the Scraper, shutting down every per-instance scraper and
// canceling the shared context.
func (s *Scraper) Stop() {
	s.scrapersMut.Lock()
	defer s.scrapersMut.Unlock()

	for name, scraper := range s.scrapers {
		scraper.Stop()
		delete(s.scrapers, name)
	}

	s.cancel()
}

// instanceScraper is a Scraper which always sends to the same instance.
type instanceScraper struct {
	cancel context.CancelFunc // Cancels the context the managers run with.
	log    log.Logger

	sd *discovery.Manager // Discovers scrape targets.
	sm *scrape.Manager    // Scrapes targets discovered by sd.
}

// newInstanceScraper returns a new instanceScraper that delivers scraped
// metrics to instanceName. Must be stopped by calling Stop.
func newInstanceScraper(
	ctx context.Context,
	l log.Logger,
	im instance.Manager,
	instanceName string,
) *instanceScraper {
	scrapeCtx, cancel := context.WithCancel(ctx)
	logger := log.With(l, "target_instance", instanceName)

	sd := discovery.NewManager(scrapeCtx, logger, discovery.Name("autoscraper/"+instanceName))
	sm := scrape.NewManager(&scrape.Options{}, logger, &agentAppender{
		inst: instanceName,
		im:   im,
	})

	// Run both managers in the background; they exit when stopped via Stop.
	go func() { _ = sd.Run() }()
	go func() { _ = sm.Run(sd.SyncCh()) }()

	return &instanceScraper{
		cancel: cancel,
		log:    logger,

		sd: sd,
		sm: sm,
	}
}

// agentAppender resolves the target metrics instance lazily on each
// Appender call so instance recreation is picked up automatically.
type agentAppender struct {
	inst string           // Name of the metrics instance to resolve.
	im   instance.Manager // Used to resolve inst.
}

// Appender returns an appender for the configured instance. When the
// instance cannot be found, a failedAppender is returned so every append
// operation reports the missing instance instead of silently dropping data.
func (aa *agentAppender) Appender(ctx context.Context) storage.Appender {
	target, lookupErr := aa.im.GetInstance(aa.inst)
	if lookupErr != nil {
		return &failedAppender{instanceName: aa.inst}
	}
	return target.Appender(ctx)
}

// ApplyConfig applies the given scrape jobs to both the discovery manager
// and the scrape manager, returning the first error encountered (errors are
// also logged here).
func (is *instanceScraper) ApplyConfig(jobs []*prom_config.ScrapeConfig) error {
	var firstError error
	saveError := func(e error) {
		if firstError == nil && e != nil {
			firstError = e
		}
	}

	// Split each job into its SD config (keyed by job name, for the
	// discovery manager) and the scrape config itself.
	scrapeConfigs := make([]*prom_config.ScrapeConfig, 0, len(jobs))
	sdConfigs := make(map[string]discovery.Configs, len(jobs))
	for _, job := range jobs {
		sdConfigs[job.JobName] = job.ServiceDiscoveryConfigs
		scrapeConfigs = append(scrapeConfigs, job)
	}

	if err := is.sd.ApplyConfig(sdConfigs); err != nil {
		level.Error(is.log).Log("msg", "error when applying SD to autoscraper", "err", err)
		saveError(err)
	}
	if err := is.sm.ApplyConfig(&prom_config.Config{ScrapeConfigs: scrapeConfigs}); err != nil {
		level.Error(is.log).Log("msg", "error when applying jobs to scraper", "err", err)
		saveError(err)
	}

	return firstError
}

// Stop shuts the instanceScraper down: canceling the context stops the
// discovery manager, and sm.Stop stops the scrape manager.
func (is *instanceScraper) Stop() {
	is.cancel()
	is.sm.Stop()
}
Loading

0 comments on commit f61d7ee

Please sign in to comment.