Skip to content

Commit

Permalink
Fixes a bug in the S3 sink where events without handles throw NPE (op…
Browse files Browse the repository at this point in the history
…ensearch-project#2814)

Fixes a bug in the S3 sink where events without handles are throwing NPEs by skipping any such handles.

Signed-off-by: David Venable <dlv@amazon.com>
Signed-off-by: Marcos_Gonzalez_Mayedo <alemayed@amazon.com>
  • Loading branch information
dlvenable authored and Marcos_Gonzalez_Mayedo committed Jun 21, 2023
1 parent a5c3237 commit a138115
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,9 @@ void output(Collection<Record<Event>> records) {
final byte[] encodedBytes = encodedEvent.getBytes();

currentBuffer.writeEvent(encodedBytes);
bufferedEventHandles.add(event.getEventHandle());
if(event.getEventHandle() != null) {
bufferedEventHandles.add(event.getEventHandle());
}
if (ThresholdCheck.checkThresholdExceed(currentBuffer, maxEvents, maxBytes, maxCollectionDuration)) {
final String s3Key = generateKey();
LOG.info("Writing {} to S3 with {} events and size of {} bytes.",
Expand All @@ -127,14 +129,13 @@ void output(Collection<Record<Event>> records) {
currentBuffer = bufferFactory.getBuffer();
}
}
} catch (NullPointerException | IOException | InterruptedException e) {
} catch (IOException | InterruptedException e) {
LOG.error("Exception while write event into buffer :", e);
Thread.currentThread().interrupt();
}
reentrantLock.unlock();
}

private void releaseEventHandles(boolean result) {
private void releaseEventHandles(final boolean result) {
for (EventHandle eventHandle : bufferedEventHandles) {
eventHandle.release(result);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,16 +209,6 @@ void test_output_with_threshold_set_as_zero_event_count() throws IOException {
verify(snapshotSuccessCounter, times(50)).increment();
}

@Test
void test_catch_output_exception_cover() {
codec = null;
S3SinkService s3SinkService = createObjectUnderTest();
assertNotNull(s3SinkService);
assertThat(s3SinkService, instanceOf(S3SinkService.class));
s3SinkService.output(generateRandomStringEventRecord());
verify(snapshotSuccessCounter, times(0)).increment();
}

@Test
void test_output_with_uploadedToS3_success() throws IOException {

Expand Down Expand Up @@ -340,6 +330,35 @@ void output_will_release_all_handles_since_a_flush() throws IOException {
}
}

@Test
void output_will_skip_releasing_events_without_EventHandle_objects() throws IOException {
bufferFactory = mock(BufferFactory.class);
final Buffer buffer = mock(Buffer.class);
when(bufferFactory.getBuffer()).thenReturn(buffer);

final long objectSize = random.nextInt(1_000_000) + 10_000;
when(buffer.getSize()).thenReturn(objectSize);

when(codec.parse(any())).thenReturn(UUID.randomUUID().toString());
final S3SinkService s3SinkService = createObjectUnderTest();
final Collection<Record<Event>> records = generateRandomStringEventRecord();
records.stream()
.map(Record::getData)
.map(event -> (JacksonEvent) event)
.forEach(event -> event.setEventHandle(null));

s3SinkService.output(records);

final Collection<Record<Event>> records2 = generateRandomStringEventRecord();
s3SinkService.output(records2);

final List<EventHandle> eventHandles2 = records2.stream().map(Record::getData).map(Event::getEventHandle).collect(Collectors.toList());

for (EventHandle eventHandle : eventHandles2) {
verify(eventHandle).release(true);
}
}

@Test
void output_will_release_all_handles_since_a_flush_when_S3_fails() throws IOException {
bufferFactory = mock(BufferFactory.class);
Expand Down

0 comments on commit a138115

Please sign in to comment.