Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

x-pack/filebeat/input/gcppubsub: make metric collection persist beyond construction #35773

Merged
merged 1 commit into from Jun 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Expand Up @@ -134,6 +134,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Fixed a minor code error in the GCS input scheduler where a config value was being used directly instead of the source struct. {pull}35729[35729]
- Improve error reporting and fix IPv6 handling of TCP and UDP metric collection. {pull}35772[35772]
- Fix CEL input JSON marshalling of nested objects. {issue}35763[35763] {pull}35774[35774]
- Fix metric collection in GCPPubSub input. {pull}35773[35773]

*Heartbeat*

Expand Down
8 changes: 4 additions & 4 deletions x-pack/filebeat/input/gcppubsub/input.go
Expand Up @@ -85,6 +85,7 @@ type pubsubInput struct {
workerOnce sync.Once // Guarantees that the worker goroutine is only started once.
workerWg sync.WaitGroup // Waits on pubsub worker goroutine.

id string // id is the ID for metrics registration.
metrics *inputMetrics
}

Expand Down Expand Up @@ -122,9 +123,6 @@ func NewInput(cfg *conf.C, connector channel.Connector, inputContext input.Conte
}
}()

metrics := newInputMetrics(id, nil)
defer metrics.Close()

// If the input ever needs to be made restartable, then context would need
// to be recreated with each restart.
workerCtx, workerCancel := context.WithCancel(inputCtx)
Expand All @@ -135,7 +133,7 @@ func NewInput(cfg *conf.C, connector channel.Connector, inputContext input.Conte
inputCtx: inputCtx,
workerCtx: workerCtx,
workerCancel: workerCancel,
metrics: metrics,
id: id,
}

// Build outlet for events.
Expand Down Expand Up @@ -169,6 +167,7 @@ func NewInput(cfg *conf.C, connector channel.Connector, inputContext input.Conte
// will ever start the pubsub worker.
func (in *pubsubInput) Run() {
in.workerOnce.Do(func() {
in.metrics = newInputMetrics(in.id, nil)
in.workerWg.Add(1)
go func() {
in.log.Info("Pub/Sub input worker has started.")
Expand Down Expand Up @@ -236,6 +235,7 @@ func (in *pubsubInput) run() error {
func (in *pubsubInput) Stop() {
in.workerCancel()
in.workerWg.Wait()
in.metrics.Close()
}

// Wait is an alias for Stop.
Expand Down