Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 23 additions & 3 deletions docs/_includes/generated/rest_dispatcher.html
Original file line number Diff line number Diff line change
Expand Up @@ -360,11 +360,31 @@
</tr>
<tr>
<td colspan="2">
<button data-toggle="collapse" data-target="#1936993190">Request</button>
<div id="1936993190" class="collapse">
<button data-toggle="collapse" data-target="#315035146">Request</button>
<div id="315035146" class="collapse">
<pre>
<code>
{} </code>
{
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:webmonitor:handlers:JarRunRequestBody",
"properties" : {
"entryClass" : {
"type" : "string"
},
"programArgs" : {
"type" : "string"
},
"parallelism" : {
"type" : "integer"
},
"allowNonRestoredState" : {
"type" : "boolean"
},
"savepointPath" : {
"type" : "string"
}
}
} </code>
</pre>
</div>
</td>
Expand Down
41 changes: 41 additions & 0 deletions flink-runtime-web/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ under the License.

<packaging>jar</packaging>

<properties>
<test.parameterProgram.name>parameter-program</test.parameterProgram.name>
<test.ParameterProgramNoManifest.name>parameter-program-without-manifest</test.ParameterProgramNoManifest.name>
</properties>

<dependencies>

<!-- ===================================================
Expand Down Expand Up @@ -136,6 +141,7 @@ under the License.
</goals>
</execution>
<execution>
<!-- Used for JarHandler tests -->
<id>test-program-jar</id>
<phase>process-test-classes</phase>
<goals>
Expand All @@ -153,6 +159,39 @@ under the License.
<finalName>test-program</finalName>
</configuration>
</execution>
<execution>
<!-- Used for JarHandler tests -->
<id>test-parameter-program-jar</id>
<phase>process-test-classes</phase>
<goals>
<goal>jar</goal>
</goals>
<configuration>
<includes>
<include>org/apache/flink/runtime/webmonitor/handlers/utils/TestProgram.java</include>
</includes>
<archive>
<manifest>
<mainClass>org.apache.flink.runtime.webmonitor.testutils.ParameterProgram</mainClass>
</manifest>
</archive>
<finalName>${test.parameterProgram.name}</finalName>
</configuration>
</execution>
<execution>
<!-- Used for JarHandler tests -->
<id>test-parameter-program-jar-without-manifest</id>
<phase>process-test-classes</phase>
<goals>
<goal>jar</goal>
</goals>
<configuration>
<includes>
<include>org/apache/flink/runtime/webmonitor/handlers/utils/TestProgram.java</include>
</includes>
<finalName>${test.ParameterProgramNoManifest.name}</finalName>
</configuration>
</execution>
</executions>
</plugin>

Expand All @@ -162,6 +201,8 @@ under the License.
<configuration>
<systemPropertyVariables>
<targetDir>${project.build.directory}</targetDir>
<parameterJarName>${test.parameterProgram.name}</parameterJarName>
<parameterJarWithoutManifestName>${test.ParameterProgramNoManifest.name}</parameterJarWithoutManifestName>
</systemPropertyVariables>
</configuration>
</plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.function.SupplierWithException;

import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;

import org.slf4j.Logger;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

Expand All @@ -61,7 +63,7 @@
* Handler to submit jobs uploaded via the Web UI.
*/
public class JarRunHandler extends
AbstractRestHandler<DispatcherGateway, EmptyRequestBody, JarRunResponseBody, JarRunMessageParameters> {
AbstractRestHandler<DispatcherGateway, JarRunRequestBody, JarRunResponseBody, JarRunMessageParameters> {

private final Path jarDir;

Expand All @@ -74,7 +76,7 @@ public JarRunHandler(
final GatewayRetriever<? extends DispatcherGateway> leaderRetriever,
final Time timeout,
final Map<String, String> responseHeaders,
final MessageHeaders<EmptyRequestBody, JarRunResponseBody, JarRunMessageParameters> messageHeaders,
final MessageHeaders<JarRunRequestBody, JarRunResponseBody, JarRunMessageParameters> messageHeaders,
final Path jarDir,
final Configuration configuration,
final Executor executor) {
Expand All @@ -87,15 +89,33 @@ public JarRunHandler(

@Override
protected CompletableFuture<JarRunResponseBody> handleRequest(
@Nonnull final HandlerRequest<EmptyRequestBody, JarRunMessageParameters> request,
@Nonnull final HandlerRequest<JarRunRequestBody, JarRunMessageParameters> request,
@Nonnull final DispatcherGateway gateway) throws RestHandlerException {

final JarRunRequestBody requestBody = request.getRequestBody();

final String pathParameter = request.getPathParameter(JarIdPathParameter.class);
final Path jarFile = jarDir.resolve(pathParameter);

final String entryClass = emptyToNull(getQueryParameter(request, EntryClassQueryParameter.class));
final List<String> programArgs = tokenizeArguments(getQueryParameter(request, ProgramArgsQueryParameter.class));
final int parallelism = getQueryParameter(request, ParallelismQueryParameter.class, ExecutionConfig.PARALLELISM_DEFAULT);
final String entryClass = fromRequestBodyOrQueryParameter(
emptyToNull(requestBody.getEntryClassName()),
() -> emptyToNull(getQueryParameter(request, EntryClassQueryParameter.class)),
null,
log);

final List<String> programArgs = tokenizeArguments(
fromRequestBodyOrQueryParameter(
emptyToNull(requestBody.getProgramArguments()),
() -> getQueryParameter(request, ProgramArgsQueryParameter.class),
null,
log));

final int parallelism = fromRequestBodyOrQueryParameter(
requestBody.getParallelism(),
() -> getQueryParameter(request, ParallelismQueryParameter.class),
ExecutionConfig.PARALLELISM_DEFAULT,
log);

final SavepointRestoreSettings savepointRestoreSettings = getSavepointRestoreSettings(request);

final CompletableFuture<JobGraph> jobGraphFuture = getJobGraphAsync(
Expand Down Expand Up @@ -134,12 +154,22 @@ protected CompletableFuture<JarRunResponseBody> handleRequest(
});
}

private static SavepointRestoreSettings getSavepointRestoreSettings(
final @Nonnull HandlerRequest<EmptyRequestBody, JarRunMessageParameters> request)
private SavepointRestoreSettings getSavepointRestoreSettings(
final @Nonnull HandlerRequest<JarRunRequestBody, JarRunMessageParameters> request)
throws RestHandlerException {

final boolean allowNonRestoredState = getQueryParameter(request, AllowNonRestoredStateQueryParameter.class, false);
final String savepointPath = getQueryParameter(request, SavepointPathQueryParameter.class);
final JarRunRequestBody requestBody = request.getRequestBody();

final boolean allowNonRestoredState = fromRequestBodyOrQueryParameter(
requestBody.getAllowNonRestoredState(),
() -> getQueryParameter(request, AllowNonRestoredStateQueryParameter.class),
false,
log);
final String savepointPath = fromRequestBodyOrQueryParameter(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we could add a fromRequestBodyOrQueryParameter method which does not take a default value as a convenience function. That way we would not have all the calls where we pass null.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could result in unexpected NullPointerExceptions when retrieving a primitive, like in the following example:

fromRequestBodyOrQueryParameter(
		requestBody.getParallelism(),
		() -> getQueryParameter(request, ParallelismQueryParameter.class)
		log);

The explicit default argument prevents that from happening.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't something like this work:

private static <T> T fromRequestBodyOrQueryParameter(
			T requestValue,
			SupplierWithException<T, RestHandlerException> queryParameterExtractor,
			Logger log) throws RestHandlerException {
                 return fromRequestBodyOrQueryParameter(requestValue, queryParameterExtractor, null, log);
	}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this prevent the scenario i described?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, making the specification of the default value explicit will better guard against this. In the other case, the user would have to know the difference between these methods and which to apply to primitive and non-primitive values. Alright, then it's good to go from my side.

emptyToNull(requestBody.getSavepointPath()),
() -> emptyToNull(getQueryParameter(request, SavepointPathQueryParameter.class)),
null,
log);
final SavepointRestoreSettings savepointRestoreSettings;
if (savepointPath != null) {
savepointRestoreSettings = SavepointRestoreSettings.forPath(
Expand All @@ -151,6 +181,29 @@ private static SavepointRestoreSettings getSavepointRestoreSettings(
return savepointRestoreSettings;
}

/**
* Returns {@code requestValue} if it is not null, otherwise returns the query parameter value
* if it is not null, otherwise returns the default value.
*/
private static <T> T fromRequestBodyOrQueryParameter(
T requestValue,
SupplierWithException<T, RestHandlerException> queryParameterExtractor,
T defaultValue,
Logger log) throws RestHandlerException {
if (requestValue != null) {
return requestValue;
} else {
T queryParameterValue = queryParameterExtractor.get();
if (queryParameterValue != null) {
log.warn("Configuring the job submission via query parameters is deprecated." +
" Please migrate to submitting a JSON request instead.");
return queryParameterValue;
} else {
return defaultValue;
}
}
}

private CompletableFuture<JobGraph> getJobGraphAsync(
final Path jarFile,
@Nullable final String entryClass,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@
package org.apache.flink.runtime.webmonitor.handlers;

import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;

import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;

/**
* {@link MessageHeaders} for {@link JarRunHandler}.
*/
public class JarRunHeaders implements MessageHeaders<EmptyRequestBody, JarRunResponseBody, JarRunMessageParameters> {
public class JarRunHeaders implements MessageHeaders<JarRunRequestBody, JarRunResponseBody, JarRunMessageParameters> {

private static final JarRunHeaders INSTANCE = new JarRunHeaders();

Expand All @@ -44,8 +43,8 @@ public HttpResponseStatus getResponseStatusCode() {
}

@Override
public Class<EmptyRequestBody> getRequestClass() {
return EmptyRequestBody.class;
public Class<JarRunRequestBody> getRequestClass() {
return JarRunRequestBody.class;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.webmonitor.handlers;

import org.apache.flink.runtime.rest.messages.RequestBody;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import javax.annotation.Nullable;

/**
* {@link RequestBody} for running a jar.
*/
@JsonInclude(JsonInclude.Include.NON_NULL)
public class JarRunRequestBody implements RequestBody {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing marshalling test


private static final String FIELD_NAME_ENTRY_CLASS = "entryClass";
private static final String FIELD_NAME_PROGRAM_ARGUMENTS = "programArgs";
private static final String FIELD_NAME_PARALLELISM = "parallelism";
private static final String FIELD_NAME_ALLOW_NON_RESTORED_STATE = "allowNonRestoredState";
private static final String FIELD_NAME_SAVEPOINT_PATH = "savepointPath";

@JsonProperty(FIELD_NAME_ENTRY_CLASS)
@Nullable
private String entryClassName;

@JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS)
@Nullable
private String programArguments;

@JsonProperty(FIELD_NAME_PARALLELISM)
@Nullable
private Integer parallelism;

@JsonProperty(FIELD_NAME_ALLOW_NON_RESTORED_STATE)
@Nullable
private Boolean allowNonRestoredState;

@JsonProperty(FIELD_NAME_SAVEPOINT_PATH)
@Nullable
private String savepointPath;

public JarRunRequestBody() {
this(null, null, null, null, null);
}

@JsonCreator
public JarRunRequestBody(
@Nullable @JsonProperty(FIELD_NAME_ENTRY_CLASS) String entryClassName,
@Nullable @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS) String programArguments,
@Nullable @JsonProperty(FIELD_NAME_PARALLELISM) Integer parallelism,
@Nullable @JsonProperty(FIELD_NAME_ALLOW_NON_RESTORED_STATE) Boolean allowNonRestoredState,
@Nullable @JsonProperty(FIELD_NAME_SAVEPOINT_PATH) String savepointPath) {
this.entryClassName = entryClassName;
this.programArguments = programArguments;
this.parallelism = parallelism;
this.allowNonRestoredState = allowNonRestoredState;
this.savepointPath = savepointPath;
}

@Nullable
@JsonIgnore
public String getEntryClassName() {
return entryClassName;
}

@Nullable
@JsonIgnore
public String getProgramArguments() {
return programArguments;
}

@Nullable
@JsonIgnore
public Integer getParallelism() {
return parallelism;
}

@Nullable
@JsonIgnore
public Boolean getAllowNonRestoredState() {
return allowNonRestoredState;
}

@Nullable
@JsonIgnore
public String getSavepointPath() {
return savepointPath;
}
}
Loading