diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java index 0542d5080191e..1bc04eb5587d0 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java @@ -111,9 +111,9 @@ public void testJobSubmitCancelStop() throws Exception { RestServerEndpoint rse = new RestServerEndpoint(rsec) { @Override - protected Collection> initializeHandlers(CompletableFuture restAddressFuture) { + protected List> initializeHandlers(CompletableFuture restAddressFuture) { - Collection> handlers = new ArrayList<>(); + List> handlers = new ArrayList<>(); handlers.add(Tuple2.of(portHandler.getMessageHeaders(), portHandler)); handlers.add(Tuple2.of(submitHandler.getMessageHeaders(), submitHandler)); handlers.add(Tuple2.of(terminationHandler.getMessageHeaders(), terminationHandler)); @@ -212,9 +212,9 @@ public void testTriggerSavepoint() throws Exception { RestServerEndpoint rse = new RestServerEndpoint(rsec) { @Override - protected Collection> initializeHandlers(CompletableFuture restAddressFuture) { + protected List> initializeHandlers(CompletableFuture restAddressFuture) { - Collection> handlers = new ArrayList<>(); + List> handlers = new ArrayList<>(); handlers.add(Tuple2.of(triggerHandler.getMessageHeaders(), triggerHandler)); return handlers; } @@ -280,9 +280,9 @@ public void testListJobs() throws Exception { RestServerEndpoint rse = new RestServerEndpoint(rsec) { @Override - protected Collection> initializeHandlers(CompletableFuture restAddressFuture) { + protected List> initializeHandlers(CompletableFuture restAddressFuture) { - Collection> handlers = new ArrayList<>(); + List> handlers = new ArrayList<>(); handlers.add(Tuple2.of(listJobsHandler.getMessageHeaders(), listJobsHandler)); return handlers; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java index 67667846f918e..73c999a739d2e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.rest.handler.LegacyRestHandlerAdapter; import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration; import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; +import org.apache.flink.runtime.rest.handler.cluster.ClusterOverviewHandler; import org.apache.flink.runtime.rest.handler.job.BlobServerPortHandler; import org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler; import org.apache.flink.runtime.rest.handler.job.JobConfigHandler; @@ -44,12 +45,10 @@ import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler; import org.apache.flink.runtime.rest.handler.job.checkpoints.TaskCheckpointStatisticDetailsHandler; import org.apache.flink.runtime.rest.handler.legacy.ClusterConfigHandler; -import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler; import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler; import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler; import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification; -import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion; import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler; import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagersHandler; @@ -85,7 +84,7 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -137,21 +136,18 @@ public DispatcherRestEndpoint( } @Override - protected Collection> initializeHandlers(CompletableFuture restAddressFuture) { + protected List> initializeHandlers(CompletableFuture restAddressFuture) { ArrayList> handlers = new ArrayList<>(3); final Time timeout = restConfiguration.getTimeout(); final Map responseHeaders = restConfiguration.getResponseHeaders(); - LegacyRestHandlerAdapter clusterOverviewHandler = new LegacyRestHandlerAdapter<>( + ClusterOverviewHandler clusterOverviewHandler = new ClusterOverviewHandler<>( restAddressFuture, leaderRetriever, timeout, responseHeaders, - ClusterOverviewHeaders.getInstance(), - new ClusterOverviewHandler( - executor, - timeout)); + ClusterOverviewHeaders.getInstance()); LegacyRestHandlerAdapter dashboardConfigurationHandler = new LegacyRestHandlerAdapter<>( restAddressFuture, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java index 18766c0046dd7..ea18e15f60726 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java @@ -45,8 +45,11 @@ import javax.net.ssl.SSLEngine; +import java.io.Serializable; import java.net.InetSocketAddress; -import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -88,7 +91,7 @@ public RestServerEndpoint(RestServerEndpointConfiguration configuration) { * @param restAddressFuture future rest address of the RestServerEndpoint * @return Collection of AbstractRestHandler which are added to the server endpoint */ - protected abstract Collection> initializeHandlers(CompletableFuture restAddressFuture); + protected abstract List> initializeHandlers(CompletableFuture restAddressFuture); /** * Starts this REST server endpoint. @@ -107,7 +110,23 @@ public void start() throws Exception { final Router router = new Router(); final CompletableFuture restAddressFuture = new CompletableFuture<>(); - initializeHandlers(restAddressFuture).forEach(handler -> registerHandler(router, handler)); + List> handlers = initializeHandlers(restAddressFuture); + + /* sort the handlers such that they are ordered the following: + * /jobs + * /jobs/overview + * /jobs/:jobid + * /jobs/:jobid/config + * /:* + */ + Collections.sort( + handlers, + RestHandlerUrlComparator.INSTANCE); + + handlers.forEach(handler -> { + log.debug("Register handler {} under {}@{}.", handler.f1, handler.f0.getHttpMethod(), handler.f0.getTargetRestEndpointURL()); + registerHandler(router, handler); + }); ChannelInitializer initializer = new ChannelInitializer() { @@ -268,4 +287,69 @@ private static void registerHandler(Router router, Tuple2The comparator orders the Rest URLs such that URLs with path parameters are ordered behind + * those without parameters. E.g.: + * /jobs + * /jobs/overview + * /jobs/:jobid + * /jobs/:jobid/config + * /:* + * + *

IMPORTANT: This comparator is highly specific to how Netty path parameter are encoded. Namely + * via a preceding ':' character. + */ + static final class RestHandlerUrlComparator implements Comparator>, Serializable { + + private static final long serialVersionUID = 2388466767835547926L; + + private static final Comparator CASE_INSENSITIVE_ORDER = new CaseInsensitiveOrderComparator(); + + static final RestHandlerUrlComparator INSTANCE = new RestHandlerUrlComparator(); + + @Override + public int compare( + Tuple2 o1, + Tuple2 o2) { + return CASE_INSENSITIVE_ORDER.compare(o1.f0.getTargetRestEndpointURL(), o2.f0.getTargetRestEndpointURL()); + } + + static final class CaseInsensitiveOrderComparator implements Comparator, Serializable { + private static final long serialVersionUID = 8550835445193437027L; + + @Override + public int compare(String s1, String s2) { + int n1 = s1.length(); + int n2 = s2.length(); + int min = Math.min(n1, n2); + for (int i = 0; i < min; i++) { + char c1 = s1.charAt(i); + char c2 = s2.charAt(i); + if (c1 != c2) { + c1 = Character.toUpperCase(c1); + c2 = Character.toUpperCase(c2); + if (c1 != c2) { + c1 = Character.toLowerCase(c1); + c2 = Character.toLowerCase(c2); + if (c1 != c2) { + if (c1 == ':') { + // c2 is less than c1 because it is also different + return 1; + } else if (c2 == ':') { + // c1 is less than c2 + return -1; + } else { + return c1 - c2; + } + } + } + } + } + return n1 - n2; + } + } + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/ClusterOverviewHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/ClusterOverviewHandler.java new file mode 100644 index 0000000000000..19b94c5be03d0 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/ClusterOverviewHandler.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.cluster; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.messages.webmonitor.ClusterOverview; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import javax.annotation.Nonnull; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * Handler which returns the cluster overview information with version. + * + * @param type of the leader gateway + */ +public class ClusterOverviewHandler extends AbstractRestHandler { + + private static final String version = EnvironmentInformation.getVersion(); + + private static final String commitID = EnvironmentInformation.getRevisionInformation().commitId; + + public ClusterOverviewHandler( + CompletableFuture localRestAddress, + GatewayRetriever leaderRetriever, + Time timeout, + Map responseHeaders, + MessageHeaders messageHeaders) { + super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders); + } + + @Override + public CompletableFuture handleRequest(@Nonnull HandlerRequest request, @Nonnull T gateway) { + CompletableFuture overviewFuture = gateway.requestClusterOverview(timeout); + + return overviewFuture.thenApply( + statusOverview -> ClusterOverviewWithVersion.fromStatusOverview(statusOverview, version, commitID)); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java index 30e7fec8d9c3f..0a26084a0ab73 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java @@ -20,16 +20,11 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.concurrent.FutureUtils; -import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.messages.webmonitor.ClusterOverview; import org.apache.flink.runtime.messages.webmonitor.JobsOverview; -import org.apache.flink.runtime.rest.handler.HandlerRequest; -import org.apache.flink.runtime.rest.handler.LegacyRestHandler; import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion; import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders; -import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; -import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.util.FlinkException; @@ -48,7 +43,7 @@ * Responder that returns the status of the Flink cluster, such as how many * TaskManagers are currently connected, and how many jobs are running. */ -public class ClusterOverviewHandler extends AbstractJsonRequestHandler implements LegacyRestHandler { +public class ClusterOverviewHandler extends AbstractJsonRequestHandler { private static final String version = EnvironmentInformation.getVersion(); @@ -108,12 +103,4 @@ public CompletableFuture handleJsonRequest(Map pathParam return FutureUtils.completedExceptionally(new FlinkException("Failed to fetch list of all running jobs: ", e)); } } - - @Override - public CompletableFuture handleRequest(HandlerRequest request, DispatcherGateway gateway) { - CompletableFuture overviewFuture = gateway.requestClusterOverview(timeout); - - return overviewFuture.thenApply( - statusOverview -> ClusterOverviewWithVersion.fromStatusOverview(statusOverview, version, commitID)); - } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java index 4d3c6b5b0cc60..ee59da76a5c21 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java @@ -57,6 +57,7 @@ import java.net.InetSocketAddress; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -205,8 +206,8 @@ private static class TestRestServerEndpoint extends RestServerEndpoint { } @Override - protected Collection> initializeHandlers(CompletableFuture restAddressFuture) { - return Collections.singleton(Tuple2.of(new TestHeaders(), testHandler)); + protected List> initializeHandlers(CompletableFuture restAddressFuture) { + return Collections.singletonList(Tuple2.of(new TestHeaders(), testHandler)); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointTest.java new file mode 100644 index 0000000000000..3cb4a55b7ec53 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest; + +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +/** + * Test cases for the {@link RestServerEndpoint}. + */ +public class RestServerEndpointTest extends TestLogger { + + /** + * Tests that the REST handler URLs are properly sorted. + */ + @Test + public void testRestHandlerUrlSorting() { + final int numberHandlers = 5; + + final List handlerUrls = new ArrayList<>(numberHandlers); + + handlerUrls.add("/jobs/overview"); + handlerUrls.add("/jobs/:jobid"); + handlerUrls.add("/jobs"); + handlerUrls.add("/:*"); + handlerUrls.add("/jobs/:jobid/config"); + + final List expected = new ArrayList<>(numberHandlers); + + expected.add("/jobs"); + expected.add("/jobs/overview"); + expected.add("/jobs/:jobid"); + expected.add("/jobs/:jobid/config"); + expected.add("/:*"); + + Collections.sort(handlerUrls, new RestServerEndpoint.RestHandlerUrlComparator.CaseInsensitiveOrderComparator()); + + assertEquals(expected, handlerUrls); + } +}