diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java index 6796ed933b..4bfda637b7 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java @@ -106,7 +106,9 @@ void output(Collection> records) { final byte[] encodedBytes = encodedEvent.getBytes(); currentBuffer.writeEvent(encodedBytes); - bufferedEventHandles.add(event.getEventHandle()); + if(event.getEventHandle() != null) { + bufferedEventHandles.add(event.getEventHandle()); + } if (ThresholdCheck.checkThresholdExceed(currentBuffer, maxEvents, maxBytes, maxCollectionDuration)) { final String s3Key = generateKey(); LOG.info("Writing {} to S3 with {} events and size of {} bytes.", @@ -127,14 +129,13 @@ void output(Collection> records) { currentBuffer = bufferFactory.getBuffer(); } } - } catch (NullPointerException | IOException | InterruptedException e) { + } catch (IOException | InterruptedException e) { LOG.error("Exception while write event into buffer :", e); - Thread.currentThread().interrupt(); } reentrantLock.unlock(); } - private void releaseEventHandles(boolean result) { + private void releaseEventHandles(final boolean result) { for (EventHandle eventHandle : bufferedEventHandles) { eventHandle.release(result); } diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3SinkServiceTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3SinkServiceTest.java index 009f65c0b1..f89eb9026f 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3SinkServiceTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3SinkServiceTest.java @@ -209,16 +209,6 @@ void test_output_with_threshold_set_as_zero_event_count() throws IOException { verify(snapshotSuccessCounter, times(50)).increment(); } - @Test - void test_catch_output_exception_cover() { - codec = null; - S3SinkService s3SinkService = createObjectUnderTest(); - assertNotNull(s3SinkService); - assertThat(s3SinkService, instanceOf(S3SinkService.class)); - s3SinkService.output(generateRandomStringEventRecord()); - verify(snapshotSuccessCounter, times(0)).increment(); - } - @Test void test_output_with_uploadedToS3_success() throws IOException { @@ -340,6 +330,35 @@ void output_will_release_all_handles_since_a_flush() throws IOException { } } + @Test + void output_will_skip_releasing_events_without_EventHandle_objects() throws IOException { + bufferFactory = mock(BufferFactory.class); + final Buffer buffer = mock(Buffer.class); + when(bufferFactory.getBuffer()).thenReturn(buffer); + + final long objectSize = random.nextInt(1_000_000) + 10_000; + when(buffer.getSize()).thenReturn(objectSize); + + when(codec.parse(any())).thenReturn(UUID.randomUUID().toString()); + final S3SinkService s3SinkService = createObjectUnderTest(); + final Collection> records = generateRandomStringEventRecord(); + records.stream() + .map(Record::getData) + .map(event -> (JacksonEvent) event) + .forEach(event -> event.setEventHandle(null)); + + s3SinkService.output(records); + + final Collection> records2 = generateRandomStringEventRecord(); + s3SinkService.output(records2); + + final List eventHandles2 = records2.stream().map(Record::getData).map(Event::getEventHandle).collect(Collectors.toList()); + + for (EventHandle eventHandle : eventHandles2) { + verify(eventHandle).release(true); + } + } + @Test void output_will_release_all_handles_since_a_flush_when_S3_fails() throws IOException { bufferFactory = mock(BufferFactory.class);