Skip to content
Permalink
Browse files
[FLINK-27486][tests] Fix archunit test violations in connector-base m…
…odule
  • Loading branch information
Fabian Paul authored and fapaul committed May 5, 2022
1 parent 439a97f commit 09358d1fead500169a887c81a81afc26f5aef315
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 24 deletions.
@@ -3,4 +3,4 @@ org.apache.flink.connector.kinesis.sink.KinesisStreamsSinkITCase does not satisf
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
@@ -1,12 +0,0 @@
org.apache.flink.connector.base.sink.AsyncSinkBaseITCase does not satisfy: only one of the following predicates match:\
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
org.apache.flink.connector.base.source.reader.CoordinatedSourceRescaleITCase does not satisfy: only one of the following predicates match:\
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
@@ -20,16 +20,29 @@
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.util.TestLoggerExtension;

import org.junit.Test;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Integration tests of a baseline generic sink that implements the AsyncSinkBase. */
@ExtendWith(TestLoggerExtension.class)
public class AsyncSinkBaseITCase {

@RegisterExtension
private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
new MiniClusterExtension(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(1)
.build());

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

@Test
@@ -43,13 +56,11 @@ public void testFailuresOnPersistingToDestinationAreCaughtAndRaised() {
env.fromSequence(999_999, 1_000_100)
.map(Object::toString)
.sinkTo(new ArrayListAsyncSink(1, 1, 2, 10, 1000, 10));
Exception e =
assertThrows(
JobExecutionException.class,
() -> env.execute("Integration Test: AsyncSinkBaseITCase"));
assertEquals(
"Intentional error on persisting 1_000_000 to ArrayListDestination",
e.getCause().getCause().getMessage());
assertThatThrownBy(() -> env.execute("Integration Test: AsyncSinkBaseITCase"))
.isInstanceOf(JobExecutionException.class)
.getRootCause()
.hasMessageContaining(
"Intentional error on persisting 1_000_000 to ArrayListDestination");
}

@Test
@@ -26,12 +26,15 @@
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.TestLogger;

import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -55,6 +58,14 @@ public class CoordinatedSourceRescaleITCase extends TestLogger {
public static final String CREATED_CHECKPOINT = "successfully created checkpoint";
public static final String RESTORED_CHECKPOINT = "successfully restored checkpoint";

@ClassRule
public static final MiniClusterWithClientResource MINI_CLUSTER =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(7)
.build());

@Rule public final TemporaryFolder temp = new TemporaryFolder();

@Test
@@ -115,7 +126,8 @@ private StreamExecutionEnvironment createEnv(
conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, p);

final StreamExecutionEnvironment env =
StreamExecutionEnvironment.createLocalEnvironment(p, conf);
StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(p);
env.enableCheckpointing(100);
env.getCheckpointConfig()
.setExternalizedCheckpointCleanup(

0 comments on commit 09358d1

Please sign in to comment.