Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport stable/1.2] Add API to probe the logstream batch writer if more bytes can be written without writing them #8809

Merged
4 commits merged into from
Feb 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions dispatcher/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,19 @@ public long claimFragmentBatch(
LogBufferAppender.claimedBatchLength(fragmentCount, batchLength));
}

/**
* Returns whether a batch of length {@code batchLength}, containing {@code fragmentCount}
* fragments, can be claimed by this dispatcher.
*
* @param fragmentCount the count of fragments in the batch
* @param batchLength the total length of the batch (all fragments included), unframed
* @return true if the batch can be claimed, false otherwise
*/
public boolean canClaimFragmentBatch(final int fragmentCount, final int batchLength) {
final int framedLength = LogBufferAppender.claimedBatchLength(fragmentCount, batchLength);
return framedLength < maxFragmentLength;
}

private synchronized long offer(
final BiFunction<LogBufferPartition, Integer, Integer> claimer,
final int fragmentCount,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,15 @@
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

public final class DispatcherTest {
final class DispatcherTest {

static final byte[] A_MSG_PAYLOAD = "some bytes".getBytes(StandardCharsets.UTF_8);
static final int A_MSG_PAYLOAD_LENGTH = A_MSG_PAYLOAD.length;
static final int A_FRAGMENT_LENGTH = align(A_MSG_PAYLOAD_LENGTH + HEADER_LENGTH, FRAME_ALIGNMENT);
static final int AN_INITIAL_PARTITION_ID = 0;
static final int A_LOG_WINDOW_LENGTH = 128;
static final int A_PARTITION_SIZE = 1024;
static final int A_STREAM_ID = 20;
Expand All @@ -57,8 +56,8 @@ public final class DispatcherTest {
ClaimedFragmentBatch claimedFragmentBatch;
AtomicPosition subscriberPosition;

@Before
public void setup() {
@BeforeEach
void beforeEach() {
logBuffer = mock(LogBuffer.class);
logBufferPartition0 = mock(LogBufferPartition.class);
logBufferPartition1 = mock(LogBufferPartition.class);
Expand Down Expand Up @@ -110,7 +109,7 @@ protected Subscription newSubscription(
}

@Test
public void shouldNotClaimBeyondPublisherLimit() {
void shouldNotClaimBeyondPublisherLimit() {
// given
// position of 0,0
when(logBuffer.getActivePartitionIdVolatile()).thenReturn(0);
Expand All @@ -134,7 +133,48 @@ public void shouldNotClaimBeyondPublisherLimit() {
}

@Test
public void shouldClaimFragment() {
void canClaimFragmentBatch() {
// given
final int fragmentCount = 2;
final int batchLength = dispatcher.getMaxFragmentLength() / 2;

// when
final var canClaimFragmentBatch = dispatcher.canClaimFragmentBatch(fragmentCount, batchLength);

// then
assertThat(canClaimFragmentBatch).isTrue();
}

@Test
void cannotClaimFragmentBatch() {
// given - a fragment of max length, unframed
final int fragmentCount = 1;
final int batchLength = 2 * dispatcher.getMaxFragmentLength();

// when
final var canClaimFragmentBatch = dispatcher.canClaimFragmentBatch(fragmentCount, batchLength);

// then
assertThat(canClaimFragmentBatch).isFalse();
}

@Test
void cannotClaimFragmentBatchOfMaxLength() {
// given - a fragment of max length, unframed
final int fragmentCount = 1;
final int batchLength = dispatcher.getMaxFragmentLength();

// when
final var canClaimFragmentBatch = dispatcher.canClaimFragmentBatch(fragmentCount, batchLength);

// then
assertThat(canClaimFragmentBatch)
.as("cannot claim when the unframed, unaligned batch is the max fragment length")
.isFalse();
}

@Test
void shouldClaimFragment() {
// given
// position is 0,0
when(logBuffer.getActivePartitionIdVolatile()).thenReturn(0);
Expand Down Expand Up @@ -175,7 +215,7 @@ public void shouldClaimFragment() {
}

@Test
public void shouldReadFragmentsFromPartition() {
void shouldReadFragmentsFromPartition() {
// given
dispatcher.doOpenSubscription("test", mock(ActorCondition.class));
when(subscriberPosition.get()).thenReturn(0L);
Expand All @@ -198,7 +238,7 @@ public void shouldReadFragmentsFromPartition() {
}

@Test
public void shouldNotReadBeyondPublisherPosition() {
void shouldNotReadBeyondPublisherPosition() {
// given
dispatcher.doOpenSubscription("test", mock(ActorCondition.class));
when(subscriptionSpy.getPosition()).thenReturn(0L);
Expand All @@ -212,7 +252,7 @@ public void shouldNotReadBeyondPublisherPosition() {
}

@Test
public void shouldUpdatePublisherLimit() {
void shouldUpdatePublisherLimit() {
when(subscriberPosition.get()).thenReturn(position(10, 100));

dispatcher.doOpenSubscription("test", mock(ActorCondition.class));
Expand All @@ -222,7 +262,7 @@ public void shouldUpdatePublisherLimit() {
}

@Test
public void shouldUpdatePublisherLimitToNextPartition() {
void shouldUpdatePublisherLimitToNextPartition() {
when(subscriberPosition.get()).thenReturn(position(10, A_PARTITION_SIZE - A_LOG_WINDOW_LENGTH));

dispatcher.doOpenSubscription("test", mock(ActorCondition.class));
Expand All @@ -232,7 +272,7 @@ public void shouldUpdatePublisherLimitToNextPartition() {
}

@Test
public void shouldReadFragmentsFromPartitionOnPeekAndConsume() {
void shouldReadFragmentsFromPartitionOnPeekAndConsume() {
// given
dispatcher.doOpenSubscription("test", mock(ActorCondition.class));
when(subscriberPosition.get()).thenReturn(0L);
Expand All @@ -255,7 +295,7 @@ public void shouldReadFragmentsFromPartitionOnPeekAndConsume() {
}

@Test
public void shouldNotOpenSubscriptionWithSameName() {
void shouldNotOpenSubscriptionWithSameName() {
dispatcher.doOpenSubscription("s1", mock(ActorCondition.class));
Assert.assertThrows(
"subscription with name 's1' already exists",
Expand All @@ -264,7 +304,7 @@ public void shouldNotOpenSubscriptionWithSameName() {
}

@Test
public void shouldIncrementRecordPositionAfterClaimingFragment() {
void shouldIncrementRecordPositionAfterClaimingFragment() {
// given
when(publisherLimit.get()).thenReturn(position(0, A_FRAGMENT_LENGTH));
when(logAppender.claim(
Expand All @@ -288,7 +328,7 @@ public void shouldIncrementRecordPositionAfterClaimingFragment() {
}

@Test
public void shouldIncreasePositionByFragmentCountAfterClaimingBatch() {
void shouldIncreasePositionByFragmentCountAfterClaimingBatch() {
// given
final int fragmentCount = 3;
when(logBuffer.getActivePartitionIdVolatile()).thenReturn(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,13 @@ public void reset() {
resetEvent();
}

@Override
public boolean canWriteAdditionalEvent(final int length) {
final var count = eventCount + 1;
final var batchLength = computeBatchLength(count, eventLength + length);
return logWriteBuffer.canClaimFragmentBatch(count, batchLength);
}

@Override
public LogEntryBuilder keyNull() {
return key(LogEntryDescriptor.KEY_NULL_VALUE);
Expand All @@ -105,6 +112,7 @@ public LogEntryBuilder key(final long key) {
return this;
}

@Override
public LogEntryBuilder sourceIndex(final int index) {
sourceIndex = index;
return this;
Expand Down Expand Up @@ -215,7 +223,7 @@ public long tryWrite() {
}

private long claimBatchForEvents() {
final int batchLength = eventLength + (eventCount * HEADER_BLOCK_LENGTH);
final var batchLength = computeBatchLength(eventCount, eventLength);

long claimedPosition;
do {
Expand Down Expand Up @@ -284,4 +292,8 @@ private void resetEvent() {
bufferWriterInstance.reset();
metadataWriterInstance.reset();
}

private int computeBatchLength(final int eventsCount, final int eventsLength) {
return eventsLength + (eventsCount * HEADER_BLOCK_LENGTH);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,16 @@ public interface LogStreamBatchWriter extends LogStreamWriter {
/** Discard all non-written batch data. */
void reset();

/**
* Returns whether an additional event of length {@code length} could be written to the batch or
* not.
*
* @param length the length of the event's value and metadata summed, i.e. {@link
* LoggedEvent#getValueLength()} + {@link LoggedEvent#getMetadataLength()}
* @return true if the additional event could be written, false otherwise
*/
boolean canWriteAdditionalEvent(final int length);

/** Builder to add a log entry to the batch. */
interface LogEntryBuilder {
/** Use the default values as key. */
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.logstreams.impl.log;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import io.camunda.zeebe.dispatcher.ClaimedFragmentBatch;
import io.camunda.zeebe.dispatcher.Dispatcher;
import io.camunda.zeebe.util.buffer.BufferUtil;
import org.agrona.DirectBuffer;
import org.agrona.ExpandableArrayBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.junit.jupiter.api.Test;
import org.mockito.invocation.InvocationOnMock;

final class LogStreamBatchWriterImplTest {
private final Dispatcher dispatcher = mock(Dispatcher.class);
private final LogStreamBatchWriterImpl writer = new LogStreamBatchWriterImpl(1, dispatcher);

/**
* This test asserts that {@link LogStreamBatchWriterImpl#canWriteAdditionalEvent(int)} computes
* the correct batch length before passing it on to the dispatcher to check if it's acceptable. It
* also verifies that the same length is passed to the dispatcher when actually writing, ensuring
* both methods remain consistent with each other.
*/
@Test
void canWriteAdditionalEvent() {
// given
final DirectBuffer value = BufferUtil.wrapString("foo");
final DirectBuffer metadata = BufferUtil.wrapString("bar");
final var expectedFragmentCount = 2;
final var expectedFramingLength =
expectedFragmentCount * LogEntryDescriptor.HEADER_BLOCK_LENGTH;
final var expectedEventsLength =
expectedFragmentCount * (value.capacity() + metadata.capacity());
final var expectedBatchLength = expectedEventsLength + expectedFramingLength;

// when
when(dispatcher.canClaimFragmentBatch(anyInt(), anyInt())).thenReturn(false);
when(dispatcher.canClaimFragmentBatch(expectedFragmentCount, expectedBatchLength))
.thenReturn(true);
when(dispatcher.claimFragmentBatch(any(), anyInt(), anyInt()))
.then(this::mockClaimFragmentBatch);
writer.event().value(value).metadata(metadata).done();
final boolean canWriteAdditionalEvent =
writer.canWriteAdditionalEvent(value.capacity() + metadata.capacity());
writer.event().value(value).metadata(metadata).done().tryWrite();

// then
verify(dispatcher, times(1)).canClaimFragmentBatch(expectedFragmentCount, expectedBatchLength);
verify(dispatcher, times(1))
.claimFragmentBatch(
any(ClaimedFragmentBatch.class), eq(expectedFragmentCount), eq(expectedBatchLength));
assertThat(canWriteAdditionalEvent).isTrue();
verifyNoMoreInteractions(dispatcher);
}

private long mockClaimFragmentBatch(final InvocationOnMock i) {
final var batch = i.getArgument(0, ClaimedFragmentBatch.class);
final var writeBuffer = new UnsafeBuffer(new ExpandableArrayBuffer(1024));
batch.wrap(writeBuffer, 1, 0, writeBuffer.capacity(), () -> {});
return 1L;
}
}