Skip to content

Commit

Permalink
[FLINK-7551][rest] Add versioning
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Sep 3, 2018
1 parent 79c3412 commit 14ae6e5
Show file tree
Hide file tree
Showing 12 changed files with 561 additions and 36 deletions.
51 changes: 34 additions & 17 deletions docs/monitoring/rest_api.md
Expand Up @@ -52,13 +52,26 @@ To add new requests, one needs to
A good example is the `org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler` that uses the `org.apache.flink.runtime.rest.messages.JobExceptionsHeaders`.


## Available Requests
## API

### Dispatcher
The REST API is versioned, with specific versions being queryable by prefixing the url with the version prefix. Prefixes are always of the form `v[version_number]`.
For example, to access version 1 of `/foo/bar` one would query `/v1/foo/bar`.

{% include generated/rest_dispatcher.html %}
If no version is specified Flink will default to the *oldest* version supporting the request.

## Legacy
Querying unsupported/non-existing versions will return a 404 error.

<span class="label label-danger">Attention</span> REST API versioning is *not* active if the cluster runs in [legacy mode](../ops/config.html#mode). For this case please refer to the legacy API below.

<div class="codetabs" markdown="1">

<div data-lang="v1" markdown="1">
#### Dispatcher

{% include generated/rest_v1_dispatcher.html %}
</div>

<div data-lang="legacy" markdown="1">

This section is only relevant if the cluster runs in [legacy mode](../ops/config.html#mode).

Expand Down Expand Up @@ -90,7 +103,7 @@ Values in angle brackets are variables, for example `http://hostname:8081/jobs/<
- `/jars/:jarid/run`


### General
#### General

**`/config`**

Expand Down Expand Up @@ -126,7 +139,7 @@ Sample Result:
}
{% endhighlight %}

### Overview of Jobs
#### Overview of Jobs

**`/jobs/overview`**

Expand Down Expand Up @@ -163,7 +176,7 @@ Sample Result:
}
{% endhighlight %}

### Details of a Running or Completed Job
#### Details of a Running or Completed Job

**`/jobs/<jobid>`**

Expand Down Expand Up @@ -573,15 +586,15 @@ Sample Result:
}
{% endhighlight %}

### Job Cancellation
#### Job Cancellation

#### Cancel Job
##### Cancel Job

`DELETE` request to **`/jobs/:jobid/cancel`**.

Triggers job cancellation, result on success is `{}`.

#### Cancel Job with Savepoint
##### Cancel Job with Savepoint

Triggers a savepoint and cancels the job after the savepoint succeeds.

Expand All @@ -601,7 +614,7 @@ Sample Trigger Result:
}
{% endhighlight %}

##### Monitoring Progress
###### Monitoring Progress

The progress of the cancellation has to be monitored by the user at

Expand All @@ -611,7 +624,7 @@ The progress of the cancellation has to be monitored by the user at

The request ID is returned by the trigger result.

###### In-Progress
####### In-Progress

{% highlight json %}
{
Expand All @@ -620,7 +633,7 @@ The request ID is returned by the trigger result.
}
{% endhighlight %}

###### Success
####### Success

{% highlight json %}
{
Expand All @@ -632,7 +645,7 @@ The request ID is returned by the trigger result.

The `savepointPath` points to the external path of the savepoint, which can be used to resume the savepoint.

###### Failed
####### Failed

{% highlight json %}
{
Expand All @@ -642,11 +655,11 @@ The `savepointPath` points to the external path of the savepoint, which can be u
}
{% endhighlight %}

### Submitting Programs
#### Submitting Programs

It is possible to upload, run, and list Flink programs via the REST APIs and web frontend.

#### Upload a new JAR file
##### Upload a new JAR file

Send a `POST` request to `/jars/upload` with your jar file sent as multi-part data under the `jarfile` file.
Also make sure that the multi-part data includes the `Content-Type` of the file itself, some http libraries do not add the header by default.
Expand All @@ -659,7 +672,7 @@ Content-Disposition: form-data; name="jarfile"; filename="YourFileName.jar"
Content-Type: application/x-java-archive
{% endhighlight %}

#### Run a Program (POST)
##### Run a Program (POST)

Send a `POST` request to `/jars/:jarid/run`. The `jarid` parameter is the file name of the program JAR in the configured web frontend upload directory (configuration key `web.upload.dir`).

Expand Down Expand Up @@ -688,3 +701,7 @@ Response:
{% endhighlight %}

{% top %}
</div>

</div>

Expand Up @@ -37,6 +37,7 @@
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessagePathParameter;
import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
Expand Down Expand Up @@ -123,13 +124,24 @@ public class RestAPIDocGenerator {
public static void main(String[] args) throws IOException {
String outputDirectory = args[0];

createHtmlFile(new DocumentingDispatcherRestEndpoint(), Paths.get(outputDirectory, "rest_dispatcher.html"));
for (final RestAPIVersion apiVersion : RestAPIVersion.values()) {
if (apiVersion == RestAPIVersion.V0) {
// this version exists only for testing purposes
continue;
}
createHtmlFile(
new DocumentingDispatcherRestEndpoint(),
apiVersion,
Paths.get(outputDirectory, "rest_" + apiVersion.getURLVersionPrefix() + "_dispatcher.html"));
}
}

private static void createHtmlFile(DocumentingRestEndpoint restEndpoint, Path outputFile) throws IOException {
private static void createHtmlFile(DocumentingRestEndpoint restEndpoint, RestAPIVersion apiVersion, Path outputFile) throws IOException {
StringBuilder html = new StringBuilder();

List<MessageHeaders> specs = restEndpoint.getSpecs();
List<MessageHeaders> specs = restEndpoint.getSpecs().stream()
.filter(spec -> spec.getSupportedAPIVersions().contains(apiVersion))
.collect(Collectors.toList());
specs.forEach(spec -> html.append(createHtmlEntry(spec)));

Files.deleteIfExists(outputFile);
Expand Down
Expand Up @@ -164,7 +164,7 @@ private void respondWithFile(ChannelHandlerContext ctx, HttpRequest request, Str
HandlerUtils.sendErrorResponse(
ctx,
request,
new ErrorResponseBody(String.format("Unable to load requested file %s.", requestPath)),
new ErrorResponseBody("File not found."),
NOT_FOUND,
Collections.emptyMap());
return;
Expand Down
Expand Up @@ -30,6 +30,8 @@

import java.io.File;

import static org.hamcrest.CoreMatchers.containsString;

/**
* Tests for the HistoryServerStaticFileServerHandler.
*/
Expand All @@ -56,7 +58,7 @@ public void testRespondWithFile() throws Exception {
try {
// verify that 404 message is returned when requesting a non-existent file
String notFound404 = HistoryServerTest.getFromHTTP("http://localhost:" + port + "/hello");
Assert.assertTrue(notFound404.contains("404 Not Found"));
Assert.assertThat(notFound404, containsString("not found"));

// verify that a) a file can be loaded using the ClassLoader and b) that the HistoryServer
// index_hs.html is injected
Expand All @@ -71,12 +73,12 @@ public void testRespondWithFile() throws Exception {
File dir = new File(webDir, "dir.json");
dir.mkdirs();
String dirNotFound404 = HistoryServerTest.getFromHTTP("http://localhost:" + port + "/dir");
Assert.assertTrue(dirNotFound404.contains("404 Not Found"));
Assert.assertTrue(dirNotFound404.contains("not found"));

// verify that a 404 message is returned when requesting a file outside the webDir
tmp.newFile("secret");
String x = HistoryServerTest.getFromHTTP("http://localhost:" + port + "/../secret");
Assert.assertTrue(x.contains("404 Not Found"));
Assert.assertTrue(x.contains("not found"));
} finally {
webUI.shutdown();
}
Expand Down
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.runtime.rest.util.RestClientException;
import org.apache.flink.runtime.rest.util.RestConstants;
import org.apache.flink.runtime.rest.util.RestMapperUtils;
import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
Expand Down Expand Up @@ -85,6 +86,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE;

Expand Down Expand Up @@ -173,6 +175,24 @@ public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extend
U messageParameters,
R request,
Collection<FileUpload> fileUploads) throws IOException {
return sendRequest(
targetAddress,
targetPort,
messageHeaders,
messageParameters,
request,
fileUploads,
RestAPIVersion.getLatestVersion(messageHeaders.getSupportedAPIVersions()));
}

public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(
String targetAddress,
int targetPort,
M messageHeaders,
U messageParameters,
R request,
Collection<FileUpload> fileUploads,
RestAPIVersion apiVersion) throws IOException {
Preconditions.checkNotNull(targetAddress);
Preconditions.checkArgument(0 <= targetPort && targetPort < 65536, "The target port " + targetPort + " is not in the range (0, 65536].");
Preconditions.checkNotNull(messageHeaders);
Expand All @@ -181,7 +201,17 @@ public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extend
Preconditions.checkNotNull(fileUploads);
Preconditions.checkState(messageParameters.isResolved(), "Message parameters were not resolved.");

String targetUrl = MessageParameters.resolveUrl(messageHeaders.getTargetRestEndpointURL(), messageParameters);
if (!messageHeaders.getSupportedAPIVersions().contains(apiVersion)) {
throw new IllegalArgumentException(String.format(
"The requested version %s is not supported by the request (method=%s URL=%s). Supported versions are: %s.",
apiVersion,
messageHeaders.getHttpMethod(),
messageHeaders.getTargetRestEndpointURL(),
messageHeaders.getSupportedAPIVersions().stream().map(RestAPIVersion::getURLVersionPrefix).collect(Collectors.joining(","))));
}

String versionedHandlerURL = "/" + apiVersion.getURLVersionPrefix() + messageHeaders.getTargetRestEndpointURL();
String targetUrl = MessageParameters.resolveUrl(versionedHandlerURL, messageParameters);

LOG.debug("Sending request of class {} to {}:{}{}", request.getClass(), targetAddress, targetPort, targetUrl);
// serialize payload
Expand Down
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import org.apache.flink.runtime.rest.handler.router.Router;
import org.apache.flink.runtime.rest.handler.router.RouterHandler;
import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.Preconditions;
Expand Down Expand Up @@ -144,8 +145,7 @@ public final void start() throws Exception {
RestHandlerUrlComparator.INSTANCE);

handlers.forEach(handler -> {
log.debug("Register handler {} under {}@{}.", handler.f1, handler.f0.getHttpMethod(), handler.f0.getTargetRestEndpointURL());
registerHandler(router, handler);
registerHandler(router, handler, log);
});

ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
Expand Down Expand Up @@ -364,22 +364,37 @@ protected CompletableFuture<Void> shutDownInternal() {
}
}

private static void registerHandler(Router router, Tuple2<RestHandlerSpecification, ChannelInboundHandler> specificationHandler) {
switch (specificationHandler.f0.getHttpMethod()) {
private static void registerHandler(Router router, Tuple2<RestHandlerSpecification, ChannelInboundHandler> specificationHandler, Logger log) {
final String handlerURL = specificationHandler.f0.getTargetRestEndpointURL();
// setup versioned urls
for (final RestAPIVersion supportedVersion : specificationHandler.f0.getSupportedAPIVersions()) {
final String versionedHandlerURL = '/' + supportedVersion.getURLVersionPrefix() + handlerURL;
log.debug("Register handler {} under {}@{}.", specificationHandler.f1, specificationHandler.f0.getHttpMethod(), versionedHandlerURL);
registerHandler(router, versionedHandlerURL, specificationHandler.f0.getHttpMethod(), specificationHandler.f1);
if (supportedVersion.isDefaultVersion()) {
// setup unversioned url for convenience and backwards compatibility
log.debug("Register handler {} under {}@{}.", specificationHandler.f1, specificationHandler.f0.getHttpMethod(), handlerURL);
registerHandler(router, handlerURL, specificationHandler.f0.getHttpMethod(), specificationHandler.f1);
}
}
}

private static void registerHandler(Router router, String handlerURL, HttpMethodWrapper httpMethod, ChannelInboundHandler handler) {
switch (httpMethod) {
case GET:
router.addGet(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1);
router.addGet(handlerURL, handler);
break;
case POST:
router.addPost(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1);
router.addPost(handlerURL, handler);
break;
case DELETE:
router.addDelete(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1);
router.addDelete(handlerURL, handler);
break;
case PATCH:
router.addPatch(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1);
router.addPatch(handlerURL, handler);
break;
default:
throw new RuntimeException("Unsupported http method: " + specificationHandler.f0.getHttpMethod() + '.');
throw new RuntimeException("Unsupported http method: " + httpMethod + '.');
}
}

Expand Down Expand Up @@ -437,13 +452,22 @@ public static final class RestHandlerUrlComparator implements Comparator<Tuple2<

private static final Comparator<String> CASE_INSENSITIVE_ORDER = new CaseInsensitiveOrderComparator();

private static final Comparator<RestAPIVersion> API_VERSION_ORDER = new RestAPIVersion.RestAPIVersionComparator();

static final RestHandlerUrlComparator INSTANCE = new RestHandlerUrlComparator();

@Override
public int compare(
Tuple2<RestHandlerSpecification, ChannelInboundHandler> o1,
Tuple2<RestHandlerSpecification, ChannelInboundHandler> o2) {
return CASE_INSENSITIVE_ORDER.compare(o1.f0.getTargetRestEndpointURL(), o2.f0.getTargetRestEndpointURL());
final int urlComparisonResult = CASE_INSENSITIVE_ORDER.compare(o1.f0.getTargetRestEndpointURL(), o2.f0.getTargetRestEndpointURL());
if (urlComparisonResult != 0) {
return urlComparisonResult;
} else {
return API_VERSION_ORDER.compare(
Collections.min(o1.f0.getSupportedAPIVersions()),
Collections.min(o2.f0.getSupportedAPIVersions()));
}
}

/**
Expand Down
Expand Up @@ -19,6 +19,10 @@
package org.apache.flink.runtime.rest.handler;

import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.versioning.RestAPIVersion;

import java.util.Collection;
import java.util.Collections;

/**
* Rest handler interface which all rest handler implementation have to implement.
Expand All @@ -38,4 +42,13 @@ public interface RestHandlerSpecification {
* @return endpoint url that this request should be sent to
*/
String getTargetRestEndpointURL();

/**
* Returns the supported API versions that this request supports.
*
* @return Collection of supported API versions
*/
default Collection<RestAPIVersion> getSupportedAPIVersions() {
return Collections.singleton(RestAPIVersion.V1);
}
}

0 comments on commit 14ae6e5

Please sign in to comment.