Skip to content

Commit

Permalink
[7.17](backport #34871) Don't safe-wrap processors that don't have `C…
Browse files Browse the repository at this point in the history
…lose` (#34878)

* Don't safe-wrap processors that don't have `Close` (#34871)

This will make the wrapping more efficient and will reduce unnecessary overhead.

(cherry picked from commit fe908d5)

# Conflicts:
#	libbeat/processors/safe_processor_test.go

* Resolve conflicts

* Remove left-over from the conflict resolution

---------

Co-authored-by: Denis <denis.rechkunov@elastic.co>
  • Loading branch information
mergify[bot] and rdner committed Mar 21, 2023
1 parent 838902e commit ece26c3
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 15 deletions.
6 changes: 6 additions & 0 deletions libbeat/processors/safe_processor.go
Expand Up @@ -67,6 +67,12 @@ func SafeWrap(constructor Constructor) Constructor {
if err != nil {
return nil, err
}
// if the processor does not implement `Closer`
// it does not need a wrap
if _, ok := processor.(Closer); !ok {
return processor, nil
}

return &SafeProcessor{
Processor: processor,
}, nil
Expand Down
64 changes: 49 additions & 15 deletions libbeat/processors/safe_processor_test.go
Expand Up @@ -29,33 +29,64 @@ import (
var mockEvent = &beat.Event{}

type mockProcessor struct {
runCount int
runCount int
}

func (p *mockProcessor) Run(event *beat.Event) (*beat.Event, error) {
p.runCount++
return mockEvent, nil
}

func (p *mockProcessor) String() string {
return "mock-processor"
}

type mockCloserProcessor struct {
mockProcessor
closeCount int
}

func newMockConstructor() (Constructor, *mockProcessor) {
p := mockProcessor{}
func (p *mockCloserProcessor) Close() error {
p.closeCount++
return nil
}

func newMockCloserConstructor() (Constructor, *mockCloserProcessor) {
p := mockCloserProcessor{}
constructor := func(config *common.Config) (Processor, error) {
return &p, nil
}
return constructor, &p
}

func (p *mockProcessor) Run(event *beat.Event) (*beat.Event, error) {
p.runCount++
return mockEvent, nil
func mockConstructor(config *common.Config) (Processor, error) {
return &mockProcessor{}, nil
}

func (p *mockProcessor) Close() error {
p.closeCount++
return nil
func mockCloserConstructor(config *common.Config) (Processor, error) {
return &mockCloserProcessor{}, nil
}
func (p *mockProcessor) String() string {
return "mock-processor"

func TestSafeWrap(t *testing.T) {
t.Run("does not wrap a non-closer processor", func(t *testing.T) {
nonCloser := mockConstructor
wrappedNonCloser := SafeWrap(nonCloser)
wp, err := wrappedNonCloser(nil)
require.NoError(t, err)
require.IsType(t, &mockProcessor{}, wp)
})

t.Run("wraps a closer processor", func(t *testing.T) {
closer := mockCloserConstructor
wrappedCloser := SafeWrap(closer)
wcp, err := wrappedCloser(nil)
require.NoError(t, err)
require.IsType(t, &SafeProcessor{}, wcp)
})
}

func TestSafeProcessor(t *testing.T) {
cons, p := newMockConstructor()
cons, p := newMockCloserConstructor()
var (
sp Processor
err error
Expand All @@ -67,10 +98,11 @@ func TestSafeProcessor(t *testing.T) {
})

t.Run("propagates Run to a processor", func(t *testing.T) {
require.Equal(t, 0, p.runCount)

e, err := sp.Run(nil)
require.NoError(t, err)
require.Equal(t, e, mockEvent)

e, err = sp.Run(nil)
require.NoError(t, err)
require.Equal(t, e, mockEvent)
Expand All @@ -79,19 +111,21 @@ func TestSafeProcessor(t *testing.T) {
})

t.Run("propagates Close to a processor only once", func(t *testing.T) {
require.Equal(t, 0, p.closeCount)

err := Close(sp)
require.NoError(t, err)

err = Close(sp)
require.NoError(t, err)

require.Equal(t, 1, p.closeCount)
})

t.Run("does not propagate Run when closed", func(t *testing.T) {
require.Equal(t, 2, p.runCount) // still 2 from the previous test case
e, err := sp.Run(nil)
require.Nil(t, e)
require.ErrorIs(t, err, ErrClosed)
require.Equal(t, 2, p.runCount) // still 2 from the previous test case
require.Equal(t, 2, p.runCount)
})
}

0 comments on commit ece26c3

Please sign in to comment.