Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ protected LogHandler createLogHandler(Configuration conf, Context context,
if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
return new LogAggregationService(this.dispatcher, context,
deletionService, dirsHandler);
deletionService, dirsHandler, metrics);
} else {
return new NonAggregatingLogHandler(this.dispatcher, deletionService,
dirsHandler,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,7 @@ public interface AppLogAggregator extends Runnable {
boolean isAggregationEnabled();

UserGroupInformation updateCredentials(Credentials cred);

boolean isLogAggregationInRolling();

}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTask;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.Times;

Expand Down Expand Up @@ -117,6 +118,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {

private final LogAggregationFileController logAggregationFileController;

private long appFinishedTime;

/**
* The value recovered from state store to determine the age of application
Expand All @@ -125,16 +127,18 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
*/
private final long recoveredLogInitedTime;

protected final NodeManagerMetrics metrics;

public AppLogAggregatorImpl(Dispatcher dispatcher,
DeletionService deletionService, Configuration conf,
ApplicationId appId, UserGroupInformation userUgi, NodeId nodeId,
LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp,
Map<ApplicationAccessType, String> appAcls,
LogAggregationContext logAggregationContext, Context context,
FileContext lfs, long rollingMonitorInterval) {
FileContext lfs, long rollingMonitorInterval, NodeManagerMetrics metrics) {
this(dispatcher, deletionService, conf, appId, userUgi, nodeId,
dirsHandler, remoteNodeLogFileForApp, appAcls,
logAggregationContext, context, lfs, rollingMonitorInterval, -1, null);
logAggregationContext, context, lfs, rollingMonitorInterval, -1, null, metrics);
}

public AppLogAggregatorImpl(Dispatcher dispatcher,
Expand All @@ -144,11 +148,11 @@ public AppLogAggregatorImpl(Dispatcher dispatcher,
Map<ApplicationAccessType, String> appAcls,
LogAggregationContext logAggregationContext, Context context,
FileContext lfs, long rollingMonitorInterval,
long recoveredLogInitedTime) {
long recoveredLogInitedTime, NodeManagerMetrics metrics) {
this(dispatcher, deletionService, conf, appId, userUgi, nodeId,
dirsHandler, remoteNodeLogFileForApp, appAcls,
logAggregationContext, context, lfs, rollingMonitorInterval,
recoveredLogInitedTime, null);
recoveredLogInitedTime, null, metrics);
}

public AppLogAggregatorImpl(Dispatcher dispatcher,
Expand All @@ -159,7 +163,8 @@ public AppLogAggregatorImpl(Dispatcher dispatcher,
LogAggregationContext logAggregationContext, Context context,
FileContext lfs, long rollingMonitorInterval,
long recoveredLogInitedTime,
LogAggregationFileController logAggregationFileController) {
LogAggregationFileController logAggregationFileController,
NodeManagerMetrics metrics) {
this.dispatcher = dispatcher;
this.conf = conf;
this.delService = deletionService;
Expand All @@ -185,6 +190,7 @@ public AppLogAggregatorImpl(Dispatcher dispatcher,
this.logFileSizeThreshold =
conf.getLong(YarnConfiguration.LOG_AGGREGATION_DEBUG_FILESIZE,
YarnConfiguration.DEFAULT_LOG_AGGREGATION_DEBUG_FILESIZE);
this.metrics = metrics;
if (logAggregationFileController == null) {
// by default, use T-File Controller
this.logAggregationFileController = new LogAggregationTFileController();
Expand Down Expand Up @@ -220,6 +226,11 @@ public AppLogAggregatorImpl(Dispatcher dispatcher,
this.appId, this.appAcls, this.nodeId, this.userUgi);
}

@Override
public boolean isLogAggregationInRolling(){
return this.logControllerContext.isLogAggregationInRolling();
}

private ContainerLogAggregationPolicy getLogAggPolicy(Configuration conf) {
ContainerLogAggregationPolicy policy = getLogAggPolicyInstance(conf);
String params = getLogAggPolicyParameters(conf);
Expand Down Expand Up @@ -449,6 +460,8 @@ private void sendLogAggregationReport(
? LogAggregationStatus.FAILED
: LogAggregationStatus.SUCCEEDED;
sendLogAggregationReportInternal(finalLogAggregationStatus, "", true);

this.metrics.reportLogAggregationStatus(finalLogAggregationStatus == LogAggregationStatus.SUCCEEDED);
}
}

Expand Down Expand Up @@ -476,6 +489,7 @@ public void run() {
// loss of logs
LOG.error("Error occurred while aggregating the log for the application "
+ appId, e);

} catch (Exception e) {
// do post clean up of log directories on any other exception
LOG.error("Error occurred while aggregating the log for the application "
Expand All @@ -489,6 +503,7 @@ public void run() {
ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED));
}
this.appAggregationFinished.set(true);
LOG.info("Releasing thread for " + applicationId);
}
}

Expand Down Expand Up @@ -523,6 +538,9 @@ private void doAppLogAggregation() throws LogAggregationDFSException {
try {
// App is finished, upload the container logs.
uploadLogsForContainers(true);
LOG.info("Starting log upload for " +this.applicationId);
// App is finished, upload the container logs.
uploadLogsForContainers(true);

doAppLogAggregationPostCleanUp();
} catch (LogAggregationDFSException e) {
Expand Down Expand Up @@ -589,6 +607,7 @@ public void startContainerLogAggregation(ContainerLogContext logContext) {
public synchronized void finishLogAggregation() {
LOG.info("Application just finished : " + this.applicationId);
this.appFinishing.set(true);
this.appFinishedTime = System.currentTimeMillis();
this.notifyAll();
}

Expand Down Expand Up @@ -656,6 +675,7 @@ public Set<Path> doContainerLogAggregation(
LOG.info("Uploading logs for container " + containerId
+ ". Current good log dirs are "
+ StringUtils.join(",", dirsHandler.getLogDirsForRead()));
long startTime = System.currentTimeMillis();
final LogKey logKey = new LogKey(containerId);
final LogValue logValue =
new LogValue(dirsHandler.getLogDirsForRead(), containerId,
Expand All @@ -664,9 +684,12 @@ public Set<Path> doContainerLogAggregation(
containerFinished);
try {
logAggregationFileController.write(logKey, logValue);
metrics.successfulContainerLogUpload();
} catch (Exception e) {
LOG.error("Couldn't upload logs for " + containerId
+ ". Skipping this container.", e);
metrics.failedContainerLogUpload();

return new HashSet<Path>();
}
this.uploadedFileMeta.addAll(logValue
Expand All @@ -676,6 +699,12 @@ public Set<Path> doContainerLogAggregation(
this.uploadedFileMeta = uploadedFileMeta.stream().filter(
next -> logValue.getAllExistingFilesMeta().contains(next)).collect(
Collectors.toSet());

long logUploadTime = System.currentTimeMillis() - startTime;
LOG.info("Container " + containerId
+ " log aggregation is completed in " + logUploadTime + "ms");
metrics.addLogUploadDuration(logUploadTime);

// need to return files uploaded or older-than-retention clean up.
return Sets.union(logValue.getCurrentUpLoadedFilesPath(),
logValue.getObsoleteRetentionLogFiles());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;


import org.apache.hadoop.classification.VisibleForTesting;
Expand All @@ -77,6 +78,7 @@ public class LogAggregationService extends AbstractService implements
private static final String NM_LOG_AGGREGATION_DEBUG_ENABLED
= YarnConfiguration.NM_PREFIX + "log-aggregation.debug-enabled";
private long rollingMonitorInterval;
private final NodeManagerMetrics nmMetrics;

private final Context context;
private final DeletionService deletionService;
Expand All @@ -86,22 +88,27 @@ public class LogAggregationService extends AbstractService implements
private NodeId nodeId;

private final ConcurrentMap<ApplicationId, AppLogAggregator> appLogAggregators;
private final ConcurrentMap<ApplicationId, Runnable> appLogAggregatorWrappers;
private final ConcurrentMap<ApplicationId, Long> timeStartedWaitingForThread;

// Holds applications whose aggregation is disable due to invalid Token
private final Set<ApplicationId> invalidTokenApps;

@VisibleForTesting
ExecutorService threadPool;

public LogAggregationService(Dispatcher dispatcher, Context context,
DeletionService deletionService, LocalDirsHandlerService dirsHandler) {
DeletionService deletionService, LocalDirsHandlerService dirsHandler, NodeManagerMetrics nmMetrics) {
super(LogAggregationService.class.getName());
this.dispatcher = dispatcher;
this.context = context;
this.deletionService = deletionService;
this.dirsHandler = dirsHandler;
this.nmMetrics = nmMetrics;
this.appLogAggregators =
new ConcurrentHashMap<ApplicationId, AppLogAggregator>();
this.appLogAggregatorWrappers = new ConcurrentHashMap<ApplicationId, Runnable>();
this.timeStartedWaitingForThread = new ConcurrentHashMap<ApplicationId, Long>();
this.invalidTokenApps = ConcurrentHashMap.newKeySet();
}

Expand Down Expand Up @@ -170,14 +177,14 @@ protected void serviceStart() throws Exception {
this.nodeId = this.context.getNodeId();
super.serviceStart();
}

@Override
protected void serviceStop() throws Exception {
LOG.info(this.getName() + " waiting for pending aggregation during exit");
stopAggregators();
super.serviceStop();
}

private void stopAggregators() {
threadPool.shutdown();
boolean supervised = getConfig().getBoolean(
Expand Down Expand Up @@ -213,25 +220,28 @@ private void stopAggregators() {
}
}

@SuppressWarnings("unchecked")
private void initApp(final ApplicationId appId, String user,
Credentials credentials, Map<ApplicationAccessType, String> appAcls,
LogAggregationContext logAggregationContext,
long recoveredLogInitedTime) {
ApplicationEvent eventResponse;

try {
initAppAggregator(appId, user, credentials, appAcls,
logAggregationContext, recoveredLogInitedTime);
eventResponse = new ApplicationEvent(appId,
ApplicationEventType.APPLICATION_LOG_HANDLING_INITED);
if(appLogAggregators.get(appId) != null && appLogAggregators.get(appId).isLogAggregationInRolling()) {
scheduleAggregator(appId);
}
} catch (YarnRuntimeException e) {
LOG.warn("Application failed to init aggregation", e);
eventResponse = new ApplicationEvent(appId,
ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED);
}
this.dispatcher.getEventHandler().handle(eventResponse);
}

FileContext getLocalFileContext(Configuration conf) {
try {
return FileContext.getLocalFSFileContext(conf);
Expand Down Expand Up @@ -262,7 +272,7 @@ protected void initAppAggregator(final ApplicationId appId, String user,
logAggregationFileController.getRemoteNodeLogFileForApp(appId,
user, nodeId), appAcls, logAggregationContext, this.context,
getLocalFileContext(getConfig()), this.rollingMonitorInterval,
recoveredLogInitedTime, logAggregationFileController);
recoveredLogInitedTime, logAggregationFileController, nmMetrics);
if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) {
throw new YarnRuntimeException("Duplicate initApp for " + appId);
}
Expand Down Expand Up @@ -291,15 +301,31 @@ protected void initAppAggregator(final ApplicationId appId, String user,
// Schedule the aggregator.
Runnable aggregatorWrapper = new Runnable() {
public void run() {
nmMetrics.appStartedLogAggr();
long threadStartTime = System.currentTimeMillis();
long threadWaitTime = threadStartTime - timeStartedWaitingForThread.get(appId);
nmMetrics.addLogAggregationThreadWaitTime(threadWaitTime);

try {
appLogAggregator.run();
} finally {
appLogAggregators.remove(appId);
closeFileSystems(userUgi);
appLogAggregatorWrappers.remove(appId);
timeStartedWaitingForThread.remove(appId);

// set metrics
nmMetrics.appLogAggregationFinished();
long threadEndTime = System.currentTimeMillis();
long threadHoldTime = threadEndTime- threadStartTime;
nmMetrics.addLogAggregationThreadHoldTime(threadHoldTime);
}
}
};
this.threadPool.execute(aggregatorWrapper);

if (this.appLogAggregatorWrappers.putIfAbsent(appId, aggregatorWrapper) != null) {
throw new YarnRuntimeException("Duplicate runnable for " + appId);
}

if (appDirException != null) {
throw appDirException;
Expand Down Expand Up @@ -351,6 +377,9 @@ private void stopApp(ApplicationId appId) {
return;
}
aggregator.finishLogAggregation();
if(!aggregator.isLogAggregationInRolling()) {
scheduleAggregator(appId);
}
} finally {
// Remove invalid Token Apps
invalidTokenApps.remove(appId);
Expand All @@ -363,6 +392,7 @@ public void handle(LogHandlerEvent event) {
case APPLICATION_STARTED:
LogHandlerAppStartedEvent appStartEvent =
(LogHandlerAppStartedEvent) event;
LOG.info("LogAggregationService received APPLICATION_STARTED event for " + appStartEvent.getApplicationId() );
initApp(appStartEvent.getApplicationId(), appStartEvent.getUser(),
appStartEvent.getCredentials(),
appStartEvent.getApplicationAcls(),
Expand All @@ -379,6 +409,7 @@ public void handle(LogHandlerEvent event) {
case APPLICATION_FINISHED:
LogHandlerAppFinishedEvent appFinishedEvent =
(LogHandlerAppFinishedEvent) event;
LOG.info("LogAggregationService received APPLICATION_FINISHED event for " + appFinishedEvent.getApplicationId() );
stopApp(appFinishedEvent.getApplicationId());
break;
case LOG_AGG_TOKEN_UPDATE:
Expand All @@ -387,7 +418,19 @@ public void handle(LogHandlerEvent event) {
default:
; // Ignore
}
}

public void scheduleAggregator(ApplicationId appId){
this.nmMetrics.appReadyForLogAggregation();
this.timeStartedWaitingForThread.put(appId, System.currentTimeMillis());
Runnable runnable = appLogAggregatorWrappers.get(appId);
try {
LOG.info("Scheduling aggregator for " + appId);
this.threadPool.execute(runnable);
} catch (RuntimeException e) {
LOG.warn("RuntimeException while executing runnable " + runnable + " with " +
"executor " + threadPool, e);
}
}

private void checkAndEnableAppAggregators() {
Expand Down
Loading