Skip to content

Commit

Permalink
Stop re-using processors defined in the config (#34761)
Browse files Browse the repository at this point in the history
* Stop re-using processors defined in the config

After introducing the `SafeProcessor` wrapper in
#34647 we started returning
errors when a processor is being used after its `Close` function has
been called.

This led to dropped events and error spam in logs but also confirmed
that the root cause of the problem was not just a race condition on
`Close` but re-used processors somewhere.

After a long investigation such code that's re-using processors was
finally found.

This is the change that removes re-using the processors and
instantiates them on each input restart.

* Fix linter issues

* Add changelog entry

(cherry picked from commit 5cfe62c)

# Conflicts:
#	filebeat/channel/runner.go
#	libbeat/processors/safe_processor.go
  • Loading branch information
rdner authored and mergify[bot] committed Mar 7, 2023
1 parent b850575 commit f535e3e
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 1 deletion.
78 changes: 78 additions & 0 deletions CHANGELOG.next.asciidoc
Expand Up @@ -40,6 +40,84 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- [Azure Logs] Fix authentication_processing_details parsing in sign-in logs. {issue}34330[34330] {pull}34478[34478]
- Fix dropped events when monitor a beat under the agent and send its `Host info` log entry. {pull}34599[34599]

- Fix namespacing on self-monitoring {pull}32336[32336]
- Fix race condition when stopping runners {pull}32433[32433]
- Fix concurrent map writes when system/process code called from reporter code {pull}32491[32491]
- Fix panics when a processor is closed twice {pull}34647[34647]
- Update elastic-agent-system-metrics to v0.4.6 to allow builds on mips platforms. {pull}[]

*Auditbeat*


*Filebeat*
- [Auditbeat System Package] Added support for Apple Silicon chips. {pull}34433[34433]
- [Azure blob storage] Changed logger field name from `container` to `container_name` so that it does not clash
with the ecs field name `container`. {pull}34403[34403]
- [GCS] Added support for more mime types & introduced offset tracking via cursor state. Also added support for
automatic splitting at root level, if root level element is an array. {pull}34155[34155]
- [httpsjon] Improved error handling during pagination with chaining & split processor {pull}34127[34127]
- [Azure blob storage] Added support for more mime types & introduced offset tracking via cursor state. {pull}33981[33981]
- Fix EOF on single line not producing any event. {issue}30436[30436] {pull}33568[33568]
- Fix handling of error in states in direct aws-s3 listing input {issue}33513[33513] {pull}33722[33722]
- Fix `httpjson` input page number initialization and documentation. {pull}33400[33400]
- Add handling of AAA operations for Cisco ASA module. {issue}32257[32257] {pull}32789[32789]
- Fix gc.log always shipped even if gc fileset is disabled {issue}30995[30995]
- Fix handling of empty array in httpjson input. {pull}32001[32001]
- Fix reporting of `filebeat.events.active` in log events such that the current value is always reported instead of the difference from the last value. {pull}33597[33597]
- Fix splitting array of strings/arrays in httpjson input {issue}30345[30345] {pull}33609[33609]
- Fix Google workspace pagination and document ID generation. {pull}33666[33666]
- Fix PANW handling of messages with event.original already set. {issue}33829[33829] {pull}33830[33830]
- Rename identity as identity_name when the value is a string in Azure Platform Logs. {pull}33654[33654]
- Fix 'requires pointer' error while getting cursor metadata. {pull}33956[33956]
- Fix input cancellation handling when HTTP client does not support contexts. {issue}33962[33962] {pull}33968[33968]
- Update mito CEL extension library to v0.0.0-20221207004749-2f0f2875e464 {pull}33974[33974]
- Fix CEL result deserialisation when evaluation fails. {issue}33992[33992] {pull}33996[33996]
- Fix handling of non-200/non-429 status codes. {issue}33999[33999] {pull}34002[34002]
- [azure-eventhub input] Switch the run EPH run mode to non-blocking {pull}34075[34075]
- [google_workspace] Fix pagination and cursor value update. {pull}34274[34274]
- Fix handling of quoted values in auditd module. {issue}22587[22587] {pull}34069[34069]
- Fixing system tests not returning expected content encoding for azure blob storage input. {pull}34412[34412]
- [Azure Logs] Fix authentication_processing_details parsing in sign-in logs. {issue}34330[34330] {pull}34478[34478]
- Prevent Elasticsearch from spewing log warnings about redundant wildcard when setting up ingest pipelines. {issue}34249[34249] {pull}34550[34550]
- Gracefully handle Windows event channel not found errors in winlog input. {issue}30201[30201] {pull}34605[34605]
- Fix the issue of `cometd` input worker getting closed in case of a network connection issue and an EOF error. {issue}34326[34326] {pull}34327[34327]
- Fix errors and panics due to re-used processors {pull}34761[34761]

*Heartbeat*

- Fix panics when parsing dereferencing invalid parsed url. {pull}34702[34702]
- Fix broken zip URL monitors. NOTE: Zip URL Monitors will be removed in version 8.7 and replaced with project monitors. {pull}33723[33723]
- Fix integration hashing to prevent reloading all when updated. {pull}34697[34697]
- Fix release of job limit semaphore when context is cancelled. {pull}34697[34697]
- Fix bug where states.duration_ms was incorrect type. {pull}33563[33563]
- Fix handling of long UDP messages in UDP input. {issue}33836[33836] {pull}33837[33837]
- Fix browser monitor summary reporting as up when monitor is down. {issue}33374[33374] {pull}33819[33819]
- Fix beat capabilities on Docker image. {pull}33584[33584]
- Fix serialization of state duration to avoid scientific notation. {pull}34280[34280]
- Enable nodejs engine strict validation when bundling synthetics. {pull}34470[34470]
with the ecs field name `container`. {pull}34403[34403]
automatic splitting at root level, if root level element is an array. {pull}34155[34155]

*Heartbeat*


*Heartbeat*


*Auditbeat*


*Filebeat*

- Allow the `misp` fileset in the Filebeat `threatintel` module to ignore CIDR ranges for an IP field. {issue}29949[29949] {pull}34195[34195]
- Remove incorrect reference to CEL ext extensions package. {issue}34610[34610] {pull}34620[34620]

*Auditbeat*


*Filebeat*


*Heartbeat*

- Enable nodejs engine strict validation when bundling synthetics. {pull}34471[34471]
Expand Down
26 changes: 25 additions & 1 deletion filebeat/channel/runner.go
Expand Up @@ -25,6 +25,12 @@ import (
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/processors/add_formatted_index"
"github.com/elastic/beats/v7/libbeat/publisher/pipetool"
<<<<<<< HEAD
=======

conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/mapstr"
>>>>>>> 5cfe62cb2f (Stop re-using processors defined in the config (#34761))
)

type onCreateFactory struct {
Expand Down Expand Up @@ -126,6 +132,7 @@ func newCommonConfigEditor(
return nil, err
}

<<<<<<< HEAD
var indexProcessor processors.Processor
if !config.Index.IsEmpty() {
staticFields := fmtstr.FieldsForBeat(beatInfo.Beat, beatInfo.Version)
Expand All @@ -142,12 +149,29 @@ func newCommonConfigEditor(
return nil, err
}

=======
>>>>>>> 5cfe62cb2f (Stop re-using processors defined in the config (#34761))
serviceType := config.ServiceType
if serviceType == "" {
serviceType = config.Module
}

return func(clientCfg beat.ClientConfig) (beat.ClientConfig, error) {
var indexProcessor processors.Processor
if !config.Index.IsEmpty() {
staticFields := fmtstr.FieldsForBeat(beatInfo.Beat, beatInfo.Version)
timestampFormat, err := fmtstr.NewTimestampFormatString(&config.Index, staticFields)
if err != nil {
return clientCfg, err
}
indexProcessor = add_formatted_index.New(timestampFormat)
}

userProcessors, err := processors.New(config.Processors)
if err != nil {
return clientCfg, err
}

meta := clientCfg.Processing.Meta.Clone()
fields := clientCfg.Processing.Fields.Clone()

Expand Down Expand Up @@ -191,6 +215,6 @@ func newCommonConfigEditor(

func setOptional(to common.MapStr, key string, value string) {
if value != "" {
to.Put(key, value)
_, _ = to.Put(key, value)
}
}
6 changes: 6 additions & 0 deletions libbeat/processors/safe_processor.go
Expand Up @@ -22,7 +22,12 @@ import (
"sync/atomic"

"github.com/elastic/beats/v7/libbeat/beat"
<<<<<<< HEAD
"github.com/elastic/beats/v7/libbeat/common"
=======
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
>>>>>>> 5cfe62cb2f (Stop re-using processors defined in the config (#34761))
)

var ErrClosed = errors.New("attempt to use a closed processor")
Expand All @@ -45,6 +50,7 @@ func (p *SafeProcessor) Close() (err error) {
if atomic.CompareAndSwapUint32(&p.closed, 0, 1) {
return Close(p.Processor)
}
logp.L().Warnf("tried to close already closed %q processor", p.Processor.String())
return nil
}

Expand Down

0 comments on commit f535e3e

Please sign in to comment.