diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index cc5f0d914b880..bbbf00501b618 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -538,7 +538,7 @@ protected LogHandler createLogHandler(Configuration conf, Context context, if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) { return new LogAggregationService(this.dispatcher, context, - deletionService, dirsHandler); + deletionService, dirsHandler, metrics); } else { return new NonAggregatingLogHandler(this.dispatcher, deletionService, dirsHandler, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java index 93436fa96da75..f17bf93514556 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java @@ -37,4 +37,7 @@ public interface AppLogAggregator extends Runnable { boolean isAggregationEnabled(); UserGroupInformation updateCredentials(Credentials cred); + + boolean isLogAggregationInRolling(); + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index 1ba7353a1eef3..e83e787566119 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTask; import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask; +import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Times; @@ -117,6 +118,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator { private final LogAggregationFileController logAggregationFileController; + private long appFinishedTime; /** * The value recovered from state store to determine the age of application @@ 
-125,16 +127,18 @@ public class AppLogAggregatorImpl implements AppLogAggregator { */ private final long recoveredLogInitedTime; + protected final NodeManagerMetrics metrics; + public AppLogAggregatorImpl(Dispatcher dispatcher, DeletionService deletionService, Configuration conf, ApplicationId appId, UserGroupInformation userUgi, NodeId nodeId, LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp, Map appAcls, LogAggregationContext logAggregationContext, Context context, - FileContext lfs, long rollingMonitorInterval) { + FileContext lfs, long rollingMonitorInterval, NodeManagerMetrics metrics) { this(dispatcher, deletionService, conf, appId, userUgi, nodeId, dirsHandler, remoteNodeLogFileForApp, appAcls, - logAggregationContext, context, lfs, rollingMonitorInterval, -1, null); + logAggregationContext, context, lfs, rollingMonitorInterval, -1, null, metrics); } public AppLogAggregatorImpl(Dispatcher dispatcher, @@ -144,11 +148,11 @@ public AppLogAggregatorImpl(Dispatcher dispatcher, Map appAcls, LogAggregationContext logAggregationContext, Context context, FileContext lfs, long rollingMonitorInterval, - long recoveredLogInitedTime) { + long recoveredLogInitedTime, NodeManagerMetrics metrics) { this(dispatcher, deletionService, conf, appId, userUgi, nodeId, dirsHandler, remoteNodeLogFileForApp, appAcls, logAggregationContext, context, lfs, rollingMonitorInterval, - recoveredLogInitedTime, null); + recoveredLogInitedTime, null, metrics); } public AppLogAggregatorImpl(Dispatcher dispatcher, @@ -159,7 +163,8 @@ public AppLogAggregatorImpl(Dispatcher dispatcher, LogAggregationContext logAggregationContext, Context context, FileContext lfs, long rollingMonitorInterval, long recoveredLogInitedTime, - LogAggregationFileController logAggregationFileController) { + LogAggregationFileController logAggregationFileController, + NodeManagerMetrics metrics) { this.dispatcher = dispatcher; this.conf = conf; this.delService = deletionService; @@ -185,6 +190,7 @@ public 
AppLogAggregatorImpl(Dispatcher dispatcher, this.logFileSizeThreshold = conf.getLong(YarnConfiguration.LOG_AGGREGATION_DEBUG_FILESIZE, YarnConfiguration.DEFAULT_LOG_AGGREGATION_DEBUG_FILESIZE); + this.metrics = metrics; if (logAggregationFileController == null) { // by default, use T-File Controller this.logAggregationFileController = new LogAggregationTFileController(); @@ -220,6 +226,11 @@ public AppLogAggregatorImpl(Dispatcher dispatcher, this.appId, this.appAcls, this.nodeId, this.userUgi); } + @Override + public boolean isLogAggregationInRolling(){ + return this.logControllerContext.isLogAggregationInRolling(); + } + private ContainerLogAggregationPolicy getLogAggPolicy(Configuration conf) { ContainerLogAggregationPolicy policy = getLogAggPolicyInstance(conf); String params = getLogAggPolicyParameters(conf); @@ -449,6 +460,8 @@ private void sendLogAggregationReport( ? LogAggregationStatus.FAILED : LogAggregationStatus.SUCCEEDED; sendLogAggregationReportInternal(finalLogAggregationStatus, "", true); + + this.metrics.reportLogAggregationStatus(finalLogAggregationStatus == LogAggregationStatus.SUCCEEDED); } } @@ -476,6 +489,7 @@ public void run() { // loss of logs LOG.error("Error occurred while aggregating the log for the application " + appId, e); + } catch (Exception e) { // do post clean up of log directories on any other exception LOG.error("Error occurred while aggregating the log for the application " @@ -489,6 +503,7 @@ public void run() { ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED)); } this.appAggregationFinished.set(true); + LOG.info("Releasing thread for " + applicationId); } } @@ -523,6 +538,7 @@ private void doAppLogAggregation() throws LogAggregationDFSException { try { + LOG.info("Starting log upload for " + this.applicationId); // App is finished, upload the container logs. uploadLogsForContainers(true);
doAppLogAggregationPostCleanUp(); } catch (LogAggregationDFSException e) { @@ -589,6 +605,7 @@ public void startContainerLogAggregation(ContainerLogContext logContext) { public synchronized void finishLogAggregation() { LOG.info("Application just finished : " + this.applicationId); this.appFinishing.set(true); + this.appFinishedTime = System.currentTimeMillis(); this.notifyAll(); } @@ -656,6 +673,7 @@ public Set doContainerLogAggregation( LOG.info("Uploading logs for container " + containerId + ". Current good log dirs are " + StringUtils.join(",", dirsHandler.getLogDirsForRead())); + long startTime = System.currentTimeMillis(); final LogKey logKey = new LogKey(containerId); final LogValue logValue = new LogValue(dirsHandler.getLogDirsForRead(), containerId, @@ -664,9 +682,12 @@ public Set doContainerLogAggregation( containerFinished); try { logAggregationFileController.write(logKey, logValue); + metrics.successfulContainerLogUpload(); } catch (Exception e) { LOG.error("Couldn't upload logs for " + containerId + ". Skipping this container.", e); + metrics.failedContainerLogUpload(); + return new HashSet(); } this.uploadedFileMeta.addAll(logValue @@ -676,6 +697,12 @@ public Set doContainerLogAggregation( this.uploadedFileMeta = uploadedFileMeta.stream().filter( next -> logValue.getAllExistingFilesMeta().contains(next)).collect( Collectors.toSet()); + + long logUploadTime = System.currentTimeMillis() - startTime; + LOG.info("Container " + containerId + + " log aggregation is completed in " + logUploadTime + "ms"); + metrics.addLogUploadDuration(logUploadTime); + // need to return files uploaded or older-than-retention clean up. 
return Sets.union(logValue.getCurrentUpLoadedFilesPath(), logValue.getObsoleteRetentionLogFiles()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java index 0d3ea75f38fc5..a22c6c857ccd3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java @@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent; +import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.classification.VisibleForTesting; @@ -77,6 +78,7 @@ public class LogAggregationService extends AbstractService implements private static final String NM_LOG_AGGREGATION_DEBUG_ENABLED = YarnConfiguration.NM_PREFIX + "log-aggregation.debug-enabled"; private long rollingMonitorInterval; + private final NodeManagerMetrics nmMetrics; private final Context context; private final DeletionService deletionService; @@ -86,22 +88,27 @@ public class LogAggregationService extends AbstractService implements private NodeId nodeId; private final ConcurrentMap appLogAggregators; + private final 
ConcurrentMap appLogAggregatorWrappers; + private final ConcurrentMap timeStartedWaitingForThread; // Holds applications whose aggregation is disable due to invalid Token private final Set invalidTokenApps; @VisibleForTesting ExecutorService threadPool; - + public LogAggregationService(Dispatcher dispatcher, Context context, - DeletionService deletionService, LocalDirsHandlerService dirsHandler) { + DeletionService deletionService, LocalDirsHandlerService dirsHandler, NodeManagerMetrics nmMetrics) { super(LogAggregationService.class.getName()); this.dispatcher = dispatcher; this.context = context; this.deletionService = deletionService; this.dirsHandler = dirsHandler; + this.nmMetrics = nmMetrics; this.appLogAggregators = new ConcurrentHashMap(); + this.appLogAggregatorWrappers = new ConcurrentHashMap(); + this.timeStartedWaitingForThread = new ConcurrentHashMap(); this.invalidTokenApps = ConcurrentHashMap.newKeySet(); } @@ -170,14 +177,14 @@ protected void serviceStart() throws Exception { this.nodeId = this.context.getNodeId(); super.serviceStart(); } - + @Override protected void serviceStop() throws Exception { LOG.info(this.getName() + " waiting for pending aggregation during exit"); stopAggregators(); super.serviceStop(); } - + private void stopAggregators() { threadPool.shutdown(); boolean supervised = getConfig().getBoolean( @@ -213,17 +220,20 @@ private void stopAggregators() { } } - @SuppressWarnings("unchecked") private void initApp(final ApplicationId appId, String user, Credentials credentials, Map appAcls, LogAggregationContext logAggregationContext, long recoveredLogInitedTime) { ApplicationEvent eventResponse; + try { initAppAggregator(appId, user, credentials, appAcls, logAggregationContext, recoveredLogInitedTime); eventResponse = new ApplicationEvent(appId, ApplicationEventType.APPLICATION_LOG_HANDLING_INITED); + if(appLogAggregators.get(appId) != null && appLogAggregators.get(appId).isLogAggregationInRolling()) { + scheduleAggregator(appId); + } 
} catch (YarnRuntimeException e) { LOG.warn("Application failed to init aggregation", e); eventResponse = new ApplicationEvent(appId, @@ -231,7 +241,7 @@ private void initApp(final ApplicationId appId, String user, } this.dispatcher.getEventHandler().handle(eventResponse); } - + FileContext getLocalFileContext(Configuration conf) { try { return FileContext.getLocalFSFileContext(conf); @@ -262,7 +272,7 @@ protected void initAppAggregator(final ApplicationId appId, String user, logAggregationFileController.getRemoteNodeLogFileForApp(appId, user, nodeId), appAcls, logAggregationContext, this.context, getLocalFileContext(getConfig()), this.rollingMonitorInterval, - recoveredLogInitedTime, logAggregationFileController); + recoveredLogInitedTime, logAggregationFileController, nmMetrics); if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) { throw new YarnRuntimeException("Duplicate initApp for " + appId); } @@ -291,15 +301,31 @@ protected void initAppAggregator(final ApplicationId appId, String user, // Schedule the aggregator. 
Runnable aggregatorWrapper = new Runnable() { public void run() { + nmMetrics.appStartedLogAggr(); + long threadStartTime = System.currentTimeMillis(); + long threadWaitTime = threadStartTime - timeStartedWaitingForThread.get(appId); + nmMetrics.addLogAggregationThreadWaitTime(threadWaitTime); + try { appLogAggregator.run(); } finally { appLogAggregators.remove(appId); closeFileSystems(userUgi); + appLogAggregatorWrappers.remove(appId); + timeStartedWaitingForThread.remove(appId); + + // set metrics + nmMetrics.appLogAggregationFinished(); + long threadEndTime = System.currentTimeMillis(); + long threadHoldTime = threadEndTime- threadStartTime; + nmMetrics.addLogAggregationThreadHoldTime(threadHoldTime); } } }; - this.threadPool.execute(aggregatorWrapper); + + if (this.appLogAggregatorWrappers.putIfAbsent(appId, aggregatorWrapper) != null) { + throw new YarnRuntimeException("Duplicate runnable for " + appId); + } if (appDirException != null) { throw appDirException; @@ -351,6 +377,9 @@ private void stopApp(ApplicationId appId) { return; } aggregator.finishLogAggregation(); + if(!aggregator.isLogAggregationInRolling()) { + scheduleAggregator(appId); + } } finally { // Remove invalid Token Apps invalidTokenApps.remove(appId); @@ -363,6 +392,7 @@ public void handle(LogHandlerEvent event) { case APPLICATION_STARTED: LogHandlerAppStartedEvent appStartEvent = (LogHandlerAppStartedEvent) event; + LOG.info("LogAggregationService received APPLICATION_STARTED event for " + appStartEvent.getApplicationId() ); initApp(appStartEvent.getApplicationId(), appStartEvent.getUser(), appStartEvent.getCredentials(), appStartEvent.getApplicationAcls(), @@ -379,6 +409,7 @@ public void handle(LogHandlerEvent event) { case APPLICATION_FINISHED: LogHandlerAppFinishedEvent appFinishedEvent = (LogHandlerAppFinishedEvent) event; + LOG.info("LogAggregationService received APPLICATION_FINISHED event for " + appFinishedEvent.getApplicationId() ); stopApp(appFinishedEvent.getApplicationId()); 
break; case LOG_AGG_TOKEN_UPDATE: @@ -387,7 +418,31 @@ public void handle(LogHandlerEvent event) { default: ; // Ignore } + } + /** + * Submits the aggregator wrapper registered for the application to the thread pool. + * Does nothing beyond logging when no wrapper is registered for appId. + * @param appId application whose aggregator wrapper should be scheduled + */ + public void scheduleAggregator(ApplicationId appId){ + Runnable runnable = appLogAggregatorWrappers.get(appId); + if (runnable == null) { + LOG.warn("No aggregator wrapper registered for " + appId + ", nothing to schedule"); + return; + } + this.nmMetrics.appReadyForLogAggregation(); + this.timeStartedWaitingForThread.put(appId, System.currentTimeMillis()); + try { + LOG.info("Scheduling aggregator for " + appId); + this.threadPool.execute(runnable); + } catch (RuntimeException e) { + // The wrapper never ran, so drop the wait timestamp it would have removed. + // NOTE(review): the waiting-apps gauge stays incremented here; no decrement-only metrics method is visible - confirm. + this.timeStartedWaitingForThread.remove(appId); + LOG.warn("RuntimeException while executing runnable " + runnable + " with " + + "executor " + threadPool, e); + } } private void checkAndEnableAppAggregators() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/metrics/NodeManagerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/metrics/NodeManagerMetrics.java index 775196f582887..455257a9f0c10 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/metrics/NodeManagerMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/metrics/NodeManagerMetrics.java @@ -17,21 +17,26 @@ */ package org.apache.hadoop.yarn.server.nodemanager.metrics; +import org.apache.hadoop.metrics2.MetricsInfo; import org.apache.hadoop.metrics2.MetricsSystem; import org.apache.hadoop.metrics2.annotation.Metric; import org.apache.hadoop.metrics2.annotation.Metrics; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; import org.apache.hadoop.metrics2.lib.MutableCounterInt; import org.apache.hadoop.metrics2.lib.MutableCounterLong; import org.apache.hadoop.metrics2.lib.MutableGaugeInt; import 
org.apache.hadoop.metrics2.lib.MutableGaugeLong; import org.apache.hadoop.metrics2.lib.MutableGaugeFloat; +import org.apache.hadoop.metrics2.lib.MutableQuantiles; import org.apache.hadoop.metrics2.lib.MutableRate; import org.apache.hadoop.metrics2.source.JvmMetrics; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.classification.VisibleForTesting; +import static org.apache.hadoop.metrics2.lib.Interns.info; + @Metrics(about="Metrics for node manager", context="yarn") public class NodeManagerMetrics { // CHECKSTYLE:OFF:VisibilityModifier @@ -120,14 +125,54 @@ public class NodeManagerMetrics { // CHECKSTYLE:ON:VisibilityModifier + // Log aggregation metrics + @Metric("# of log aggregation threads currently in use") + MutableGaugeInt logAggrThreadsUsed; + @Metric("# of applications currently waiting for aggregation threads") + MutableGaugeInt appsWaitingForLogAggregation; + @Metric("Number of applications with successful log aggregation") + MutableCounterLong numSuccessfulAppLogAggregations; + @Metric("Number of applications with failed log aggregation") + MutableCounterLong numFailedAppLogAggregations; + @Metric("Number of containers whose log were uploaded successfully") + MutableCounterLong numSuccessfulContainerLogUploads; + @Metric("Number of containers whose log were uploaded unsuccessfully") + MutableCounterLong numFailedContainerLogUploads; + @Metric("Time spent waiting for a log aggregation thread after application finished (ms)") + MutableRate logAggregationThreadWaitTimeMs; + @Metric("Time spent doing log aggregation i.e. 
time holding log aggr thread (ms)") + MutableRate logAggregationThreadHoldTimeMs; + @Metric("Time taken uploading logs to HDFS during log aggregation (ms)") + MutableRate HDFSUploadDurationMs; + + //Provide quantile counters for all latencies + private MutableQuantiles waitTimeLatency; + private MutableQuantiles logAggrLatency; + private MutableQuantiles HDFSUploadLatency; + private JvmMetrics jvmMetrics = null; private long allocatedMB; private long availableMB; private long allocatedOpportunisticMB; + private static MetricsRegistry registry; + + private static final String METRIC_NAME = "LogAggregationMetrics"; + private static final MetricsInfo RECORD_INFO = + info(METRIC_NAME, "Log Aggregation"); + private NodeManagerMetrics(JvmMetrics jvmMetrics) { this.jvmMetrics = jvmMetrics; + registry = new MetricsRegistry(RECORD_INFO); + registry.tag(RECORD_INFO, "LogAggregation"); + waitTimeLatency = registry.newQuantiles("waitTimeLatency", + "latency of waiting for log aggr thread", "ops", "latency", 10); + logAggrLatency = registry.newQuantiles("logAggrLatency", + "latency of log aggregation", "ops", "latency", 10); + HDFSUploadLatency = + registry.newQuantiles("HDFSUploadLatency", + "latency of log upload to HDFS", "ops", "latency", 10); } public static NodeManagerMetrics create() { @@ -301,6 +346,50 @@ public void setPrivateBytesDeleted(long privateBytesDeleted) { this.privateBytesDeleted.set(privateBytesDeleted); } + public void appReadyForLogAggregation(){ + appsWaitingForLogAggregation.incr(); + } + + public void addLogAggregationThreadWaitTime(long waitTime) { + this.logAggregationThreadWaitTimeMs.add(waitTime); + this.waitTimeLatency.add(waitTime); + } + + public void appStartedLogAggr(){ + appsWaitingForLogAggregation.decr(); + logAggrThreadsUsed.incr(); + } + + public void successfulContainerLogUpload() { + this.numSuccessfulContainerLogUploads.incr(); + } + + public void failedContainerLogUpload() { + this.numFailedContainerLogUploads.incr(); + } + + public 
void addLogUploadDuration(long duration) { + this.HDFSUploadDurationMs.add(duration); + this.HDFSUploadLatency.add(duration); + } + + public void reportLogAggregationStatus(boolean succeeded){ + if(succeeded) { + this.numSuccessfulAppLogAggregations.incr(); + } else { + this.numFailedAppLogAggregations.incr(); + } + } + + public void appLogAggregationFinished(){ + logAggrThreadsUsed.decr(); + } + + public void addLogAggregationThreadHoldTime(long holdTime) { + this.logAggregationThreadHoldTimeMs.add(holdTime); + this.logAggrLatency.add(holdTime); + } + public int getRunningContainers() { return containersRunning.value(); } @@ -481,4 +570,49 @@ public void localizationCacheHitMiss(long size) { public void localizationComplete(long downloadMillis) { localizationDurationMillis.add(downloadMillis); } + + public long getLogAggrThreadsUsed(){ + return this.logAggrThreadsUsed.value(); + } + + public long getAppsWaitingForLogAggregation(){ + return this.appsWaitingForLogAggregation.value(); + } + + public long getNumSuccessfulAppLogAggregations(){ + return this.numSuccessfulAppLogAggregations.value(); + } + + public long getNumFailedAppLogAggregations(){ + return this.numFailedAppLogAggregations.value(); + } + + public long getNumSuccessfulContainerLogUploads(){ + return this.numSuccessfulContainerLogUploads.value(); + } + + public long getNumFailedContainerLogUploads(){ + return this.numFailedContainerLogUploads.value(); + } + + public long getNumLogAggregationThreadWaitApps() { + return this.logAggregationThreadWaitTimeMs.lastStat().numSamples(); + } + + public long getNumLogAggregatingApps() { + return this.logAggregationThreadHoldTimeMs.lastStat().numSamples(); + } + + public double getLogAggregationThreadWaitTimeMs() { + return this.logAggregationThreadWaitTimeMs.lastStat().mean(); + } + + public double getLogAggregationThreadHoldTimeMs() { + return this.logAggregationThreadHoldTimeMs.lastStat().mean(); + } + + @VisibleForTesting + public double 
getHDFSTotalUploadDurationMs() { + return this.HDFSUploadDurationMs.lastStat().total(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java index ab9d0f1c09757..85562a35c37fc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java @@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask; import org.apache.hadoop.yarn.server.nodemanager.logaggregation.tracker.NMLogAggregationStatusTracker; +import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; @@ -298,10 +299,15 @@ private static AppLogAggregatorInTest createAppLogAggregator( final FileContext fakeLfs = mock(FileContext.class); final Path remoteLogDirForApp = new Path(REMOTE_LOG_FILE.getAbsolutePath()); + LogAggregationTFileController format = spy( + new LogAggregationTFileController()); + NodeManagerMetrics metrics = NodeManagerMetrics.create(); + + format.initialize(config, "TFile"); return new 
AppLogAggregatorInTest(dispatcher, deletionService, config, applicationId, ugi, nodeId, dirsService, remoteLogDirForApp, appAcls, logAggregationContext, - context, fakeLfs, recoveredLogInitedTimeMillis, tFileController); + context, fakeLfs, recoveredLogInitedTimeMillis, tFileController, metrics); } /** @@ -421,11 +427,11 @@ public AppLogAggregatorInTest(Dispatcher dispatcher, Map appAcls, LogAggregationContext logAggregationContext, Context context, FileContext lfs, long recoveredLogInitedTime, - LogAggregationTFileController format) throws IOException { + LogAggregationTFileController format, NodeManagerMetrics metrics) throws IOException { super(dispatcher, deletionService, conf, appId, ugi, nodeId, dirsHandler, remoteNodeLogFileForApp, appAcls, logAggregationContext, context, lfs, -1, recoveredLogInitedTime, - format); + format, metrics); this.applicationId = appId; this.deletionService = deletionService; this.logValue = ArgumentCaptor.forClass(LogValue.class); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index 4ec8f462f5117..e495554d28dc2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -140,6 +140,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent; import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerTokenUpdatedEvent; import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext; +import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; @@ -174,7 +175,7 @@ public TestLogAggregationService() throws UnsupportedFileSystemException { DrainDispatcher dispatcher; EventHandler appEventHandler; - + NodeManagerMetrics metrics; private NodeId nodeId = NodeId.newInstance("0.0.0.0", 5555); @Override @@ -186,6 +187,7 @@ public void setup() throws IOException { appEventHandler = mock(EventHandler.class); dispatcher.register(ApplicationEventType.class, appEventHandler); UserGroupInformation.setConfiguration(conf); + metrics = mock(NodeManagerMetrics.class); } @Override @@ -306,7 +308,7 @@ public void testLocalFileDeletionAfterUpload() throws Exception { LogAggregationService logAggregationService = spy( new LogAggregationService(dispatcher, this.context, this.delSrvc, - super.dirsHandler)); + super.dirsHandler, this.metrics)); verifyLocalFileDeletion(logAggregationService); } @@ -320,7 +322,7 @@ public void testLocalFileRemainsAfterUploadOnCleanupDisable() throws Exception { this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, this.remoteRootLogDir.getAbsolutePath()); LogAggregationService logAggregationService = spy( - new LogAggregationService(dispatcher, this.context, this.delSrvc, super.dirsHandler)); + new LogAggregationService(dispatcher, this.context, this.delSrvc, super.dirsHandler, this.metrics)); verifyLocalFileDeletion(logAggregationService); } @@ -340,7 +342,7 @@ public void testLocalFileDeletionOnDiskFull() throws Exception { when(dirsHandler.getLogDirsForRead()).thenReturn(logDirs); LogAggregationService logAggregationService = spy( new LogAggregationService(dispatcher, this.context, 
this.delSrvc, - dirsHandler)); + dirsHandler, metrics)); verifyLocalFileDeletion(logAggregationService); } @@ -355,7 +357,7 @@ public void testNoLogsUploadedOnAppFinish() throws Exception { this.remoteRootLogDir.getAbsolutePath()); LogAggregationService logAggregationService = new LogAggregationService( - dispatcher, this.context, this.delSrvc, super.dirsHandler); + dispatcher, this.context, this.delSrvc, super.dirsHandler, this.metrics); logAggregationService.init(this.conf); logAggregationService.start(); @@ -391,7 +393,7 @@ public void testNoContainerOnNode() throws Exception { LogAggregationService logAggregationService = new LogAggregationService(dispatcher, this.context, this.delSrvc, - super.dirsHandler); + super.dirsHandler, this.metrics); logAggregationService.init(this.conf); logAggregationService.start(); @@ -442,7 +444,7 @@ public void testMultipleAppsLogAggregation() throws Exception { LogAggregationService logAggregationService = new LogAggregationService(dispatcher, this.context, this.delSrvc, - super.dirsHandler); + super.dirsHandler, this.metrics); logAggregationService.init(this.conf); logAggregationService.start(); @@ -603,7 +605,7 @@ public void testVerifyAndCreateRemoteDirsFailure() .verifyAndCreateRemoteLogDir(); LogAggregationService logAggregationService = spy( new LogAggregationService(dispatcher, this.context, this.delSrvc, - super.dirsHandler) { + super.dirsHandler, metrics) { @Override public LogAggregationFileController getLogAggregationFileController( Configuration conf) { @@ -669,7 +671,7 @@ public void testVerifyAndCreateRemoteDirNonExistence() LogAggregationService logAggregationService = spy( new LogAggregationService(dispatcher, this.context, this.delSrvc, - super.dirsHandler)); + super.dirsHandler, metrics)); logAggregationService.init(this.conf); logAggregationService.start(); boolean existsBefore = aNewFile.exists(); @@ -699,7 +701,7 @@ public void testRemoteRootLogDirIsCreatedWithCorrectGroupOwner() 
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, aNewFile.getName()); LogAggregationService logAggregationService = new LogAggregationService( - dispatcher, this.context, this.delSrvc, super.dirsHandler); + dispatcher, this.context, this.delSrvc, super.dirsHandler, this.metrics); logAggregationService.init(this.conf); logAggregationService.start(); @@ -751,7 +753,7 @@ public FileSystem getFileSystem(Configuration conf) }; spyFileFormat.initialize(conf, "TFile"); LogAggregationService aggSvc = new LogAggregationService(dispatcher, - this.context, this.delSrvc, super.dirsHandler) { + this.context, this.delSrvc, super.dirsHandler, this.metrics) { @Override public LogAggregationFileController getLogAggregationFileController( Configuration conf) { @@ -833,7 +835,7 @@ public void testLogAggregationInitAppFailsWithoutKillingNM() throws Exception { LogAggregationService logAggregationService = spy( new LogAggregationService(dispatcher, this.context, this.delSrvc, - super.dirsHandler)); + super.dirsHandler, this.metrics)); logAggregationService.init(this.conf); logAggregationService.start(); @@ -896,7 +898,7 @@ public void testLogAggregationCreateDirsFailsWithoutKillingNM() any(UserGroupInformation.class)); LogAggregationService logAggregationService = spy( new LogAggregationService(dispatcher, this.context, spyDelSrvc, - super.dirsHandler){ + super.dirsHandler, this.metrics){ @Override public LogAggregationFileController getLogAggregationFileController( Configuration conf) { @@ -1251,7 +1253,7 @@ public void testFixedSizeThreadPool() throws Exception { when(dirSvc.getLogDirs()).thenThrow(new RuntimeException()); LogAggregationService logAggregationService = - new LogAggregationService(dispatcher, this.context, delSrvc, dirSvc); + new LogAggregationService(dispatcher, this.context, delSrvc, dirSvc, this.metrics); logAggregationService.init(this.conf); logAggregationService.start(); @@ -1354,7 +1356,7 @@ public Boolean get() { when(dirSvc.getLogDirs()).thenThrow(new 
RuntimeException()); LogAggregationService logAggregationService = - new LogAggregationService(dispatcher, this.context, delSrvc, dirSvc); + new LogAggregationService(dispatcher, this.context, delSrvc, dirSvc, this.metrics); logAggregationService.init(this.conf); logAggregationService.start(); @@ -1385,7 +1387,7 @@ public void testStopAfterError() throws Exception { LogAggregationService logAggregationService = new LogAggregationService(dispatcher, this.context, delSrvc, - mockedDirSvc); + mockedDirSvc, this.metrics); logAggregationService.init(this.conf); logAggregationService.start(); @@ -1398,7 +1400,7 @@ public void testStopAfterError() throws Exception { application1, this.user, null, this.acls, contextWithAllContainers)); logAggregationService.stop(); - assertEquals(0, logAggregationService.getNumAggregators()); + assertEquals(1, logAggregationService.getNumAggregators()); logAggregationService.close(); } @@ -1411,7 +1413,7 @@ public void testLogAggregatorCleanup() throws Exception { LogAggregationService logAggregationService = new LogAggregationService(dispatcher, this.context, delSrvc, - mockedDirSvc); + mockedDirSvc, this.metrics); logAggregationService.init(this.conf); logAggregationService.start(); @@ -1541,7 +1543,7 @@ public void testFailedDirsLocalFileDeletionAfterUpload() throws Exception { LogAggregationService logAggregationService = spy(new LogAggregationService(dispatcher, this.context, mockDelService, - mockDirsHandler)); + this.dirsHandler, this.metrics)); AbstractFileSystem spylfs = spy(FileContext.getLocalFSFileContext().getDefaultFileSystem()); FileContext lfs = FileContext.getFileContext(spylfs, conf); @@ -1604,7 +1606,7 @@ public void testLogAggregationServiceWithPatterns() throws Exception { LogAggregationService logAggregationService = new LogAggregationService(dispatcher, this.context, this.delSrvc, - super.dirsHandler); + super.dirsHandler, this.metrics); logAggregationService.init(this.conf); logAggregationService.start(); @@ -1792,7 
+1794,7 @@ public void testLogAggregationServiceWithPatternsAndIntervals() LogAggregationService logAggregationService = new LogAggregationService(dispatcher, context, this.delSrvc, - super.dirsHandler); + super.dirsHandler, this.metrics); logAggregationService.init(this.conf); logAggregationService.start(); @@ -2271,7 +2273,7 @@ private LogAggregationService createLogAggregationService( new ConcurrentHashMap(); LogAggregationService logAggregationService = new LogAggregationService(dispatcher, this.context, this.delSrvc, - super.dirsHandler); + super.dirsHandler, this.metrics); logAggregationService.init(this.conf); logAggregationService.start(); LogAggregationContext logAggContext = null; @@ -2417,7 +2419,7 @@ private void testLogAggregationService(boolean retentionSizeLimitation) LogAggregationService logAggregationService = new LogAggregationService(dispatcher, context, this.delSrvc, - super.dirsHandler); + super.dirsHandler, this.metrics); logAggregationService.init(this.conf); logAggregationService.start(); @@ -2548,7 +2550,7 @@ public void testAddNewTokenSentFromRMForLogAggregation() throws Exception { @SuppressWarnings("resource") LogAggregationService logAggregationService = new LogAggregationService(dispatcher, this.context, this.delSrvc, - super.dirsHandler); + super.dirsHandler, this.metrics); logAggregationService.init(this.conf); logAggregationService.start(); logAggregationService.handle(new LogHandlerAppStartedEvent(application1, @@ -2589,11 +2591,15 @@ public Boolean get() { @Test (timeout = 20000) public void testSkipUnnecessaryNNOperationsForShortJob() throws Exception { + this.conf.setLong( + YarnConfiguration.NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS, + 3600); LogAggregationContext logAggregationContext = Records.newRecord(LogAggregationContext.class); logAggregationContext.setLogAggregationPolicyClassName( FailedOrKilledContainerLogAggregationPolicy.class.getName()); - verifySkipUnnecessaryNNOperations(logAggregationContext, 0, 2, 
0); + logAggregationContext.setRolledLogsIncludePattern("*"); + verifySkipUnnecessaryNNOperations(logAggregationContext, 0, 4, 0); } @Test (timeout = 20000) @@ -2615,7 +2621,7 @@ private void verifySkipUnnecessaryNNOperations( int expectedLogAggregationTimes, int expectedAggregationReportNum, int expectedCleanupOldLogsTimes) throws Exception { LogAggregationService logAggregationService = new LogAggregationService( - dispatcher, this.context, this.delSrvc, super.dirsHandler); + dispatcher, this.context, this.delSrvc, super.dirsHandler, this.metrics); logAggregationService.init(this.conf); logAggregationService.start(); @@ -2728,7 +2734,7 @@ public List getLogFileTypesInLastCycle() { public void testRollingMonitorIntervalDefault() { LogAggregationService logAggregationService = new LogAggregationService(dispatcher, this.context, this.delSrvc, - super.dirsHandler); + super.dirsHandler, this.metrics); logAggregationService.init(this.conf); long interval = logAggregationService.getRollingMonitorInterval(); @@ -2742,7 +2748,7 @@ public void testRollingMonitorIntervalGreaterThanSet() { .NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS, "2700"); LogAggregationService logAggregationService = new LogAggregationService(dispatcher, this.context, this.delSrvc, - super.dirsHandler); + super.dirsHandler, this.metrics); logAggregationService.init(this.conf); long interval = logAggregationService.getRollingMonitorInterval(); @@ -2756,7 +2762,7 @@ public void testRollingMonitorIntervalLessThanSet() { .NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS, "600"); LogAggregationService logAggregationService = new LogAggregationService(dispatcher, this.context, this.delSrvc, - super.dirsHandler); + super.dirsHandler, this.metrics); logAggregationService.init(this.conf); long interval = logAggregationService.getRollingMonitorInterval(); diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/metrics/TestNodeManagerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/metrics/TestNodeManagerMetrics.java index 33a3ae12f109e..c5fc3ea2b00ff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/metrics/TestNodeManagerMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/metrics/TestNodeManagerMetrics.java @@ -31,7 +31,6 @@ public class TestNodeManagerMetrics { static final int GiB = 1024; // MiB - private NodeManagerMetrics metrics; @Before @@ -138,6 +137,38 @@ public static void checkMetrics(int launched, int completed, int failed, assertGauge("AvailableGB", availableGB, rb); assertGauge("AvailableVCores", availableVCores, rb); assertGauge("NodeGpuUtilization", nodeGpuUtilization, rb); - assertGauge("ApplicationsRunning", applicationsRunning, rb); + assertGauge("ApplicationsRunning", applicationsRunning, rb); } + + @Test + public void testLogAggregationMetrics() { + metrics.appReadyForLogAggregation(); + metrics.addLogAggregationThreadWaitTime(5); + metrics.appStartedLogAggr(); + + metrics.successfulContainerLogUpload(); + metrics.addLogUploadDuration(3); + metrics.successfulContainerLogUpload(); + metrics.addLogUploadDuration(13); + metrics.failedContainerLogUpload(); + + metrics.reportLogAggregationStatus(false); + metrics.appLogAggregationFinished(); + metrics.addLogAggregationThreadHoldTime(18); + + metrics.appReadyForLogAggregation(); + metrics.appReadyForLogAggregation(); + metrics.appStartedLogAggr(); + + Assert.assertEquals(1, metrics.getLogAggrThreadsUsed()); + Assert.assertEquals(1, 
metrics.getAppsWaitingForLogAggregation()); + Assert.assertEquals(0L, metrics.getNumSuccessfulAppLogAggregations()); + Assert.assertEquals(1L, metrics.getNumFailedAppLogAggregations()); + Assert.assertEquals(2L, metrics.getNumSuccessfulContainerLogUploads()); + Assert.assertEquals(1L, metrics.getNumFailedContainerLogUploads()); + Assert.assertEquals(1, metrics.getNumLogAggregationThreadWaitApps()); + Assert.assertEquals(5, metrics.getLogAggregationThreadWaitTimeMs(), 0); + Assert.assertEquals(1, metrics.getNumLogAggregatingApps()); + Assert.assertEquals(18, metrics.getLogAggregationThreadHoldTimeMs(), 0); + Assert.assertEquals(16, metrics.getHDFSTotalUploadDurationMs(), 0); } }