Skip to content

Commit

Permalink
Don't safe-wrap processors that don't have Close (#34871)
Browse files Browse the repository at this point in the history
This will make the wrapping more efficient and will reduce unnecessary overhead.

(cherry picked from commit fe908d5)

# Conflicts:
#	libbeat/processors/safe_processor_test.go
  • Loading branch information
rdner authored and mergify[bot] committed Mar 21, 2023
1 parent b7a96ea commit ab2fa13
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 7 deletions.
6 changes: 6 additions & 0 deletions libbeat/processors/safe_processor.go
Expand Up @@ -67,6 +67,12 @@ func SafeWrap(constructor Constructor) Constructor {
if err != nil {
return nil, err
}
// if the processor does not implement `Closer`
// it does not need a wrap
if _, ok := processor.(Closer); !ok {
return processor, nil
}

return &SafeProcessor{
Processor: processor,
}, nil
Expand Down
61 changes: 54 additions & 7 deletions libbeat/processors/safe_processor_test.go
Expand Up @@ -29,6 +29,7 @@ import (
var mockEvent = &beat.Event{}

type mockProcessor struct {
<<<<<<< HEAD
runCount int
closeCount int
}
Expand All @@ -39,23 +40,66 @@ func newMockConstructor() (Constructor, *mockProcessor) {
return &p, nil
}
return constructor, &p
=======
runCount int
>>>>>>> fe908d5ade (Don't safe-wrap processors that don't have `Close` (#34871))
}

func (p *mockProcessor) Run(event *beat.Event) (*beat.Event, error) {
p.runCount++
return mockEvent, nil
}

func (p *mockProcessor) Close() error {
func (p *mockProcessor) String() string {
return "mock-processor"
}

type mockCloserProcessor struct {
mockProcessor
closeCount int
}

func (p *mockCloserProcessor) Close() error {
p.closeCount++
return nil
}
func (p *mockProcessor) String() string {
return "mock-processor"

func newMockCloserConstructor() (Constructor, *mockCloserProcessor) {
p := mockCloserProcessor{}
constructor := func(config *config.C) (Processor, error) {
return &p, nil
}
return constructor, &p
}

func mockConstructor(config *config.C) (Processor, error) {
return &mockProcessor{}, nil
}

func mockCloserConstructor(config *config.C) (Processor, error) {
return &mockCloserProcessor{}, nil
}

func TestSafeWrap(t *testing.T) {
t.Run("does not wrap a non-closer processor", func(t *testing.T) {
nonCloser := mockConstructor
wrappedNonCloser := SafeWrap(nonCloser)
wp, err := wrappedNonCloser(nil)
require.NoError(t, err)
require.IsType(t, &mockProcessor{}, wp)
})

t.Run("wraps a closer processor", func(t *testing.T) {
closer := mockCloserConstructor
wrappedCloser := SafeWrap(closer)
wcp, err := wrappedCloser(nil)
require.NoError(t, err)
require.IsType(t, &SafeProcessor{}, wcp)
})
}

func TestSafeProcessor(t *testing.T) {
cons, p := newMockConstructor()
cons, p := newMockCloserConstructor()
var (
sp Processor
err error
Expand All @@ -67,10 +111,11 @@ func TestSafeProcessor(t *testing.T) {
})

t.Run("propagates Run to a processor", func(t *testing.T) {
require.Equal(t, 0, p.runCount)

e, err := sp.Run(nil)
require.NoError(t, err)
require.Equal(t, e, mockEvent)

e, err = sp.Run(nil)
require.NoError(t, err)
require.Equal(t, e, mockEvent)
Expand All @@ -79,19 +124,21 @@ func TestSafeProcessor(t *testing.T) {
})

t.Run("propagates Close to a processor only once", func(t *testing.T) {
require.Equal(t, 0, p.closeCount)

err := Close(sp)
require.NoError(t, err)

err = Close(sp)
require.NoError(t, err)

require.Equal(t, 1, p.closeCount)
})

t.Run("does not propagate Run when closed", func(t *testing.T) {
require.Equal(t, 2, p.runCount) // still 2 from the previous test case
e, err := sp.Run(nil)
require.Nil(t, e)
require.ErrorIs(t, err, ErrClosed)
require.Equal(t, 2, p.runCount) // still 2 from the previous test case
require.Equal(t, 2, p.runCount)
})
}

0 comments on commit ab2fa13

Please sign in to comment.