From 62992bff8b766bc1cded5e07e09a5e0142be21bd Mon Sep 17 00:00:00 2001 From: "Sherif A. Nada" Date: Tue, 9 Nov 2021 12:49:59 -0800 Subject: [PATCH] Fix BufferedStreamConsumer tests (#7773) --- .../io/airbyte/commons/bytes/ByteUtils.java | 13 +++ .../airbyte/commons/bytes/ByteUtilsTest.java | 23 ++++++ .../BufferedStreamConsumer.java | 3 +- .../BufferedStreamConsumerTest.java | 79 ++++++++++--------- 4 files changed, 81 insertions(+), 37 deletions(-) create mode 100644 airbyte-commons/src/main/java/io/airbyte/commons/bytes/ByteUtils.java create mode 100644 airbyte-commons/src/test/java/io/airbyte/commons/bytes/ByteUtilsTest.java diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/bytes/ByteUtils.java b/airbyte-commons/src/main/java/io/airbyte/commons/bytes/ByteUtils.java new file mode 100644 index 0000000000000..320c4978f45d5 --- /dev/null +++ b/airbyte-commons/src/main/java/io/airbyte/commons/bytes/ByteUtils.java @@ -0,0 +1,13 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.commons.bytes; + +public class ByteUtils { + + public static long getSizeInBytes(String s) { + return s.length() * 2L; // estimate: 2 bytes per char (one UTF-16 code unit); 2L keeps the multiplication in long so very long strings cannot overflow int + } + +} diff --git a/airbyte-commons/src/test/java/io/airbyte/commons/bytes/ByteUtilsTest.java b/airbyte-commons/src/test/java/io/airbyte/commons/bytes/ByteUtilsTest.java new file mode 100644 index 0000000000000..423dfb08a618e --- /dev/null +++ b/airbyte-commons/src/test/java/io/airbyte/commons/bytes/ByteUtilsTest.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.commons.bytes; + +import static org.junit.jupiter.api.Assertions.*; + +import org.apache.commons.lang3.RandomStringUtils; +import org.junit.jupiter.api.Test; + +class ByteUtilsTest { + + @Test + public void testIt() { + for (int i = 1; i < 1000; i++) { + String s = RandomStringUtils.random(i); + // for now the formula is just hardcoded to str length * 2 + assertEquals(i * 2, ByteUtils.getSizeInBytes(s)); + } + } + +} diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java index d2581f5321acd..ac6fdf9382d7e 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java @@ -6,6 +6,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.base.Preconditions; +import io.airbyte.commons.bytes.ByteUtils; import io.airbyte.commons.concurrency.VoidCallable; import io.airbyte.commons.functional.CheckedConsumer; import io.airbyte.commons.functional.CheckedFunction; @@ -146,7 +147,7 @@ protected void acceptTracked(final AirbyteMessage message) throws Exception { // are serialized again when writing to // the destination // TODO use a smarter way of estimating byte size rather than always multiply by two - long messageSizeInBytes = Jsons.serialize(recordMessage.getData()).length() * 2; // Strings serialize to UTF-8 by default + long messageSizeInBytes = ByteUtils.getSizeInBytes(Jsons.serialize(recordMessage.getData())); if (bufferSizeInBytes + messageSizeInBytes >= maxQueueSizeInBytes) { flushQueueToDestination(); bufferSizeInBytes = 0; diff --git 
a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java index 024cf33b3ea80..d8e169deea189 100644 --- a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java +++ b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java @@ -16,6 +16,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import io.airbyte.commons.bytes.ByteUtils; import io.airbyte.commons.concurrency.VoidCallable; import io.airbyte.commons.functional.CheckedConsumer; import io.airbyte.commons.functional.CheckedFunction; @@ -34,9 +35,9 @@ import java.util.List; import java.util.function.Consumer; import java.util.stream.Collectors; -import java.util.stream.IntStream; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.testcontainers.shaded.org.apache.commons.lang.RandomStringUtils; public class BufferedStreamConsumerTest { @@ -84,14 +85,14 @@ void setup() throws Exception { onClose, CATALOG, isValidRecord, - 10000); + 1_000); when(isValidRecord.apply(any())).thenReturn(true); } @Test void test1StreamWith1State() throws Exception { - final List expectedRecords = getNRecords(10); + final List expectedRecords = generateRecords(1_000); consumer.start(); consumeRecords(consumer, expectedRecords); @@ -107,7 +108,7 @@ void test1StreamWith1State() throws Exception { @Test void test1StreamWith2State() throws Exception { - final List expectedRecords = getNRecords(10); + final List expectedRecords = generateRecords(1_000); consumer.start(); consumeRecords(consumer, expectedRecords); @@ 
-124,7 +125,7 @@ void test1StreamWith2State() throws Exception { @Test void test1StreamWith0State() throws Exception { - final List expectedRecords = getNRecords(10); + final List expectedRecords = generateRecords(1_000); consumer.start(); consumeRecords(consumer, expectedRecords); @@ -137,8 +138,8 @@ void test1StreamWith0State() throws Exception { @Test void test1StreamWithStateAndThenMoreRecordsBiggerThanBuffer() throws Exception { - final List expectedRecordsBatch1 = getNRecords(10); - final List expectedRecordsBatch2 = getNRecords(10, 20); + final List expectedRecordsBatch1 = generateRecords(1_000); + final List expectedRecordsBatch2 = generateRecords(1_000); consumer.start(); consumeRecords(consumer, expectedRecordsBatch1); @@ -156,8 +157,8 @@ void test1StreamWithStateAndThenMoreRecordsBiggerThanBuffer() throws Exception { @Test void test1StreamWithStateAndThenMoreRecordsSmallerThanBuffer() throws Exception { - final List expectedRecordsBatch1 = getNRecords(10); - final List expectedRecordsBatch2 = getNRecords(10, 20); + final List expectedRecordsBatch1 = generateRecords(1_000); + final List expectedRecordsBatch2 = generateRecords(1_000); // consumer with big enough buffered that we see both batches are flushed in one go. 
final BufferedStreamConsumer consumer = new BufferedStreamConsumer( @@ -167,7 +168,7 @@ void test1StreamWithStateAndThenMoreRecordsSmallerThanBuffer() throws Exception onClose, CATALOG, isValidRecord, - 10000); + 10_000); consumer.start(); consumeRecords(consumer, expectedRecordsBatch1); @@ -188,9 +189,9 @@ void test1StreamWithStateAndThenMoreRecordsSmallerThanBuffer() throws Exception @Test void testExceptionAfterOneStateMessage() throws Exception { - final List expectedRecordsBatch1 = getNRecords(10); - final List expectedRecordsBatch2 = getNRecords(10, 20); - final List expectedRecordsBatch3 = getNRecords(20, 21); + final List expectedRecordsBatch1 = generateRecords(1_000); + final List expectedRecordsBatch2 = generateRecords(1_000); + final List expectedRecordsBatch3 = generateRecords(1_000); consumer.start(); consumeRecords(consumer, expectedRecordsBatch1); @@ -209,9 +210,9 @@ void testExceptionAfterOneStateMessage() throws Exception { @Test void testExceptionAfterNoStateMessages() throws Exception { - final List expectedRecordsBatch1 = getNRecords(10); - final List expectedRecordsBatch2 = getNRecords(10, 20); - final List expectedRecordsBatch3 = getNRecords(20, 21); + final List expectedRecordsBatch1 = generateRecords(1_000); + final List expectedRecordsBatch2 = generateRecords(1_000); + final List expectedRecordsBatch3 = generateRecords(1_000); consumer.start(); consumeRecords(consumer, expectedRecordsBatch1); @@ -232,14 +233,14 @@ void testExceptionAfterNoStateMessages() throws Exception { void testExceptionDuringOnClose() throws Exception { doThrow(new IllegalStateException("induced exception")).when(onClose).accept(false); - final List expectedRecordsBatch1 = getNRecords(10); - final List expectedRecordsBatch2 = getNRecords(10, 20); + final List expectedRecordsBatch1 = generateRecords(1_000); + final List expectedRecordsBatch2 = generateRecords(1_000); consumer.start(); consumeRecords(consumer, expectedRecordsBatch1); consumer.accept(STATE_MESSAGE1); 
consumeRecords(consumer, expectedRecordsBatch2); - assertThrows(IllegalStateException.class, () -> consumer.close()); + assertThrows(IllegalStateException.class, () -> consumer.close(), "Expected an error to be thrown on close"); verifyStartAndClose(); @@ -250,7 +251,7 @@ void testExceptionDuringOnClose() throws Exception { @Test void test2StreamWith1State() throws Exception { - final List expectedRecordsStream1 = getNRecords(10); + final List expectedRecordsStream1 = generateRecords(1_000); final List expectedRecordsStream2 = expectedRecordsStream1 .stream() .map(Jsons::clone) @@ -273,7 +274,7 @@ void test2StreamWith1State() throws Exception { @Test void test2StreamWith2State() throws Exception { - final List expectedRecordsStream1 = getNRecords(10); + final List expectedRecordsStream1 = generateRecords(1_000); final List expectedRecordsStream2 = expectedRecordsStream1 .stream() .map(Jsons::clone) @@ -310,21 +311,27 @@ private static void consumeRecords(final BufferedStreamConsumer consumer, final }); } - private static List getNRecords(final int endExclusive) { - return getNRecords(0, endExclusive); - } - - private static List getNRecords(final int startInclusive, final int endExclusive) { - return IntStream.range(startInclusive, endExclusive) - .boxed() - .map(i -> new AirbyteMessage() - .withType(Type.RECORD) - .withRecord(new AirbyteRecordMessage() - .withStream(STREAM_NAME) - .withNamespace(SCHEMA_NAME) - .withEmittedAt(Instant.now().toEpochMilli()) - .withData(Jsons.jsonNode(ImmutableMap.of("id", i, "name", "human " + i))))) - .collect(Collectors.toList()); + private static List generateRecords(final long targetSizeInBytes) { + List output = Lists.newArrayList(); + long bytesCounter = 0; + for (int i = 0;; i++) { + JsonNode payload = Jsons.jsonNode(ImmutableMap.of("id", RandomStringUtils.randomAscii(7), "name", "human " + i)); + long sizeInBytes = ByteUtils.getSizeInBytes(Jsons.serialize(payload)); + bytesCounter += sizeInBytes; + AirbyteMessage 
airbyteMessage = new AirbyteMessage() + .withType(Type.RECORD) + .withRecord(new AirbyteRecordMessage() + .withStream(STREAM_NAME) + .withNamespace(SCHEMA_NAME) + .withEmittedAt(Instant.now().toEpochMilli()) + .withData(payload)); + if (bytesCounter > targetSizeInBytes) { + break; + } else { + output.add(airbyteMessage); + } + } + return output; } private void verifyRecords(final String streamName, final String namespace, final Collection expectedRecords) throws Exception {