Skip to content

Commit

Permalink
merge: #8798
Browse files Browse the repository at this point in the history
8798: Add API to probe the logstream batch writer if more bytes can be written without writing them r=npepinpe a=npepinpe

## Description

This PR adds a new API method, `LogStreamBatchWriter#canWriteAdditionalEvent(int)`. This allows users of the writer to probe if adding the given amount of bytes to the batch would cause it to become un-writable, without actually having to write anything to the batch, or even modify their DTO (e.g. the `TypedRecord<?>` in the engine).

To avoid having dispatcher details leak into the implementation, an analogous method is added to the dispatcher, `Dispatcher#canClaimFragmentBatch(int, int)`, which will compare the given size, framed and aligned, with the max fragment length. This is the main building block to eventually solve #5525, and enable other use cases (e.g. multi-instance creation) which deal with large batches until we have a more permanent solution (e.g. chunking follow up batches).

NOTE: the tests added in the dispatcher are not very good, but I couldn't come up with something else that wouldn't be too coupled to the implementation (i.e. essentially reusing `LogBufferAppender`). I would like some ideas/suggestions.

NOTE: this PR comes out of the larger one, #8491. You can check that one out to see how the new API would be used, e.g. in the `JobBatchCollector`. As such, this is marked for backporting, since we'll backport the complete fix for #5525.

## Related issues

related to #5525 



Co-authored-by: Nicolas Pepin-Perreault <nicolas.pepin-perreault@camunda.com>
  • Loading branch information
zeebe-bors-cloud[bot] and npepinpe committed Feb 17, 2022
2 parents b7462ff + 409a7dc commit 6fe0922
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 17 deletions.
6 changes: 6 additions & 0 deletions dispatcher/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,19 @@ public long claimFragmentBatch(
LogBufferAppender.claimedBatchLength(fragmentCount, batchLength));
}

/**
* Returns whether a batch of length {@code batchLength}, containing {@code fragmentCount}
* fragments, can be claimed by this dispatcher.
*
* @param fragmentCount the count of fragments in the batch
* @param batchLength the total length of the batch (all fragments included), unframed
* @return true if the batch can be claimed, false otherwise
*/
public boolean canClaimFragmentBatch(final int fragmentCount, final int batchLength) {
final int framedLength = LogBufferAppender.claimedBatchLength(fragmentCount, batchLength);
return framedLength < maxFragmentLength;
}

private synchronized long offer(
final BiFunction<LogBufferPartition, Integer, Integer> claimer,
final int fragmentCount,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,15 @@
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

public final class DispatcherTest {
final class DispatcherTest {

static final byte[] A_MSG_PAYLOAD = "some bytes".getBytes(StandardCharsets.UTF_8);
static final int A_MSG_PAYLOAD_LENGTH = A_MSG_PAYLOAD.length;
static final int A_FRAGMENT_LENGTH = align(A_MSG_PAYLOAD_LENGTH + HEADER_LENGTH, FRAME_ALIGNMENT);
static final int AN_INITIAL_PARTITION_ID = 0;
static final int A_LOG_WINDOW_LENGTH = 128;
static final int A_PARTITION_SIZE = 1024;
static final int A_STREAM_ID = 20;
Expand All @@ -57,8 +56,8 @@ public final class DispatcherTest {
ClaimedFragmentBatch claimedFragmentBatch;
AtomicPosition subscriberPosition;

@Before
public void setup() {
@BeforeEach
void beforeEach() {
logBuffer = mock(LogBuffer.class);
logBufferPartition0 = mock(LogBufferPartition.class);
logBufferPartition1 = mock(LogBufferPartition.class);
Expand Down Expand Up @@ -110,7 +109,7 @@ protected Subscription newSubscription(
}

@Test
public void shouldNotClaimBeyondPublisherLimit() {
void shouldNotClaimBeyondPublisherLimit() {
// given
// position of 0,0
when(logBuffer.getActivePartitionIdVolatile()).thenReturn(0);
Expand All @@ -134,7 +133,48 @@ public void shouldNotClaimBeyondPublisherLimit() {
}

@Test
public void shouldClaimFragment() {
void canClaimFragmentBatch() {
// given
final int fragmentCount = 2;
final int batchLength = dispatcher.getMaxFragmentLength() / 2;

// when
final var canClaimFragmentBatch = dispatcher.canClaimFragmentBatch(fragmentCount, batchLength);

// then
assertThat(canClaimFragmentBatch).isTrue();
}

@Test
void cannotClaimFragmentBatch() {
// given - a fragment of max length, unframed
final int fragmentCount = 1;
final int batchLength = 2 * dispatcher.getMaxFragmentLength();

// when
final var canClaimFragmentBatch = dispatcher.canClaimFragmentBatch(fragmentCount, batchLength);

// then
assertThat(canClaimFragmentBatch).isFalse();
}

@Test
void cannotClaimFragmentBatchOfMaxLength() {
// given - a fragment of max length, unframed
final int fragmentCount = 1;
final int batchLength = dispatcher.getMaxFragmentLength();

// when
final var canClaimFragmentBatch = dispatcher.canClaimFragmentBatch(fragmentCount, batchLength);

// then
assertThat(canClaimFragmentBatch)
.as("cannot claim when the unframed, unaligned batch is the max fragment length")
.isFalse();
}

@Test
void shouldClaimFragment() {
// given
// position is 0,0
when(logBuffer.getActivePartitionIdVolatile()).thenReturn(0);
Expand Down Expand Up @@ -175,7 +215,7 @@ public void shouldClaimFragment() {
}

@Test
public void shouldReadFragmentsFromPartition() {
void shouldReadFragmentsFromPartition() {
// given
dispatcher.doOpenSubscription("test", mock(ActorCondition.class));
when(subscriberPosition.get()).thenReturn(0L);
Expand All @@ -198,7 +238,7 @@ public void shouldReadFragmentsFromPartition() {
}

@Test
public void shouldNotReadBeyondPublisherPosition() {
void shouldNotReadBeyondPublisherPosition() {
// given
dispatcher.doOpenSubscription("test", mock(ActorCondition.class));
when(subscriptionSpy.getPosition()).thenReturn(0L);
Expand All @@ -212,7 +252,7 @@ public void shouldNotReadBeyondPublisherPosition() {
}

@Test
public void shouldUpdatePublisherLimit() {
void shouldUpdatePublisherLimit() {
when(subscriberPosition.get()).thenReturn(position(10, 100));

dispatcher.doOpenSubscription("test", mock(ActorCondition.class));
Expand All @@ -222,7 +262,7 @@ public void shouldUpdatePublisherLimit() {
}

@Test
public void shouldUpdatePublisherLimitToNextPartition() {
void shouldUpdatePublisherLimitToNextPartition() {
when(subscriberPosition.get()).thenReturn(position(10, A_PARTITION_SIZE - A_LOG_WINDOW_LENGTH));

dispatcher.doOpenSubscription("test", mock(ActorCondition.class));
Expand All @@ -232,7 +272,7 @@ public void shouldUpdatePublisherLimitToNextPartition() {
}

@Test
public void shouldReadFragmentsFromPartitionOnPeekAndConsume() {
void shouldReadFragmentsFromPartitionOnPeekAndConsume() {
// given
dispatcher.doOpenSubscription("test", mock(ActorCondition.class));
when(subscriberPosition.get()).thenReturn(0L);
Expand All @@ -255,7 +295,7 @@ public void shouldReadFragmentsFromPartitionOnPeekAndConsume() {
}

@Test
public void shouldNotOpenSubscriptionWithSameName() {
void shouldNotOpenSubscriptionWithSameName() {
dispatcher.doOpenSubscription("s1", mock(ActorCondition.class));
Assert.assertThrows(
"subscription with name 's1' already exists",
Expand All @@ -264,7 +304,7 @@ public void shouldNotOpenSubscriptionWithSameName() {
}

@Test
public void shouldIncrementRecordPositionAfterClaimingFragment() {
void shouldIncrementRecordPositionAfterClaimingFragment() {
// given
when(publisherLimit.get()).thenReturn(position(0, A_FRAGMENT_LENGTH));
when(logAppender.claim(
Expand All @@ -288,7 +328,7 @@ public void shouldIncrementRecordPositionAfterClaimingFragment() {
}

@Test
public void shouldIncreasePositionByFragmentCountAfterClaimingBatch() {
void shouldIncreasePositionByFragmentCountAfterClaimingBatch() {
// given
final int fragmentCount = 3;
when(logBuffer.getActivePartitionIdVolatile()).thenReturn(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,13 @@ public void reset() {
resetEvent();
}

@Override
public boolean canWriteAdditionalEvent(final int length) {
final var count = eventCount + 1;
final var batchLength = computeBatchLength(count, eventLength + length);
return logWriteBuffer.canClaimFragmentBatch(count, batchLength);
}

@Override
public LogEntryBuilder keyNull() {
return key(LogEntryDescriptor.KEY_NULL_VALUE);
Expand All @@ -105,6 +112,7 @@ public LogEntryBuilder key(final long key) {
return this;
}

@Override
public LogEntryBuilder sourceIndex(final int index) {
sourceIndex = index;
return this;
Expand Down Expand Up @@ -215,7 +223,7 @@ public long tryWrite() {
}

private long claimBatchForEvents() {
final int batchLength = eventLength + (eventCount * HEADER_BLOCK_LENGTH);
final var batchLength = computeBatchLength(eventCount, eventLength);

long claimedPosition;
do {
Expand Down Expand Up @@ -284,4 +292,8 @@ private void resetEvent() {
bufferWriterInstance.reset();
metadataWriterInstance.reset();
}

private int computeBatchLength(final int eventsCount, final int eventsLength) {
return eventsLength + (eventsCount * HEADER_BLOCK_LENGTH);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,16 @@ public interface LogStreamBatchWriter extends LogStreamWriter {
/** Discard all non-written batch data. */
void reset();

/**
* Returns whether an additional event of length {@code length} could be written to the batch or
* not.
*
* @param length the length of the event's value and metadata summed, i.e. {@link
* LoggedEvent#getValueLength()} + {@link LoggedEvent#getMetadataLength()}
* @return true if the additional event could be written, false otherwise
*/
boolean canWriteAdditionalEvent(final int length);

/** Builder to add a log entry to the batch. */
interface LogEntryBuilder {
/** Use the default values as key. */
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.logstreams.impl.log;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import io.camunda.zeebe.dispatcher.ClaimedFragmentBatch;
import io.camunda.zeebe.dispatcher.Dispatcher;
import io.camunda.zeebe.util.buffer.BufferUtil;
import org.agrona.DirectBuffer;
import org.agrona.ExpandableArrayBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.junit.jupiter.api.Test;
import org.mockito.invocation.InvocationOnMock;

final class LogStreamBatchWriterImplTest {
private final Dispatcher dispatcher = mock(Dispatcher.class);
private final LogStreamBatchWriterImpl writer = new LogStreamBatchWriterImpl(1, dispatcher);

/**
* This test asserts that {@link LogStreamBatchWriterImpl#canWriteAdditionalEvent(int)} computes
* the correct batch length before passing it on to the dispatcher to check if it's acceptable. It
* also verifies that the same length is passed to the dispatcher when actually writing, ensuring
* both methods remain consistent with each other.
*/
@Test
void canWriteAdditionalEvent() {
// given
final DirectBuffer value = BufferUtil.wrapString("foo");
final DirectBuffer metadata = BufferUtil.wrapString("bar");
final var expectedFragmentCount = 2;
final var expectedFramingLength =
expectedFragmentCount * LogEntryDescriptor.HEADER_BLOCK_LENGTH;
final var expectedEventsLength =
expectedFragmentCount * (value.capacity() + metadata.capacity());
final var expectedBatchLength = expectedEventsLength + expectedFramingLength;

// when
when(dispatcher.canClaimFragmentBatch(anyInt(), anyInt())).thenReturn(false);
when(dispatcher.canClaimFragmentBatch(expectedFragmentCount, expectedBatchLength))
.thenReturn(true);
when(dispatcher.claimFragmentBatch(any(), anyInt(), anyInt()))
.then(this::mockClaimFragmentBatch);
writer.event().value(value).metadata(metadata).done();
final boolean canWriteAdditionalEvent =
writer.canWriteAdditionalEvent(value.capacity() + metadata.capacity());
writer.event().value(value).metadata(metadata).done().tryWrite();

// then
verify(dispatcher, times(1)).canClaimFragmentBatch(expectedFragmentCount, expectedBatchLength);
verify(dispatcher, times(1))
.claimFragmentBatch(
any(ClaimedFragmentBatch.class), eq(expectedFragmentCount), eq(expectedBatchLength));
assertThat(canWriteAdditionalEvent).isTrue();
verifyNoMoreInteractions(dispatcher);
}

private long mockClaimFragmentBatch(final InvocationOnMock i) {
final var batch = i.getArgument(0, ClaimedFragmentBatch.class);
final var writeBuffer = new UnsafeBuffer(new ExpandableArrayBuffer(1024));
batch.wrap(writeBuffer, 1, 0, writeBuffer.capacity(), () -> {});
return 1L;
}
}

0 comments on commit 6fe0922

Please sign in to comment.