diff --git a/libbeat/processors/safe_processor.go b/libbeat/processors/safe_processor.go index 0032dd53855..c0c65b29117 100644 --- a/libbeat/processors/safe_processor.go +++ b/libbeat/processors/safe_processor.go @@ -67,6 +67,12 @@ func SafeWrap(constructor Constructor) Constructor { if err != nil { return nil, err } + // if the processor does not implement `Closer` + // it does not need a wrap + if _, ok := processor.(Closer); !ok { + return processor, nil + } + return &SafeProcessor{ Processor: processor, }, nil diff --git a/libbeat/processors/safe_processor_test.go b/libbeat/processors/safe_processor_test.go index 68e8ee8a594..4f393522f07 100644 --- a/libbeat/processors/safe_processor_test.go +++ b/libbeat/processors/safe_processor_test.go @@ -29,33 +29,64 @@ import ( var mockEvent = &beat.Event{} type mockProcessor struct { - runCount int + runCount int +} + +func (p *mockProcessor) Run(event *beat.Event) (*beat.Event, error) { + p.runCount++ + return mockEvent, nil +} + +func (p *mockProcessor) String() string { + return "mock-processor" +} + +type mockCloserProcessor struct { + mockProcessor closeCount int } -func newMockConstructor() (Constructor, *mockProcessor) { - p := mockProcessor{} +func (p *mockCloserProcessor) Close() error { + p.closeCount++ + return nil +} + +func newMockCloserConstructor() (Constructor, *mockCloserProcessor) { + p := mockCloserProcessor{} constructor := func(config *common.Config) (Processor, error) { return &p, nil } return constructor, &p } -func (p *mockProcessor) Run(event *beat.Event) (*beat.Event, error) { - p.runCount++ - return mockEvent, nil +func mockConstructor(config *common.Config) (Processor, error) { + return &mockProcessor{}, nil } -func (p *mockProcessor) Close() error { - p.closeCount++ - return nil +func mockCloserConstructor(config *common.Config) (Processor, error) { + return &mockCloserProcessor{}, nil } -func (p *mockProcessor) String() string { - return "mock-processor" + +func TestSafeWrap(t *testing.T) { + t.Run("does not wrap a non-closer processor", func(t *testing.T) { + nonCloser := mockConstructor + wrappedNonCloser := SafeWrap(nonCloser) + wp, err := wrappedNonCloser(nil) + require.NoError(t, err) + require.IsType(t, &mockProcessor{}, wp) + }) + + t.Run("wraps a closer processor", func(t *testing.T) { + closer := mockCloserConstructor + wrappedCloser := SafeWrap(closer) + wcp, err := wrappedCloser(nil) + require.NoError(t, err) + require.IsType(t, &SafeProcessor{}, wcp) + }) } func TestSafeProcessor(t *testing.T) { - cons, p := newMockConstructor() + cons, p := newMockCloserConstructor() var ( sp Processor err error @@ -67,10 +98,11 @@ func TestSafeProcessor(t *testing.T) { }) t.Run("propagates Run to a processor", func(t *testing.T) { + require.Equal(t, 0, p.runCount) + e, err := sp.Run(nil) require.NoError(t, err) require.Equal(t, e, mockEvent) - e, err = sp.Run(nil) require.NoError(t, err) require.Equal(t, e, mockEvent) @@ -79,9 +111,10 @@ func TestSafeProcessor(t *testing.T) { }) t.Run("propagates Close to a processor only once", func(t *testing.T) { + require.Equal(t, 0, p.closeCount) + err := Close(sp) require.NoError(t, err) - err = Close(sp) require.NoError(t, err) @@ -89,9 +122,10 @@ func TestSafeProcessor(t *testing.T) { }) t.Run("does not propagate Run when closed", func(t *testing.T) { + require.Equal(t, 2, p.runCount) // still 2 from the previous test case e, err := sp.Run(nil) require.Nil(t, e) require.ErrorIs(t, err, ErrClosed) - require.Equal(t, 2, p.runCount) // still 2 from the previous test case + require.Equal(t, 2, p.runCount) }) }