diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 277b692e72bd..efe453136ce5 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -133,6 +133,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Fix error when trying to use `include_message` parser {issue}35440[35440] - Fix handling of IPv6 unspecified addresses in TCP input. {issue}35064[35064] {pull}35637[35637] - Fixed a minor code error in the GCS input scheduler where a config value was being used directly instead of the source struct. {pull}35729[35729] +- Fix metric collection in GCPPubSub input. {pull}35773[35773] *Heartbeat* diff --git a/x-pack/filebeat/input/gcppubsub/input.go b/x-pack/filebeat/input/gcppubsub/input.go index 0ab9b907d608..890e8b1ee10f 100644 --- a/x-pack/filebeat/input/gcppubsub/input.go +++ b/x-pack/filebeat/input/gcppubsub/input.go @@ -85,6 +85,7 @@ type pubsubInput struct { workerOnce sync.Once // Guarantees that the worker goroutine is only started once. workerWg sync.WaitGroup // Waits on pubsub worker goroutine. + id string // id is the ID for metrics registration. metrics *inputMetrics } @@ -122,9 +123,6 @@ func NewInput(cfg *conf.C, connector channel.Connector, inputContext input.Conte } }() - metrics := newInputMetrics(id, nil) - defer metrics.Close() - // If the input ever needs to be made restartable, then context would need // to be recreated with each restart. workerCtx, workerCancel := context.WithCancel(inputCtx) @@ -135,7 +133,7 @@ func NewInput(cfg *conf.C, connector channel.Connector, inputContext input.Conte inputCtx: inputCtx, workerCtx: workerCtx, workerCancel: workerCancel, - metrics: metrics, + id: id, } // Build outlet for events. @@ -169,6 +167,7 @@ func NewInput(cfg *conf.C, connector channel.Connector, inputContext input.Conte // will ever start the pubsub worker. func (in *pubsubInput) Run() { in.workerOnce.Do(func() { + in.metrics = newInputMetrics(in.id, nil) in.workerWg.Add(1) go func() { in.log.Info("Pub/Sub input worker has started.") @@ -236,6 +235,7 @@ func (in *pubsubInput) run() error { func (in *pubsubInput) Stop() { in.workerCancel() in.workerWg.Wait() + in.metrics.Close() } // Wait is an alias for Stop.