Skip to content
Permalink
Browse files
[FLINK-27567][aws][tests] Migrate connector-aws-kinesis-streams to JU…
…nit5
  • Loading branch information
snuyanzin committed May 17, 2022
1 parent bf38c12 commit a10fd2314269e0686d9fbbd3eacb470a03432ef3
Showing 7 changed files with 123 additions and 116 deletions.
@@ -21,25 +21,25 @@
import org.apache.flink.api.common.serialization.SimpleStringSchema;

import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.jupiter.api.Test;

/** Covers construction, defaults and sanity checking of KinesisStreamsSinkBuilder. */
public class KinesisStreamsSinkBuilderTest {
class KinesisStreamsSinkBuilderTest {
private static final SerializationSchema<String> SERIALIZATION_SCHEMA =
new SimpleStringSchema();
private static final PartitionKeyGenerator<String> PARTITION_KEY_GENERATOR =
element -> String.valueOf(element.hashCode());

@Test
public void elementConverterOfSinkMustBeSetWhenBuilt() {
void elementConverterOfSinkMustBeSetWhenBuilt() {
Assertions.assertThatExceptionOfType(NullPointerException.class)
.isThrownBy(() -> KinesisStreamsSink.builder().setStreamName("stream").build())
.withMessageContaining(
"No SerializationSchema was supplied to the KinesisStreamsSinkElementConverter builder.");
}

@Test
public void streamNameOfSinkMustBeSetWhenBuilt() {
void streamNameOfSinkMustBeSetWhenBuilt() {
Assertions.assertThatExceptionOfType(NullPointerException.class)
.isThrownBy(
() ->
@@ -52,7 +52,7 @@ public void streamNameOfSinkMustBeSetWhenBuilt() {
}

@Test
public void streamNameOfSinkMustBeSetToNonEmptyWhenBuilt() {
void streamNameOfSinkMustBeSetToNonEmptyWhenBuilt() {
Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
.isThrownBy(
() ->
@@ -66,7 +66,7 @@ public void streamNameOfSinkMustBeSetToNonEmptyWhenBuilt() {
}

@Test
public void serializationSchemaMustBeSetWhenSinkIsBuilt() {
void serializationSchemaMustBeSetWhenSinkIsBuilt() {
Assertions.assertThatExceptionOfType(NullPointerException.class)
.isThrownBy(
() ->
@@ -79,7 +79,7 @@ public void serializationSchemaMustBeSetWhenSinkIsBuilt() {
}

@Test
public void partitionKeyGeneratorMustBeSetWhenSinkIsBuilt() {
void partitionKeyGeneratorMustBeSetWhenSinkIsBuilt() {
Assertions.assertThatExceptionOfType(NullPointerException.class)
.isThrownBy(
() ->
@@ -29,16 +29,16 @@
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator;
import org.apache.flink.util.DockerImageVersions;
import org.apache.flink.util.TestLogger;

import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.rnorth.ducttape.ratelimits.RateLimiter;
import org.rnorth.ducttape.ratelimits.RateLimiterBuilder;
import org.testcontainers.containers.Network;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import software.amazon.awssdk.core.SdkSystemSetting;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
@@ -63,7 +63,8 @@
import static org.apache.flink.connector.aws.config.AWSConfigConstants.TRUST_ALL_CERTIFICATES;

/** IT cases for using Kinesis Data Streams Sink based on Kinesalite. */
public class KinesisStreamsSinkITCase extends TestLogger {
@Testcontainers
class KinesisStreamsSinkITCase {

private static final String DEFAULT_FIRST_SHARD_NAME = "shardId-000000000000";

@@ -72,8 +73,8 @@ public class KinesisStreamsSinkITCase extends TestLogger {
element -> String.valueOf(element.hashCode());
private final PartitionKeyGenerator<String> longPartitionKeyGenerator = element -> element;

@ClassRule
public static final KinesaliteContainer KINESALITE =
@Container
private static final KinesaliteContainer KINESALITE =
new KinesaliteContainer(DockerImageName.parse(DockerImageVersions.KINESALITE))
.withNetwork(Network.newNetwork())
.withNetworkAliases("kinesalite");
@@ -82,8 +83,8 @@ public class KinesisStreamsSinkITCase extends TestLogger {
private SdkAsyncHttpClient httpClient;
private KinesisAsyncClient kinesisClient;

@Before
public void setUp() throws Exception {
@BeforeEach
void setUp() throws Exception {
System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");

env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -93,15 +94,14 @@ public void setUp() throws Exception {
kinesisClient = KINESALITE.createHostClient(httpClient);
}

@After
public void teardown() {
@AfterEach
void teardown() {
System.clearProperty(SdkSystemSetting.CBOR_ENABLED.property());
AWSGeneralUtil.closeResources(httpClient, kinesisClient);
}

@Test
public void elementsMaybeWrittenSuccessfullyToLocalInstanceWhenBatchSizeIsReached()
throws Exception {
void elementsMaybeWrittenSuccessfullyToLocalInstanceWhenBatchSizeIsReached() throws Exception {

new Scenario()
.withKinesaliteStreamName("test-stream-name-1")
@@ -110,7 +110,7 @@ public void elementsMaybeWrittenSuccessfullyToLocalInstanceWhenBatchSizeIsReache
}

@Test
public void elementsBufferedAndTriggeredByTimeBasedFlushShouldBeFlushedIfSourcedIsKeptAlive()
void elementsBufferedAndTriggeredByTimeBasedFlushShouldBeFlushedIfSourcedIsKeptAlive()
throws Exception {

new Scenario()
@@ -123,7 +123,7 @@ public void elementsBufferedAndTriggeredByTimeBasedFlushShouldBeFlushedIfSourced
}

@Test
public void veryLargeMessagesSucceedInBeingPersisted() throws Exception {
void veryLargeMessagesSucceedInBeingPersisted() throws Exception {

new Scenario()
.withNumberOfElementsToSend(5)
@@ -136,8 +136,7 @@ public void veryLargeMessagesSucceedInBeingPersisted() throws Exception {
}

@Test
public void multipleInFlightRequestsResultsInCorrectNumberOfElementsPersisted()
throws Exception {
void multipleInFlightRequestsResultsInCorrectNumberOfElementsPersisted() throws Exception {

new Scenario()
.withNumberOfElementsToSend(150)
@@ -152,12 +151,12 @@ public void multipleInFlightRequestsResultsInCorrectNumberOfElementsPersisted()
}

@Test
public void nonExistentStreamNameShouldResultInFailureInFailOnErrorIsOn() {
void nonExistentStreamNameShouldResultInFailureInFailOnErrorIsOn() {
testJobFatalFailureTerminatesCorrectlyWithFailOnErrorFlagSetTo(true, "test-stream-name-5");
}

@Test
public void nonExistentStreamNameShouldResultInFailureInFailOnErrorIsOff() {
void nonExistentStreamNameShouldResultInFailureInFailOnErrorIsOff() {
testJobFatalFailureTerminatesCorrectlyWithFailOnErrorFlagSetTo(false, "test-stream-name-6");
}

@@ -177,7 +176,7 @@ private void testJobFatalFailureTerminatesCorrectlyWithFailOnErrorFlagSetTo(
}

@Test
public void veryLargeMessagesFailGracefullyWithBrokenElementConverter() {
void veryLargeMessagesFailGracefullyWithBrokenElementConverter() {
Assertions.assertThatExceptionOfType(JobExecutionException.class)
.isThrownBy(
() ->
@@ -197,12 +196,12 @@ public void veryLargeMessagesFailGracefullyWithBrokenElementConverter() {
}

@Test
public void badRegionShouldResultInFailureWhenInFailOnErrorIsOn() {
void badRegionShouldResultInFailureWhenInFailOnErrorIsOn() {
badRegionShouldResultInFailureWhenInFailOnErrorIs(true);
}

@Test
public void badRegionShouldResultInFailureWhenInFailOnErrorIsOff() {
void badRegionShouldResultInFailureWhenInFailOnErrorIsOff() {
badRegionShouldResultInFailureWhenInFailOnErrorIs(false);
}

@@ -215,12 +214,12 @@ private void badRegionShouldResultInFailureWhenInFailOnErrorIs(boolean failOnErr
}

@Test
public void missingRegionShouldResultInFailureWhenInFailOnErrorIsOn() {
void missingRegionShouldResultInFailureWhenInFailOnErrorIsOn() {
missingRegionShouldResultInFailureWhenInFailOnErrorIs(true);
}

@Test
public void missingRegionShouldResultInFailureWhenInFailOnErrorIsOff() {
void missingRegionShouldResultInFailureWhenInFailOnErrorIsOff() {
missingRegionShouldResultInFailureWhenInFailOnErrorIs(false);
}

@@ -232,12 +231,12 @@ private void missingRegionShouldResultInFailureWhenInFailOnErrorIs(boolean failO
}

@Test
public void noURIEndpointShouldResultInFailureWhenInFailOnErrorIsOn() {
void noURIEndpointShouldResultInFailureWhenInFailOnErrorIsOn() {
noURIEndpointShouldResultInFailureWhenInFailOnErrorIs(true);
}

@Test
public void noURIEndpointShouldResultInFailureWhenInFailOnErrorIsOff() {
void noURIEndpointShouldResultInFailureWhenInFailOnErrorIsOff() {
noURIEndpointShouldResultInFailureWhenInFailOnErrorIs(false);
}

@@ -249,12 +248,12 @@ private void noURIEndpointShouldResultInFailureWhenInFailOnErrorIs(boolean failO
}

@Test
public void badEndpointShouldResultInFailureWhenInFailOnErrorIsOn() {
void badEndpointShouldResultInFailureWhenInFailOnErrorIsOn() {
badEndpointShouldResultInFailureWhenInFailOnErrorIs(true);
}

@Test
public void badEndpointShouldResultInFailureWhenInFailOnErrorIsOff() {
void badEndpointShouldResultInFailureWhenInFailOnErrorIsOff() {
badEndpointShouldResultInFailureWhenInFailOnErrorIs(false);
}

@@ -268,77 +267,77 @@ private void badEndpointShouldResultInFailureWhenInFailOnErrorIs(boolean failOnE
}

@Test
public void envVarWithNoCredentialsShouldResultInFailureWhenInFailOnErrorIsOn() {
void envVarWithNoCredentialsShouldResultInFailureWhenInFailOnErrorIsOn() {
noCredentialsProvidedAndCredentialsProviderSpecifiedShouldResultInFailure(
true,
AWSConfigConstants.CredentialProvider.ENV_VAR.toString(),
"Access key must be specified either via environment variable");
}

@Test
public void envVarWithNoCredentialsShouldResultInFailureWhenInFailOnErrorIsOff() {
void envVarWithNoCredentialsShouldResultInFailureWhenInFailOnErrorIsOff() {
noCredentialsProvidedAndCredentialsProviderSpecifiedShouldResultInFailure(
false,
AWSConfigConstants.CredentialProvider.ENV_VAR.toString(),
"Access key must be specified either via environment variable");
}

@Test
public void sysPropWithNoCredentialsShouldResultInFailureWhenInFailOnErrorIsOn() {
void sysPropWithNoCredentialsShouldResultInFailureWhenInFailOnErrorIsOn() {
noCredentialsProvidedAndCredentialsProviderSpecifiedShouldResultInFailure(
true,
AWSConfigConstants.CredentialProvider.SYS_PROP.toString(),
"Unable to load credentials from system settings");
}

@Test
public void sysPropWithNoCredentialsShouldResultInFailureWhenInFailOnErrorIsOff() {
void sysPropWithNoCredentialsShouldResultInFailureWhenInFailOnErrorIsOff() {
noCredentialsProvidedAndCredentialsProviderSpecifiedShouldResultInFailure(
false,
AWSConfigConstants.CredentialProvider.SYS_PROP.toString(),
"Unable to load credentials from system settings");
}

@Test
public void basicWithNoCredentialsShouldResultInFailureWhenInFailOnErrorIsOn() {
void basicWithNoCredentialsShouldResultInFailureWhenInFailOnErrorIsOn() {
noCredentialsProvidedAndCredentialsProviderSpecifiedShouldResultInFailure(
true,
AWSConfigConstants.CredentialProvider.BASIC.toString(),
"Please set values for AWS Access Key ID ('aws.credentials.provider.basic.accesskeyid') and Secret Key ('aws.credentials.provider.basic.secretkey') when using the BASIC AWS credential provider type.");
}

@Test
public void basicWithNoCredentialsShouldResultInFailureWhenInFailOnErrorIsOff() {
void basicWithNoCredentialsShouldResultInFailureWhenInFailOnErrorIsOff() {
noCredentialsProvidedAndCredentialsProviderSpecifiedShouldResultInFailure(
false,
AWSConfigConstants.CredentialProvider.BASIC.toString(),
"Please set values for AWS Access Key ID ('aws.credentials.provider.basic.accesskeyid') and Secret Key ('aws.credentials.provider.basic.secretkey') when using the BASIC AWS credential provider type.");
}

@Test
public void webIdentityTokenWithNoCredentialsShouldResultInFailureWhenInFailOnErrorIsOn() {
void webIdentityTokenWithNoCredentialsShouldResultInFailureWhenInFailOnErrorIsOn() {
noCredentialsProvidedAndCredentialsProviderSpecifiedShouldResultInFailure(
true,
AWSConfigConstants.CredentialProvider.WEB_IDENTITY_TOKEN.toString(),
"Either the environment variable AWS_WEB_IDENTITY_TOKEN_FILE or the javaproperty aws.webIdentityTokenFile must be set");
}

@Test
public void webIdentityTokenWithNoCredentialsShouldResultInFailureWhenInFailOnErrorIsOff() {
void webIdentityTokenWithNoCredentialsShouldResultInFailureWhenInFailOnErrorIsOff() {
noCredentialsProvidedAndCredentialsProviderSpecifiedShouldResultInFailure(
false,
AWSConfigConstants.CredentialProvider.WEB_IDENTITY_TOKEN.toString(),
"Either the environment variable AWS_WEB_IDENTITY_TOKEN_FILE or the javaproperty aws.webIdentityTokenFile must be set");
}

@Test
public void wrongCredentialProviderNameShouldResultInFailureWhenInFailOnErrorIsOn() {
void wrongCredentialProviderNameShouldResultInFailureWhenInFailOnErrorIsOn() {
noCredentialsProvidedAndCredentialsProviderSpecifiedShouldResultInFailure(
true, "WRONG", "Invalid AWS Credential Provider Type");
}

@Test
public void wrongCredentialProviderNameShouldResultInFailureWhenInFailOnErrorIsOff() {
void wrongCredentialProviderNameShouldResultInFailureWhenInFailOnErrorIsOff() {
noCredentialsProvidedAndCredentialsProviderSpecifiedShouldResultInFailure(
false, "WRONG", "Invalid AWS Credential Provider Type");
}
@@ -22,7 +22,7 @@
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;

import org.junit.Test;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;

import java.io.IOException;
@@ -31,7 +31,7 @@
import static org.apache.flink.connector.base.sink.writer.AsyncSinkWriterTestUtils.getTestState;

/** Test class for {@link KinesisStreamsStateSerializer}. */
public class KinesisStreamsStateSerializerTest {
class KinesisStreamsStateSerializerTest {

private static final ElementConverter<String, PutRecordsRequestEntry> ELEMENT_CONVERTER =
KinesisStreamsSinkElementConverter.<String>builder()
@@ -40,7 +40,7 @@ public class KinesisStreamsStateSerializerTest {
.build();

@Test
public void testSerializeAndDeserialize() throws IOException {
void testSerializeAndDeserialize() throws IOException {
BufferedRequestState<PutRecordsRequestEntry> expectedState =
getTestState(ELEMENT_CONVERTER, this::getRequestSize);

0 comments on commit a10fd23

Please sign in to comment.