From c12cee2a1b57116fc9bd5003b1a45863221e66fd Mon Sep 17 00:00:00 2001 From: lucienlu-aws <132623944+lucienlu-aws@users.noreply.github.com> Date: Tue, 30 Apr 2024 11:16:37 -0700 Subject: [PATCH] Add additional integration tests for multistream and cross account (#1313) --- README.md | 6 +- amazon-kinesis-client/pom.xml | 1 + .../kinesis/application/TestConsumer.java | 108 +++++--- .../application/TestRecordProcessor.java | 11 +- .../TestRecordProcessorFactory.java | 8 +- .../amazon/kinesis/config/KCLAppConfig.java | 258 ++++++++++++++++-- .../ReleaseCanaryPollingH1TestConfig.java | 32 +-- .../ReleaseCanaryPollingH2TestConfig.java | 33 +-- ...seCanaryStreamingReshardingTestConfig.java | 23 +- .../ReleaseCanaryStreamingTestConfig.java | 20 +- .../amazon/kinesis/config/RetrievalMode.java | 6 + .../KCLCrossAccountAppConfig.java | 23 ++ ...AccountMultiStreamPollingH2TestConfig.java | 48 ++++ ...AccountMultiStreamStreamingTestConfig.java | 48 ++++ ...CanaryCrossAccountPollingH2TestConfig.java | 43 +++ ...CanaryCrossAccountStreamingTestConfig.java | 42 +++ ...eCanaryMultiStreamPollingH2TestConfig.java | 43 +++ ...eCanaryMultiStreamStreamingTestConfig.java | 41 +++ ...sAccountStreamConsumerIntegrationTest.java | 47 ++++ .../MultiStreamConsumerIntegrationTest.java | 24 ++ .../kinesis/utils/AWSResourceManager.java | 4 +- .../kinesis/utils/StreamExistenceManager.java | 165 +++++++++-- 22 files changed, 890 insertions(+), 144 deletions(-) create mode 100644 amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/RetrievalMode.java create mode 100644 amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/crossaccount/KCLCrossAccountAppConfig.java create mode 100644 amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/crossaccount/ReleaseCanaryCrossAccountMultiStreamPollingH2TestConfig.java create mode 100644 amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/crossaccount/ReleaseCanaryCrossAccountMultiStreamStreamingTestConfig.java create mode 100644 amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/crossaccount/ReleaseCanaryCrossAccountPollingH2TestConfig.java create mode 100644 amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/crossaccount/ReleaseCanaryCrossAccountStreamingTestConfig.java create mode 100644 amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/multistream/ReleaseCanaryMultiStreamPollingH2TestConfig.java create mode 100644 amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/multistream/ReleaseCanaryMultiStreamStreamingTestConfig.java create mode 100644 amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/CrossAccountStreamConsumerIntegrationTest.java create mode 100644 amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/MultiStreamConsumerIntegrationTest.java diff --git a/README.md b/README.md index b12fd6673..80aa7f1c5 100644 --- a/README.md +++ b/README.md @@ -38,14 +38,16 @@ After you've downloaded the code from GitHub, you can build it using Maven. To d this command: `mvn clean install -Dgpg.skip=true`. Note: This command does not run integration tests. +To disable running unit tests in the build, add the property `-Dskip.ut=true`. + ## Running Integration Tests Note that running integration tests creates AWS resources. Integration tests require valid AWS credentials. This will look for a default AWS profile specified in your local `.aws/credentials`. To run all integration tests: `mvn verify -DskipITs=false`. -To run one integration tests: `mvn -Dit.test=*IntegrationTest -DskipITs=false verify` -Optionally, you can provide the name of an IAM user/role to run tests with as a string using this command: `mvn verify -DskipITs=false -DawsProfile=""`. +To run one integration tests, specify the integration test class: `mvn -Dit.test="BasicStreamConsumerIntegrationTest" -DskipITs=false verify` +Optionally, you can provide the name of an IAM user/role to run tests with as a string using this command: `mvn -DskipITs=false -DawsProfile="" verify`. ## Integration with the Kinesis Producer Library For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesis-guide-kpl]**, the KCL integrates without additional effort. When the KCL retrieves an aggregated Amazon Kinesis record consisting of multiple KPL user records, it will automatically invoke the KPL to extract the individual user records before returning them to the user. diff --git a/amazon-kinesis-client/pom.xml b/amazon-kinesis-client/pom.xml index 551fe0a40..0a77cf2b0 100644 --- a/amazon-kinesis-client/pom.xml +++ b/amazon-kinesis-client/pom.xml @@ -201,6 +201,7 @@ maven-surefire-plugin 3.1.2 + ${skip.ut} ${skipITs} **/*IntegrationTest.java diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/application/TestConsumer.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/application/TestConsumer.java index 3e4e931da..5bfb928af 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/application/TestConsumer.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/application/TestConsumer.java @@ -5,6 +5,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.RandomStringUtils; import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.arns.Arn; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; @@ -18,6 +19,7 @@ import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.config.KCLAppConfig; +import software.amazon.kinesis.config.RetrievalMode; import software.amazon.kinesis.coordinator.CoordinatorConfig; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.leases.LeaseManagementConfig; @@ -33,6 +35,7 @@ import java.math.BigInteger; import java.nio.ByteBuffer; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -45,8 +48,9 @@ public class TestConsumer { public final KCLAppConfig consumerConfig; public final Region region; - public final String streamName; + public final List streamNames; public final KinesisAsyncClient kinesisClient; + public final KinesisAsyncClient kinesisClientForStreamOwner; private MetricsConfig metricsConfig; private RetrievalConfig retrievalConfig; private CheckpointConfig checkpointConfig; @@ -67,13 +71,18 @@ public class TestConsumer { public TestConsumer(KCLAppConfig consumerConfig) throws Exception { this.consumerConfig = consumerConfig; this.region = consumerConfig.getRegion(); - this.streamName = consumerConfig.getStreamName(); - this.kinesisClient = consumerConfig.buildAsyncKinesisClient(); + this.streamNames = consumerConfig.getStreamNames(); + this.kinesisClientForStreamOwner = consumerConfig.buildAsyncKinesisClientForStreamOwner(); + this.kinesisClient = consumerConfig.buildAsyncKinesisClientForConsumer(); this.dynamoClient = consumerConfig.buildAsyncDynamoDbClient(); } public void run() throws Exception { + if (consumerConfig.isCrossAccount()) { + verifyCrossAccountCreds(); + } + final StreamExistenceManager streamExistenceManager = new StreamExistenceManager(this.consumerConfig); final LeaseTableManager leaseTableManager = new LeaseTableManager(this.dynamoClient); @@ -81,10 +90,11 @@ public void run() throws Exception { cleanTestResources(streamExistenceManager, leaseTableManager); // Check if stream is created. If not, create it - streamExistenceManager.checkStreamAndCreateIfNecessary(this.streamName); + streamExistenceManager.checkStreamsAndCreateIfNecessary(); + Map streamToConsumerArnsMap = streamExistenceManager.createCrossAccountConsumerIfNecessary(); startProducer(); - setUpConsumerResources(); + setUpConsumerResources(streamToConsumerArnsMap); try { startConsumer(); @@ -116,6 +126,13 @@ public void run() throws Exception { } } + private void verifyCrossAccountCreds() { + if (consumerConfig.getCrossAccountCredentialsProvider() == null) { + throw new RuntimeException("To run cross account integration tests, pass in an AWS profile with -D" + + KCLAppConfig.CROSS_ACCOUNT_PROFILE_PROPERTY); + } + } + private void cleanTestResources(StreamExistenceManager streamExistenceManager, LeaseTableManager leaseTableManager) throws Exception { log.info("----------Before starting, Cleaning test environment----------"); log.info("----------Deleting all lease tables in account----------"); @@ -135,25 +152,35 @@ private void startProducer() { if (consumerConfig.getReshardFactorList() != null) { log.info("----Reshard Config found: {}", consumerConfig.getReshardFactorList()); - final StreamScaler s = new StreamScaler( - kinesisClient, - consumerConfig.getStreamName(), - consumerConfig.getReshardFactorList(), - consumerConfig - ); + for (String streamName : consumerConfig.getStreamNames()) { + final StreamScaler streamScaler = new StreamScaler(kinesisClientForStreamOwner, streamName, + consumerConfig.getReshardFactorList(), consumerConfig); - // Schedule the stream scales 4 minutes apart with 2 minute starting delay - for (int i = 0; i < consumerConfig.getReshardFactorList().size(); i++) { - producerExecutor.schedule(s, (4 * i) + 2, TimeUnit.MINUTES); + // Schedule the stream scales 4 minutes apart with 2 minute starting delay + for (int i = 0; i < consumerConfig.getReshardFactorList() + .size(); i++) { + producerExecutor.schedule(streamScaler, (4 * i) + 2, TimeUnit.MINUTES); + } } } } - private void setUpConsumerResources() throws Exception { + private void setUpConsumerResources(Map streamToConsumerArnsMap) throws Exception { // Setup configuration of KCL (including DynamoDB and CloudWatch) - final ConfigsBuilder configsBuilder = consumerConfig.getConfigsBuilder(); + final ConfigsBuilder configsBuilder = consumerConfig.getConfigsBuilder(streamToConsumerArnsMap); + + // For polling mode in both CAA and non CAA, set retrievalSpecificConfig to use PollingConfig + // For SingleStreamMode EFO CAA, must set the retrieval config to specify the consumerArn in FanoutConfig + // For MultiStream EFO CAA, the consumerArn can be set in StreamConfig + if (consumerConfig.getRetrievalMode().equals(RetrievalMode.POLLING)) { + retrievalConfig = consumerConfig.getRetrievalConfig(configsBuilder, null); + } else if (consumerConfig.isCrossAccount()) { + retrievalConfig = consumerConfig.getRetrievalConfig(configsBuilder, + streamToConsumerArnsMap); + } else { + retrievalConfig = configsBuilder.retrievalConfig(); + } - retrievalConfig = consumerConfig.getRetrievalConfig(); checkpointConfig = configsBuilder.checkpointConfig(); coordinatorConfig = configsBuilder.coordinatorConfig(); leaseManagementConfig = configsBuilder.leaseManagementConfig() @@ -194,23 +221,27 @@ private void stopProducer() { } public void publishRecord() { - final PutRecordRequest request; - try { - request = PutRecordRequest.builder() - .partitionKey(RandomStringUtils.randomAlphabetic(5, 20)) - .streamName(this.streamName) - .data(SdkBytes.fromByteBuffer(wrapWithCounter(5, payloadCounter))) // 1024 is 1 KB - .build(); - kinesisClient.putRecord(request).get(); - - // Increment the payload counter if the putRecord call was successful - payloadCounter = payloadCounter.add(new BigInteger("1")); - successfulPutRecords += 1; - log.info("---------Record published, successfulPutRecords is now: {}", successfulPutRecords); - } catch (InterruptedException e) { - log.info("Interrupted, assuming shutdown. ", e); - } catch (ExecutionException | RuntimeException e) { - log.error("Error during publish records", e); + for (String streamName : consumerConfig.getStreamNames()) { + try { + final PutRecordRequest request = PutRecordRequest.builder() + .partitionKey(RandomStringUtils.randomAlphabetic(5, 20)) + .streamName(streamName) + .data(SdkBytes.fromByteBuffer(wrapWithCounter(5, payloadCounter))) // 1024 + // is 1 KB + .build(); + kinesisClientForStreamOwner.putRecord(request) + .get(); + + // Increment the payload counter if the putRecord call was successful + payloadCounter = payloadCounter.add(new BigInteger("1")); + successfulPutRecords += 1; + log.info("---------Record published for stream {}, successfulPutRecords is now: {}", + streamName, successfulPutRecords); + } catch (InterruptedException e) { + log.info("Interrupted, assuming shutdown. ", e); + } catch (ExecutionException | RuntimeException e) { + log.error("Error during publish records", e); + } } } @@ -248,10 +279,13 @@ private void validateRecordProcessor() throws Exception { } private void deleteResources(StreamExistenceManager streamExistenceManager, LeaseTableManager leaseTableManager) throws Exception { - log.info("-------------Start deleting stream.---------"); - streamExistenceManager.deleteResource(this.streamName); + log.info("-------------Start deleting streams.---------"); + for (String streamName : consumerConfig.getStreamNames()) { + log.info("Deleting stream {}", streamName); + streamExistenceManager.deleteResource(streamName); + } log.info("---------Start deleting lease table.---------"); - leaseTableManager.deleteResource(this.consumerConfig.getStreamName()); + leaseTableManager.deleteResource(consumerConfig.getApplicationName()); log.info("---------Finished deleting resources.---------"); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/application/TestRecordProcessor.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/application/TestRecordProcessor.java index 0e4dc489d..037f180bd 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/application/TestRecordProcessor.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/application/TestRecordProcessor.java @@ -2,6 +2,7 @@ import lombok.extern.slf4j.Slf4j; import org.slf4j.MDC; +import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; @@ -23,12 +24,15 @@ public class TestRecordProcessor implements ShardRecordProcessor { private static final String SHARD_ID_MDC_KEY = "ShardId"; + private StreamIdentifier streamIdentifier; + private String shardId; private final RecordValidatorQueue recordValidator; - public TestRecordProcessor(RecordValidatorQueue recordValidator) { + public TestRecordProcessor(StreamIdentifier streamIdentifier, RecordValidatorQueue recordValidator) { this.recordValidator = recordValidator; + this.streamIdentifier = streamIdentifier; } @Override @@ -51,8 +55,9 @@ public void processRecords(ProcessRecordsInput processRecordsInput) { for (KinesisClientRecord kinesisRecord : processRecordsInput.records()) { final String data = new String(asByteArray(kinesisRecord.data())); - log.info("Processing record pk: {}", data); - recordValidator.add(shardId, data); + log.info("Processing record pk for stream {}: {}", streamIdentifier.streamName(), data); + String recordValidatorKey = streamIdentifier.toString() + "-" + shardId; + recordValidator.add(recordValidatorKey, data); } } catch (Throwable t) { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/application/TestRecordProcessorFactory.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/application/TestRecordProcessorFactory.java index 4e06890e5..98f50ca09 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/application/TestRecordProcessorFactory.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/application/TestRecordProcessorFactory.java @@ -1,5 +1,6 @@ package software.amazon.kinesis.application; +import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; import software.amazon.kinesis.utils.RecordValidatorQueue; @@ -14,7 +15,12 @@ public TestRecordProcessorFactory(RecordValidatorQueue recordValidator) { @Override public ShardRecordProcessor shardRecordProcessor() { - return new TestRecordProcessor(this.recordValidator); + return new TestRecordProcessor(null, this.recordValidator); + } + + @Override + public ShardRecordProcessor shardRecordProcessor(StreamIdentifier streamIdentifier) { + return new TestRecordProcessor(streamIdentifier, this.recordValidator); } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLAppConfig.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLAppConfig.java index b5d0c4d11..3168d13bd 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLAppConfig.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLAppConfig.java @@ -1,8 +1,18 @@ package software.amazon.kinesis.config; import lombok.Value; +import software.amazon.awssdk.arns.Arn; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.kinesis.common.FutureUtils; import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.common.StreamConfig; +import software.amazon.kinesis.common.StreamIdentifier; +import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy; +import software.amazon.kinesis.processor.MultiStreamTracker; +import software.amazon.kinesis.processor.SingleStreamTracker; +import software.amazon.kinesis.retrieval.RetrievalConfig; +import software.amazon.kinesis.retrieval.fanout.FanOutConfig; +import software.amazon.kinesis.retrieval.polling.PollingConfig; import software.amazon.kinesis.utils.RecordValidatorQueue; import software.amazon.kinesis.utils.ReshardOptions; import software.amazon.kinesis.application.TestRecordProcessorFactory; @@ -19,48 +29,92 @@ import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClientBuilder; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse; +import software.amazon.awssdk.services.sts.StsAsyncClient; import software.amazon.awssdk.utils.AttributeMap; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; -import software.amazon.kinesis.retrieval.RetrievalConfig; import java.io.IOException; import java.net.Inet4Address; import java.net.URISyntaxException; import java.net.UnknownHostException; +import java.time.Duration; import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import lombok.extern.slf4j.Slf4j; /** * Default configuration for a producer or consumer used in integration tests. * Producer: puts records of size 60 KB at an interval of 100 ms * Consumer: streaming configuration (vs polling) that starts processing records at shard horizon */ +@Slf4j public abstract class KCLAppConfig { + public static final String AWS_ACCOUNT_PROFILE_PROPERTY = "awsProfile"; + public static final String CROSS_ACCOUNT_PROFILE_PROPERTY = "awsCrossAccountProfile"; + public static final String CROSS_ACCOUNT_CONSUMER_NAME = "cross-account-consumer"; + public static final String INTEGRATION_TEST_RESOURCE_PREFIX = "KCLIntegrationTest"; - private KinesisAsyncClient kinesisAsyncClient; + private String accountIdForConsumer = null; + private String accountIdForStreamOwner = null; + private List streamNames = null; + private KinesisAsyncClient kinesisAsyncClientForConsumer; + private StsAsyncClient stsAsyncClientForConsumer; + private KinesisAsyncClient kinesisAsyncClientForStreamOwner; + private StsAsyncClient stsAsyncClientForStreamOwner; private DynamoDbAsyncClient dynamoDbAsyncClient; private CloudWatchAsyncClient cloudWatchAsyncClient; private RecordValidatorQueue recordValidator; /** - * Name used for test stream and lease tracker table + * List of Strings, either stream names or valid stream Arns, to be used in testing. For single stream mode, return + * a list of size 1. For multistream mode, return a list of size > 1. */ - public abstract String getStreamName(); + public abstract List getStreamArns(); + + public List getStreamNames() { + if (this.streamNames == null) { + return getStreamArns().stream().map(streamArn -> + streamArn.toString().substring(streamArn.toString().indexOf("/") + 1)) + .collect(Collectors.toList()); + } else { + return this.streamNames; + } + } + + public abstract String getTestName(); + + public String getApplicationName() { + return INTEGRATION_TEST_RESOURCE_PREFIX + getTestName(); + } public int getShardCount() { return 4; } public Region getRegion() { return Region.US_WEST_2; } /** - * "default" profile, should match with profiles listed in "cat ~/.aws/config" + * Gets credentials for passed in profile with "-DawsProfile" which should match "~/.aws/config". Otherwise, + * uses default profile credentials chain. */ private AwsCredentialsProvider getCredentialsProvider() { - final String awsProfile = System.getProperty("awsProfile"); + final String awsProfile = System.getProperty(AWS_ACCOUNT_PROFILE_PROPERTY); return (awsProfile != null) ? ProfileCredentialsProvider.builder().profileName(awsProfile).build() : DefaultCredentialsProvider.create(); } + public boolean isCrossAccount() { + return false; + } + + public AwsCredentialsProvider getCrossAccountCredentialsProvider() { + return null; + } + public InitialPositionInStream getInitialPosition() { return InitialPositionInStream.TRIM_HORIZON; } @@ -80,28 +134,106 @@ public List getReshardFactorList() { return null; } - public final KinesisAsyncClient buildAsyncKinesisClient() throws URISyntaxException, IOException { - if (kinesisAsyncClient == null) { - // Setup H2 client config. - final NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder() - .maxConcurrency(Integer.MAX_VALUE); + public String getAccountIdForConsumer() { + if (this.accountIdForConsumer == null) { + try { + this.accountIdForConsumer = FutureUtils.resolveOrCancelFuture( + buildStsAsyncClientForConsumer().getCallerIdentity(), Duration.ofSeconds(30)).account(); + } + catch (Exception e) { + log.error("Error when getting account ID through STS for consumer", e); + } + } + return this.accountIdForConsumer; + } + + public String getAccountIdForStreamOwner() { + if (this.accountIdForStreamOwner == null) { + try { + this.accountIdForStreamOwner = FutureUtils.resolveOrCancelFuture( + buildStsAsyncClientForStreamOwner().getCallerIdentity(), Duration.ofSeconds(30)).account(); + } + catch (Exception e) { + log.error("Error when getting account ID through STS for consumer", e); + } + } + return this.accountIdForStreamOwner; + } + + public final KinesisAsyncClient buildAsyncKinesisClientForConsumer() throws URISyntaxException, IOException { + if (this.kinesisAsyncClientForConsumer == null) { + this.kinesisAsyncClientForConsumer = buildAsyncKinesisClient(getCredentialsProvider()); + } + return this.kinesisAsyncClientForConsumer; + } + + /** + * Builds the kinesis client for the account which owns the Kinesis stream. For cross account, this can be a + * different account than the account which gets records from the stream in the KCL. + * @return + * @throws URISyntaxException + * @throws IOException + */ + public final KinesisAsyncClient buildAsyncKinesisClientForStreamOwner() throws URISyntaxException, IOException { + if (this.kinesisAsyncClientForStreamOwner == null) { + final KinesisAsyncClient client; + if (isCrossAccount()) { + client = buildAsyncKinesisClient(getCrossAccountCredentialsProvider()); + } else { + client = buildAsyncKinesisClient(getCredentialsProvider()); + } + this.kinesisAsyncClientForStreamOwner = client; + } + return this.kinesisAsyncClientForStreamOwner; + } + - builder.protocol(getKinesisClientProtocol()); + private KinesisAsyncClient buildAsyncKinesisClient(AwsCredentialsProvider creds) throws URISyntaxException, IOException { + // Setup H2 client config. + final NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder() + .maxConcurrency(Integer.MAX_VALUE) + .protocol(getKinesisClientProtocol()); - final SdkAsyncHttpClient sdkAsyncHttpClient = - builder.buildWithDefaults(AttributeMap.builder().build()); + final SdkAsyncHttpClient sdkAsyncHttpClient = + builder.buildWithDefaults(AttributeMap.builder().build()); - // Setup client builder by default values - final KinesisAsyncClientBuilder kinesisAsyncClientBuilder = KinesisAsyncClient.builder().region(getRegion()); + // Setup client builder by default values + final KinesisAsyncClientBuilder kinesisAsyncClientBuilder = KinesisAsyncClient.builder().region(getRegion()); + kinesisAsyncClientBuilder.httpClient(sdkAsyncHttpClient); + kinesisAsyncClientBuilder.credentialsProvider(creds); - kinesisAsyncClientBuilder.httpClient(sdkAsyncHttpClient); + return kinesisAsyncClientBuilder.build(); + } - kinesisAsyncClientBuilder.credentialsProvider(getCredentialsProvider()); + private StsAsyncClient buildStsAsyncClientForConsumer() { + if (this.stsAsyncClientForConsumer == null) { + this.stsAsyncClientForConsumer = StsAsyncClient.builder() + .credentialsProvider(getCredentialsProvider()) + .region(getRegion()) + .build(); + } + return this.stsAsyncClientForConsumer; + } - this.kinesisAsyncClient = kinesisAsyncClientBuilder.build(); + private StsAsyncClient buildStsAsyncClientForStreamOwner() { + if (this.stsAsyncClientForStreamOwner == null) { + final StsAsyncClient client; + if (isCrossAccount()) { + client = buildStsAsyncClient(getCrossAccountCredentialsProvider()); + } + else { + client = buildStsAsyncClient(getCredentialsProvider()); + } + this.stsAsyncClientForStreamOwner = client; } + return this.stsAsyncClientForStreamOwner; + } - return this.kinesisAsyncClient; + private StsAsyncClient buildStsAsyncClient(AwsCredentialsProvider creds) { + return StsAsyncClient.builder() + .credentialsProvider(creds) + .region(getRegion()) + .build(); } public final DynamoDbAsyncClient buildAsyncDynamoDbClient() throws IOException { @@ -137,22 +269,90 @@ public ShardRecordProcessorFactory getShardRecordProcessorFactory() { return new TestRecordProcessorFactory(getRecordValidator()); } - public final ConfigsBuilder getConfigsBuilder() throws IOException, URISyntaxException { + public final ConfigsBuilder getConfigsBuilder(Map streamToConsumerArnsMap) + throws IOException, URISyntaxException { final String workerId = getWorkerId(); - return new ConfigsBuilder(getStreamName(), getStreamName(), buildAsyncKinesisClient(), buildAsyncDynamoDbClient(), - buildAsyncCloudWatchClient(), workerId, getShardRecordProcessorFactory()); + if (getStreamArns().size() == 1) { + final SingleStreamTracker singleStreamTracker = new SingleStreamTracker( + StreamIdentifier.singleStreamInstance(getStreamArns().get(0)), + buildStreamConfigList(streamToConsumerArnsMap).get(0)); + return new ConfigsBuilder(singleStreamTracker, getApplicationName(), + buildAsyncKinesisClientForConsumer(), buildAsyncDynamoDbClient(), buildAsyncCloudWatchClient(), workerId, + getShardRecordProcessorFactory()); + } else { + final MultiStreamTracker multiStreamTracker = new MultiStreamTracker() { + @Override + public List streamConfigList() { + return buildStreamConfigList(streamToConsumerArnsMap); + } + @Override + public FormerStreamsLeasesDeletionStrategy formerStreamsLeasesDeletionStrategy() { + return new FormerStreamsLeasesDeletionStrategy.NoLeaseDeletionStrategy(); + } + }; + return new ConfigsBuilder(multiStreamTracker, getApplicationName(), + buildAsyncKinesisClientForConsumer(), buildAsyncDynamoDbClient(), buildAsyncCloudWatchClient(), workerId, + getShardRecordProcessorFactory()); + } } - public RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException { - final InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended - .newInitialPosition(getInitialPosition()); + private List buildStreamConfigList(Map streamToConsumerArnsMap) { + return getStreamArns().stream().map(streamArn-> { + final StreamIdentifier streamIdentifier; + if (getStreamArns().size() == 1) { + streamIdentifier = StreamIdentifier.singleStreamInstance(streamArn); + } else { //is multi-stream + streamIdentifier = StreamIdentifier.multiStreamInstance(streamArn, getCreationEpoch(streamArn)); + } - // Default is a streaming consumer - final RetrievalConfig config = getConfigsBuilder().retrievalConfig(); - config.initialPositionInStreamExtended(initialPosition); + if (streamToConsumerArnsMap != null) { + final StreamConfig streamConfig = new StreamConfig(streamIdentifier, + InitialPositionInStreamExtended.newInitialPosition(getInitialPosition())); + return streamConfig.consumerArn(streamToConsumerArnsMap.get(streamArn).toString()); + } else { + return new StreamConfig(streamIdentifier, InitialPositionInStreamExtended.newInitialPosition(getInitialPosition())); + } + }).collect(Collectors.toList()); + } + + private long getCreationEpoch(Arn streamArn) { + final DescribeStreamSummaryRequest request = DescribeStreamSummaryRequest.builder() + .streamARN(streamArn.toString()) + .build(); + + DescribeStreamSummaryResponse response = null; + try { + response = FutureUtils.resolveOrCancelFuture( + buildAsyncKinesisClientForStreamOwner().describeStreamSummary(request), Duration.ofSeconds(60)); + } catch (Exception e) { + log.error("Exception when calling DescribeStreamSummary", e); + } + return response.streamDescriptionSummary().streamCreationTimestamp().toEpochMilli(); + } + + + public abstract RetrievalMode getRetrievalMode(); + + public RetrievalConfig getRetrievalConfig(ConfigsBuilder configsBuilder, Map streamToConsumerArnsMap) { + final RetrievalConfig config = configsBuilder.retrievalConfig(); + if (getRetrievalMode() == RetrievalMode.POLLING) { + config.retrievalSpecificConfig(new PollingConfig(config.kinesisClient())); + } else { + if (getStreamArns().size() == 1) { + final Arn consumerArn = streamToConsumerArnsMap.get(getStreamArns().get(0)); + config.retrievalSpecificConfig(new FanOutConfig(config.kinesisClient()).consumerArn(consumerArn.toString())); + } + // For CAA multi-stream EFO, consumerArn is specified in StreamConfig + } return config; } + public Arn buildStreamArn(String streamName) { + final String partition = getRegion().metadata().partition().id(); + return Arn.fromString(String.join(":", "arn", partition, "kinesis", getRegion().id(), + getAccountIdForStreamOwner(), "stream") + "/" + INTEGRATION_TEST_RESOURCE_PREFIX + streamName); + } + /** * Configure ingress load (batch size, record size, and calling interval) */ diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryPollingH1TestConfig.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryPollingH1TestConfig.java index 07a291711..d4bb1c496 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryPollingH1TestConfig.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryPollingH1TestConfig.java @@ -1,12 +1,10 @@ package software.amazon.kinesis.config; +import software.amazon.awssdk.arns.Arn; import software.amazon.awssdk.http.Protocol; -import software.amazon.kinesis.common.InitialPositionInStreamExtended; -import software.amazon.kinesis.retrieval.RetrievalConfig; -import software.amazon.kinesis.retrieval.polling.PollingConfig; -import java.io.IOException; -import java.net.URISyntaxException; +import java.util.Collections; +import java.util.List; import java.util.UUID; /** @@ -16,9 +14,17 @@ public class ReleaseCanaryPollingH1TestConfig extends KCLAppConfig { private final UUID uniqueId = UUID.randomUUID(); + private final String applicationName = "PollingH1Test"; + private final String streamName = "2XPollingH1TestStream_" + uniqueId; + + @Override + public String getTestName() { + return applicationName; + } + @Override - public String getStreamName() { - return "KCLReleaseCanary2XPollingH1TestStream_" + uniqueId; + public List getStreamArns() { + return Collections.singletonList(buildStreamArn(streamName)); } @Override @@ -27,15 +33,7 @@ public Protocol getKinesisClientProtocol() { } @Override - public RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException { - - final InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended - .newInitialPosition(getInitialPosition()); - - final RetrievalConfig config = getConfigsBuilder().retrievalConfig(); - config.initialPositionInStreamExtended(initialPosition); - config.retrievalSpecificConfig(new PollingConfig(getStreamName(), config.kinesisClient())); - - return config; + public RetrievalMode getRetrievalMode() { + return RetrievalMode.POLLING; } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryPollingH2TestConfig.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryPollingH2TestConfig.java index eb2b21431..5af838438 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryPollingH2TestConfig.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryPollingH2TestConfig.java @@ -1,12 +1,10 @@ package software.amazon.kinesis.config; +import software.amazon.awssdk.arns.Arn; import software.amazon.awssdk.http.Protocol; -import software.amazon.kinesis.common.InitialPositionInStreamExtended; -import software.amazon.kinesis.retrieval.RetrievalConfig; -import software.amazon.kinesis.retrieval.polling.PollingConfig; -import java.io.IOException; -import java.net.URISyntaxException; +import java.util.Collections; +import java.util.List; import java.util.UUID; /** @@ -15,9 +13,17 @@ public class ReleaseCanaryPollingH2TestConfig extends KCLAppConfig { private final UUID uniqueId = UUID.randomUUID(); + private final String applicationName = "PollingH2Test"; + private final String streamName = "2XPollingH2TestStream_" + uniqueId; + + @Override + public String getTestName() { + return applicationName; + } + @Override - public String getStreamName() { - return "KCLReleaseCanary2XPollingH2TestStream_" + uniqueId; + public List getStreamArns() { + return Collections.singletonList(buildStreamArn(streamName)); } @Override @@ -26,16 +32,7 @@ public Protocol getKinesisClientProtocol() { } @Override - public RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxException { - - final InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended - .newInitialPosition(getInitialPosition()); - - final RetrievalConfig config = getConfigsBuilder().retrievalConfig(); - config.initialPositionInStreamExtended(initialPosition); - config.retrievalSpecificConfig(new PollingConfig(getStreamName(), config.kinesisClient())); - - return config; + public RetrievalMode getRetrievalMode() { + return RetrievalMode.POLLING; } } - diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryStreamingReshardingTestConfig.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryStreamingReshardingTestConfig.java index cfdc5298e..7984131fd 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryStreamingReshardingTestConfig.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryStreamingReshardingTestConfig.java @@ -1,9 +1,11 @@ package software.amazon.kinesis.config; +import software.amazon.awssdk.arns.Arn; import software.amazon.awssdk.http.Protocol; import software.amazon.kinesis.utils.ReshardOptions; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.UUID; @@ -13,22 +15,35 @@ public class ReleaseCanaryStreamingReshardingTestConfig extends KCLAppConfig { private final UUID uniqueId = UUID.randomUUID(); + + private final String applicationName = "StreamingReshardingTest"; + private final String streamName ="2XStreamingReshardingTestStream_" + uniqueId; + + @Override + public String getTestName() { + return applicationName; + } + @Override - public String getStreamName() { - return "KCLReleaseCanary2XStreamingReshardingTestStream_" + uniqueId; + public List getStreamArns() { + return Collections.singletonList(buildStreamArn(streamName)); } @Override public Protocol getKinesisClientProtocol() { return Protocol.HTTP2; } + @Override + public RetrievalMode getRetrievalMode() { + return RetrievalMode.STREAMING; + } + @Override public int getShardCount() { - return 100; + return 20; } @Override public List getReshardFactorList() { return Arrays.asList(SPLIT, MERGE); } - } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryStreamingTestConfig.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryStreamingTestConfig.java index 6b0284c7a..832fb9057 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryStreamingTestConfig.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryStreamingTestConfig.java @@ -1,7 +1,10 @@ package software.amazon.kinesis.config; +import software.amazon.awssdk.arns.Arn; import software.amazon.awssdk.http.Protocol; +import java.util.Collections; +import java.util.List; import java.util.UUID; /** @@ -10,9 +13,17 @@ public class ReleaseCanaryStreamingTestConfig extends KCLAppConfig { private final UUID uniqueId = UUID.randomUUID(); + private final String applicationName = "StreamingTest"; + private final String streamName ="2XStreamingTestStream_" + uniqueId; + + @Override + public String getTestName() { + return applicationName; + } + @Override - public String getStreamName() { - return "KCLReleaseCanary2XStreamingTestStream_" + uniqueId; + public List getStreamArns() { + return Collections.singletonList(buildStreamArn(streamName)); } @Override @@ -20,5 +31,8 @@ public Protocol getKinesisClientProtocol() { return Protocol.HTTP2; } + @Override + public RetrievalMode getRetrievalMode() { + return RetrievalMode.STREAMING; + } } - diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/RetrievalMode.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/RetrievalMode.java new file mode 100644 index 000000000..dcce39eb4 --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/RetrievalMode.java @@ -0,0 +1,6 @@ +package software.amazon.kinesis.config; + +public enum RetrievalMode { + POLLING, + STREAMING +} diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/crossaccount/KCLCrossAccountAppConfig.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/crossaccount/KCLCrossAccountAppConfig.java new file mode 100644 index 000000000..c20dd7823 --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/crossaccount/KCLCrossAccountAppConfig.java @@ -0,0 +1,23 @@ +package software.amazon.kinesis.config.crossaccount; + +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider; +import software.amazon.kinesis.config.KCLAppConfig; + +/** + * Config class to configure cross account integration tests. + */ +public abstract class KCLCrossAccountAppConfig extends KCLAppConfig { + + @Override + public boolean isCrossAccount() { + return true; + } + + @Override + public AwsCredentialsProvider getCrossAccountCredentialsProvider() { + final String awsCrossAccountProfile = System.getProperty(KCLAppConfig.CROSS_ACCOUNT_PROFILE_PROPERTY); + return (awsCrossAccountProfile != null) ? + ProfileCredentialsProvider.builder().profileName(awsCrossAccountProfile).build() : null; + } +} diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/crossaccount/ReleaseCanaryCrossAccountMultiStreamPollingH2TestConfig.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/crossaccount/ReleaseCanaryCrossAccountMultiStreamPollingH2TestConfig.java new file mode 100644 index 000000000..da48ca97b --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/crossaccount/ReleaseCanaryCrossAccountMultiStreamPollingH2TestConfig.java @@ -0,0 +1,48 @@ +package software.amazon.kinesis.config.crossaccount; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import lombok.extern.slf4j.Slf4j; + +import software.amazon.awssdk.arns.Arn; +import software.amazon.awssdk.http.Protocol; +import software.amazon.kinesis.config.RetrievalMode; + +/** + * Config for a cross account polling consumer with HTTP protocol of HTTP2 + */ +@Slf4j +public class ReleaseCanaryCrossAccountMultiStreamPollingH2TestConfig extends KCLCrossAccountAppConfig { + private final UUID uniqueId = UUID.randomUUID(); + + private final int numStreams = 2; + private final String applicationName = "CrossAccountMultiStreamPollingH2Test"; + + private final String streamName = "2XCrossAccountPollingH2TestStream"; + + @Override + public String getTestName() { + return applicationName; + } + + @Override + public List getStreamArns() { + ArrayList streamArns = new ArrayList<>(numStreams); + for (int i = 1; i <= numStreams; i++) { + streamArns.add(buildStreamArn(String.join("_", streamName, Integer.toString(i), uniqueId.toString()))); + } + return streamArns; + } + + @Override + public Protocol getKinesisClientProtocol() { + return Protocol.HTTP2; + } + + @Override + public RetrievalMode getRetrievalMode() { + return RetrievalMode.POLLING; + } +} diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/crossaccount/ReleaseCanaryCrossAccountMultiStreamStreamingTestConfig.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/crossaccount/ReleaseCanaryCrossAccountMultiStreamStreamingTestConfig.java new file mode 100644 index 000000000..30c99bee7 --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/crossaccount/ReleaseCanaryCrossAccountMultiStreamStreamingTestConfig.java @@ -0,0 +1,48 @@ +package software.amazon.kinesis.config.crossaccount; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import lombok.extern.slf4j.Slf4j; + +import software.amazon.awssdk.arns.Arn; +import software.amazon.awssdk.http.Protocol; +import software.amazon.kinesis.config.RetrievalMode; + +/** + * Config for a cross account polling consumer with HTTP protocol of HTTP2 + */ +@Slf4j +public class ReleaseCanaryCrossAccountMultiStreamStreamingTestConfig extends KCLCrossAccountAppConfig { + private final UUID uniqueId = UUID.randomUUID(); + + private final int numStreams = 2; + private final String applicationName = "CrossAccountMultiStreamStreamingTest"; + + private final String streamName = "2XCrossAccountStreamingTestStream"; + + @Override + public String getTestName() { + return applicationName; + } + + @Override + public List getStreamArns() { + ArrayList streamArns = new ArrayList<>(numStreams); + for (int i = 1; i <= numStreams; i++) { + streamArns.add(buildStreamArn(String.join("_", streamName, Integer.toString(i), uniqueId.toString()))); + } + return streamArns; + } + + @Override + public Protocol getKinesisClientProtocol() { + return Protocol.HTTP2; + } + + @Override + public RetrievalMode getRetrievalMode() { + return RetrievalMode.STREAMING; + } +} diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/crossaccount/ReleaseCanaryCrossAccountPollingH2TestConfig.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/crossaccount/ReleaseCanaryCrossAccountPollingH2TestConfig.java new file mode 100644 index 000000000..a0754d910 --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/crossaccount/ReleaseCanaryCrossAccountPollingH2TestConfig.java @@ -0,0 +1,43 @@ +package software.amazon.kinesis.config.crossaccount; + +import software.amazon.awssdk.arns.Arn; +import software.amazon.awssdk.http.Protocol; +import software.amazon.kinesis.config.RetrievalMode; + +import java.util.Collections; +import java.util.List; +import java.util.UUID; + +import lombok.extern.slf4j.Slf4j; + +/** + * Config for a cross account polling consumer with HTTP protocol of HTTP2 + */ +@Slf4j +public class ReleaseCanaryCrossAccountPollingH2TestConfig extends KCLCrossAccountAppConfig { + private final UUID uniqueId = UUID.randomUUID(); + + private final String applicationName = "CrossAccountPollingH2Test"; + + private final String streamName = "2XCrossAccountPollingH2TestStream_" + uniqueId; + + @Override + public String getTestName() { + return applicationName; + } + + @Override + public List getStreamArns() { + return Collections.singletonList(buildStreamArn(streamName)); + } + + @Override + public Protocol getKinesisClientProtocol() { + return Protocol.HTTP2; + } + + @Override + public RetrievalMode getRetrievalMode() { + return RetrievalMode.POLLING; + } +} diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/crossaccount/ReleaseCanaryCrossAccountStreamingTestConfig.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/crossaccount/ReleaseCanaryCrossAccountStreamingTestConfig.java new file mode 100644 index 000000000..90bd637ec --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/crossaccount/ReleaseCanaryCrossAccountStreamingTestConfig.java @@ -0,0 +1,42 @@ +package software.amazon.kinesis.config.crossaccount; + +import software.amazon.awssdk.arns.Arn; +import software.amazon.awssdk.http.Protocol; +import software.amazon.kinesis.config.RetrievalMode; + +import java.util.Collections; +import java.util.List; +import java.util.UUID; + +import lombok.extern.slf4j.Slf4j; + +/** + * Config for a streaming consumer with HTTP protocol of HTTP2 + */ +@Slf4j +public class ReleaseCanaryCrossAccountStreamingTestConfig extends KCLCrossAccountAppConfig { + private final UUID uniqueId = UUID.randomUUID(); + + private final String applicationName = "CrossAccountStreamingTest"; + private final String streamName = "2XCrossAccountStreamingTestStream_" + uniqueId; + + @Override + public String getTestName() { + return applicationName; + } + + @Override + public List getStreamArns() { + return Collections.singletonList(buildStreamArn(streamName)); + } + + @Override + public Protocol getKinesisClientProtocol() { + return Protocol.HTTP2; + } + + @Override + public RetrievalMode getRetrievalMode() { + return RetrievalMode.STREAMING; + } +} diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/multistream/ReleaseCanaryMultiStreamPollingH2TestConfig.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/multistream/ReleaseCanaryMultiStreamPollingH2TestConfig.java new file mode 100644 index 000000000..4f9260253 --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/multistream/ReleaseCanaryMultiStreamPollingH2TestConfig.java @@ -0,0 +1,43 @@ +package software.amazon.kinesis.config; + +import software.amazon.awssdk.arns.Arn; +import software.amazon.awssdk.http.Protocol; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +/** + * Config for a polling consumer with HTTP protocol of HTTP2 + */ +public class ReleaseCanaryMultiStreamPollingH2TestConfig extends KCLAppConfig { + private final UUID uniqueId = UUID.randomUUID(); + + private final int numStreams = 2; + private final String applicationName = "MultiStreamPollingH2Test"; + private final String streamName = "2XMultiStreamPollingH2TestStream"; + + @Override + public String getTestName() { + return applicationName; + } + + @Override + public List getStreamArns() { + ArrayList streamArns = new ArrayList<>(numStreams); + for (Integer i = 1; i <= numStreams; i++) { + streamArns.add(buildStreamArn(String.join("_", streamName, i.toString(), uniqueId.toString()))); + } + return streamArns; + } + + @Override + public Protocol getKinesisClientProtocol() { + return Protocol.HTTP2; + } + + @Override + public RetrievalMode getRetrievalMode() { + return RetrievalMode.POLLING; + } +} diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/multistream/ReleaseCanaryMultiStreamStreamingTestConfig.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/multistream/ReleaseCanaryMultiStreamStreamingTestConfig.java new file mode 100644 index 000000000..d80a43d9f --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/multistream/ReleaseCanaryMultiStreamStreamingTestConfig.java @@ -0,0 +1,41 @@ +package software.amazon.kinesis.config.multistream; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import software.amazon.awssdk.arns.Arn; +import software.amazon.awssdk.http.Protocol; +import software.amazon.kinesis.config.KCLAppConfig; +import software.amazon.kinesis.config.RetrievalMode; + +public class ReleaseCanaryMultiStreamStreamingTestConfig extends KCLAppConfig { + private final UUID uniqueId = UUID.randomUUID(); + private final int numStreams = 2; + private final String applicationName = "MultiStreamStreamingTest"; + private final String streamName = "2XMultiStreamStreamingTestStream"; + + @Override + public String getTestName() { + return applicationName; + } + + @Override + public List getStreamArns() { + ArrayList streamArns = new ArrayList<>(numStreams); + for (int i = 1; i <= numStreams; i++) { + streamArns.add(buildStreamArn(String.join("_", streamName, Integer.toString(i), uniqueId.toString()))); + } + return streamArns; + } + + @Override + public Protocol getKinesisClientProtocol() { + return Protocol.HTTP2; + } + + @Override + public RetrievalMode getRetrievalMode() { + return RetrievalMode.STREAMING; + } +} diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/CrossAccountStreamConsumerIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/CrossAccountStreamConsumerIntegrationTest.java new file mode 100644 index 000000000..264a43ed3 --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/CrossAccountStreamConsumerIntegrationTest.java @@ -0,0 +1,47 @@ +package software.amazon.kinesis.lifecycle; + +import software.amazon.kinesis.application.TestConsumer; +import software.amazon.kinesis.config.KCLAppConfig; +import software.amazon.kinesis.config.crossaccount.ReleaseCanaryCrossAccountMultiStreamPollingH2TestConfig; +import software.amazon.kinesis.config.crossaccount.ReleaseCanaryCrossAccountMultiStreamStreamingTestConfig; +import software.amazon.kinesis.config.crossaccount.ReleaseCanaryCrossAccountPollingH2TestConfig; +import software.amazon.kinesis.config.crossaccount.ReleaseCanaryCrossAccountStreamingTestConfig; + +import org.junit.Test; + +public class CrossAccountStreamConsumerIntegrationTest { + + /** + * Test with a cross account polling consumer using HTTP2 protocol. + * In the polling case, consumer makes calls to the producer each time to request records to process. + * The stream is in a different account than the kinesis client used to get records. + */ + @Test + public void kclReleaseCanaryCrossAccountPollingH2Test() throws Exception { + KCLAppConfig consumerConfig = new ReleaseCanaryCrossAccountPollingH2TestConfig(); + TestConsumer consumer = new TestConsumer(consumerConfig); + consumer.run(); + } + + @Test + public void kclReleaseCanaryCrossAccountStreamingTest() throws Exception { + KCLAppConfig consumerConfig = new ReleaseCanaryCrossAccountStreamingTestConfig(); + TestConsumer consumer = new TestConsumer(consumerConfig); + consumer.run(); + } + + @Test + public void kclReleaseCanaryCrossAccountMultiStreamStreamingTest() throws Exception { + KCLAppConfig consumerConfig = new ReleaseCanaryCrossAccountMultiStreamStreamingTestConfig(); + TestConsumer consumer = new TestConsumer(consumerConfig); + consumer.run(); + } + + @Test + public void kclReleaseCanaryCrossAccountMultiStreamPollingH2Test() throws Exception { + KCLAppConfig consumerConfig = new ReleaseCanaryCrossAccountMultiStreamPollingH2TestConfig(); + TestConsumer consumer = new TestConsumer(consumerConfig); + consumer.run(); + } + +} diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/MultiStreamConsumerIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/MultiStreamConsumerIntegrationTest.java new file mode 100644 index 000000000..f6314eac5 --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/MultiStreamConsumerIntegrationTest.java @@ -0,0 +1,24 @@ +package software.amazon.kinesis.lifecycle; + +import org.junit.Test; + +import software.amazon.kinesis.application.TestConsumer; +import software.amazon.kinesis.config.KCLAppConfig; +import software.amazon.kinesis.config.ReleaseCanaryMultiStreamPollingH2TestConfig; +import software.amazon.kinesis.config.multistream.ReleaseCanaryMultiStreamStreamingTestConfig; + +public class MultiStreamConsumerIntegrationTest { + @Test + public void kclReleaseCanaryMultiStreamPollingTest() throws Exception { + KCLAppConfig consumerConfig = new ReleaseCanaryMultiStreamPollingH2TestConfig(); + TestConsumer consumer = new TestConsumer(consumerConfig); + consumer.run(); + } + + @Test + public void kclReleaseCanaryMultiStreamStreamingTest() throws Exception { + KCLAppConfig consumerConfig = new ReleaseCanaryMultiStreamStreamingTestConfig(); + TestConsumer consumer = new TestConsumer(consumerConfig); + consumer.run(); + } +} diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/AWSResourceManager.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/AWSResourceManager.java index 0d4ba6566..232a68596 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/AWSResourceManager.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/AWSResourceManager.java @@ -1,5 +1,7 @@ package software.amazon.kinesis.utils; +import software.amazon.kinesis.config.KCLAppConfig; + import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -66,7 +68,7 @@ public void deleteAllResource() throws Exception { final List resourceNames = getAllResourceNames(); for (String resourceName : resourceNames) { // Delete all resources that have prefix "KCLRelease" - if (resourceName.startsWith("KCLRelease")) { + if (resourceName.startsWith(KCLAppConfig.INTEGRATION_TEST_RESOURCE_PREFIX)) { deleteResource(resourceName); } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/StreamExistenceManager.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/StreamExistenceManager.java index db8615c35..e7be5141b 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/StreamExistenceManager.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/StreamExistenceManager.java @@ -2,49 +2,74 @@ import lombok.Value; import lombok.extern.slf4j.Slf4j; +import software.amazon.awssdk.arns.Arn; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest; import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerResponse; import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest; import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse; import software.amazon.awssdk.services.kinesis.model.ListStreamsRequest; import software.amazon.awssdk.services.kinesis.model.ListStreamsResponse; +import software.amazon.awssdk.services.kinesis.model.PutResourcePolicyRequest; +import software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerRequest; +import software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerResponse; import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; import software.amazon.awssdk.services.kinesis.model.StreamStatus; +import software.amazon.awssdk.services.kinesis.model.ConsumerStatus; import software.amazon.kinesis.common.FutureUtils; import software.amazon.kinesis.config.KCLAppConfig; +import software.amazon.kinesis.config.RetrievalMode; import java.io.IOException; import java.net.URISyntaxException; import java.time.Duration; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; -import java.util.concurrent.CompletableFuture; +import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @Value @Slf4j public class StreamExistenceManager extends AWSResourceManager { + private static final int CHECK_RESOURCE_ACTIVE_MAX_RETRIES = 3; + private final KinesisAsyncClient client; private final KCLAppConfig testConfig; public StreamExistenceManager(KCLAppConfig config) throws URISyntaxException, IOException { this.testConfig = config; - this.client = config.buildAsyncKinesisClient(); + this.client = config.buildAsyncKinesisClientForStreamOwner(); } public boolean isResourceActive(String streamName) { final DescribeStreamSummaryRequest request = DescribeStreamSummaryRequest.builder().streamName(streamName).build(); - final CompletableFuture describeStreamSummaryResponseCompletableFuture = client.describeStreamSummary(request); - try { - final DescribeStreamSummaryResponse response = describeStreamSummaryResponseCompletableFuture.get(30, TimeUnit.SECONDS); - boolean isActive = response.streamDescriptionSummary().streamStatus().equals(StreamStatus.ACTIVE); - if (!isActive) { - throw new RuntimeException("Stream is not active, instead in status: " + response.streamDescriptionSummary().streamStatus()); + final DescribeStreamSummaryResponse response = + FutureUtils.resolveOrCancelFuture(client.describeStreamSummary(request), Duration.ofSeconds(60)); + final boolean isActive = response.streamDescriptionSummary().streamStatus().equals(StreamStatus.ACTIVE); + return isActive; + } catch (ExecutionException e) { + if (e.getCause() instanceof ResourceNotFoundException) { + return false; + } else { + throw new RuntimeException(e); } - return true; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private boolean isConsumerActive(Arn consumerArn) { + final DescribeStreamConsumerRequest request = DescribeStreamConsumerRequest.builder().consumerARN(consumerArn.toString()).build(); + try { + final DescribeStreamConsumerResponse response = + FutureUtils.resolveOrCancelFuture(client.describeStreamConsumer(request), Duration.ofSeconds(60)); + final boolean isActive = response.consumerDescription().consumerStatus().equals(ConsumerStatus.ACTIVE); + return isActive; } catch (ExecutionException e) { if (e.getCause() instanceof ResourceNotFoundException) { return false; @@ -73,12 +98,101 @@ public List getAllResourceNames() throws Exception { return allStreamNames; } - public void checkStreamAndCreateIfNecessary(String streamName) { + public void checkStreamsAndCreateIfNecessary() { + for (String streamName : testConfig.getStreamNames()) { + if (!isResourceActive(streamName)) { + createStream(streamName, testConfig.getShardCount()); + } + log.info("Using stream {} with region {}", streamName, testConfig.getRegion()); + } + + if (testConfig.isCrossAccount()) { + for (Arn streamArn : testConfig.getStreamArns()) { + log.info("Putting cross account stream resource policy for stream {}", streamArn); + putResourcePolicyForCrossAccount(streamArn, + getCrossAccountStreamResourcePolicy(testConfig.getAccountIdForConsumer(), streamArn)); + } + } + } + + public Map createCrossAccountConsumerIfNecessary() throws Exception { + // For cross account, KCL cannot create the consumer automatically in another account, so + // we have to create it ourselves and provide the arn to the StreamConfig in multi-stream mode or + // RetrievalConfig in single-stream mode + if (testConfig.isCrossAccount() && testConfig.getRetrievalMode().equals(RetrievalMode.STREAMING)) { + final Map streamToConsumerArnsMap = new HashMap<>(); + for (Arn streamArn : testConfig.getStreamArns()) { + final Arn consumerArn = registerConsumerAndWaitForActive(streamArn, + KCLAppConfig.CROSS_ACCOUNT_CONSUMER_NAME); + putResourcePolicyForCrossAccount(consumerArn, + getCrossAccountConsumerResourcePolicy(testConfig.getAccountIdForConsumer(), consumerArn)); + streamToConsumerArnsMap.put(streamArn, consumerArn); + } + return streamToConsumerArnsMap; + } + return null; + } + + private void putResourcePolicyForCrossAccount(Arn resourceArn, String policy) { + try { + final PutResourcePolicyRequest putResourcePolicyRequest = PutResourcePolicyRequest.builder() + .resourceARN(resourceArn.toString()) + .policy(policy) + .build(); + FutureUtils.resolveOrCancelFuture(client.putResourcePolicy(putResourcePolicyRequest), Duration.ofSeconds(60)); + } catch (Exception e) { + throw new RuntimeException("Failed to PutResourcePolicy " + policy + " on resource " + resourceArn, e); + } + } + + private String getCrossAccountStreamResourcePolicy(String accountId, Arn streamArn) { + return "{\"Version\":\"2012-10-17\"," + + "\"Statement\":[{" + + "\"Effect\": \"Allow\"," + + "\"Principal\": {\"AWS\": \"" + accountId + "\"}," + + "\"Action\": [" + + "\"kinesis:DescribeStreamSummary\",\"kinesis:ListShards\",\"kinesis:PutRecord\",\"kinesis:PutRecords\"," + + "\"kinesis:GetRecords\",\"kinesis:GetShardIterator\"]," + + "\"Resource\": \"" + streamArn.toString() + "\"" + + "}]}"; + } + + private String getCrossAccountConsumerResourcePolicy(String accountId, Arn consumerArn) { + return "{\"Version\":\"2012-10-17\"," + + "\"Statement\":[{" + + "\"Effect\": \"Allow\"," + + "\"Principal\": {\"AWS\": \"" + accountId + "\"}," + + "\"Action\": [" + + "\"kinesis:DescribeStreamConsumer\",\"kinesis:SubscribeToShard\"]," + + "\"Resource\": \"" + consumerArn.toString() + "\"" + + "}]}"; + } - if (!isResourceActive(streamName)) { - createStream(streamName, testConfig.getShardCount()); + private Arn registerConsumerAndWaitForActive(Arn streamArn, String consumerName) throws Exception { + final RegisterStreamConsumerRequest registerStreamConsumerRequest = RegisterStreamConsumerRequest.builder() + .streamARN(streamArn.toString()) + .consumerName(consumerName) + .build(); + final RegisterStreamConsumerResponse response = + FutureUtils.resolveOrCancelFuture(client.registerStreamConsumer(registerStreamConsumerRequest), + Duration.ofSeconds(60)); + final Arn consumerArn = Arn.fromString(response.consumer().consumerARN()); + + int retries = 0; + while (!isConsumerActive(consumerArn)) { + log.info("Consumer {} is not active yet. Checking again in 5 seconds.", consumerArn); + if (retries > CHECK_RESOURCE_ACTIVE_MAX_RETRIES) { + throw new RuntimeException("Failed consumer registration, did not transition into active"); + } + try { + Thread.sleep(TimeUnit.SECONDS.toMillis(5)); + } catch (InterruptedException e) { + log.error("Failed to sleep"); + } + retries++; } - log.info("Using stream {} with region {}", streamName, testConfig.getRegion()); + log.info("Successfully registered consumer {}", consumerArn); + return consumerArn; } private void createStream(String streamName, int shardCount) { @@ -89,26 +203,19 @@ private void createStream(String streamName, int shardCount) { throw new RuntimeException("Failed to create stream with name " + streamName, e); } - int i = 0; - while (true) { - i++; - if (i > 100) { + int retries = 0; + while (!isResourceActive(streamName)) { + log.info("Stream {} is not active yet. Checking again in 5 seconds.", streamName); + if (retries > CHECK_RESOURCE_ACTIVE_MAX_RETRIES) { throw new RuntimeException("Failed stream creation, did not transition into active"); } try { - boolean isActive = isResourceActive(streamName); - if (isActive) { - log.info("Succesfully created the stream {}", streamName); - return; - } - } catch (Exception e) { - try { - Thread.sleep(TimeUnit.SECONDS.toMillis(10)); - } catch (InterruptedException e1) { - log.error("Failed to sleep"); - } - log.info("Stream {} is not active yet, exception: ", streamName, e); + Thread.sleep(TimeUnit.SECONDS.toMillis(5)); + } catch (InterruptedException e) { + log.error("Failed to sleep"); } + retries++; } + log.info("Successfully created the stream {}", streamName); } }