Skip to content

Commit

Permalink
[FLINK-7713][flip6] Implement JarUploadHandler
Browse files Browse the repository at this point in the history
This closes #5442.
  • Loading branch information
GJL authored and tillrohrmann committed Feb 15, 2018
1 parent e3e601b commit 39df56d
Show file tree
Hide file tree
Showing 31 changed files with 1,101 additions and 101 deletions.
Expand Up @@ -72,12 +72,12 @@
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
import org.apache.flink.runtime.rest.handler.legacy.metrics.SubtaskMetricsHandler;
import org.apache.flink.runtime.rest.handler.legacy.metrics.TaskManagerMetricsHandler;
import org.apache.flink.runtime.webmonitor.handlers.JarAccessDeniedHandler;
import org.apache.flink.runtime.webmonitor.handlers.JarDeleteHandler;
import org.apache.flink.runtime.webmonitor.handlers.JarListHandler;
import org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler;
import org.apache.flink.runtime.webmonitor.handlers.JarRunHandler;
import org.apache.flink.runtime.webmonitor.handlers.JarUploadHandler;
import org.apache.flink.runtime.webmonitor.handlers.legacy.JarAccessDeniedHandler;
import org.apache.flink.runtime.webmonitor.handlers.legacy.JarDeleteHandler;
import org.apache.flink.runtime.webmonitor.handlers.legacy.JarListHandler;
import org.apache.flink.runtime.webmonitor.handlers.legacy.JarPlanHandler;
import org.apache.flink.runtime.webmonitor.handlers.legacy.JarRunHandler;
import org.apache.flink.runtime.webmonitor.handlers.legacy.JarUploadHandler;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
Expand Down
Expand Up @@ -18,67 +18,90 @@

package org.apache.flink.runtime.webmonitor.handlers;

import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.FileUpload;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;

import java.io.File;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;

import javax.annotation.Nonnull;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;

import static java.util.Objects.requireNonNull;

/**
* Handles requests for uploading of jars.
* Handles .jar file uploads.
*/
public class JarUploadHandler extends AbstractJsonRequestHandler {
public class JarUploadHandler extends
AbstractRestHandler<DispatcherGateway, FileUpload, JarUploadResponseBody, EmptyMessageParameters> {

static final String JAR_UPLOAD_REST_PATH = "/jars/upload";
private final Path jarDir;

private final File jarDir;
private final Executor executor;

public JarUploadHandler(Executor executor, File jarDir) {
super(executor);
this.jarDir = jarDir;
public JarUploadHandler(
final CompletableFuture<String> localRestAddress,
final GatewayRetriever<? extends DispatcherGateway> leaderRetriever,
final Time timeout,
final Map<String, String> responseHeaders,
final MessageHeaders<FileUpload, JarUploadResponseBody, EmptyMessageParameters> messageHeaders,
final Path jarDir,
final Executor executor) {
super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders);
this.jarDir = requireNonNull(jarDir);
this.executor = requireNonNull(executor);
}

@Override
public String[] getPaths() {
return new String[]{JAR_UPLOAD_REST_PATH};
}
protected CompletableFuture<JarUploadResponseBody> handleRequest(
@Nonnull final HandlerRequest<FileUpload, EmptyMessageParameters> request,
@Nonnull final DispatcherGateway gateway) throws RestHandlerException {

@Override
public CompletableFuture<String> handleJsonRequest(
Map<String, String> pathParams,
Map<String, String> queryParams,
JobManagerGateway jobManagerGateway) {

String tempFilePath = queryParams.get("filepath");
String filename = queryParams.get("filename");

return CompletableFuture.supplyAsync(
() -> {
File tempFile;
if (tempFilePath != null && (tempFile = new File(tempFilePath)).exists()) {
if (!tempFile.getName().endsWith(".jar")) {
//noinspection ResultOfMethodCallIgnored
tempFile.delete();
return "{\"error\": \"Only Jar files are allowed.\"}";
}

String filenameWithUUID = UUID.randomUUID() + "_" + filename;
File newFile = new File(jarDir, filenameWithUUID);
if (tempFile.renameTo(newFile)) {
// all went well
return "{\"status\": \"success\", \"filename\": \"" + filenameWithUUID + "\"}";
}
else {
//noinspection ResultOfMethodCallIgnored
tempFile.delete();
}
final FileUpload fileUpload = request.getRequestBody();
return CompletableFuture.supplyAsync(() -> {
if (!fileUpload.getPath().getFileName().toString().endsWith(".jar")) {
deleteUploadedFile(fileUpload);
throw new CompletionException(new RestHandlerException(
"Only Jar files are allowed.",
HttpResponseStatus.BAD_REQUEST));
} else {
final Path destination = jarDir.resolve(fileUpload.getPath().getFileName());
try {
Files.move(fileUpload.getPath(), destination);
} catch (IOException e) {
deleteUploadedFile(fileUpload);
throw new CompletionException(new RestHandlerException(
String.format("Could not move uploaded jar file [%s] to [%s].",
fileUpload.getPath(),
destination),
HttpResponseStatus.INTERNAL_SERVER_ERROR,
e));
}
return new JarUploadResponseBody(fileUpload.getPath()
.normalize()
.toString());
}
}, executor);
}

return "{\"error\": \"Failed to upload the file.\"}";
},
executor);
private void deleteUploadedFile(final FileUpload fileUpload) {
try {
Files.delete(fileUpload.getPath());
} catch (IOException e) {
log.error("Failed to delete file {}.", fileUpload.getPath(), e);
}
}
}
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.webmonitor.handlers;

import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.FileUpload;
import org.apache.flink.runtime.rest.messages.MessageHeaders;

import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;

/**
* {@link MessageHeaders} for uploading jars.
*/
public final class JarUploadMessageHeaders implements MessageHeaders<FileUpload, JarUploadResponseBody, EmptyMessageParameters> {

@Override
public Class<JarUploadResponseBody> getResponseClass() {
return JarUploadResponseBody.class;
}

@Override
public HttpResponseStatus getResponseStatusCode() {
return HttpResponseStatus.OK;
}

@Override
public Class<FileUpload> getRequestClass() {
return FileUpload.class;
}

@Override
public EmptyMessageParameters getUnresolvedMessageParameters() {
return EmptyMessageParameters.getInstance();
}

@Override
public HttpMethodWrapper getHttpMethod() {
return HttpMethodWrapper.POST;
}

@Override
public String getTargetRestEndpointURL() {
return "/jars/upload";
}

}
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.webmonitor.handlers;

import org.apache.flink.runtime.rest.messages.ResponseBody;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import static java.util.Objects.requireNonNull;

/**
* {@link ResponseBody} for {@link JarUploadHandler}.
*/
public class JarUploadResponseBody implements ResponseBody {

private static final String FIELD_NAME_FILENAME = "filename";

private static final String FIELD_NAME_STATUS = "status";

@JsonProperty(FIELD_NAME_STATUS)
private final UploadStatus status = UploadStatus.success;

@JsonProperty(FIELD_NAME_FILENAME)
private final String filename;

@JsonCreator
public JarUploadResponseBody(@JsonProperty(FIELD_NAME_FILENAME) final String filename) {
this.filename = requireNonNull(filename);
}

public UploadStatus getStatus() {
return status;
}

public String getFilename() {
return filename;
}

enum UploadStatus {
success
}

}
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.runtime.webmonitor.handlers;
package org.apache.flink.runtime.webmonitor.handlers.legacy;

import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler;
Expand Down
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.runtime.webmonitor.handlers;
package org.apache.flink.runtime.webmonitor.handlers.legacy;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
Expand Down
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.runtime.webmonitor.handlers;
package org.apache.flink.runtime.webmonitor.handlers.legacy;

import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler;
Expand Down
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.runtime.webmonitor.handlers;
package org.apache.flink.runtime.webmonitor.handlers.legacy;

import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
Expand Down
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.runtime.webmonitor.handlers;
package org.apache.flink.runtime.webmonitor.handlers.legacy;

import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
Expand Down
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.runtime.webmonitor.handlers;
package org.apache.flink.runtime.webmonitor.handlers.legacy;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
Expand Down

0 comments on commit 39df56d

Please sign in to comment.