Skip to content

Commit

Permalink
x-pack/filebeat/input/gcppubsub: make metric collection persiste beyo…
Browse files Browse the repository at this point in the history
…nd construction

The current code registers metrics and defers closing them in the input
constructor meaning that the metrics are not registered by the time the
input is running. Delay registering the metrics until the input is
started and close the metrics when the input is stopped.
  • Loading branch information
efd6 committed Jun 18, 2023
1 parent e3db1ab commit 4aa2680
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Expand Up @@ -134,6 +134,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Fixed a minor code error in the GCS input scheduler where a config value was being used directly instead of the source struct. {pull}35729[35729]
- Improve error reporting and fix IPv6 handling of TCP and UDP metric collection. {pull}35772[35772]
- Fix CEL input JSON marshalling of nested objects. {issue}35763[35763] {pull}35774[35774]
- Fix metric collection in GCPPubSub input. {pull}35773[35773]

*Heartbeat*

Expand Down
8 changes: 4 additions & 4 deletions x-pack/filebeat/input/gcppubsub/input.go
Expand Up @@ -85,6 +85,7 @@ type pubsubInput struct {
workerOnce sync.Once // Guarantees that the worker goroutine is only started once.
workerWg sync.WaitGroup // Waits on pubsub worker goroutine.

id string // id is the ID for metrics registration.
metrics *inputMetrics
}

Expand Down Expand Up @@ -122,9 +123,6 @@ func NewInput(cfg *conf.C, connector channel.Connector, inputContext input.Conte
}
}()

metrics := newInputMetrics(id, nil)
defer metrics.Close()

// If the input ever needs to be made restartable, then context would need
// to be recreated with each restart.
workerCtx, workerCancel := context.WithCancel(inputCtx)
Expand All @@ -135,7 +133,7 @@ func NewInput(cfg *conf.C, connector channel.Connector, inputContext input.Conte
inputCtx: inputCtx,
workerCtx: workerCtx,
workerCancel: workerCancel,
metrics: metrics,
id: id,
}

// Build outlet for events.
Expand Down Expand Up @@ -169,6 +167,7 @@ func NewInput(cfg *conf.C, connector channel.Connector, inputContext input.Conte
// will ever start the pubsub worker.
func (in *pubsubInput) Run() {
in.workerOnce.Do(func() {
in.metrics = newInputMetrics(in.id, nil)
in.workerWg.Add(1)
go func() {
in.log.Info("Pub/Sub input worker has started.")
Expand Down Expand Up @@ -236,6 +235,7 @@ func (in *pubsubInput) run() error {
func (in *pubsubInput) Stop() {
in.workerCancel()
in.workerWg.Wait()
in.metrics.Close()
}

// Wait is an alias for Stop.
Expand Down

0 comments on commit 4aa2680

Please sign in to comment.