Skip to content
Permalink
Browse files
[FLINK-27566][aws][tests] Migrate aws-kinesis-firehose to JUnit5
  • Loading branch information
snuyanzin committed May 16, 2022
1 parent a4fa285 commit 461bddecc683a0a41c4a3ea76a39f4836b871667
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 43 deletions.
@@ -22,18 +22,18 @@
import org.apache.flink.connector.aws.config.AWSConfigConstants;

import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.jupiter.api.Test;

import java.util.Properties;

/** Covers construction, defaults and sanity checking of {@link KinesisFirehoseSinkBuilder}. */
public class KinesisFirehoseSinkBuilderTest {
class KinesisFirehoseSinkBuilderTest {

private static final SerializationSchema<String> SERIALIZATION_SCHEMA =
new SimpleStringSchema();

@Test
public void elementConverterOfSinkMustBeSetWhenBuilt() {
void elementConverterOfSinkMustBeSetWhenBuilt() {
Assertions.assertThatExceptionOfType(NullPointerException.class)
.isThrownBy(
() ->
@@ -45,7 +45,7 @@ public void elementConverterOfSinkMustBeSetWhenBuilt() {
}

@Test
public void streamNameOfSinkMustBeSetWhenBuilt() {
void streamNameOfSinkMustBeSetWhenBuilt() {
Assertions.assertThatExceptionOfType(NullPointerException.class)
.isThrownBy(
() ->
@@ -57,7 +57,7 @@ public void streamNameOfSinkMustBeSetWhenBuilt() {
}

@Test
public void streamNameOfSinkMustBeSetToNonEmptyWhenBuilt() {
void streamNameOfSinkMustBeSetToNonEmptyWhenBuilt() {
Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
.isThrownBy(
() ->
@@ -70,7 +70,7 @@ public void streamNameOfSinkMustBeSetToNonEmptyWhenBuilt() {
}

@Test
public void defaultProtocolVersionInsertedToConfiguration() {
void defaultProtocolVersionInsertedToConfiguration() {
Properties expectedProps = new Properties();
expectedProps.setProperty(AWSConfigConstants.HTTP_PROTOCOL_VERSION, "HTTP1_1");
Properties defaultProperties =
@@ -21,25 +21,25 @@
import org.apache.flink.connector.base.sink.writer.ElementConverter;

import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.firehose.model.Record;

import static org.assertj.core.api.Assertions.assertThat;

/** Covers construction and sanity checking of {@link KinesisFirehoseSinkElementConverter}. */
public class KinesisFirehoseSinkElementConverterTest {
class KinesisFirehoseSinkElementConverterTest {

@Test
public void elementConverterWillComplainASerializationSchemaIsNotSetIfBuildIsCalledWithoutIt() {
void elementConverterWillComplainASerializationSchemaIsNotSetIfBuildIsCalledWithoutIt() {
Assertions.assertThatExceptionOfType(NullPointerException.class)
.isThrownBy(() -> KinesisFirehoseSinkElementConverter.<String>builder().build())
.withMessageContaining(
"No SerializationSchema was supplied to the KinesisFirehoseSink builder.");
}

@Test
public void elementConverterUsesProvidedSchemaToSerializeRecord() {
void elementConverterUsesProvidedSchemaToSerializeRecord() {
ElementConverter<String, Record> elementConverter =
KinesisFirehoseSinkElementConverter.<String>builder()
.setSerializationSchema(new SimpleStringSchema())
@@ -24,12 +24,13 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.DockerImageVersions;

import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import software.amazon.awssdk.core.SdkSystemSetting;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
@@ -52,7 +53,8 @@
import static org.assertj.core.api.Assertions.assertThat;

/** Integration test suite for the {@code KinesisFirehoseSink} using a localstack container. */
public class KinesisFirehoseSinkITCase {
@Testcontainers
class KinesisFirehoseSinkITCase {

private static final Logger LOG = LoggerFactory.getLogger(KinesisFirehoseSinkITCase.class);
private static final String ROLE_NAME = "super-role";
@@ -67,12 +69,12 @@ public class KinesisFirehoseSinkITCase {
private FirehoseAsyncClient firehoseAsyncClient;
private IamAsyncClient iamAsyncClient;

@ClassRule
public static LocalstackContainer mockFirehoseContainer =
@Container
private static LocalstackContainer mockFirehoseContainer =
new LocalstackContainer(DockerImageName.parse(DockerImageVersions.LOCALSTACK));

@Before
public void setup() throws Exception {
@BeforeEach
void setup() throws Exception {
System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");
httpClient = AWSServicesTestUtils.createHttpClient(mockFirehoseContainer.getEndpoint());
s3AsyncClient = createS3Client(mockFirehoseContainer.getEndpoint(), httpClient);
@@ -81,13 +83,13 @@ public void setup() throws Exception {
env = StreamExecutionEnvironment.getExecutionEnvironment();
}

@After
public void teardown() {
@AfterEach
void teardown() {
System.clearProperty(SdkSystemSetting.CBOR_ENABLED.property());
}

@Test
public void firehoseSinkWritesCorrectDataToMockAWSServices() throws Exception {
void firehoseSinkWritesCorrectDataToMockAWSServices() throws Exception {
LOG.info("1 - Creating the bucket for Firehose to deliver into...");
createBucket(s3AsyncClient, BUCKET_NAME);
LOG.info("2 - Creating the IAM Role for Firehose to write into the s3 bucket...");
@@ -25,7 +25,7 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.firehose.model.Record;

import java.util.Properties;
@@ -36,15 +36,15 @@
import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createConfig;

/** Covers construction, defaults and sanity checking of {@link KinesisFirehoseSink}. */
public class KinesisFirehoseSinkTest {
class KinesisFirehoseSinkTest {

private static final ElementConverter<String, Record> elementConverter =
KinesisFirehoseSinkElementConverter.<String>builder()
.setSerializationSchema(new SimpleStringSchema())
.build();

@Test
public void deliveryStreamNameMustNotBeNull() {
void deliveryStreamNameMustNotBeNull() {
Assertions.assertThatExceptionOfType(NullPointerException.class)
.isThrownBy(
() ->
@@ -64,7 +64,7 @@ public void deliveryStreamNameMustNotBeNull() {
}

@Test
public void deliveryStreamNameMustNotBeEmpty() {
void deliveryStreamNameMustNotBeEmpty() {
Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
.isThrownBy(
() ->
@@ -84,7 +84,7 @@ public void deliveryStreamNameMustNotBeEmpty() {
}

@Test
public void firehoseSinkFailsWhenAccessKeyIdIsNotProvided() {
void firehoseSinkFailsWhenAccessKeyIdIsNotProvided() {
Properties properties = createConfig("https://non-exisitent-location");
properties.setProperty(
AWS_CREDENTIALS_PROVIDER, AWSConfigConstants.CredentialProvider.BASIC.toString());
@@ -94,15 +94,15 @@ public void firehoseSinkFailsWhenAccessKeyIdIsNotProvided() {
}

@Test
public void firehoseSinkFailsWhenRegionIsNotProvided() {
void firehoseSinkFailsWhenRegionIsNotProvided() {
Properties properties = createConfig("https://non-exisitent-location");
properties.remove(AWS_REGION);
firehoseSinkFailsWithAppropriateMessageWhenInitialConditionsAreMisconfigured(
properties, "region must not be null.");
}

@Test
public void firehoseSinkFailsWhenUnableToConnectToRemoteService() {
void firehoseSinkFailsWhenUnableToConnectToRemoteService() {
Properties properties = createConfig("https://non-exisitent-location");
properties.remove(TRUST_ALL_CERTIFICATES);
firehoseSinkFailsWithAppropriateMessageWhenInitialConditionsAreMisconfigured(
@@ -23,8 +23,8 @@
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;

import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.firehose.model.Record;
@@ -47,8 +47,8 @@ public class KinesisFirehoseSinkWriterTest {
.setSerializationSchema(new SimpleStringSchema())
.build();

@Before
public void setup() {
@BeforeEach
void setup() {
TestSinkInitContext sinkInitContext = new TestSinkInitContext();
Properties sinkProperties = AWSServicesTestUtils.createConfig("https://fake_aws_endpoint");
sinkWriter =
@@ -67,15 +67,15 @@ public void setup() {
}

@Test
public void getSizeInBytesReturnsSizeOfBlobBeforeBase64Encoding() {
void getSizeInBytesReturnsSizeOfBlobBeforeBase64Encoding() {
String testString = "{many hands make light work;";
Record record = Record.builder().data(SdkBytes.fromUtf8String(testString)).build();
assertThat(sinkWriter.getSizeInBytes(record))
.isEqualTo(testString.getBytes(StandardCharsets.US_ASCII).length);
}

@Test
public void getNumRecordsOutErrorsCounterRecordsCorrectNumberOfFailures()
void getNumRecordsOutErrorsCounterRecordsCorrectNumberOfFailures()
throws IOException, InterruptedException {
TestSinkInitContext ctx = new TestSinkInitContext();
KinesisFirehoseSink<String> kinesisFirehoseSink =
@@ -22,7 +22,7 @@
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;

import org.junit.Test;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.firehose.model.Record;

import java.io.IOException;
@@ -31,15 +31,15 @@
import static org.apache.flink.connector.base.sink.writer.AsyncSinkWriterTestUtils.getTestState;

/** Test class for {@link KinesisFirehoseStateSerializer}. */
public class KinesisFirehoseStateSerializerTest {
class KinesisFirehoseStateSerializerTest {

private static final ElementConverter<String, Record> ELEMENT_CONVERTER =
KinesisFirehoseSinkElementConverter.<String>builder()
.setSerializationSchema(new SimpleStringSchema())
.build();

@Test
public void testSerializeAndDeserialize() throws IOException {
void testSerializeAndDeserialize() throws IOException {
BufferedRequestState<Record> expectedState =
getTestState(ELEMENT_CONVERTER, this::getRequestSize);

@@ -29,10 +29,9 @@
import org.apache.flink.table.factories.TableOptionsBuilder;
import org.apache.flink.table.factories.TestFormatFactory;
import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
import org.apache.flink.util.TestLogger;

import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.jupiter.api.Test;

import java.util.Map;
import java.util.Properties;
@@ -43,11 +42,11 @@
* Test for {@link KinesisFirehoseDynamicSink} created by {@link
* KinesisFirehoseDynamicTableFactory}.
*/
public class KinesisFirehoseDynamicTableFactoryTest extends TestLogger {
class KinesisFirehoseDynamicTableFactoryTest {
private static final String DELIVERY_STREAM_NAME = "myDeliveryStream";

@Test
public void testGoodTableSink() {
void testGoodTableSink() {
ResolvedSchema sinkSchema = defaultSinkSchema();
Map<String, String> sinkOptions = defaultTableOptions().build();

@@ -74,7 +73,7 @@ public void testGoodTableSink() {
}

@Test
public void testGoodTableSinkWithSinkOptions() {
void testGoodTableSinkWithSinkOptions() {
ResolvedSchema sinkSchema = defaultSinkSchema();
Map<String, String> sinkOptions = defaultTableOptionsWithSinkOptions().build();

@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.flink.util.TestLoggerExtension

0 comments on commit 461bdde

Please sign in to comment.