From 0ddae04b67fca826191ea47f1b78b592a26f0147 Mon Sep 17 00:00:00 2001 From: zentol Date: Wed, 7 Mar 2018 11:14:20 +0100 Subject: [PATCH 1/3] [FLINK-8703][tests] Expose WebUI port --- .../apache/flink/test/util/MiniClusterResource.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java index 531a3c7578caa..185690f605ecc 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java @@ -75,6 +75,8 @@ public class MiniClusterResource extends ExternalResource { private TestEnvironment executionEnvironment; + private int webUIPort = -1; + public MiniClusterResource(final MiniClusterResourceConfiguration miniClusterResourceConfiguration) { this(miniClusterResourceConfiguration, false); } @@ -129,6 +131,10 @@ public TestEnvironment getTestEnvironment() { return executionEnvironment; } + public int getWebUIPort() { + return webUIPort; + } + @Override public void before() throws Exception { @@ -205,6 +211,10 @@ private void startLegacyMiniCluster() throws Exception { Configuration restClientConfig = new Configuration(); restClientConfig.setInteger(JobManagerOptions.PORT, flinkMiniCluster.getLeaderRPCPort()); this.restClusterClientConfig = new UnmodifiableConfiguration(restClientConfig); + + if (flinkMiniCluster.webMonitor().isDefined()) { + webUIPort = flinkMiniCluster.webMonitor().get().getServerPort(); + } } private void startMiniCluster() throws Exception { @@ -244,6 +254,8 @@ private void startMiniCluster() throws Exception { restClientConfig.setString(JobManagerOptions.ADDRESS, miniCluster.getRestAddress().getHost()); restClientConfig.setInteger(RestOptions.REST_PORT, miniCluster.getRestAddress().getPort()); this.restClusterClientConfig = new UnmodifiableConfiguration(restClientConfig); + + webUIPort = miniCluster.getRestAddress().getPort(); } /** From 67968abc5267e4b1a83fbb2aaf32e245f252612b Mon Sep 17 00:00:00 2001 From: zentol Date: Wed, 7 Mar 2018 11:14:46 +0100 Subject: [PATCH 2/3] [FLINK-8703][tests] Port WebFrontendITCase to MiniClusterResource --- .../runtime/webmonitor/WebFrontendITCase.java | 162 +++++++++++------- .../webmonitor/testutils/HttpTestClient.java | 19 ++ .../flink/test/util/MiniClusterResource.java | 4 +- 3 files changed, 122 insertions(+), 63 deletions(-) diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java index 14602e3a40933..8e2cd0a89c7c2 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java @@ -19,15 +19,17 @@ package org.apache.flink.runtime.webmonitor; import org.apache.flink.api.common.JobID; +import org.apache.flink.client.program.ClusterClient; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.configuration.WebOptions; +import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; -import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.runtime.testutils.StoppableInvokable; import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient; +import org.apache.flink.test.util.MiniClusterResource; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.util.TestLogger; @@ -39,7 +41,7 @@ import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.junit.Assert; -import org.junit.BeforeClass; +import org.junit.ClassRule; import org.junit.Test; import java.io.File; @@ -47,8 +49,12 @@ import java.net.HttpURLConnection; import java.net.URL; import java.nio.file.Files; +import java.util.Collection; +import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import scala.concurrent.duration.Deadline; import scala.concurrent.duration.FiniteDuration; @@ -66,40 +72,44 @@ public class WebFrontendITCase extends TestLogger { private static final int NUM_TASK_MANAGERS = 2; private static final int NUM_SLOTS = 4; - private static LocalFlinkMiniCluster cluster; + private static final Configuration CLUSTER_CONFIGURATION = getClusterConfiguration(); - private static int port = -1; + @ClassRule + public static final MiniClusterResource CLUSTER = new MiniClusterResource( + new MiniClusterResource.MiniClusterResourceConfiguration( + CLUSTER_CONFIGURATION, + NUM_TASK_MANAGERS, + NUM_SLOTS), + true + ); - @BeforeClass - public static void initialize() throws Exception { + private static Configuration getClusterConfiguration() { Configuration config = new Configuration(); - config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS); - config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L); - config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true); - - File logDir = File.createTempFile("TestBaseUtils-logdir", null); - assertTrue("Unable to delete temp file", logDir.delete()); - assertTrue("Unable to create temp directory", logDir.mkdir()); - File logFile = new File(logDir, "jobmanager.log"); - File outFile = new File(logDir, "jobmanager.out"); - - Files.createFile(logFile.toPath()); - Files.createFile(outFile.toPath()); + try { + File logDir = File.createTempFile("TestBaseUtils-logdir", null); + assertTrue("Unable to delete temp file", logDir.delete()); + assertTrue("Unable to create temp directory", logDir.mkdir()); + File logFile = new File(logDir, "jobmanager.log"); + File outFile = new File(logDir, "jobmanager.out"); - config.setString(WebOptions.LOG_PATH, logFile.getAbsolutePath()); - config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.getAbsolutePath()); + Files.createFile(logFile.toPath()); + Files.createFile(outFile.toPath()); - cluster = new LocalFlinkMiniCluster(config, false); - cluster.start(); + config.setString(WebOptions.LOG_PATH, logFile.getAbsolutePath()); + config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.getAbsolutePath()); + } catch (Exception e) { + throw new AssertionError("Could not setup test.", e); + } + config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L); + config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true); - port = cluster.webMonitor().get().getServerPort(); + return config; } @Test public void getFrontPage() { try { - String fromHTTP = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/index.html"); + String fromHTTP = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/index.html"); String text = "Apache Flink Dashboard"; assertTrue("Startpage should contain " + text, fromHTTP.contains(text)); } catch (Exception e) { @@ -111,7 +121,7 @@ public void getFrontPage() { @Test public void testResponseHeaders() throws Exception { // check headers for successful json response - URL taskManagersUrl = new URL("http://localhost:" + port + "/taskmanagers"); + URL taskManagersUrl = new URL("http://localhost:" + CLUSTER.getWebUIPort() + "/taskmanagers"); HttpURLConnection taskManagerConnection = (HttpURLConnection) taskManagersUrl.openConnection(); taskManagerConnection.setConnectTimeout(100000); taskManagerConnection.connect(); @@ -127,14 +137,18 @@ public void testResponseHeaders() throws Exception { Assert.assertEquals("application/json; charset=UTF-8", taskManagerConnection.getContentType()); // check headers in case of an error - URL notFoundJobUrl = new URL("http://localhost:" + port + "/jobs/dontexist"); + URL notFoundJobUrl = new URL("http://localhost:" + CLUSTER.getWebUIPort() + "/jobs/dontexist"); HttpURLConnection notFoundJobConnection = (HttpURLConnection) notFoundJobUrl.openConnection(); notFoundJobConnection.setConnectTimeout(100000); notFoundJobConnection.connect(); if (notFoundJobConnection.getResponseCode() >= 400) { // we don't set the content-encoding header Assert.assertNull(notFoundJobConnection.getContentEncoding()); - Assert.assertEquals("text/plain; charset=UTF-8", notFoundJobConnection.getContentType()); + if (Objects.equals("flip6", System.getProperty("codebase"))) { + Assert.assertEquals("application/json; charset=UTF-8", notFoundJobConnection.getContentType()); + } else { + Assert.assertEquals("text/plain; charset=UTF-8", notFoundJobConnection.getContentType()); + } } else { throw new RuntimeException("Request for non-existing job did not return an error."); } @@ -143,14 +157,14 @@ public void testResponseHeaders() throws Exception { @Test public void getNumberOfTaskManagers() { try { - String json = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/taskmanagers/"); + String json = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/taskmanagers/"); ObjectMapper mapper = new ObjectMapper(); JsonNode response = mapper.readTree(json); ArrayNode taskManagers = (ArrayNode) response.get("taskmanagers"); assertNotNull(taskManagers); - assertEquals(cluster.numTaskManagers(), taskManagers.size()); + assertEquals(NUM_TASK_MANAGERS, taskManagers.size()); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); @@ -159,14 +173,14 @@ public void getNumberOfTaskManagers() { @Test public void getTaskmanagers() throws Exception { - String json = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/taskmanagers/"); + String json = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/taskmanagers/"); ObjectMapper mapper = new ObjectMapper(); JsonNode parsed = mapper.readTree(json); ArrayNode taskManagers = (ArrayNode) parsed.get("taskmanagers"); assertNotNull(taskManagers); - assertEquals(cluster.numTaskManagers(), taskManagers.size()); + assertEquals(NUM_TASK_MANAGERS, taskManagers.size()); JsonNode taskManager = taskManagers.get(0); assertNotNull(taskManager); @@ -176,21 +190,21 @@ public void getTaskmanagers() throws Exception { @Test public void getLogAndStdoutFiles() throws Exception { - WebMonitorUtils.LogFileLocation logFiles = WebMonitorUtils.LogFileLocation.find(cluster.configuration()); + WebMonitorUtils.LogFileLocation logFiles = WebMonitorUtils.LogFileLocation.find(CLUSTER_CONFIGURATION); FileUtils.writeStringToFile(logFiles.logFile, "job manager log"); - String logs = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/jobmanager/log"); + String logs = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/jobmanager/log"); assertTrue(logs.contains("job manager log")); FileUtils.writeStringToFile(logFiles.stdOutFile, "job manager out"); - logs = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/jobmanager/stdout"); + logs = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/jobmanager/stdout"); assertTrue(logs.contains("job manager out")); } @Test public void getTaskManagerLogAndStdoutFiles() { try { - String json = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/taskmanagers/"); + String json = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/taskmanagers/"); ObjectMapper mapper = new ObjectMapper(); JsonNode parsed = mapper.readTree(json); @@ -198,15 +212,15 @@ public void getTaskManagerLogAndStdoutFiles() { JsonNode taskManager = taskManagers.get(0); String id = taskManager.get("id").asText(); - WebMonitorUtils.LogFileLocation logFiles = WebMonitorUtils.LogFileLocation.find(cluster.configuration()); + WebMonitorUtils.LogFileLocation logFiles = WebMonitorUtils.LogFileLocation.find(CLUSTER_CONFIGURATION); //we check for job manager log files, since no separate taskmanager logs exist FileUtils.writeStringToFile(logFiles.logFile, "job manager log"); - String logs = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/taskmanagers/" + id + "/log"); + String logs = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/taskmanagers/" + id + "/log"); assertTrue(logs.contains("job manager log")); FileUtils.writeStringToFile(logFiles.stdOutFile, "job manager out"); - logs = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/taskmanagers/" + id + "/stdout"); + logs = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/taskmanagers/" + id + "/stdout"); assertTrue(logs.contains("job manager out")); } catch (Exception e) { e.printStackTrace(); @@ -217,12 +231,12 @@ public void getTaskManagerLogAndStdoutFiles() { @Test public void getConfiguration() { try { - String config = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/jobmanager/config"); + String config = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/jobmanager/config"); Map conf = WebMonitorUtils.fromKeyValueJsonArray(config); assertEquals( - cluster.configuration().getString("taskmanager.numberOfTaskSlots", null), - conf.get(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS)); + CLUSTER_CONFIGURATION.getString(ConfigConstants.LOCAL_START_WEBSERVER, null), + conf.get(ConfigConstants.LOCAL_START_WEBSERVER)); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); @@ -232,7 +246,7 @@ public void getConfiguration() { @Test public void testStop() throws Exception { // this only works if there is no active job at this point - assertTrue(cluster.getCurrentlyRunningJobsJava().isEmpty()); + assertTrue(getRunningJobs(CLUSTER.getClusterClient()).isEmpty()); // Create a task final JobVertex sender = new JobVertex("Sender"); @@ -242,32 +256,44 @@ public void testStop() throws Exception { final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", sender); final JobID jid = jobGraph.getJobID(); - cluster.submitJobDetached(jobGraph); + ClusterClient clusterClient = CLUSTER.getClusterClient(); + clusterClient.setDetached(true); + clusterClient.submitJob(jobGraph, WebFrontendITCase.class.getClassLoader()); // wait for job to show up - while (cluster.getCurrentlyRunningJobsJava().isEmpty()) { + while (getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) { Thread.sleep(10); } final FiniteDuration testTimeout = new FiniteDuration(2, TimeUnit.MINUTES); final Deadline deadline = testTimeout.fromNow(); - while (!cluster.getCurrentlyRunningJobsJava().isEmpty()) { - try (HttpTestClient client = new HttpTestClient("localhost", port)) { - // Request the file from the web server - client.sendDeleteRequest("/jobs/" + jid + "/stop", deadline.timeLeft()); - HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft()); - - assertEquals(HttpResponseStatus.OK, response.getStatus()); - assertEquals("application/json; charset=UTF-8", response.getType()); - assertEquals("{}", response.getContent()); + while (!getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) { + try (HttpTestClient client = new HttpTestClient("localhost", CLUSTER.getWebUIPort())) { + if (Objects.equals(MiniClusterResource.NEW_CODEBASE, System.getProperty(MiniClusterResource.CODEBASE_KEY))) { + // Request the file from the web server + client.sendPatchRequest("/jobs/" + jid + "/?mode=stop", deadline.timeLeft()); + HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft()); + + assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus()); + assertEquals("application/json; charset=UTF-8", response.getType()); + assertEquals("{}", response.getContent()); + } else { + // Request the file from the web server + client.sendDeleteRequest("/jobs/" + jid + "/stop", deadline.timeLeft()); + HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft()); + + assertEquals(HttpResponseStatus.OK, response.getStatus()); + assertEquals("application/json; charset=UTF-8", response.getType()); + assertEquals("{}", response.getContent()); + } } Thread.sleep(20); } // ensure we can access job details when its finished (FLINK-4011) - try (HttpTestClient client = new HttpTestClient("localhost", port)) { + try (HttpTestClient client = new HttpTestClient("localhost", CLUSTER.getWebUIPort())) { FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS); client.sendGetRequest("/jobs/" + jid + "/config", timeout); HttpTestClient.SimpleHttpResponse response = client.getNextResponse(timeout); @@ -283,7 +309,7 @@ public void testStop() throws Exception { @Test public void testStopYarn() throws Exception { // this only works if there is no active job at this point - assertTrue(cluster.getCurrentlyRunningJobsJava().isEmpty()); + assertTrue(getRunningJobs(CLUSTER.getClusterClient()).isEmpty()); // Create a task final JobVertex sender = new JobVertex("Sender"); @@ -293,25 +319,31 @@ public void testStopYarn() throws Exception { final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", sender); final JobID jid = jobGraph.getJobID(); - cluster.submitJobDetached(jobGraph); + ClusterClient clusterClient = CLUSTER.getClusterClient(); + clusterClient.setDetached(true); + clusterClient.submitJob(jobGraph, WebFrontendITCase.class.getClassLoader()); // wait for job to show up - while (cluster.getCurrentlyRunningJobsJava().isEmpty()) { + while (getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) { Thread.sleep(10); } final FiniteDuration testTimeout = new FiniteDuration(2, TimeUnit.MINUTES); final Deadline deadline = testTimeout.fromNow(); - while (!cluster.getCurrentlyRunningJobsJava().isEmpty()) { - try (HttpTestClient client = new HttpTestClient("localhost", port)) { + while (!getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) { + try (HttpTestClient client = new HttpTestClient("localhost", CLUSTER.getWebUIPort())) { // Request the file from the web server client.sendGetRequest("/jobs/" + jid + "/yarn-stop", deadline.timeLeft()); HttpTestClient.SimpleHttpResponse response = client .getNextResponse(deadline.timeLeft()); - assertEquals(HttpResponseStatus.OK, response.getStatus()); + if (Objects.equals(MiniClusterResource.NEW_CODEBASE, System.getProperty(MiniClusterResource.CODEBASE_KEY))) { + assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus()); + } else { + assertEquals(HttpResponseStatus.OK, response.getStatus()); + } assertEquals("application/json; charset=UTF-8", response.getType()); assertEquals("{}", response.getContent()); } @@ -319,4 +351,12 @@ public void testStopYarn() throws Exception { Thread.sleep(20); } } + + private static List getRunningJobs(ClusterClient client) throws Exception { + Collection statusMessages = client.listJobs().get(); + return statusMessages.stream() + .filter(status -> !status.getJobState().isGloballyTerminalState()) + .map(JobStatusMessage::getJobId) + .collect(Collectors.toList()); + } } diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java index d9608fe3fa12e..ec6429cfb0969 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java @@ -185,6 +185,25 @@ public void sendDeleteRequest(String path, FiniteDuration timeout) throws Timeou sendRequest(getRequest, timeout); } + /** + * Sends a simple PATH request to the given path. You only specify the $path part of + * http://$host:$host/$path. + * + * @param path The $path to DELETE (http://$host:$host/$path) + */ + public void sendPatchRequest(String path, FiniteDuration timeout) throws TimeoutException, InterruptedException { + if (!path.startsWith("/")) { + path = "/" + path; + } + + HttpRequest getRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, + HttpMethod.PATCH, path); + getRequest.headers().set(HttpHeaders.Names.HOST, host); + getRequest.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE); + + sendRequest(getRequest, timeout); + } + /** * Returns the next available HTTP response. A call to this method blocks until a response * becomes available. diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java index 185690f605ecc..25488e851db43 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java @@ -55,9 +55,9 @@ public class MiniClusterResource extends ExternalResource { private static final Logger LOG = LoggerFactory.getLogger(MiniClusterResource.class); - private static final String CODEBASE_KEY = "codebase"; + public static final String CODEBASE_KEY = "codebase"; - private static final String NEW_CODEBASE = "new"; + public static final String NEW_CODEBASE = "new"; private final MiniClusterResourceConfiguration miniClusterResourceConfiguration; From 48534f6821252a8fc0a7224ba8d2c603a099a7bb Mon Sep 17 00:00:00 2001 From: zentol Date: Tue, 17 Apr 2018 13:34:21 +0200 Subject: [PATCH 3/3] address comments --- .../apache/flink/runtime/webmonitor/WebFrontendITCase.java | 6 +++--- .../flink/runtime/webmonitor/testutils/HttpTestClient.java | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java index 8e2cd0a89c7c2..ab586cdf7186b 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java @@ -144,7 +144,7 @@ public void testResponseHeaders() throws Exception { if (notFoundJobConnection.getResponseCode() >= 400) { // we don't set the content-encoding header Assert.assertNull(notFoundJobConnection.getContentEncoding()); - if (Objects.equals("flip6", System.getProperty("codebase"))) { + if (Objects.equals(MiniClusterResource.NEW_CODEBASE, System.getProperty(MiniClusterResource.CODEBASE_KEY))) { Assert.assertEquals("application/json; charset=UTF-8", notFoundJobConnection.getContentType()); } else { Assert.assertEquals("text/plain; charset=UTF-8", notFoundJobConnection.getContentType()); @@ -271,7 +271,7 @@ public void testStop() throws Exception { while (!getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) { try (HttpTestClient client = new HttpTestClient("localhost", CLUSTER.getWebUIPort())) { if (Objects.equals(MiniClusterResource.NEW_CODEBASE, System.getProperty(MiniClusterResource.CODEBASE_KEY))) { - // Request the file from the web server + // stop the job client.sendPatchRequest("/jobs/" + jid + "/?mode=stop", deadline.timeLeft()); HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft()); @@ -279,7 +279,7 @@ public void testStop() throws Exception { assertEquals("application/json; charset=UTF-8", response.getType()); assertEquals("{}", response.getContent()); } else { - // Request the file from the web server + // stop the job client.sendDeleteRequest("/jobs/" + jid + "/stop", deadline.timeLeft()); HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft()); diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java index ec6429cfb0969..d94f7a265e5d6 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java @@ -186,10 +186,10 @@ public void sendDeleteRequest(String path, FiniteDuration timeout) throws Timeou } /** - * Sends a simple PATH request to the given path. You only specify the $path part of + * Sends a simple PATCH request to the given path. You only specify the $path part of * http://$host:$host/$path. * - * @param path The $path to DELETE (http://$host:$host/$path) + * @param path The $path to PATCH (http://$host:$host/$path) */ public void sendPatchRequest(String path, FiniteDuration timeout) throws TimeoutException, InterruptedException { if (!path.startsWith("/")) {