Skip to content

Commit

Permalink
Fix BufferedStreamConsumer tests (#7773)
Browse files Browse the repository at this point in the history
  • Loading branch information
sherifnada committed Nov 9, 2021
1 parent c3c3e31 commit 62992bf
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 37 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.bytes;

/**
 * Helpers for estimating how many bytes a value occupies.
 */
public class ByteUtils {

  /**
   * Estimates the size of a string in bytes.
   *
   * <p>
   * The estimate is a fixed two bytes per {@code char}, which matches the width of the UTF-16 code
   * units a Java {@link String} is measured in. It is NOT the exact length of the string once
   * encoded as UTF-8 (where a character takes between one and four bytes); it is a cheap
   * approximation that avoids encoding the string.
   *
   * @param s the string to measure; must not be null
   * @return {@code s.length()} multiplied by two, as a {@code long}
   * @throws NullPointerException if {@code s} is null
   */
  public static long getSizeInBytes(String s) {
    // Multiply as long: an int multiplication would overflow for strings longer than 2^30 - 1 chars
    // before the result is widened to the long return type.
    return s.length() * 2L;
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.bytes;

import static org.junit.jupiter.api.Assertions.*;

import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.Test;

class ByteUtilsTest {

  /**
   * Checks the size estimate against random strings of every length from 1 through 999.
   */
  @Test
  public void testIt() {
    int length = 1;
    while (length < 1000) {
      final String randomString = RandomStringUtils.random(length);
      // the estimate is currently hardcoded as twice the string length
      final long expectedSizeInBytes = length * 2;
      assertEquals(expectedSizeInBytes, ByteUtils.getSizeInBytes(randomString));
      length++;
    }
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import io.airbyte.commons.bytes.ByteUtils;
import io.airbyte.commons.concurrency.VoidCallable;
import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.commons.functional.CheckedFunction;
Expand Down Expand Up @@ -146,7 +147,7 @@ protected void acceptTracked(final AirbyteMessage message) throws Exception {
// are serialized again when writing to
// the destination
// TODO use a smarter way of estimating byte size rather than always multiply by two
long messageSizeInBytes = Jsons.serialize(recordMessage.getData()).length() * 2; // Strings serialize to UTF-8 by default
long messageSizeInBytes = ByteUtils.getSizeInBytes(Jsons.serialize(recordMessage.getData()));
if (bufferSizeInBytes + messageSizeInBytes >= maxQueueSizeInBytes) {
flushQueueToDestination();
bufferSizeInBytes = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.commons.bytes.ByteUtils;
import io.airbyte.commons.concurrency.VoidCallable;
import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.commons.functional.CheckedFunction;
Expand All @@ -34,9 +35,9 @@
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.shaded.org.apache.commons.lang.RandomStringUtils;

public class BufferedStreamConsumerTest {

Expand Down Expand Up @@ -84,14 +85,14 @@ void setup() throws Exception {
onClose,
CATALOG,
isValidRecord,
10000);
1_000);

when(isValidRecord.apply(any())).thenReturn(true);
}

@Test
void test1StreamWith1State() throws Exception {
final List<AirbyteMessage> expectedRecords = getNRecords(10);
final List<AirbyteMessage> expectedRecords = generateRecords(1_000);

consumer.start();
consumeRecords(consumer, expectedRecords);
Expand All @@ -107,7 +108,7 @@ void test1StreamWith1State() throws Exception {

@Test
void test1StreamWith2State() throws Exception {
final List<AirbyteMessage> expectedRecords = getNRecords(10);
final List<AirbyteMessage> expectedRecords = generateRecords(1_000);

consumer.start();
consumeRecords(consumer, expectedRecords);
Expand All @@ -124,7 +125,7 @@ void test1StreamWith2State() throws Exception {

@Test
void test1StreamWith0State() throws Exception {
final List<AirbyteMessage> expectedRecords = getNRecords(10);
final List<AirbyteMessage> expectedRecords = generateRecords(1_000);

consumer.start();
consumeRecords(consumer, expectedRecords);
Expand All @@ -137,8 +138,8 @@ void test1StreamWith0State() throws Exception {

@Test
void test1StreamWithStateAndThenMoreRecordsBiggerThanBuffer() throws Exception {
final List<AirbyteMessage> expectedRecordsBatch1 = getNRecords(10);
final List<AirbyteMessage> expectedRecordsBatch2 = getNRecords(10, 20);
final List<AirbyteMessage> expectedRecordsBatch1 = generateRecords(1_000);
final List<AirbyteMessage> expectedRecordsBatch2 = generateRecords(1_000);

consumer.start();
consumeRecords(consumer, expectedRecordsBatch1);
Expand All @@ -156,8 +157,8 @@ void test1StreamWithStateAndThenMoreRecordsBiggerThanBuffer() throws Exception {

@Test
void test1StreamWithStateAndThenMoreRecordsSmallerThanBuffer() throws Exception {
final List<AirbyteMessage> expectedRecordsBatch1 = getNRecords(10);
final List<AirbyteMessage> expectedRecordsBatch2 = getNRecords(10, 20);
final List<AirbyteMessage> expectedRecordsBatch1 = generateRecords(1_000);
final List<AirbyteMessage> expectedRecordsBatch2 = generateRecords(1_000);

// consumer with a big enough buffer that both batches are flushed in one go.
final BufferedStreamConsumer consumer = new BufferedStreamConsumer(
Expand All @@ -167,7 +168,7 @@ void test1StreamWithStateAndThenMoreRecordsSmallerThanBuffer() throws Exception
onClose,
CATALOG,
isValidRecord,
10000);
10_000);

consumer.start();
consumeRecords(consumer, expectedRecordsBatch1);
Expand All @@ -188,9 +189,9 @@ void test1StreamWithStateAndThenMoreRecordsSmallerThanBuffer() throws Exception

@Test
void testExceptionAfterOneStateMessage() throws Exception {
final List<AirbyteMessage> expectedRecordsBatch1 = getNRecords(10);
final List<AirbyteMessage> expectedRecordsBatch2 = getNRecords(10, 20);
final List<AirbyteMessage> expectedRecordsBatch3 = getNRecords(20, 21);
final List<AirbyteMessage> expectedRecordsBatch1 = generateRecords(1_000);
final List<AirbyteMessage> expectedRecordsBatch2 = generateRecords(1_000);
final List<AirbyteMessage> expectedRecordsBatch3 = generateRecords(1_000);

consumer.start();
consumeRecords(consumer, expectedRecordsBatch1);
Expand All @@ -209,9 +210,9 @@ void testExceptionAfterOneStateMessage() throws Exception {

@Test
void testExceptionAfterNoStateMessages() throws Exception {
final List<AirbyteMessage> expectedRecordsBatch1 = getNRecords(10);
final List<AirbyteMessage> expectedRecordsBatch2 = getNRecords(10, 20);
final List<AirbyteMessage> expectedRecordsBatch3 = getNRecords(20, 21);
final List<AirbyteMessage> expectedRecordsBatch1 = generateRecords(1_000);
final List<AirbyteMessage> expectedRecordsBatch2 = generateRecords(1_000);
final List<AirbyteMessage> expectedRecordsBatch3 = generateRecords(1_000);

consumer.start();
consumeRecords(consumer, expectedRecordsBatch1);
Expand All @@ -232,14 +233,14 @@ void testExceptionAfterNoStateMessages() throws Exception {
void testExceptionDuringOnClose() throws Exception {
doThrow(new IllegalStateException("induced exception")).when(onClose).accept(false);

final List<AirbyteMessage> expectedRecordsBatch1 = getNRecords(10);
final List<AirbyteMessage> expectedRecordsBatch2 = getNRecords(10, 20);
final List<AirbyteMessage> expectedRecordsBatch1 = generateRecords(1_000);
final List<AirbyteMessage> expectedRecordsBatch2 = generateRecords(1_000);

consumer.start();
consumeRecords(consumer, expectedRecordsBatch1);
consumer.accept(STATE_MESSAGE1);
consumeRecords(consumer, expectedRecordsBatch2);
assertThrows(IllegalStateException.class, () -> consumer.close());
assertThrows(IllegalStateException.class, () -> consumer.close(), "Expected an error to be thrown on close");

verifyStartAndClose();

Expand All @@ -250,7 +251,7 @@ void testExceptionDuringOnClose() throws Exception {

@Test
void test2StreamWith1State() throws Exception {
final List<AirbyteMessage> expectedRecordsStream1 = getNRecords(10);
final List<AirbyteMessage> expectedRecordsStream1 = generateRecords(1_000);
final List<AirbyteMessage> expectedRecordsStream2 = expectedRecordsStream1
.stream()
.map(Jsons::clone)
Expand All @@ -273,7 +274,7 @@ void test2StreamWith1State() throws Exception {

@Test
void test2StreamWith2State() throws Exception {
final List<AirbyteMessage> expectedRecordsStream1 = getNRecords(10);
final List<AirbyteMessage> expectedRecordsStream1 = generateRecords(1_000);
final List<AirbyteMessage> expectedRecordsStream2 = expectedRecordsStream1
.stream()
.map(Jsons::clone)
Expand Down Expand Up @@ -310,21 +311,27 @@ private static void consumeRecords(final BufferedStreamConsumer consumer, final
});
}

/**
 * Convenience overload that delegates to {@code getNRecords(0, endExclusive)}, i.e. uses a start
 * index of 0.
 *
 * @param endExclusive upper bound passed through as the exclusive end index
 * @return whatever the two-argument overload returns for the range starting at 0
 */
private static List<AirbyteMessage> getNRecords(final int endExclusive) {
return getNRecords(0, endExclusive);
}

private static List<AirbyteMessage> getNRecords(final int startInclusive, final int endExclusive) {
return IntStream.range(startInclusive, endExclusive)
.boxed()
.map(i -> new AirbyteMessage()
.withType(Type.RECORD)
.withRecord(new AirbyteRecordMessage()
.withStream(STREAM_NAME)
.withNamespace(SCHEMA_NAME)
.withEmittedAt(Instant.now().toEpochMilli())
.withData(Jsons.jsonNode(ImmutableMap.of("id", i, "name", "human " + i)))))
.collect(Collectors.toList());
/**
 * Builds RECORD messages until adding one more would push their cumulative estimated size past
 * the target.
 *
 * <p>
 * Each record's size is estimated with {@link ByteUtils#getSizeInBytes(String)} applied to its
 * serialized JSON payload. The record that would make the running total exceed
 * {@code targetSizeInBytes} is discarded, so the returned records never exceed the target in
 * total.
 *
 * @param targetSizeInBytes upper bound on the cumulative estimated size of the returned records
 * @return the generated records, in generation order
 */
private static List<AirbyteMessage> generateRecords(final long targetSizeInBytes) {
  final List<AirbyteMessage> records = Lists.newArrayList();
  long accumulatedBytes = 0;
  int index = 0;
  while (true) {
    final JsonNode data = Jsons.jsonNode(ImmutableMap.of("id", RandomStringUtils.randomAscii(7), "name", "human " + index));
    accumulatedBytes += ByteUtils.getSizeInBytes(Jsons.serialize(data));
    if (accumulatedBytes > targetSizeInBytes) {
      // this record would overshoot the target: stop without including it
      return records;
    }
    final AirbyteRecordMessage record = new AirbyteRecordMessage()
        .withStream(STREAM_NAME)
        .withNamespace(SCHEMA_NAME)
        .withEmittedAt(Instant.now().toEpochMilli())
        .withData(data);
    records.add(new AirbyteMessage()
        .withType(Type.RECORD)
        .withRecord(record));
    index++;
  }
}

private void verifyRecords(final String streamName, final String namespace, final Collection<AirbyteMessage> expectedRecords) throws Exception {
Expand Down

0 comments on commit 62992bf

Please sign in to comment.