diff --git a/cmd/agent/flow.go b/cmd/agent/flow.go index 89f673af1f11..ef88b451bef1 100644 --- a/cmd/agent/flow.go +++ b/cmd/agent/flow.go @@ -11,6 +11,8 @@ import ( "os/signal" "sync" + "go.uber.org/atomic" + "github.com/fatih/color" "github.com/go-kit/log/level" "github.com/gorilla/mux" @@ -43,6 +45,7 @@ func runFlow() error { httpListenAddr = "127.0.0.1:12345" configFile string storagePath = "data-agent/" + ready = atomic.NewBool(true) ) fs := flag.NewFlagSet(os.Args[0], flag.ExitOnError) @@ -111,6 +114,15 @@ func runFlow() error { } r := mux.NewRouter() + r.HandleFunc("/-/ready", func(w http.ResponseWriter, r *http.Request) { + if ready.Load() { + w.WriteHeader(http.StatusOK) + fmt.Fprintf(w, "Agent is Ready.\n") + } else { + w.WriteHeader(http.StatusServiceUnavailable) + fmt.Fprint(w, "Config failed to load.\n") + } + }) r.Handle("/metrics", promhttp.Handler()) r.Handle("/debug/config", f.ConfigHandler()) r.Handle("/debug/graph", f.GraphHandler()) @@ -119,6 +131,7 @@ func runFlow() error { r.HandleFunc("/-/reload", func(w http.ResponseWriter, _ *http.Request) { err := reload() + ready.Store(err == nil) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return diff --git a/component/local/file/file.go b/component/local/file/file.go index 5d47831678d0..66be129c16a3 100644 --- a/component/local/file/file.go +++ b/component/local/file/file.go @@ -8,6 +8,8 @@ import ( "sync" "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/go-kit/log/level" "github.com/grafana/agent/component" "github.com/grafana/agent/pkg/flow/rivertypes" @@ -84,7 +86,8 @@ type Component struct { // reloadCh is a buffered channel which is written to when the watched file // should be reloaded by the component. 
- reloadCh chan struct{} + reloadCh chan struct{} + lastAccessed prometheus.Gauge } var ( @@ -98,11 +101,19 @@ func New(o component.Options, args Arguments) (*Component, error) { opts: o, reloadCh: make(chan struct{}, 1), + lastAccessed: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "agent_local_file_timestamp_last_accessed_unix_seconds", + Help: "The last successful access in unix seconds", + }), } + err := o.Registerer.Register(c.lastAccessed) + if err != nil { + return nil, err + } // Perform an update which will immediately set our exports to the initial // contents of the file. - if err := c.Update(args); err != nil { + if err = c.Update(args); err != nil { return nil, err } return c, nil @@ -162,6 +173,7 @@ func (c *Component) readFile() error { return err } c.latestContent = string(bb) + c.lastAccessed.SetToCurrentTime() c.opts.OnStateChange(Exports{ Content: rivertypes.OptionalSecret{ diff --git a/component/metrics/mutate/mutate.go b/component/metrics/mutate/mutate.go index 4ea9fc9aad97..614d6e7177c6 100644 --- a/component/metrics/mutate/mutate.go +++ b/component/metrics/mutate/mutate.go @@ -8,6 +8,7 @@ import ( fa "github.com/grafana/agent/component/common/appendable" flow_relabel "github.com/grafana/agent/component/common/relabel" "github.com/grafana/agent/component/metrics" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/storage" ) @@ -17,7 +18,6 @@ func init() { Name: "metrics.mutate", Args: Arguments{}, Exports: Exports{}, - Build: func(opts component.Options, args component.Arguments) (component.Component, error) { return New(opts, args.(Arguments)) }, @@ -44,8 +44,9 @@ type Component struct { opts component.Options mrc []*relabel.Config - appendable *fa.FlowAppendable - receiver *metrics.Receiver + appendable *fa.FlowAppendable + receiver *metrics.Receiver + metricsProcessed prometheus.Counter } var ( @@ -57,9 +58,17 @@ func New(o component.Options, args 
Arguments) (*Component, error) { c := &Component{opts: o} c.appendable = fa.NewFlowAppendable(args.ForwardTo...) c.receiver = &metrics.Receiver{Receive: c.Receive} + c.metricsProcessed = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "agent_metrics_mutate_metrics_processed", + Help: "Total number of metrics processed", + }) + err := o.Registerer.Register(c.metricsProcessed) + if err != nil { + return nil, err + } // Call to Update() to set the relabelling rules once at the start. - if err := c.Update(args); err != nil { + if err = c.Update(args); err != nil { return nil, err } @@ -69,6 +78,7 @@ func New(o component.Options, args Arguments) (*Component, error) { // Run implements component.Component. func (c *Component) Run(ctx context.Context) error { <-ctx.Done() + c.opts.Registerer.Unregister(c.metricsProcessed) return nil } diff --git a/component/metrics/remotewrite/remote_write.go b/component/metrics/remotewrite/remote_write.go index 76253c2aec72..8deeaf6262cf 100644 --- a/component/metrics/remotewrite/remote_write.go +++ b/component/metrics/remotewrite/remote_write.go @@ -20,7 +20,6 @@ import ( "github.com/grafana/agent/pkg/build" "github.com/grafana/agent/pkg/flow/rivertypes" "github.com/grafana/agent/pkg/metrics/wal" - "github.com/prometheus/client_golang/prometheus" common "github.com/prometheus/common/config" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" @@ -85,7 +84,6 @@ type BasicAuthConfig struct { type Component struct { log log.Logger opts component.Options - reg *metrics.CollectorRegistry walStore *wal.Storage remoteStore *remote.Storage @@ -99,23 +97,19 @@ type Component struct { // NewComponent creates a new metrics_forwarder component. 
func NewComponent(o component.Options, c RemoteConfig) (*Component, error) { - reg := metrics.NewCollectorRegistry() - walLogger := log.With(o.Logger, "subcomponent", "wal") dataPath := filepath.Join(o.DataPath, "wal", o.ID) - walStorage, err := wal.NewStorage(walLogger, reg, dataPath) + walStorage, err := wal.NewStorage(walLogger, o.Registerer, dataPath) if err != nil { return nil, err } remoteLogger := log.With(o.Logger, "subcomponent", "rw") - remoteStore := remote.NewStorage(remoteLogger, reg, startTime, dataPath, remoteFlushDeadline, nil) + remoteStore := remote.NewStorage(remoteLogger, o.Registerer, startTime, dataPath, remoteFlushDeadline, nil) res := &Component{ - log: o.Logger, - opts: o, - reg: reg, - + log: o.Logger, + opts: o, walStore: walStorage, remoteStore: remoteStore, storage: storage.NewFanout(o.Logger, walStorage, remoteStore), @@ -278,13 +272,3 @@ func (c *Component) Config() RemoteConfig { defer c.mut.RUnlock() return c.cfg } - -// Describe implements prometheus.Collector. -func (c *Component) Describe(ch chan<- *prometheus.Desc) { - c.reg.Describe(ch) -} - -// Collect implements prometheus.Collector. 
-func (c *Component) Collect(ch chan<- prometheus.Metric) { - c.reg.Collect(ch) -} diff --git a/component/metrics/scrape/scrape_test.go b/component/metrics/scrape/scrape_test.go index fef28d747e94..97dd360cdceb 100644 --- a/component/metrics/scrape/scrape_test.go +++ b/component/metrics/scrape/scrape_test.go @@ -6,6 +6,8 @@ import ( "testing" "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/grafana/agent/component" "github.com/grafana/agent/component/metrics" "github.com/grafana/agent/pkg/flow/logging" @@ -16,7 +18,10 @@ import ( func TestForwardingToAppendable(t *testing.T) { l, err := logging.New(os.Stderr, logging.DefaultOptions) require.NoError(t, err) - opts := component.Options{Logger: l} + opts := component.Options{ + Logger: l, + Registerer: prometheus.NewRegistry(), + } nilReceivers := []*metrics.Receiver{nil, nil} diff --git a/component/metrics/util.go b/component/metrics/util.go deleted file mode 100644 index 124ca61d267c..000000000000 --- a/component/metrics/util.go +++ /dev/null @@ -1,91 +0,0 @@ -package metrics - -import ( - "fmt" - "sync" - - "github.com/prometheus/client_golang/prometheus" -) - -// CollectorRegistry is both a prometheus.Registerer and prometheus.Collector: -// it encapsulates a set of collectors. -// -// Unlike a real Prometheus Registry, CollectorRegistry is unchecked and does -// not validate that metrics are unique at registration time. -type CollectorRegistry struct { - mut sync.RWMutex - cs []prometheus.Collector -} - -var _ prometheus.Registerer = (*CollectorRegistry)(nil) -var _ prometheus.Collector = (*CollectorRegistry)(nil) - -// NewCollectorRegistry returns a new CollectorRegistry. -func NewCollectorRegistry() *CollectorRegistry { - return &CollectorRegistry{} -} - -// Register implements prometheus.Collector. Unlike a real Prometheus registry, -// Register does not ensure that c provides unique metrics. 
-func (cr *CollectorRegistry) Register(c prometheus.Collector) error { - cr.mut.Lock() - defer cr.mut.Unlock() - - for _, exist := range cr.cs { - if exist == c { - return fmt.Errorf("collector already registered") - } - } - - cr.cs = append(cr.cs, c) - return nil -} - -// MustRegister implements prometheus.Collector. -func (cr *CollectorRegistry) MustRegister(cs ...prometheus.Collector) { - for _, c := range cs { - if err := cr.Register(c); err != nil { - panic(err) - } - } -} - -// Unregister implements prometheus.Collector. -func (cr *CollectorRegistry) Unregister(c prometheus.Collector) bool { - cr.mut.Lock() - defer cr.mut.Unlock() - - rem := make([]prometheus.Collector, 0, len(cr.cs)) - - var removed bool - for _, exist := range cr.cs { - if c == exist { - removed = true - continue - } - rem = append(rem, exist) - } - - cr.cs = rem - return removed -} - -// Describe implements prometheus.Collector. -func (cr *CollectorRegistry) Describe(ch chan<- *prometheus.Desc) { - cr.mut.RLock() - defer cr.mut.RUnlock() - - for _, c := range cr.cs { - c.Describe(ch) - } -} - -// Collect implements prometheus.Collector. -func (cr *CollectorRegistry) Collect(ch chan<- prometheus.Metric) { - cr.mut.RLock() - defer cr.mut.RUnlock() - - for _, c := range cr.cs { - c.Collect(ch) - } -} diff --git a/component/registry.go b/component/registry.go index 57ab0920f2a5..81967be3d5a3 100644 --- a/component/registry.go +++ b/component/registry.go @@ -7,6 +7,7 @@ import ( "github.com/go-kit/log" "github.com/grafana/regexp" + "github.com/prometheus/client_golang/prometheus" ) // The parsedName of a component is the parts of its name ("remote.http") split @@ -49,6 +50,9 @@ type Options struct { // by the component; a component must use the same Exports type for its // lifetime. OnStateChange func(e Exports) + + // Registerer allows components to add their own metrics. The register will come pre-wrapped with the component ID. 
It is not necessary for components to unregister metrics on shutdown. + Registerer prometheus.Registerer } // Registration describes a single component. diff --git a/component/remote/s3/s3.go b/component/remote/s3/s3.go index 342e43820b53..d0009d5d9595 100644 --- a/component/remote/s3/s3.go +++ b/component/remote/s3/s3.go @@ -8,12 +8,12 @@ import ( "sync" "time" - "github.com/grafana/agent/pkg/flow/rivertypes" - "github.com/aws/aws-sdk-go-v2/aws" aws_config "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/grafana/agent/component" + "github.com/grafana/agent/pkg/flow/rivertypes" + "github.com/prometheus/client_golang/prometheus" "golang.org/x/net/context" ) @@ -36,8 +36,10 @@ type S3 struct { health component.Health content string - watcher *watcher - updateChan chan result + watcher *watcher + updateChan chan result + s3Errors prometheus.Counter + lastAccessed prometheus.Gauge } var ( @@ -62,11 +64,28 @@ func New(o component.Options, args Arguments) (*S3, error) { args: args, health: component.Health{}, updateChan: make(chan result), + s3Errors: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "agent_remote_s3_errors_total", + Help: "The number of errors while accessing s3", + }), + lastAccessed: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "agent_remote_s3_timestamp_last_accessed_unix_seconds", + Help: "The last successful access in unix seconds", + }), } w := newWatcher(bucket, file, s.updateChan, args.PollFrequency, s3Client) s.watcher = w + err = o.Registerer.Register(s.s3Errors) + if err != nil { + return nil, err + } + err = o.Registerer.Register(s.lastAccessed) + if err != nil { + return nil, err + } + content, err := w.downloadSynchronously() s.handleContentPolling(content, err) return s, nil @@ -77,6 +96,7 @@ func (s *S3) Run(ctx context.Context) error { go s.handleContentUpdate(ctx) go s.watcher.run(ctx) <-ctx.Done() + return nil } @@ -185,10 +205,12 @@ func (s *S3) handleContentPolling(newContent 
string, err error) { Value: newContent, }, }) + s.lastAccessed.SetToCurrentTime() s.content = newContent s.health.Health = component.HealthTypeHealthy s.health.Message = "s3 file updated" } else { + s.s3Errors.Inc() s.health.Health = component.HealthTypeUnhealthy s.health.Message = err.Error() } diff --git a/component/remote/s3/s3_test.go b/component/remote/s3/s3_test.go index b50b0f861049..869b0c116f2e 100644 --- a/component/remote/s3/s3_test.go +++ b/component/remote/s3/s3_test.go @@ -11,6 +11,8 @@ import ( "testing" "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/grafana/agent/component" "github.com/johannesboyne/gofakes3" "github.com/johannesboyne/gofakes3/backend/s3mem" @@ -22,6 +24,7 @@ func TestCorrectBucket(t *testing.T) { o := component.Options{ ID: "t1", OnStateChange: func(_ component.Exports) {}, + Registerer: prometheus.NewRegistry(), } s3File, err := New(o, Arguments{ @@ -44,6 +47,7 @@ func TestWatchingFile(t *testing.T) { defer mut.Unlock() output = e.(Exports).Content.Value }, + Registerer: prometheus.NewRegistry(), }, Arguments{ Path: "s3://mybucket/test.txt", PollFrequency: 10 * time.Second, diff --git a/pkg/flow/componenttest/componenttest.go b/pkg/flow/componenttest/componenttest.go index 4f1441cf9d84..75b8ebc829b5 100644 --- a/pkg/flow/componenttest/componenttest.go +++ b/pkg/flow/componenttest/componenttest.go @@ -9,6 +9,8 @@ import ( "sync" "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/go-kit/log" "github.com/grafana/agent/component" ) @@ -126,6 +128,7 @@ func (c *Controller) buildComponent(dataPath string, args component.Arguments) ( Logger: c.log, DataPath: dataPath, OnStateChange: c.onStateChange, + Registerer: prometheus.NewRegistry(), } inner, err := c.reg.Build(opts, args) diff --git a/pkg/flow/flow.go b/pkg/flow/flow.go index 8f9385821905..fc0398a77803 100644 --- a/pkg/flow/flow.go +++ b/pkg/flow/flow.go @@ -67,6 +67,7 @@ type Options struct { // subdirectories for component-specific 
data. DataPath string + // Reg is the prometheus register to use Reg prometheus.Registerer } @@ -117,7 +118,8 @@ func newFlow(o Options) (*Flow, context.Context) { // Changed components should be queued for reevaluation. queue.Enqueue(cn) }, - }, o.Reg) + Registerer: o.Reg, + }) ) return &Flow{ diff --git a/pkg/flow/internal/controller/component.go b/pkg/flow/internal/controller/component.go index 332aa90a3e5b..9356379d87a6 100644 --- a/pkg/flow/internal/controller/component.go +++ b/pkg/flow/internal/controller/component.go @@ -16,6 +16,7 @@ import ( "github.com/grafana/agent/pkg/flow/internal/dag" "github.com/grafana/agent/pkg/river/ast" "github.com/grafana/agent/pkg/river/vm" + "github.com/prometheus/client_golang/prometheus" "go.uber.org/atomic" ) @@ -58,6 +59,7 @@ type ComponentGlobals struct { Logger log.Logger // Logger shared between all managed components. DataPath string // Shared directory where component data may be stored OnExportsChange func(cn *ComponentNode) // Invoked when the managed component updated its exports + Registerer prometheus.Registerer // Registerer for serving agent and component metrics } // ComponentNode is a controller node which manages a user-defined component. @@ -70,6 +72,7 @@ type ComponentNode struct { nodeID string // Cached from id.String() to avoid allocating new strings every time NodeID is called. 
reg component.Registration managedOpts component.Options + register *wrappedRegisterer exportsType reflect.Type onExportsChange func(cn *ComponentNode) // Informs controller that we changed our exports @@ -143,11 +146,16 @@ func NewComponentNode(globals ComponentGlobals, b *ast.BlockStmt) *ComponentNode } func getManagedOptions(globals ComponentGlobals, cn *ComponentNode) component.Options { + wrapped := newWrappedRegisterer() + cn.register = wrapped return component.Options{ ID: cn.nodeID, Logger: log.With(globals.Logger, "component", cn.nodeID), DataPath: filepath.Join(globals.DataPath, cn.nodeID), OnStateChange: cn.setExports, + Registerer: prometheus.WrapRegistererWith(prometheus.Labels{ + "component_id": cn.nodeID, + }, wrapped), } } diff --git a/pkg/flow/internal/controller/loader.go b/pkg/flow/internal/controller/loader.go index 5f9e0adae773..9014a0aa5b8a 100644 --- a/pkg/flow/internal/controller/loader.go +++ b/pkg/flow/internal/controller/loader.go @@ -16,7 +16,6 @@ import ( "github.com/grafana/agent/pkg/river/token/builder" "github.com/grafana/agent/pkg/river/vm" "github.com/hashicorp/go-multierror" - "github.com/prometheus/client_golang/prometheus" _ "github.com/grafana/agent/pkg/flow/internal/testcomponents" // Include test components ) @@ -36,18 +35,18 @@ type Loader struct { // NewLoader creates a new Loader. Components built by the Loader will be built // with co for their options. 
-func NewLoader(globals ComponentGlobals, reg prometheus.Registerer) *Loader { +func NewLoader(globals ComponentGlobals) *Loader { l := &Loader{ log: globals.Logger, globals: globals, graph: &dag.Graph{}, cache: newValueCache(), - cm: newControllerMetrics(reg), + cm: newControllerMetrics(globals.Registerer), } cc := newControllerCollector(l) - if reg != nil { - reg.MustRegister(cc) + if globals.Registerer != nil { + globals.Registerer.MustRegister(cc) } return l } diff --git a/pkg/flow/internal/controller/loader_test.go b/pkg/flow/internal/controller/loader_test.go index 15c3fc82bdd0..e0c67386cf06 100644 --- a/pkg/flow/internal/controller/loader_test.go +++ b/pkg/flow/internal/controller/loader_test.go @@ -47,14 +47,17 @@ func TestLoader(t *testing.T) { }, } - globals := controller.ComponentGlobals{ - Logger: log.NewNopLogger(), - DataPath: t.TempDir(), - OnExportsChange: func(cn *controller.ComponentNode) { /* no-op */ }, + newGlobals := func() controller.ComponentGlobals { + return controller.ComponentGlobals{ + Logger: log.NewNopLogger(), + DataPath: t.TempDir(), + OnExportsChange: func(cn *controller.ComponentNode) { /* no-op */ }, + Registerer: prometheus.NewRegistry(), + } } t.Run("New Graph", func(t *testing.T) { - l := controller.NewLoader(globals, prometheus.DefaultRegisterer) + l := controller.NewLoader(newGlobals()) diags := applyFromContent(t, l, []byte(testFile)) require.NoError(t, diags.ErrorOrNil()) requireGraph(t, l.Graph(), testGraphDefinition) @@ -72,7 +75,7 @@ func TestLoader(t *testing.T) { frequency = "1m" } ` - l := controller.NewLoader(globals, nil) + l := controller.NewLoader(newGlobals()) diags := applyFromContent(t, l, []byte(startFile)) origGraph := l.Graph() require.NoError(t, diags.ErrorOrNil()) @@ -91,7 +94,7 @@ func TestLoader(t *testing.T) { doesnotexist "bad_component" { } ` - l := controller.NewLoader(globals, nil) + l := controller.NewLoader(newGlobals()) diags := applyFromContent(t, l, []byte(invalidFile)) 
require.ErrorContains(t, diags.ErrorOrNil(), `Unrecognized component name "doesnotexist`) }) @@ -110,7 +113,7 @@ func TestLoader(t *testing.T) { input = testcomponents.tick.doesnotexist.tick_time } ` - l := controller.NewLoader(globals, nil) + l := controller.NewLoader(newGlobals()) diags := applyFromContent(t, l, []byte(invalidFile)) require.Error(t, diags.ErrorOrNil()) @@ -144,7 +147,7 @@ func TestLoader(t *testing.T) { input = testcomponents.passthrough.ticker.output } ` - l := controller.NewLoader(globals, nil) + l := controller.NewLoader(newGlobals()) diags := applyFromContent(t, l, []byte(invalidFile)) require.Error(t, diags.ErrorOrNil()) }) diff --git a/pkg/flow/internal/controller/metrics.go b/pkg/flow/internal/controller/metrics.go index f6eab386aae3..e7132d7a5a8c 100644 --- a/pkg/flow/internal/controller/metrics.go +++ b/pkg/flow/internal/controller/metrics.go @@ -58,6 +58,7 @@ func (cc *controllerCollector) Collect(ch chan<- prometheus.Metric) { for _, component := range cc.l.Components() { health := component.CurrentHealth().Health.String() componentsByHealth[health]++ + component.register.Collect(ch) } for health, count := range componentsByHealth { diff --git a/pkg/flow/internal/controller/wrap_registerer.go b/pkg/flow/internal/controller/wrap_registerer.go new file mode 100644 index 000000000000..beb39be40ec1 --- /dev/null +++ b/pkg/flow/internal/controller/wrap_registerer.go @@ -0,0 +1,65 @@ +package controller + +import ( + "sync" + + "github.com/prometheus/client_golang/prometheus" +) + +type wrappedRegisterer struct { + mut sync.RWMutex + internalCollectors map[prometheus.Collector]struct{} +} + +// newWrappedRegisterer creates a wrapped register +func newWrappedRegisterer() *wrappedRegisterer { + return &wrappedRegisterer{internalCollectors: make(map[prometheus.Collector]struct{})} +} + +// Describe implements the interface +func (w *wrappedRegisterer) Describe(descs chan<- *prometheus.Desc) { + w.mut.RLock() + defer w.mut.RUnlock() + + for c := 
range w.internalCollectors {
+		c.Describe(descs)
+	}
+}
+
+// Collect implements prometheus.Collector by forwarding to every collector currently held.
+func (w *wrappedRegisterer) Collect(metrics chan<- prometheus.Metric) {
+	w.mut.RLock()
+	defer w.mut.RUnlock()
+	for c := range w.internalCollectors {
+		c.Collect(metrics)
+	}
+}
+
+// Register implements prometheus.Registerer. Collectors are kept in a set keyed by the
+// collector value, so registering the same value twice is a no-op; no error is ever returned.
+func (w *wrappedRegisterer) Register(collector prometheus.Collector) error {
+	w.mut.Lock()
+	defer w.mut.Unlock()
+	w.internalCollectors[collector] = struct{}{}
+	return nil
+}
+
+// MustRegister implements prometheus.Registerer, adding all collectors under one lock acquisition.
+func (w *wrappedRegisterer) MustRegister(collector ...prometheus.Collector) {
+	w.mut.Lock()
+	defer w.mut.Unlock()
+	for _, c := range collector {
+		w.internalCollectors[c] = struct{}{}
+	}
+}
+
+// Unregister implements prometheus.Registerer. It removes collector and reports whether it
+// was actually present, rather than unconditionally returning true.
+// NOTE(review): lookup is by map-key identity; presumably a wrapper that allocates a new collector per call (e.g. prometheus.WrapRegistererWith) never matches an earlier registration — confirm.
+func (w *wrappedRegisterer) Unregister(collector prometheus.Collector) bool {
+	w.mut.Lock()
+	defer w.mut.Unlock()
+	_, ok := w.internalCollectors[collector]
+	delete(w.internalCollectors, collector)
+	return ok
+}