Navigation Menu

Skip to content

Commit

Permalink
YARN-3154. Added additional APIs in LogAggregationContext to avoid ag…
Browse files Browse the repository at this point in the history
…gregating running logs of application when rolling is enabled. Contributed by Xuan Gong.
  • Loading branch information
vinoduec committed Mar 12, 2015
1 parent b49c3a1 commit 863079b
Show file tree
Hide file tree
Showing 9 changed files with 227 additions and 48 deletions.
3 changes: 3 additions & 0 deletions hadoop-yarn-project/CHANGES.txt
Expand Up @@ -757,6 +757,9 @@ Release 2.7.0 - UNRELEASED


YARN-3338. Exclude jline dependency from YARN. (Zhijie Shen via xgong) YARN-3338. Exclude jline dependency from YARN. (Zhijie Shen via xgong)


YARN-3154. Added additional APIs in LogAggregationContext to avoid aggregating
running logs of application when rolling is enabled. (Xuan Gong via vinodkv)

Release 2.6.0 - 2014-11-18 Release 2.6.0 - 2014-11-18


INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES
Expand Down
Expand Up @@ -32,11 +32,20 @@
* <ul> * <ul>
* <li>includePattern. It uses Java Regex to filter the log files * <li>includePattern. It uses Java Regex to filter the log files
* which match the defined include pattern and those log files * which match the defined include pattern and those log files
* will be uploaded. </li> * will be uploaded when the application finishes. </li>
* <li>excludePattern. It uses Java Regex to filter the log files * <li>excludePattern. It uses Java Regex to filter the log files
* which match the defined exclude pattern and those log files * which match the defined exclude pattern and those log files
* will not be uploaded. If the log file name matches both the * will not be uploaded when application finishes. If the log file
* include and the exclude pattern, this file will be excluded eventually</li> * name matches both the include and the exclude pattern, this file
* will be excluded eventually</li>
* <li>rolledLogsIncludePattern. It uses Java Regex to filter the log files
* which match the defined include pattern and those log files
* will be aggregated in a rolling fashion.</li>
* <li>rolledLogsExcludePattern. It uses Java Regex to filter the log files
* which match the defined exclude pattern and those log files
* will not be aggregated in a rolling fashion. If the log file
* name matches both the include and the exclude pattern, this file
* will be excluded eventually</li>
* </ul> * </ul>
* </p> * </p>
* *
Expand All @@ -57,8 +66,23 @@ public static LogAggregationContext newInstance(String includePattern,
return context; return context;
} }


@Public
@Unstable
public static LogAggregationContext newInstance(String includePattern,
String excludePattern, String rolledLogsIncludePattern,
String rolledLogsExcludePattern) {
LogAggregationContext context =
Records.newRecord(LogAggregationContext.class);
context.setIncludePattern(includePattern);
context.setExcludePattern(excludePattern);
context.setRolledLogsIncludePattern(rolledLogsIncludePattern);
context.setRolledLogsExcludePattern(rolledLogsExcludePattern);
return context;
}

/** /**
* Get include pattern * Get include pattern. This includePattern only takes affect
* on logs that exist at the time of application finish.
* *
* @return include pattern * @return include pattern
*/ */
Expand All @@ -67,7 +91,8 @@ public static LogAggregationContext newInstance(String includePattern,
public abstract String getIncludePattern(); public abstract String getIncludePattern();


/** /**
* Set include pattern * Set include pattern. This includePattern only takes affect
* on logs that exist at the time of application finish.
* *
* @param includePattern * @param includePattern
*/ */
Expand All @@ -76,7 +101,8 @@ public static LogAggregationContext newInstance(String includePattern,
public abstract void setIncludePattern(String includePattern); public abstract void setIncludePattern(String includePattern);


/** /**
* Get exclude pattern * Get exclude pattern. This excludePattern only takes affect
* on logs that exist at the time of application finish.
* *
* @return exclude pattern * @return exclude pattern
*/ */
Expand All @@ -85,11 +111,50 @@ public static LogAggregationContext newInstance(String includePattern,
public abstract String getExcludePattern(); public abstract String getExcludePattern();


/** /**
* Set exclude pattern * Set exclude pattern. This excludePattern only takes affect
* on logs that exist at the time of application finish.
* *
* @param excludePattern * @param excludePattern
*/ */
@Public @Public
@Unstable @Unstable
public abstract void setExcludePattern(String excludePattern); public abstract void setExcludePattern(String excludePattern);

/**
* Get include pattern in a rolling fashion.
*
* @return include pattern
*/
@Public
@Unstable
public abstract String getRolledLogsIncludePattern();

/**
* Set include pattern in a rolling fashion.
*
* @param rolledLogsIncludePattern
*/
@Public
@Unstable
public abstract void setRolledLogsIncludePattern(
String rolledLogsIncludePattern);

/**
* Get exclude pattern for aggregation in a rolling fashion.
*
* @return exclude pattern
*/
@Public
@Unstable
public abstract String getRolledLogsExcludePattern();

/**
* Set exclude pattern for in a rolling fashion.
*
* @param rolledLogsExcludePattern
*/
@Public
@Unstable
public abstract void setRolledLogsExcludePattern(
String rolledLogsExcludePattern);
} }
Expand Up @@ -314,6 +314,8 @@ message ApplicationSubmissionContextProto {
message LogAggregationContextProto { message LogAggregationContextProto {
optional string include_pattern = 1 [default = ".*"]; optional string include_pattern = 1 [default = ".*"];
optional string exclude_pattern = 2 [default = ""]; optional string exclude_pattern = 2 [default = ""];
optional string rolled_logs_include_pattern = 3 [default = ""];
optional string rolled_logs_exclude_pattern = 4 [default = ".*"];
} }


enum ApplicationAccessTypeProto { enum ApplicationAccessTypeProto {
Expand Down
Expand Up @@ -20,6 +20,7 @@
import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProto; import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProtoOrBuilder;

import com.google.protobuf.TextFormat; import com.google.protobuf.TextFormat;


public class LogAggregationContextPBImpl extends LogAggregationContext{ public class LogAggregationContextPBImpl extends LogAggregationContext{
Expand Down Expand Up @@ -116,4 +117,42 @@ public void setExcludePattern(String excludePattern) {
} }
builder.setExcludePattern(excludePattern); builder.setExcludePattern(excludePattern);
} }

@Override
public String getRolledLogsIncludePattern() {
LogAggregationContextProtoOrBuilder p = viaProto ? proto : builder;
if (! p.hasRolledLogsIncludePattern()) {
return null;
}
return p.getRolledLogsIncludePattern();
}

@Override
public void setRolledLogsIncludePattern(String rolledLogsIncludePattern) {
maybeInitBuilder();
if (rolledLogsIncludePattern == null) {
builder.clearRolledLogsIncludePattern();
return;
}
builder.setRolledLogsIncludePattern(rolledLogsIncludePattern);
}

@Override
public String getRolledLogsExcludePattern() {
LogAggregationContextProtoOrBuilder p = viaProto ? proto : builder;
if (! p.hasRolledLogsExcludePattern()) {
return null;
}
return p.getRolledLogsExcludePattern();
}

@Override
public void setRolledLogsExcludePattern(String rolledLogsExcludePattern) {
maybeInitBuilder();
if (rolledLogsExcludePattern == null) {
builder.clearRolledLogsExcludePattern();
return;
}
builder.setRolledLogsExcludePattern(rolledLogsExcludePattern);
}
} }
Expand Up @@ -167,17 +167,18 @@ public static class LogValue {
private Set<File> uploadedFiles = new HashSet<File>(); private Set<File> uploadedFiles = new HashSet<File>();
private final Set<String> alreadyUploadedLogFiles; private final Set<String> alreadyUploadedLogFiles;
private Set<String> allExistingFileMeta = new HashSet<String>(); private Set<String> allExistingFileMeta = new HashSet<String>();
private final boolean appFinished;
// TODO Maybe add a version string here. Instead of changing the version of // TODO Maybe add a version string here. Instead of changing the version of
// the entire k-v format // the entire k-v format


public LogValue(List<String> rootLogDirs, ContainerId containerId, public LogValue(List<String> rootLogDirs, ContainerId containerId,
String user) { String user) {
this(rootLogDirs, containerId, user, null, new HashSet<String>()); this(rootLogDirs, containerId, user, null, new HashSet<String>(), true);
} }


public LogValue(List<String> rootLogDirs, ContainerId containerId, public LogValue(List<String> rootLogDirs, ContainerId containerId,
String user, LogAggregationContext logAggregationContext, String user, LogAggregationContext logAggregationContext,
Set<String> alreadyUploadedLogFiles) { Set<String> alreadyUploadedLogFiles, boolean appFinished) {
this.rootLogDirs = new ArrayList<String>(rootLogDirs); this.rootLogDirs = new ArrayList<String>(rootLogDirs);
this.containerId = containerId; this.containerId = containerId;
this.user = user; this.user = user;
Expand All @@ -186,6 +187,7 @@ public LogValue(List<String> rootLogDirs, ContainerId containerId,
Collections.sort(this.rootLogDirs); Collections.sort(this.rootLogDirs);
this.logAggregationContext = logAggregationContext; this.logAggregationContext = logAggregationContext;
this.alreadyUploadedLogFiles = alreadyUploadedLogFiles; this.alreadyUploadedLogFiles = alreadyUploadedLogFiles;
this.appFinished = appFinished;
} }


private Set<File> getPendingLogFilesToUploadForThisContainer() { private Set<File> getPendingLogFilesToUploadForThisContainer() {
Expand Down Expand Up @@ -296,17 +298,15 @@ private Set<File> getPendingLogFilesToUpload(File containerLogDir) {
} }


if (this.logAggregationContext != null && candidates.size() > 0) { if (this.logAggregationContext != null && candidates.size() > 0) {
if (this.logAggregationContext.getIncludePattern() != null filterFiles(
&& !this.logAggregationContext.getIncludePattern().isEmpty()) { this.appFinished ? this.logAggregationContext.getIncludePattern()
filterFiles(this.logAggregationContext.getIncludePattern(), : this.logAggregationContext.getRolledLogsIncludePattern(),
candidates, false); candidates, false);
}


if (this.logAggregationContext.getExcludePattern() != null filterFiles(
&& !this.logAggregationContext.getExcludePattern().isEmpty()) { this.appFinished ? this.logAggregationContext.getExcludePattern()
filterFiles(this.logAggregationContext.getExcludePattern(), : this.logAggregationContext.getRolledLogsExcludePattern(),
candidates, true); candidates, true);
}


Iterable<File> mask = Iterable<File> mask =
Iterables.filter(candidates, new Predicate<File>() { Iterables.filter(candidates, new Predicate<File>() {
Expand All @@ -323,14 +323,15 @@ public boolean apply(File next) {


private void filterFiles(String pattern, Set<File> candidates, private void filterFiles(String pattern, Set<File> candidates,
boolean exclusion) { boolean exclusion) {
Pattern filterPattern = if (pattern != null && !pattern.isEmpty()) {
Pattern.compile(pattern); Pattern filterPattern = Pattern.compile(pattern);
for (Iterator<File> candidatesItr = candidates.iterator(); candidatesItr for (Iterator<File> candidatesItr = candidates.iterator(); candidatesItr
.hasNext();) { .hasNext();) {
File candidate = candidatesItr.next(); File candidate = candidatesItr.next();
boolean match = filterPattern.matcher(candidate.getName()).find(); boolean match = filterPattern.matcher(candidate.getName()).find();
if ((!match && !exclusion) || (match && exclusion)) { if ((!match && !exclusion) || (match && exclusion)) {
candidatesItr.remove(); candidatesItr.remove();
}
} }
} }
} }
Expand Down
Expand Up @@ -44,7 +44,6 @@
import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
Expand Down Expand Up @@ -116,6 +115,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
private final Context context; private final Context context;
private final int retentionSize; private final int retentionSize;
private final long rollingMonitorInterval; private final long rollingMonitorInterval;
private final boolean logAggregationInRolling;
private final NodeId nodeId; private final NodeId nodeId;
// This variable is only for testing // This variable is only for testing
private final AtomicBoolean waiting = new AtomicBoolean(false); private final AtomicBoolean waiting = new AtomicBoolean(false);
Expand Down Expand Up @@ -193,9 +193,14 @@ public AppLogAggregatorImpl(Dispatcher dispatcher,
} }
this.rollingMonitorInterval = configuredRollingMonitorInterval; this.rollingMonitorInterval = configuredRollingMonitorInterval;
} }
this.logAggregationInRolling =
this.rollingMonitorInterval <= 0 || this.logAggregationContext == null
|| this.logAggregationContext.getRolledLogsIncludePattern() == null
|| this.logAggregationContext.getRolledLogsIncludePattern()
.isEmpty() ? false : true;
} }


private void uploadLogsForContainers() { private void uploadLogsForContainers(boolean appFinished) {
if (this.logAggregationDisabled) { if (this.logAggregationDisabled) {
return; return;
} }
Expand Down Expand Up @@ -262,7 +267,7 @@ private void uploadLogsForContainers() {
containerLogAggregators.put(container, aggregator); containerLogAggregators.put(container, aggregator);
} }
Set<Path> uploadedFilePathsInThisCycle = Set<Path> uploadedFilePathsInThisCycle =
aggregator.doContainerLogAggregation(writer); aggregator.doContainerLogAggregation(writer, appFinished);
if (uploadedFilePathsInThisCycle.size() > 0) { if (uploadedFilePathsInThisCycle.size() > 0) {
uploadedLogsInThisCycle = true; uploadedLogsInThisCycle = true;
} }
Expand Down Expand Up @@ -394,12 +399,12 @@ private void doAppLogAggregation() {
synchronized(this) { synchronized(this) {
try { try {
waiting.set(true); waiting.set(true);
if (this.rollingMonitorInterval > 0) { if (logAggregationInRolling) {
wait(this.rollingMonitorInterval * 1000); wait(this.rollingMonitorInterval * 1000);
if (this.appFinishing.get() || this.aborted.get()) { if (this.appFinishing.get() || this.aborted.get()) {
break; break;
} }
uploadLogsForContainers(); uploadLogsForContainers(false);
} else { } else {
wait(THREAD_SLEEP_TIME); wait(THREAD_SLEEP_TIME);
} }
Expand All @@ -415,7 +420,7 @@ private void doAppLogAggregation() {
} }


// App is finished, upload the container logs. // App is finished, upload the container logs.
uploadLogsForContainers(); uploadLogsForContainers(true);


// Remove the local app-log-dirs // Remove the local app-log-dirs
List<Path> localAppLogDirs = new ArrayList<Path>(); List<Path> localAppLogDirs = new ArrayList<Path>();
Expand Down Expand Up @@ -536,15 +541,16 @@ public ContainerLogAggregator(ContainerId containerId) {
this.containerId = containerId; this.containerId = containerId;
} }


public Set<Path> doContainerLogAggregation(LogWriter writer) { public Set<Path> doContainerLogAggregation(LogWriter writer,
boolean appFinished) {
LOG.info("Uploading logs for container " + containerId LOG.info("Uploading logs for container " + containerId
+ ". Current good log dirs are " + ". Current good log dirs are "
+ StringUtils.join(",", dirsHandler.getLogDirs())); + StringUtils.join(",", dirsHandler.getLogDirs()));
final LogKey logKey = new LogKey(containerId); final LogKey logKey = new LogKey(containerId);
final LogValue logValue = final LogValue logValue =
new LogValue(dirsHandler.getLogDirs(), containerId, new LogValue(dirsHandler.getLogDirs(), containerId,
userUgi.getShortUserName(), logAggregationContext, userUgi.getShortUserName(), logAggregationContext,
this.uploadedFileMeta); this.uploadedFileMeta, appFinished);
try { try {
writer.append(logKey, logValue); writer.append(logKey, logValue);
} catch (Exception e) { } catch (Exception e) {
Expand Down
Expand Up @@ -130,8 +130,10 @@ public void testApplicationRecovery() throws Exception {
containerTokens, acls); containerTokens, acls);
// create the logAggregationContext // create the logAggregationContext
LogAggregationContext logAggregationContext = LogAggregationContext logAggregationContext =
LogAggregationContext.newInstance("includePattern", "excludePattern"); LogAggregationContext.newInstance("includePattern", "excludePattern",
StartContainersResponse startResponse = startContainer(context, cm, cid, "includePatternInRollingAggregation",
"excludePatternInRollingAggregation");
StartContainersResponse startResponse = startContainer(context, cm, cid,
clc, logAggregationContext); clc, logAggregationContext);
assertTrue(startResponse.getFailedRequests().isEmpty()); assertTrue(startResponse.getFailedRequests().isEmpty());
assertEquals(1, context.getApplications().size()); assertEquals(1, context.getApplications().size());
Expand Down Expand Up @@ -171,6 +173,10 @@ public void testApplicationRecovery() throws Exception {
recovered.getIncludePattern()); recovered.getIncludePattern());
assertEquals(logAggregationContext.getExcludePattern(), assertEquals(logAggregationContext.getExcludePattern(),
recovered.getExcludePattern()); recovered.getExcludePattern());
assertEquals(logAggregationContext.getRolledLogsIncludePattern(),
recovered.getRolledLogsIncludePattern());
assertEquals(logAggregationContext.getRolledLogsExcludePattern(),
recovered.getRolledLogsExcludePattern());


waitForAppState(app, ApplicationState.INITING); waitForAppState(app, ApplicationState.INITING);
assertTrue(context.getApplicationACLsManager().checkAccess( assertTrue(context.getApplicationACLsManager().checkAccess(
Expand Down

0 comments on commit 863079b

Please sign in to comment.