diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 6aac8b8ac66..5340390fd0e 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -30,6 +30,24 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d ==== Bugfixes *Affecting all Beats* +- Fix Windows service install/uninstall when Win32_Service returns error, add logic to wait until the Windows Service is stopped before proceeding. {pull}33322[33322] +- Support for multiline zookeeper logs {issue}2496[2496] +- Allow `clock_nanosleep` in the default seccomp profiles for amd64 and 386. Newer versions of glibc (e.g. 2.31) require it. {issue}33792[33792] +- Disable lockfile when running under elastic-agent. {pull}33988[33988] +- Fix lockfile logic, retry locking {pull}34194[34194] +- Add checks to ensure reloading of units if the configuration actually changed. {pull}34346[34346] +- Fix namespacing on self-monitoring {pull}32336[32336] +- Fix race condition when stopping runners {pull}32433[32433] +- Fix concurrent map writes when system/process code called from reporter code {pull}32491[32491] +- Log errors from the Elastic Agent V2 client errors channel. Avoids blocking when error occurs communicating with the Elastic Agent. {pull}34392[34392] +- Only log publish event messages in trace log level under elastic-agent. {pull}34391[34391] +- Fix issue where updating a single Elastic Agent configuration unit results in other units being turned off. {pull}34504[34504] +- Fix dropped events when monitor a beat under the agent and send its `Host info` log entry. {pull}34599[34599] + +- Fix namespacing on self-monitoring {pull}32336[32336] +- Fix race condition when stopping runners {pull}32433[32433] +- Fix concurrent map writes when system/process code called from reporter code {pull}32491[32491] +- Fix panics when a processor is closed twice {pull}34647[34647] *Auditbeat* diff --git a/libbeat/processors/registry.go b/libbeat/processors/registry.go index 2e771bc331d..b17e9268ea7 100644 --- a/libbeat/processors/registry.go +++ b/libbeat/processors/registry.go @@ -54,7 +54,7 @@ var registry = NewNamespace() func RegisterPlugin(name string, constructor Constructor) { logp.L().Named(logName).Debugf("Register plugin %s", name) - err := registry.Register(name, constructor) + err := registry.Register(name, SafeWrap(constructor)) if err != nil { panic(err) } diff --git a/libbeat/processors/safe_processor.go b/libbeat/processors/safe_processor.go new file mode 100644 index 00000000000..0204e9c971a --- /dev/null +++ b/libbeat/processors/safe_processor.go @@ -0,0 +1,72 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package processors + +import ( + "errors" + "sync/atomic" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/elastic-agent-libs/config" +) + +var ErrClosed = errors.New("attempt to use a closed processor") + +type SafeProcessor struct { + Processor + closed uint32 +} + +// Run allows to run processor only when `Close` was not called prior +func (p *SafeProcessor) Run(event *beat.Event) (*beat.Event, error) { + if atomic.LoadUint32(&p.closed) == 1 { + return nil, ErrClosed + } + return p.Processor.Run(event) +} + +// Close makes sure the underlying `Close` function is called only once. +func (p *SafeProcessor) Close() (err error) { + if atomic.CompareAndSwapUint32(&p.closed, 0, 1) { + return Close(p.Processor) + } + return nil +} + +// SafeWrap makes sure that the processor handles all the required edge-cases. +// +// Each processor might end up in multiple processor groups. +// Every group has its own `Close` that calls `Close` on each +// processor of that group which leads to multiple `Close` calls +// on the same processor. +// +// If `SafeWrap` is not used, the processor must handle multiple +// `Close` calls by using `sync.Once` in its `Close` function. +// We make it easer for processor developers and take care of it +// in the processor registry instead. +func SafeWrap(constructor Constructor) Constructor { + return func(config *config.C) (Processor, error) { + processor, err := constructor(config) + if err != nil { + return nil, err + } + return &SafeProcessor{ + Processor: processor, + }, nil + } +} diff --git a/libbeat/processors/safe_processor_test.go b/libbeat/processors/safe_processor_test.go new file mode 100644 index 00000000000..81ef8219e83 --- /dev/null +++ b/libbeat/processors/safe_processor_test.go @@ -0,0 +1,97 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package processors + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/elastic-agent-libs/config" +) + +var mockEvent = &beat.Event{} + +type mockProcessor struct { + runCount int + closeCount int +} + +func newMockConstructor() (Constructor, *mockProcessor) { + p := mockProcessor{} + constructor := func(config *config.C) (Processor, error) { + return &p, nil + } + return constructor, &p +} + +func (p *mockProcessor) Run(event *beat.Event) (*beat.Event, error) { + p.runCount++ + return mockEvent, nil +} + +func (p *mockProcessor) Close() error { + p.closeCount++ + return nil +} +func (p *mockProcessor) String() string { + return "mock-processor" +} + +func TestSafeProcessor(t *testing.T) { + cons, p := newMockConstructor() + var ( + sp Processor + err error + ) + t.Run("creates a wrapped processor", func(t *testing.T) { + sw := SafeWrap(cons) + sp, err = sw(nil) + require.NoError(t, err) + }) + + t.Run("propagates Run to a processor", func(t *testing.T) { + e, err := sp.Run(nil) + require.NoError(t, err) + require.Equal(t, e, mockEvent) + + e, err = sp.Run(nil) + require.NoError(t, err) + require.Equal(t, e, mockEvent) + + require.Equal(t, 2, p.runCount) + }) + + t.Run("propagates Close to a processor only once", func(t *testing.T) { + err := Close(sp) + require.NoError(t, err) + + err = Close(sp) + require.NoError(t, err) + + require.Equal(t, 1, p.closeCount) + }) + + t.Run("does not propagate Run when closed", func(t *testing.T) { + e, err := sp.Run(nil) + require.Nil(t, e) + require.ErrorIs(t, err, ErrClosed) + require.Equal(t, 2, p.runCount) // still 2 from the previous test case + }) +}