Skip to content

Commit

Permalink
[FLINK-18483][kinesis] Test coverage improvements for FlinkKinesisCon…
Browse files Browse the repository at this point in the history
…sumer/ShardConsumer
  • Loading branch information
dannycranmer committed Jul 8, 2020
1 parent 2210aff commit d2297eb
Show file tree
Hide file tree
Showing 4 changed files with 286 additions and 182 deletions.
7 changes: 7 additions & 0 deletions flink-connectors/flink-connector-kinesis/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@ under the License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-kinesis-aggregator</artifactId>
<version>1.0.3</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-kinesis</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kinesis.metrics.ShardMetricsReporter;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
Expand All @@ -37,12 +36,23 @@
import org.mockito.Mockito;

import java.math.BigInteger;
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.Date;
import java.util.LinkedList;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP;
import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT;
import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM;
import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM;
import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

/**
* Tests for the {@link ShardConsumer}.
Expand All @@ -51,148 +61,123 @@ public class ShardConsumerTest {

@Test
public void testMetricsReporting() {
StreamShardHandle fakeToBeConsumedShard = getMockStreamShard("fakeStream", 0);

LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>();
subscribedShardsStateUnderTest.add(
new KinesisStreamShardState(
KinesisDataFetcher.convertToStreamShardMetadata(fakeToBeConsumedShard),
fakeToBeConsumedShard,
new SequenceNumber("fakeStartingState")));
KinesisProxyInterface kinesis = FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls(500, 5, 500);

TestSourceContext<String> sourceContext = new TestSourceContext<>();

KinesisDeserializationSchemaWrapper<String> deserializationSchema = new KinesisDeserializationSchemaWrapper<>(
new SimpleStringSchema());
TestableKinesisDataFetcher<String> fetcher =
new TestableKinesisDataFetcher<>(
Collections.singletonList("fakeStream"),
sourceContext,
new Properties(),
deserializationSchema,
10,
2,
new AtomicReference<>(),
subscribedShardsStateUnderTest,
KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")),
Mockito.mock(KinesisProxyInterface.class));
ShardMetricsReporter metrics = assertNumberOfMessagesReceivedFromKinesis(500, kinesis, fakeSequenceNumber());
assertEquals(500, metrics.getMillisBehindLatest());
assertEquals(10000, metrics.getMaxNumberOfRecordsPerFetch());
}

ShardMetricsReporter shardMetricsReporter = new ShardMetricsReporter();
long millisBehindLatest = 500L;
new ShardConsumer<>(
fetcher,
0,
subscribedShardsStateUnderTest.get(0).getStreamShardHandle(),
subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum(),
FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls(1000, 9, millisBehindLatest),
shardMetricsReporter,
deserializationSchema)
.run();
@Test
public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithStartingSequenceNumber() throws Exception {
KinesisProxyInterface kinesis = spy(FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls(1000, 9, 500L));

// the millisBehindLatest metric should have been reported
assertEquals(millisBehindLatest, shardMetricsReporter.getMillisBehindLatest());
assertNumberOfMessagesReceivedFromKinesis(1000, kinesis, fakeSequenceNumber());
verify(kinesis).getShardIterator(any(), eq("AFTER_SEQUENCE_NUMBER"), eq("fakeStartingState"));
}

@Test
public void testCorrectNumOfCollectedRecordsAndUpdatedState() {
StreamShardHandle fakeToBeConsumedShard = getMockStreamShard("fakeStream", 0);
public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithStartingSequenceSentinelTimestamp() throws Exception {
String format = "yyyy-MM-dd'T'HH:mm";
String timestamp = "2020-07-02T09:14";
Date expectedTimestamp = new SimpleDateFormat(format).parse(timestamp);

LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>();
subscribedShardsStateUnderTest.add(
new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(fakeToBeConsumedShard),
fakeToBeConsumedShard, new SequenceNumber("fakeStartingState")));
Properties consumerProperties = new Properties();
consumerProperties.setProperty(STREAM_INITIAL_TIMESTAMP, timestamp);
consumerProperties.setProperty(STREAM_TIMESTAMP_DATE_FORMAT, format);
SequenceNumber sequenceNumber = SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get();

TestSourceContext<String> sourceContext = new TestSourceContext<>();
KinesisProxyInterface kinesis = spy(FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls(10, 1, 0));

KinesisDeserializationSchemaWrapper<String> deserializationSchema = new KinesisDeserializationSchemaWrapper<>(
new SimpleStringSchema());
TestableKinesisDataFetcher<String> fetcher =
new TestableKinesisDataFetcher<>(
Collections.singletonList("fakeStream"),
sourceContext,
new Properties(),
deserializationSchema,
10,
2,
new AtomicReference<>(),
subscribedShardsStateUnderTest,
KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")),
Mockito.mock(KinesisProxyInterface.class));
assertNumberOfMessagesReceivedFromKinesis(10, kinesis, sequenceNumber, consumerProperties);
verify(kinesis).getShardIterator(any(), eq("AT_TIMESTAMP"), eq(expectedTimestamp));
}

int shardIndex = fetcher.registerNewSubscribedShardState(subscribedShardsStateUnderTest.get(0));
new ShardConsumer<>(
fetcher,
shardIndex,
subscribedShardsStateUnderTest.get(0).getStreamShardHandle(),
subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum(),
FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls(1000, 9, 500L),
new ShardMetricsReporter(),
deserializationSchema)
.run();
@Test
public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithStartingSequenceSentinelEarliest() throws Exception {
SequenceNumber sequenceNumber = SENTINEL_EARLIEST_SEQUENCE_NUM.get();

assertEquals(1000, sourceContext.getCollectedOutputs().size());
assertEquals(
SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get(),
subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum());
KinesisProxyInterface kinesis = spy(FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls(50, 2, 0));

assertNumberOfMessagesReceivedFromKinesis(50, kinesis, sequenceNumber);
verify(kinesis).getShardIterator(any(), eq("TRIM_HORIZON"), eq(null));
}

@Test
public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithUnexpectedExpiredIterator() {
StreamShardHandle fakeToBeConsumedShard = getMockStreamShard("fakeStream", 0);
KinesisProxyInterface kinesis = FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCallsWithUnexpectedExpiredIterator(1000, 9, 7, 500L);

LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>();
subscribedShardsStateUnderTest.add(
new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(fakeToBeConsumedShard),
fakeToBeConsumedShard, new SequenceNumber("fakeStartingState")));
// Get a total of 1000 records with 9 getRecords() calls,
// and the 7th getRecords() call will encounter an unexpected expired shard iterator
assertNumberOfMessagesReceivedFromKinesis(1000, kinesis, fakeSequenceNumber());
}

TestSourceContext<String> sourceContext = new TestSourceContext<>();
@Test
public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithAdaptiveReads() {
Properties consumerProperties = new Properties();
consumerProperties.setProperty("flink.shard.adaptivereads", "true");

KinesisDeserializationSchemaWrapper<String> deserializationSchema = new KinesisDeserializationSchemaWrapper<>(
new SimpleStringSchema());
TestableKinesisDataFetcher<String> fetcher =
new TestableKinesisDataFetcher<>(
Collections.singletonList("fakeStream"),
sourceContext,
new Properties(),
deserializationSchema,
10,
2,
new AtomicReference<>(),
subscribedShardsStateUnderTest,
KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")),
Mockito.mock(KinesisProxyInterface.class));
KinesisProxyInterface kinesis = FakeKinesisBehavioursFactory.initialNumOfRecordsAfterNumOfGetRecordsCallsWithAdaptiveReads(10, 2, 500L);

int shardIndex = fetcher.registerNewSubscribedShardState(subscribedShardsStateUnderTest.get(0));
new ShardConsumer<>(
fetcher,
shardIndex,
subscribedShardsStateUnderTest.get(0).getStreamShardHandle(),
subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum(),
// Get a total of 1000 records with 9 getRecords() calls,
// and the 7th getRecords() call will encounter an unexpected expired shard iterator
FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCallsWithUnexpectedExpiredIterator(
1000, 9, 7, 500L),
new ShardMetricsReporter(),
deserializationSchema)
.run();
// Avg record size for first batch --> 10 * 10 Kb/10 = 10 Kb
// Number of records fetched in second batch --> 2 Mb/10Kb * 5 = 40
// Total number of records = 10 + 40 = 50
assertNumberOfMessagesReceivedFromKinesis(50, kinesis, fakeSequenceNumber(), consumerProperties);
}

assertEquals(1000, sourceContext.getCollectedOutputs().size());
assertEquals(
SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get(),
subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum());
@Test
public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithAggregatedRecords() throws Exception {
KinesisProxyInterface kinesis = spy(FakeKinesisBehavioursFactory.aggregatedRecords(3, 5, 10));

// Expecting to receive all messages
// 10 batches of 3 aggregated records each with 5 child records
// 10 * 3 * 5 = 150
ShardMetricsReporter metrics = assertNumberOfMessagesReceivedFromKinesis(150, kinesis, fakeSequenceNumber());
assertEquals(3, metrics.getNumberOfAggregatedRecords());
assertEquals(15, metrics.getNumberOfDeaggregatedRecords());

verify(kinesis).getShardIterator(any(), eq("AFTER_SEQUENCE_NUMBER"), eq("fakeStartingState"));
}

@Test
public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithAdaptiveReads() {
Properties consumerProperties = new Properties();
consumerProperties.put("flink.shard.adaptivereads", "true");
public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithAggregatedRecordsWithSubSequenceStartingNumber() throws Exception {
SequenceNumber sequenceNumber = new SequenceNumber("0", 5);
KinesisProxyInterface kinesis = spy(FakeKinesisBehavioursFactory.aggregatedRecords(1, 10, 5));

// Expecting to start consuming from last sub sequence number
// 5 batches of 1 aggregated record each with 10 child records
// Last consumed message was sub-sequence 5 (6/10) (zero based) (remaining are 6, 7, 8, 9)
// 5 * 1 * 10 - 6 = 44
ShardMetricsReporter metrics = assertNumberOfMessagesReceivedFromKinesis(44, kinesis, sequenceNumber);
assertEquals(1, metrics.getNumberOfAggregatedRecords());
assertEquals(10, metrics.getNumberOfDeaggregatedRecords());

verify(kinesis).getShardIterator(any(), eq("AT_SEQUENCE_NUMBER"), eq("0"));
}

private SequenceNumber fakeSequenceNumber() {
return new SequenceNumber("fakeStartingState");
}

private ShardMetricsReporter assertNumberOfMessagesReceivedFromKinesis(
final int expectedNumberOfMessages,
final KinesisProxyInterface kinesis,
final SequenceNumber startingSequenceNumber) {
return assertNumberOfMessagesReceivedFromKinesis(expectedNumberOfMessages, kinesis, startingSequenceNumber, new Properties());
}

private ShardMetricsReporter assertNumberOfMessagesReceivedFromKinesis(
final int expectedNumberOfMessages,
final KinesisProxyInterface kinesis,
final SequenceNumber startingSequenceNumber,
final Properties consumerProperties) {
ShardMetricsReporter shardMetricsReporter = new ShardMetricsReporter();
StreamShardHandle fakeToBeConsumedShard = getMockStreamShard("fakeStream", 0);

LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>();
subscribedShardsStateUnderTest.add(
new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(fakeToBeConsumedShard),
fakeToBeConsumedShard, new SequenceNumber("fakeStartingState")));
fakeToBeConsumedShard, startingSequenceNumber));

TestSourceContext<String> sourceContext = new TestSourceContext<>();

Expand All @@ -217,19 +202,17 @@ public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithAdaptiveReads() {
shardIndex,
subscribedShardsStateUnderTest.get(0).getStreamShardHandle(),
subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum(),
// Initial number of records to fetch --> 10
FakeKinesisBehavioursFactory.initialNumOfRecordsAfterNumOfGetRecordsCallsWithAdaptiveReads(10, 2, 500L),
new ShardMetricsReporter(),
kinesis,
shardMetricsReporter,
deserializationSchema)
.run();

// Avg record size for first batch --> 10 * 10 Kb/10 = 10 Kb
// Number of records fetched in second batch --> 2 Mb/10Kb * 5 = 40
// Total number of records = 10 + 40 = 50
assertEquals(50, sourceContext.getCollectedOutputs().size());
assertEquals(expectedNumberOfMessages, sourceContext.getCollectedOutputs().size());
assertEquals(
SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get(),
SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get(),
subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum());

return shardMetricsReporter;
}

private static StreamShardHandle getMockStreamShard(String streamName, int shardId) {
Expand Down

0 comments on commit d2297eb

Please sign in to comment.