Skip to content

Commit

Permalink
[FLINK-32851][runtime][JUnit5 Migration] The rest package of flink-ru…
Browse files Browse the repository at this point in the history
…ntime module
  • Loading branch information
wangzzu committed Aug 19, 2023
1 parent f68967a commit 3c730a1
Show file tree
Hide file tree
Showing 123 changed files with 1,585 additions and 1,639 deletions.
Expand Up @@ -18,13 +18,13 @@

package org.apache.flink.runtime.webmonitor.handlers;

import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
import org.apache.flink.runtime.rest.messages.RestResponseUnmarshallingTestBase;

import java.util.ArrayList;
import java.util.List;

/** Tests that the {@link JarListInfo} can be marshalled and unmarshalled. */
public class JarListInfoTest extends RestResponseMarshallingTestBase<JarListInfo> {
class JarListInfoTest extends RestResponseUnmarshallingTestBase<JarListInfo> {
@Override
protected Class<JarListInfo> getTestResponseClass() {
return JarListInfo.class;
Expand Down
Expand Up @@ -20,15 +20,15 @@

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.rest.messages.RestRequestMarshallingTestBase;
import org.apache.flink.runtime.rest.messages.RestRequestUnmarshallingTestBase;

import java.util.Arrays;
import java.util.Collections;

import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link JarRunRequestBody}. */
public class JarRunRequestBodyTest extends RestRequestMarshallingTestBase<JarRunRequestBody> {
public class JarRunRequestBodyTest extends RestRequestUnmarshallingTestBase<JarRunRequestBody> {

@Override
protected Class<JarRunRequestBody> getTestRequestClass() {
Expand Down
Expand Up @@ -19,12 +19,12 @@
package org.apache.flink.runtime.webmonitor.handlers;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
import org.apache.flink.runtime.rest.messages.RestResponseUnmarshallingTestBase;

import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link JarRunResponseBody}. */
public class JarRunResponseBodyTest extends RestResponseMarshallingTestBase<JarRunResponseBody> {
class JarRunResponseBodyTest extends RestResponseUnmarshallingTestBase<JarRunResponseBody> {

@Override
protected Class<JarRunResponseBody> getTestResponseClass() {
Expand Down
Expand Up @@ -18,13 +18,13 @@

package org.apache.flink.runtime.webmonitor.handlers;

import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
import org.apache.flink.runtime.rest.messages.RestResponseUnmarshallingTestBase;

import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link JarUploadResponseBody}. */
public class JarUploadResponseBodyTest
extends RestResponseMarshallingTestBase<JarUploadResponseBody> {
class JarUploadResponseBodyTest
extends RestResponseUnmarshallingTestBase<JarUploadResponseBody> {

@Override
protected Class<JarUploadResponseBody> getTestResponseClass() {
Expand Down
Expand Up @@ -20,13 +20,13 @@

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
import org.apache.flink.runtime.rest.messages.RestResponseUnmarshallingTestBase;

import java.util.Arrays;

/** Marshalling test for the {@link JobIdsWithStatusOverview} message. */
public class JobIdsWithStatusOverviewTest
extends RestResponseMarshallingTestBase<JobIdsWithStatusOverview> {
class JobIdsWithStatusOverviewTest
extends RestResponseUnmarshallingTestBase<JobIdsWithStatusOverview> {

@Override
protected Class<JobIdsWithStatusOverview> getTestResponseClass() {
Expand Down
Expand Up @@ -19,10 +19,10 @@
package org.apache.flink.runtime.messages.webmonitor;

import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
import org.apache.flink.runtime.rest.messages.RestResponseUnmarshallingTestBase;

/** Tests for the {@link JobStatusInfo}. */
public class JobStatusInfoTest extends RestResponseMarshallingTestBase<JobStatusInfo> {
class JobStatusInfoTest extends RestResponseUnmarshallingTestBase<JobStatusInfo> {
@Override
protected Class<JobStatusInfo> getTestResponseClass() {
return JobStatusInfo.class;
Expand Down
Expand Up @@ -27,10 +27,9 @@
import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
import org.apache.flink.runtime.rest.versioning.RuntimeRestAPIVersion;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.function.CheckedSupplier;

Expand All @@ -41,10 +40,8 @@
import org.apache.flink.shaded.netty4.io.netty.channel.SelectStrategyFactory;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;

import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.function.ThrowingRunnable;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import java.io.IOException;
import java.net.ServerSocket;
Expand All @@ -56,26 +53,21 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertThrows;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;

/** Tests for {@link RestClient}. */
public class RestClientTest extends TestLogger {
@ClassRule
public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
TestingUtils.defaultExecutorResource();
class RestClientTest {
@RegisterExtension
static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
TestingUtils.defaultExecutorExtension();

private static final String unroutableIp = "240.0.0.0";

private static final long TIMEOUT = 10L;

@Test
public void testConnectionTimeout() throws Exception {
void testConnectionTimeout() throws Exception {
final Configuration config = new Configuration();
config.setLong(RestOptions.CONNECTION_TIMEOUT, 1);
try (final RestClient restClient = new RestClient(config, Executors.directExecutor())) {
Expand All @@ -87,14 +79,19 @@ public void testConnectionTimeout() throws Exception {
EmptyMessageParameters.getInstance(),
EmptyRequestBody.getInstance());

final Throwable cause = assertThrows(ExecutionException.class, future::get).getCause();
assertThat(cause, instanceOf(ConnectTimeoutException.class));
assertThat(cause.getMessage(), containsString(unroutableIp));
try {
future.get();
fail("Expected exception not thrown.");
} catch (ExecutionException e) {
final Throwable cause = e.getCause();
assertThat(cause).isInstanceOf(ConnectTimeoutException.class);
assertThat(cause.getMessage()).contains(unroutableIp);
}
}
}

@Test
public void testInvalidVersionRejection() throws Exception {
void testInvalidVersionRejection() throws Exception {
try (final RestClient restClient =
new RestClient(new Configuration(), Executors.directExecutor())) {
CompletableFuture<EmptyResponseBody> invalidVersionResponse =
Expand All @@ -106,15 +103,15 @@ public void testInvalidVersionRejection() throws Exception {
EmptyRequestBody.getInstance(),
Collections.emptyList(),
RuntimeRestAPIVersion.V0);
Assert.fail("The request should have been rejected due to a version mismatch.");
fail("Expected exception not thrown.");
} catch (IllegalArgumentException e) {
// expected
}
}

/** Tests that we fail the operation if the remote connection closes. */
@Test
public void testConnectionClosedHandling() throws Exception {
void testConnectionClosedHandling() throws Exception {
final Configuration config = new Configuration();
config.setLong(RestOptions.IDLENESS_TIMEOUT, 5000L);
try (final ServerSocket serverSocket = new ServerSocket(0);
Expand Down Expand Up @@ -165,7 +162,7 @@ public void testConnectionClosedHandling() throws Exception {

/** Tests that we fail the operation if the client closes. */
@Test
public void testRestClientClosedHandling() throws Exception {
void testRestClientClosedHandling() throws Exception {
final Configuration config = new Configuration();
config.setLong(RestOptions.IDLENESS_TIMEOUT, 5000L);

Expand Down Expand Up @@ -223,7 +220,7 @@ public void testRestClientClosedHandling() throws Exception {
* <p>See FLINK-32583
*/
@Test
public void testCloseClientBeforeRequest() throws Exception {
void testCloseClientBeforeRequest() throws Exception {
try (final RestClient restClient =
new RestClient(new Configuration(), Executors.directExecutor())) {
restClient.close(); // Intentionally close the client prior to the request
Expand All @@ -236,19 +233,22 @@ public void testCloseClientBeforeRequest() throws Exception {
EmptyMessageParameters.getInstance(),
EmptyRequestBody.getInstance());

// Call get() on the future with a timeout of 0s so we can test that the exception
// thrown is not a TimeoutException, which is what would be thrown if restClient were
// not already closed
final ThrowingRunnable getFuture = () -> future.get(0, TimeUnit.SECONDS);

final Throwable cause = assertThrows(ExecutionException.class, getFuture).getCause();
assertThat(cause, instanceOf(IllegalStateException.class));
assertThat(cause.getMessage(), equalTo("RestClient is already closed"));
try {
// Call get() on the future with a timeout of 0s so we can test that the exception
// thrown is not a TimeoutException, which is what would be thrown if restClient
// were not already closed
future.get(0, TimeUnit.SECONDS);
fail("Expected exception not thrown.");
} catch (ExecutionException e) {
final Throwable cause = e.getCause();
assertThat(cause).isInstanceOf(IllegalStateException.class);
assertThat(cause.getMessage()).isEqualTo("RestClient is already closed");
}
}
}

@Test
public void testCloseClientWhileProcessingRequest() throws Exception {
void testCloseClientWhileProcessingRequest() throws Exception {
// Set up a Netty SelectStrategy with latches that allow us to step forward through Netty's
// request state machine, closing the client at a particular moment
final OneShotLatch connectTriggered = new OneShotLatch();
Expand All @@ -270,7 +270,7 @@ public void testCloseClientWhileProcessingRequest() throws Exception {
new Configuration(), Executors.directExecutor(), selectStrategyFactory)) {
// Check that client's internal collection of pending response futures is empty prior to
// the request
assertThat(restClient.getResponseChannelFutures(), empty());
assertThat(restClient.getResponseChannelFutures()).isEmpty();

final CompletableFuture<?> requestFuture =
restClient.sendRequest(
Expand All @@ -282,7 +282,7 @@ public void testCloseClientWhileProcessingRequest() throws Exception {

// Check that client's internal collection of pending response futures now has one
// entry, presumably due to the call to sendRequest
assertThat(restClient.getResponseChannelFutures(), hasSize(1));
assertThat(restClient.getResponseChannelFutures()).hasSize(1);

// Wait for Netty to start connecting, then while it's paused in the SelectStrategy,
// close the client before unpausing Netty
Expand All @@ -293,18 +293,19 @@ public void testCloseClientWhileProcessingRequest() throws Exception {
// Close should complete successfully
closeFuture.get();

final Throwable cause =
assertThrows(
ExecutionException.class,
() -> requestFuture.get(0, TimeUnit.SECONDS))
.getCause();
assertThat(cause, instanceOf(IllegalStateException.class));
assertThat(cause.getMessage(), equalTo("executor not accepting a task"));
try {
requestFuture.get(0, TimeUnit.SECONDS);
fail("Expected exception not thrown.");
} catch (ExecutionException e) {
final Throwable cause = e.getCause();
assertThat(cause).isInstanceOf(IllegalStateException.class);
assertThat(cause.getMessage()).isEqualTo("executor not accepting a task");
}
}
}

@Test
public void testResponseChannelFuturesResolvedExceptionallyOnClose() throws Exception {
void testResponseChannelFuturesResolvedExceptionallyOnClose() throws Exception {
try (final RestClient restClient =
new RestClient(new Configuration(), Executors.directExecutor())) {
CompletableFuture<Channel> responseChannelFuture = new CompletableFuture<>();
Expand All @@ -318,15 +319,17 @@ public void testResponseChannelFuturesResolvedExceptionallyOnClose() throws Exce

// Ensure the client's internal collection of pending response futures was cleared after
// close
assertThat(restClient.getResponseChannelFutures(), empty());

final Throwable cause =
assertThrows(
ExecutionException.class,
() -> responseChannelFuture.get(0, TimeUnit.SECONDS))
.getCause();
assertThat(cause, instanceOf(IllegalStateException.class));
assertThat(cause.getMessage(), equalTo("RestClient closed before request completed"));
assertThat(restClient.getResponseChannelFutures()).isEmpty();

try {
responseChannelFuture.get(0, TimeUnit.SECONDS);
fail("Expected exception not thrown.");
} catch (ExecutionException e) {
final Throwable cause = e.getCause();
assertThat(cause).isInstanceOf(IllegalStateException.class);
assertThat(cause.getMessage())
.isEqualTo("RestClient closed before request completed");
}
}
}

Expand Down
Expand Up @@ -33,14 +33,12 @@
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.testutils.junit.extensions.ContextClassLoaderExtension;
import org.apache.flink.util.TestLoggerExtension;

import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.Extension;
import org.junit.jupiter.api.extension.RegisterExtension;

Expand All @@ -51,12 +49,10 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;

/** IT cases for {@link RestClient} and {@link RestServerEndpoint}. */
@ExtendWith(TestLoggerExtension.class)
class RestExternalHandlersITCase {

private static final Time timeout = Time.seconds(10L);
Expand Down Expand Up @@ -128,29 +124,27 @@ void teardown() throws Exception {
void testHandlersMustBeLoaded() {
final List<InboundChannelHandlerFactory> inboundChannelHandlerFactories =
serverEndpoint.getInboundChannelHandlerFactories();
assertEquals(inboundChannelHandlerFactories.size(), 2);
assertTrue(
inboundChannelHandlerFactories.get(0) instanceof Prio1InboundChannelHandlerFactory);
assertTrue(
inboundChannelHandlerFactories.get(1) instanceof Prio0InboundChannelHandlerFactory);
assertThat(inboundChannelHandlerFactories.size()).isEqualTo(2);
assertThat(inboundChannelHandlerFactories.get(0))
.isInstanceOf(Prio1InboundChannelHandlerFactory.class);
assertThat(inboundChannelHandlerFactories.get(1))
.isInstanceOf(Prio0InboundChannelHandlerFactory.class);

final List<OutboundChannelHandlerFactory> outboundChannelHandlerFactories =
restClient.getOutboundChannelHandlerFactories();
assertEquals(outboundChannelHandlerFactories.size(), 2);
assertTrue(
outboundChannelHandlerFactories.get(0)
instanceof Prio1OutboundChannelHandlerFactory);
assertTrue(
outboundChannelHandlerFactories.get(1)
instanceof Prio0OutboundChannelHandlerFactory);
assertThat(outboundChannelHandlerFactories.size()).isEqualTo(2);
assertThat(outboundChannelHandlerFactories.get(0))
.isInstanceOf(Prio1OutboundChannelHandlerFactory.class);
assertThat(outboundChannelHandlerFactories.get(1))
.isInstanceOf(Prio0OutboundChannelHandlerFactory.class);

try {
final CompletableFuture<TestResponse> response =
sendRequestToTestHandler(new TestRequest());
response.get();
fail("Request must fail with 2 times redirected URL");
} catch (Exception e) {
assertTrue(e.getMessage().contains(REDIRECT2_URL));
assertThat(e.getMessage().contains(REDIRECT2_URL)).isTrue();
}
}

Expand Down

0 comments on commit 3c730a1

Please sign in to comment.