From 78e9cc58af61b2b8e03074ecb241de11d80a3a4c Mon Sep 17 00:00:00 2001 From: Mate Czagany Date: Thu, 23 Apr 2026 12:15:42 +0200 Subject: [PATCH] [FLINK-39527][test] Migrate flink-end-to-end-tests to JUnit 5 --- .../apache/flink/sql/tests/BatchSQLTest.java | 8 +- .../tests/AllroundMiniClusterTest.java | 41 ++++---- .../flink/tests/util/cache/DownloadCache.java | 11 ++- .../util/cache/DownloadCacheExtension.java | 54 ++++++++++ .../flink/tests/util/flink/FlinkResource.java | 11 ++- .../util/flink/FlinkResourceExtension.java | 51 ++++++++++ .../flink/tests/util/TestUtilsTest.java | 28 +++--- .../flink/tests/util/util/FileUtilsTest.java | 46 ++++----- .../flink/runtime/rest/RestClientITCase.java | 10 +- .../flink/tests/scala/ScalaFreeITCase.java | 52 +++++----- .../CompileAndExecuteRemotePlanITCase.java | 20 ++-- .../flink/table/sql/CreateTableAsITCase.java | 20 ++-- .../flink/table/sql/HdfsITCaseBase.java | 51 +++++----- .../table/sql/PlannerScalaFreeITCase.java | 24 ++--- .../apache/flink/table/sql/SqlITCaseBase.java | 98 +++++++++---------- .../flink/table/sql/UsingRemoteJarITCase.java | 55 ++++++----- .../test/async/AsyncScalarFunctionTest.java | 4 +- .../tests/MetricsAvailabilityITCase.java | 36 ++++--- .../PrometheusReporterEndToEndITCase.java | 91 +++++++++-------- .../src/test/java/SqlClientITCase.java | 8 +- .../table/gateway/SqlGatewayE2ECase.java | 90 +++++++++-------- .../gateway/containers/HiveContainer.java | 9 +- .../apache/flink/util/ExternalResource.java | 61 ------------ 23 files changed, 467 insertions(+), 412 deletions(-) create mode 100644 flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/cache/DownloadCacheExtension.java create mode 100644 flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResourceExtension.java delete mode 100644 flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/ExternalResource.java diff --git a/flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java b/flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java index 007bef8abd677..0c2ce4c429435 100644 --- a/flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java +++ b/flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java @@ -51,7 +51,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.assertj.core.api.Assertions.assertThat; @ExtendWith(TestLoggerExtension.class) class BatchSQLTest { @@ -79,7 +79,7 @@ class BatchSQLTest { // Only above shuffle modes are supported by the adaptive batch scheduler // , "ALL_EXCHANGES_PIPELINE" }) - public void testBatchSQL(BatchShuffleMode shuffleMode, @TempDir Path tmpDir) throws Exception { + void testBatchSQL(BatchShuffleMode shuffleMode, @TempDir Path tmpDir) throws Exception { LOG.info("Results for this test will be stored at: {}", tmpDir); String sqlStatement = new String(Files.readAllBytes(sqlPath)); @@ -143,13 +143,13 @@ public void testBatchSQL(BatchShuffleMode shuffleMode, @TempDir Path tmpDir) thr .filter(file -> !file.isDirectory()) .map(File::getPath) .collect(Collectors.toList()); - assertEquals(1, files.size()); + assertThat(files).hasSize(1); Path resultFile = Paths.get(files.get(0)); LOG.info("Result found at {}", resultFile); String actual = new String(Files.readAllBytes(resultFile)); LOG.info("Actual result is: '{}'", actual); - assertEquals(expected, actual); + assertThat(actual).isEqualTo(expected); } } diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/test/java/org/apache/flink/streaming/tests/AllroundMiniClusterTest.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/test/java/org/apache/flink/streaming/tests/AllroundMiniClusterTest.java index eabe48b063979..c8ede888852f0 100644 --- a/flink-end-to-end-tests/flink-datastream-allround-test/src/test/java/org/apache/flink/streaming/tests/AllroundMiniClusterTest.java +++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/test/java/org/apache/flink/streaming/tests/AllroundMiniClusterTest.java @@ -21,39 +21,41 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.StateRecoveryOptions; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; -import org.apache.flink.test.util.MiniClusterWithClientResource; -import org.apache.flink.util.TestLogger; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.util.TestLoggerExtension; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Ignore; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; -import java.io.File; +import java.nio.file.Path; import static org.apache.flink.configuration.JobManagerOptions.EXECUTION_FAILOVER_STRATEGY; /** DataStreamAllroundTestProgram on MiniCluster for manual debugging purposes. */ -@Ignore("Test is already part of end-to-end tests. This is for manual debugging.") -public class AllroundMiniClusterTest extends TestLogger { +@Disabled("Test is already part of end-to-end tests. This is for manual debugging.") +@ExtendWith(TestLoggerExtension.class) +class AllroundMiniClusterTest { - @BeforeClass - public static void beforeClass() { + @BeforeAll + static void beforeClass() { org.apache.log4j.PropertyConfigurator.configure( AllroundMiniClusterTest.class.getClassLoader().getResource("log4j.properties")); } - @ClassRule - public static MiniClusterWithClientResource miniClusterResource = - new MiniClusterWithClientResource( + @RegisterExtension + private static final MiniClusterExtension MINI_CLUSTER_EXTENSION = + new MiniClusterExtension( new MiniClusterResourceConfiguration.Builder() .setNumberTaskManagers(4) .setNumberSlotsPerTaskManager(2) .setConfiguration(createConfiguration()) .build()); - @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder(); + @TempDir private static Path temporaryFolder; private static Configuration createConfiguration() { Configuration configuration = new Configuration(); @@ -63,12 +65,13 @@ private static Configuration createConfiguration() { } @Test - public void runTest() throws Exception { - File checkpointDir = temporaryFolder.newFolder(); + void runTest() throws Exception { + Path checkpointDir = temporaryFolder.resolve("checkpoints"); + java.nio.file.Files.createDirectories(checkpointDir); DataStreamAllroundTestProgram.main( new String[] { "--environment.parallelism", "8", - "--state_backend.checkpoint_directory", checkpointDir.toURI().toString(), + "--state_backend.checkpoint_directory", checkpointDir.toUri().toString(), "--state_backend", "rocks", "--state_backend.rocks.incremental", "true", "--test.simulate_failure", "true", diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/cache/DownloadCache.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/cache/DownloadCache.java index 687f5144a24e7..0be985bfeb2d8 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/cache/DownloadCache.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/cache/DownloadCache.java @@ -19,7 +19,6 @@ package org.apache.flink.tests.util.cache; import org.apache.flink.tests.util.util.FactoryUtils; -import org.apache.flink.util.ExternalResource; import java.io.IOException; import java.nio.file.Path; @@ -30,7 +29,15 @@ * *

Whether, how, and for how long files are cached is implementation-dependent. */ -public interface DownloadCache extends ExternalResource { +public interface DownloadCache { + + void before() throws Exception; + + void afterTestSuccess(); + + default void afterTestFailure() { + afterTestSuccess(); + } /** * Returns either a cached or newly downloaded version of the given file. The returned file path diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/cache/DownloadCacheExtension.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/cache/DownloadCacheExtension.java new file mode 100644 index 0000000000000..5a006dc2a3a9a --- /dev/null +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/cache/DownloadCacheExtension.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tests.util.cache; + +import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.BeforeEachCallback; +import org.junit.jupiter.api.extension.ExtensionContext; + +import java.io.IOException; +import java.nio.file.Path; + +/** JUnit 5 extension that wraps a {@link DownloadCache} and manages its lifecycle. */ +public class DownloadCacheExtension implements BeforeEachCallback, AfterEachCallback { + + private final DownloadCache delegate; + + public DownloadCacheExtension(DownloadCache delegate) { + this.delegate = delegate; + } + + public Path getOrDownload(String url, Path targetDir) throws IOException { + return delegate.getOrDownload(url, targetDir); + } + + @Override + public void beforeEach(ExtensionContext context) throws Exception { + delegate.before(); + } + + @Override + public void afterEach(ExtensionContext context) { + if (context.getExecutionException().isPresent()) { + delegate.afterTestFailure(); + } else { + delegate.afterTestSuccess(); + } + } +} diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResource.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResource.java index 397d742882389..63412d64b3011 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResource.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResource.java @@ -20,7 +20,6 @@ import org.apache.flink.test.util.JobSubmission; import org.apache.flink.tests.util.util.FactoryUtils; -import org.apache.flink.util.ExternalResource; import java.io.IOException; import java.time.Duration; @@ -30,7 +29,15 @@ import java.util.stream.Stream; /** Generic interface for interacting with Flink. */ -public interface FlinkResource extends ExternalResource { +public interface FlinkResource { + + void before() throws Exception; + + void afterTestSuccess(); + + default void afterTestFailure() { + afterTestSuccess(); + } /** * Starts a cluster. diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResourceExtension.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResourceExtension.java new file mode 100644 index 0000000000000..349a67f783fbf --- /dev/null +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkResourceExtension.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tests.util.flink; + +import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.BeforeEachCallback; +import org.junit.jupiter.api.extension.ExtensionContext; + +/** JUnit 5 extension that wraps a {@link FlinkResource} and manages its lifecycle. */ +public class FlinkResourceExtension implements BeforeEachCallback, AfterEachCallback { + + private final FlinkResource delegate; + + public FlinkResourceExtension(FlinkResource delegate) { + this.delegate = delegate; + } + + public FlinkResource getFlinkResource() { + return delegate; + } + + @Override + public void beforeEach(ExtensionContext context) throws Exception { + delegate.before(); + } + + @Override + public void afterEach(ExtensionContext context) { + if (context.getExecutionException().isPresent()) { + delegate.afterTestFailure(); + } else { + delegate.afterTestSuccess(); + } + } +} diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/java/org/apache/flink/tests/util/TestUtilsTest.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/java/org/apache/flink/tests/util/TestUtilsTest.java index 717f5103d57bd..05a6b907ca00c 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/java/org/apache/flink/tests/util/TestUtilsTest.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/java/org/apache/flink/tests/util/TestUtilsTest.java @@ -19,38 +19,40 @@ import org.apache.flink.tests.util.activation.OperatingSystemRestriction; import org.apache.flink.util.OperatingSystem; -import org.apache.flink.util.TestLogger; +import org.apache.flink.util.TestLoggerExtension; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import static org.assertj.core.api.Assertions.assertThat; + /** Tests for {@link TestUtils}. */ -public class TestUtilsTest extends TestLogger { +@ExtendWith(TestLoggerExtension.class) +class TestUtilsTest { - @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + @TempDir private Path temporaryFolder; - @BeforeClass - public static void setupClass() { + @BeforeAll + static void setupClass() { OperatingSystemRestriction.forbid( "Symbolic links usually require special permissions on Windows.", OperatingSystem.WINDOWS); } @Test - public void copyDirectory() throws IOException { + void copyDirectory() throws IOException { Path[] files = { Paths.get("file1"), Paths.get("dir1", "file2"), }; - Path source = temporaryFolder.newFolder("source").toPath(); + Path source = Files.createDirectory(temporaryFolder.resolve("source")); for (Path file : files) { Files.createDirectories(source.resolve(file).getParent()); Files.createFile(source.resolve(file)); @@ -63,7 +65,7 @@ public void copyDirectory() throws IOException { TestUtils.copyDirectory(symbolicLink, target); for (Path file : files) { - Assert.assertTrue(Files.exists(target.resolve(file))); + assertThat(target.resolve(file)).exists(); } } } diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/java/org/apache/flink/tests/util/util/FileUtilsTest.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/java/org/apache/flink/tests/util/util/FileUtilsTest.java index d376432127ed5..f1d960cb60e2d 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/java/org/apache/flink/tests/util/util/FileUtilsTest.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/java/org/apache/flink/tests/util/util/FileUtilsTest.java @@ -18,34 +18,32 @@ package org.apache.flink.tests.util.util; import org.apache.flink.test.util.FileUtils; -import org.apache.flink.util.TestLogger; +import org.apache.flink.util.TestLoggerExtension; -import org.junit.Assert; -import org.junit.Before; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; -import java.util.Arrays; -import java.util.Collections; import java.util.List; import java.util.regex.Pattern; +import static org.assertj.core.api.Assertions.assertThat; + /** Tests for {@link FileUtils}. */ -public class FileUtilsTest extends TestLogger { +@ExtendWith(TestLoggerExtension.class) +class FileUtilsTest { - @ClassRule public static final TemporaryFolder TMP = new TemporaryFolder(); + private static final List ORIGINAL_LINES = List.of("line1", "line2", "line3"); - private static final List ORIGINAL_LINES = - Collections.unmodifiableList(Arrays.asList("line1", "line2", "line3")); private Path testFile; - @Before - public void setupFile() throws IOException { - Path path = TMP.newFile().toPath(); + @BeforeEach + void setupFile(@TempDir Path tmpDir) throws IOException { + Path path = Files.createTempFile(tmpDir, null, null); Files.write(path, ORIGINAL_LINES); @@ -53,27 +51,25 @@ public void setupFile() throws IOException { } @Test - public void replaceSingleMatch() throws IOException { + void replaceSingleMatch() throws IOException { FileUtils.replace(testFile, Pattern.compile("line1"), matcher -> "removed"); - Assert.assertEquals( - Arrays.asList("removed", ORIGINAL_LINES.get(1), ORIGINAL_LINES.get(2)), - Files.readAllLines(testFile)); + assertThat(Files.readAllLines(testFile)) + .containsExactly("removed", ORIGINAL_LINES.get(1), ORIGINAL_LINES.get(2)); } @Test - public void replaceMultipleMatch() throws IOException { + void replaceMultipleMatch() throws IOException { FileUtils.replace(testFile, Pattern.compile("line(.*)"), matcher -> matcher.group(1)); - Assert.assertEquals(Arrays.asList("1", "2", "3"), Files.readAllLines(testFile)); + assertThat(Files.readAllLines(testFile)).containsExactly("1", "2", "3"); } @Test - public void replaceWithEmptyLine() throws IOException { + void replaceWithEmptyLine() throws IOException { FileUtils.replace(testFile, Pattern.compile("line2"), matcher -> ""); - Assert.assertEquals( - Arrays.asList(ORIGINAL_LINES.get(0), "", ORIGINAL_LINES.get(2)), - Files.readAllLines(testFile)); + assertThat(Files.readAllLines(testFile)) + .containsExactly(ORIGINAL_LINES.get(0), "", ORIGINAL_LINES.get(2)); } } diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-restclient/src/test/java/org/apache/flink/runtime/rest/RestClientITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-restclient/src/test/java/org/apache/flink/runtime/rest/RestClientITCase.java index 57cbac116d15e..fb3c77339e9f5 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-restclient/src/test/java/org/apache/flink/runtime/rest/RestClientITCase.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-restclient/src/test/java/org/apache/flink/runtime/rest/RestClientITCase.java @@ -24,12 +24,13 @@ import org.apache.flink.runtime.rest.messages.ResponseBody; import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders; import org.apache.flink.runtime.rest.versioning.RestAPIVersion; -import org.apache.flink.util.TestLogger; +import org.apache.flink.util.TestLoggerExtension; import org.apache.flink.util.concurrent.Executors; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; -import org.junit.Test; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import java.net.URL; import java.util.Collection; @@ -38,10 +39,11 @@ import java.util.concurrent.TimeUnit; /** Tests for {@link RestClient} that rely on external connections. */ -public class RestClientITCase extends TestLogger { +@ExtendWith(TestLoggerExtension.class) +class RestClientITCase { @Test - public void testHttpsConnectionWithDefaultCerts() throws Exception { + void testHttpsConnectionWithDefaultCerts() throws Exception { final Configuration config = new Configuration(); final URL httpsUrl = new URL("https://raw.githubusercontent.com"); TestUrlMessageHeaders testUrlMessageHeaders = diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-scala/src/test/java/org/apache/flink/tests/scala/ScalaFreeITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-scala/src/test/java/org/apache/flink/tests/scala/ScalaFreeITCase.java index e0ad66e0af84f..f3ce565b2ba2d 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-scala/src/test/java/org/apache/flink/tests/scala/ScalaFreeITCase.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-scala/src/test/java/org/apache/flink/tests/scala/ScalaFreeITCase.java @@ -21,15 +21,17 @@ import org.apache.flink.test.util.JobSubmission; import org.apache.flink.tests.util.flink.ClusterController; import org.apache.flink.tests.util.flink.FlinkResource; +import org.apache.flink.tests.util.flink.FlinkResourceExtension; import org.apache.flink.tests.util.flink.FlinkResourceSetup; import org.apache.flink.tests.util.flink.JarLocation; -import org.apache.flink.testutils.executor.TestExecutorResource; -import org.apache.flink.util.TestLogger; +import org.apache.flink.testutils.executor.TestExecutorExtension; +import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; +import org.apache.flink.util.TestLoggerExtension; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; import java.nio.file.Path; import java.time.Duration; @@ -42,19 +44,19 @@ * Tests that Flink does not require Scala for jobs that do not use the Scala APIs. This covers both * pure Java jobs, and Scala jobs that use the Java APIs exclusively with Scala types. */ -@RunWith(Parameterized.class) -public class ScalaFreeITCase extends TestLogger { +@ExtendWith({ParameterizedTestExtension.class, TestLoggerExtension.class}) +class ScalaFreeITCase { - @Rule - public final TestExecutorResource testExecutorResource = - new TestExecutorResource<>( + @RegisterExtension + private static final TestExecutorExtension TEST_EXECUTOR_EXTENSION = + new TestExecutorExtension<>( java.util.concurrent.Executors::newSingleThreadScheduledExecutor); - @Rule public final FlinkResource flink; + @RegisterExtension private final FlinkResourceExtension flinkExtension; private final String mainClass; - @Parameterized.Parameters(name = "{index}: {0}") - public static Collection testParameters() { + @Parameters(name = "{0}") + private static Collection testParameters() { return Arrays.asList( new TestParams("Java job, without Scala in lib/", JavaJob.class.getCanonicalName()), new TestParams( @@ -69,21 +71,21 @@ public static Collection testParameters() { JarLocation.LIB))); } - public ScalaFreeITCase(TestParams testParams) { + private ScalaFreeITCase(TestParams testParams) { final FlinkResourceSetup.FlinkResourceSetupBuilder builder = FlinkResourceSetup.builder() .moveJar("flink-scala", JarLocation.LIB, JarLocation.OPT); testParams.builderSetup.accept(builder); - flink = FlinkResource.get(builder.build()); + flinkExtension = new FlinkResourceExtension(FlinkResource.get(builder.build())); mainClass = testParams.mainClass; } - @Test - public void testScalaFreeJobExecution() throws Exception { + @TestTemplate + void testScalaFreeJobExecution() throws Exception { final Path jobJar = ResourceTestUtils.getResource("/jobs.jar"); - try (final ClusterController clusterController = flink.startCluster(1)) { - // if the job fails then this throws an exception + try (final ClusterController clusterController = + flinkExtension.getFlinkResource().startCluster(1)) { clusterController.submitJob( new JobSubmission.JobSubmissionBuilder(jobJar) .setDetached(false) @@ -93,7 +95,7 @@ public void testScalaFreeJobExecution() throws Exception { } } - static class TestParams { + private static class TestParams { private final String description; private final String mainClass; @@ -112,14 +114,6 @@ static class TestParams { this.builderSetup = builderSetup; } - public String getMainClass() { - return mainClass; - } - - public Consumer getBuilderSetup() { - return builderSetup; - } - @Override public String toString() { return description; diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/CompileAndExecuteRemotePlanITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/CompileAndExecuteRemotePlanITCase.java index 439770cce244e..5ad87f0f87c63 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/CompileAndExecuteRemotePlanITCase.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/CompileAndExecuteRemotePlanITCase.java @@ -20,8 +20,7 @@ import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.RemoteIterator; -import org.junit.Assume; -import org.junit.Test; +import org.junit.jupiter.api.TestTemplate; import java.io.IOException; import java.nio.file.Path; @@ -32,27 +31,24 @@ import java.util.Map; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assumptions.assumeThat; /** End-to-End tests for COMPILE AND EXECUTE PLAN statement with hdfs as remote uri. */ -public class CompileAndExecuteRemotePlanITCase extends HdfsITCaseBase { +class CompileAndExecuteRemotePlanITCase extends HdfsITCaseBase { private static final String TABLE1 = "message"; private static final String TABLE2 = "employee"; private String planDir; - public CompileAndExecuteRemotePlanITCase(String executionMode) { - super(executionMode); - } - @Override - protected void createHDFS() { + void createHDFS() { super.createHDFS(); planDir = getRemotePlanDir(); } @Override - protected Map generateReplaceVars() { + Map generateReplaceVars() { Map varsMap = super.generateReplaceVars(); varsMap.put("$REMOTE_PLAN_DIR", planDir); varsMap.put("$TABLE1", TABLE1); @@ -60,10 +56,10 @@ protected Map generateReplaceVars() { return varsMap; } - @Test - public void testCompileAndExecutePlan() throws Exception { + @TestTemplate + void testCompileAndExecutePlan() throws Exception { // COMPILE AND EXECUTE PLAN is not supported under batch mode - Assume.assumeTrue(executionMode.equals("streaming")); + assumeThat(executionMode).isEqualTo("streaming"); Map> resultItems = new HashMap<>(); resultItems.put(result.resolve(TABLE1), Arrays.asList("1,Meow", "2,Purr")); resultItems.put(result.resolve(TABLE2), Arrays.asList("1,Tom", "2,Jerry")); diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/CreateTableAsITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/CreateTableAsITCase.java index 06fbe73dd6639..85a333a8e9457 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/CreateTableAsITCase.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/CreateTableAsITCase.java @@ -28,7 +28,7 @@ import org.apache.flink.test.util.SQLJobSubmission; import org.apache.flink.tests.util.flink.ClusterController; -import org.junit.Test; +import org.junit.jupiter.api.TestTemplate; import java.net.URI; import java.time.Duration; @@ -37,7 +37,7 @@ import java.util.List; /** End-to-End tests for create table as select syntax. */ -public class CreateTableAsITCase extends SqlITCaseBase { +class CreateTableAsITCase extends SqlITCaseBase { private static final ResolvedSchema SINK_TABLE_SCHEMA = new ResolvedSchema( @@ -54,29 +54,25 @@ public class CreateTableAsITCase extends SqlITCaseBase { private static final DebeziumJsonDeserializationSchema DESERIALIZATION_SCHEMA = createDebeziumDeserializationSchema(SINK_TABLE_SCHEMA); - public CreateTableAsITCase(String executionMode) { - super(executionMode); - } - - @Test - public void testCreateTableAs() throws Exception { + @TestTemplate + void testCreateTableAs() throws Exception { runAndCheckSQL("create_table_as_e2e.sql", Arrays.asList("+I[Bob, 2]", "+I[Alice, 1]")); } - @Test - public void testCreateTableAsInStatementSet() throws Exception { + @TestTemplate + void testCreateTableAsInStatementSet() throws Exception { runAndCheckSQL( "create_table_as_statementset_e2e.sql", Arrays.asList("+I[Bob, 2]", "+I[Alice, 1]")); } @Override - protected List formatRawResult(List rawResult) { + List formatRawResult(List rawResult) { return convertToMaterializedResult(rawResult, SINK_TABLE_SCHEMA, DESERIALIZATION_SCHEMA); } @Override - protected void executeSqlStatements( + void executeSqlStatements( ClusterController clusterController, List sqlLines, List dependencies) throws Exception { clusterController.submitSQLJob( diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/HdfsITCaseBase.java b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/HdfsITCaseBase.java index 8aae8efcc7124..0342c6bd51724 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/HdfsITCaseBase.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/HdfsITCaseBase.java @@ -27,11 +27,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.junit.After; -import org.junit.Assert; -import org.junit.Assume; -import org.junit.Before; -import org.junit.BeforeClass; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import java.io.File; import java.io.FileNotFoundException; @@ -41,38 +39,37 @@ import java.time.Duration; import java.util.List; +import static org.assertj.core.api.Assertions.fail; +import static org.assertj.core.api.Assumptions.assumeThat; + /** Base class for sql ITCase which depends on HDFS env. */ -public abstract class HdfsITCaseBase extends SqlITCaseBase { +abstract class HdfsITCaseBase extends SqlITCaseBase { private static final Path HADOOP_CLASSPATH = ResourceTestUtils.getResource(".*hadoop.classpath"); - protected Configuration hdConf; - protected MiniDFSCluster hdfsCluster; - - public HdfsITCaseBase(String executionMode) { - super(executionMode); - } + Configuration hdConf; + MiniDFSCluster hdfsCluster; - @BeforeClass - public static void verifyOS() { - Assume.assumeTrue( - "HDFS cluster cannot be started on Windows without extensions.", - !OperatingSystem.isWindows()); + @BeforeAll + static void verifyOS() { + assumeThat(OperatingSystem.isWindows()) + .as("HDFS cluster cannot be started on Windows without extensions.") + .isFalse(); } - @Before - public void before() throws Exception { + @BeforeEach + void before() throws Exception { super.before(); createHDFS(); } - @After - public void after() { + @AfterEach + void after() { destroyHDFS(); } - protected void createHDFS() { + void createHDFS() { try { hdConf = new Configuration(); File baseDir = @@ -83,16 +80,16 @@ protected void createHDFS() { MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf); hdfsCluster = builder.build(); } catch (Throwable e) { - Assert.fail("Failed to create HDFS env" + e.getMessage()); + fail("Failed to create HDFS env" + e.getMessage(), e); } } - protected void destroyHDFS() { + void destroyHDFS() { hdfsCluster.shutdown(); } @Override - protected void executeSqlStatements( + void executeSqlStatements( ClusterController clusterController, List sqlLines, List dependencies) throws Exception { clusterController.submitSQLJob( @@ -116,7 +113,7 @@ private String getHadoopClassPathContent() { "File that contains hadoop classpath %s does not exist.", HADOOP_CLASSPATH)); } catch (FileNotFoundException e) { - Assert.fail("Test failed " + e.getMessage()); + fail("Test failed " + e.getMessage(), e); } } @@ -124,7 +121,7 @@ private String getHadoopClassPathContent() { try { classPathContent = FileUtils.readFileUtf8(hadoopClasspathFile); } catch (IOException e) { - Assert.fail("Test failed " + e.getMessage()); + fail("Test failed " + e.getMessage(), e); } return classPathContent; } diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/PlannerScalaFreeITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/PlannerScalaFreeITCase.java index dbdbae5b551a9..51a0f278cb385 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/PlannerScalaFreeITCase.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/PlannerScalaFreeITCase.java @@ -28,7 +28,7 @@ import org.apache.flink.test.util.SQLJobSubmission; import org.apache.flink.tests.util.flink.ClusterController; -import org.junit.Test; +import org.junit.jupiter.api.TestTemplate; import java.net.URI; import java.time.Duration; @@ -36,7 +36,7 @@ import java.util.Collections; import java.util.List; -import static org.junit.Assume.assumeTrue; +import static org.assertj.core.api.Assumptions.assumeThat; /** * End-to-End tests for table planner scala-free since 1.15. Due to scala-free of table planner @@ -44,7 +44,7 @@ * class in execution time, ClassNotFound exception will be thrown. ITCase in table planner can not * cover it, so we should add E2E test for these case. */ -public class PlannerScalaFreeITCase extends SqlITCaseBase { +class PlannerScalaFreeITCase extends SqlITCaseBase { private static final ResolvedSchema SINK_TABLE_SCHEMA = new ResolvedSchema( @@ -61,29 +61,25 @@ public class PlannerScalaFreeITCase extends SqlITCaseBase { private static final DebeziumJsonDeserializationSchema DESERIALIZATION_SCHEMA = createDebeziumDeserializationSchema(SINK_TABLE_SCHEMA); - public PlannerScalaFreeITCase(String executionMode) { - super(executionMode); - } - - @Test - public void testImperativeUdaf() throws Exception { + @TestTemplate + void testImperativeUdaf() throws Exception { runAndCheckSQL("scala_free_e2e.sql", Arrays.asList("+I[Bob, 2]", "+I[Alice, 1]")); } /** The test data is from {@link org.apache.flink.table.toolbox.TestSourceFunction#DATA}. */ - @Test - public void testWatermarkPushDown() throws Exception { - assumeTrue(executionMode.equalsIgnoreCase("streaming")); + @TestTemplate + void testWatermarkPushDown() throws Exception { + assumeThat(executionMode).isEqualTo("streaming"); runAndCheckSQL("watermark_push_down_e2e.sql", Arrays.asList("+I[Bob, 1]", "+I[Alice, 2]")); } @Override - protected List formatRawResult(List rawResult) { + List formatRawResult(List rawResult) { return convertToMaterializedResult(rawResult, SINK_TABLE_SCHEMA, DESERIALIZATION_SCHEMA); } @Override - protected void executeSqlStatements( + void executeSqlStatements( ClusterController clusterController, List sqlLines, List dependencies) throws Exception { clusterController.submitSQLJob( diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/SqlITCaseBase.java b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/SqlITCaseBase.java index 41d1f55a1b23e..31ac671344bd4 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/SqlITCaseBase.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/SqlITCaseBase.java @@ -30,19 +30,21 @@ import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.test.resources.ResourceTestUtils; import org.apache.flink.tests.util.flink.ClusterController; -import org.apache.flink.tests.util.flink.FlinkResource; +import org.apache.flink.tests.util.flink.FlinkResourceExtension; import org.apache.flink.tests.util.flink.FlinkResourceSetup; import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; +import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; import org.apache.flink.util.Collector; -import org.apache.flink.util.TestLogger; +import org.apache.flink.util.TestLoggerExtension; -import org.junit.Before; -import org.junit.Rule; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,38 +69,33 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.fail; -import static org.junit.jupiter.api.Assertions.assertTrue; /** Base class for sql ITCase. */ -@RunWith(Parameterized.class) -public abstract class SqlITCaseBase extends TestLogger { +@ExtendWith({ParameterizedTestExtension.class, TestLoggerExtension.class}) +abstract class SqlITCaseBase { private static final Logger LOG = LoggerFactory.getLogger(SqlITCaseBase.class); - @Parameterized.Parameters(name = "executionMode") - public static Collection data() { + @Parameters(name = "executionMode") + private static Collection data() { return Arrays.asList("streaming", "batch"); } - @Rule - public final FlinkResource flink = - new LocalStandaloneFlinkResourceFactory() - .create( - FlinkResourceSetup.builder() - .addConfiguration(getConfiguration()) - .build()); + @RegisterExtension + private final FlinkResourceExtension flinkExtension = + new FlinkResourceExtension( + new LocalStandaloneFlinkResourceFactory() + .create( + FlinkResourceSetup.builder() + .addConfiguration(getConfiguration()) + .build())); - @Rule public final TemporaryFolder tmp = new TemporaryFolder(); + @TempDir private Path tmp; - protected final String executionMode; + @Parameter String executionMode; - protected Path result; + Path result; - protected static final Path SQL_TOOL_BOX_JAR = - ResourceTestUtils.getResource(".*SqlToolbox.jar"); - - public SqlITCaseBase(String executionMode) { - this.executionMode = executionMode; - } + static final Path SQL_TOOL_BOX_JAR = ResourceTestUtils.getResource(".*SqlToolbox.jar"); private static Configuration getConfiguration() { // we have to enable checkpoint to trigger flushing for filesystem sink @@ -107,18 +104,17 @@ private static Configuration getConfiguration() { return flinkConfig; } - @Before - public void before() throws Exception { - Path tmpPath = tmp.getRoot().toPath(); - LOG.info("The current temporary path: {}", tmpPath); - this.result = tmpPath.resolve(String.format("result-%s", UUID.randomUUID())); + @BeforeEach + void before() throws Exception { + LOG.info("The current temporary path: {}", tmp); + this.result = tmp.resolve(String.format("result-%s", UUID.randomUUID())); } - public void runAndCheckSQL(String sqlPath, List resultItems) throws Exception { + void runAndCheckSQL(String sqlPath, List resultItems) throws Exception { runAndCheckSQL(sqlPath, Collections.singletonMap(result, resultItems)); } - public void runAndCheckSQL( + void runAndCheckSQL( String sqlPath, List resultItems, Function, List> formatter) @@ -130,18 +126,18 @@ public void runAndCheckSQL( Collections.emptyList()); } - public void runAndCheckSQL(String sqlPath, Map> resultItems) - throws Exception { + void runAndCheckSQL(String sqlPath, Map> resultItems) throws Exception { runAndCheckSQL(sqlPath, resultItems, Collections.emptyMap(), Collections.emptyList()); } - public void runAndCheckSQL( + void runAndCheckSQL( String sqlPath, Map> resultItems, Map, List>> formatters, List dependencies) throws Exception { - try (ClusterController clusterController = flink.startCluster(1)) { + try (ClusterController clusterController = + flinkExtension.getFlinkResource().startCluster(1)) { List sqlLines = initializeSqlLines(sqlPath); executeSqlStatements(clusterController, sqlLines, dependencies); @@ -158,14 +154,14 @@ public void runAndCheckSQL( } } - protected Map generateReplaceVars() { + Map generateReplaceVars() { Map varsMap = new HashMap<>(); varsMap.put("$RESULT", this.result.toAbsolutePath().toString()); varsMap.put("$MODE", this.executionMode); return varsMap; } - protected abstract void executeSqlStatements( + abstract void executeSqlStatements( ClusterController clusterController, List sqlLines, List dependencies) throws Exception; @@ -200,9 +196,7 @@ private static void checkResultFile( lines = readResultFiles(resultPath); try { List actual = resultFormatter.apply(lines); - assertThat(actual) - .hasSameSizeAs(expectedItems) - .containsExactlyInAnyOrderElementsOf(expectedItems); + assertThat(actual).containsExactlyInAnyOrderElementsOf(expectedItems); success = true; break; } catch (AssertionError e) { @@ -217,10 +211,10 @@ private static void checkResultFile( } Thread.sleep(500); } - assertTrue( - success, - String.format( - "Did not get expected results before timeout, actual result: %s.", lines)); + assertThat(success) + .withFailMessage( + "Did not get expected results before timeout, actual result: %s.", lines) + .isTrue(); } private static List readResultFiles(Path path) throws Exception { @@ -242,16 +236,16 @@ private static List readResultFiles(Path path) throws Exception { * *

{@code
      * @Override
-     * protected List formatRawResult(List rawResults) {
+     * List formatRawResult(List rawResults) {
      *     return convertToMaterializedResult(rawResults, schema, deserializationSchema);
      * }
      * }
*/ - protected List formatRawResult(List rawResults) { + List formatRawResult(List rawResults) { return rawResults; } - protected static List convertToMaterializedResult( + static List convertToMaterializedResult( List rawResults, ResolvedSchema schema, DeserializationSchema deserializationSchema) { @@ -298,7 +292,7 @@ protected static List convertToMaterializedResult( * Create a DebeziumJsonDeserializationSchema using the given {@link ResolvedSchema} to convert * debezium-json formatted record into {@link RowData}. */ - protected static DebeziumJsonDeserializationSchema createDebeziumDeserializationSchema( + static DebeziumJsonDeserializationSchema createDebeziumDeserializationSchema( ResolvedSchema schema) { return new DebeziumJsonDeserializationSchema( schema.toPhysicalRowDataType(), diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/UsingRemoteJarITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/UsingRemoteJarITCase.java index 0432aa76ecc84..6a3ae1f813060 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/UsingRemoteJarITCase.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/UsingRemoteJarITCase.java @@ -26,19 +26,21 @@ import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.UniqueConstraint; -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestName; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.TestTemplate; import java.io.IOException; +import java.lang.reflect.Method; import java.net.URI; import java.util.Arrays; import java.util.Collections; import java.util.Map; +import static org.assertj.core.api.Assertions.fail; + /** End-to-End tests for using remote jar. */ -public class UsingRemoteJarITCase extends HdfsITCaseBase { +class UsingRemoteJarITCase extends HdfsITCaseBase { private static final ResolvedSchema USER_ORDER_SCHEMA = new ResolvedSchema( @@ -55,16 +57,17 @@ public class UsingRemoteJarITCase extends HdfsITCaseBase { private static final DebeziumJsonDeserializationSchema USER_ORDER_DESERIALIZATION_SCHEMA = createDebeziumDeserializationSchema(USER_ORDER_SCHEMA); - @Rule public TestName name = new TestName(); + private TestInfo testInfo; private org.apache.hadoop.fs.Path hdPath; private org.apache.hadoop.fs.FileSystem hdfs; - public UsingRemoteJarITCase(String executionMode) { - super(executionMode); + @BeforeEach + void captureTestInfo(TestInfo testInfo) { + this.testInfo = testInfo; } @Override - protected void createHDFS() { + void createHDFS() { super.createHDFS(); hdPath = new org.apache.hadoop.fs.Path("/test.jar"); try { @@ -73,22 +76,22 @@ protected void createHDFS() { hdfs.copyFromLocalFile( new org.apache.hadoop.fs.Path(SQL_TOOL_BOX_JAR.toString()), hdPath); } catch (IOException e) { - Assert.fail("Failed to copy local test.jar to HDFS env" + e.getMessage()); + fail("Failed to copy local test.jar to HDFS env" + e.getMessage(), e); } } @Override - protected void destroyHDFS() { + void destroyHDFS() { try { hdfs.delete(hdPath, false); } catch (IOException e) { - Assert.fail("Failed to cleanup HDFS path" + e.getMessage()); + fail("Failed to cleanup HDFS path" + e.getMessage(), e); } super.destroyHDFS(); } - @Test - public void testUdfInRemoteJar() throws Exception { + @TestTemplate + void testUdfInRemoteJar() throws Exception { runAndCheckSQL( "remote_jar_e2e.sql", Arrays.asList("+I[Bob, 2]", "+I[Alice, 1]"), @@ -97,8 +100,8 @@ public void testUdfInRemoteJar() throws Exception { raw, USER_ORDER_SCHEMA, USER_ORDER_DESERIALIZATION_SCHEMA)); } - @Test - public void testCreateFunctionFromRemoteJarViaSqlClient() throws Exception { + @TestTemplate + void testCreateFunctionFromRemoteJarViaSqlClient() throws Exception { runAndCheckSQL( "sql_client_remote_jar_e2e.sql", Collections.singletonMap(result, Arrays.asList("+I[Bob, 2]", "+I[Alice, 1]")), @@ -116,16 +119,16 @@ public void testCreateFunctionFromRemoteJarViaSqlClient() throws Exception { hdPath)))); } - @Test - public void testScalarUdfWhenCheckpointEnable() throws Exception { + @TestTemplate + void testScalarUdfWhenCheckpointEnable() throws Exception { runAndCheckSQL( "scalar_udf_e2e.sql", Collections.singletonList( "{\"before\":null,\"after\":{\"id\":1,\"str\":\"Hello Flink\"},\"op\":\"c\"}")); } - @Test - public void testCreateTemporarySystemFunctionUsingRemoteJar() throws Exception { + @TestTemplate + void testCreateTemporarySystemFunctionUsingRemoteJar() throws Exception { runAndCheckSQL( "create_function_using_remote_jar_e2e.sql", Arrays.asList("+I[Bob, 2]", "+I[Alice, 1]"), @@ -134,8 +137,8 @@ public void testCreateTemporarySystemFunctionUsingRemoteJar() throws Exception { raw, USER_ORDER_SCHEMA, USER_ORDER_DESERIALIZATION_SCHEMA)); } - @Test - public void testCreateCatalogFunctionUsingRemoteJar() throws Exception { + @TestTemplate + void testCreateCatalogFunctionUsingRemoteJar() throws Exception { runAndCheckSQL( "create_function_using_remote_jar_e2e.sql", Arrays.asList("+I[Bob, 2]", "+I[Alice, 1]"), @@ -144,8 +147,8 @@ public void testCreateCatalogFunctionUsingRemoteJar() throws Exception { raw, USER_ORDER_SCHEMA, USER_ORDER_DESERIALIZATION_SCHEMA)); } - @Test - public void testCreateTemporaryCatalogFunctionUsingRemoteJar() throws Exception { + @TestTemplate + void testCreateTemporaryCatalogFunctionUsingRemoteJar() throws Exception { Map replaceVars = generateReplaceVars(); replaceVars.put("$TEMPORARY", "TEMPORARY"); runAndCheckSQL( @@ -157,7 +160,7 @@ public void testCreateTemporaryCatalogFunctionUsingRemoteJar() throws Exception } @Override - protected Map generateReplaceVars() { + Map generateReplaceVars() { String remoteJarPath = String.format( "hdfs://%s:%s/%s", @@ -165,7 +168,7 @@ protected Map generateReplaceVars() { Map varsMap = super.generateReplaceVars(); varsMap.put("$JAR_PATH", remoteJarPath); - String methodName = name.getMethodName(); + String methodName = testInfo.getTestMethod().map(Method::getName).orElse(""); if (methodName.startsWith("testCreateTemporarySystemFunction")) { varsMap.put("$TEMPORARY", "TEMPORARY SYSTEM"); } else if (methodName.startsWith("testCreateTemporaryCatalogFunction")) { diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/test/java/org/apache/flink/table/test/async/AsyncScalarFunctionTest.java b/flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/test/java/org/apache/flink/table/test/async/AsyncScalarFunctionTest.java index b527d11c29daf..876202240a4f1 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/test/java/org/apache/flink/table/test/async/AsyncScalarFunctionTest.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/test/java/org/apache/flink/table/test/async/AsyncScalarFunctionTest.java @@ -26,10 +26,10 @@ *

This test verifies that AsyncScalarFunction works correctly. The test passes if the * application runs without errors. */ -public class AsyncScalarFunctionTest { +class AsyncScalarFunctionTest { @Test - public void testAsyncScalarFunction() throws Exception { + void testAsyncScalarFunction() throws Exception { // Create and run the example application AsyncScalarFunctionExample example = new AsyncScalarFunctionExample(); example.execute(); diff --git a/flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/apache/flink/metrics/tests/MetricsAvailabilityITCase.java b/flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/apache/flink/metrics/tests/MetricsAvailabilityITCase.java index 6200b52b87ee5..170a08598daa9 100644 --- a/flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/apache/flink/metrics/tests/MetricsAvailabilityITCase.java +++ b/flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/apache/flink/metrics/tests/MetricsAvailabilityITCase.java @@ -33,18 +33,19 @@ import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo; import org.apache.flink.tests.util.flink.ClusterController; -import org.apache.flink.tests.util.flink.FlinkResource; +import org.apache.flink.tests.util.flink.FlinkResourceExtension; import org.apache.flink.tests.util.flink.FlinkResourceSetup; import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory; -import org.apache.flink.util.TestLogger; +import org.apache.flink.util.TestLoggerExtension; import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter; import org.apache.flink.util.function.SupplierWithException; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; import javax.annotation.Nullable; @@ -60,33 +61,36 @@ import java.util.stream.Collectors; /** End-to-end test for the availability of metrics. */ -public class MetricsAvailabilityITCase extends TestLogger { +@ExtendWith(TestLoggerExtension.class) +class MetricsAvailabilityITCase { private static final String HOST = "localhost"; private static final int PORT = 8081; - @Rule - public final FlinkResource dist = - new LocalStandaloneFlinkResourceFactory().create(FlinkResourceSetup.builder().build()); + @RegisterExtension + private final FlinkResourceExtension dist = + new FlinkResourceExtension( + new LocalStandaloneFlinkResourceFactory() + .create(FlinkResourceSetup.builder().build())); @Nullable private static ScheduledExecutorService scheduledExecutorService = null; - @BeforeClass - public static void startExecutor() { + @BeforeAll + static void startExecutor() { scheduledExecutorService = Executors.newScheduledThreadPool(4); } - @AfterClass - public static void shutdownExecutor() { + @AfterAll + static void shutdownExecutor() { if (scheduledExecutorService != null) { scheduledExecutorService.shutdown(); } } @Test - public void testReporter() throws Exception { + void testReporter() throws Exception { final Deadline deadline = Deadline.fromNow(Duration.ofMinutes(10)); - try (ClusterController ignored = dist.startCluster(1)) { + try (ClusterController ignored = dist.getFlinkResource().startCluster(1)) { final RestClient restClient = new RestClient(new Configuration(), scheduledExecutorService); diff --git a/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java b/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java index 5556aad1b83c3..d619a6c0e0d42 100644 --- a/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java +++ b/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java @@ -24,28 +24,29 @@ import org.apache.flink.tests.util.AutoClosableProcess; import org.apache.flink.tests.util.CommandLineWrapper; import org.apache.flink.tests.util.cache.DownloadCache; +import org.apache.flink.tests.util.cache.DownloadCacheExtension; import org.apache.flink.tests.util.flink.ClusterController; -import org.apache.flink.tests.util.flink.FlinkResource; +import org.apache.flink.tests.util.flink.FlinkResourceExtension; import org.apache.flink.tests.util.flink.FlinkResourceSetup; import org.apache.flink.tests.util.flink.JarLocation; import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory; +import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.OperatingSystem; import org.apache.flink.util.ProcessorArchitecture; -import org.apache.flink.util.TestLogger; +import org.apache.flink.util.TestLoggerExtension; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.Response; -import org.junit.Assume; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,24 +61,29 @@ import static org.apache.flink.tests.util.AutoClosableProcess.runBlocking; import static org.apache.flink.tests.util.AutoClosableProcess.runNonBlocking; +import static org.assertj.core.api.Assumptions.assumeThat; /** End-to-end test for the PrometheusReporter. */ -@RunWith(Parameterized.class) -public class PrometheusReporterEndToEndITCase extends TestLogger { +@ExtendWith({ParameterizedTestExtension.class, TestLoggerExtension.class}) +class PrometheusReporterEndToEndITCase { private static final Logger LOG = LoggerFactory.getLogger(PrometheusReporterEndToEndITCase.class); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static final String PROMETHEUS_VERSION = "2.4.3"; - private static final String PROMETHEUS_FILE_NAME; + private static final String PROMETHEUS_VERSION = "3.11.2"; private static final String PROMETHEUS_JAR_PREFIX = "flink-metrics-prometheus"; - static { - final String base = "prometheus-" + PROMETHEUS_VERSION + '.'; - final String os; - final String platform; + private static String prometheusFileName; + + private static final Pattern LOG_REPORTER_PORT_PATTERN = + Pattern.compile(".*Started PrometheusReporter HTTP server on port ([0-9]+).*"); + + private static String getPrometheusFileName() { + String base = "prometheus-" + PROMETHEUS_VERSION + '.'; + String os; + String platform; switch (OperatingSystem.getCurrentOperatingSystem()) { case MAC_OS: os = "darwin"; @@ -107,19 +113,17 @@ public class PrometheusReporterEndToEndITCase extends TestLogger { break; } - PROMETHEUS_FILE_NAME = base + os + "-" + platform; + return String.format("%s%s-%s", base, os, platform); } - private static final Pattern LOG_REPORTER_PORT_PATTERN = - Pattern.compile(".*Started PrometheusReporter HTTP server on port ([0-9]+).*"); - - @BeforeClass - public static void checkOS() { - Assume.assumeFalse("This test does not run on Windows.", OperatingSystem.isWindows()); + @BeforeAll + static void beforeAll() { + assumeThat(OperatingSystem.isWindows()).as("This test does not run on Windows.").isFalse(); + prometheusFileName = getPrometheusFileName(); } - @Parameterized.Parameters(name = "{index}: {0}") - public static Collection testParameters() { + @Parameters(name = "{0}") + static Collection testParameters() { return Arrays.asList( TestParams.from( "Jar in 'lib'", @@ -137,18 +141,22 @@ public static Collection testParameters() { })); } - @Rule public final FlinkResource dist; + @RegisterExtension private final FlinkResourceExtension dist; - public PrometheusReporterEndToEndITCase(TestParams params) { + PrometheusReporterEndToEndITCase(TestParams params) { final FlinkResourceSetup.FlinkResourceSetupBuilder builder = FlinkResourceSetup.builder(); params.getBuilderSetup().accept(builder); builder.addConfiguration(getFlinkConfig()); - dist = new LocalStandaloneFlinkResourceFactory().create(builder.build()); + dist = + new FlinkResourceExtension( + new LocalStandaloneFlinkResourceFactory().create(builder.build())); } - @Rule public final TemporaryFolder tmp = new TemporaryFolder(); + @TempDir private Path tmp; - @Rule public final DownloadCache downloadCache = DownloadCache.get(); + @RegisterExtension + private final DownloadCacheExtension downloadCache = + new DownloadCacheExtension(DownloadCache.get()); private static Configuration getFlinkConfig() { final Configuration config = new Configuration(); @@ -162,10 +170,10 @@ private static Configuration getFlinkConfig() { return config; } - @Test - public void testReporter() throws Exception { - final Path tmpPrometheusDir = tmp.newFolder().toPath().resolve("prometheus"); - final Path prometheusBinDir = tmpPrometheusDir.resolve(PROMETHEUS_FILE_NAME); + @TestTemplate + void testReporter() throws Exception { + final Path tmpPrometheusDir = tmp.resolve("prometheus"); + final Path prometheusBinDir = tmpPrometheusDir.resolve(prometheusFileName); final Path prometheusConfig = prometheusBinDir.resolve("prometheus.yml"); final Path prometheusBinary = prometheusBinDir.resolve("prometheus"); Files.createDirectory(tmpPrometheusDir); @@ -175,7 +183,7 @@ public void testReporter() throws Exception { "https://github.com/prometheus/prometheus/releases/download/v" + PROMETHEUS_VERSION + '/' - + PROMETHEUS_FILE_NAME + + prometheusFileName + ".tar.gz", tmpPrometheusDir); @@ -193,10 +201,11 @@ public void testReporter() throws Exception { .inPlace() .build()); - try (ClusterController ignored = dist.startCluster(1)) { + try (ClusterController ignored = dist.getFlinkResource().startCluster(1)) { final List ports = - dist.searchAllLogs(LOG_REPORTER_PORT_PATTERN, matcher -> matcher.group(1)) + dist.getFlinkResource() + .searchAllLogs(LOG_REPORTER_PORT_PATTERN, matcher -> matcher.group(1)) .map(Integer::valueOf) .collect(Collectors.toList()); @@ -288,7 +297,7 @@ private static void checkMetricAvailability(final OkHttpClient client, final Str "Could not retrieve metric " + metric + " from Prometheus.", reportedException); } - static class TestParams { + private static class TestParams { private final String jarLocationDescription; private final Consumer builderSetup; @@ -299,13 +308,13 @@ private TestParams( this.builderSetup = builderSetup; } - public static TestParams from( + private static TestParams from( String jarLocationDesription, Consumer builderSetup) { return new TestParams(jarLocationDesription, builderSetup); } - public Consumer getBuilderSetup() { + private Consumer getBuilderSetup() { return builderSetup; } diff --git a/flink-end-to-end-tests/flink-sql-client-test/src/test/java/SqlClientITCase.java b/flink-end-to-end-tests/flink-sql-client-test/src/test/java/SqlClientITCase.java index 676846527bdb9..3ff92b30a7bcb 100644 --- a/flink-end-to-end-tests/flink-sql-client-test/src/test/java/SqlClientITCase.java +++ b/flink-end-to-end-tests/flink-sql-client-test/src/test/java/SqlClientITCase.java @@ -62,7 +62,7 @@ /** E2E Test for SqlClient. */ @Testcontainers -public class SqlClientITCase { +class SqlClientITCase { private static final Logger LOG = LoggerFactory.getLogger(SqlClientITCase.class); @@ -76,16 +76,16 @@ public class SqlClientITCase { private final Path sqlConnectorUpsertTestJar = ResourceTestUtils.getResource(".*flink-test-utils.*\\.jar"); - public static final Network NETWORK = Network.newNetwork(); + private static final Network NETWORK = Network.newNetwork(); @Container - public static final KafkaContainer KAFKA = + private static final KafkaContainer KAFKA = new KafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA)) .withNetwork(NETWORK) .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS) .withLogConsumer(LOG_CONSUMER); - public final FlinkContainers flink = + private final FlinkContainers flink = FlinkContainers.builder() .withFlinkContainersSettings( FlinkContainersSettings.builder() diff --git a/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SqlGatewayE2ECase.java b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SqlGatewayE2ECase.java index 0d7387b5803fa..3bf654b97afb4 100644 --- a/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SqlGatewayE2ECase.java +++ b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SqlGatewayE2ECase.java @@ -31,22 +31,24 @@ import org.apache.flink.test.util.SQLJobSubmission; import org.apache.flink.tests.util.flink.ClusterController; import org.apache.flink.tests.util.flink.FlinkDistribution; -import org.apache.flink.tests.util.flink.FlinkResource; +import org.apache.flink.tests.util.flink.FlinkResourceExtension; import org.apache.flink.tests.util.flink.FlinkResourceSetup; import org.apache.flink.tests.util.flink.GatewayController; import org.apache.flink.tests.util.flink.JarLocation; import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory; import org.apache.flink.util.FileUtils; import org.apache.flink.util.NetUtils; -import org.apache.flink.util.TestLogger; +import org.apache.flink.util.TestLoggerExtension; import org.apache.hadoop.hive.conf.HiveConf; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; import java.io.File; import java.io.FileInputStream; @@ -67,6 +69,7 @@ import java.util.Map; import java.util.Objects; import java.util.TimeZone; +import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -76,10 +79,12 @@ import static org.apache.flink.table.gateway.rest.util.SqlGatewayRestOptions.PORT; import static org.apache.flink.table.utils.DateTimeUtils.formatTimestampMillis; import static org.apache.flink.tests.util.TestUtils.readCsvResultFiles; -import static org.junit.Assert.assertEquals; +import static org.assertj.core.api.Assertions.assertThat; /** E2E Tests for {@code SqlGateway} with {@link HiveServer2Endpoint}. */ -public class SqlGatewayE2ECase extends TestLogger { +@Testcontainers +@ExtendWith(TestLoggerExtension.class) +class SqlGatewayE2ECase { private static final Path HIVE_SQL_CONNECTOR_JAR = ResourceTestUtils.getResource(".*dependencies/flink-sql-connector-hive-.*.jar"); @@ -92,9 +97,9 @@ public class SqlGatewayE2ECase extends TestLogger { private static final Configuration ENDPOINT_CONFIG = new Configuration(); private static final String RESULT_KEY = "$RESULT"; - @ClassRule public static final TemporaryFolder FOLDER = new TemporaryFolder(); - @ClassRule public static final HiveContainer HIVE_CONTAINER = new HiveContainer(); - @Rule public final FlinkResource flinkResource = buildFlinkResource(); + @TempDir private static Path FOLDER; + @Container private static final HiveContainer HIVE_CONTAINER = new HiveContainer(); + @RegisterExtension private final FlinkResourceExtension flinkResourceExtension = buildFlinkResource(); private static NetUtils.Port hiveserver2Port; private static NetUtils.Port restPort; @@ -105,20 +110,20 @@ public class SqlGatewayE2ECase extends TestLogger { private static final AtomicInteger CATALOG_COUNTER = new AtomicInteger(0); private static String filesystemCatalogName; - @BeforeClass - public static void beforeClass() { + @BeforeAll + static void beforeClass() { ENDPOINT_CONFIG.setString( getPrefixedConfigOptionName(CATALOG_HIVE_CONF_DIR), createHiveConf().getParent()); } - @AfterClass - public static void afterClass() throws Exception { + @AfterAll + static void afterClass() throws Exception { hiveserver2Port.close(); restPort.close(); } @Test - public void testHiveServer2ExecuteStatement() throws Exception { + void testHiveServer2ExecuteStatement() throws Exception { executeStatement( SQLJobClientMode.getHiveJDBC( InetAddress.getByName("localhost").getHostAddress(), @@ -126,7 +131,7 @@ public void testHiveServer2ExecuteStatement() throws Exception { } @Test - public void testRestExecuteStatement() throws Exception { + void testRestExecuteStatement() throws Exception { executeStatement( SQLJobClientMode.getRestClient( InetAddress.getByName("localhost").getHostAddress(), @@ -135,19 +140,19 @@ public void testRestExecuteStatement() throws Exception { } @Test - public void testSqlClientExecuteStatement() throws Exception { + void testSqlClientExecuteStatement() throws Exception { executeStatement( SQLJobClientMode.getGatewaySqlClient( InetAddress.getByName("localhost").getHostAddress(), restPort.getPort())); } @Test - public void testMaterializedTableInContinuousMode() throws Exception { + void testMaterializedTableInContinuousMode() throws Exception { Duration continuousWaitTime = Duration.ofMinutes(5); Duration continuousWaitPause = Duration.ofSeconds(10); - try (GatewayController gateway = flinkResource.startSqlGateway(); - ClusterController ignore = flinkResource.startCluster(2)) { + try (GatewayController gateway = flinkResourceExtension.getFlinkResource().startSqlGateway(); + ClusterController ignore = flinkResourceExtension.getFlinkResource().startCluster(2)) { FlinkDistribution.TestSqlGatewayRestClient gatewayRestClient = initSessionWithCatalogStore(Collections.emptyMap()); @@ -199,12 +204,12 @@ public void testMaterializedTableInContinuousMode() throws Exception { continuousWaitPause, "Failed to wait for the result"); - File savepointFolder = FOLDER.newFolder("savepoint"); + Path savepointFolder = Files.createDirectory(FOLDER.resolve("savepoint-" + UUID.randomUUID())); // configure savepoint path gatewayRestClient.executeStatementWithResult( String.format( "set 'execution.checkpointing.savepoint-dir'='file://%s'", - savepointFolder.getAbsolutePath())); + savepointFolder.toAbsolutePath())); // suspend the materialized table gatewayRestClient.executeStatementWithResult( @@ -239,13 +244,13 @@ public void testMaterializedTableInContinuousMode() throws Exception { } @Test - public void testMaterializedTableInFullMode() throws Exception { + void testMaterializedTableInFullMode() throws Exception { Duration fullModeWaitTime = Duration.ofMinutes(5); Duration fullModeWaitPause = Duration.ofSeconds(10); // init session - try (GatewayController gateway = flinkResource.startSqlGateway(); - ClusterController ignore = flinkResource.startCluster(2)) { + try (GatewayController gateway = flinkResourceExtension.getFlinkResource().startSqlGateway(); + ClusterController ignore = flinkResourceExtension.getFlinkResource().startCluster(2)) { Map sessionProperties = new HashMap<>(); sessionProperties.put("workflow-scheduler.type", "embedded"); @@ -402,11 +407,11 @@ public void testMaterializedTableInFullMode() throws Exception { private FlinkDistribution.TestSqlGatewayRestClient initSessionWithCatalogStore( Map extraProperties) throws Exception { - File catalogStoreFolder = FOLDER.newFolder(); + Path catalogStoreFolder = Files.createTempDirectory(FOLDER, "catalogStore"); Map sessionProperties = new HashMap<>(); sessionProperties.put("table.catalog-store.kind", "file"); sessionProperties.put( - "table.catalog-store.file.path", catalogStoreFolder.getAbsolutePath()); + "table.catalog-store.file.path", catalogStoreFolder.toAbsolutePath().toString()); sessionProperties.putAll(extraProperties); FlinkDistribution.TestSqlGatewayRestClient gatewayRestClient = @@ -417,9 +422,10 @@ private FlinkDistribution.TestSqlGatewayRestClient initSessionWithCatalogStore( sessionProperties); filesystemCatalogName = CATALOG_NAME_PREFIX + CATALOG_COUNTER.getAndAdd(1); - File catalogFolder = FOLDER.newFolder(filesystemCatalogName); - FOLDER.newFolder( - String.format("%s/%s", filesystemCatalogName, FILESYSTEM_DEFAULT_DATABASE)); + Path catalogFolder = Files.createDirectory(FOLDER.resolve(filesystemCatalogName)); + Files.createDirectory( + FOLDER.resolve( + String.format("%s/%s", filesystemCatalogName, FILESYSTEM_DEFAULT_DATABASE))); String createCatalogDDL = String.format( "CREATE CATALOG %s WITH (\n" @@ -427,7 +433,7 @@ private FlinkDistribution.TestSqlGatewayRestClient initSessionWithCatalogStore( + " 'default-database' = 'test_db',\n" + " 'path' = '%s'\n" + ")", - filesystemCatalogName, catalogFolder.getAbsolutePath()); + filesystemCatalogName, catalogFolder.toAbsolutePath()); gatewayRestClient.executeStatementWithResult(createCatalogDDL); gatewayRestClient.executeStatementWithResult( String.format("USE CATALOG %s", filesystemCatalogName)); @@ -436,16 +442,16 @@ private FlinkDistribution.TestSqlGatewayRestClient initSessionWithCatalogStore( } private void executeStatement(SQLJobClientMode mode) throws Exception { - File result = FOLDER.newFolder(mode.getClass().getName() + ".csv"); - try (GatewayController gateway = flinkResource.startSqlGateway(); - ClusterController ignore = flinkResource.startCluster(1)) { + Path result = Files.createTempDirectory(FOLDER, mode.getClass().getName() + ".csv"); + try (GatewayController gateway = flinkResourceExtension.getFlinkResource().startSqlGateway(); + ClusterController ignore = flinkResourceExtension.getFlinkResource().startCluster(1)) { gateway.submitSQLJob( - new SQLJobSubmission.SQLJobSubmissionBuilder(getSqlLines(result)) + new SQLJobSubmission.SQLJobSubmissionBuilder(getSqlLines(result.toFile())) .setClientMode(mode) .build(), Duration.ofSeconds(60)); } - assertEquals(Collections.singletonList("1"), readCsvResultFiles(result.toPath())); + assertThat(readCsvResultFiles(result)).containsExactly("1"); } private static List getSqlLines(File result) throws Exception { @@ -480,7 +486,7 @@ private static File createHiveConf() { } hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, HIVE_CONTAINER.getHiveMetastoreURI()); try { - File site = FOLDER.newFile(HiveCatalog.HIVE_SITE_FILE); + File site = Files.createFile(FOLDER.resolve(HiveCatalog.HIVE_SITE_FILE)).toFile(); try (OutputStream out = new FileOutputStream(site)) { hiveConf.writeXml(out); } @@ -495,7 +501,7 @@ private static File createHiveConf() { * hadoop.classpath which contains all hadoop jars. It also moves planner to the lib and remove * the planner load to make the Hive sql connector works. */ - private static FlinkResource buildFlinkResource() { + private static FlinkResourceExtension buildFlinkResource() { // add hive jar and planner jar FlinkResourceSetup.FlinkResourceSetupBuilder builder = FlinkResourceSetup.builder() @@ -536,7 +542,7 @@ private static FlinkResource buildFlinkResource() { ENDPOINT_CONFIG.addAll(Configuration.fromMap(endpointConfig)); builder.addConfiguration(ENDPOINT_CONFIG); - return new LocalStandaloneFlinkResourceFactory().create(builder.build()); + return new FlinkResourceExtension(new LocalStandaloneFlinkResourceFactory().create(builder.build())); } private static String getPrefixedConfigOptionName(ConfigOption option) { diff --git a/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/containers/HiveContainer.java b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/containers/HiveContainer.java index 6fa9064fcfd73..a908f270f785d 100644 --- a/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/containers/HiveContainer.java +++ b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/containers/HiveContainer.java @@ -25,7 +25,6 @@ import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.Response; -import org.junit.runner.Description; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.GenericContainer; @@ -42,8 +41,8 @@ public class HiveContainer extends GenericContainer { private static final Logger LOG = LoggerFactory.getLogger(HiveContainer.class); - public static final String HOST_NAME = "hadoop-master"; - public static final int HIVE_METASTORE_PORT = 9083; + private static final String HOST_NAME = "hadoop-master"; + private static final int HIVE_METASTORE_PORT = 9083; private static final int NAME_NODE_WEB_PORT = 50070; // Detailed log paths are from @@ -96,9 +95,9 @@ protected void containerIsStarted(InspectContainerResponse containerInfo) { } @Override - protected void finished(Description description) { + protected void containerIsStopping(InspectContainerResponse containerInfo) { backupLogs(); - super.finished(description); + super.containerIsStopping(containerInfo); } public String getHiveMetastoreURI() { diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/ExternalResource.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/ExternalResource.java deleted file mode 100644 index 2bb69bd333e05..0000000000000 --- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/ExternalResource.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.util; - -import org.junit.rules.TestRule; -import org.junit.runner.Description; -import org.junit.runners.model.Statement; - -/** - * Modified version of the jUnit {@link org.junit.rules.ExternalResource}. - * - *

This version is an interface instead of an abstract class and allows resources to - * differentiate between successful and failed tests in their {@code After} methods. - */ -public interface ExternalResource extends TestRule { - - void before() throws Exception; - - void afterTestSuccess(); - - default void afterTestFailure() { - afterTestSuccess(); - } - - @Override - default Statement apply(final Statement base, final Description description) { - return new Statement() { - @Override - public void evaluate() throws Throwable { - before(); - try { - base.evaluate(); - } catch (final Throwable testThrowable) { - try { - afterTestFailure(); - } catch (final Throwable afterFailureThrowable) { - testThrowable.addSuppressed(afterFailureThrowable); - } - throw testThrowable; - } - afterTestSuccess(); - } - }; - } -}