diff --git a/flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java b/flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java
index 007bef8abd677..0c2ce4c429435 100644
--- a/flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java
+++ b/flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java
@@ -51,7 +51,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
@ExtendWith(TestLoggerExtension.class)
class BatchSQLTest {
@@ -79,7 +79,7 @@ class BatchSQLTest {
// Only above shuffle modes are supported by the adaptive batch scheduler
// , "ALL_EXCHANGES_PIPELINE"
})
- public void testBatchSQL(BatchShuffleMode shuffleMode, @TempDir Path tmpDir) throws Exception {
+ void testBatchSQL(BatchShuffleMode shuffleMode, @TempDir Path tmpDir) throws Exception {
LOG.info("Results for this test will be stored at: {}", tmpDir);
String sqlStatement = new String(Files.readAllBytes(sqlPath));
@@ -143,13 +143,13 @@ public void testBatchSQL(BatchShuffleMode shuffleMode, @TempDir Path tmpDir) thr
.filter(file -> !file.isDirectory())
.map(File::getPath)
.collect(Collectors.toList());
- assertEquals(1, files.size());
+ assertThat(files).hasSize(1);
Path resultFile = Paths.get(files.get(0));
LOG.info("Result found at {}", resultFile);
String actual = new String(Files.readAllBytes(resultFile));
LOG.info("Actual result is: '{}'", actual);
- assertEquals(expected, actual);
+ assertThat(actual).isEqualTo(expected);
}
}
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/test/java/org/apache/flink/streaming/tests/AllroundMiniClusterTest.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/test/java/org/apache/flink/streaming/tests/AllroundMiniClusterTest.java
index eabe48b063979..c8ede888852f0 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/test/java/org/apache/flink/streaming/tests/AllroundMiniClusterTest.java
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/test/java/org/apache/flink/streaming/tests/AllroundMiniClusterTest.java
@@ -21,39 +21,41 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateRecoveryOptions;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.TestLoggerExtension;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
-import java.io.File;
+import java.nio.file.Path;
import static org.apache.flink.configuration.JobManagerOptions.EXECUTION_FAILOVER_STRATEGY;
/** DataStreamAllroundTestProgram on MiniCluster for manual debugging purposes. */
-@Ignore("Test is already part of end-to-end tests. This is for manual debugging.")
-public class AllroundMiniClusterTest extends TestLogger {
+@Disabled("Test is already part of end-to-end tests. This is for manual debugging.")
+@ExtendWith(TestLoggerExtension.class)
+class AllroundMiniClusterTest {
- @BeforeClass
- public static void beforeClass() {
+ @BeforeAll
+ static void beforeClass() {
org.apache.log4j.PropertyConfigurator.configure(
AllroundMiniClusterTest.class.getClassLoader().getResource("log4j.properties"));
}
- @ClassRule
- public static MiniClusterWithClientResource miniClusterResource =
- new MiniClusterWithClientResource(
+ @RegisterExtension
+ static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
+ new MiniClusterExtension(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(4)
.setNumberSlotsPerTaskManager(2)
.setConfiguration(createConfiguration())
.build());
- @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+ @TempDir static Path temporaryFolder;
private static Configuration createConfiguration() {
Configuration configuration = new Configuration();
@@ -63,12 +65,13 @@ private static Configuration createConfiguration() {
}
@Test
- public void runTest() throws Exception {
- File checkpointDir = temporaryFolder.newFolder();
+ void runTest() throws Exception {
+ Path checkpointDir = temporaryFolder.resolve("checkpoints");
+ java.nio.file.Files.createDirectories(checkpointDir);
DataStreamAllroundTestProgram.main(
new String[] {
"--environment.parallelism", "8",
- "--state_backend.checkpoint_directory", checkpointDir.toURI().toString(),
+ "--state_backend.checkpoint_directory", checkpointDir.toUri().toString(),
"--state_backend", "rocks",
"--state_backend.rocks.incremental", "true",
"--test.simulate_failure", "true",
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/cache/DownloadCache.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/cache/DownloadCache.java
index 687f5144a24e7..0be985bfeb2d8 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/cache/DownloadCache.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/cache/DownloadCache.java
@@ -19,7 +19,6 @@
package org.apache.flink.tests.util.cache;
import org.apache.flink.tests.util.util.FactoryUtils;
-import org.apache.flink.util.ExternalResource;
import java.io.IOException;
import java.nio.file.Path;
@@ -30,7 +29,15 @@
*
*
Whether, how, and for how long files are cached is implementation-dependent.
*/
-public interface DownloadCache extends ExternalResource {
+public interface DownloadCache {
+
+ void before() throws Exception;
+
+ void afterTestSuccess();
+
+ default void afterTestFailure() {
+ afterTestSuccess();
+ }
/**
* Returns either a cached or newly downloaded version of the given file. The returned file path
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/cache/DownloadCacheExtension.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/cache/DownloadCacheExtension.java
new file mode 100644
index 0000000000000..5a006dc2a3a9a
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/cache/DownloadCacheExtension.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.cache;
+
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+import java.io.IOException;
+import java.nio.file.Path;
+
+/** JUnit 5 extension that wraps a {@link DownloadCache} and manages its lifecycle. */
+public class DownloadCacheExtension implements BeforeEachCallback, AfterEachCallback {
+
+ private final DownloadCache delegate;
+
+ public DownloadCacheExtension(DownloadCache delegate) {
+ this.delegate = delegate;
+ }
+
+ public Path getOrDownload(String url, Path targetDir) throws IOException {
+ return delegate.getOrDownload(url, targetDir);
+ }
+
+ @Override
+ public void beforeEach(ExtensionContext context) throws Exception {
+ delegate.before();
+ }
+
+ @Override
+ public void afterEach(ExtensionContext context) {
+ if (context.getExecutionException().isPresent()) {
+ delegate.afterTestFailure();
+ } else {
+ delegate.afterTestSuccess();
+ }
+ }
+}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResource.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResource.java
index 397d742882389..63412d64b3011 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResource.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResource.java
@@ -20,7 +20,6 @@
import org.apache.flink.test.util.JobSubmission;
import org.apache.flink.tests.util.util.FactoryUtils;
-import org.apache.flink.util.ExternalResource;
import java.io.IOException;
import java.time.Duration;
@@ -30,7 +29,15 @@
import java.util.stream.Stream;
/** Generic interface for interacting with Flink. */
-public interface FlinkResource extends ExternalResource {
+public interface FlinkResource {
+
+ void before() throws Exception;
+
+ void afterTestSuccess();
+
+ default void afterTestFailure() {
+ afterTestSuccess();
+ }
/**
* Starts a cluster.
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResourceExtension.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResourceExtension.java
new file mode 100644
index 0000000000000..349a67f783fbf
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResourceExtension.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.flink;
+
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+/** JUnit 5 extension that wraps a {@link FlinkResource} and manages its lifecycle. */
+public class FlinkResourceExtension implements BeforeEachCallback, AfterEachCallback {
+
+ private final FlinkResource delegate;
+
+ public FlinkResourceExtension(FlinkResource delegate) {
+ this.delegate = delegate;
+ }
+
+ public FlinkResource getFlinkResource() {
+ return delegate;
+ }
+
+ @Override
+ public void beforeEach(ExtensionContext context) throws Exception {
+ delegate.before();
+ }
+
+ @Override
+ public void afterEach(ExtensionContext context) {
+ if (context.getExecutionException().isPresent()) {
+ delegate.afterTestFailure();
+ } else {
+ delegate.afterTestSuccess();
+ }
+ }
+}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/java/org/apache/flink/tests/util/TestUtilsTest.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/java/org/apache/flink/tests/util/TestUtilsTest.java
index 717f5103d57bd..05a6b907ca00c 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/java/org/apache/flink/tests/util/TestUtilsTest.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/java/org/apache/flink/tests/util/TestUtilsTest.java
@@ -19,38 +19,40 @@
import org.apache.flink.tests.util.activation.OperatingSystemRestriction;
import org.apache.flink.util.OperatingSystem;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import static org.assertj.core.api.Assertions.assertThat;
+
/** Tests for {@link TestUtils}. */
-public class TestUtilsTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)
+class TestUtilsTest {
- @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+ @TempDir Path temporaryFolder;
- @BeforeClass
- public static void setupClass() {
+ @BeforeAll
+ static void setupClass() {
OperatingSystemRestriction.forbid(
"Symbolic links usually require special permissions on Windows.",
OperatingSystem.WINDOWS);
}
@Test
- public void copyDirectory() throws IOException {
+ void copyDirectory() throws IOException {
Path[] files = {
Paths.get("file1"), Paths.get("dir1", "file2"),
};
- Path source = temporaryFolder.newFolder("source").toPath();
+ Path source = Files.createDirectory(temporaryFolder.resolve("source"));
for (Path file : files) {
Files.createDirectories(source.resolve(file).getParent());
Files.createFile(source.resolve(file));
@@ -63,7 +65,7 @@ public void copyDirectory() throws IOException {
TestUtils.copyDirectory(symbolicLink, target);
for (Path file : files) {
- Assert.assertTrue(Files.exists(target.resolve(file)));
+ assertThat(target.resolve(file)).exists();
}
}
}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/java/org/apache/flink/tests/util/util/FileUtilsTest.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/java/org/apache/flink/tests/util/util/FileUtilsTest.java
index d376432127ed5..f1d960cb60e2d 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/java/org/apache/flink/tests/util/util/FileUtilsTest.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/java/org/apache/flink/tests/util/util/FileUtilsTest.java
@@ -18,34 +18,32 @@
package org.apache.flink.tests.util.util;
import org.apache.flink.test.util.FileUtils;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
import java.util.regex.Pattern;
+import static org.assertj.core.api.Assertions.assertThat;
+
/** Tests for {@link FileUtils}. */
-public class FileUtilsTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)
+class FileUtilsTest {
- @ClassRule public static final TemporaryFolder TMP = new TemporaryFolder();
+ private static final List ORIGINAL_LINES = List.of("line1", "line2", "line3");
- private static final List ORIGINAL_LINES =
- Collections.unmodifiableList(Arrays.asList("line1", "line2", "line3"));
private Path testFile;
- @Before
- public void setupFile() throws IOException {
- Path path = TMP.newFile().toPath();
+ @BeforeEach
+ void setupFile(@TempDir Path tmpDir) throws IOException {
+ Path path = Files.createTempFile(tmpDir, null, null);
Files.write(path, ORIGINAL_LINES);
@@ -53,27 +51,25 @@ public void setupFile() throws IOException {
}
@Test
- public void replaceSingleMatch() throws IOException {
+ void replaceSingleMatch() throws IOException {
FileUtils.replace(testFile, Pattern.compile("line1"), matcher -> "removed");
- Assert.assertEquals(
- Arrays.asList("removed", ORIGINAL_LINES.get(1), ORIGINAL_LINES.get(2)),
- Files.readAllLines(testFile));
+ assertThat(Files.readAllLines(testFile))
+ .containsExactly("removed", ORIGINAL_LINES.get(1), ORIGINAL_LINES.get(2));
}
@Test
- public void replaceMultipleMatch() throws IOException {
+ void replaceMultipleMatch() throws IOException {
FileUtils.replace(testFile, Pattern.compile("line(.*)"), matcher -> matcher.group(1));
- Assert.assertEquals(Arrays.asList("1", "2", "3"), Files.readAllLines(testFile));
+ assertThat(Files.readAllLines(testFile)).containsExactly("1", "2", "3");
}
@Test
- public void replaceWithEmptyLine() throws IOException {
+ void replaceWithEmptyLine() throws IOException {
FileUtils.replace(testFile, Pattern.compile("line2"), matcher -> "");
- Assert.assertEquals(
- Arrays.asList(ORIGINAL_LINES.get(0), "", ORIGINAL_LINES.get(2)),
- Files.readAllLines(testFile));
+ assertThat(Files.readAllLines(testFile))
+ .containsExactly(ORIGINAL_LINES.get(0), "", ORIGINAL_LINES.get(2));
}
}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-restclient/src/test/java/org/apache/flink/runtime/rest/RestClientITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-restclient/src/test/java/org/apache/flink/runtime/rest/RestClientITCase.java
index 57cbac116d15e..fb3c77339e9f5 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-restclient/src/test/java/org/apache/flink/runtime/rest/RestClientITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-restclient/src/test/java/org/apache/flink/runtime/rest/RestClientITCase.java
@@ -24,12 +24,13 @@
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
import java.net.URL;
import java.util.Collection;
@@ -38,10 +39,11 @@
import java.util.concurrent.TimeUnit;
/** Tests for {@link RestClient} that rely on external connections. */
-public class RestClientITCase extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)
+class RestClientITCase {
@Test
- public void testHttpsConnectionWithDefaultCerts() throws Exception {
+ void testHttpsConnectionWithDefaultCerts() throws Exception {
final Configuration config = new Configuration();
final URL httpsUrl = new URL("https://raw.githubusercontent.com");
TestUrlMessageHeaders testUrlMessageHeaders =
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-scala/src/test/java/org/apache/flink/tests/scala/ScalaFreeITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-scala/src/test/java/org/apache/flink/tests/scala/ScalaFreeITCase.java
index e0ad66e0af84f..f3ce565b2ba2d 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-scala/src/test/java/org/apache/flink/tests/scala/ScalaFreeITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-scala/src/test/java/org/apache/flink/tests/scala/ScalaFreeITCase.java
@@ -21,15 +21,17 @@
import org.apache.flink.test.util.JobSubmission;
import org.apache.flink.tests.util.flink.ClusterController;
import org.apache.flink.tests.util.flink.FlinkResource;
+import org.apache.flink.tests.util.flink.FlinkResourceExtension;
import org.apache.flink.tests.util.flink.FlinkResourceSetup;
import org.apache.flink.tests.util.flink.JarLocation;
-import org.apache.flink.testutils.executor.TestExecutorResource;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+import org.apache.flink.util.TestLoggerExtension;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
import java.nio.file.Path;
import java.time.Duration;
@@ -42,19 +44,19 @@
* Tests that Flink does not require Scala for jobs that do not use the Scala APIs. This covers both
* pure Java jobs, and Scala jobs that use the Java APIs exclusively with Scala types.
*/
-@RunWith(Parameterized.class)
-public class ScalaFreeITCase extends TestLogger {
+@ExtendWith({ParameterizedTestExtension.class, TestLoggerExtension.class})
+class ScalaFreeITCase {
- @Rule
- public final TestExecutorResource testExecutorResource =
- new TestExecutorResource<>(
+ @RegisterExtension
+ static final TestExecutorExtension TEST_EXECUTOR_EXTENSION =
+ new TestExecutorExtension<>(
java.util.concurrent.Executors::newSingleThreadScheduledExecutor);
- @Rule public final FlinkResource flink;
+ @RegisterExtension final FlinkResourceExtension flinkExtension;
private final String mainClass;
- @Parameterized.Parameters(name = "{index}: {0}")
- public static Collection testParameters() {
+ @Parameters(name = "{0}")
+ public static Collection testParameters() {
return Arrays.asList(
new TestParams("Java job, without Scala in lib/", JavaJob.class.getCanonicalName()),
new TestParams(
@@ -69,21 +71,21 @@ public static Collection testParameters() {
JarLocation.LIB)));
}
- public ScalaFreeITCase(TestParams testParams) {
+ ScalaFreeITCase(TestParams testParams) {
final FlinkResourceSetup.FlinkResourceSetupBuilder builder =
FlinkResourceSetup.builder()
.moveJar("flink-scala", JarLocation.LIB, JarLocation.OPT);
testParams.builderSetup.accept(builder);
- flink = FlinkResource.get(builder.build());
+ flinkExtension = new FlinkResourceExtension(FlinkResource.get(builder.build()));
mainClass = testParams.mainClass;
}
- @Test
- public void testScalaFreeJobExecution() throws Exception {
+ @TestTemplate
+ void testScalaFreeJobExecution() throws Exception {
final Path jobJar = ResourceTestUtils.getResource("/jobs.jar");
- try (final ClusterController clusterController = flink.startCluster(1)) {
- // if the job fails then this throws an exception
+ try (final ClusterController clusterController =
+ flinkExtension.getFlinkResource().startCluster(1)) {
clusterController.submitJob(
new JobSubmission.JobSubmissionBuilder(jobJar)
.setDetached(false)
@@ -93,7 +95,7 @@ public void testScalaFreeJobExecution() throws Exception {
}
}
- static class TestParams {
+ private static class TestParams {
private final String description;
private final String mainClass;
@@ -112,14 +114,6 @@ static class TestParams {
this.builderSetup = builderSetup;
}
- public String getMainClass() {
- return mainClass;
- }
-
- public Consumer getBuilderSetup() {
- return builderSetup;
- }
-
@Override
public String toString() {
return description;
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/CompileAndExecuteRemotePlanITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/CompileAndExecuteRemotePlanITCase.java
index 439770cce244e..5ad87f0f87c63 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/CompileAndExecuteRemotePlanITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/CompileAndExecuteRemotePlanITCase.java
@@ -20,8 +20,7 @@
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.RemoteIterator;
-import org.junit.Assume;
-import org.junit.Test;
+import org.junit.jupiter.api.TestTemplate;
import java.io.IOException;
import java.nio.file.Path;
@@ -32,27 +31,24 @@
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
/** End-to-End tests for COMPILE AND EXECUTE PLAN statement with hdfs as remote uri. */
-public class CompileAndExecuteRemotePlanITCase extends HdfsITCaseBase {
+class CompileAndExecuteRemotePlanITCase extends HdfsITCaseBase {
private static final String TABLE1 = "message";
private static final String TABLE2 = "employee";
private String planDir;
- public CompileAndExecuteRemotePlanITCase(String executionMode) {
- super(executionMode);
- }
-
@Override
- protected void createHDFS() {
+ void createHDFS() {
super.createHDFS();
planDir = getRemotePlanDir();
}
@Override
- protected Map generateReplaceVars() {
+ Map generateReplaceVars() {
Map varsMap = super.generateReplaceVars();
varsMap.put("$REMOTE_PLAN_DIR", planDir);
varsMap.put("$TABLE1", TABLE1);
@@ -60,10 +56,10 @@ protected Map generateReplaceVars() {
return varsMap;
}
- @Test
- public void testCompileAndExecutePlan() throws Exception {
+ @TestTemplate
+ void testCompileAndExecutePlan() throws Exception {
// COMPILE AND EXECUTE PLAN is not supported under batch mode
- Assume.assumeTrue(executionMode.equals("streaming"));
+ assumeThat(executionMode).isEqualTo("streaming");
Map> resultItems = new HashMap<>();
resultItems.put(result.resolve(TABLE1), Arrays.asList("1,Meow", "2,Purr"));
resultItems.put(result.resolve(TABLE2), Arrays.asList("1,Tom", "2,Jerry"));
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/CreateTableAsITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/CreateTableAsITCase.java
index 06fbe73dd6639..85a333a8e9457 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/CreateTableAsITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/CreateTableAsITCase.java
@@ -28,7 +28,7 @@
import org.apache.flink.test.util.SQLJobSubmission;
import org.apache.flink.tests.util.flink.ClusterController;
-import org.junit.Test;
+import org.junit.jupiter.api.TestTemplate;
import java.net.URI;
import java.time.Duration;
@@ -37,7 +37,7 @@
import java.util.List;
/** End-to-End tests for create table as select syntax. */
-public class CreateTableAsITCase extends SqlITCaseBase {
+class CreateTableAsITCase extends SqlITCaseBase {
private static final ResolvedSchema SINK_TABLE_SCHEMA =
new ResolvedSchema(
@@ -54,29 +54,25 @@ public class CreateTableAsITCase extends SqlITCaseBase {
private static final DebeziumJsonDeserializationSchema DESERIALIZATION_SCHEMA =
createDebeziumDeserializationSchema(SINK_TABLE_SCHEMA);
- public CreateTableAsITCase(String executionMode) {
- super(executionMode);
- }
-
- @Test
- public void testCreateTableAs() throws Exception {
+ @TestTemplate
+ void testCreateTableAs() throws Exception {
runAndCheckSQL("create_table_as_e2e.sql", Arrays.asList("+I[Bob, 2]", "+I[Alice, 1]"));
}
- @Test
- public void testCreateTableAsInStatementSet() throws Exception {
+ @TestTemplate
+ void testCreateTableAsInStatementSet() throws Exception {
runAndCheckSQL(
"create_table_as_statementset_e2e.sql",
Arrays.asList("+I[Bob, 2]", "+I[Alice, 1]"));
}
@Override
- protected List formatRawResult(List rawResult) {
+ List formatRawResult(List rawResult) {
return convertToMaterializedResult(rawResult, SINK_TABLE_SCHEMA, DESERIALIZATION_SCHEMA);
}
@Override
- protected void executeSqlStatements(
+ void executeSqlStatements(
ClusterController clusterController, List sqlLines, List dependencies)
throws Exception {
clusterController.submitSQLJob(
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/HdfsITCaseBase.java b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/HdfsITCaseBase.java
index 8aae8efcc7124..0342c6bd51724 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/HdfsITCaseBase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/HdfsITCaseBase.java
@@ -27,11 +27,9 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Assume;
-import org.junit.Before;
-import org.junit.BeforeClass;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
import java.io.File;
import java.io.FileNotFoundException;
@@ -41,38 +39,37 @@
import java.time.Duration;
import java.util.List;
+import static org.assertj.core.api.Assertions.fail;
+import static org.assertj.core.api.Assumptions.assumeThat;
+
/** Base class for sql ITCase which depends on HDFS env. */
-public abstract class HdfsITCaseBase extends SqlITCaseBase {
+abstract class HdfsITCaseBase extends SqlITCaseBase {
private static final Path HADOOP_CLASSPATH =
ResourceTestUtils.getResource(".*hadoop.classpath");
- protected Configuration hdConf;
- protected MiniDFSCluster hdfsCluster;
-
- public HdfsITCaseBase(String executionMode) {
- super(executionMode);
- }
+ Configuration hdConf;
+ MiniDFSCluster hdfsCluster;
- @BeforeClass
- public static void verifyOS() {
- Assume.assumeTrue(
- "HDFS cluster cannot be started on Windows without extensions.",
- !OperatingSystem.isWindows());
+ @BeforeAll
+ static void verifyOS() {
+ assumeThat(OperatingSystem.isWindows())
+ .as("HDFS cluster cannot be started on Windows without extensions.")
+ .isFalse();
}
- @Before
- public void before() throws Exception {
+ @BeforeEach
+ void before() throws Exception {
super.before();
createHDFS();
}
- @After
- public void after() {
+ @AfterEach
+ void after() {
destroyHDFS();
}
- protected void createHDFS() {
+ void createHDFS() {
try {
hdConf = new Configuration();
File baseDir =
@@ -83,16 +80,16 @@ protected void createHDFS() {
MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
hdfsCluster = builder.build();
} catch (Throwable e) {
- Assert.fail("Failed to create HDFS env" + e.getMessage());
+ fail("Failed to create HDFS env" + e.getMessage(), e);
}
}
- protected void destroyHDFS() {
+ void destroyHDFS() {
hdfsCluster.shutdown();
}
@Override
- protected void executeSqlStatements(
+ void executeSqlStatements(
ClusterController clusterController, List sqlLines, List dependencies)
throws Exception {
clusterController.submitSQLJob(
@@ -116,7 +113,7 @@ private String getHadoopClassPathContent() {
"File that contains hadoop classpath %s does not exist.",
HADOOP_CLASSPATH));
} catch (FileNotFoundException e) {
- Assert.fail("Test failed " + e.getMessage());
+ fail("Test failed " + e.getMessage(), e);
}
}
@@ -124,7 +121,7 @@ private String getHadoopClassPathContent() {
try {
classPathContent = FileUtils.readFileUtf8(hadoopClasspathFile);
} catch (IOException e) {
- Assert.fail("Test failed " + e.getMessage());
+ fail("Test failed " + e.getMessage(), e);
}
return classPathContent;
}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/PlannerScalaFreeITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/PlannerScalaFreeITCase.java
index dbdbae5b551a9..51a0f278cb385 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/PlannerScalaFreeITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/PlannerScalaFreeITCase.java
@@ -28,7 +28,7 @@
import org.apache.flink.test.util.SQLJobSubmission;
import org.apache.flink.tests.util.flink.ClusterController;
-import org.junit.Test;
+import org.junit.jupiter.api.TestTemplate;
import java.net.URI;
import java.time.Duration;
@@ -36,7 +36,7 @@
import java.util.Collections;
import java.util.List;
-import static org.junit.Assume.assumeTrue;
+import static org.assertj.core.api.Assumptions.assumeThat;
/**
* End-to-End tests for table planner scala-free since 1.15. Due to scala-free of table planner
@@ -44,7 +44,7 @@
* class in execution time, ClassNotFound exception will be thrown. ITCase in table planner can not
* cover it, so we should add E2E test for these case.
*/
-public class PlannerScalaFreeITCase extends SqlITCaseBase {
+class PlannerScalaFreeITCase extends SqlITCaseBase {
private static final ResolvedSchema SINK_TABLE_SCHEMA =
new ResolvedSchema(
@@ -61,29 +61,25 @@ public class PlannerScalaFreeITCase extends SqlITCaseBase {
private static final DebeziumJsonDeserializationSchema DESERIALIZATION_SCHEMA =
createDebeziumDeserializationSchema(SINK_TABLE_SCHEMA);
- public PlannerScalaFreeITCase(String executionMode) {
- super(executionMode);
- }
-
- @Test
- public void testImperativeUdaf() throws Exception {
+ @TestTemplate
+ void testImperativeUdaf() throws Exception {
runAndCheckSQL("scala_free_e2e.sql", Arrays.asList("+I[Bob, 2]", "+I[Alice, 1]"));
}
/** The test data is from {@link org.apache.flink.table.toolbox.TestSourceFunction#DATA}. */
- @Test
- public void testWatermarkPushDown() throws Exception {
- assumeTrue(executionMode.equalsIgnoreCase("streaming"));
+ @TestTemplate
+ void testWatermarkPushDown() throws Exception {
+ assumeThat(executionMode).isEqualTo("streaming");
runAndCheckSQL("watermark_push_down_e2e.sql", Arrays.asList("+I[Bob, 1]", "+I[Alice, 2]"));
}
@Override
- protected List formatRawResult(List rawResult) {
+ List formatRawResult(List rawResult) {
return convertToMaterializedResult(rawResult, SINK_TABLE_SCHEMA, DESERIALIZATION_SCHEMA);
}
@Override
- protected void executeSqlStatements(
+ void executeSqlStatements(
ClusterController clusterController, List sqlLines, List dependencies)
throws Exception {
clusterController.submitSQLJob(
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/SqlITCaseBase.java b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/SqlITCaseBase.java
index 41d1f55a1b23e..31ac671344bd4 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/SqlITCaseBase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/SqlITCaseBase.java
@@ -30,19 +30,21 @@
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.test.resources.ResourceTestUtils;
import org.apache.flink.tests.util.flink.ClusterController;
-import org.apache.flink.tests.util.flink.FlinkResource;
+import org.apache.flink.tests.util.flink.FlinkResourceExtension;
import org.apache.flink.tests.util.flink.FlinkResourceSetup;
import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,38 +69,33 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
-import static org.junit.jupiter.api.Assertions.assertTrue;
/** Base class for sql ITCase. */
-@RunWith(Parameterized.class)
-public abstract class SqlITCaseBase extends TestLogger {
+@ExtendWith({ParameterizedTestExtension.class, TestLoggerExtension.class})
+abstract class SqlITCaseBase {
private static final Logger LOG = LoggerFactory.getLogger(SqlITCaseBase.class);
- @Parameterized.Parameters(name = "executionMode")
- public static Collection data() {
+ @Parameters(name = "executionMode")
+ private static Collection data() {
return Arrays.asList("streaming", "batch");
}
- @Rule
- public final FlinkResource flink =
- new LocalStandaloneFlinkResourceFactory()
- .create(
- FlinkResourceSetup.builder()
- .addConfiguration(getConfiguration())
- .build());
+ @RegisterExtension
+ private final FlinkResourceExtension flinkExtension =
+ new FlinkResourceExtension(
+ new LocalStandaloneFlinkResourceFactory()
+ .create(
+ FlinkResourceSetup.builder()
+ .addConfiguration(getConfiguration())
+ .build()));
- @Rule public final TemporaryFolder tmp = new TemporaryFolder();
+ @TempDir private Path tmp;
- protected final String executionMode;
+ @Parameter String executionMode;
- protected Path result;
+ Path result;
- protected static final Path SQL_TOOL_BOX_JAR =
- ResourceTestUtils.getResource(".*SqlToolbox.jar");
-
- public SqlITCaseBase(String executionMode) {
- this.executionMode = executionMode;
- }
+ static final Path SQL_TOOL_BOX_JAR = ResourceTestUtils.getResource(".*SqlToolbox.jar");
private static Configuration getConfiguration() {
// we have to enable checkpoint to trigger flushing for filesystem sink
@@ -107,18 +104,17 @@ private static Configuration getConfiguration() {
return flinkConfig;
}
- @Before
- public void before() throws Exception {
- Path tmpPath = tmp.getRoot().toPath();
- LOG.info("The current temporary path: {}", tmpPath);
- this.result = tmpPath.resolve(String.format("result-%s", UUID.randomUUID()));
+ @BeforeEach
+ void before() throws Exception {
+ LOG.info("The current temporary path: {}", tmp);
+ this.result = tmp.resolve(String.format("result-%s", UUID.randomUUID()));
}
- public void runAndCheckSQL(String sqlPath, List resultItems) throws Exception {
+ void runAndCheckSQL(String sqlPath, List resultItems) throws Exception {
runAndCheckSQL(sqlPath, Collections.singletonMap(result, resultItems));
}
- public void runAndCheckSQL(
+ void runAndCheckSQL(
String sqlPath,
List resultItems,
Function, List> formatter)
@@ -130,18 +126,18 @@ public void runAndCheckSQL(
Collections.emptyList());
}
- public void runAndCheckSQL(String sqlPath, Map> resultItems)
- throws Exception {
+ void runAndCheckSQL(String sqlPath, Map> resultItems) throws Exception {
runAndCheckSQL(sqlPath, resultItems, Collections.emptyMap(), Collections.emptyList());
}
- public void runAndCheckSQL(
+ void runAndCheckSQL(
String sqlPath,
Map> resultItems,
Map, List>> formatters,
List dependencies)
throws Exception {
- try (ClusterController clusterController = flink.startCluster(1)) {
+ try (ClusterController clusterController =
+ flinkExtension.getFlinkResource().startCluster(1)) {
List sqlLines = initializeSqlLines(sqlPath);
executeSqlStatements(clusterController, sqlLines, dependencies);
@@ -158,14 +154,14 @@ public void runAndCheckSQL(
}
}
- protected Map generateReplaceVars() {
+ Map generateReplaceVars() {
Map varsMap = new HashMap<>();
varsMap.put("$RESULT", this.result.toAbsolutePath().toString());
varsMap.put("$MODE", this.executionMode);
return varsMap;
}
- protected abstract void executeSqlStatements(
+ abstract void executeSqlStatements(
ClusterController clusterController, List sqlLines, List dependencies)
throws Exception;
@@ -200,9 +196,7 @@ private static void checkResultFile(
lines = readResultFiles(resultPath);
try {
List actual = resultFormatter.apply(lines);
- assertThat(actual)
- .hasSameSizeAs(expectedItems)
- .containsExactlyInAnyOrderElementsOf(expectedItems);
+ assertThat(actual).containsExactlyInAnyOrderElementsOf(expectedItems);
success = true;
break;
} catch (AssertionError e) {
@@ -217,10 +211,10 @@ private static void checkResultFile(
}
Thread.sleep(500);
}
- assertTrue(
- success,
- String.format(
- "Did not get expected results before timeout, actual result: %s.", lines));
+ assertThat(success)
+ .withFailMessage(
+ "Did not get expected results before timeout, actual result: %s.", lines)
+ .isTrue();
}
private static List readResultFiles(Path path) throws Exception {
@@ -242,16 +236,16 @@ private static List readResultFiles(Path path) throws Exception {
*
* {@code
* @Override
- * protected List formatRawResult(List rawResults) {
+ * List formatRawResult(List rawResults) {
* return convertToMaterializedResult(rawResults, schema, deserializationSchema);
* }
* }
*/
- protected List formatRawResult(List rawResults) {
+ List formatRawResult(List rawResults) {
return rawResults;
}
- protected static List convertToMaterializedResult(
+ static List convertToMaterializedResult(
List rawResults,
ResolvedSchema schema,
DeserializationSchema deserializationSchema) {
@@ -298,7 +292,7 @@ protected static List convertToMaterializedResult(
* Create a DebeziumJsonDeserializationSchema using the given {@link ResolvedSchema} to convert
* debezium-json formatted record into {@link RowData}.
*/
- protected static DebeziumJsonDeserializationSchema createDebeziumDeserializationSchema(
+ static DebeziumJsonDeserializationSchema createDebeziumDeserializationSchema(
ResolvedSchema schema) {
return new DebeziumJsonDeserializationSchema(
schema.toPhysicalRowDataType(),
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/UsingRemoteJarITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/UsingRemoteJarITCase.java
index 0432aa76ecc84..6a3ae1f813060 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/UsingRemoteJarITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/UsingRemoteJarITCase.java
@@ -26,19 +26,21 @@
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.TestTemplate;
import java.io.IOException;
+import java.lang.reflect.Method;
import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
+import static org.assertj.core.api.Assertions.fail;
+
/** End-to-End tests for using remote jar. */
-public class UsingRemoteJarITCase extends HdfsITCaseBase {
+class UsingRemoteJarITCase extends HdfsITCaseBase {
private static final ResolvedSchema USER_ORDER_SCHEMA =
new ResolvedSchema(
@@ -55,16 +57,17 @@ public class UsingRemoteJarITCase extends HdfsITCaseBase {
private static final DebeziumJsonDeserializationSchema USER_ORDER_DESERIALIZATION_SCHEMA =
createDebeziumDeserializationSchema(USER_ORDER_SCHEMA);
- @Rule public TestName name = new TestName();
+ private TestInfo testInfo;
private org.apache.hadoop.fs.Path hdPath;
private org.apache.hadoop.fs.FileSystem hdfs;
- public UsingRemoteJarITCase(String executionMode) {
- super(executionMode);
+ @BeforeEach
+ void captureTestInfo(TestInfo testInfo) {
+ this.testInfo = testInfo;
}
@Override
- protected void createHDFS() {
+ void createHDFS() {
super.createHDFS();
hdPath = new org.apache.hadoop.fs.Path("/test.jar");
try {
@@ -73,22 +76,22 @@ protected void createHDFS() {
hdfs.copyFromLocalFile(
new org.apache.hadoop.fs.Path(SQL_TOOL_BOX_JAR.toString()), hdPath);
} catch (IOException e) {
- Assert.fail("Failed to copy local test.jar to HDFS env" + e.getMessage());
+ fail("Failed to copy local test.jar to HDFS env" + e.getMessage(), e);
}
}
@Override
- protected void destroyHDFS() {
+ void destroyHDFS() {
try {
hdfs.delete(hdPath, false);
} catch (IOException e) {
- Assert.fail("Failed to cleanup HDFS path" + e.getMessage());
+ fail("Failed to cleanup HDFS path" + e.getMessage(), e);
}
super.destroyHDFS();
}
- @Test
- public void testUdfInRemoteJar() throws Exception {
+ @TestTemplate
+ void testUdfInRemoteJar() throws Exception {
runAndCheckSQL(
"remote_jar_e2e.sql",
Arrays.asList("+I[Bob, 2]", "+I[Alice, 1]"),
@@ -97,8 +100,8 @@ public void testUdfInRemoteJar() throws Exception {
raw, USER_ORDER_SCHEMA, USER_ORDER_DESERIALIZATION_SCHEMA));
}
- @Test
- public void testCreateFunctionFromRemoteJarViaSqlClient() throws Exception {
+ @TestTemplate
+ void testCreateFunctionFromRemoteJarViaSqlClient() throws Exception {
runAndCheckSQL(
"sql_client_remote_jar_e2e.sql",
Collections.singletonMap(result, Arrays.asList("+I[Bob, 2]", "+I[Alice, 1]")),
@@ -116,16 +119,16 @@ public void testCreateFunctionFromRemoteJarViaSqlClient() throws Exception {
hdPath))));
}
- @Test
- public void testScalarUdfWhenCheckpointEnable() throws Exception {
+ @TestTemplate
+ void testScalarUdfWhenCheckpointEnable() throws Exception {
runAndCheckSQL(
"scalar_udf_e2e.sql",
Collections.singletonList(
"{\"before\":null,\"after\":{\"id\":1,\"str\":\"Hello Flink\"},\"op\":\"c\"}"));
}
- @Test
- public void testCreateTemporarySystemFunctionUsingRemoteJar() throws Exception {
+ @TestTemplate
+ void testCreateTemporarySystemFunctionUsingRemoteJar() throws Exception {
runAndCheckSQL(
"create_function_using_remote_jar_e2e.sql",
Arrays.asList("+I[Bob, 2]", "+I[Alice, 1]"),
@@ -134,8 +137,8 @@ public void testCreateTemporarySystemFunctionUsingRemoteJar() throws Exception {
raw, USER_ORDER_SCHEMA, USER_ORDER_DESERIALIZATION_SCHEMA));
}
- @Test
- public void testCreateCatalogFunctionUsingRemoteJar() throws Exception {
+ @TestTemplate
+ void testCreateCatalogFunctionUsingRemoteJar() throws Exception {
runAndCheckSQL(
"create_function_using_remote_jar_e2e.sql",
Arrays.asList("+I[Bob, 2]", "+I[Alice, 1]"),
@@ -144,8 +147,8 @@ public void testCreateCatalogFunctionUsingRemoteJar() throws Exception {
raw, USER_ORDER_SCHEMA, USER_ORDER_DESERIALIZATION_SCHEMA));
}
- @Test
- public void testCreateTemporaryCatalogFunctionUsingRemoteJar() throws Exception {
+ @TestTemplate
+ void testCreateTemporaryCatalogFunctionUsingRemoteJar() throws Exception {
Map replaceVars = generateReplaceVars();
replaceVars.put("$TEMPORARY", "TEMPORARY");
runAndCheckSQL(
@@ -157,7 +160,7 @@ public void testCreateTemporaryCatalogFunctionUsingRemoteJar() throws Exception
}
@Override
- protected Map generateReplaceVars() {
+ Map generateReplaceVars() {
String remoteJarPath =
String.format(
"hdfs://%s:%s/%s",
@@ -165,7 +168,7 @@ protected Map generateReplaceVars() {
Map varsMap = super.generateReplaceVars();
varsMap.put("$JAR_PATH", remoteJarPath);
- String methodName = name.getMethodName();
+ String methodName = testInfo.getTestMethod().map(Method::getName).orElse("");
if (methodName.startsWith("testCreateTemporarySystemFunction")) {
varsMap.put("$TEMPORARY", "TEMPORARY SYSTEM");
} else if (methodName.startsWith("testCreateTemporaryCatalogFunction")) {
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/test/java/org/apache/flink/table/test/async/AsyncScalarFunctionTest.java b/flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/test/java/org/apache/flink/table/test/async/AsyncScalarFunctionTest.java
index b527d11c29daf..876202240a4f1 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/test/java/org/apache/flink/table/test/async/AsyncScalarFunctionTest.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/test/java/org/apache/flink/table/test/async/AsyncScalarFunctionTest.java
@@ -26,10 +26,10 @@
* This test verifies that AsyncScalarFunction works correctly. The test passes if the
* application runs without errors.
*/
-public class AsyncScalarFunctionTest {
+class AsyncScalarFunctionTest {
@Test
- public void testAsyncScalarFunction() throws Exception {
+ void testAsyncScalarFunction() throws Exception {
// Create and run the example application
AsyncScalarFunctionExample example = new AsyncScalarFunctionExample();
example.execute();
diff --git a/flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/apache/flink/metrics/tests/MetricsAvailabilityITCase.java b/flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/apache/flink/metrics/tests/MetricsAvailabilityITCase.java
index 6200b52b87ee5..170a08598daa9 100644
--- a/flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/apache/flink/metrics/tests/MetricsAvailabilityITCase.java
+++ b/flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/apache/flink/metrics/tests/MetricsAvailabilityITCase.java
@@ -33,18 +33,19 @@
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
import org.apache.flink.tests.util.flink.ClusterController;
-import org.apache.flink.tests.util.flink.FlinkResource;
+import org.apache.flink.tests.util.flink.FlinkResourceExtension;
import org.apache.flink.tests.util.flink.FlinkResourceSetup;
import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.util.function.SupplierWithException;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
import javax.annotation.Nullable;
@@ -60,33 +61,36 @@
import java.util.stream.Collectors;
/** End-to-end test for the availability of metrics. */
-public class MetricsAvailabilityITCase extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)
+class MetricsAvailabilityITCase {
private static final String HOST = "localhost";
private static final int PORT = 8081;
- @Rule
- public final FlinkResource dist =
- new LocalStandaloneFlinkResourceFactory().create(FlinkResourceSetup.builder().build());
+ @RegisterExtension
+ private final FlinkResourceExtension dist =
+ new FlinkResourceExtension(
+ new LocalStandaloneFlinkResourceFactory()
+ .create(FlinkResourceSetup.builder().build()));
@Nullable private static ScheduledExecutorService scheduledExecutorService = null;
- @BeforeClass
- public static void startExecutor() {
+ @BeforeAll
+ static void startExecutor() {
scheduledExecutorService = Executors.newScheduledThreadPool(4);
}
- @AfterClass
- public static void shutdownExecutor() {
+ @AfterAll
+ static void shutdownExecutor() {
if (scheduledExecutorService != null) {
scheduledExecutorService.shutdown();
}
}
@Test
- public void testReporter() throws Exception {
+ void testReporter() throws Exception {
final Deadline deadline = Deadline.fromNow(Duration.ofMinutes(10));
- try (ClusterController ignored = dist.startCluster(1)) {
+ try (ClusterController ignored = dist.getFlinkResource().startCluster(1)) {
final RestClient restClient =
new RestClient(new Configuration(), scheduledExecutorService);
diff --git a/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java b/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java
index 5556aad1b83c3..d619a6c0e0d42 100644
--- a/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java
+++ b/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java
@@ -24,28 +24,29 @@
import org.apache.flink.tests.util.AutoClosableProcess;
import org.apache.flink.tests.util.CommandLineWrapper;
import org.apache.flink.tests.util.cache.DownloadCache;
+import org.apache.flink.tests.util.cache.DownloadCacheExtension;
import org.apache.flink.tests.util.flink.ClusterController;
-import org.apache.flink.tests.util.flink.FlinkResource;
+import org.apache.flink.tests.util.flink.FlinkResourceExtension;
import org.apache.flink.tests.util.flink.FlinkResourceSetup;
import org.apache.flink.tests.util.flink.JarLocation;
import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
+import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.OperatingSystem;
import org.apache.flink.util.ProcessorArchitecture;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
-import org.junit.Assume;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,24 +61,29 @@
import static org.apache.flink.tests.util.AutoClosableProcess.runBlocking;
import static org.apache.flink.tests.util.AutoClosableProcess.runNonBlocking;
+import static org.assertj.core.api.Assumptions.assumeThat;
/** End-to-end test for the PrometheusReporter. */
-@RunWith(Parameterized.class)
-public class PrometheusReporterEndToEndITCase extends TestLogger {
+@ExtendWith({ParameterizedTestExtension.class, TestLoggerExtension.class})
+class PrometheusReporterEndToEndITCase {
private static final Logger LOG =
LoggerFactory.getLogger(PrometheusReporterEndToEndITCase.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
- private static final String PROMETHEUS_VERSION = "2.4.3";
- private static final String PROMETHEUS_FILE_NAME;
+ private static final String PROMETHEUS_VERSION = "3.11.2";
private static final String PROMETHEUS_JAR_PREFIX = "flink-metrics-prometheus";
- static {
- final String base = "prometheus-" + PROMETHEUS_VERSION + '.';
- final String os;
- final String platform;
+ private static String prometheusFileName;
+
+ private static final Pattern LOG_REPORTER_PORT_PATTERN =
+ Pattern.compile(".*Started PrometheusReporter HTTP server on port ([0-9]+).*");
+
+ private static String getPrometheusFileName() {
+ String base = "prometheus-" + PROMETHEUS_VERSION + '.';
+ String os;
+ String platform;
switch (OperatingSystem.getCurrentOperatingSystem()) {
case MAC_OS:
os = "darwin";
@@ -107,19 +113,17 @@ public class PrometheusReporterEndToEndITCase extends TestLogger {
break;
}
- PROMETHEUS_FILE_NAME = base + os + "-" + platform;
+ return String.format("%s%s-%s", base, os, platform);
}
- private static final Pattern LOG_REPORTER_PORT_PATTERN =
- Pattern.compile(".*Started PrometheusReporter HTTP server on port ([0-9]+).*");
-
- @BeforeClass
- public static void checkOS() {
- Assume.assumeFalse("This test does not run on Windows.", OperatingSystem.isWindows());
+ @BeforeAll
+ static void beforeAll() {
+ assumeThat(OperatingSystem.isWindows()).as("This test does not run on Windows.").isFalse();
+ prometheusFileName = getPrometheusFileName();
}
- @Parameterized.Parameters(name = "{index}: {0}")
- public static Collection testParameters() {
+ @Parameters(name = "{0}")
+ static Collection testParameters() {
return Arrays.asList(
TestParams.from(
"Jar in 'lib'",
@@ -137,18 +141,22 @@ public static Collection testParameters() {
}));
}
- @Rule public final FlinkResource dist;
+ @RegisterExtension private final FlinkResourceExtension dist;
- public PrometheusReporterEndToEndITCase(TestParams params) {
+ PrometheusReporterEndToEndITCase(TestParams params) {
final FlinkResourceSetup.FlinkResourceSetupBuilder builder = FlinkResourceSetup.builder();
params.getBuilderSetup().accept(builder);
builder.addConfiguration(getFlinkConfig());
- dist = new LocalStandaloneFlinkResourceFactory().create(builder.build());
+ dist =
+ new FlinkResourceExtension(
+ new LocalStandaloneFlinkResourceFactory().create(builder.build()));
}
- @Rule public final TemporaryFolder tmp = new TemporaryFolder();
+ @TempDir private Path tmp;
- @Rule public final DownloadCache downloadCache = DownloadCache.get();
+ @RegisterExtension
+ private final DownloadCacheExtension downloadCache =
+ new DownloadCacheExtension(DownloadCache.get());
private static Configuration getFlinkConfig() {
final Configuration config = new Configuration();
@@ -162,10 +170,10 @@ private static Configuration getFlinkConfig() {
return config;
}
- @Test
- public void testReporter() throws Exception {
- final Path tmpPrometheusDir = tmp.newFolder().toPath().resolve("prometheus");
- final Path prometheusBinDir = tmpPrometheusDir.resolve(PROMETHEUS_FILE_NAME);
+ @TestTemplate
+ void testReporter() throws Exception {
+ final Path tmpPrometheusDir = tmp.resolve("prometheus");
+ final Path prometheusBinDir = tmpPrometheusDir.resolve(prometheusFileName);
final Path prometheusConfig = prometheusBinDir.resolve("prometheus.yml");
final Path prometheusBinary = prometheusBinDir.resolve("prometheus");
Files.createDirectory(tmpPrometheusDir);
@@ -175,7 +183,7 @@ public void testReporter() throws Exception {
"https://github.com/prometheus/prometheus/releases/download/v"
+ PROMETHEUS_VERSION
+ '/'
- + PROMETHEUS_FILE_NAME
+ + prometheusFileName
+ ".tar.gz",
tmpPrometheusDir);
@@ -193,10 +201,11 @@ public void testReporter() throws Exception {
.inPlace()
.build());
- try (ClusterController ignored = dist.startCluster(1)) {
+ try (ClusterController ignored = dist.getFlinkResource().startCluster(1)) {
final List ports =
- dist.searchAllLogs(LOG_REPORTER_PORT_PATTERN, matcher -> matcher.group(1))
+ dist.getFlinkResource()
+ .searchAllLogs(LOG_REPORTER_PORT_PATTERN, matcher -> matcher.group(1))
.map(Integer::valueOf)
.collect(Collectors.toList());
@@ -288,7 +297,7 @@ private static void checkMetricAvailability(final OkHttpClient client, final Str
"Could not retrieve metric " + metric + " from Prometheus.", reportedException);
}
- static class TestParams {
+ private static class TestParams {
private final String jarLocationDescription;
private final Consumer builderSetup;
@@ -299,13 +308,13 @@ private TestParams(
this.builderSetup = builderSetup;
}
- public static TestParams from(
+ private static TestParams from(
String jarLocationDesription,
Consumer builderSetup) {
return new TestParams(jarLocationDesription, builderSetup);
}
- public Consumer getBuilderSetup() {
+ private Consumer getBuilderSetup() {
return builderSetup;
}
diff --git a/flink-end-to-end-tests/flink-sql-client-test/src/test/java/SqlClientITCase.java b/flink-end-to-end-tests/flink-sql-client-test/src/test/java/SqlClientITCase.java
index 676846527bdb9..3ff92b30a7bcb 100644
--- a/flink-end-to-end-tests/flink-sql-client-test/src/test/java/SqlClientITCase.java
+++ b/flink-end-to-end-tests/flink-sql-client-test/src/test/java/SqlClientITCase.java
@@ -62,7 +62,7 @@
/** E2E Test for SqlClient. */
@Testcontainers
-public class SqlClientITCase {
+class SqlClientITCase {
private static final Logger LOG = LoggerFactory.getLogger(SqlClientITCase.class);
@@ -76,16 +76,16 @@ public class SqlClientITCase {
private final Path sqlConnectorUpsertTestJar =
ResourceTestUtils.getResource(".*flink-test-utils.*\\.jar");
- public static final Network NETWORK = Network.newNetwork();
+ private static final Network NETWORK = Network.newNetwork();
@Container
- public static final KafkaContainer KAFKA =
+ private static final KafkaContainer KAFKA =
new KafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA))
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS)
.withLogConsumer(LOG_CONSUMER);
- public final FlinkContainers flink =
+ private final FlinkContainers flink =
FlinkContainers.builder()
.withFlinkContainersSettings(
FlinkContainersSettings.builder()
diff --git a/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SqlGatewayE2ECase.java b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SqlGatewayE2ECase.java
index 0d7387b5803fa..3bf654b97afb4 100644
--- a/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SqlGatewayE2ECase.java
+++ b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SqlGatewayE2ECase.java
@@ -31,22 +31,24 @@
import org.apache.flink.test.util.SQLJobSubmission;
import org.apache.flink.tests.util.flink.ClusterController;
import org.apache.flink.tests.util.flink.FlinkDistribution;
-import org.apache.flink.tests.util.flink.FlinkResource;
+import org.apache.flink.tests.util.flink.FlinkResourceExtension;
import org.apache.flink.tests.util.flink.FlinkResourceSetup;
import org.apache.flink.tests.util.flink.GatewayController;
import org.apache.flink.tests.util.flink.JarLocation;
import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.NetUtils;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
import java.io.File;
import java.io.FileInputStream;
@@ -67,6 +69,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.TimeZone;
+import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@@ -76,10 +79,12 @@
import static org.apache.flink.table.gateway.rest.util.SqlGatewayRestOptions.PORT;
import static org.apache.flink.table.utils.DateTimeUtils.formatTimestampMillis;
import static org.apache.flink.tests.util.TestUtils.readCsvResultFiles;
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
/** E2E Tests for {@code SqlGateway} with {@link HiveServer2Endpoint}. */
-public class SqlGatewayE2ECase extends TestLogger {
+@Testcontainers
+@ExtendWith(TestLoggerExtension.class)
+class SqlGatewayE2ECase {
private static final Path HIVE_SQL_CONNECTOR_JAR =
ResourceTestUtils.getResource(".*dependencies/flink-sql-connector-hive-.*.jar");
@@ -92,9 +97,9 @@ public class SqlGatewayE2ECase extends TestLogger {
private static final Configuration ENDPOINT_CONFIG = new Configuration();
private static final String RESULT_KEY = "$RESULT";
- @ClassRule public static final TemporaryFolder FOLDER = new TemporaryFolder();
- @ClassRule public static final HiveContainer HIVE_CONTAINER = new HiveContainer();
- @Rule public final FlinkResource flinkResource = buildFlinkResource();
+ @TempDir static Path FOLDER;
+ @Container private static final HiveContainer HIVE_CONTAINER = new HiveContainer();
+ @RegisterExtension final FlinkResourceExtension flinkResourceExtension = buildFlinkResource();
private static NetUtils.Port hiveserver2Port;
private static NetUtils.Port restPort;
@@ -105,20 +110,20 @@ public class SqlGatewayE2ECase extends TestLogger {
private static final AtomicInteger CATALOG_COUNTER = new AtomicInteger(0);
private static String filesystemCatalogName;
- @BeforeClass
- public static void beforeClass() {
+ @BeforeAll
+ static void beforeClass() {
ENDPOINT_CONFIG.setString(
getPrefixedConfigOptionName(CATALOG_HIVE_CONF_DIR), createHiveConf().getParent());
}
- @AfterClass
- public static void afterClass() throws Exception {
+ @AfterAll
+ static void afterClass() throws Exception {
hiveserver2Port.close();
restPort.close();
}
@Test
- public void testHiveServer2ExecuteStatement() throws Exception {
+ void testHiveServer2ExecuteStatement() throws Exception {
executeStatement(
SQLJobClientMode.getHiveJDBC(
InetAddress.getByName("localhost").getHostAddress(),
@@ -126,7 +131,7 @@ public void testHiveServer2ExecuteStatement() throws Exception {
}
@Test
- public void testRestExecuteStatement() throws Exception {
+ void testRestExecuteStatement() throws Exception {
executeStatement(
SQLJobClientMode.getRestClient(
InetAddress.getByName("localhost").getHostAddress(),
@@ -135,19 +140,19 @@ public void testRestExecuteStatement() throws Exception {
}
@Test
- public void testSqlClientExecuteStatement() throws Exception {
+ void testSqlClientExecuteStatement() throws Exception {
executeStatement(
SQLJobClientMode.getGatewaySqlClient(
InetAddress.getByName("localhost").getHostAddress(), restPort.getPort()));
}
@Test
- public void testMaterializedTableInContinuousMode() throws Exception {
+ void testMaterializedTableInContinuousMode() throws Exception {
Duration continuousWaitTime = Duration.ofMinutes(5);
Duration continuousWaitPause = Duration.ofSeconds(10);
- try (GatewayController gateway = flinkResource.startSqlGateway();
- ClusterController ignore = flinkResource.startCluster(2)) {
+ try (GatewayController gateway = flinkResourceExtension.getFlinkResource().startSqlGateway();
+ ClusterController ignore = flinkResourceExtension.getFlinkResource().startCluster(2)) {
FlinkDistribution.TestSqlGatewayRestClient gatewayRestClient =
initSessionWithCatalogStore(Collections.emptyMap());
@@ -199,12 +204,12 @@ public void testMaterializedTableInContinuousMode() throws Exception {
continuousWaitPause,
"Failed to wait for the result");
- File savepointFolder = FOLDER.newFolder("savepoint");
+ Path savepointFolder = Files.createDirectory(FOLDER.resolve("savepoint-" + UUID.randomUUID()));
// configure savepoint path
gatewayRestClient.executeStatementWithResult(
String.format(
"set 'execution.checkpointing.savepoint-dir'='file://%s'",
- savepointFolder.getAbsolutePath()));
+ savepointFolder.toAbsolutePath()));
// suspend the materialized table
gatewayRestClient.executeStatementWithResult(
@@ -239,13 +244,13 @@ public void testMaterializedTableInContinuousMode() throws Exception {
}
@Test
- public void testMaterializedTableInFullMode() throws Exception {
+ void testMaterializedTableInFullMode() throws Exception {
Duration fullModeWaitTime = Duration.ofMinutes(5);
Duration fullModeWaitPause = Duration.ofSeconds(10);
// init session
- try (GatewayController gateway = flinkResource.startSqlGateway();
- ClusterController ignore = flinkResource.startCluster(2)) {
+ try (GatewayController gateway = flinkResourceExtension.getFlinkResource().startSqlGateway();
+ ClusterController ignore = flinkResourceExtension.getFlinkResource().startCluster(2)) {
Map<String, String> sessionProperties = new HashMap<>();
sessionProperties.put("workflow-scheduler.type", "embedded");
@@ -402,11 +407,11 @@ public void testMaterializedTableInFullMode() throws Exception {
private FlinkDistribution.TestSqlGatewayRestClient initSessionWithCatalogStore(
Map<String, String> extraProperties) throws Exception {
- File catalogStoreFolder = FOLDER.newFolder();
+ Path catalogStoreFolder = Files.createTempDirectory(FOLDER, "catalogStore");
Map<String, String> sessionProperties = new HashMap<>();
sessionProperties.put("table.catalog-store.kind", "file");
sessionProperties.put(
- "table.catalog-store.file.path", catalogStoreFolder.getAbsolutePath());
+ "table.catalog-store.file.path", catalogStoreFolder.toAbsolutePath().toString());
sessionProperties.putAll(extraProperties);
FlinkDistribution.TestSqlGatewayRestClient gatewayRestClient =
@@ -417,9 +422,10 @@ private FlinkDistribution.TestSqlGatewayRestClient initSessionWithCatalogStore(
sessionProperties);
filesystemCatalogName = CATALOG_NAME_PREFIX + CATALOG_COUNTER.getAndAdd(1);
- File catalogFolder = FOLDER.newFolder(filesystemCatalogName);
- FOLDER.newFolder(
- String.format("%s/%s", filesystemCatalogName, FILESYSTEM_DEFAULT_DATABASE));
+ Path catalogFolder = Files.createDirectory(FOLDER.resolve(filesystemCatalogName));
+ Files.createDirectory(
+ FOLDER.resolve(
+ String.format("%s/%s", filesystemCatalogName, FILESYSTEM_DEFAULT_DATABASE)));
String createCatalogDDL =
String.format(
"CREATE CATALOG %s WITH (\n"
@@ -427,7 +433,7 @@ private FlinkDistribution.TestSqlGatewayRestClient initSessionWithCatalogStore(
+ " 'default-database' = 'test_db',\n"
+ " 'path' = '%s'\n"
+ ")",
- filesystemCatalogName, catalogFolder.getAbsolutePath());
+ filesystemCatalogName, catalogFolder.toAbsolutePath());
gatewayRestClient.executeStatementWithResult(createCatalogDDL);
gatewayRestClient.executeStatementWithResult(
String.format("USE CATALOG %s", filesystemCatalogName));
@@ -436,16 +442,16 @@ private FlinkDistribution.TestSqlGatewayRestClient initSessionWithCatalogStore(
}
private void executeStatement(SQLJobClientMode mode) throws Exception {
- File result = FOLDER.newFolder(mode.getClass().getName() + ".csv");
- try (GatewayController gateway = flinkResource.startSqlGateway();
- ClusterController ignore = flinkResource.startCluster(1)) {
+ Path result = Files.createTempDirectory(FOLDER, mode.getClass().getName() + ".csv");
+ try (GatewayController gateway = flinkResourceExtension.getFlinkResource().startSqlGateway();
+ ClusterController ignore = flinkResourceExtension.getFlinkResource().startCluster(1)) {
gateway.submitSQLJob(
- new SQLJobSubmission.SQLJobSubmissionBuilder(getSqlLines(result))
+ new SQLJobSubmission.SQLJobSubmissionBuilder(getSqlLines(result.toFile()))
.setClientMode(mode)
.build(),
Duration.ofSeconds(60));
}
- assertEquals(Collections.singletonList("1"), readCsvResultFiles(result.toPath()));
+ assertThat(readCsvResultFiles(result)).containsExactly("1");
}
private static List<String> getSqlLines(File result) throws Exception {
@@ -480,7 +486,7 @@ private static File createHiveConf() {
}
hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, HIVE_CONTAINER.getHiveMetastoreURI());
try {
- File site = FOLDER.newFile(HiveCatalog.HIVE_SITE_FILE);
+ File site = Files.createFile(FOLDER.resolve(HiveCatalog.HIVE_SITE_FILE)).toFile();
try (OutputStream out = new FileOutputStream(site)) {
hiveConf.writeXml(out);
}
@@ -495,7 +501,7 @@ private static File createHiveConf() {
* hadoop.classpath which contains all hadoop jars. It also moves planner to the lib and remove
* the planner load to make the Hive sql connector works.
*/
- private static FlinkResource buildFlinkResource() {
+ private static FlinkResourceExtension buildFlinkResource() {
// add hive jar and planner jar
FlinkResourceSetup.FlinkResourceSetupBuilder builder =
FlinkResourceSetup.builder()
@@ -536,7 +542,7 @@ private static FlinkResource buildFlinkResource() {
ENDPOINT_CONFIG.addAll(Configuration.fromMap(endpointConfig));
builder.addConfiguration(ENDPOINT_CONFIG);
- return new LocalStandaloneFlinkResourceFactory().create(builder.build());
+ return new FlinkResourceExtension(new LocalStandaloneFlinkResourceFactory().create(builder.build()));
}
private static String getPrefixedConfigOptionName(ConfigOption<?> option) {
diff --git a/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/containers/HiveContainer.java b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/containers/HiveContainer.java
index 6fa9064fcfd73..a908f270f785d 100644
--- a/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/containers/HiveContainer.java
+++ b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/containers/HiveContainer.java
@@ -25,7 +25,6 @@
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
-import org.junit.runner.Description;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
@@ -42,8 +41,8 @@ public class HiveContainer extends GenericContainer<HiveContainer> {
private static final Logger LOG = LoggerFactory.getLogger(HiveContainer.class);
- public static final String HOST_NAME = "hadoop-master";
- public static final int HIVE_METASTORE_PORT = 9083;
+ private static final String HOST_NAME = "hadoop-master";
+ private static final int HIVE_METASTORE_PORT = 9083;
private static final int NAME_NODE_WEB_PORT = 50070;
// Detailed log paths are from
@@ -96,9 +95,9 @@ protected void containerIsStarted(InspectContainerResponse containerInfo) {
}
@Override
- protected void finished(Description description) {
+ protected void containerIsStopping(InspectContainerResponse containerInfo) {
backupLogs();
- super.finished(description);
+ super.containerIsStopping(containerInfo);
}
public String getHiveMetastoreURI() {
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/ExternalResource.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/ExternalResource.java
deleted file mode 100644
index 2bb69bd333e05..0000000000000
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/ExternalResource.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.util;
-
-import org.junit.rules.TestRule;
-import org.junit.runner.Description;
-import org.junit.runners.model.Statement;
-
-/**
- * Modified version of the jUnit {@link org.junit.rules.ExternalResource}.
- *
- * This version is an interface instead of an abstract class and allows resources to
- * differentiate between successful and failed tests in their {@code After} methods.
- */
-public interface ExternalResource extends TestRule {
-
- void before() throws Exception;
-
- void afterTestSuccess();
-
- default void afterTestFailure() {
- afterTestSuccess();
- }
-
- @Override
- default Statement apply(final Statement base, final Description description) {
- return new Statement() {
- @Override
- public void evaluate() throws Throwable {
- before();
- try {
- base.evaluate();
- } catch (final Throwable testThrowable) {
- try {
- afterTestFailure();
- } catch (final Throwable afterFailureThrowable) {
- testThrowable.addSuppressed(afterFailureThrowable);
- }
- throw testThrowable;
- }
- afterTestSuccess();
- }
- };
- }
-}