Skip to content

Commit

Permalink
Add additional integration tests for multistream and cross account (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
lucienlu-aws committed Apr 30, 2024
1 parent 96be30b commit c12cee2
Show file tree
Hide file tree
Showing 22 changed files with 890 additions and 144 deletions.
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,16 @@ After you've downloaded the code from GitHub, you can build it using Maven. To d
this command: `mvn clean install -Dgpg.skip=true`.
Note: This command does not run integration tests.

To disable running unit tests in the build, add the property `-Dskip.ut=true`.

## Running Integration Tests

Note that running integration tests creates AWS resources.
Integration tests require valid AWS credentials.
This will look for a default AWS profile specified in your local `.aws/credentials`.
To run all integration tests: `mvn verify -DskipITs=false`.
To run one integration test: `mvn -Dit.test=*IntegrationTest -DskipITs=false verify`
Optionally, you can provide the name of an IAM user/role to run tests with as a string using this command: `mvn verify -DskipITs=false -DawsProfile="<PROFILE_NAME>"`.
To run one integration test, specify the integration test class: `mvn -Dit.test="BasicStreamConsumerIntegrationTest" -DskipITs=false verify`
Optionally, you can provide the name of an IAM user/role to run tests with as a string using this command: `mvn -DskipITs=false -DawsProfile="<PROFILE_NAME>" verify`.

## Integration with the Kinesis Producer Library
For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesis-guide-kpl]**, the KCL integrates without additional effort. When the KCL retrieves an aggregated Amazon Kinesis record consisting of multiple KPL user records, it will automatically invoke the KPL to extract the individual user records before returning them to the user.
Expand Down
1 change: 1 addition & 0 deletions amazon-kinesis-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@
<artifactId>maven-surefire-plugin</artifactId>
<version>3.1.2</version>
<configuration>
<skipTests>${skip.ut}</skipTests>
<skipITs>${skipITs}</skipITs>
<excludes>
<exclude>**/*IntegrationTest.java</exclude>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
Expand All @@ -18,6 +19,7 @@
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.config.KCLAppConfig;
import software.amazon.kinesis.config.RetrievalMode;
import software.amazon.kinesis.coordinator.CoordinatorConfig;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.leases.LeaseManagementConfig;
Expand All @@ -33,6 +35,7 @@
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand All @@ -45,8 +48,9 @@
public class TestConsumer {
public final KCLAppConfig consumerConfig;
public final Region region;
public final String streamName;
public final List<String> streamNames;
public final KinesisAsyncClient kinesisClient;
public final KinesisAsyncClient kinesisClientForStreamOwner;
private MetricsConfig metricsConfig;
private RetrievalConfig retrievalConfig;
private CheckpointConfig checkpointConfig;
Expand All @@ -67,24 +71,30 @@ public class TestConsumer {
public TestConsumer(KCLAppConfig consumerConfig) throws Exception {
this.consumerConfig = consumerConfig;
this.region = consumerConfig.getRegion();
this.streamName = consumerConfig.getStreamName();
this.kinesisClient = consumerConfig.buildAsyncKinesisClient();
this.streamNames = consumerConfig.getStreamNames();
this.kinesisClientForStreamOwner = consumerConfig.buildAsyncKinesisClientForStreamOwner();
this.kinesisClient = consumerConfig.buildAsyncKinesisClientForConsumer();
this.dynamoClient = consumerConfig.buildAsyncDynamoDbClient();
}

public void run() throws Exception {

if (consumerConfig.isCrossAccount()) {
verifyCrossAccountCreds();
}

final StreamExistenceManager streamExistenceManager = new StreamExistenceManager(this.consumerConfig);
final LeaseTableManager leaseTableManager = new LeaseTableManager(this.dynamoClient);

// Clean up any old streams or lease tables left in test environment
cleanTestResources(streamExistenceManager, leaseTableManager);

// Check if stream is created. If not, create it
streamExistenceManager.checkStreamAndCreateIfNecessary(this.streamName);
streamExistenceManager.checkStreamsAndCreateIfNecessary();
Map<Arn, Arn> streamToConsumerArnsMap = streamExistenceManager.createCrossAccountConsumerIfNecessary();

startProducer();
setUpConsumerResources();
setUpConsumerResources(streamToConsumerArnsMap);

try {
startConsumer();
Expand Down Expand Up @@ -116,6 +126,13 @@ public void run() throws Exception {
}
}

private void verifyCrossAccountCreds() {
if (consumerConfig.getCrossAccountCredentialsProvider() == null) {
throw new RuntimeException("To run cross account integration tests, pass in an AWS profile with -D" +
KCLAppConfig.CROSS_ACCOUNT_PROFILE_PROPERTY);
}
}

private void cleanTestResources(StreamExistenceManager streamExistenceManager, LeaseTableManager leaseTableManager) throws Exception {
log.info("----------Before starting, Cleaning test environment----------");
log.info("----------Deleting all lease tables in account----------");
Expand All @@ -135,25 +152,35 @@ private void startProducer() {
if (consumerConfig.getReshardFactorList() != null) {
log.info("----Reshard Config found: {}", consumerConfig.getReshardFactorList());

final StreamScaler s = new StreamScaler(
kinesisClient,
consumerConfig.getStreamName(),
consumerConfig.getReshardFactorList(),
consumerConfig
);
for (String streamName : consumerConfig.getStreamNames()) {
final StreamScaler streamScaler = new StreamScaler(kinesisClientForStreamOwner, streamName,
consumerConfig.getReshardFactorList(), consumerConfig);

// Schedule the stream scales 4 minutes apart with 2 minute starting delay
for (int i = 0; i < consumerConfig.getReshardFactorList().size(); i++) {
producerExecutor.schedule(s, (4 * i) + 2, TimeUnit.MINUTES);
// Schedule the stream scales 4 minutes apart with 2 minute starting delay
for (int i = 0; i < consumerConfig.getReshardFactorList()
.size(); i++) {
producerExecutor.schedule(streamScaler, (4 * i) + 2, TimeUnit.MINUTES);
}
}
}
}

private void setUpConsumerResources() throws Exception {
private void setUpConsumerResources(Map<Arn, Arn> streamToConsumerArnsMap) throws Exception {
// Setup configuration of KCL (including DynamoDB and CloudWatch)
final ConfigsBuilder configsBuilder = consumerConfig.getConfigsBuilder();
final ConfigsBuilder configsBuilder = consumerConfig.getConfigsBuilder(streamToConsumerArnsMap);

// For polling mode in both CAA and non CAA, set retrievalSpecificConfig to use PollingConfig
// For SingleStreamMode EFO CAA, must set the retrieval config to specify the consumerArn in FanoutConfig
// For MultiStream EFO CAA, the consumerArn can be set in StreamConfig
if (consumerConfig.getRetrievalMode().equals(RetrievalMode.POLLING)) {
retrievalConfig = consumerConfig.getRetrievalConfig(configsBuilder, null);
} else if (consumerConfig.isCrossAccount()) {
retrievalConfig = consumerConfig.getRetrievalConfig(configsBuilder,
streamToConsumerArnsMap);
} else {
retrievalConfig = configsBuilder.retrievalConfig();
}

retrievalConfig = consumerConfig.getRetrievalConfig();
checkpointConfig = configsBuilder.checkpointConfig();
coordinatorConfig = configsBuilder.coordinatorConfig();
leaseManagementConfig = configsBuilder.leaseManagementConfig()
Expand Down Expand Up @@ -194,23 +221,27 @@ private void stopProducer() {
}

public void publishRecord() {
final PutRecordRequest request;
try {
request = PutRecordRequest.builder()
.partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
.streamName(this.streamName)
.data(SdkBytes.fromByteBuffer(wrapWithCounter(5, payloadCounter))) // 1024 is 1 KB
.build();
kinesisClient.putRecord(request).get();

// Increment the payload counter if the putRecord call was successful
payloadCounter = payloadCounter.add(new BigInteger("1"));
successfulPutRecords += 1;
log.info("---------Record published, successfulPutRecords is now: {}", successfulPutRecords);
} catch (InterruptedException e) {
log.info("Interrupted, assuming shutdown. ", e);
} catch (ExecutionException | RuntimeException e) {
log.error("Error during publish records", e);
for (String streamName : consumerConfig.getStreamNames()) {
try {
final PutRecordRequest request = PutRecordRequest.builder()
.partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
.streamName(streamName)
.data(SdkBytes.fromByteBuffer(wrapWithCounter(5, payloadCounter))) // 1024
// is 1 KB
.build();
kinesisClientForStreamOwner.putRecord(request)
.get();

// Increment the payload counter if the putRecord call was successful
payloadCounter = payloadCounter.add(new BigInteger("1"));
successfulPutRecords += 1;
log.info("---------Record published for stream {}, successfulPutRecords is now: {}",
streamName, successfulPutRecords);
} catch (InterruptedException e) {
log.info("Interrupted, assuming shutdown. ", e);
} catch (ExecutionException | RuntimeException e) {
log.error("Error during publish records", e);
}
}
}

Expand Down Expand Up @@ -248,10 +279,13 @@ private void validateRecordProcessor() throws Exception {
}

private void deleteResources(StreamExistenceManager streamExistenceManager, LeaseTableManager leaseTableManager) throws Exception {
log.info("-------------Start deleting stream.---------");
streamExistenceManager.deleteResource(this.streamName);
log.info("-------------Start deleting streams.---------");
for (String streamName : consumerConfig.getStreamNames()) {
log.info("Deleting stream {}", streamName);
streamExistenceManager.deleteResource(streamName);
}
log.info("---------Start deleting lease table.---------");
leaseTableManager.deleteResource(this.consumerConfig.getStreamName());
leaseTableManager.deleteResource(consumerConfig.getApplicationName());
log.info("---------Finished deleting resources.---------");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
Expand All @@ -23,12 +24,15 @@ public class TestRecordProcessor implements ShardRecordProcessor {

private static final String SHARD_ID_MDC_KEY = "ShardId";

private StreamIdentifier streamIdentifier;

private String shardId;

private final RecordValidatorQueue recordValidator;

public TestRecordProcessor(RecordValidatorQueue recordValidator) {
public TestRecordProcessor(StreamIdentifier streamIdentifier, RecordValidatorQueue recordValidator) {
this.recordValidator = recordValidator;
this.streamIdentifier = streamIdentifier;
}

@Override
Expand All @@ -51,8 +55,9 @@ public void processRecords(ProcessRecordsInput processRecordsInput) {

for (KinesisClientRecord kinesisRecord : processRecordsInput.records()) {
final String data = new String(asByteArray(kinesisRecord.data()));
log.info("Processing record pk: {}", data);
recordValidator.add(shardId, data);
log.info("Processing record pk for stream {}: {}", streamIdentifier.streamName(), data);
String recordValidatorKey = streamIdentifier.toString() + "-" + shardId;
recordValidator.add(recordValidatorKey, data);
}

} catch (Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package software.amazon.kinesis.application;

import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.utils.RecordValidatorQueue;
Expand All @@ -14,7 +15,12 @@ public TestRecordProcessorFactory(RecordValidatorQueue recordValidator) {

@Override
public ShardRecordProcessor shardRecordProcessor() {
return new TestRecordProcessor(this.recordValidator);
return new TestRecordProcessor(null, this.recordValidator);
}

@Override
public ShardRecordProcessor shardRecordProcessor(StreamIdentifier streamIdentifier) {
return new TestRecordProcessor(streamIdentifier, this.recordValidator);
}

}
Loading

0 comments on commit c12cee2

Please sign in to comment.