Skip to content

Commit

Permalink
[FLINK-32851][runtime][JUnit5 Migration] The rest package of flink-ru…
Browse files Browse the repository at this point in the history
…ntime module
  • Loading branch information
wangzzu committed Sep 19, 2023
1 parent 05b0b61 commit df6618a
Show file tree
Hide file tree
Showing 120 changed files with 1,569 additions and 1,597 deletions.
Expand Up @@ -19,12 +19,16 @@
package org.apache.flink.runtime.webmonitor.handlers;

import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
import org.apache.flink.testutils.junit.extensions.parameterized.NoOpTestExtension;

import org.junit.jupiter.api.extension.ExtendWith;

import java.util.ArrayList;
import java.util.List;

/** Tests that the {@link JarListInfo} can be marshalled and unmarshalled. */
public class JarListInfoTest extends RestResponseMarshallingTestBase<JarListInfo> {
/** Tests for the {@link JarListInfo}. */
@ExtendWith(NoOpTestExtension.class)
class JarListInfoTest extends RestResponseMarshallingTestBase<JarListInfo> {
@Override
protected Class<JarListInfo> getTestResponseClass() {
return JarListInfo.class;
Expand Down
Expand Up @@ -21,14 +21,18 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.rest.messages.RestRequestMarshallingTestBase;
import org.apache.flink.testutils.junit.extensions.parameterized.NoOpTestExtension;

import org.junit.jupiter.api.extension.ExtendWith;

import java.util.Arrays;
import java.util.Collections;

import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link JarRunRequestBody}. */
public class JarRunRequestBodyTest extends RestRequestMarshallingTestBase<JarRunRequestBody> {
@ExtendWith(NoOpTestExtension.class)
class JarRunRequestBodyTest extends RestRequestMarshallingTestBase<JarRunRequestBody> {

@Override
protected Class<JarRunRequestBody> getTestRequestClass() {
Expand Down
Expand Up @@ -20,11 +20,15 @@

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
import org.apache.flink.testutils.junit.extensions.parameterized.NoOpTestExtension;

import org.junit.jupiter.api.extension.ExtendWith;

import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link JarRunResponseBody}. */
public class JarRunResponseBodyTest extends RestResponseMarshallingTestBase<JarRunResponseBody> {
@ExtendWith(NoOpTestExtension.class)
class JarRunResponseBodyTest extends RestResponseMarshallingTestBase<JarRunResponseBody> {

@Override
protected Class<JarRunResponseBody> getTestResponseClass() {
Expand Down
Expand Up @@ -19,12 +19,15 @@
package org.apache.flink.runtime.webmonitor.handlers;

import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
import org.apache.flink.testutils.junit.extensions.parameterized.NoOpTestExtension;

import org.junit.jupiter.api.extension.ExtendWith;

import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link JarUploadResponseBody}. */
public class JarUploadResponseBodyTest
extends RestResponseMarshallingTestBase<JarUploadResponseBody> {
@ExtendWith(NoOpTestExtension.class)
class JarUploadResponseBodyTest extends RestResponseMarshallingTestBase<JarUploadResponseBody> {

@Override
protected Class<JarUploadResponseBody> getTestResponseClass() {
Expand Down
Expand Up @@ -21,11 +21,15 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
import org.apache.flink.testutils.junit.extensions.parameterized.NoOpTestExtension;

import org.junit.jupiter.api.extension.ExtendWith;

import java.util.Arrays;

/** Marshalling test for the {@link JobIdsWithStatusOverview} message. */
public class JobIdsWithStatusOverviewTest
@ExtendWith(NoOpTestExtension.class)
class JobIdsWithStatusOverviewTest
extends RestResponseMarshallingTestBase<JobIdsWithStatusOverview> {

@Override
Expand Down
Expand Up @@ -20,9 +20,13 @@

import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
import org.apache.flink.testutils.junit.extensions.parameterized.NoOpTestExtension;

import org.junit.jupiter.api.extension.ExtendWith;

/** Tests for the {@link JobStatusInfo}. */
public class JobStatusInfoTest extends RestResponseMarshallingTestBase<JobStatusInfo> {
@ExtendWith(NoOpTestExtension.class)
class JobStatusInfoTest extends RestResponseMarshallingTestBase<JobStatusInfo> {
@Override
protected Class<JobStatusInfo> getTestResponseClass() {
return JobStatusInfo.class;
Expand Down
Expand Up @@ -45,10 +45,10 @@
/** Tests for the multipart functionality of the {@link RestClient}. */
class RestClientMultipartTest {

@TempDir public static Path tempDir;
@TempDir private static Path tempDir;

@RegisterExtension
public static final AllCallbackWrapper<MultipartUploadExtension>
private static final AllCallbackWrapper<MultipartUploadExtension>
MULTIPART_UPLOAD_EXTENSION_WRAPPER =
new AllCallbackWrapper<>(new MultipartUploadExtension(() -> tempDir));

Expand Down
Expand Up @@ -63,7 +63,7 @@
class RestClientTest {

@RegisterExtension
static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION =
private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION =
TestingUtils.defaultExecutorExtension();

private static final String unroutableIp = "240.0.0.0";
Expand Down Expand Up @@ -92,7 +92,7 @@ void testConnectionTimeout() throws Exception {
}

@Test
public void testInvalidVersionRejection() throws Exception {
void testInvalidVersionRejection() throws Exception {
try (final RestClient restClient =
new RestClient(new Configuration(), Executors.directExecutor())) {
assertThatThrownBy(
Expand All @@ -112,7 +112,7 @@ public void testInvalidVersionRejection() throws Exception {

/** Tests that we fail the operation if the remote connection closes. */
@Test
public void testConnectionClosedHandling() throws Exception {
void testConnectionClosedHandling() throws Exception {
final Configuration config = new Configuration();
config.setLong(RestOptions.IDLENESS_TIMEOUT, 5000L);
try (final ServerSocket serverSocket = new ServerSocket(0);
Expand Down Expand Up @@ -159,7 +159,7 @@ public void testConnectionClosedHandling() throws Exception {

/** Tests that we fail the operation if the client closes. */
@Test
public void testRestClientClosedHandling() throws Exception {
void testRestClientClosedHandling() throws Exception {
final Configuration config = new Configuration();
config.setLong(RestOptions.IDLENESS_TIMEOUT, 5000L);

Expand Down Expand Up @@ -213,7 +213,7 @@ public void testRestClientClosedHandling() throws Exception {
* <p>See FLINK-32583
*/
@Test
public void testCloseClientBeforeRequest() throws Exception {
void testCloseClientBeforeRequest() throws Exception {
try (final RestClient restClient =
new RestClient(new Configuration(), Executors.directExecutor())) {
restClient.close(); // Intentionally close the client prior to the request
Expand All @@ -235,7 +235,7 @@ public void testCloseClientBeforeRequest() throws Exception {
}

@Test
public void testCloseClientWhileProcessingRequest() throws Exception {
void testCloseClientWhileProcessingRequest() throws Exception {
// Set up a Netty SelectStrategy with latches that allow us to step forward through Netty's
// request state machine, closing the client at a particular moment
final OneShotLatch connectTriggered = new OneShotLatch();
Expand Down Expand Up @@ -290,7 +290,7 @@ public void testCloseClientWhileProcessingRequest() throws Exception {
}

@Test
public void testResponseChannelFuturesResolvedExceptionallyOnClose() throws Exception {
void testResponseChannelFuturesResolvedExceptionallyOnClose() throws Exception {
try (final RestClient restClient =
new RestClient(new Configuration(), Executors.directExecutor())) {
CompletableFuture<Channel> responseChannelFuture = new CompletableFuture<>();
Expand Down
Expand Up @@ -33,14 +33,12 @@
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.testutils.junit.extensions.ContextClassLoaderExtension;
import org.apache.flink.util.TestLoggerExtension;

import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.Extension;
import org.junit.jupiter.api.extension.RegisterExtension;

Expand All @@ -51,12 +49,10 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;

/** IT cases for {@link RestClient} and {@link RestServerEndpoint}. */
@ExtendWith(TestLoggerExtension.class)
class RestExternalHandlersITCase {

private static final Time timeout = Time.seconds(10L);
Expand All @@ -69,7 +65,7 @@ class RestExternalHandlersITCase {
private InetSocketAddress serverAddress;

@RegisterExtension
static final Extension CONTEXT_CLASS_LOADER_EXTENSION =
private static final Extension CONTEXT_CLASS_LOADER_EXTENSION =
ContextClassLoaderExtension.builder()
.withServiceEntry(
InboundChannelHandlerFactory.class,
Expand All @@ -82,7 +78,7 @@ class RestExternalHandlersITCase {
.build();

@RegisterExtension
static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION =
TestingUtils.defaultExecutorExtension();

private final Configuration config;
Expand All @@ -107,7 +103,7 @@ private static Configuration getBaseConfig() {
@BeforeEach
void setup() throws Exception {
serverEndpoint = TestRestServerEndpoint.builder(config).buildAndStart();
restClient = new RestClient(config, EXECUTOR_RESOURCE.getExecutor());
restClient = new RestClient(config, EXECUTOR_EXTENSION.getExecutor());
serverAddress = serverEndpoint.getServerAddress();
}

Expand All @@ -128,29 +124,27 @@ void teardown() throws Exception {
void testHandlersMustBeLoaded() {
final List<InboundChannelHandlerFactory> inboundChannelHandlerFactories =
serverEndpoint.getInboundChannelHandlerFactories();
assertEquals(inboundChannelHandlerFactories.size(), 2);
assertTrue(
inboundChannelHandlerFactories.get(0) instanceof Prio1InboundChannelHandlerFactory);
assertTrue(
inboundChannelHandlerFactories.get(1) instanceof Prio0InboundChannelHandlerFactory);
assertThat(inboundChannelHandlerFactories).hasSize(2);
assertThat(inboundChannelHandlerFactories.get(0))
.isInstanceOf(Prio1InboundChannelHandlerFactory.class);
assertThat(inboundChannelHandlerFactories.get(1))
.isInstanceOf(Prio0InboundChannelHandlerFactory.class);

final List<OutboundChannelHandlerFactory> outboundChannelHandlerFactories =
restClient.getOutboundChannelHandlerFactories();
assertEquals(outboundChannelHandlerFactories.size(), 2);
assertTrue(
outboundChannelHandlerFactories.get(0)
instanceof Prio1OutboundChannelHandlerFactory);
assertTrue(
outboundChannelHandlerFactories.get(1)
instanceof Prio0OutboundChannelHandlerFactory);
assertThat(outboundChannelHandlerFactories).hasSize(2);
assertThat(outboundChannelHandlerFactories.get(0))
.isInstanceOf(Prio1OutboundChannelHandlerFactory.class);
assertThat(outboundChannelHandlerFactories.get(1))
.isInstanceOf(Prio0OutboundChannelHandlerFactory.class);

try {
final CompletableFuture<TestResponse> response =
sendRequestToTestHandler(new TestRequest());
response.get();
fail("Request must fail with 2 times redirected URL");
} catch (Exception e) {
assertTrue(e.getMessage().contains(REDIRECT2_URL));
assertThat(e.getMessage()).contains(REDIRECT2_URL);
}
}

Expand Down
Expand Up @@ -22,42 +22,38 @@
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.TestLogger;

import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import static org.hamcrest.CoreMatchers.containsString;
import java.io.File;

import static org.assertj.core.api.Assertions.assertThat;

/** Tests for the {@link RestServerEndpointConfiguration}. */
public class RestServerEndpointConfigurationTest extends TestLogger {
class RestServerEndpointConfigurationTest {

private static final String ADDRESS = "123.123.123.123";
private static final String BIND_ADDRESS = "023.023.023.023";
private static final String BIND_PORT = "7282";
private static final int CONTENT_LENGTH = 1234;

@Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();

@Test
public void testBasicMapping() throws ConfigurationException {
void testBasicMapping(@TempDir File file) throws ConfigurationException {
Configuration originalConfig = new Configuration();
originalConfig.setString(RestOptions.ADDRESS, ADDRESS);
originalConfig.setString(RestOptions.BIND_ADDRESS, BIND_ADDRESS);
originalConfig.setString(RestOptions.BIND_PORT, BIND_PORT);
originalConfig.setInteger(RestOptions.SERVER_MAX_CONTENT_LENGTH, CONTENT_LENGTH);
originalConfig.setString(WebOptions.TMP_DIR, temporaryFolder.getRoot().getAbsolutePath());
originalConfig.setString(WebOptions.TMP_DIR, file.getAbsolutePath());

final RestServerEndpointConfiguration result =
RestServerEndpointConfiguration.fromConfiguration(originalConfig);
Assert.assertEquals(ADDRESS, result.getRestAddress());
Assert.assertEquals(BIND_ADDRESS, result.getRestBindAddress());
Assert.assertEquals(BIND_PORT, result.getRestBindPortRange());
Assert.assertEquals(CONTENT_LENGTH, result.getMaxContentLength());
Assert.assertThat(
result.getUploadDir().toAbsolutePath().toString(),
containsString(temporaryFolder.getRoot().getAbsolutePath()));
assertThat(result.getRestAddress()).isEqualTo(ADDRESS);
assertThat(result.getRestBindAddress()).isEqualTo(BIND_ADDRESS);
assertThat(result.getRestBindPortRange()).isEqualTo(BIND_PORT);
assertThat(result.getMaxContentLength()).isEqualTo(CONTENT_LENGTH);
assertThat(result.getUploadDir().toAbsolutePath().toString())
.contains(file.getAbsolutePath());
}
}

0 comments on commit df6618a

Please sign in to comment.