[FLINK-18483][kinesis] Test coverage improvements for FlinkKinesisConsumer/ShardConsumer #12850

Closed
7 changes: 7 additions & 0 deletions flink-connectors/flink-connector-kinesis/pom.xml
@@ -95,6 +95,13 @@ under the License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-kinesis-aggregator</artifactId>
<version>1.0.3</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-kinesis</artifactId>
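The amazon-kinesis-aggregator artifact added above is a test-only dependency used to fabricate KPL-aggregated records for the deaggregation tests further down. As a rough sketch of how such a payload can be built (illustrative only, not the PR's actual FakeKinesisBehavioursFactory code; the RecordAggregator/AggRecord names are from the awslabs kinesis-aggregation library, and the partition key and payloads are invented):

import com.amazonaws.kinesis.agg.AggRecord;
import com.amazonaws.kinesis.agg.RecordAggregator;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Builds one KPL-aggregated blob containing {@code numChildRecords} user records. */
private static ByteBuffer fakeAggregatedPayload(int numChildRecords) throws Exception {
	RecordAggregator aggregator = new RecordAggregator();
	for (int i = 0; i < numChildRecords; i++) {
		// Partition key and data are arbitrary test values.
		aggregator.addUserRecord("fakePartitionKey", ("record-" + i).getBytes(StandardCharsets.UTF_8));
	}
	AggRecord aggRecord = aggregator.clearAndGet();
	// The resulting bytes can then be placed into a fake GetRecords response record.
	return ByteBuffer.wrap(aggRecord.toByteArray());
}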
ShardConsumerTest.java
@@ -20,7 +20,6 @@
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kinesis.metrics.ShardMetricsReporter;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
@@ -37,12 +36,24 @@
import org.mockito.Mockito;

import java.math.BigInteger;
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.Date;
import java.util.LinkedList;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS;
import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP;
import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT;
import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM;
import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM;
import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

/**
* Tests for the {@link ShardConsumer}.
@@ -51,148 +62,123 @@ public class ShardConsumerTest {

@Test
public void testMetricsReporting() {
StreamShardHandle fakeToBeConsumedShard = getMockStreamShard("fakeStream", 0);

LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>();
subscribedShardsStateUnderTest.add(
new KinesisStreamShardState(
KinesisDataFetcher.convertToStreamShardMetadata(fakeToBeConsumedShard),
fakeToBeConsumedShard,
new SequenceNumber("fakeStartingState")));
KinesisProxyInterface kinesis = FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls(500, 5, 500);

TestSourceContext<String> sourceContext = new TestSourceContext<>();

KinesisDeserializationSchemaWrapper<String> deserializationSchema = new KinesisDeserializationSchemaWrapper<>(
new SimpleStringSchema());
TestableKinesisDataFetcher<String> fetcher =
new TestableKinesisDataFetcher<>(
Collections.singletonList("fakeStream"),
sourceContext,
new Properties(),
deserializationSchema,
10,
2,
new AtomicReference<>(),
subscribedShardsStateUnderTest,
KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")),
Mockito.mock(KinesisProxyInterface.class));
ShardMetricsReporter metrics = assertNumberOfMessagesReceivedFromKinesis(500, kinesis, fakeSequenceNumber());
assertEquals(500, metrics.getMillisBehindLatest());
assertEquals(10000, metrics.getMaxNumberOfRecordsPerFetch());
}

ShardMetricsReporter shardMetricsReporter = new ShardMetricsReporter();
long millisBehindLatest = 500L;
new ShardConsumer<>(
fetcher,
0,
subscribedShardsStateUnderTest.get(0).getStreamShardHandle(),
subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum(),
FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls(1000, 9, millisBehindLatest),
shardMetricsReporter,
deserializationSchema)
.run();
@Test
public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithStartingSequenceNumber() throws Exception {
KinesisProxyInterface kinesis = spy(FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls(1000, 9, 500L));

// the millisBehindLatest metric should have been reported
assertEquals(millisBehindLatest, shardMetricsReporter.getMillisBehindLatest());
assertNumberOfMessagesReceivedFromKinesis(1000, kinesis, fakeSequenceNumber());
verify(kinesis).getShardIterator(any(), eq("AFTER_SEQUENCE_NUMBER"), eq("fakeStartingState"));
}

@Test
public void testCorrectNumOfCollectedRecordsAndUpdatedState() {
StreamShardHandle fakeToBeConsumedShard = getMockStreamShard("fakeStream", 0);
public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithStartingSequenceSentinelTimestamp() throws Exception {
String format = "yyyy-MM-dd'T'HH:mm";
String timestamp = "2020-07-02T09:14";
Date expectedTimestamp = new SimpleDateFormat(format).parse(timestamp);

LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>();
subscribedShardsStateUnderTest.add(
new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(fakeToBeConsumedShard),
fakeToBeConsumedShard, new SequenceNumber("fakeStartingState")));
Properties consumerProperties = new Properties();
consumerProperties.setProperty(STREAM_INITIAL_TIMESTAMP, timestamp);
consumerProperties.setProperty(STREAM_TIMESTAMP_DATE_FORMAT, format);
SequenceNumber sequenceNumber = SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get();

TestSourceContext<String> sourceContext = new TestSourceContext<>();
KinesisProxyInterface kinesis = spy(FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls(10, 1, 0));

KinesisDeserializationSchemaWrapper<String> deserializationSchema = new KinesisDeserializationSchemaWrapper<>(
new SimpleStringSchema());
TestableKinesisDataFetcher<String> fetcher =
new TestableKinesisDataFetcher<>(
Collections.singletonList("fakeStream"),
sourceContext,
new Properties(),
deserializationSchema,
10,
2,
new AtomicReference<>(),
subscribedShardsStateUnderTest,
KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")),
Mockito.mock(KinesisProxyInterface.class));
assertNumberOfMessagesReceivedFromKinesis(10, kinesis, sequenceNumber, consumerProperties);
verify(kinesis).getShardIterator(any(), eq("AT_TIMESTAMP"), eq(expectedTimestamp));
}
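For context, the AT_TIMESTAMP sentinel is expected to turn the two consumer properties into a java.util.Date before the shard iterator is requested. A minimal sketch of that translation, mirroring only the happy path this test sets up (the connector's real parsing lives in the consumer code and handles more than this one format):

Properties config = new Properties();
config.setProperty(STREAM_INITIAL_TIMESTAMP, "2020-07-02T09:14");
config.setProperty(STREAM_TIMESTAMP_DATE_FORMAT, "yyyy-MM-dd'T'HH:mm");

// Resolve the configured start position into a Date...
Date startTimestamp = new SimpleDateFormat(config.getProperty(STREAM_TIMESTAMP_DATE_FORMAT))
	.parse(config.getProperty(STREAM_INITIAL_TIMESTAMP));

// ...which is what the test verifies is handed to Kinesis:
// kinesis.getShardIterator(shard, "AT_TIMESTAMP", startTimestamp);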

int shardIndex = fetcher.registerNewSubscribedShardState(subscribedShardsStateUnderTest.get(0));
new ShardConsumer<>(
fetcher,
shardIndex,
subscribedShardsStateUnderTest.get(0).getStreamShardHandle(),
subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum(),
FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls(1000, 9, 500L),
new ShardMetricsReporter(),
deserializationSchema)
.run();
@Test
public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithStartingSequenceSentinelEarliest() throws Exception {
SequenceNumber sequenceNumber = SENTINEL_EARLIEST_SEQUENCE_NUM.get();

assertEquals(1000, sourceContext.getCollectedOutputs().size());
assertEquals(
SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get(),
subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum());
KinesisProxyInterface kinesis = spy(FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls(50, 2, 0));

assertNumberOfMessagesReceivedFromKinesis(50, kinesis, sequenceNumber);
verify(kinesis).getShardIterator(any(), eq("TRIM_HORIZON"), eq(null));
}

@Test
public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithUnexpectedExpiredIterator() {
StreamShardHandle fakeToBeConsumedShard = getMockStreamShard("fakeStream", 0);
KinesisProxyInterface kinesis = FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCallsWithUnexpectedExpiredIterator(1000, 9, 7, 500L);

LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>();
subscribedShardsStateUnderTest.add(
new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(fakeToBeConsumedShard),
fakeToBeConsumedShard, new SequenceNumber("fakeStartingState")));
// Get a total of 1000 records with 9 getRecords() calls,
// and the 7th getRecords() call will encounter an unexpected expired shard iterator
assertNumberOfMessagesReceivedFromKinesis(1000, kinesis, fakeSequenceNumber());
}

TestSourceContext<String> sourceContext = new TestSourceContext<>();
@Test
public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithAdaptiveReads() {
Properties consumerProperties = new Properties();
consumerProperties.setProperty(SHARD_USE_ADAPTIVE_READS, "true");

KinesisDeserializationSchemaWrapper<String> deserializationSchema = new KinesisDeserializationSchemaWrapper<>(
new SimpleStringSchema());
TestableKinesisDataFetcher<String> fetcher =
new TestableKinesisDataFetcher<>(
Collections.singletonList("fakeStream"),
sourceContext,
new Properties(),
deserializationSchema,
10,
2,
new AtomicReference<>(),
subscribedShardsStateUnderTest,
KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")),
Mockito.mock(KinesisProxyInterface.class));
KinesisProxyInterface kinesis = FakeKinesisBehavioursFactory.initialNumOfRecordsAfterNumOfGetRecordsCallsWithAdaptiveReads(10, 2, 500L);

int shardIndex = fetcher.registerNewSubscribedShardState(subscribedShardsStateUnderTest.get(0));
new ShardConsumer<>(
fetcher,
shardIndex,
subscribedShardsStateUnderTest.get(0).getStreamShardHandle(),
subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum(),
// Get a total of 1000 records with 9 getRecords() calls,
// and the 7th getRecords() call will encounter an unexpected expired shard iterator
FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCallsWithUnexpectedExpiredIterator(
1000, 9, 7, 500L),
new ShardMetricsReporter(),
deserializationSchema)
.run();
// Avg record size for first batch --> (10 records * 10 Kb) / 10 records = 10 Kb
// Number of records fetched in second batch --> (2 Mb per second / 5 fetches per second) / 10 Kb = 40
// Total number of records = 10 + 40 = 50
assertNumberOfMessagesReceivedFromKinesis(50, kinesis, fakeSequenceNumber(), consumerProperties);
}
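The expected total of 50 follows from the adaptive-reads sizing: the per-shard read limit is spread over the fetch frequency and divided by the average record size. A quick arithmetic check (the 2 MB/s limit, 5 fetches per second and 10 Kb records are the values implied by the comment above, not constants read from the connector):

long shardBytesPerSecondLimit = 2 * 1024 * 1024; // Kinesis per-shard read limit, 2 MB/s
double fetchesPerSecond = 5.0;                   // implied by the fake behaviour's fetch cadence
long averageRecordSizeBytes = 10 * 1024;         // 10 Kb records observed in the first batch

long bytesPerFetch = (long) (shardBytesPerSecondLimit / fetchesPerSecond); // ~400 Kb
long adaptedRecordsPerFetch = bytesPerFetch / averageRecordSizeBytes;      // 40
long expectedTotal = 10 + adaptedRecordsPerFetch;                          // 10 + 40 = 50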

assertEquals(1000, sourceContext.getCollectedOutputs().size());
assertEquals(
SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get(),
subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum());
@Test
public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithAggregatedRecords() throws Exception {
KinesisProxyInterface kinesis = spy(FakeKinesisBehavioursFactory.aggregatedRecords(3, 5, 10));

// Expecting to receive all messages
// 10 batches of 3 aggregated records each with 5 child records
// 10 * 3 * 5 = 150
ShardMetricsReporter metrics = assertNumberOfMessagesReceivedFromKinesis(150, kinesis, fakeSequenceNumber());
assertEquals(3, metrics.getNumberOfAggregatedRecords());
assertEquals(15, metrics.getNumberOfDeaggregatedRecords());

verify(kinesis).getShardIterator(any(), eq("AFTER_SEQUENCE_NUMBER"), eq("fakeStartingState"));
}

@Test
public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithAdaptiveReads() {
Properties consumerProperties = new Properties();
consumerProperties.put("flink.shard.adaptivereads", "true");
public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithAggregatedRecordsWithSubSequenceStartingNumber() throws Exception {
SequenceNumber sequenceNumber = new SequenceNumber("0", 5);
KinesisProxyInterface kinesis = spy(FakeKinesisBehavioursFactory.aggregatedRecords(1, 10, 5));

// Expecting to start consuming from last sub sequence number
// 5 batches of 1 aggregated record each with 10 child records
// Last consumed message was sub-sequence 5, i.e. the 6th of 10 (zero based), so sub-sequences 6, 7, 8 and 9 of that record remain
// 5 * 1 * 10 - 6 = 44
ShardMetricsReporter metrics = assertNumberOfMessagesReceivedFromKinesis(44, kinesis, sequenceNumber);
assertEquals(1, metrics.getNumberOfAggregatedRecords());
assertEquals(10, metrics.getNumberOfDeaggregatedRecords());

verify(kinesis).getShardIterator(any(), eq("AT_SEQUENCE_NUMBER"), eq("0"));
}
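The expected count of 44 is simply the total number of child records minus those already covered by the starting sub-sequence number; a quick check using the shapes from the comment above:

int batches = 5;
int aggregatedRecordsPerBatch = 1;
int childRecordsPerAggregatedRecord = 10;
int lastConsumedSubSequence = 5; // zero based, so 6 child records were already emitted

int totalChildRecords = batches * aggregatedRecordsPerBatch * childRecordsPerAggregatedRecord; // 50
int alreadyEmitted = lastConsumedSubSequence + 1;                                              // 6
int expectedMessages = totalChildRecords - alreadyEmitted;                                     // 44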

private SequenceNumber fakeSequenceNumber() {
return new SequenceNumber("fakeStartingState");
}
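Taken together, the verify(...) calls above pin down how each starting position is expected to map onto a getShardIterator request (a summary of this file's expectations, not of the consumer implementation itself):

// starting position                                         -> shard iterator type, token
// concrete sequence number ("fakeStartingState")            -> AFTER_SEQUENCE_NUMBER, "fakeStartingState"
// SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM                        -> AT_TIMESTAMP, parsed Date
// SENTINEL_EARLIEST_SEQUENCE_NUM                            -> TRIM_HORIZON, null
// sequence number with sub-sequence (new SequenceNumber("0", 5)) -> AT_SEQUENCE_NUMBER, "0" (re-read, then skip already-consumed sub-records)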

private ShardMetricsReporter assertNumberOfMessagesReceivedFromKinesis(
final int expectedNumberOfMessages,
final KinesisProxyInterface kinesis,
final SequenceNumber startingSequenceNumber) {
return assertNumberOfMessagesReceivedFromKinesis(expectedNumberOfMessages, kinesis, startingSequenceNumber, new Properties());
}

private ShardMetricsReporter assertNumberOfMessagesReceivedFromKinesis(
final int expectedNumberOfMessages,
final KinesisProxyInterface kinesis,
final SequenceNumber startingSequenceNumber,
final Properties consumerProperties) {
ShardMetricsReporter shardMetricsReporter = new ShardMetricsReporter();
StreamShardHandle fakeToBeConsumedShard = getMockStreamShard("fakeStream", 0);

LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>();
subscribedShardsStateUnderTest.add(
new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(fakeToBeConsumedShard),
fakeToBeConsumedShard, new SequenceNumber("fakeStartingState")));
fakeToBeConsumedShard, startingSequenceNumber));

TestSourceContext<String> sourceContext = new TestSourceContext<>();

@@ -217,19 +203,17 @@ public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithAdaptiveReads() {
shardIndex,
subscribedShardsStateUnderTest.get(0).getStreamShardHandle(),
subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum(),
// Initial number of records to fetch --> 10
FakeKinesisBehavioursFactory.initialNumOfRecordsAfterNumOfGetRecordsCallsWithAdaptiveReads(10, 2, 500L),
new ShardMetricsReporter(),
kinesis,
shardMetricsReporter,
deserializationSchema)
.run();

// Avg record size for first batch --> 10 * 10 Kb/10 = 10 Kb
// Number of records fetched in second batch --> 2 Mb/10Kb * 5 = 40
// Total number of records = 10 + 40 = 50
assertEquals(50, sourceContext.getCollectedOutputs().size());
assertEquals(expectedNumberOfMessages, sourceContext.getCollectedOutputs().size());
assertEquals(
SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get(),
SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get(),
subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum());

return shardMetricsReporter;
}

private static StreamShardHandle getMockStreamShard(String streamName, int shardId) {