Skip to content

Commit

Permalink
[FLINK-16303][rest] Enable retrieval of custom JobManager log files
Browse files Browse the repository at this point in the history
  • Loading branch information
jerry-024 committed Mar 27, 2020
1 parent 9c351b7 commit 380820d
Show file tree
Hide file tree
Showing 31 changed files with 927 additions and 160 deletions.
60 changes: 58 additions & 2 deletions docs/_includes/generated/rest_v1_dispatcher.html
Expand Up @@ -647,6 +647,62 @@
</tr>
</tbody>
</table>
<table class="table table-bordered">
<tbody>
<tr>
<td class="text-left" colspan="2"><h5><strong>/jobmanager/logs</strong></h5></td>
</tr>
<tr>
<td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
<td class="text-left">Response code: <code>200 OK</code></td>
</tr>
<tr>
<td colspan="2">Returns the list of log files on JobManager.</td>
</tr>
<tr>
<td colspan="2">
<button data-toggle="collapse" data-target="#821119120">Request</button>
<div id="821119120" class="collapse">
<pre>
<code>
{} </code>
</pre>
</div>
</td>
</tr>
<tr>
<td colspan="2">
<button data-toggle="collapse" data-target="#1131847481">Response</button>
<div id="1131847481" class="collapse">
<pre>
<code>
{
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:LogListInfo",
"properties" : {
"logs" : {
"type" : "array",
"items" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:LogInfo",
"properties" : {
"name" : {
"type" : "string"
},
"size" : {
"type" : "integer"
}
}
}
}
}
} </code>
</pre>
</div>
</td>
</tr>
</tbody>
</table>
<table class="table table-bordered">
<tbody>
<tr>
Expand Down Expand Up @@ -4111,13 +4167,13 @@
<code>
{
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:LogListInfo",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:LogListInfo",
"properties" : {
"logs" : {
"type" : "array",
"items" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:LogInfo",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:LogInfo",
"properties" : {
"name" : {
"type" : "string"
Expand Down
Expand Up @@ -203,6 +203,16 @@ public void getLogAndStdoutFiles() throws Exception {
assertThat(logs, containsString("job manager out"));
}

@Test
public void getCustomLogFiles() throws Exception {
WebMonitorUtils.LogFileLocation logFiles = WebMonitorUtils.LogFileLocation.find(CLUSTER_CONFIGURATION);
String customFileName = "test.log";
final String logDir = logFiles.logFile.getParent();
FileUtils.writeStringToFile(new File(logDir, customFileName), "job manager custom log");
String logs = TestBaseUtils.getFromHTTP("http://localhost:" + getRestPort() + "/jobmanager/logs/" + customFileName);
assertThat(logs, containsString("job manager custom log"));
}

@Test
public void getTaskManagerLogAndStdoutFiles() throws Exception {
String json = TestBaseUtils.getFromHTTP("http://localhost:" + getRestPort() + "/taskmanagers/");
Expand Down
39 changes: 37 additions & 2 deletions flink-runtime-web/src/test/resources/rest_api_v1.snapshot
Expand Up @@ -397,6 +397,41 @@
"response" : {
"type" : "any"
}
}, {
"url" : "/jobmanager/logs",
"method" : "GET",
"status-code" : "200 OK",
"file-upload" : false,
"path-parameters" : {
"pathParameters" : [ ]
},
"query-parameters" : {
"queryParameters" : [ ]
},
"request" : {
"type" : "any"
},
"response" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:LogListInfo",
"properties" : {
"logs" : {
"type" : "array",
"items" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:LogInfo",
"properties" : {
"name" : {
"type" : "string"
},
"size" : {
"type" : "integer"
}
}
}
}
}
}
}, {
"url" : "/jobs",
"method" : "GET",
Expand Down Expand Up @@ -2702,13 +2737,13 @@
},
"response" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:LogListInfo",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:LogListInfo",
"properties" : {
"logs" : {
"type" : "array",
"items" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:LogInfo",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:LogInfo",
"properties" : {
"name" : {
"type" : "string"
Expand Down
Expand Up @@ -54,7 +54,7 @@
import org.apache.flink.runtime.resourcemanager.registration.WorkerRegistration;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceActions;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rest.messages.taskmanager.LogInfo;
import org.apache.flink.runtime.rest.messages.LogInfo;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
Expand Down
Expand Up @@ -33,7 +33,7 @@
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.dump.MetricQueryService;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.rest.messages.taskmanager.LogInfo;
import org.apache.flink.runtime.rest.messages.LogInfo;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
import org.apache.flink.runtime.rpc.FencedRpcGateway;
import org.apache.flink.runtime.rpc.RpcTimeout;
Expand Down
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rest.handler.cluster;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.rest.handler.AbstractHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;

import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;

import java.io.File;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/**
* Base class for serving files from the JobManager.
*/
public abstract class AbstractJobManagerFileHandler<M extends MessageParameters> extends AbstractHandler<RestfulGateway, EmptyRequestBody, M> {

protected final WebMonitorUtils.LogFileLocation logFileLocation;

protected AbstractJobManagerFileHandler(
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
Time timeout,
Map<String, String> responseHeaders,
UntypedResponseMessageHeaders<EmptyRequestBody, M> messageHeaders,
WebMonitorUtils.LogFileLocation logFileLocation) {
super(leaderRetriever, timeout, responseHeaders, messageHeaders);

Preconditions.checkNotNull(logFileLocation);
this.logFileLocation = logFileLocation;
}

@Override
protected CompletableFuture<Void> respondToRequest(ChannelHandlerContext ctx, HttpRequest httpRequest, HandlerRequest<EmptyRequestBody, M> handlerRequest, RestfulGateway gateway) {
File file = getFile(handlerRequest);
if (file != null && file.exists()) {
return CompletableFuture.completedFuture(file).thenAcceptAsync(logFile -> {
try {
HandlerUtils.transferFile(
ctx,
logFile,
httpRequest);
} catch (FlinkException e) {
throw new CompletionException(new FlinkException("Could not transfer file to client.", e));
}
});
} else {
throw new CompletionException(new RestHandlerException(
"This file is not exist in JobManager log dir.",
HttpResponseStatus.NOT_FOUND));
}
}

protected abstract File getFile(HandlerRequest<EmptyRequestBody, M> handlerRequest);
}
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rest.handler.cluster;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
import org.apache.flink.runtime.rest.messages.cluster.FileMessageParameters;
import org.apache.flink.runtime.rest.messages.taskmanager.LogFileNamePathParameter;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.StringUtils;

import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;

import java.io.File;
import java.util.Map;
import java.util.concurrent.CompletionException;

/**
* Rest handler which serves the custom log file from JobManager.
*/
public class JobManagerCustomLogHandler extends AbstractJobManagerFileHandler<FileMessageParameters> {

public JobManagerCustomLogHandler(
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
Time timeout, Map<String, String> responseHeaders,
UntypedResponseMessageHeaders<EmptyRequestBody, FileMessageParameters> messageHeaders,
WebMonitorUtils.LogFileLocation logFileLocation) {
super(leaderRetriever, timeout, responseHeaders, messageHeaders, logFileLocation);
}

@Override
protected File getFile(HandlerRequest<EmptyRequestBody, FileMessageParameters> handlerRequest) {
final String logDir = logFileLocation.logFile.getParent();
if (logFileLocation.logFile == null || StringUtils.isNullOrWhitespaceOnly(logFileLocation.logFile.getParent())) {
throw new CompletionException(new RestHandlerException("Can not get JobManager log dir.", HttpResponseStatus.NOT_FOUND));
}
String filename = handlerRequest.getPathParameter(LogFileNamePathParameter.class);
return new File(logDir, filename);
}
}
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rest.handler.cluster;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;

import java.io.File;
import java.util.Map;

/**
* Rest handler which serves the log files from JobManager.
*/
public class JobManagerLogFileHandler extends AbstractJobManagerFileHandler<EmptyMessageParameters> {

public JobManagerLogFileHandler(
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
Time timeout, Map<String, String> responseHeaders,
UntypedResponseMessageHeaders<EmptyRequestBody, EmptyMessageParameters> messageHeaders,
WebMonitorUtils.LogFileLocation logFileLocation) {
super(leaderRetriever, timeout, responseHeaders, messageHeaders, logFileLocation);
}

@Override
protected File getFile(HandlerRequest<EmptyRequestBody, EmptyMessageParameters> handlerRequest) {
return this.logFileLocation.logFile;
}
}

0 comments on commit 380820d

Please sign in to comment.