From b1a0c43be47aafcc59c9ef59d46c9314a49647bb Mon Sep 17 00:00:00 2001 From: Matt Durham Date: Tue, 2 Aug 2022 17:36:19 -0400 Subject: [PATCH 01/15] Add component metrics to /metrics endpoint, and add a few filler metrics. --- component/metrics/mutate/mutate.go | 14 +++++++++--- component/metrics/remotewrite/remote_write.go | 21 +++++------------- component/registry.go | 4 ++++ component/remote/s3/s3.go | 22 +++++++++++++++++-- pkg/flow/flow.go | 3 ++- pkg/flow/internal/controller/component.go | 5 +++++ pkg/flow/internal/controller/loader.go | 9 ++++---- pkg/flow/internal/controller/loader_test.go | 11 +++++----- 8 files changed, 58 insertions(+), 31 deletions(-) diff --git a/component/metrics/mutate/mutate.go b/component/metrics/mutate/mutate.go index 4ea9fc9aad97..ce7f07e0105c 100644 --- a/component/metrics/mutate/mutate.go +++ b/component/metrics/mutate/mutate.go @@ -8,6 +8,8 @@ import ( fa "github.com/grafana/agent/component/common/appendable" flow_relabel "github.com/grafana/agent/component/common/relabel" "github.com/grafana/agent/component/metrics" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/storage" ) @@ -17,7 +19,6 @@ func init() { Name: "metrics.mutate", Args: Arguments{}, Exports: Exports{}, - Build: func(opts component.Options, args component.Arguments) (component.Component, error) { return New(opts, args.(Arguments)) }, @@ -44,8 +45,9 @@ type Component struct { opts component.Options mrc []*relabel.Config - appendable *fa.FlowAppendable - receiver *metrics.Receiver + appendable *fa.FlowAppendable + receiver *metrics.Receiver + metricsProcessed prometheus.Counter } var ( @@ -57,7 +59,12 @@ func New(o component.Options, args Arguments) (*Component, error) { c := &Component{opts: o} c.appendable = fa.NewFlowAppendable(args.ForwardTo...) c.receiver = &metrics.Receiver{Receive: c.Receive} + c.metricsProcessed = promauto.NewCounter(prometheus.CounterOpts{ + Name: "metrics_mutate_metrics_processed", + Help: "Total number of metrics processed", + }) + o.Registerer.Register(c.metricsProcessed) // Call to Update() to set the relabelling rules once at the start. if err := c.Update(args); err != nil { return nil, err @@ -69,6 +76,7 @@ func New(o component.Options, args Arguments) (*Component, error) { // Run implements component.Component. func (c *Component) Run(ctx context.Context) error { <-ctx.Done() + c.opts.Registerer.Unregister(c.metricsProcessed) return nil } diff --git a/component/metrics/remotewrite/remote_write.go b/component/metrics/remotewrite/remote_write.go index 76253c2aec72..ea0a55c6926b 100644 --- a/component/metrics/remotewrite/remote_write.go +++ b/component/metrics/remotewrite/remote_write.go @@ -85,7 +85,7 @@ type BasicAuthConfig struct { type Component struct { log log.Logger opts component.Options - reg *metrics.CollectorRegistry + reg prometheus.Registerer walStore *wal.Storage remoteStore *remote.Storage @@ -99,22 +99,21 @@ type Component struct { // NewComponent creates a new metrics_forwarder component. func NewComponent(o component.Options, c RemoteConfig) (*Component, error) { - reg := metrics.NewCollectorRegistry() walLogger := log.With(o.Logger, "subcomponent", "wal") dataPath := filepath.Join(o.DataPath, "wal", o.ID) - walStorage, err := wal.NewStorage(walLogger, reg, dataPath) + walStorage, err := wal.NewStorage(walLogger, o.Registerer, dataPath) if err != nil { return nil, err } remoteLogger := log.With(o.Logger, "subcomponent", "rw") - remoteStore := remote.NewStorage(remoteLogger, reg, startTime, dataPath, remoteFlushDeadline, nil) + remoteStore := remote.NewStorage(remoteLogger, o.Registerer, startTime, dataPath, remoteFlushDeadline, nil) res := &Component{ log: o.Logger, opts: o, - reg: reg, + reg: o.Registerer, walStore: walStorage, remoteStore: remoteStore, @@ -150,6 +149,8 @@ func (c *Component) Run(ctx context.Context) error { for { select { case <-ctx.Done(): + c.walStore.Close() + c.storage.Close() return nil case <-time.After(walTruncateFrequency): // The timestamp ts is used to determine which series are not receiving @@ -278,13 +279,3 @@ func (c *Component) Config() RemoteConfig { defer c.mut.RUnlock() return c.cfg } - -// Describe implements prometheus.Collector. -func (c *Component) Describe(ch chan<- *prometheus.Desc) { - c.reg.Describe(ch) -} - -// Collect implements prometheus.Collector. -func (c *Component) Collect(ch chan<- prometheus.Metric) { - c.reg.Collect(ch) -} diff --git a/component/registry.go b/component/registry.go index 57ab0920f2a5..f1ea06d43d5f 100644 --- a/component/registry.go +++ b/component/registry.go @@ -7,6 +7,7 @@ import ( "github.com/go-kit/log" "github.com/grafana/regexp" + "github.com/prometheus/client_golang/prometheus" ) // The parsedName of a component is the parts of its name ("remote.http") split @@ -49,6 +50,9 @@ type Options struct { // by the component; a component must use the same Exports type for its // lifetime. OnStateChange func(e Exports) + + //Registerer allows components to add their own metrics. The register will come pre-wrapped with the component id + Registerer prometheus.Registerer } // Registration describes a single component. diff --git a/component/remote/s3/s3.go b/component/remote/s3/s3.go index 342e43820b53..eb54b222a07b 100644 --- a/component/remote/s3/s3.go +++ b/component/remote/s3/s3.go @@ -9,6 +9,8 @@ import ( "time" "github.com/grafana/agent/pkg/flow/rivertypes" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/aws/aws-sdk-go-v2/aws" aws_config "github.com/aws/aws-sdk-go-v2/config" @@ -36,8 +38,10 @@ type S3 struct { health component.Health content string - watcher *watcher - updateChan chan result + watcher *watcher + updateChan chan result + s3Errors prometheus.Counter + lastAccessed prometheus.Gauge } var ( @@ -62,8 +66,18 @@ func New(o component.Options, args Arguments) (*S3, error) { args: args, health: component.Health{}, updateChan: make(chan result), + s3Errors: promauto.NewCounter(prometheus.CounterOpts{ + Name: "remote_s3_errors_total", + Help: "The number of errors while accessing s3", + }), + lastAccessed: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "remote_s3_timestamp_last_accessed_unix_seconds", + Help: "The last successful access in unix seconds", + }), } + o.Registerer.Register(s.s3Errors) + o.Registerer.Register(s.lastAccessed) w := newWatcher(bucket, file, s.updateChan, args.PollFrequency, s3Client) s.watcher = w @@ -77,6 +91,8 @@ func (s *S3) Run(ctx context.Context) error { go s.handleContentUpdate(ctx) go s.watcher.run(ctx) <-ctx.Done() + s.opts.Registerer.Unregister(s.s3Errors) + s.opts.Registerer.Unregister(s.lastAccessed) return nil } @@ -185,10 +201,12 @@ func (s *S3) handleContentPolling(newContent string, err error) { Value: newContent, }, }) + s.lastAccessed.SetToCurrentTime() s.content = newContent s.health.Health = component.HealthTypeHealthy s.health.Message = "s3 file updated" } else { + s.s3Errors.Inc() s.health.Health = component.HealthTypeUnhealthy s.health.Message = err.Error() } diff --git a/pkg/flow/flow.go b/pkg/flow/flow.go index 8f9385821905..dfc729057438 100644 --- a/pkg/flow/flow.go +++ b/pkg/flow/flow.go @@ -117,7 +117,8 @@ func newFlow(o Options) (*Flow, context.Context) { // Changed components should be queued for reevaluation. queue.Enqueue(cn) }, - }, o.Reg) + Registerer: o.Reg, + }) ) return &Flow{ diff --git a/pkg/flow/internal/controller/component.go b/pkg/flow/internal/controller/component.go index 332aa90a3e5b..1acf17b166e7 100644 --- a/pkg/flow/internal/controller/component.go +++ b/pkg/flow/internal/controller/component.go @@ -16,6 +16,7 @@ import ( "github.com/grafana/agent/pkg/flow/internal/dag" "github.com/grafana/agent/pkg/river/ast" "github.com/grafana/agent/pkg/river/vm" + "github.com/prometheus/client_golang/prometheus" "go.uber.org/atomic" ) @@ -58,6 +59,7 @@ type ComponentGlobals struct { Logger log.Logger // Logger shared between all managed components. DataPath string // Shared directory where component data may be stored OnExportsChange func(cn *ComponentNode) // Invoked when the managed component updated its exports + Registerer prometheus.Registerer // Registerer for serving agent and component metrics } // ComponentNode is a controller node which manages a user-defined component. @@ -148,6 +150,9 @@ func getManagedOptions(globals ComponentGlobals, cn *ComponentNode) component.Op Logger: log.With(globals.Logger, "component", cn.nodeID), DataPath: filepath.Join(globals.DataPath, cn.nodeID), OnStateChange: cn.setExports, + Registerer: prometheus.WrapRegistererWith(prometheus.Labels{ + "component_id": cn.nodeID, + }, globals.Registerer), } } diff --git a/pkg/flow/internal/controller/loader.go b/pkg/flow/internal/controller/loader.go index 5f9e0adae773..9014a0aa5b8a 100644 --- a/pkg/flow/internal/controller/loader.go +++ b/pkg/flow/internal/controller/loader.go @@ -16,7 +16,6 @@ import ( "github.com/grafana/agent/pkg/river/token/builder" "github.com/grafana/agent/pkg/river/vm" "github.com/hashicorp/go-multierror" - "github.com/prometheus/client_golang/prometheus" _ "github.com/grafana/agent/pkg/flow/internal/testcomponents" // Include test components ) @@ -36,18 +35,18 @@ type Loader struct { // NewLoader creates a new Loader. Components built by the Loader will be built // with co for their options. -func NewLoader(globals ComponentGlobals, reg prometheus.Registerer) *Loader { +func NewLoader(globals ComponentGlobals) *Loader { l := &Loader{ log: globals.Logger, globals: globals, graph: &dag.Graph{}, cache: newValueCache(), - cm: newControllerMetrics(reg), + cm: newControllerMetrics(globals.Registerer), } cc := newControllerCollector(l) - if reg != nil { - reg.MustRegister(cc) + if globals.Registerer != nil { + globals.Registerer.MustRegister(cc) } return l } diff --git a/pkg/flow/internal/controller/loader_test.go b/pkg/flow/internal/controller/loader_test.go index 15c3fc82bdd0..7267a977f65f 100644 --- a/pkg/flow/internal/controller/loader_test.go +++ b/pkg/flow/internal/controller/loader_test.go @@ -51,10 +51,11 @@ func TestLoader(t *testing.T) { Logger: log.NewNopLogger(), DataPath: t.TempDir(), OnExportsChange: func(cn *controller.ComponentNode) { /* no-op */ }, + Registerer: prometheus.DefaultRegisterer, } t.Run("New Graph", func(t *testing.T) { - l := controller.NewLoader(globals, prometheus.DefaultRegisterer) + l := controller.NewLoader(globals) diags := applyFromContent(t, l, []byte(testFile)) require.NoError(t, diags.ErrorOrNil()) requireGraph(t, l.Graph(), testGraphDefinition) @@ -72,7 +73,7 @@ func TestLoader(t *testing.T) { frequency = "1m" } ` - l := controller.NewLoader(globals, nil) + l := controller.NewLoader(globals) diags := applyFromContent(t, l, []byte(startFile)) origGraph := l.Graph() require.NoError(t, diags.ErrorOrNil()) @@ -91,7 +92,7 @@ func TestLoader(t *testing.T) { doesnotexist "bad_component" { } ` - l := controller.NewLoader(globals, nil) + l := controller.NewLoader(globals) diags := applyFromContent(t, l, []byte(invalidFile)) require.ErrorContains(t, diags.ErrorOrNil(), `Unrecognized component name "doesnotexist`) }) @@ -110,7 +111,7 @@ func TestLoader(t *testing.T) { input = testcomponents.tick.doesnotexist.tick_time } ` - l := controller.NewLoader(globals, nil) + l := controller.NewLoader(globals) diags := applyFromContent(t, l, []byte(invalidFile)) require.Error(t, diags.ErrorOrNil()) @@ -144,7 +145,7 @@ func TestLoader(t *testing.T) { input = testcomponents.passthrough.ticker.output } ` - l := controller.NewLoader(globals, nil) + l := controller.NewLoader(globals) diags := applyFromContent(t, l, []byte(invalidFile)) require.Error(t, diags.ErrorOrNil()) }) From bf83fff7c247112fa4c4de05fe80c374ca332ef8 Mon Sep 17 00:00:00 2001 From: Matt Durham Date: Wed, 3 Aug 2022 08:53:32 -0400 Subject: [PATCH 02/15] Fix linting --- component/metrics/mutate/mutate.go | 7 +++++-- component/metrics/remotewrite/remote_write.go | 1 - component/remote/s3/s3.go | 10 ++++++++-- 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/component/metrics/mutate/mutate.go b/component/metrics/mutate/mutate.go index ce7f07e0105c..7f5e8195327d 100644 --- a/component/metrics/mutate/mutate.go +++ b/component/metrics/mutate/mutate.go @@ -64,9 +64,12 @@ func New(o component.Options, args Arguments) (*Component, error) { Help: "Total number of metrics processed", }) - o.Registerer.Register(c.metricsProcessed) + err := o.Registerer.Register(c.metricsProcessed) + if err != nil { + return nil, err + } // Call to Update() to set the relabelling rules once at the start. - if err := c.Update(args); err != nil { + if err = c.Update(args); err != nil { return nil, err } diff --git a/component/metrics/remotewrite/remote_write.go b/component/metrics/remotewrite/remote_write.go index ea0a55c6926b..2c0e6a34bfdc 100644 --- a/component/metrics/remotewrite/remote_write.go +++ b/component/metrics/remotewrite/remote_write.go @@ -99,7 +99,6 @@ type Component struct { // NewComponent creates a new metrics_forwarder component. func NewComponent(o component.Options, c RemoteConfig) (*Component, error) { - walLogger := log.With(o.Logger, "subcomponent", "wal") dataPath := filepath.Join(o.DataPath, "wal", o.ID) walStorage, err := wal.NewStorage(walLogger, o.Registerer, dataPath) diff --git a/component/remote/s3/s3.go b/component/remote/s3/s3.go index eb54b222a07b..8f51248edfc1 100644 --- a/component/remote/s3/s3.go +++ b/component/remote/s3/s3.go @@ -76,8 +76,14 @@ func New(o component.Options, args Arguments) (*S3, error) { }), } - o.Registerer.Register(s.s3Errors) - o.Registerer.Register(s.lastAccessed) + err = o.Registerer.Register(s.s3Errors) + if err != nil { + return nil, err + } + err = o.Registerer.Register(s.lastAccessed) + if err != nil { + return nil, err + } w := newWatcher(bucket, file, s.updateChan, args.PollFrequency, s3Client) s.watcher = w From 534397dad1739250811f13e83577f2b755147c8c Mon Sep 17 00:00:00 2001 From: Matt Durham Date: Wed, 3 Aug 2022 09:11:22 -0400 Subject: [PATCH 03/15] Fix linting, add ready endpoint, fix metrics --- cmd/agent/flow.go | 11 +++++++++++ component/local/file/file.go | 17 +++++++++++++++-- component/metrics/mutate/mutate.go | 3 +-- component/remote/s3/s3.go | 10 ++++------ 4 files changed, 31 insertions(+), 10 deletions(-) diff --git a/cmd/agent/flow.go b/cmd/agent/flow.go index 89f673af1f11..dc47bba803c3 100644 --- a/cmd/agent/flow.go +++ b/cmd/agent/flow.go @@ -43,6 +43,7 @@ func runFlow() error { httpListenAddr = "127.0.0.1:12345" configFile string storagePath = "data-agent/" + ready = true ) fs := flag.NewFlagSet(os.Args[0], flag.ExitOnError) @@ -111,6 +112,15 @@ func runFlow() error { } r := mux.NewRouter() + r.HandleFunc("/-/ready", func(w http.ResponseWriter, r *http.Request) { + if ready { + w.WriteHeader(http.StatusOK) + fmt.Fprintf(w, "Agent is Ready.\n") + } else { + w.WriteHeader(http.StatusServiceUnavailable) + fmt.Fprint(w, "Config failed to load.\n") + } + }) r.Handle("/metrics", promhttp.Handler()) r.Handle("/debug/config", f.ConfigHandler()) r.Handle("/debug/graph", f.GraphHandler()) @@ -119,6 +129,7 @@ func runFlow() error { r.HandleFunc("/-/reload", func(w http.ResponseWriter, _ *http.Request) { err := reload() + ready = err == nil if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return diff --git a/component/local/file/file.go b/component/local/file/file.go index 5d47831678d0..6c9b2722b570 100644 --- a/component/local/file/file.go +++ b/component/local/file/file.go @@ -8,6 +8,8 @@ import ( "sync" "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/go-kit/log/level" "github.com/grafana/agent/component" "github.com/grafana/agent/pkg/flow/rivertypes" @@ -84,7 +86,8 @@ type Component struct { // reloadCh is a buffered channel which is written to when the watched file // should be reloaded by the component. - reloadCh chan struct{} + reloadCh chan struct{} + lastAccessed prometheus.Gauge } var ( @@ -98,11 +101,19 @@ func New(o component.Options, args Arguments) (*Component, error) { opts: o, reloadCh: make(chan struct{}, 1), + lastAccessed: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "local_file_timestamp_last_accessed_unix_seconds", + Help: "The last successful access in unix seconds", + }), } + err := o.Registerer.Register(c.lastAccessed) + if err != nil { + return nil, err + } // Perform an update which will immediately set our exports to the initial // contents of the file. - if err := c.Update(args); err != nil { + if err = c.Update(args); err != nil { return nil, err } return c, nil @@ -136,6 +147,7 @@ func (c *Component) Run(ctx context.Context) error { for { select { case <-ctx.Done(): + c.opts.Registerer.Unregister(c.lastAccessed) return nil case <-c.reloadCh: time.Sleep(waitReadPeriod) @@ -162,6 +174,7 @@ func (c *Component) readFile() error { return err } c.latestContent = string(bb) + c.lastAccessed.SetToCurrentTime() c.opts.OnStateChange(Exports{ Content: rivertypes.OptionalSecret{ diff --git a/component/metrics/mutate/mutate.go b/component/metrics/mutate/mutate.go index 7f5e8195327d..8100c5af948f 100644 --- a/component/metrics/mutate/mutate.go +++ b/component/metrics/mutate/mutate.go @@ -9,7 +9,6 @@ import ( flow_relabel "github.com/grafana/agent/component/common/relabel" "github.com/grafana/agent/component/metrics" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/storage" ) @@ -59,7 +58,7 @@ func New(o component.Options, args Arguments) (*Component, error) { c := &Component{opts: o} c.appendable = fa.NewFlowAppendable(args.ForwardTo...) c.receiver = &metrics.Receiver{Receive: c.Receive} - c.metricsProcessed = promauto.NewCounter(prometheus.CounterOpts{ + c.metricsProcessed = prometheus.NewCounter(prometheus.CounterOpts{ Name: "metrics_mutate_metrics_processed", Help: "Total number of metrics processed", }) diff --git a/component/remote/s3/s3.go b/component/remote/s3/s3.go index 8f51248edfc1..989d061ec677 100644 --- a/component/remote/s3/s3.go +++ b/component/remote/s3/s3.go @@ -8,14 +8,12 @@ import ( "sync" "time" - "github.com/grafana/agent/pkg/flow/rivertypes" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/aws/aws-sdk-go-v2/aws" aws_config "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/grafana/agent/component" + "github.com/grafana/agent/pkg/flow/rivertypes" + "github.com/prometheus/client_golang/prometheus" "golang.org/x/net/context" ) @@ -66,11 +64,11 @@ func New(o component.Options, args Arguments) (*S3, error) { args: args, health: component.Health{}, updateChan: make(chan result), - s3Errors: promauto.NewCounter(prometheus.CounterOpts{ + s3Errors: prometheus.NewCounter(prometheus.CounterOpts{ Name: "remote_s3_errors_total", Help: "The number of errors while accessing s3", }), - lastAccessed: promauto.NewGauge(prometheus.GaugeOpts{ + lastAccessed: prometheus.NewGauge(prometheus.GaugeOpts{ Name: "remote_s3_timestamp_last_accessed_unix_seconds", Help: "The last successful access in unix seconds", }), From 8e54312ded4f426e0f1218ec7e1ed31e29eb5ceb Mon Sep 17 00:00:00 2001 From: Matt Durham Date: Wed, 3 Aug 2022 09:28:05 -0400 Subject: [PATCH 04/15] Fix reusing of default registerer in tests --- pkg/flow/internal/controller/loader_test.go | 23 ++++++++++++--------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/pkg/flow/internal/controller/loader_test.go b/pkg/flow/internal/controller/loader_test.go index 7267a977f65f..aa16aa960f75 100644 --- a/pkg/flow/internal/controller/loader_test.go +++ b/pkg/flow/internal/controller/loader_test.go @@ -47,15 +47,18 @@ func TestLoader(t *testing.T) { }, } - globals := controller.ComponentGlobals{ - Logger: log.NewNopLogger(), - DataPath: t.TempDir(), - OnExportsChange: func(cn *controller.ComponentNode) { /* no-op */ }, - Registerer: prometheus.DefaultRegisterer, + newGlobals := func() controller.ComponentGlobals { + return controller.ComponentGlobals{ + Logger: log.NewNopLogger(), + DataPath: t.TempDir(), + OnExportsChange: func(cn *controller.ComponentNode) { /* no-op */ }, + Registerer: prometheus.NewRegistry(), + } } t.Run("New Graph", func(t *testing.T) { - l := controller.NewLoader(globals) + + l := controller.NewLoader(newGlobals()) diags := applyFromContent(t, l, []byte(testFile)) require.NoError(t, diags.ErrorOrNil()) requireGraph(t, l.Graph(), testGraphDefinition) @@ -73,7 +76,7 @@ func TestLoader(t *testing.T) { frequency = "1m" } ` - l := controller.NewLoader(globals) + l := controller.NewLoader(newGlobals()) diags := applyFromContent(t, l, []byte(startFile)) origGraph := l.Graph() require.NoError(t, diags.ErrorOrNil()) @@ -92,7 +95,7 @@ func TestLoader(t *testing.T) { doesnotexist "bad_component" { } ` - l := controller.NewLoader(globals) + l := controller.NewLoader(newGlobals()) diags := applyFromContent(t, l, []byte(invalidFile)) require.ErrorContains(t, diags.ErrorOrNil(), `Unrecognized component name "doesnotexist`) }) @@ -111,7 +114,7 @@ func TestLoader(t *testing.T) { input = testcomponents.tick.doesnotexist.tick_time } ` - l := controller.NewLoader(globals) + l := controller.NewLoader(newGlobals()) diags := applyFromContent(t, l, []byte(invalidFile)) require.Error(t, diags.ErrorOrNil()) @@ -145,7 +148,7 @@ func TestLoader(t *testing.T) { input = testcomponents.passthrough.ticker.output } ` - l := controller.NewLoader(globals) + l := controller.NewLoader(newGlobals()) diags := applyFromContent(t, l, []byte(invalidFile)) require.Error(t, diags.ErrorOrNil()) }) From 3bdc3c6a17adf9415db4814d8a792f648457e52f Mon Sep 17 00:00:00 2001 From: Matt Durham Date: Wed, 3 Aug 2022 09:43:44 -0400 Subject: [PATCH 05/15] Fix tests --- component/metrics/scrape/scrape_test.go | 7 ++++++- component/remote/s3/s3_test.go | 4 ++++ pkg/flow/componenttest/componenttest.go | 3 +++ 3 files changed, 13 insertions(+), 1 deletion(-) diff --git a/component/metrics/scrape/scrape_test.go b/component/metrics/scrape/scrape_test.go index fef28d747e94..97dd360cdceb 100644 --- a/component/metrics/scrape/scrape_test.go +++ b/component/metrics/scrape/scrape_test.go @@ -6,6 +6,8 @@ import ( "testing" "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/grafana/agent/component" "github.com/grafana/agent/component/metrics" "github.com/grafana/agent/pkg/flow/logging" @@ -16,7 +18,10 @@ import ( func TestForwardingToAppendable(t *testing.T) { l, err := logging.New(os.Stderr, logging.DefaultOptions) require.NoError(t, err) - opts := component.Options{Logger: l} + opts := component.Options{ + Logger: l, + Registerer: prometheus.NewRegistry(), + } nilReceivers := []*metrics.Receiver{nil, nil} diff --git a/component/remote/s3/s3_test.go b/component/remote/s3/s3_test.go index b50b0f861049..869b0c116f2e 100644 --- a/component/remote/s3/s3_test.go +++ b/component/remote/s3/s3_test.go @@ -11,6 +11,8 @@ import ( "testing" "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/grafana/agent/component" "github.com/johannesboyne/gofakes3" "github.com/johannesboyne/gofakes3/backend/s3mem" @@ -22,6 +24,7 @@ func TestCorrectBucket(t *testing.T) { o := component.Options{ ID: "t1", OnStateChange: func(_ component.Exports) {}, + Registerer: prometheus.NewRegistry(), } s3File, err := New(o, Arguments{ @@ -44,6 +47,7 @@ func TestWatchingFile(t *testing.T) { defer mut.Unlock() output = e.(Exports).Content.Value }, + Registerer: prometheus.NewRegistry(), }, Arguments{ Path: "s3://mybucket/test.txt", PollFrequency: 10 * time.Second, diff --git a/pkg/flow/componenttest/componenttest.go b/pkg/flow/componenttest/componenttest.go index 4f1441cf9d84..75b8ebc829b5 100644 --- a/pkg/flow/componenttest/componenttest.go +++ b/pkg/flow/componenttest/componenttest.go @@ -9,6 +9,8 @@ import ( "sync" "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/go-kit/log" "github.com/grafana/agent/component" ) @@ -126,6 +128,7 @@ func (c *Controller) buildComponent(dataPath string, args component.Arguments) ( Logger: c.log, DataPath: dataPath, OnStateChange: c.onStateChange, + Registerer: prometheus.NewRegistry(), } inner, err := c.reg.Build(opts, args) From d113fdbacea045ea5d38f0f6524317ff70be580d Mon Sep 17 00:00:00 2001 From: Matt Durham Date: Wed, 3 Aug 2022 09:57:55 -0400 Subject: [PATCH 06/15] Fix lint --- pkg/flow/internal/controller/loader_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/flow/internal/controller/loader_test.go b/pkg/flow/internal/controller/loader_test.go index aa16aa960f75..e0c67386cf06 100644 --- a/pkg/flow/internal/controller/loader_test.go +++ b/pkg/flow/internal/controller/loader_test.go @@ -57,7 +57,6 @@ func TestLoader(t *testing.T) { } t.Run("New Graph", func(t *testing.T) { - l := controller.NewLoader(newGlobals()) diags := applyFromContent(t, l, []byte(testFile)) require.NoError(t, diags.ErrorOrNil()) From 32c3dfa9a93b60ab93d0ef5fae00199379c7e13c Mon Sep 17 00:00:00 2001 From: Matt Durham Date: Wed, 3 Aug 2022 11:49:30 -0400 Subject: [PATCH 07/15] Switch to auto unregisterer --- cmd/agent/flow.go | 8 +- component/local/file/file.go | 3 +- component/metrics/remotewrite/remote_write.go | 2 - component/metrics/scrape/scrape_test.go | 23 ++++- component/metrics/util.go | 91 ------------------ component/registry.go | 13 ++- component/remote/s3/s3.go | 14 +-- pkg/flow/flow.go | 1 + pkg/flow/internal/controller/component.go | 5 +- pkg/flow/internal/controller/registerer.go | 95 +++++++++++++++++++ .../internal/controller/registerer_test.go | 46 +++++++++ 11 files changed, 190 insertions(+), 111 deletions(-) delete mode 100644 component/metrics/util.go create mode 100644 pkg/flow/internal/controller/registerer.go create mode 100644 pkg/flow/internal/controller/registerer_test.go diff --git a/cmd/agent/flow.go b/cmd/agent/flow.go index dc47bba803c3..ef88b451bef1 100644 --- a/cmd/agent/flow.go +++ b/cmd/agent/flow.go @@ -11,6 +11,8 @@ import ( "os/signal" "sync" + "go.uber.org/atomic" + "github.com/fatih/color" "github.com/go-kit/log/level" "github.com/gorilla/mux" @@ -43,7 +45,7 @@ func runFlow() error { httpListenAddr = "127.0.0.1:12345" configFile string storagePath = "data-agent/" - ready = true + ready = atomic.NewBool(true) ) fs := flag.NewFlagSet(os.Args[0], flag.ExitOnError) @@ -113,7 +115,7 @@ func runFlow() error { r := mux.NewRouter() r.HandleFunc("/-/ready", func(w http.ResponseWriter, r *http.Request) { - if ready { + if ready.Load() { w.WriteHeader(http.StatusOK) fmt.Fprintf(w, "Agent is Ready.\n") } else { @@ -129,7 +131,7 @@ func runFlow() error { r.HandleFunc("/-/reload", func(w http.ResponseWriter, _ *http.Request) { err := reload() - ready = err == nil + ready.Store(err == nil) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return diff --git a/component/local/file/file.go b/component/local/file/file.go index 6c9b2722b570..f59b54d72463 100644 --- a/component/local/file/file.go +++ b/component/local/file/file.go @@ -107,7 +107,7 @@ func New(o component.Options, args Arguments) (*Component, error) { }), } - err := o.Registerer.Register(c.lastAccessed) + err := o.Registerer.RegisterComponent(c.lastAccessed) if err != nil { return nil, err } @@ -147,7 +147,6 @@ func (c *Component) Run(ctx context.Context) error { for { select { case <-ctx.Done(): - c.opts.Registerer.Unregister(c.lastAccessed) return nil case <-c.reloadCh: time.Sleep(waitReadPeriod) diff --git a/component/metrics/remotewrite/remote_write.go b/component/metrics/remotewrite/remote_write.go index 2c0e6a34bfdc..d1a578a13e7b 100644 --- a/component/metrics/remotewrite/remote_write.go +++ b/component/metrics/remotewrite/remote_write.go @@ -148,8 +148,6 @@ func (c *Component) Run(ctx context.Context) error { for { select { case <-ctx.Done(): - c.walStore.Close() - c.storage.Close() return nil case <-time.After(walTruncateFrequency): // The timestamp ts is used to determine which series are not receiving diff --git a/component/metrics/scrape/scrape_test.go b/component/metrics/scrape/scrape_test.go index 97dd360cdceb..fb22fe4f4332 100644 --- a/component/metrics/scrape/scrape_test.go +++ b/component/metrics/scrape/scrape_test.go @@ -20,7 +20,7 @@ func TestForwardingToAppendable(t *testing.T) { require.NoError(t, err) opts := component.Options{ Logger: l, - Registerer: prometheus.NewRegistry(), + Registerer: &fakeRegisterer{}, } nilReceivers := []*metrics.Receiver{nil, nil} @@ -81,3 +81,24 @@ func TestForwardingToAppendable(t *testing.T) { require.Len(t, receivedSamples, 1) require.Equal(t, receivedSamples[0], &sample) } + +type fakeRegisterer struct { +} + +func (f fakeRegisterer) Register(collector prometheus.Collector) error { + return nil +} + +func (f fakeRegisterer) MustRegister(collector ...prometheus.Collector) {} + +func (f fakeRegisterer) Unregister(collector prometheus.Collector) bool { + return true +} + +func (f fakeRegisterer) RegisterComponent(collector ...prometheus.Collector) error { + return nil +} + +func (f fakeRegisterer) UnregisterComponent() bool { + return true +} diff --git a/component/metrics/util.go b/component/metrics/util.go deleted file mode 100644 index 124ca61d267c..000000000000 --- a/component/metrics/util.go +++ /dev/null @@ -1,91 +0,0 @@ -package metrics - -import ( - "fmt" - "sync" - - "github.com/prometheus/client_golang/prometheus" -) - -// CollectorRegistry is both a prometheus.Registerer and prometheus.Collector: -// it encapsulates a set of collectors. -// -// Unlike a real Prometheus Registry, CollectorRegistry is unchecked and does -// not validate that metrics are unique at registration time. -type CollectorRegistry struct { - mut sync.RWMutex - cs []prometheus.Collector -} - -var _ prometheus.Registerer = (*CollectorRegistry)(nil) -var _ prometheus.Collector = (*CollectorRegistry)(nil) - -// NewCollectorRegistry returns a new CollectorRegistry. -func NewCollectorRegistry() *CollectorRegistry { - return &CollectorRegistry{} -} - -// Register implements prometheus.Collector. Unlike a real Prometheus registry, -// Register does not ensure that c provides unique metrics. -func (cr *CollectorRegistry) Register(c prometheus.Collector) error { - cr.mut.Lock() - defer cr.mut.Unlock() - - for _, exist := range cr.cs { - if exist == c { - return fmt.Errorf("collector already registered") - } - } - - cr.cs = append(cr.cs, c) - return nil -} - -// MustRegister implements prometheus.Collector. -func (cr *CollectorRegistry) MustRegister(cs ...prometheus.Collector) { - for _, c := range cs { - if err := cr.Register(c); err != nil { - panic(err) - } - } -} - -// Unregister implements prometheus.Collector. -func (cr *CollectorRegistry) Unregister(c prometheus.Collector) bool { - cr.mut.Lock() - defer cr.mut.Unlock() - - rem := make([]prometheus.Collector, 0, len(cr.cs)) - - var removed bool - for _, exist := range cr.cs { - if c == exist { - removed = true - continue - } - rem = append(rem, exist) - } - - cr.cs = rem - return removed -} - -// Describe implements prometheus.Collector. -func (cr *CollectorRegistry) Describe(ch chan<- *prometheus.Desc) { - cr.mut.RLock() - defer cr.mut.RUnlock() - - for _, c := range cr.cs { - c.Describe(ch) - } -} - -// Collect implements prometheus.Collector. -func (cr *CollectorRegistry) Collect(ch chan<- prometheus.Metric) { - cr.mut.RLock() - defer cr.mut.RUnlock() - - for _, c := range cr.cs { - c.Collect(ch) - } -} diff --git a/component/registry.go b/component/registry.go index f1ea06d43d5f..9db8274dbed5 100644 --- a/component/registry.go +++ b/component/registry.go @@ -52,7 +52,7 @@ type Options struct { OnStateChange func(e Exports) //Registerer allows components to add their own metrics. The register will come pre-wrapped with the component id - Registerer prometheus.Registerer + Registerer Registerer } // Registration describes a single component. @@ -181,3 +181,14 @@ func Get(name string) (Registration, bool) { r, ok := registered[name] return r, ok } + +// Registerer handles registering any custom metrics for a component and teardown +type Registerer interface { + prometheus.Registerer + + // RegisterComponent registers metrics for a component + RegisterComponent(...prometheus.Collector) error + + // UnregisterComponent removes all metrics for a given component + UnregisterComponent() bool +} diff --git a/component/remote/s3/s3.go b/component/remote/s3/s3.go index 989d061ec677..3efc25fba38f 100644 --- a/component/remote/s3/s3.go +++ b/component/remote/s3/s3.go @@ -74,16 +74,13 @@ func New(o component.Options, args Arguments) (*S3, error) { }), } - err = o.Registerer.Register(s.s3Errors) - if err != nil { - return nil, err - } - err = o.Registerer.Register(s.lastAccessed) + w := newWatcher(bucket, file, s.updateChan, args.PollFrequency, s3Client) + s.watcher = w + + err = o.Registerer.RegisterComponent(s.s3Errors, s.lastAccessed) if err != nil { return nil, err } - w := newWatcher(bucket, file, s.updateChan, args.PollFrequency, s3Client) - s.watcher = w content, err := w.downloadSynchronously() s.handleContentPolling(content, err) @@ -95,8 +92,7 @@ func (s *S3) Run(ctx context.Context) error { go s.handleContentUpdate(ctx) go s.watcher.run(ctx) <-ctx.Done() - s.opts.Registerer.Unregister(s.s3Errors) - s.opts.Registerer.Unregister(s.lastAccessed) + return nil } diff --git a/pkg/flow/flow.go b/pkg/flow/flow.go index dfc729057438..fc0398a77803 100644 --- a/pkg/flow/flow.go +++ b/pkg/flow/flow.go @@ -67,6 +67,7 @@ type Options struct { // subdirectories for component-specific data. DataPath string + // Reg is the prometheus register to use Reg prometheus.Registerer } diff --git a/pkg/flow/internal/controller/component.go b/pkg/flow/internal/controller/component.go index 1acf17b166e7..fc58342f122d 100644 --- a/pkg/flow/internal/controller/component.go +++ b/pkg/flow/internal/controller/component.go @@ -150,9 +150,9 @@ func getManagedOptions(globals ComponentGlobals, cn *ComponentNode) component.Op Logger: log.With(globals.Logger, "component", cn.nodeID), DataPath: filepath.Join(globals.DataPath, cn.nodeID), OnStateChange: cn.setExports, - Registerer: prometheus.WrapRegistererWith(prometheus.Labels{ + Registerer: newRegister(prometheus.WrapRegistererWith(prometheus.Labels{ "component_id": cn.nodeID, - }, globals.Registerer), + }, globals.Registerer)), } } @@ -277,6 +277,7 @@ func (cn *ComponentNode) Run(ctx context.Context) error { } cn.setRunHealth(component.HealthTypeHealthy, "started component") + defer cn.managedOpts.Registerer.UnregisterComponent() err := cn.managed.Run(ctx) var exitMsg string diff --git a/pkg/flow/internal/controller/registerer.go b/pkg/flow/internal/controller/registerer.go new file mode 100644 index 000000000000..d3587cb73901 --- /dev/null +++ b/pkg/flow/internal/controller/registerer.go @@ -0,0 +1,95 @@ +package controller + +import ( + "fmt" + "sync" + + "github.com/prometheus/client_golang/prometheus" +) + +// register handles a single components collector wrapping a registerer +type register struct { + mut sync.Mutex + internal prometheus.Registerer + internalCollectors map[prometheus.Collector]struct{} +} + +func newRegister(reg prometheus.Registerer) *register { + return ®ister{ + internal: reg, + internalCollectors: make(map[prometheus.Collector]struct{}, 0), + } +} + +// RegisterComponent registers a set of collectors +func (r *register) RegisterComponent(collectors ...prometheus.Collector) error { + r.mut.Lock() + defer r.mut.Unlock() + + if r.internal == nil { + return fmt.Errorf("internal registerer not set") + } + for _, c := range collectors { + err := r.internal.Register(c) + if err != nil { + return err + } + r.internalCollectors[c] = struct{}{} + } + return nil +} + +// UnregisterComponent unregisters all collectors from EITHER Register or RegisterComponent +func (r *register) UnregisterComponent() bool { + r.mut.Lock() + defer r.mut.Unlock() + + if r.internal == nil { + return false + } + + for coll := range r.internalCollectors { + r.internal.Unregister(coll) + delete(r.internalCollectors, coll) + } + return true +} + +// Register registers a single collector +func (r *register) Register(collector prometheus.Collector) error { + r.mut.Lock() + defer r.mut.Unlock() + + if r.internal == nil { + return fmt.Errorf("internal registerer not set") + } + err := r.internal.Register(collector) + if err != nil { + return err + } + r.internalCollectors[collector] = struct{}{} + return nil +} + +// MustRegister calls the internal register and adds the collectors to the internal collection +func (r *register) MustRegister(collector ...prometheus.Collector) { + r.mut.Lock() + defer r.mut.Unlock() + + r.internal.MustRegister(collector...) + for _, c := range collector { + r.internalCollectors[c] = struct{}{} + } +} + +// Unregister calls the internal unregister and removes the collector from the internal collection +func (r *register) Unregister(collector prometheus.Collector) bool { + r.mut.Lock() + defer r.mut.Unlock() + + if r.internal == nil { + return false + } + delete(r.internalCollectors, collector) + return r.internal.Unregister(collector) +} diff --git a/pkg/flow/internal/controller/registerer_test.go b/pkg/flow/internal/controller/registerer_test.go new file mode 100644 index 000000000000..5d0174752fa9 --- /dev/null +++ b/pkg/flow/internal/controller/registerer_test.go @@ -0,0 +1,46 @@ +package controller + +import ( + "testing" + + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" +) + +func TestRegister(t *testing.T) { + r := newRegister(prometheus.NewRegistry()) + err := r.RegisterComponent(prometheus.NewCounter(prometheus.CounterOpts{ + Name: "test_counter", + Help: "testing a counter", + })) + require.NoError(t, err) + require.Len(t, r.internalCollectors, 1) + require.True(t, r.UnregisterComponent()) + require.Len(t, r.internalCollectors, 0) +} + +func TestRegisterNormal(t *testing.T) { + r := newRegister(prometheus.NewRegistry()) + err := r.Register(prometheus.NewCounter(prometheus.CounterOpts{ + Name: "test_counter", + Help: "testing a counter", + })) + require.NoError(t, err) + require.Len(t, r.internalCollectors, 1) + require.True(t, r.UnregisterComponent()) + require.Len(t, r.internalCollectors, 0) +} + +func TestRegisterNormalUnregisterNormal(t *testing.T) { + r := newRegister(prometheus.NewRegistry()) + testCounter := prometheus.NewCounter(prometheus.CounterOpts{ + Name: "test_counter", + Help: "testing a counter", + }) + err := r.Register(testCounter) + require.NoError(t, err) + require.Len(t, r.internalCollectors, 1) + success := r.Unregister(testCounter) + require.True(t, success) + require.Len(t, r.internalCollectors, 0) +} From 6a70acb8dbca75be9ec5b476a7a894c4f2e02dcd Mon Sep 17 00:00:00 2001 From: Matt Durham Date: Wed, 3 Aug 2022 11:52:21 -0400 Subject: [PATCH 08/15] Switch to auto unregisterer --- pkg/flow/componenttest/componenttest.go | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/pkg/flow/componenttest/componenttest.go b/pkg/flow/componenttest/componenttest.go index 75b8ebc829b5..ec0d3318fd9d 100644 --- a/pkg/flow/componenttest/componenttest.go +++ b/pkg/flow/componenttest/componenttest.go @@ -128,7 +128,7 @@ func (c *Controller) buildComponent(dataPath string, args component.Arguments) ( Logger: c.log, DataPath: dataPath, OnStateChange: c.onStateChange, - Registerer: prometheus.NewRegistry(), + Registerer: &fakeRegisterer{}, } inner, err := c.reg.Build(opts, args) @@ -150,3 +150,24 @@ func (c *Controller) Update(args component.Arguments) error { } return c.inner.Update(args) } + +type fakeRegisterer struct { +} + +func (f fakeRegisterer) Register(collector prometheus.Collector) error { + return nil +} + +func (f fakeRegisterer) MustRegister(collector ...prometheus.Collector) {} + +func (f fakeRegisterer) Unregister(collector prometheus.Collector) bool { + return true +} + +func (f fakeRegisterer) RegisterComponent(collector ...prometheus.Collector) error { + return nil +} + +func (f fakeRegisterer) UnregisterComponent() bool { + return true +} From 2eb32c91e9d6979230f0811a8f86cf179d44da93 Mon Sep 17 00:00:00 2001 From: Matt Durham Date: Wed, 3 Aug 2022 11:53:13 -0400 Subject: [PATCH 09/15] change metric naming --- component/local/file/file.go | 2 +- component/remote/s3/s3.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/component/local/file/file.go b/component/local/file/file.go index f59b54d72463..95af71d660b8 100644 --- a/component/local/file/file.go +++ b/component/local/file/file.go @@ -102,7 +102,7 @@ func New(o component.Options, args Arguments) (*Component, error) { reloadCh: make(chan struct{}, 1), lastAccessed: prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "local_file_timestamp_last_accessed_unix_seconds", + Name: "agent_local_file_timestamp_last_accessed_unix_seconds", Help: "The last successful access in unix seconds", }), } diff --git a/component/remote/s3/s3.go b/component/remote/s3/s3.go index 3efc25fba38f..6b5f9479bff1 100644 --- a/component/remote/s3/s3.go +++ b/component/remote/s3/s3.go @@ -65,11 +65,11 @@ func New(o component.Options, args Arguments) (*S3, error) { health: component.Health{}, updateChan: make(chan result), s3Errors: prometheus.NewCounter(prometheus.CounterOpts{ - Name: "remote_s3_errors_total", + Name: "agent_remote_s3_errors_total", Help: "The number of errors while accessing s3", }), lastAccessed: prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "remote_s3_timestamp_last_accessed_unix_seconds", + Name: "agent_remote_s3_timestamp_last_accessed_unix_seconds", Help: "The last successful access in unix seconds", }), } From ea57c1d766cfbcf4b30675ec994a4d46b48a5fd7 Mon Sep 17 00:00:00 2001 From: Matt Durham Date: Wed, 3 Aug 2022 14:34:09 -0400 Subject: [PATCH 10/15] fix s3 test --- component/remote/s3/s3_test.go | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/component/remote/s3/s3_test.go b/component/remote/s3/s3_test.go index 869b0c116f2e..5f23cf5636ca 100644 --- a/component/remote/s3/s3_test.go +++ b/component/remote/s3/s3_test.go @@ -24,7 +24,7 @@ func TestCorrectBucket(t *testing.T) { o := component.Options{ ID: "t1", OnStateChange: func(_ component.Exports) {}, - Registerer: prometheus.NewRegistry(), + Registerer: &fakeRegisterer{}, } s3File, err := New(o, Arguments{ @@ -47,7 +47,7 @@ func TestWatchingFile(t *testing.T) { defer mut.Unlock() output = e.(Exports).Content.Value }, - Registerer: prometheus.NewRegistry(), + Registerer: &fakeRegisterer{}, }, Arguments{ Path: "s3://mybucket/test.txt", PollFrequency: 10 * time.Second, @@ -92,3 +92,24 @@ func pushFile(t *testing.T, backend *s3mem.Backend, filename string, filecontent ) assert.NoError(t, err) } + +type fakeRegisterer struct { +} + +func (f fakeRegisterer) Register(collector prometheus.Collector) error { + return nil +} + +func (f fakeRegisterer) MustRegister(collector ...prometheus.Collector) {} + +func (f fakeRegisterer) Unregister(collector prometheus.Collector) bool { + return true +} + +func (f fakeRegisterer) RegisterComponent(collector ...prometheus.Collector) error { + return nil +} + +func (f fakeRegisterer) UnregisterComponent() bool { + return true +} From 992d944337dc24624bcf552b0bf7105bd70b9746 Mon Sep 17 00:00:00 2001 From: Matt Durham Date: Wed, 3 Aug 2022 14:35:25 -0400 Subject: [PATCH 11/15] fix metric naming --- component/metrics/mutate/mutate.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/component/metrics/mutate/mutate.go b/component/metrics/mutate/mutate.go index 8100c5af948f..614d6e7177c6 100644 --- a/component/metrics/mutate/mutate.go +++ b/component/metrics/mutate/mutate.go @@ -59,7 +59,7 @@ func New(o component.Options, args Arguments) (*Component, error) { c.appendable = fa.NewFlowAppendable(args.ForwardTo...) c.receiver = &metrics.Receiver{Receive: c.Receive} c.metricsProcessed = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "metrics_mutate_metrics_processed", + Name: "agent_metrics_mutate_metrics_processed", Help: "Total number of metrics processed", }) From fef03aa5a2ef4deb007abd89a0d0f8a13466726c Mon Sep 17 00:00:00 2001 From: Matt Durham Date: Wed, 3 Aug 2022 16:59:45 -0400 Subject: [PATCH 12/15] switch to different style of collector based on roberts feedback --- component/local/file/file.go | 2 +- component/metrics/remotewrite/remote_write.go | 8 +- component/metrics/scrape/scrape_test.go | 23 +---- component/registry.go | 2 +- component/remote/s3/s3.go | 6 +- component/remote/s3/s3_test.go | 25 +---- pkg/flow/componenttest/componenttest.go | 23 +---- pkg/flow/internal/controller/component.go | 8 +- pkg/flow/internal/controller/metrics.go | 4 + pkg/flow/internal/controller/registerer.go | 95 ------------------- .../internal/controller/registerer_test.go | 46 --------- .../internal/controller/wrap_registerer.go | 65 +++++++++++++ 12 files changed, 87 insertions(+), 220 deletions(-) delete mode 100644 pkg/flow/internal/controller/registerer.go delete mode 100644 pkg/flow/internal/controller/registerer_test.go create mode 100644 pkg/flow/internal/controller/wrap_registerer.go diff --git a/component/local/file/file.go b/component/local/file/file.go index 95af71d660b8..66be129c16a3 100644 --- a/component/local/file/file.go +++ b/component/local/file/file.go @@ -107,7 +107,7 @@ func New(o component.Options, args Arguments) (*Component, error) { }), } - err := o.Registerer.RegisterComponent(c.lastAccessed) + err := o.Registerer.Register(c.lastAccessed) if err != nil { return nil, err } diff --git a/component/metrics/remotewrite/remote_write.go b/component/metrics/remotewrite/remote_write.go index d1a578a13e7b..8deeaf6262cf 100644 --- a/component/metrics/remotewrite/remote_write.go +++ b/component/metrics/remotewrite/remote_write.go @@ -20,7 +20,6 @@ import ( "github.com/grafana/agent/pkg/build" "github.com/grafana/agent/pkg/flow/rivertypes" "github.com/grafana/agent/pkg/metrics/wal" - "github.com/prometheus/client_golang/prometheus" common "github.com/prometheus/common/config" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" @@ -85,7 +84,6 @@ type BasicAuthConfig struct { type Component struct { log log.Logger opts component.Options - reg prometheus.Registerer walStore *wal.Storage remoteStore *remote.Storage @@ -110,10 +108,8 @@ func NewComponent(o component.Options, c RemoteConfig) (*Component, error) { remoteStore := remote.NewStorage(remoteLogger, o.Registerer, startTime, dataPath, remoteFlushDeadline, nil) res := &Component{ - log: o.Logger, - opts: o, - reg: o.Registerer, - + log: o.Logger, + opts: o, walStore: walStorage, remoteStore: remoteStore, storage: storage.NewFanout(o.Logger, walStorage, remoteStore), diff --git a/component/metrics/scrape/scrape_test.go b/component/metrics/scrape/scrape_test.go index fb22fe4f4332..97dd360cdceb 100644 --- a/component/metrics/scrape/scrape_test.go +++ b/component/metrics/scrape/scrape_test.go @@ -20,7 +20,7 @@ func TestForwardingToAppendable(t *testing.T) { require.NoError(t, err) opts := component.Options{ Logger: l, - Registerer: &fakeRegisterer{}, + Registerer: prometheus.NewRegistry(), } nilReceivers := []*metrics.Receiver{nil, nil} @@ -81,24 +81,3 @@ func TestForwardingToAppendable(t *testing.T) { require.Len(t, receivedSamples, 1) require.Equal(t, receivedSamples[0], &sample) } - -type fakeRegisterer struct { -} - -func (f fakeRegisterer) Register(collector prometheus.Collector) error { - return nil -} - -func (f fakeRegisterer) MustRegister(collector ...prometheus.Collector) {} - -func (f fakeRegisterer) Unregister(collector prometheus.Collector) bool { - return true -} - -func (f fakeRegisterer) RegisterComponent(collector ...prometheus.Collector) error { - return nil -} - -func (f fakeRegisterer) UnregisterComponent() bool { - return true -} diff --git a/component/registry.go b/component/registry.go index 9db8274dbed5..7a90efc87aeb 100644 --- a/component/registry.go +++ b/component/registry.go @@ -52,7 +52,7 @@ type Options struct { OnStateChange func(e Exports) //Registerer allows components to add their own metrics. The register will come pre-wrapped with the component id - Registerer Registerer + Registerer prometheus.Registerer } // Registration describes a single component. diff --git a/component/remote/s3/s3.go b/component/remote/s3/s3.go index 6b5f9479bff1..d0009d5d9595 100644 --- a/component/remote/s3/s3.go +++ b/component/remote/s3/s3.go @@ -77,7 +77,11 @@ func New(o component.Options, args Arguments) (*S3, error) { w := newWatcher(bucket, file, s.updateChan, args.PollFrequency, s3Client) s.watcher = w - err = o.Registerer.RegisterComponent(s.s3Errors, s.lastAccessed) + err = o.Registerer.Register(s.s3Errors) + if err != nil { + return nil, err + } + err = o.Registerer.Register(s.lastAccessed) if err != nil { return nil, err } diff --git a/component/remote/s3/s3_test.go b/component/remote/s3/s3_test.go index 5f23cf5636ca..869b0c116f2e 100644 --- a/component/remote/s3/s3_test.go +++ b/component/remote/s3/s3_test.go @@ -24,7 +24,7 @@ func TestCorrectBucket(t *testing.T) { o := component.Options{ ID: "t1", OnStateChange: func(_ component.Exports) {}, - Registerer: &fakeRegisterer{}, + Registerer: prometheus.NewRegistry(), } s3File, err := New(o, Arguments{ @@ -47,7 +47,7 @@ func TestWatchingFile(t *testing.T) { defer mut.Unlock() output = e.(Exports).Content.Value }, - Registerer: &fakeRegisterer{}, + Registerer: prometheus.NewRegistry(), }, Arguments{ Path: "s3://mybucket/test.txt", PollFrequency: 10 * time.Second, @@ -92,24 +92,3 @@ func pushFile(t *testing.T, backend *s3mem.Backend, filename string, filecontent ) assert.NoError(t, err) } - -type fakeRegisterer struct { -} - -func (f fakeRegisterer) Register(collector prometheus.Collector) error { - return nil -} - -func (f fakeRegisterer) MustRegister(collector ...prometheus.Collector) {} - -func (f fakeRegisterer) Unregister(collector prometheus.Collector) bool { - return true -} - -func (f fakeRegisterer) RegisterComponent(collector ...prometheus.Collector) error { - return nil -} - -func (f fakeRegisterer) UnregisterComponent() bool { - return true -} diff --git a/pkg/flow/componenttest/componenttest.go b/pkg/flow/componenttest/componenttest.go index ec0d3318fd9d..75b8ebc829b5 100644 --- a/pkg/flow/componenttest/componenttest.go +++ b/pkg/flow/componenttest/componenttest.go @@ -128,7 +128,7 @@ func (c *Controller) buildComponent(dataPath string, args component.Arguments) ( Logger: c.log, DataPath: dataPath, OnStateChange: c.onStateChange, - Registerer: &fakeRegisterer{}, + Registerer: prometheus.NewRegistry(), } inner, err := c.reg.Build(opts, args) @@ -150,24 +150,3 @@ func (c *Controller) Update(args component.Arguments) error { } return c.inner.Update(args) } - -type fakeRegisterer struct { -} - -func (f fakeRegisterer) Register(collector prometheus.Collector) error { - return nil -} - -func (f fakeRegisterer) MustRegister(collector ...prometheus.Collector) {} - -func (f fakeRegisterer) Unregister(collector prometheus.Collector) bool { - return true -} - -func (f fakeRegisterer) RegisterComponent(collector ...prometheus.Collector) error { - return nil -} - -func (f fakeRegisterer) UnregisterComponent() bool { - return true -} diff --git a/pkg/flow/internal/controller/component.go b/pkg/flow/internal/controller/component.go index fc58342f122d..9356379d87a6 100644 --- a/pkg/flow/internal/controller/component.go +++ b/pkg/flow/internal/controller/component.go @@ -72,6 +72,7 @@ type ComponentNode struct { nodeID string // Cached from id.String() to avoid allocating new strings every time NodeID is called. reg component.Registration managedOpts component.Options + register *wrappedRegisterer exportsType reflect.Type onExportsChange func(cn *ComponentNode) // Informs controller that we changed our exports @@ -145,14 +146,16 @@ func NewComponentNode(globals ComponentGlobals, b *ast.BlockStmt) *ComponentNode } func getManagedOptions(globals ComponentGlobals, cn *ComponentNode) component.Options { + wrapped := newWrappedRegisterer() + cn.register = wrapped return component.Options{ ID: cn.nodeID, Logger: log.With(globals.Logger, "component", cn.nodeID), DataPath: filepath.Join(globals.DataPath, cn.nodeID), OnStateChange: cn.setExports, - Registerer: newRegister(prometheus.WrapRegistererWith(prometheus.Labels{ + Registerer: prometheus.WrapRegistererWith(prometheus.Labels{ "component_id": cn.nodeID, - }, globals.Registerer)), + }, wrapped), } } @@ -277,7 +280,6 @@ func (cn *ComponentNode) Run(ctx context.Context) error { } cn.setRunHealth(component.HealthTypeHealthy, "started component") - defer cn.managedOpts.Registerer.UnregisterComponent() err := cn.managed.Run(ctx) var exitMsg string diff --git a/pkg/flow/internal/controller/metrics.go b/pkg/flow/internal/controller/metrics.go index f6eab386aae3..efb22388fbc4 100644 --- a/pkg/flow/internal/controller/metrics.go +++ b/pkg/flow/internal/controller/metrics.go @@ -58,6 +58,7 @@ func (cc *controllerCollector) Collect(ch chan<- prometheus.Metric) { for _, component := range cc.l.Components() { health := component.CurrentHealth().Health.String() componentsByHealth[health]++ + component.register.Collect(ch) } for health, count := range componentsByHealth { @@ -67,4 +68,7 @@ func (cc *controllerCollector) Collect(ch chan<- prometheus.Metric) { func (cc *controllerCollector) Describe(ch chan<- *prometheus.Desc) { ch <- cc.runningComponentsTotal + for _, component := range cc.l.components { + component.register.Describe(ch) + } } diff --git a/pkg/flow/internal/controller/registerer.go b/pkg/flow/internal/controller/registerer.go deleted file mode 100644 index d3587cb73901..000000000000 --- a/pkg/flow/internal/controller/registerer.go +++ /dev/null @@ -1,95 +0,0 @@ -package controller - -import ( - "fmt" - "sync" - - "github.com/prometheus/client_golang/prometheus" -) - -// register handles a single components collector wrapping a registerer -type register struct { - mut sync.Mutex - internal prometheus.Registerer - internalCollectors map[prometheus.Collector]struct{} -} - -func newRegister(reg prometheus.Registerer) *register { - return ®ister{ - internal: reg, - internalCollectors: make(map[prometheus.Collector]struct{}, 0), - } -} - -// RegisterComponent registers a set of collectors -func (r *register) RegisterComponent(collectors ...prometheus.Collector) error { - r.mut.Lock() - defer r.mut.Unlock() - - if r.internal == nil { - return fmt.Errorf("internal registerer not set") - } - for _, c := range collectors { - err := r.internal.Register(c) - if err != nil { - return err - } - r.internalCollectors[c] = struct{}{} - } - return nil -} - -// UnregisterComponent unregisters all collectors from EITHER Register or RegisterComponent -func (r *register) UnregisterComponent() bool { - r.mut.Lock() - defer r.mut.Unlock() - - if r.internal == nil { - return false - } - - for coll := range r.internalCollectors { - r.internal.Unregister(coll) - delete(r.internalCollectors, coll) - } - return true -} - -// Register registers a single collector -func (r *register) Register(collector prometheus.Collector) error { - r.mut.Lock() - defer r.mut.Unlock() - - if r.internal == nil { - return fmt.Errorf("internal registerer not set") - } - err := r.internal.Register(collector) - if err != nil { - return err - } - r.internalCollectors[collector] = struct{}{} - return nil -} - -// MustRegister calls the internal register and adds the collectors to the internal collection -func (r *register) MustRegister(collector ...prometheus.Collector) { - r.mut.Lock() - defer r.mut.Unlock() - - r.internal.MustRegister(collector...) - for _, c := range collector { - r.internalCollectors[c] = struct{}{} - } -} - -// Unregister calls the internal unregister and removes the collector from the internal collection -func (r *register) Unregister(collector prometheus.Collector) bool { - r.mut.Lock() - defer r.mut.Unlock() - - if r.internal == nil { - return false - } - delete(r.internalCollectors, collector) - return r.internal.Unregister(collector) -} diff --git a/pkg/flow/internal/controller/registerer_test.go b/pkg/flow/internal/controller/registerer_test.go deleted file mode 100644 index 5d0174752fa9..000000000000 --- a/pkg/flow/internal/controller/registerer_test.go +++ /dev/null @@ -1,46 +0,0 @@ -package controller - -import ( - "testing" - - "github.com/prometheus/client_golang/prometheus" - "github.com/stretchr/testify/require" -) - -func TestRegister(t *testing.T) { - r := newRegister(prometheus.NewRegistry()) - err := r.RegisterComponent(prometheus.NewCounter(prometheus.CounterOpts{ - Name: "test_counter", - Help: "testing a counter", - })) - require.NoError(t, err) - require.Len(t, r.internalCollectors, 1) - require.True(t, r.UnregisterComponent()) - require.Len(t, r.internalCollectors, 0) -} - -func TestRegisterNormal(t *testing.T) { - r := newRegister(prometheus.NewRegistry()) - err := r.Register(prometheus.NewCounter(prometheus.CounterOpts{ - Name: "test_counter", - Help: "testing a counter", - })) - require.NoError(t, err) - require.Len(t, r.internalCollectors, 1) - require.True(t, r.UnregisterComponent()) - require.Len(t, r.internalCollectors, 0) -} - -func TestRegisterNormalUnregisterNormal(t *testing.T) { - r := newRegister(prometheus.NewRegistry()) - testCounter := prometheus.NewCounter(prometheus.CounterOpts{ - Name: "test_counter", - Help: "testing a counter", - }) - err := r.Register(testCounter) - require.NoError(t, err) - require.Len(t, r.internalCollectors, 1) - success := r.Unregister(testCounter) - require.True(t, success) - require.Len(t, r.internalCollectors, 0) -} diff --git a/pkg/flow/internal/controller/wrap_registerer.go b/pkg/flow/internal/controller/wrap_registerer.go new file mode 100644 index 000000000000..ffc413280611 --- /dev/null +++ b/pkg/flow/internal/controller/wrap_registerer.go @@ -0,0 +1,65 @@ +package controller + +import ( + "sync" + + "github.com/prometheus/client_golang/prometheus" +) + +type wrappedRegisterer struct { + mut sync.Mutex + internalCollectors map[prometheus.Collector]struct{} +} + +// newWrappedRegisterer creates a wrapped register +func newWrappedRegisterer() *wrappedRegisterer { + return &wrappedRegisterer{internalCollectors: make(map[prometheus.Collector]struct{})} +} + +// Describe implements the interface +func (w *wrappedRegisterer) Describe(descs chan<- *prometheus.Desc) { + w.mut.Lock() + defer w.mut.Unlock() + + for c, _ := range w.internalCollectors { + c.Describe(descs) + } +} + +// Collect implements the interface +func (w *wrappedRegisterer) Collect(metrics chan<- prometheus.Metric) { + w.mut.Lock() + defer w.mut.Unlock() + + for c, _ := range w.internalCollectors { + c.Collect(metrics) + } +} + +// Register implements the interface +func (w *wrappedRegisterer) Register(collector prometheus.Collector) error { + w.mut.Lock() + defer w.mut.Unlock() + + w.internalCollectors[collector] = struct{}{} + return nil +} + +// MustRegister implements the interface +func (w *wrappedRegisterer) MustRegister(collector ...prometheus.Collector) { + w.mut.Lock() + defer w.mut.Unlock() + + for _, c := range collector { + w.internalCollectors[c] = struct{}{} + } +} + +// Unregister implements the interface +func (w *wrappedRegisterer) Unregister(collector prometheus.Collector) bool { + w.mut.Lock() + defer w.mut.Unlock() + + delete(w.internalCollectors, collector) + return true +} From e9dbb430ebaf46138a50a9c21ab5815cb80f4fad Mon Sep 17 00:00:00 2001 From: Matt Durham Date: Thu, 4 Aug 2022 08:43:37 -0400 Subject: [PATCH 13/15] Remove dead code --- component/registry.go | 11 ----------- pkg/flow/internal/controller/wrap_registerer.go | 4 ++-- 2 files changed, 2 insertions(+), 13 deletions(-) diff --git a/component/registry.go b/component/registry.go index 7a90efc87aeb..f1ea06d43d5f 100644 --- a/component/registry.go +++ b/component/registry.go @@ -181,14 +181,3 @@ func Get(name string) (Registration, bool) { r, ok := registered[name] return r, ok } - -// Registerer handles registering any custom metrics for a component and teardown -type Registerer interface { - prometheus.Registerer - - // RegisterComponent registers metrics for a component - RegisterComponent(...prometheus.Collector) error - - // UnregisterComponent removes all metrics for a given component - UnregisterComponent() bool -} diff --git a/pkg/flow/internal/controller/wrap_registerer.go b/pkg/flow/internal/controller/wrap_registerer.go index ffc413280611..995ac68a03c9 100644 --- a/pkg/flow/internal/controller/wrap_registerer.go +++ b/pkg/flow/internal/controller/wrap_registerer.go @@ -21,7 +21,7 @@ func (w *wrappedRegisterer) Describe(descs chan<- *prometheus.Desc) { w.mut.Lock() defer w.mut.Unlock() - for c, _ := range w.internalCollectors { + for c := range w.internalCollectors { c.Describe(descs) } } @@ -31,7 +31,7 @@ func (w *wrappedRegisterer) Collect(metrics chan<- prometheus.Metric) { w.mut.Lock() defer w.mut.Unlock() - for c, _ := range w.internalCollectors { + for c := range w.internalCollectors { c.Collect(metrics) } } From aa71ce8b7cbdca8a3ad247dc2965f911b45eb0aa Mon Sep 17 00:00:00 2001 From: mattdurham Date: Thu, 4 Aug 2022 10:27:04 -0400 Subject: [PATCH 14/15] Update component/registry.go Co-authored-by: Robert Fratto --- component/registry.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/component/registry.go b/component/registry.go index f1ea06d43d5f..81967be3d5a3 100644 --- a/component/registry.go +++ b/component/registry.go @@ -51,7 +51,7 @@ type Options struct { // lifetime. OnStateChange func(e Exports) - //Registerer allows components to add their own metrics. The register will come pre-wrapped with the component id + // Registerer allows components to add their own metrics. The register will come pre-wrapped with the component ID. It is not necessary for components to unregister metrics on shutdown. Registerer prometheus.Registerer } From a83af3d8ce8f02ccaf0b9b60127e1ebbc6d007df Mon Sep 17 00:00:00 2001 From: Matt Durham Date: Thu, 4 Aug 2022 10:30:53 -0400 Subject: [PATCH 15/15] Resolve PR feedback --- pkg/flow/internal/controller/metrics.go | 3 --- pkg/flow/internal/controller/wrap_registerer.go | 10 +++++----- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/pkg/flow/internal/controller/metrics.go b/pkg/flow/internal/controller/metrics.go index efb22388fbc4..e7132d7a5a8c 100644 --- a/pkg/flow/internal/controller/metrics.go +++ b/pkg/flow/internal/controller/metrics.go @@ -68,7 +68,4 @@ func (cc *controllerCollector) Collect(ch chan<- prometheus.Metric) { func (cc *controllerCollector) Describe(ch chan<- *prometheus.Desc) { ch <- cc.runningComponentsTotal - for _, component := range cc.l.components { - component.register.Describe(ch) - } } diff --git a/pkg/flow/internal/controller/wrap_registerer.go b/pkg/flow/internal/controller/wrap_registerer.go index 995ac68a03c9..beb39be40ec1 100644 --- a/pkg/flow/internal/controller/wrap_registerer.go +++ b/pkg/flow/internal/controller/wrap_registerer.go @@ -7,7 +7,7 @@ import ( ) type wrappedRegisterer struct { - mut sync.Mutex + mut sync.RWMutex internalCollectors map[prometheus.Collector]struct{} } @@ -18,8 +18,8 @@ func newWrappedRegisterer() *wrappedRegisterer { // Describe implements the interface func (w *wrappedRegisterer) Describe(descs chan<- *prometheus.Desc) { - w.mut.Lock() - defer w.mut.Unlock() + w.mut.RLock() + defer w.mut.RUnlock() for c := range w.internalCollectors { c.Describe(descs) @@ -28,8 +28,8 @@ func (w *wrappedRegisterer) Describe(descs chan<- *prometheus.Desc) { // Collect implements the interface func (w *wrappedRegisterer) Collect(metrics chan<- prometheus.Metric) { - w.mut.Lock() - defer w.mut.Unlock() + w.mut.RLock() + defer w.mut.RUnlock() for c := range w.internalCollectors { c.Collect(metrics)