diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java index 26e03196e3989..509a790b2a1ec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java @@ -53,6 +53,7 @@ import java.util.Map; import java.util.Set; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -468,7 +469,7 @@ public ContainerReport getContainerReport(String containerIdStr) assertTrue(exitCode == 0); assertTrue(sysOutStream.toString().contains( logMessage(containerId1, "syslog"))); - assertTrue(sysOutStream.toString().contains("Log Upload Time")); + assertTrue(sysOutStream.toString().contains("LogLastModifiedTime")); assertTrue(!sysOutStream.toString().contains( "Logs for container " + containerId1.toString() + " are not present in this log-file.")); @@ -492,8 +493,12 @@ public ContainerReport getContainerReport(String containerIdStr) String logMessage = logMessage(containerId3, "stdout"); int fileContentSize = logMessage.getBytes().length; - int tailContentSize = "\nEnd of LogType:stdout\n\n".getBytes().length; - + StringBuilder sb = new StringBuilder(); + String endOfFile = "End of LogType:stdout"; + sb.append("\n" + endOfFile + "\n"); + sb.append(StringUtils.repeat("*", endOfFile.length() + 50) + + "\n\n"); + int tailContentSize = sb.toString().length(); // specify how many bytes we should get from logs // specify a position number, it would get the first n bytes from // container log diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationWebUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationWebUtils.java new file mode 100644 index 0000000000000..1de68fb839576 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationWebUtils.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.logaggregation; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.webapp.view.HtmlBlock.Block; + +/** + * Utils for rendering aggregated logs block. + * + */ +@Private +public final class LogAggregationWebUtils { + + private LogAggregationWebUtils() {} + + /** + * Parse start index from html. 
+ * @param html the html + * @param startStr the start index string + * @return the startIndex + */ + public static long getLogStartIndex(Block html, String startStr) + throws NumberFormatException { + long start = -4096; + + if (startStr != null && !startStr.isEmpty()) { + start = Long.parseLong(startStr); + } + return start; + } + + /** + * Parse end index from html. + * @param html the html + * @param endStr the end index string + * @return the endIndex + */ + public static long getLogEndIndex(Block html, String endStr) + throws NumberFormatException { + long end = Long.MAX_VALUE; + + if (endStr != null && !endStr.isEmpty()) { + end = Long.parseLong(endStr); + } + return end; + } + + /** + * Verify and parse containerId. + * @param html the html + * @param containerIdStr the containerId string + * @return the {@link ContainerId} + */ + public static ContainerId verifyAndGetContainerId(Block html, + String containerIdStr) { + if (containerIdStr == null || containerIdStr.isEmpty()) { + html.h1().__("Cannot get container logs without a ContainerId").__(); + return null; + } + ContainerId containerId = null; + try { + containerId = ContainerId.fromString(containerIdStr); + } catch (IllegalArgumentException e) { + html.h1() + .__("Cannot get container logs for invalid containerId: " + + containerIdStr).__(); + return null; + } + return containerId; + } + + /** + * Verify and parse NodeId. + * @param html the html + * @param nodeIdStr the nodeId string + * @return the {@link NodeId} + */ + public static NodeId verifyAndGetNodeId(Block html, String nodeIdStr) { + if (nodeIdStr == null || nodeIdStr.isEmpty()) { + html.h1().__("Cannot get container logs without a NodeId").__(); + return null; + } + NodeId nodeId = null; + try { + nodeId = NodeId.fromString(nodeIdStr); + } catch (IllegalArgumentException e) { + html.h1().__("Cannot get container logs. 
Invalid nodeId: " + nodeIdStr) + .__(); + return null; + } + return nodeId; + } + + /** + * Verify and parse the application owner. + * @param html the html + * @param appOwner the Application owner + * @return the appOwner + */ + public static String verifyAndGetAppOwner(Block html, String appOwner) { + if (appOwner == null || appOwner.isEmpty()) { + html.h1().__("Cannot get container logs without an app owner").__(); + } + return appOwner; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java index cf34a1acf0f6d..03acb3365212a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java @@ -18,8 +18,6 @@ package org.apache.hadoop.yarn.logaggregation; -import java.io.DataInputStream; -import java.io.EOFException; import java.io.FileNotFoundException; import java.io.IOException; import java.io.PrintStream; @@ -37,15 +35,14 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.HarFs; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; -import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController; +import 
org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory; import com.google.common.annotations.VisibleForTesting; public class LogCLIHelpers implements Configurable { @@ -56,6 +53,7 @@ public class LogCLIHelpers implements Configurable { "Container: %s on %s"; private Configuration conf; + private LogAggregationFileControllerFactory factory; @Private @VisibleForTesting @@ -130,71 +128,11 @@ public int dumpAContainerLogsForLogType(ContainerLogsRequest options) @VisibleForTesting public int dumpAContainerLogsForLogType(ContainerLogsRequest options, boolean outputFailure) throws IOException { - ApplicationId applicationId = options.getAppId(); - String jobOwner = options.getAppOwner(); - String nodeId = options.getNodeId(); - String containerId = options.getContainerId(); - String localDir = options.getOutputLocalDir(); - List logType = new ArrayList(options.getLogTypes()); - RemoteIterator nodeFiles = getRemoteNodeFileDir( - applicationId, jobOwner); - if (nodeFiles == null) { - return -1; - } - boolean foundContainerLogs = false; - while (nodeFiles.hasNext()) { - FileStatus thisNodeFile = nodeFiles.next(); - String fileName = thisNodeFile.getPath().getName(); - if (fileName.equals(applicationId + ".har")) { - Path p = new Path("har:///" - + thisNodeFile.getPath().toUri().getRawPath()); - nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p); - continue; - } - if (fileName.contains(LogAggregationUtils.getNodeString(nodeId)) - && !fileName.endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) { - AggregatedLogFormat.LogReader reader = null; - PrintStream out = createPrintStream(localDir, fileName, containerId); - try { - reader = new AggregatedLogFormat.LogReader(getConf(), - thisNodeFile.getPath()); - if (getContainerLogsStream(containerId, reader) == null) { - continue; - } - String containerString = String.format(CONTAINER_ON_NODE_PATTERN, - containerId, thisNodeFile.getPath().getName()); - out.println(containerString); - 
out.println("LogAggregationType: AGGREGATED"); - out.println(StringUtils.repeat("=", containerString.length())); - // We have to re-create reader object to reset the stream index - // after calling getContainerLogsStream which would move the stream - // index to the end of the log file. - reader = - new AggregatedLogFormat.LogReader(getConf(), - thisNodeFile.getPath()); - if (logType == null || logType.isEmpty()) { - if (dumpAContainerLogs(containerId, reader, out, - thisNodeFile.getModificationTime(), options.getBytes()) > -1) { - foundContainerLogs = true; - } - } else { - if (dumpAContainerLogsForALogType(containerId, reader, out, - thisNodeFile.getModificationTime(), logType, - options.getBytes()) > -1) { - foundContainerLogs = true; - } - } - } finally { - if (reader != null) { - reader.close(); - } - closePrintStream(out); - } - } - } - if (!foundContainerLogs) { + boolean foundAnyLogs = this.getFileController(options.getAppId(), + options.getAppOwner()).readAggregatedLogs(options, null); + if (!foundAnyLogs) { if (outputFailure) { - containerLogNotFound(containerId); + containerLogNotFound(options.getContainerId()); } return -1; } @@ -204,217 +142,25 @@ public int dumpAContainerLogsForLogType(ContainerLogsRequest options, @Private public int dumpAContainerLogsForLogTypeWithoutNodeId( ContainerLogsRequest options) throws IOException { - ApplicationId applicationId = options.getAppId(); - String jobOwner = options.getAppOwner(); - String containerId = options.getContainerId(); - String localDir = options.getOutputLocalDir(); - List logType = new ArrayList(options.getLogTypes()); - RemoteIterator nodeFiles = getRemoteNodeFileDir( - applicationId, jobOwner); - if (nodeFiles == null) { - return -1; - } - boolean foundContainerLogs = false; - while(nodeFiles.hasNext()) { - FileStatus thisNodeFile = nodeFiles.next(); - if (!thisNodeFile.getPath().getName().endsWith( - LogAggregationUtils.TMP_FILE_SUFFIX)) { - AggregatedLogFormat.LogReader reader = null; - 
PrintStream out = System.out; - try { - reader = - new AggregatedLogFormat.LogReader(getConf(), - thisNodeFile.getPath()); - if (getContainerLogsStream(containerId, reader) == null) { - continue; - } - // We have to re-create reader object to reset the stream index - // after calling getContainerLogsStream which would move the stream - // index to the end of the log file. - reader = - new AggregatedLogFormat.LogReader(getConf(), - thisNodeFile.getPath()); - out = createPrintStream(localDir, thisNodeFile.getPath().getName(), - containerId); - String containerString = String.format(CONTAINER_ON_NODE_PATTERN, - containerId, thisNodeFile.getPath().getName()); - out.println(containerString); - out.println("LogAggregationType: AGGREGATED"); - out.println(StringUtils.repeat("=", containerString.length())); - if (logType == null || logType.isEmpty()) { - if (dumpAContainerLogs(containerId, reader, out, - thisNodeFile.getModificationTime(), options.getBytes()) > -1) { - foundContainerLogs = true; - } - } else { - if (dumpAContainerLogsForALogType(containerId, reader, out, - thisNodeFile.getModificationTime(), logType, - options.getBytes()) > -1) { - foundContainerLogs = true; - } - } - } finally { - if (reader != null) { - reader.close(); - } - closePrintStream(out); - } - } - } - if (!foundContainerLogs) { - containerLogNotFound(containerId); + boolean foundAnyLogs = getFileController(options.getAppId(), + options.getAppOwner()).readAggregatedLogs( + options, null); + if (!foundAnyLogs) { + containerLogNotFound(options.getContainerId()); return -1; } return 0; } - @Private - public int dumpAContainerLogs(String containerIdStr, - AggregatedLogFormat.LogReader reader, PrintStream out, - long logUploadedTime, long bytes) throws IOException { - DataInputStream valueStream = getContainerLogsStream( - containerIdStr, reader); - - if (valueStream == null) { - return -1; - } - - boolean foundContainerLogs = false; - while (true) { - try { - 
LogReader.readAContainerLogsForALogType(valueStream, out, - logUploadedTime, bytes); - foundContainerLogs = true; - } catch (EOFException eof) { - break; - } - } - if (foundContainerLogs) { - return 0; - } - return -1; - } - - private DataInputStream getContainerLogsStream(String containerIdStr, - AggregatedLogFormat.LogReader reader) throws IOException { - DataInputStream valueStream; - LogKey key = new LogKey(); - valueStream = reader.next(key); - - while (valueStream != null && !key.toString().equals(containerIdStr)) { - // Next container - key = new LogKey(); - valueStream = reader.next(key); - } - return valueStream; - } - - @Private - public int dumpAContainerLogsForALogType(String containerIdStr, - AggregatedLogFormat.LogReader reader, PrintStream out, - long logUploadedTime, List logType, long bytes) - throws IOException { - DataInputStream valueStream = getContainerLogsStream( - containerIdStr, reader); - if (valueStream == null) { - return -1; - } - - boolean foundContainerLogs = false; - while (true) { - try { - int result = LogReader.readContainerLogsForALogType( - valueStream, out, logUploadedTime, logType, bytes); - if (result == 0) { - foundContainerLogs = true; - } - } catch (EOFException eof) { - break; - } - } - - if (foundContainerLogs) { - return 0; - } - return -1; - } - @Private public int dumpAllContainersLogs(ContainerLogsRequest options) throws IOException { - ApplicationId appId = options.getAppId(); - String appOwner = options.getAppOwner(); - String localDir = options.getOutputLocalDir(); - List logTypes = new ArrayList(options.getLogTypes()); - RemoteIterator nodeFiles = getRemoteNodeFileDir( - appId, appOwner); - if (nodeFiles == null) { - return -1; - } - boolean foundAnyLogs = false; - while (nodeFiles.hasNext()) { - FileStatus thisNodeFile = nodeFiles.next(); - if (thisNodeFile.getPath().getName().equals(appId + ".har")) { - Path p = new Path("har:///" - + thisNodeFile.getPath().toUri().getRawPath()); - nodeFiles = 
HarFs.get(p.toUri(), conf).listStatusIterator(p); - continue; - } - if (!thisNodeFile.getPath().getName() - .endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) { - AggregatedLogFormat.LogReader reader = - new AggregatedLogFormat.LogReader(getConf(), - thisNodeFile.getPath()); - try { - - DataInputStream valueStream; - LogKey key = new LogKey(); - valueStream = reader.next(key); - - while (valueStream != null) { - PrintStream out = createPrintStream(localDir, - thisNodeFile.getPath().getName(), key.toString()); - try { - String containerString = String.format( - CONTAINER_ON_NODE_PATTERN, key, - thisNodeFile.getPath().getName()); - out.println(containerString); - out.println("LogAggregationType: AGGREGATED"); - out.println(StringUtils.repeat("=", containerString.length())); - while (true) { - try { - if (logTypes == null || logTypes.isEmpty()) { - LogReader.readAContainerLogsForALogType(valueStream, out, - thisNodeFile.getModificationTime(), - options.getBytes()); - foundAnyLogs = true; - } else { - int result = LogReader.readContainerLogsForALogType( - valueStream, out, thisNodeFile.getModificationTime(), - logTypes, options.getBytes()); - if (result == 0) { - foundAnyLogs = true; - } - } - } catch (EOFException eof) { - break; - } - } - } finally { - closePrintStream(out); - } - - // Next container - key = new LogKey(); - valueStream = reader.next(key); - } - } finally { - reader.close(); - } - } - } + boolean foundAnyLogs = getFileController(options.getAppId(), + options.getAppOwner()).readAggregatedLogs( + options, null); if (!foundAnyLogs) { - emptyLogDir(LogAggregationUtils.getRemoteAppLogDir(conf, appId, appOwner) + emptyLogDir(LogAggregationUtils.getRemoteAppLogDir( + conf, options.getAppId(), options.getAppOwner()) .toString()); return -1; } @@ -425,14 +171,13 @@ public int dumpAllContainersLogs(ContainerLogsRequest options) public int printAContainerLogMetadata(ContainerLogsRequest options, PrintStream out, PrintStream err) throws IOException { - ApplicationId 
appId = options.getAppId(); - String appOwner = options.getAppOwner(); String nodeId = options.getNodeId(); String containerIdStr = options.getContainerId(); List containersLogMeta; try { - containersLogMeta = LogToolUtils.getContainerLogMetaFromRemoteFS( - conf, appId, containerIdStr, nodeId, appOwner); + containersLogMeta = getFileController(options.getAppId(), + options.getAppOwner()).readAggregatedLogsMeta( + options); } catch (Exception ex) { err.println(ex.getMessage()); return -1; @@ -473,8 +218,26 @@ public void printNodesList(ContainerLogsRequest options, PrintStream out, PrintStream err) throws IOException { ApplicationId appId = options.getAppId(); String appOwner = options.getAppOwner(); - RemoteIterator nodeFiles = getRemoteNodeFileDir( - appId, appOwner); + LogAggregationFileController fileFormat = null; + try { + fileFormat = getFileController(appId, appOwner); + } catch (Exception ex) { + err.println(ex.getMessage()); + return; + } + RemoteIterator nodeFiles = null; + try { + nodeFiles = LogAggregationUtils.getRemoteNodeFileDir(conf, appId, + appOwner, fileFormat.getRemoteRootLogDir(), + fileFormat.getRemoteRootLogDirSuffix()); + } catch (FileNotFoundException fnf) { + logDirNotExist(LogAggregationUtils.getRemoteAppLogDir( + conf, appId, appOwner).toString()); + } catch (AccessControlException | AccessDeniedException ace) { + logDirNoAccessPermission(LogAggregationUtils.getRemoteAppLogDir( + conf, appId, appOwner).toString(), appOwner, + ace.getMessage()); + } if (nodeFiles == null) { return; } @@ -497,44 +260,21 @@ public void printNodesList(ContainerLogsRequest options, public void printContainersList(ContainerLogsRequest options, PrintStream out, PrintStream err) throws IOException { ApplicationId appId = options.getAppId(); - String appOwner = options.getAppOwner(); String nodeId = options.getNodeId(); - String nodeIdStr = (nodeId == null) ? 
null - : LogAggregationUtils.getNodeString(nodeId); - RemoteIterator nodeFiles = getRemoteNodeFileDir( - appId, appOwner); - if (nodeFiles == null) { - return; - } boolean foundAnyLogs = false; - while (nodeFiles.hasNext()) { - FileStatus thisNodeFile = nodeFiles.next(); - if (nodeIdStr != null) { - if (!thisNodeFile.getPath().getName().contains(nodeIdStr)) { - continue; - } - } - if (!thisNodeFile.getPath().getName() - .endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) { - AggregatedLogFormat.LogReader reader = - new AggregatedLogFormat.LogReader(getConf(), - thisNodeFile.getPath()); - try { - DataInputStream valueStream; - LogKey key = new LogKey(); - valueStream = reader.next(key); - while (valueStream != null) { - out.println(String.format(CONTAINER_ON_NODE_PATTERN, key, - thisNodeFile.getPath().getName())); - foundAnyLogs = true; - // Next container - key = new LogKey(); - valueStream = reader.next(key); - } - } finally { - reader.close(); - } - } + List containersLogMeta = new ArrayList<>(); + try { + containersLogMeta = getFileController(options.getAppId(), + options.getAppOwner()).readAggregatedLogsMeta( + options); + } catch (Exception ex) { + err.println(ex.getMessage()); + } + for(ContainerLogMeta logMeta : containersLogMeta) { + out.println(String.format(CONTAINER_ON_NODE_PATTERN, + logMeta.getContainerId(), + logMeta.getNodeId())); + foundAnyLogs = true; } if (!foundAnyLogs) { if (nodeId != null) { @@ -547,26 +287,6 @@ public void printContainersList(ContainerLogsRequest options, } } - private RemoteIterator getRemoteNodeFileDir(ApplicationId appId, - String appOwner) throws IOException { - RemoteIterator nodeFiles = null; - try { - nodeFiles = LogAggregationUtils.getRemoteNodeFileDir( - conf, appId, appOwner); - } catch (FileNotFoundException fnf) { - logDirNotExist(LogAggregationUtils.getRemoteAppLogDir( - conf, appId, appOwner).toString()); - } catch (AccessControlException | AccessDeniedException ace) { - 
logDirNoAccessPermission(LogAggregationUtils.getRemoteAppLogDir( - conf, appId, appOwner).toString(), appOwner, - ace.getMessage()); - } catch (IOException ioe) { - logDirIOError(LogAggregationUtils.getRemoteAppLogDir( - conf, appId, appOwner).toString(), ioe.getMessage()); - } - return nodeFiles; - } - @Override public void setConf(Configuration conf) { this.conf = conf; @@ -600,11 +320,6 @@ private static void logDirNoAccessPermission(String remoteAppLogDir, + ". Error message found: " + errorMessage); } - private static void logDirIOError(String remoteAppLogDir, String errMsg) { - System.err.println("Cannot access to " + remoteAppLogDir + - ". Error message found: " + errMsg); - } - @Private public PrintStream createPrintStream(String localDir, String nodeId, String containerId) throws IOException { @@ -628,59 +343,29 @@ public void closePrintStream(PrintStream out) { @Private public Set listContainerLogs(ContainerLogsRequest options) throws IOException { + List containersLogMeta; Set logTypes = new HashSet(); - ApplicationId appId = options.getAppId(); - String appOwner = options.getAppOwner(); - String nodeId = options.getNodeId(); - String containerIdStr = options.getContainerId(); - boolean getAllContainers = (containerIdStr == null); - String nodeIdStr = (nodeId == null) ? 
null - : LogAggregationUtils.getNodeString(nodeId); - RemoteIterator nodeFiles = getRemoteNodeFileDir( - appId, appOwner); - if (nodeFiles == null) { + try { + containersLogMeta = getFileController(options.getAppId(), + options.getAppOwner()).readAggregatedLogsMeta( + options); + } catch (Exception ex) { + System.err.println(ex.getMessage()); return logTypes; } - while (nodeFiles.hasNext()) { - FileStatus thisNodeFile = nodeFiles.next(); - if (nodeIdStr != null) { - if (!thisNodeFile.getPath().getName().contains(nodeIdStr)) { - continue; - } - } - if (!thisNodeFile.getPath().getName() - .endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) { - AggregatedLogFormat.LogReader reader = - new AggregatedLogFormat.LogReader(getConf(), - thisNodeFile.getPath()); - try { - DataInputStream valueStream; - LogKey key = new LogKey(); - valueStream = reader.next(key); - while (valueStream != null) { - if (getAllContainers || (key.toString().equals(containerIdStr))) { - while (true) { - try { - String logFile = LogReader.readContainerMetaDataAndSkipData( - valueStream).getFirst(); - logTypes.add(logFile); - } catch (EOFException eof) { - break; - } - } - if (!getAllContainers) { - break; - } - } - // Next container - key = new LogKey(); - valueStream = reader.next(key); - } - } finally { - reader.close(); - } + for (ContainerLogMeta logMeta: containersLogMeta) { + for (PerContainerLogFileInfo fileInfo : logMeta.getContainerLogMeta()) { + logTypes.add(fileInfo.getFileName()); } } return logTypes; } + + private LogAggregationFileController getFileController(ApplicationId appId, + String appOwner) throws IOException { + if (factory == null) { + factory = new LogAggregationFileControllerFactory(conf); + } + return factory.getFileControllerForRead(appId, appOwner); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java index 74f694e0ca458..ddee445bb1d5e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.yarn.logaggregation; -import java.io.DataInputStream; -import java.io.EOFException; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; @@ -27,19 +25,7 @@ import java.nio.channels.FileChannel; import java.nio.channels.WritableByteChannel; import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.List; import org.apache.commons.lang.StringUtils; -import org.apache.commons.math3.util.Pair; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.HarFs; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; -import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader; -import org.apache.hadoop.yarn.util.Times; /** * This class contains several utility function which could be used in different @@ -53,81 +39,6 @@ private LogToolUtils() {} public static final String CONTAINER_ON_NODE_PATTERN = "Container: %s on %s"; - /** - * Return a list of {@link ContainerLogMeta} for a container - * from Remote FileSystem. 
- * - * @param conf the configuration - * @param appId the applicationId - * @param containerIdStr the containerId - * @param nodeId the nodeId - * @param appOwner the application owner - * @return a list of {@link ContainerLogMeta} - * @throws IOException if there is no available log file - */ - public static List getContainerLogMetaFromRemoteFS( - Configuration conf, ApplicationId appId, String containerIdStr, - String nodeId, String appOwner) throws IOException { - List containersLogMeta = new ArrayList<>(); - boolean getAllContainers = (containerIdStr == null); - String nodeIdStr = (nodeId == null) ? null - : LogAggregationUtils.getNodeString(nodeId); - RemoteIterator nodeFiles = LogAggregationUtils - .getRemoteNodeFileDir(conf, appId, appOwner); - if (nodeFiles == null) { - throw new IOException("There is no available log fils for " - + "application:" + appId); - } - while (nodeFiles.hasNext()) { - FileStatus thisNodeFile = nodeFiles.next(); - if (nodeIdStr != null) { - if (!thisNodeFile.getPath().getName().contains(nodeIdStr)) { - continue; - } - } - if (!thisNodeFile.getPath().getName() - .endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) { - AggregatedLogFormat.LogReader reader = - new AggregatedLogFormat.LogReader(conf, - thisNodeFile.getPath()); - try { - DataInputStream valueStream; - LogKey key = new LogKey(); - valueStream = reader.next(key); - while (valueStream != null) { - if (getAllContainers || (key.toString().equals(containerIdStr))) { - ContainerLogMeta containerLogMeta = new ContainerLogMeta( - key.toString(), thisNodeFile.getPath().getName()); - while (true) { - try { - Pair logMeta = - LogReader.readContainerMetaDataAndSkipData( - valueStream); - containerLogMeta.addLogMeta( - logMeta.getFirst(), - logMeta.getSecond(), - Times.format(thisNodeFile.getModificationTime())); - } catch (EOFException eof) { - break; - } - } - containersLogMeta.add(containerLogMeta); - if (!getAllContainers) { - break; - } - } - // Next container - key = new LogKey(); 
- valueStream = reader.next(key); - } - } finally { - reader.close(); - } - } - } - return containersLogMeta; - } - /** * Output container log. * @param containerId the containerId @@ -247,82 +158,4 @@ public static void outputContainerLogThroughZeroCopy(String containerId, } } - public static boolean outputAggregatedContainerLog(Configuration conf, - ApplicationId appId, String appOwner, - String containerId, String nodeId, - String logFileName, long outputSize, OutputStream os, - byte[] buf) throws IOException { - boolean findLogs = false; - RemoteIterator nodeFiles = LogAggregationUtils - .getRemoteNodeFileDir(conf, appId, appOwner); - while (nodeFiles != null && nodeFiles.hasNext()) { - final FileStatus thisNodeFile = nodeFiles.next(); - String nodeName = thisNodeFile.getPath().getName(); - if (nodeName.equals(appId + ".har")) { - Path p = new Path("har:///" - + thisNodeFile.getPath().toUri().getRawPath()); - nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p); - continue; - } - if ((nodeId == null || nodeName.contains(LogAggregationUtils - .getNodeString(nodeId))) && !nodeName.endsWith( - LogAggregationUtils.TMP_FILE_SUFFIX)) { - AggregatedLogFormat.LogReader reader = null; - try { - reader = new AggregatedLogFormat.LogReader(conf, - thisNodeFile.getPath()); - DataInputStream valueStream; - LogKey key = new LogKey(); - valueStream = reader.next(key); - while (valueStream != null && !key.toString() - .equals(containerId)) { - // Next container - key = new LogKey(); - valueStream = reader.next(key); - } - if (valueStream == null) { - continue; - } - while (true) { - try { - String fileType = valueStream.readUTF(); - String fileLengthStr = valueStream.readUTF(); - long fileLength = Long.parseLong(fileLengthStr); - if (fileType.equalsIgnoreCase(logFileName)) { - LogToolUtils.outputContainerLog(containerId, - nodeId, fileType, fileLength, outputSize, - Times.format(thisNodeFile.getModificationTime()), - valueStream, os, buf, - 
ContainerLogAggregationType.AGGREGATED); - StringBuilder sb = new StringBuilder(); - String endOfFile = "End of LogType:" + fileType; - sb.append("\n" + endOfFile + "\n"); - sb.append(StringUtils.repeat("*", endOfFile.length() + 50) - + "\n\n"); - byte[] b = sb.toString().getBytes(Charset.forName("UTF-8")); - os.write(b, 0, b.length); - findLogs = true; - } else { - long totalSkipped = 0; - long currSkipped = 0; - while (currSkipped != -1 && totalSkipped < fileLength) { - currSkipped = valueStream.skip( - fileLength - totalSkipped); - totalSkipped += currSkipped; - } - } - } catch (EOFException eof) { - break; - } - } - } finally { - if (reader != null) { - reader.close(); - } - } - } - } - os.flush(); - return findLogs; - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java index 5503f8fee8d49..87344a7f68ab8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java @@ -24,6 +24,10 @@ import com.google.common.collect.Sets; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.OutputStream; +import java.io.PrintStream; +import java.nio.file.Files; +import java.nio.file.Paths; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Arrays; @@ -31,7 +35,9 @@ import java.util.Comparator; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; +import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import 
org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -42,13 +48,18 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; +import org.apache.hadoop.yarn.webapp.View.ViewContext; +import org.apache.hadoop.yarn.webapp.view.HtmlBlock.Block; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue; +import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta; +import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest; /** * Base class to implement Log Aggregation File Controller. @@ -167,6 +178,74 @@ public abstract void write(LogKey logKey, LogValue logValue) public abstract void postWrite(LogAggregationFileControllerContext record) throws Exception; + protected PrintStream createPrintStream(String localDir, String nodeId, + String containerId) throws IOException { + PrintStream out = System.out; + if(localDir != null && !localDir.isEmpty()) { + Path nodePath = new Path(localDir, LogAggregationUtils + .getNodeString(nodeId)); + Files.createDirectories(Paths.get(nodePath.toString())); + Path containerLogPath = new Path(nodePath, containerId); + out = new PrintStream(containerLogPath.toString(), "UTF-8"); + } + return out; + } + + protected void closePrintStream(OutputStream out) { + if (out != System.out) { + IOUtils.closeQuietly(out); + } + } + + /** + * Output container log. 
+ * @param logRequest {@link ContainerLogsRequest} + * @param os the output stream + * @throws IOException if we cannot access the log file. + */ + public abstract boolean readAggregatedLogs(ContainerLogsRequest logRequest, + OutputStream os) throws IOException; + + /** + * Return a list of {@link ContainerLogMeta} for an application + * from Remote FileSystem. + * + * @param logRequest {@link ContainerLogsRequest} + * @return a list of {@link ContainerLogMeta} + * @throws IOException if there is no available log file + */ + public abstract List readAggregatedLogsMeta( + ContainerLogsRequest logRequest) throws IOException; + + /** + * Render Aggregated Logs block. + * @param html the html + * @param context the ViewContext + */ + public abstract void renderAggregatedLogsBlock(Block html, + ViewContext context); + + /** + * Returns the owner of the application. + * + * @param aggregatedLogPath the aggregated log path. + * @return the application owner. + * @throws IOException if the aggregated log cannot be read. + */ + public abstract String getApplicationOwner(Path aggregatedLogPath) + throws IOException; + + /** + * Returns ACLs for the application. An empty map is returned if no ACLs are + * found. + * + * @param aggregatedLogPath the aggregated log path. + * @return a map of the Application ACLs. + * @throws IOException if the aggregated log cannot be read. + */ + public abstract Map getApplicationAcls( + Path aggregatedLogPath) throws IOException; + /** * Verify and create the remote log directory.
*/ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationHtmlBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationHtmlBlock.java new file mode 100644 index 0000000000000..c996623ec8643 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationHtmlBlock.java @@ -0,0 +1,186 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.logaggregation.filecontroller; + +import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_OWNER; +import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID; +import static org.apache.hadoop.yarn.webapp.YarnWebParams.ENTITY_STRING; +import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME; + +import com.google.inject.Inject; +import java.util.Map; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.logaggregation.LogAggregationWebUtils; +import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; +import org.apache.hadoop.yarn.webapp.view.HtmlBlock; + +/** + * Base class to implement Aggregated Logs Block. 
+ */ +@InterfaceAudience.LimitedPrivate({"YARN", "MapReduce"}) +public abstract class LogAggregationHtmlBlock extends HtmlBlock { + + @Inject + public LogAggregationHtmlBlock(ViewContext ctx) { + super(ctx); + } + + protected BlockParameters verifyAndParseParameters(Block html) { + BlockParameters params = new BlockParameters(); + ContainerId containerId = LogAggregationWebUtils + .verifyAndGetContainerId(html, $(CONTAINER_ID)); + params.setContainerId(containerId); + + NodeId nodeId = LogAggregationWebUtils + .verifyAndGetNodeId(html, $(NM_NODENAME)); + params.setNodeId(nodeId); + + String appOwner = LogAggregationWebUtils + .verifyAndGetAppOwner(html, $(APP_OWNER)); + params.setAppOwner(appOwner); + + boolean isValid = true; + long start = -4096; + try { + start = LogAggregationWebUtils.getLogStartIndex( + html, $("start")); + } catch (NumberFormatException ne) { + html.h1().__("Invalid log start value: " + $("start")).__(); + isValid = false; + } + params.setStartIndex(start); + + long end = Long.MAX_VALUE; + try { + end = LogAggregationWebUtils.getLogEndIndex( + html, $("end")); + } catch (NumberFormatException ne) { + html.h1().__("Invalid log end value: " + $("end")).__(); + isValid = false; + } + params.setEndIndex(end); + + if (containerId == null || nodeId == null || appOwner == null + || appOwner.isEmpty() || !isValid) { + return null; + } + + ApplicationId appId = containerId.getApplicationAttemptId() + .getApplicationId(); + params.setAppId(appId); + + String logEntity = $(ENTITY_STRING); + if (logEntity == null || logEntity.isEmpty()) { + logEntity = containerId.toString(); + } + params.setLogEntity(logEntity); + + return params; + } + + protected boolean checkAcls(Configuration conf, ApplicationId appId, + String owner, Map appAcls, + String remoteUser) { + ApplicationACLsManager aclsManager = new ApplicationACLsManager( + conf); + aclsManager.addApplication(appId, appAcls); + + UserGroupInformation callerUGI = null; + if (remoteUser != null) { +
callerUGI = UserGroupInformation.createRemoteUser(remoteUser); + } + if (callerUGI != null && !aclsManager.checkAccess(callerUGI, + ApplicationAccessType.VIEW_APP, owner, appId)) { + return false; + } + return true; + } + + protected static class BlockParameters { + private ApplicationId appId; + private ContainerId containerId; + private NodeId nodeId; + private String appOwner; + private long start; + private long end; + private String logEntity; + + public ApplicationId getAppId() { + return appId; + } + + public void setAppId(ApplicationId appId) { + this.appId = appId; + } + + public ContainerId getContainerId() { + return containerId; + } + + public void setContainerId(ContainerId containerId) { + this.containerId = containerId; + } + + public NodeId getNodeId() { + return nodeId; + } + + public void setNodeId(NodeId nodeId) { + this.nodeId = nodeId; + } + + public String getAppOwner() { + return appOwner; + } + + public void setAppOwner(String appOwner) { + this.appOwner = appOwner; + } + + public long getStartIndex() { + return start; + } + + public void setStartIndex(long startIndex) { + this.start = startIndex; + } + + public long getEndIndex() { + return end; + } + + public void setEndIndex(long endIndex) { + this.end = endIndex; + } + + public String getLogEntity() { + return logEntity; + } + + public void setLogEntity(String logEntity) { + this.logEntity = logEntity; + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationTFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationTFileController.java deleted file mode 100644 index 9e0c66d73cc57..0000000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationTFileController.java +++ /dev/null @@ -1,127 +0,0 @@ -/** - * 
Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.logaggregation.filecontroller; - -import java.io.IOException; -import java.security.PrivilegedExceptionAction; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; -import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue; -import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter; -import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; -import org.apache.hadoop.yarn.util.Times; - -/** - * The TFile log aggregation file Controller implementation. 
- */ -@Private -@Unstable -public class LogAggregationTFileController - extends LogAggregationFileController { - - private static final Log LOG = LogFactory.getLog( - LogAggregationTFileController.class); - - private LogWriter writer; - - public LogAggregationTFileController(){} - - @Override - public void initInternal(Configuration conf) { - this.remoteRootLogDir = new Path( - conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, - YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); - this.remoteRootLogDirSuffix = - conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, - YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX); - } - - @Override - public void initializeWriter(LogAggregationFileControllerContext context) - throws IOException { - this.writer = new LogWriter(); - writer.initialize(this.conf, context.getRemoteNodeTmpLogFileForApp(), - context.getUserUgi()); - // Write ACLs once when the writer is created. - writer.writeApplicationACLs(context.getAppAcls()); - writer.writeApplicationOwner(context.getUserUgi().getShortUserName()); - } - - @Override - public void closeWriter() { - this.writer.close(); - } - - @Override - public void write(LogKey logKey, LogValue logValue) throws IOException { - this.writer.append(logKey, logValue); - } - - @Override - public void postWrite(final LogAggregationFileControllerContext record) - throws Exception { - // Before upload logs, make sure the number of existing logs - // is smaller than the configured NM log aggregation retention size. - if (record.isUploadedLogsInThisCycle() && - record.isLogAggregationInRolling()) { - cleanOldLogs(record.getRemoteNodeLogFileForApp(), record.getNodeId(), - record.getUserUgi()); - record.increcleanupOldLogTimes(); - } - - final Path renamedPath = record.getRollingMonitorInterval() <= 0 - ? 
record.getRemoteNodeLogFileForApp() : new Path( - record.getRemoteNodeLogFileForApp().getParent(), - record.getRemoteNodeLogFileForApp().getName() + "_" - + record.getLogUploadTimeStamp()); - final boolean rename = record.isUploadedLogsInThisCycle(); - try { - record.getUserUgi().doAs(new PrivilegedExceptionAction() { - @Override - public Object run() throws Exception { - FileSystem remoteFS = record.getRemoteNodeLogFileForApp() - .getFileSystem(conf); - if (rename) { - remoteFS.rename(record.getRemoteNodeTmpLogFileForApp(), - renamedPath); - } else { - remoteFS.delete(record.getRemoteNodeTmpLogFileForApp(), false); - } - return null; - } - }); - } catch (Exception e) { - LOG.error( - "Failed to move temporary log file to final location: [" - + record.getRemoteNodeTmpLogFileForApp() + "] to [" - + renamedPath + "]", e); - throw new Exception("Log uploaded failed for Application: " - + record.getAppId() + " in NodeManager: " - + LogAggregationUtils.getNodeString(record.getNodeId()) + " at " - + Times.format(record.getLogUploadTimeStamp()) + "\n"); - } - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java new file mode 100644 index 0000000000000..d2038e2d2e571 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java @@ -0,0 +1,375 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.logaggregation.filecontroller.tfile; + +import java.io.DataInputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.Charset; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.math3.util.Pair; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.HarFs; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat; +import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; +import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader; +import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue; +import 
org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext; +import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType; +import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta; +import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest; +import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; +import org.apache.hadoop.yarn.logaggregation.LogToolUtils; +import org.apache.hadoop.yarn.util.Times; +import org.apache.hadoop.yarn.webapp.View.ViewContext; +import org.apache.hadoop.yarn.webapp.view.HtmlBlock.Block; + +/** + * The TFile log aggregation file Controller implementation. + */ +@Private +@Unstable +public class LogAggregationTFileController + extends LogAggregationFileController { + + private static final Log LOG = LogFactory.getLog( + LogAggregationTFileController.class); + + private LogWriter writer; + private TFileLogReader tfReader = null; + + public LogAggregationTFileController(){} + + @Override + public void initInternal(Configuration conf) { + this.remoteRootLogDir = new Path( + conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, + YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); + this.remoteRootLogDirSuffix = + conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, + YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX); + } + + @Override + public void initializeWriter(LogAggregationFileControllerContext context) + throws IOException { + this.writer = new LogWriter(); + writer.initialize(this.conf, context.getRemoteNodeTmpLogFileForApp(), + context.getUserUgi()); + // Write ACLs once when the writer is created. 
+ writer.writeApplicationACLs(context.getAppAcls()); + writer.writeApplicationOwner(context.getUserUgi().getShortUserName()); + } + + @Override + public void closeWriter() { + this.writer.close(); + this.writer = null; + } + + @Override + public void write(LogKey logKey, LogValue logValue) throws IOException { + this.writer.append(logKey, logValue); + } + + @Override + public void postWrite(final LogAggregationFileControllerContext record) + throws Exception { + // Before upload logs, make sure the number of existing logs + // is smaller than the configured NM log aggregation retention size. + if (record.isUploadedLogsInThisCycle() && + record.isLogAggregationInRolling()) { + cleanOldLogs(record.getRemoteNodeLogFileForApp(), record.getNodeId(), + record.getUserUgi()); + record.increcleanupOldLogTimes(); + } + + final Path renamedPath = record.getRollingMonitorInterval() <= 0 + ? record.getRemoteNodeLogFileForApp() : new Path( + record.getRemoteNodeLogFileForApp().getParent(), + record.getRemoteNodeLogFileForApp().getName() + "_" + + record.getLogUploadTimeStamp()); + final boolean rename = record.isUploadedLogsInThisCycle(); + try { + record.getUserUgi().doAs(new PrivilegedExceptionAction() { + @Override + public Object run() throws Exception { + FileSystem remoteFS = record.getRemoteNodeLogFileForApp() + .getFileSystem(conf); + if (rename) { + remoteFS.rename(record.getRemoteNodeTmpLogFileForApp(), + renamedPath); + } else { + remoteFS.delete(record.getRemoteNodeTmpLogFileForApp(), false); + } + return null; + } + }); + } catch (Exception e) { + LOG.error( + "Failed to move temporary log file to final location: [" + + record.getRemoteNodeTmpLogFileForApp() + "] to [" + + renamedPath + "]", e); + throw new Exception("Log uploaded failed for Application: " + + record.getAppId() + " in NodeManager: " + + LogAggregationUtils.getNodeString(record.getNodeId()) + " at " + + Times.format(record.getLogUploadTimeStamp()) + "\n", e); + } + } + + @Override + public boolean 
readAggregatedLogs(ContainerLogsRequest logRequest, + OutputStream os) throws IOException { + boolean findLogs = false; + boolean createPrintStream = (os == null); + ApplicationId appId = logRequest.getAppId(); + String nodeId = logRequest.getNodeId(); + List logTypes = new ArrayList<>(); + if (logRequest.getLogTypes() != null && !logRequest + .getLogTypes().isEmpty()) { + logTypes.addAll(logRequest.getLogTypes()); + } + String containerIdStr = logRequest.getContainerId(); + boolean getAllContainers = (containerIdStr == null + || containerIdStr.isEmpty()); + long size = logRequest.getBytes(); + RemoteIterator nodeFiles = LogAggregationUtils + .getRemoteNodeFileDir(conf, appId, logRequest.getAppOwner()); + byte[] buf = new byte[65535]; + while (nodeFiles != null && nodeFiles.hasNext()) { + final FileStatus thisNodeFile = nodeFiles.next(); + LOG.debug(thisNodeFile.getPath().toString()); + String nodeName = thisNodeFile.getPath().getName(); + if (nodeName.equals(appId + ".har")) { + Path p = new Path("har:///" + + thisNodeFile.getPath().toUri().getRawPath()); + nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p); + continue; + } + if ((nodeId == null || nodeName.contains(LogAggregationUtils + .getNodeString(nodeId))) && !nodeName.endsWith( + LogAggregationUtils.TMP_FILE_SUFFIX)) { + AggregatedLogFormat.LogReader reader = null; + try { + reader = new AggregatedLogFormat.LogReader(conf, + thisNodeFile.getPath()); + DataInputStream valueStream; + LogKey key = new LogKey(); + valueStream = reader.next(key); + while (valueStream != null) { + if (getAllContainers || (key.toString().equals(containerIdStr))) { + if (createPrintStream) { + os = createPrintStream( + logRequest.getOutputLocalDir(), + thisNodeFile.getPath().getName(), key.toString()); + } + try { + while (true) { + try { + String fileType = valueStream.readUTF(); + String fileLengthStr = valueStream.readUTF(); + long fileLength = Long.parseLong(fileLengthStr); + if (logTypes == null || logTypes.isEmpty() 
|| + logTypes.contains(fileType)) { + LogToolUtils.outputContainerLog(key.toString(), + nodeName, fileType, fileLength, size, + Times.format(thisNodeFile.getModificationTime()), + valueStream, os, buf, + ContainerLogAggregationType.AGGREGATED); + StringBuilder sb = new StringBuilder(); + String endOfFile = "End of LogType:" + fileType; + sb.append("\n" + endOfFile + "\n"); + sb.append(StringUtils.repeat("*", endOfFile.length() + 50) + + "\n\n"); + byte[] b = sb.toString().getBytes( + Charset.forName("UTF-8")); + os.write(b, 0, b.length); + findLogs = true; + } else { + long totalSkipped = 0; + long currSkipped = 0; + while (currSkipped != -1 && totalSkipped < fileLength) { + currSkipped = valueStream.skip( + fileLength - totalSkipped); + totalSkipped += currSkipped; + } + } + } catch (EOFException eof) { + break; + } + } + } finally { + os.flush(); + if (createPrintStream) { + closePrintStream(os); + } + } + if (!getAllContainers) { + break; + } + } + // Next container + key = new LogKey(); + valueStream = reader.next(key); + } + } finally { + if (reader != null) { + reader.close(); + } + } + } + } + return findLogs; + } + + @Override + public List readAggregatedLogsMeta( + ContainerLogsRequest logRequest) throws IOException { + List containersLogMeta = new ArrayList<>(); + String containerIdStr = logRequest.getContainerId(); + String nodeId = logRequest.getNodeId(); + ApplicationId appId = logRequest.getAppId(); + String appOwner = logRequest.getAppOwner(); + boolean getAllContainers = (containerIdStr == null); + String nodeIdStr = (nodeId == null) ? 
null + : LogAggregationUtils.getNodeString(nodeId); + RemoteIterator nodeFiles = LogAggregationUtils + .getRemoteNodeFileDir(conf, appId, appOwner); + if (nodeFiles == null) { + throw new IOException("There is no available log fils for " + + "application:" + appId); + } + while (nodeFiles.hasNext()) { + FileStatus thisNodeFile = nodeFiles.next(); + if (nodeIdStr != null) { + if (!thisNodeFile.getPath().getName().contains(nodeIdStr)) { + continue; + } + } + if (!thisNodeFile.getPath().getName() + .endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) { + AggregatedLogFormat.LogReader reader = + new AggregatedLogFormat.LogReader(conf, + thisNodeFile.getPath()); + try { + DataInputStream valueStream; + LogKey key = new LogKey(); + valueStream = reader.next(key); + while (valueStream != null) { + if (getAllContainers || (key.toString().equals(containerIdStr))) { + ContainerLogMeta containerLogMeta = new ContainerLogMeta( + key.toString(), thisNodeFile.getPath().getName()); + while (true) { + try { + Pair logMeta = + LogReader.readContainerMetaDataAndSkipData( + valueStream); + containerLogMeta.addLogMeta( + logMeta.getFirst(), + logMeta.getSecond(), + Times.format(thisNodeFile.getModificationTime())); + } catch (EOFException eof) { + break; + } + } + containersLogMeta.add(containerLogMeta); + if (!getAllContainers) { + break; + } + } + // Next container + key = new LogKey(); + valueStream = reader.next(key); + } + } finally { + reader.close(); + } + } + } + return containersLogMeta; + } + + @Override + public void renderAggregatedLogsBlock(Block html, ViewContext context) { + TFileAggregatedLogsBlock block = new TFileAggregatedLogsBlock( + context, conf); + block.render(html); + } + + @Override + public String getApplicationOwner(Path aggregatedLog) throws IOException { + createTFileLogReader(aggregatedLog); + return this.tfReader.getLogReader().getApplicationOwner(); + } + + @Override + public Map getApplicationAcls( + Path aggregatedLog) throws IOException { + 
createTFileLogReader(aggregatedLog); + return this.tfReader.getLogReader().getApplicationAcls(); + } + + private void createTFileLogReader(Path aggregatedLog) throws IOException { + if (this.tfReader == null || !this.tfReader.getAggregatedLogPath() + .equals(aggregatedLog)) { + LogReader logReader = new LogReader(conf, aggregatedLog); + this.tfReader = new TFileLogReader(logReader, aggregatedLog); + } + } + + private static class TFileLogReader { + private LogReader logReader; + private Path aggregatedLogPath; + + TFileLogReader(LogReader logReader, Path aggregatedLogPath) { + this.setLogReader(logReader); + this.setAggregatedLogPath(aggregatedLogPath); + } + public LogReader getLogReader() { + return logReader; + } + public void setLogReader(LogReader logReader) { + this.logReader = logReader; + } + public Path getAggregatedLogPath() { + return aggregatedLogPath; + } + public void setAggregatedLogPath(Path aggregatedLogPath) { + this.aggregatedLogPath = aggregatedLogPath; + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/TFileAggregatedLogsBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/TFileAggregatedLogsBlock.java new file mode 100644 index 0000000000000..dde70e337024f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/TFileAggregatedLogsBlock.java @@ -0,0 +1,241 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.logaggregation.filecontroller.tfile; + +import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_OWNER; +import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID; +import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_LOG_TYPE; +import static org.apache.hadoop.yarn.webapp.YarnWebParams.ENTITY_STRING; +import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME; + +import com.google.inject.Inject; +import java.io.IOException; +import java.util.Map; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.HarFs; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat; +import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationHtmlBlock; +import org.apache.hadoop.yarn.util.Times; +import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet; +import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet.PRE; + +/** + * The Aggregated Logs Block implementation for TFile. 
+ */ +@InterfaceAudience.LimitedPrivate({"YARN", "MapReduce"}) +public class TFileAggregatedLogsBlock extends LogAggregationHtmlBlock { + + private final Configuration conf; + + @Inject + public TFileAggregatedLogsBlock(ViewContext ctx, Configuration conf) { + super(ctx); + this.conf = conf; + } + + @Override + protected void render(Block html) { + + BlockParameters params = verifyAndParseParameters(html); + if (params == null) { + return; + } + + RemoteIterator nodeFiles; + try { + nodeFiles = LogAggregationUtils + .getRemoteNodeFileDir(conf, params.getAppId(), + params.getAppOwner()); + } catch (RuntimeException e) { + throw e; + } catch (Exception ex) { + html.h1("No logs available for container " + + params.getContainerId().toString()); + return; + } + + NodeId nodeId = params.getNodeId(); + String logEntity = params.getLogEntity(); + ApplicationId appId = params.getAppId(); + ContainerId containerId = params.getContainerId(); + long start = params.getStartIndex(); + long end = params.getEndIndex(); + + boolean foundLog = false; + String desiredLogType = $(CONTAINER_LOG_TYPE); + try { + while (nodeFiles.hasNext()) { + AggregatedLogFormat.LogReader reader = null; + try { + FileStatus thisNodeFile = nodeFiles.next(); + if (thisNodeFile.getPath().getName().equals( + params.getAppId() + ".har")) { + Path p = new Path("har:///" + + thisNodeFile.getPath().toUri().getRawPath()); + nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p); + continue; + } + if (!thisNodeFile.getPath().getName() + .contains(LogAggregationUtils.getNodeString(nodeId)) + || thisNodeFile.getPath().getName() + .endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) { + continue; + } + long logUploadedTime = thisNodeFile.getModificationTime(); + reader = new AggregatedLogFormat.LogReader( + conf, thisNodeFile.getPath()); + + String owner = null; + Map appAcls = null; + try { + owner = reader.getApplicationOwner(); + appAcls = reader.getApplicationAcls(); + } catch (IOException e) { + 
LOG.error("Error getting logs for " + logEntity, e); + continue; + } + String remoteUser = request().getRemoteUser(); + + if (!checkAcls(conf, appId, owner, appAcls, remoteUser)) { + html.h1().__("User [" + remoteUser + + "] is not authorized to view the logs for " + logEntity + + " in log file [" + thisNodeFile.getPath().getName() + "]") + .__(); + LOG.error("User [" + remoteUser + + "] is not authorized to view the logs for " + logEntity); + continue; + } + + AggregatedLogFormat.ContainerLogsReader logReader = reader + .getContainerLogsReader(containerId); + if (logReader == null) { + continue; + } + + foundLog = readContainerLogs(html, logReader, start, end, + desiredLogType, logUploadedTime); + } catch (IOException ex) { + LOG.error("Error getting logs for " + logEntity, ex); + continue; + } finally { + if (reader != null) { + reader.close(); + } + } + } + if (!foundLog) { + if (desiredLogType.isEmpty()) { + html.h1("No logs available for container " + + containerId.toString()); + } else { + html.h1("Unable to locate '" + desiredLogType + + "' log for container " + containerId.toString()); + } + } + } catch (IOException e) { + html.h1().__("Error getting logs for " + logEntity).__(); + LOG.error("Error getting logs for " + logEntity, e); + } + } + + private boolean readContainerLogs(Block html, + AggregatedLogFormat.ContainerLogsReader logReader, long startIndex, + long endIndex, String desiredLogType, long logUpLoadTime) + throws IOException { + int bufferSize = 65536; + char[] cbuf = new char[bufferSize]; + + boolean foundLog = false; + String logType = logReader.nextLog(); + while (logType != null) { + if (desiredLogType == null || desiredLogType.isEmpty() + || desiredLogType.equals(logType)) { + long logLength = logReader.getCurrentLogLength(); + if (foundLog) { + html.pre().__("\n\n").__(); + } + + html.p().__("Log Type: " + logType).__(); + html.p().__("Log Upload Time: " + Times.format(logUpLoadTime)).__(); + html.p().__("Log Length: " + 
Long.toString(logLength)).__(); + + long start = startIndex < 0 + ? logLength + startIndex : startIndex; + start = start < 0 ? 0 : start; + start = start > logLength ? logLength : start; + long end = endIndex < 0 + ? logLength + endIndex : endIndex; + end = end < 0 ? 0 : end; + end = end > logLength ? logLength : end; + end = end < start ? start : end; + + long toRead = end - start; + if (toRead < logLength) { + html.p().__("Showing " + toRead + " bytes of " + logLength + + " total. Click ").a(url("logs", $(NM_NODENAME), $(CONTAINER_ID), + $(ENTITY_STRING), $(APP_OWNER), + logType, "?start=0"), "here"). + __(" for the full log.").__(); + } + + long totalSkipped = 0; + while (totalSkipped < start) { + long ret = logReader.skip(start - totalSkipped); + if (ret == 0) { + //Read one byte + int nextByte = logReader.read(); + // Check if we have reached EOF + if (nextByte == -1) { + throw new IOException("Premature EOF from container log"); + } + ret = 1; + } + totalSkipped += ret; + } + + int len = 0; + int currentToRead = toRead > bufferSize ? bufferSize : (int) toRead; + PRE pre = html.pre(); + + while (toRead > 0 + && (len = logReader.read(cbuf, 0, currentToRead)) > 0) { + pre.__(new String(cbuf, 0, len)); + toRead = toRead - len; + currentToRead = toRead > bufferSize ? 
bufferSize : (int) toRead; + } + + pre.__(); + foundLog = true; + } + + logType = logReader.nextLog(); + } + + return foundLog; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/package-info.java new file mode 100644 index 0000000000000..b2e91ab48a989 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +@InterfaceAudience.Public +package org.apache.hadoop.yarn.logaggregation.filecontroller.tfile; +import org.apache.hadoop.classification.InterfaceAudience; \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java index 0c7e09e414971..f6fea74ff8d51 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java @@ -20,55 +20,59 @@ import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_OWNER; import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID; -import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_LOG_TYPE; import static org.apache.hadoop.yarn.webapp.YarnWebParams.ENTITY_STRING; import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.Map; - import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileContext; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.HarFs; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat; -import 
org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; -import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; -import org.apache.hadoop.yarn.util.Times; -import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet; -import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet.PRE; +import org.apache.hadoop.yarn.logaggregation.LogAggregationWebUtils; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory; import org.apache.hadoop.yarn.webapp.view.HtmlBlock; - import com.google.inject.Inject; @InterfaceAudience.LimitedPrivate({"YARN", "MapReduce"}) public class AggregatedLogsBlock extends HtmlBlock { private final Configuration conf; + private final LogAggregationFileControllerFactory factory; @Inject AggregatedLogsBlock(Configuration conf) { this.conf = conf; + factory = new LogAggregationFileControllerFactory(conf); } @Override protected void render(Block html) { - ContainerId containerId = verifyAndGetContainerId(html); - NodeId nodeId = verifyAndGetNodeId(html); - String appOwner = verifyAndGetAppOwner(html); - LogLimits logLimits = verifyAndGetLogLimits(html); + ContainerId containerId = LogAggregationWebUtils + .verifyAndGetContainerId(html, $(CONTAINER_ID)); + NodeId nodeId = LogAggregationWebUtils + .verifyAndGetNodeId(html, $(NM_NODENAME)); + String appOwner = LogAggregationWebUtils + .verifyAndGetAppOwner(html, $(APP_OWNER)); + boolean isValid = true; + try { + LogAggregationWebUtils.getLogStartIndex( + html, $("start")); + } catch (NumberFormatException ne) { + html.h1().__("Invalid log start value: " + $("start")).__(); + isValid = false; + } + try { + LogAggregationWebUtils.getLogEndIndex( + html, $("end")); + } catch (NumberFormatException ne) { + html.h1().__("Invalid log end value: " + $("end")).__(); + isValid = false; + } + if (containerId == null || nodeId == null || appOwner == null - || appOwner.isEmpty() || 
logLimits == null) { + || appOwner.isEmpty() || !isValid) { return; } @@ -93,21 +97,11 @@ protected void render(Block html) { return; } - Path remoteRootLogDir = new Path(conf.get( - YarnConfiguration.NM_REMOTE_APP_LOG_DIR, - YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); - Path remoteAppDir = LogAggregationUtils.getRemoteAppLogDir( - remoteRootLogDir, applicationId, appOwner, - LogAggregationUtils.getRemoteNodeLogDirSuffix(conf)); - RemoteIterator nodeFiles; + LogAggregationFileController fileController; try { - Path qualifiedLogDir = - FileContext.getFileContext(conf).makeQualified( - remoteAppDir); - nodeFiles = - FileContext.getFileContext(qualifiedLogDir.toUri(), conf) - .listStatus(remoteAppDir); - } catch (FileNotFoundException fnf) { + fileController = this.factory.getFileControllerForRead( + applicationId, appOwner); + } catch (Exception fnf) { html.h1() .__("Logs not available for " + logEntity + ". Aggregation may not be complete, " @@ -118,251 +112,9 @@ protected void render(Block html) { .__(); } return; - } catch (Exception ex) { - html.h1() - .__("Error getting logs at " + nodeId).__(); - return; - } - - boolean foundLog = false; - String desiredLogType = $(CONTAINER_LOG_TYPE); - try { - while (nodeFiles.hasNext()) { - AggregatedLogFormat.LogReader reader = null; - try { - FileStatus thisNodeFile = nodeFiles.next(); - if (thisNodeFile.getPath().getName().equals(applicationId + ".har")) { - Path p = new Path("har:///" - + thisNodeFile.getPath().toUri().getRawPath()); - nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p); - continue; - } - if (!thisNodeFile.getPath().getName() - .contains(LogAggregationUtils.getNodeString(nodeId)) - || thisNodeFile.getPath().getName() - .endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) { - continue; - } - long logUploadedTime = thisNodeFile.getModificationTime(); - reader = - new AggregatedLogFormat.LogReader(conf, thisNodeFile.getPath()); - - String owner = null; - Map appAcls = null; - try { - owner = 
reader.getApplicationOwner(); - appAcls = reader.getApplicationAcls(); - } catch (IOException e) { - LOG.error("Error getting logs for " + logEntity, e); - continue; - } - ApplicationACLsManager aclsManager = new ApplicationACLsManager(conf); - aclsManager.addApplication(applicationId, appAcls); - - String remoteUser = request().getRemoteUser(); - UserGroupInformation callerUGI = null; - if (remoteUser != null) { - callerUGI = UserGroupInformation.createRemoteUser(remoteUser); - } - if (callerUGI != null && !aclsManager.checkAccess(callerUGI, - ApplicationAccessType.VIEW_APP, owner, applicationId)) { - html.h1() - .__("User [" + remoteUser - + "] is not authorized to view the logs for " + logEntity - + " in log file [" + thisNodeFile.getPath().getName() + "]").__(); - LOG.error("User [" + remoteUser - + "] is not authorized to view the logs for " + logEntity); - continue; - } - - AggregatedLogFormat.ContainerLogsReader logReader = reader - .getContainerLogsReader(containerId); - if (logReader == null) { - continue; - } - - foundLog = readContainerLogs(html, logReader, logLimits, - desiredLogType, logUploadedTime); - } catch (IOException ex) { - LOG.error("Error getting logs for " + logEntity, ex); - continue; - } finally { - if (reader != null) { - reader.close(); - } - } - } - if (!foundLog) { - if (desiredLogType.isEmpty()) { - html.h1("No logs available for container " + containerId.toString()); - } else { - html.h1("Unable to locate '" + desiredLogType - + "' log for container " + containerId.toString()); - } - } - } catch (IOException e) { - html.h1().__("Error getting logs for " + logEntity).__(); - LOG.error("Error getting logs for " + logEntity, e); - } - } - - private boolean readContainerLogs(Block html, - AggregatedLogFormat.ContainerLogsReader logReader, LogLimits logLimits, - String desiredLogType, long logUpLoadTime) throws IOException { - int bufferSize = 65536; - char[] cbuf = new char[bufferSize]; - - boolean foundLog = false; - String logType = 
logReader.nextLog(); - while (logType != null) { - if (desiredLogType == null || desiredLogType.isEmpty() - || desiredLogType.equals(logType)) { - long logLength = logReader.getCurrentLogLength(); - if (foundLog) { - html.pre().__("\n\n").__(); - } - - html.p().__("Log Type: " + logType).__(); - html.p().__("Log Upload Time: " + Times.format(logUpLoadTime)).__(); - html.p().__("Log Length: " + Long.toString(logLength)).__(); - - long start = logLimits.start < 0 - ? logLength + logLimits.start : logLimits.start; - start = start < 0 ? 0 : start; - start = start > logLength ? logLength : start; - long end = logLimits.end < 0 - ? logLength + logLimits.end : logLimits.end; - end = end < 0 ? 0 : end; - end = end > logLength ? logLength : end; - end = end < start ? start : end; - - long toRead = end - start; - if (toRead < logLength) { - html.p().__("Showing " + toRead + " bytes of " + logLength - + " total. Click ") - .a(url("logs", $(NM_NODENAME), $(CONTAINER_ID), - $(ENTITY_STRING), $(APP_OWNER), - logType, "?start=0"), "here"). - __(" for the full log.").__(); - } - - long totalSkipped = 0; - while (totalSkipped < start) { - long ret = logReader.skip(start - totalSkipped); - if (ret == 0) { - //Read one byte - int nextByte = logReader.read(); - // Check if we have reached EOF - if (nextByte == -1) { - throw new IOException( "Premature EOF from container log"); - } - ret = 1; - } - totalSkipped += ret; - } - - int len = 0; - int currentToRead = toRead > bufferSize ? bufferSize : (int) toRead; - PRE pre = html.pre(); - - while (toRead > 0 - && (len = logReader.read(cbuf, 0, currentToRead)) > 0) { - pre.__(new String(cbuf, 0, len)); - toRead = toRead - len; - currentToRead = toRead > bufferSize ? 
bufferSize : (int) toRead; - } - - pre.__(); - foundLog = true; - } - - logType = logReader.nextLog(); - } - - return foundLog; - } - - private ContainerId verifyAndGetContainerId(Block html) { - String containerIdStr = $(CONTAINER_ID); - if (containerIdStr == null || containerIdStr.isEmpty()) { - html.h1().__("Cannot get container logs without a ContainerId").__(); - return null; - } - ContainerId containerId = null; - try { - containerId = ContainerId.fromString(containerIdStr); - } catch (IllegalArgumentException e) { - html.h1() - .__("Cannot get container logs for invalid containerId: " - + containerIdStr).__(); - return null; - } - return containerId; - } - - private NodeId verifyAndGetNodeId(Block html) { - String nodeIdStr = $(NM_NODENAME); - if (nodeIdStr == null || nodeIdStr.isEmpty()) { - html.h1().__("Cannot get container logs without a NodeId").__(); - return null; - } - NodeId nodeId = null; - try { - nodeId = NodeId.fromString(nodeIdStr); - } catch (IllegalArgumentException e) { - html.h1().__("Cannot get container logs. 
Invalid nodeId: " + nodeIdStr) - .__(); - return null; - } - return nodeId; - } - - private String verifyAndGetAppOwner(Block html) { - String appOwner = $(APP_OWNER); - if (appOwner == null || appOwner.isEmpty()) { - html.h1().__("Cannot get container logs without an app owner").__(); - } - return appOwner; - } - - private static class LogLimits { - long start; - long end; - } - - private LogLimits verifyAndGetLogLimits(Block html) { - long start = -4096; - long end = Long.MAX_VALUE; - boolean isValid = true; - - String startStr = $("start"); - if (startStr != null && !startStr.isEmpty()) { - try { - start = Long.parseLong(startStr); - } catch (NumberFormatException e) { - isValid = false; - html.h1().__("Invalid log start value: " + startStr).__(); - } - } - - String endStr = $("end"); - if (endStr != null && !endStr.isEmpty()) { - try { - end = Long.parseLong(endStr); - } catch (NumberFormatException e) { - isValid = false; - html.h1().__("Invalid log end value: " + endStr).__(); - } - } - - if (!isValid) { - return null; } - LogLimits limits = new LogLimits(); - limits.start = start; - limits.end = end; - return limits; + fileController.renderAggregatedLogsBlock(html, this.context()); } private String getApplicationLogURL(ApplicationId applicationId) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 17d04965dda73..0cad1672ba9f4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1182,7 +1182,7 @@ Class that supports TFile read and write operations. 
yarn.log-aggregation.file-controller.TFile.class - org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationTFileController + org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java index 3dd7de3a0ad96..bee34e07354f5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java @@ -48,7 +48,9 @@ import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory; +import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.TFileAggregatedLogsBlock; import org.apache.hadoop.yarn.webapp.YarnWebParams; +import org.apache.hadoop.yarn.webapp.View.ViewContext; import org.apache.hadoop.yarn.webapp.log.AggregatedLogsBlockForTest; import org.apache.hadoop.yarn.webapp.view.BlockForTest; import org.apache.hadoop.yarn.webapp.view.HtmlBlock; @@ -56,6 +58,9 @@ import org.junit.Test; import static org.mockito.Mockito.*; + +import com.google.inject.Inject; + import static org.junit.Assert.*; /** @@ -77,12 +82,14 @@ public void testAccessDenied() throws Exception { writeLog(configuration, "owner"); - AggregatedLogsBlockForTest aggregatedBlock = getAggregatedLogsBlockForTest( - configuration, "owner", "container_0_0001_01_000001"); + ByteArrayOutputStream data = new ByteArrayOutputStream(); PrintWriter printWriter = new 
PrintWriter(data); HtmlBlock html = new HtmlBlockForTest(); HtmlBlock.Block block = new BlockForTest(html, printWriter, 10, false); + TFileAggregatedLogsBlockForTest aggregatedBlock + = getTFileAggregatedLogsBlockForTest(configuration, "owner", + "container_0_0001_01_000001", "localhost:1234"); aggregatedBlock.render(block); block.getWriter().flush(); @@ -158,12 +165,13 @@ public void testAggregatedLogsBlock() throws Exception { writeLog(configuration, "admin"); - AggregatedLogsBlockForTest aggregatedBlock = getAggregatedLogsBlockForTest( - configuration, "admin", "container_0_0001_01_000001"); ByteArrayOutputStream data = new ByteArrayOutputStream(); PrintWriter printWriter = new PrintWriter(data); HtmlBlock html = new HtmlBlockForTest(); HtmlBlock.Block block = new BlockForTest(html, printWriter, 10, false); + TFileAggregatedLogsBlockForTest aggregatedBlock + = getTFileAggregatedLogsBlockForTest(configuration, "admin", + "container_0_0001_01_000001", "localhost:1234"); aggregatedBlock.render(block); block.getWriter().flush(); @@ -191,13 +199,13 @@ public void testAggregatedLogsBlockHar() throws Exception { "/application_1440536969523_0001.har"; FileUtils.copyDirectory(new File(harUrl.getPath()), new File(path)); - AggregatedLogsBlockForTest aggregatedBlock = getAggregatedLogsBlockForTest( - configuration, "admin", - "container_1440536969523_0001_01_000001", "host1:1111"); ByteArrayOutputStream data = new ByteArrayOutputStream(); PrintWriter printWriter = new PrintWriter(data); HtmlBlock html = new HtmlBlockForTest(); HtmlBlock.Block block = new BlockForTest(html, printWriter, 10, false); + TFileAggregatedLogsBlockForTest aggregatedBlock + = getTFileAggregatedLogsBlockForTest(configuration, "admin", + "container_1440536969523_0001_01_000001", "host1:1111"); aggregatedBlock.render(block); block.getWriter().flush(); @@ -206,7 +214,7 @@ public void testAggregatedLogsBlockHar() throws Exception { assertTrue(out.contains("Hello stdout")); assertTrue(out.contains("Hello 
syslog")); - aggregatedBlock = getAggregatedLogsBlockForTest( + aggregatedBlock = getTFileAggregatedLogsBlockForTest( configuration, "admin", "container_1440536969523_0001_01_000002", "host2:2222"); data = new ByteArrayOutputStream(); @@ -237,12 +245,13 @@ public void testNoLogs() throws Exception { } writeLog(configuration, "admin"); - AggregatedLogsBlockForTest aggregatedBlock = getAggregatedLogsBlockForTest( - configuration, "admin", "container_0_0001_01_000001"); ByteArrayOutputStream data = new ByteArrayOutputStream(); PrintWriter printWriter = new PrintWriter(data); HtmlBlock html = new HtmlBlockForTest(); HtmlBlock.Block block = new BlockForTest(html, printWriter, 10, false); + TFileAggregatedLogsBlockForTest aggregatedBlock + = getTFileAggregatedLogsBlockForTest(configuration, "admin", + "container_0_0001_01_000001", "localhost:1234"); aggregatedBlock.render(block); block.getWriter().flush(); @@ -250,8 +259,7 @@ public void testNoLogs() throws Exception { assertTrue(out.contains("No logs available for container container_0_0001_01_000001")); } - - + private Configuration getConfiguration() { Configuration configuration = new YarnConfiguration(); configuration.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); @@ -267,6 +275,28 @@ private AggregatedLogsBlockForTest getAggregatedLogsBlockForTest( "localhost:1234"); } + private TFileAggregatedLogsBlockForTest getTFileAggregatedLogsBlockForTest( + Configuration configuration, String user, String containerId, + String nodeName) { + HttpServletRequest request = mock(HttpServletRequest.class); + when(request.getRemoteUser()).thenReturn(user); + ViewContext mockContext = mock(ViewContext.class); + TFileAggregatedLogsBlockForTest aggregatedBlock + = new TFileAggregatedLogsBlockForTest(mockContext, + configuration); + aggregatedBlock.setRequest(request); + aggregatedBlock.moreParams().put(YarnWebParams.CONTAINER_ID, + containerId); + aggregatedBlock.moreParams().put(YarnWebParams.NM_NODENAME, + nodeName); + 
aggregatedBlock.moreParams().put(YarnWebParams.APP_OWNER, user); + aggregatedBlock.moreParams().put("start", ""); + aggregatedBlock.moreParams().put("end", ""); + aggregatedBlock.moreParams().put(YarnWebParams.ENTITY_STRING, "entity"); + return aggregatedBlock; + } + + private AggregatedLogsBlockForTest getAggregatedLogsBlockForTest( Configuration configuration, String user, String containerId, String nodeName) { @@ -340,4 +370,32 @@ private void writeLog(String fileName, String text) throws Exception { writer.close(); } + private static class TFileAggregatedLogsBlockForTest + extends TFileAggregatedLogsBlock { + + private Map params = new HashMap(); + private HttpServletRequest request; + + @Inject + TFileAggregatedLogsBlockForTest(ViewContext ctx, Configuration conf) { + super(ctx, conf); + } + + public void render(Block html) { + super.render(html); + } + + @Override + public Map moreParams() { + return params; + } + + public HttpServletRequest request() { + return request; + } + + public void setRequest(HttpServletRequest request) { + this.request = request; + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/TestLogAggregationFileControllerFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/TestLogAggregationFileControllerFactory.java index 45f18c104810f..2d0864a474bcc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/TestLogAggregationFileControllerFactory.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/TestLogAggregationFileControllerFactory.java @@ -23,19 +23,27 @@ import java.io.File; import java.io.FileWriter; import java.io.IOException; +import java.io.OutputStream; import java.io.Writer; import java.util.LinkedList; +import 
java.util.List; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue; +import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta; +import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory; -import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationTFileController; +import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController; +import org.apache.hadoop.yarn.webapp.View.ViewContext; +import org.apache.hadoop.yarn.webapp.view.HtmlBlock.Block; import org.junit.Test; /** @@ -167,5 +175,34 @@ public void initializeWriter(LogAggregationFileControllerContext context) throws IOException { // Do Nothing } + + @Override + public boolean readAggregatedLogs(ContainerLogsRequest logRequest, + OutputStream os) throws IOException { + return false; + } + + @Override + public List readAggregatedLogsMeta( + ContainerLogsRequest logRequest) throws IOException { + return null; + } + + @Override + public void renderAggregatedLogsBlock(Block html, ViewContext context) { + // DO NOTHING + } + + @Override + public String getApplicationOwner(Path aggregatedLogPath) + throws IOException { + return null; + } + + @Override + public Map getApplicationAcls( + Path aggregatedLogPath) throws IOException { + return null; + } } } diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java index 13410a88054ce..755127b20b228 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java @@ -23,6 +23,7 @@ import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Set; @@ -54,8 +55,9 @@ import org.apache.hadoop.yarn.api.ApplicationBaseProtocol; import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout; import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta; +import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory; import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType; -import org.apache.hadoop.yarn.logaggregation.LogToolUtils; import org.apache.hadoop.yarn.server.webapp.WebServices; import org.apache.hadoop.yarn.server.webapp.YarnWebServiceParams; import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo; @@ -92,12 +94,14 @@ public class AHSWebServices extends WebServices { private static final Joiner JOINER = Joiner.on(""); private static final Joiner DOT_JOINER = Joiner.on(". 
"); private final Configuration conf; + private final LogAggregationFileControllerFactory factory; @Inject public AHSWebServices(ApplicationBaseProtocol appBaseProt, Configuration conf) { super(appBaseProt); this.conf = conf; + this.factory = new LogAggregationFileControllerFactory(conf); } @GET @@ -525,9 +529,17 @@ private StreamingOutput getStreamingOutput(final ApplicationId appId, @Override public void write(OutputStream os) throws IOException, WebApplicationException { - byte[] buf = new byte[65535]; - boolean findLogs = LogToolUtils.outputAggregatedContainerLog(conf, - appId, appOwner, containerIdStr, nodeId, logFile, bytes, os, buf); + ContainerLogsRequest request = new ContainerLogsRequest(); + request.setAppId(appId); + request.setAppOwner(appOwner); + request.setContainerId(containerIdStr); + request.setBytes(bytes); + request.setNodeId(nodeId); + Set logTypes = new HashSet<>(); + logTypes.add(logFile); + request.setLogTypes(logTypes); + boolean findLogs = factory.getFileControllerForRead(appId, appOwner) + .readAggregatedLogs(request, os); if (!findLogs) { os.write(("Can not find logs for container:" + containerIdStr).getBytes(Charset.forName("UTF-8"))); @@ -558,9 +570,14 @@ private Response getContainerLogMeta(ApplicationId appId, String appOwner, final String nodeId, final String containerIdStr, boolean emptyLocalContainerLogMeta) { try { - List containerLogMeta = LogToolUtils - .getContainerLogMetaFromRemoteFS(conf, appId, containerIdStr, - nodeId, appOwner); + ContainerLogsRequest request = new ContainerLogsRequest(); + request.setAppId(appId); + request.setAppOwner(appOwner); + request.setContainerId(containerIdStr); + request.setNodeId(nodeId); + List containerLogMeta = factory + .getFileControllerForRead(appId, appOwner) + .readAggregatedLogsMeta(request); if (containerLogMeta.isEmpty()) { throw new NotFoundException( "Can not get log meta for container: " + containerIdStr); diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index 51c63c4f01c99..4ac150ac065ab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -53,7 +53,7 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext; -import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationTFileController; +import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController; import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; import org.apache.hadoop.yarn.server.api.ContainerLogAggregationPolicy; import org.apache.hadoop.yarn.server.api.ContainerLogContext; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java index 3ba7d5cc02129..c5379ccf25847 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java @@ -23,8 +23,10 @@ import java.io.OutputStream; import java.nio.charset.Charset; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map.Entry; +import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,8 +57,10 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta; +import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest; import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType; import org.apache.hadoop.yarn.logaggregation.LogToolUtils; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.ResourceView; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; @@ -90,6 +94,7 @@ public class NMWebServices { private static RecordFactory recordFactory = RecordFactoryProvider .getRecordFactory(null); private final String redirectWSUrl; + private final LogAggregationFileControllerFactory factory; private @javax.ws.rs.core.Context HttpServletRequest request; @@ -108,6 +113,8 @@ public NMWebServices(final Context nm, final ResourceView view, this.webapp = webapp; this.redirectWSUrl = this.nmContext.getConf().get( YarnConfiguration.YARN_LOG_SERVER_WEBSERVICE_URL); + this.factory = new LogAggregationFileControllerFactory( + this.nmContext.getConf()); } private void init() { @@ -262,10 +269,14 @@ public Response getContainerLogsInfo( 
Application app = this.nmContext.getApplications().get(appId); String appOwner = app == null ? null : app.getUser(); try { - List containerLogMeta = LogToolUtils - .getContainerLogMetaFromRemoteFS(this.nmContext.getConf(), - appId, containerIdStr, - this.nmContext.getNodeId().toString(), appOwner); + ContainerLogsRequest logRequest = new ContainerLogsRequest(); + logRequest.setAppId(appId); + logRequest.setAppOwner(appOwner); + logRequest.setContainerId(containerIdStr); + logRequest.setNodeId(this.nmContext.getNodeId().toString()); + List containerLogMeta = factory + .getFileControllerForRead(appId, appOwner) + .readAggregatedLogsMeta(logRequest); if (!containerLogMeta.isEmpty()) { for (ContainerLogMeta logMeta : containerLogMeta) { containersLogsInfo.add(new ContainerLogsInfo(logMeta, @@ -449,12 +460,17 @@ public void write(OutputStream os) throws IOException, Application app = nmContext.getApplications().get(appId); String appOwner = app == null ? null : app.getUser(); try { - int bufferSize = 65536; - byte[] buf = new byte[bufferSize]; - LogToolUtils.outputAggregatedContainerLog(nmContext.getConf(), - appId, appOwner, containerId.toString(), - nmContext.getNodeId().toString(), outputFileName, bytes, - os, buf); + ContainerLogsRequest logRequest = new ContainerLogsRequest(); + logRequest.setAppId(appId); + logRequest.setAppOwner(appOwner); + logRequest.setContainerId(containerId.toString()); + logRequest.setNodeId(nmContext.getNodeId().toString()); + logRequest.setBytes(bytes); + Set logTypes = new HashSet<>(); + logTypes.add(outputFileName); + logRequest.setLogTypes(logTypes); + factory.getFileControllerForRead(appId, appOwner) + .readAggregatedLogs(logRequest, os); } catch (Exception ex) { // Something wrong when we try to access the aggregated log. 
if (LOG.isDebugEnabled()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java index bedad33ceabd2..e13c805f1e34c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java @@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue; -import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationTFileController; +import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController; import org.apache.hadoop.yarn.server.api.ContainerLogContext; import org.apache.hadoop.yarn.server.api.ContainerType; import org.apache.hadoop.yarn.server.nodemanager.Context; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index a3e01afcbc795..05cdd49db3ac2 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -105,7 +105,7 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory; -import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationTFileController; +import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController; import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;