Skip to content

Commit

Permalink
YARN-7891. LogAggregationIndexedFileController should support read fr…
Browse files Browse the repository at this point in the history
…om HAR file. (Xuan Gong via wangda)

Change-Id: I16e081f21c5f80160564c49cc49d103bd8eb7e16
  • Loading branch information
wangdatan committed Mar 7, 2018
1 parent e718ac5 commit 583f459
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 19 deletions.
4 changes: 4 additions & 0 deletions hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
Expand Up @@ -249,6 +249,10 @@
<exclude>src/test/resources/application_1440536969523_0001.har/part-0</exclude> <exclude>src/test/resources/application_1440536969523_0001.har/part-0</exclude>
<exclude>src/test/resources/application_1440536969523_0001.har/_masterindex</exclude> <exclude>src/test/resources/application_1440536969523_0001.har/_masterindex</exclude>
<exclude>src/test/resources/application_1440536969523_0001.har/_SUCCESS</exclude> <exclude>src/test/resources/application_1440536969523_0001.har/_SUCCESS</exclude>
<exclude>src/test/resources/application_123456_0001.har/_index</exclude>
<exclude>src/test/resources/application_123456_0001.har/part-0</exclude>
<exclude>src/test/resources/application_123456_0001.har/_masterindex</exclude>
<exclude>src/test/resources/application_123456_0001.har/_SUCCESS</exclude>
</excludes> </excludes>
</configuration> </configuration>
</plugin> </plugin>
Expand Down
Expand Up @@ -495,16 +495,21 @@ public boolean readAggregatedLogs(ContainerLogsRequest logRequest,
boolean getAllContainers = (containerIdStr == null boolean getAllContainers = (containerIdStr == null
|| containerIdStr.isEmpty()); || containerIdStr.isEmpty());
long size = logRequest.getBytes(); long size = logRequest.getBytes();
List<FileStatus> nodeFiles = LogAggregationUtils RemoteIterator<FileStatus> nodeFiles = LogAggregationUtils
.getRemoteNodeFileList(conf, appId, logRequest.getAppOwner(), .getRemoteNodeFileDir(conf, appId, logRequest.getAppOwner(),
this.remoteRootLogDir, this.remoteRootLogDirSuffix); this.remoteRootLogDir, this.remoteRootLogDirSuffix);
if (nodeFiles.isEmpty()) { if (!nodeFiles.hasNext()) {
throw new IOException("There is no available log fils for " throw new IOException("There is no available log fils for "
+ "application:" + appId); + "application:" + appId);
} }
Map<String, Long> checkSumFiles = parseCheckSumFiles(nodeFiles); List<FileStatus> allFiles = getAllNodeFiles(nodeFiles, appId);
if (allFiles.isEmpty()) {
throw new IOException("There is no available log fils for "
+ "application:" + appId);
}
Map<String, Long> checkSumFiles = parseCheckSumFiles(allFiles);
List<FileStatus> fileToRead = getNodeLogFileToRead( List<FileStatus> fileToRead = getNodeLogFileToRead(
nodeFiles, nodeIdStr, appId); allFiles, nodeIdStr, appId);
byte[] buf = new byte[65535]; byte[] buf = new byte[65535];
for (FileStatus thisNodeFile : fileToRead) { for (FileStatus thisNodeFile : fileToRead) {
String nodeName = thisNodeFile.getPath().getName(); String nodeName = thisNodeFile.getPath().getName();
Expand Down Expand Up @@ -609,16 +614,21 @@ public List<ContainerLogMeta> readAggregatedLogsMeta(
containerIdStr.isEmpty()); containerIdStr.isEmpty());
String nodeIdStr = (nodeId == null || nodeId.isEmpty()) ? null String nodeIdStr = (nodeId == null || nodeId.isEmpty()) ? null
: LogAggregationUtils.getNodeString(nodeId); : LogAggregationUtils.getNodeString(nodeId);
List<FileStatus> nodeFiles = LogAggregationUtils RemoteIterator<FileStatus> nodeFiles = LogAggregationUtils
.getRemoteNodeFileList(conf, appId, appOwner, this.remoteRootLogDir, .getRemoteNodeFileDir(conf, appId, appOwner, this.remoteRootLogDir,
this.remoteRootLogDirSuffix); this.remoteRootLogDirSuffix);
if (nodeFiles.isEmpty()) { if (!nodeFiles.hasNext()) {
throw new IOException("There is no available log fils for " throw new IOException("There is no available log fils for "
+ "application:" + appId); + "application:" + appId);
} }
Map<String, Long> checkSumFiles = parseCheckSumFiles(nodeFiles); List<FileStatus> allFiles = getAllNodeFiles(nodeFiles, appId);
if (allFiles.isEmpty()) {
throw new IOException("There is no available log fils for "
+ "application:" + appId);
}
Map<String, Long> checkSumFiles = parseCheckSumFiles(allFiles);
List<FileStatus> fileToRead = getNodeLogFileToRead( List<FileStatus> fileToRead = getNodeLogFileToRead(
nodeFiles, nodeIdStr, appId); allFiles, nodeIdStr, appId);
for(FileStatus thisNodeFile : fileToRead) { for(FileStatus thisNodeFile : fileToRead) {
try { try {
Long checkSumIndex = checkSumFiles.get( Long checkSumIndex = checkSumFiles.get(
Expand Down Expand Up @@ -727,21 +737,33 @@ public List<FileStatus> getNodeLogFileToRead(
List<FileStatus> nodeFiles, String nodeId, ApplicationId appId) List<FileStatus> nodeFiles, String nodeId, ApplicationId appId)
throws IOException { throws IOException {
List<FileStatus> listOfFiles = new ArrayList<>(); List<FileStatus> listOfFiles = new ArrayList<>();
List<FileStatus> files = new ArrayList<>(nodeFiles); for (FileStatus thisNodeFile : nodeFiles) {
for (FileStatus file : files) { String nodeName = thisNodeFile.getPath().getName();
String nodeName = file.getPath().getName();
if ((nodeId == null || nodeId.isEmpty() if ((nodeId == null || nodeId.isEmpty()
|| nodeName.contains(LogAggregationUtils || nodeName.contains(LogAggregationUtils
.getNodeString(nodeId))) && !nodeName.endsWith( .getNodeString(nodeId))) && !nodeName.endsWith(
LogAggregationUtils.TMP_FILE_SUFFIX) && LogAggregationUtils.TMP_FILE_SUFFIX) &&
!nodeName.endsWith(CHECK_SUM_FILE_SUFFIX)) { !nodeName.endsWith(CHECK_SUM_FILE_SUFFIX)) {
if (nodeName.equals(appId + ".har")) { listOfFiles.add(thisNodeFile);
Path p = new Path("har:///" + file.getPath().toUri().getRawPath()); }
files = Arrays.asList(HarFs.get(p.toUri(), conf).listStatus(p)); }
continue; return listOfFiles;
} }
listOfFiles.add(file);
private List<FileStatus> getAllNodeFiles(
RemoteIterator<FileStatus> nodeFiles, ApplicationId appId)
throws IOException {
List<FileStatus> listOfFiles = new ArrayList<>();
while (nodeFiles != null && nodeFiles.hasNext()) {
FileStatus thisNodeFile = nodeFiles.next();
String nodeName = thisNodeFile.getPath().getName();
if (nodeName.equals(appId + ".har")) {
Path p = new Path("har:///"
+ thisNodeFile.getPath().toUri().getRawPath());
nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p);
continue;
} }
listOfFiles.add(thisNodeFile);
} }
return listOfFiles; return listOfFiles;
} }
Expand Down
Expand Up @@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.logaggregation.filecontroller.ifile; package org.apache.hadoop.yarn.logaggregation.filecontroller.ifile;


import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
Expand All @@ -27,6 +28,7 @@
import java.io.IOException; import java.io.IOException;
import java.io.PrintStream; import java.io.PrintStream;
import java.io.Writer; import java.io.Writer;
import java.net.URL;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
Expand Down Expand Up @@ -364,6 +366,58 @@ public boolean isRollover(final FileContext fc,
sysOutStream.reset(); sysOutStream.reset();
} }


@Test(timeout = 15000)
public void testFetchApplictionLogsHar() throws Exception {
List<String> newLogTypes = new ArrayList<>();
newLogTypes.add("syslog");
newLogTypes.add("stdout");
newLogTypes.add("stderr");
newLogTypes.add("test1");
newLogTypes.add("test2");
URL harUrl = ClassLoader.getSystemClassLoader()
.getResource("application_123456_0001.har");
assertNotNull(harUrl);

Path path = new Path(remoteLogDir + "/" + USER_UGI.getShortUserName()
+ "/logs/application_123456_0001");
if (fs.exists(path)) {
fs.delete(path, true);
}
assertTrue(fs.mkdirs(path));
Path harPath = new Path(path, "application_123456_0001.har");
fs.copyFromLocalFile(false, new Path(harUrl.toURI()), harPath);
assertTrue(fs.exists(harPath));
LogAggregationIndexedFileController fileFormat
= new LogAggregationIndexedFileController();
fileFormat.initialize(conf, "Indexed");
ContainerLogsRequest logRequest = new ContainerLogsRequest();
logRequest.setAppId(appId);
logRequest.setNodeId(nodeId.toString());
logRequest.setAppOwner(USER_UGI.getShortUserName());
logRequest.setContainerId(containerId.toString());
logRequest.setBytes(Long.MAX_VALUE);
List<ContainerLogMeta> meta = fileFormat.readAggregatedLogsMeta(
logRequest);
Assert.assertEquals(meta.size(), 3);
List<String> fileNames = new ArrayList<>();
for (ContainerLogMeta log : meta) {
Assert.assertTrue(log.getContainerId().equals(containerId.toString()));
Assert.assertTrue(log.getNodeId().equals(nodeId.toString()));
for (ContainerLogFileInfo file : log.getContainerLogMeta()) {
fileNames.add(file.getFileName());
}
}
fileNames.removeAll(newLogTypes);
Assert.assertTrue(fileNames.isEmpty());
boolean foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out);
Assert.assertTrue(foundLogs);
for (String logType : newLogTypes) {
Assert.assertTrue(sysOutStream.toString().contains(logMessage(
containerId, logType)));
}
sysOutStream.reset();
}

private File createAndWriteLocalLogFile(ContainerId containerId, private File createAndWriteLocalLogFile(ContainerId containerId,
Path localLogDir, String logType) throws IOException { Path localLogDir, String logType) throws IOException {
File file = new File(localLogDir.toString(), logType); File file = new File(localLogDir.toString(), logType);
Expand Down
@@ -0,0 +1,3 @@
%2F dir 1517728311922+493+xuan+supergroup 0 0 localhost_9999_1517727665265 localhost_9999_1517727668513
%2Flocalhost_9999_1517727665265 file part-0 0 2895 1517728301581+420+xuan+supergroup
%2Flocalhost_9999_1517727668513 file part-0 2895 1228 1517728311919+420+xuan+supergroup
@@ -0,0 +1,2 @@
3
0 1897968749 0 280
Binary file not shown.

0 comments on commit 583f459

Please sign in to comment.