Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
// These variables are only for testing
private final AtomicBoolean waiting = new AtomicBoolean(false);
private int logAggregationTimes = 0;
private int cleanupOldLogTimes = 0;
private long logFileSizeThreshold;
private boolean renameTemporaryLogFileFailed = false;

Expand Down Expand Up @@ -196,10 +195,16 @@ public AppLogAggregatorImpl(Dispatcher dispatcher,
this.remoteNodeTmpLogFileForApp = getRemoteNodeTmpLogFileForApp();
}
boolean logAggregationInRolling =
rollingMonitorInterval <= 0 || this.logAggregationContext == null
|| this.logAggregationContext.getRolledLogsIncludePattern() == null
|| this.logAggregationContext.getRolledLogsIncludePattern()
.isEmpty() ? false : true;
rollingMonitorInterval > 0 && this.logAggregationContext != null
&& this.logAggregationContext.getRolledLogsIncludePattern() != null
&& !this.logAggregationContext.getRolledLogsIncludePattern()
.isEmpty();
if (logAggregationInRolling) {
LOG.info("Rolling mode is turned on with include pattern {}",
this.logAggregationContext.getRolledLogsIncludePattern());
} else {
LOG.debug("Rolling mode is turned off");
}
logControllerContext = new LogAggregationFileControllerContext(
this.remoteNodeLogFileForApp,
this.remoteNodeTmpLogFileForApp,
Expand Down Expand Up @@ -299,11 +304,13 @@ private void uploadLogsForContainers(boolean appFinished)
}

if (pendingContainerInThisCycle.isEmpty()) {
LOG.debug("No pending container in this cycle");
sendLogAggregationReport(true, "", appFinished);
return;
}

logAggregationTimes++;
LOG.debug("Cycle #{} of log aggregator", logAggregationTimes);
String diagnosticMessage = "";
boolean logAggregationSucceedInThisCycle = true;
DeletionTask deletionTask = null;
Expand Down Expand Up @@ -331,6 +338,8 @@ private void uploadLogsForContainers(boolean appFinished)
appFinished, finishedContainers.contains(container));
if (uploadedFilePathsInThisCycle.size() > 0) {
uploadedLogsInThisCycle = true;
LOG.trace("Uploaded the following files for {}: {}",
container, uploadedFilePathsInThisCycle.toString());
List<Path> uploadedFilePathsInThisCycleList = new ArrayList<>();
uploadedFilePathsInThisCycleList.addAll(uploadedFilePathsInThisCycle);
if (LOG.isDebugEnabled()) {
Expand Down Expand Up @@ -386,6 +395,13 @@ private void uploadLogsForContainers(boolean appFinished)
if (logAggregationSucceedInThisCycle && deletionTask != null) {
delService.delete(deletionTask);
}
if (!diagnosticMessage.isEmpty()) {
LOG.debug("Sending log aggregation report along with the " +
"following diagnostic message:\"{}\"", diagnosticMessage);
}
if (!logAggregationSucceedInThisCycle) {
LOG.warn("Log aggregation did not succeed in this cycle");
}
sendLogAggregationReport(logAggregationSucceedInThisCycle,
diagnosticMessage, appFinished);
if (exc != null) {
Expand Down Expand Up @@ -522,14 +538,17 @@ private void doAppLogAggregationPostCleanUp() {
lfs.getFileStatus(logPath);
localAppLogDirs.add(logPath);
} catch (UnsupportedFileSystemException ue) {
LOG.warn("Log dir " + rootLogDir + "is an unsupported file system", ue);
LOG.warn("Log dir {} is in an unsupported file system", rootLogDir,
ue);
continue;
} catch (IOException fe) {
LOG.warn("An exception occurred while getting file information", fe);
continue;
}
}

if (localAppLogDirs.size() > 0) {
LOG.debug("Cleaning up {} files", localAppLogDirs.size());
List<Path> localAppLogDirsList = new ArrayList<>();
localAppLogDirsList.addAll(localAppLogDirs);
DeletionTask deletionTask = new FileDeletionTask(delService,
Expand Down Expand Up @@ -672,17 +691,6 @@ public UserGroupInformation updateCredentials(Credentials cred) {
return userUgi;
}

@Private
@VisibleForTesting
public int getLogAggregationTimes() {
return this.logAggregationTimes;
}

@VisibleForTesting
int getCleanupOldLogTimes() {
return this.cleanupOldLogTimes;
}

@VisibleForTesting
public LogAggregationFileController getLogAggregationFileController() {
return this.logAggregationFileController;
Expand Down