From 829102e969a1e868e149b39b5ccaa03bed04c2a6 Mon Sep 17 00:00:00 2001 From: chaosju Date: Tue, 6 Jul 2021 17:52:31 +0800 Subject: [PATCH] Add Leveldb statestore metrics to NM --- .../NMLeveldbStateStoreOpDurations.java | 387 ++++++++++++++++++ .../recovery/NMLeveldbStateStoreService.java | 120 +++++- 2 files changed, 504 insertions(+), 3 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreOpDurations.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreOpDurations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreOpDurations.java new file mode 100644 index 0000000000000..944ab406b3434 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreOpDurations.java @@ -0,0 +1,387 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.recovery; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; + +import static org.apache.hadoop.metrics2.lib.Interns.info; +import org.apache.hadoop.metrics2.lib.MutableRate; + +/** + * Class to capture the performance metrics of NM LeveldbStateStore. + * + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +@Metrics(context="NMLeveldbStateStore-op-durations") +public final class NMLeveldbStateStoreOpDurations implements MetricsSource { + + @Metric("Duration for load applications state") + MutableRate loadApplicationsStateDuration; + + @Metric("Duration for store application state") + MutableRate storeApplicationStateDuration; + + @Metric("Duration to handle remove application state") + MutableRate removeApplicationStateDuration; + + @Metric("Duration for store deletion task state") + MutableRate storeDeletionTaskStateDuration; + + @Metric("Duration to handle remove deletion task state") + MutableRate removeDeletionTaskStateDuration; + + @Metric("Duration for the state of the deletion service") + MutableRate loadDeletionServiceStateDuration; + + @Metric("Duration for store the start request of container") + MutableRate storeContainerStateDuration; + + @Metric("Duration for store a queued container state") + MutableRate storeContainerQueuedStateDuration; + + @Metric("Duration for store a paused container state") + MutableRate storeContainerPausedStateDuration; + + @Metric("Duration for store a launched container state") + MutableRate storeContainerLaunchedStateDuration; + + @Metric("Duration for store a updated container state") + MutableRate storeContainerUpdateTokenStateDuration; + + @Metric("Duration for store a completed container state") + MutableRate storeContainerCompletedStateDuration; + + @Metric("Duration for store a Killed container state") + MutableRate storeContainerKilledStateDuration; + + @Metric("Duration for store the diagnostics of container") + MutableRate storeContainerDiagnosticsStateDuration; + + @Metric("Duration for store the remaining retry attempts of container") + MutableRate storeContainerRemainingRetryAttemptsDuration; + + @Metric("Duration for store the restart times of container") + MutableRate storeContainerRestartTimesDuration; + + @Metric("Duration for store the working directory of container") + MutableRate storeContainerWorkDirStateDuration; + + @Metric("Duration for store the log directory of container") + MutableRate storeContainerLogDirStateDuration; + + @Metric("Duration to handle remove container state") + MutableRate removeContainerStateDuration; + + @Metric("Duration for load localized resources state") + MutableRate loadLocalizationStateDuration; + + @Metric("Duration for store the start of localization for resource") + MutableRate storeStartResourceLocalizationStateCall; + + @Metric("Duration for store the completion of a resource localization") + MutableRate storeFinishResourceLocalizationStateDuration; + + @Metric("Duration to handle remove a resource localization") + MutableRate removeLocalizedResourceStateDuration; + + @Metric("Duration to handle remove container has been resumed") + MutableRate removeContainerPausedStateDuration; + + @Metric("Duration to handle remove container has been queued") + MutableRate removeContainerQueuedStateDuration; + + @Metric("Duration for load the state of NM tokens state") + MutableRate loadNMTokensStateDuration; + + @Metric("Duration for store the current NM token master key") + MutableRate storeNMTokenCurrentMasterKeyStateDuration; + + @Metric("Duration for store the previous NM token master key") + MutableRate storeNMTokenPreviousMasterKeyStateDuration; + + @Metric("Duration for store the master key of application") + MutableRate storeNMTokenApplicationMasterKeyStateDuration; + + @Metric("Duration to handle remove the master key of application") + MutableRate removeNMTokenApplicationMasterKeyStateDuration; + + @Metric("Duration for load the state of container tokens") + MutableRate loadContainerTokensStateDuration; + + @Metric("Duration for store the current container token master key") + MutableRate storeContainerTokenCurrentMasterKeyStateDuration; + + @Metric("Duration for store the previous container token master key") + MutableRate storeContainerTokenPreviousMasterKeyStateDuration; + + @Metric("Duration for store the expiration time for a container token") + MutableRate storeContainerTokenStateDuration; + + @Metric("Duration to handle remove records for a container token") + MutableRate removeContainerTokenStateDuration; + + @Metric("Duration for load the state of log deleters") + MutableRate loadLogDeleterStateDuration; + + @Metric("Duration for store the state of a log deleter") + MutableRate storeLogDeleterStateDuration; + + @Metric("Duration to handle remove the state of a log deleter") + MutableRate removeLogDeleterStateDuration; + + @Metric("Duration for load the state of AMRMProxy") + MutableRate loadAMRMProxyStateDuration; + + @Metric("Duration for store " + + "the current AMRMProxyTokenSecretManager master key") + MutableRate storeAMRMProxyCurrentMasterKeyStateDuration; + + @Metric("Duration for store a context " + + "entry of application attempt in AMRMProxyService") + MutableRate storeAMRMProxyAppContextEntryStateDuration; + + @Metric("Duration for store " + + "the next AMRMProxyTokenSecretManager master key") + MutableRate storeAMRMProxyNextMasterKeyStateDuration; + + @Metric("Duration to handle remove a context entry for an application") + MutableRate removeAMRMProxyAppContextEntryStateDuration; + + @Metric("Duration to handle remove a context entry " + + "for an application attempt in AMRMProxyService") + MutableRate removeAMRMProxyAppContextStateDuration; + + @Metric("Duration for store the assigned resources to a container") + MutableRate storeAssignedResourcesStateDuration; + + + protected static final MetricsInfo RECORD_INFO = + info("NMLeveldbStateStoreOpDurations", + "Durations of NMLeveldbStateStore calls"); + + private final MetricsRegistry registry; + + private static final NMLeveldbStateStoreOpDurations INSTANCE + = new NMLeveldbStateStoreOpDurations(); + + public static NMLeveldbStateStoreOpDurations getInstance() { + return INSTANCE; + } + + private NMLeveldbStateStoreOpDurations() { + registry = new MetricsRegistry(RECORD_INFO); + registry.tag(RECORD_INFO, "NMLeveldbStateStoreOpDurations"); + + MetricsSystem ms = DefaultMetricsSystem.instance(); + if (ms != null) { + ms.register(RECORD_INFO.name(), RECORD_INFO.description(), this); + } + } + + @Override + public synchronized void getMetrics(MetricsCollector collector, boolean all) { + registry.snapshot(collector.addRecord(registry.info()), all); + } + + public void addLoadApplicationsStateDuration(long value) { + loadApplicationsStateDuration.add(value); + } + + public void addStoreApplicationStateDuration(long value) { + storeApplicationStateDuration.add(value); + } + + public void addRemoveApplicationStateDuration(long value) { + removeApplicationStateDuration.add(value); + } + + public void addsStoreDeletionTaskStateDuration(long value) { + storeDeletionTaskStateDuration.add(value); + } + + public void addRemoveDeletionTaskStateDuration(long value) { + removeDeletionTaskStateDuration.add(value); + } + + public void addLoadDeletionServiceStateDuration(long value) { + loadDeletionServiceStateDuration.add(value); + } + + public void addStoreContainerStateDuration(long value) { + storeContainerStateDuration.add(value); + } + + public void addStoreContainerQueuedStateDuration(long value) { + storeContainerQueuedStateDuration.add(value); + } + + public void addStoreContainerPausedStateDuration(long value) { + storeContainerPausedStateDuration.add(value); + } + + public void addStoreContainerLaunchedStateDuration(long value) { + storeContainerLaunchedStateDuration.add(value); + } + + public void addStoreContainerUpdateTokenStateDuration(long value) { + storeContainerUpdateTokenStateDuration.add(value); + } + + public void addStoreContainerCompletedStateDuration(long value) { + storeContainerCompletedStateDuration.add(value); + } + + public void addStoreContainerKilledStateDuration(long value) { + storeContainerKilledStateDuration.add(value); + } + + public void addStoreContainerDiagnosticsStateDuration(long value) { + storeContainerDiagnosticsStateDuration.add(value); + } + + public void addStoreContainerRemainingRetryAttemptsDuration(long value) { + storeContainerRemainingRetryAttemptsDuration.add(value); + } + + public void addStoreContainerRestartTimesDuration(long value) { + storeContainerRestartTimesDuration.add(value); + } + + public void addStoreContainerWorkDirStateDuration(long value) { + storeContainerWorkDirStateDuration.add(value); + } + + public void addStoreContainerLogDirStateDuration(long value) { + storeContainerLogDirStateDuration.add(value); + } + + public void addRemoveContainerStateDuration(long value) { + removeContainerStateDuration.add(value); + } + public void addLoadLocalizationStateDuration(long value) { + loadLocalizationStateDuration.add(value); + } + + public void addStoreStartResourceLocalizationDuration(long value) { + storeStartResourceLocalizationStateCall.add(value); + } + + public void addStoreFinishResourceLocalizationDuration(long value) { + storeFinishResourceLocalizationStateDuration.add(value); + } + public void addRemoveLocalizedResourceStateDuration(long value) { + removeLocalizedResourceStateDuration.add(value); + } + + public void addRemoveContainerPausedStateDuration(long value) { + removeContainerPausedStateDuration.add(value); + } + + public void addLoadNMTokensStateDuration(long value) { + loadNMTokensStateDuration.add(value); + } + + public void addStoreNMTokenCurrentMasterKeyDuration(long value) { + storeNMTokenCurrentMasterKeyStateDuration.add(value); + } + + public void addStoreNMTokenPreviousMasterKeyDuration(long value) { + storeNMTokenPreviousMasterKeyStateDuration.add(value); + } + + public void addStoreNMTokenApplicationMasterKeyDuration(long value) { + storeNMTokenApplicationMasterKeyStateDuration.add(value); + } + public void addRemoveNMTokenApplicationMasterKeyDuration(long value) { + removeNMTokenApplicationMasterKeyStateDuration.add(value); + } + + public void addLoadContainerTokensStateDuration(long value) { + loadContainerTokensStateDuration.add(value); + } + + public void addStoreContainerTokenCurrentMasterKeyDuration(long value) { + storeContainerTokenCurrentMasterKeyStateDuration.add(value); + } + + public void addStoreContainerTokenPreviousMasterKeyDuration(long value) { + storeContainerTokenPreviousMasterKeyStateDuration.add(value); + } + + public void addStoreContainerTokenStateDuration(long value) { + storeContainerTokenStateDuration.add(value); + } + + public void addLoadLogDeleterStateDuration(long value) { + loadLogDeleterStateDuration.add(value); + } + + public void addStoreLogDeleterStateDuration(long value) { + storeLogDeleterStateDuration.add(value); + } + + public void addRemoveLogDeleterStateDuration(long value) { + removeLogDeleterStateDuration.add(value); + } + + public void addLoadAMRMProxyStateDuration(long value) { + loadAMRMProxyStateDuration.add(value); + } + + public void addStoreAMRMProxyCurrentMasterKeyDuration(long value) { + storeAMRMProxyCurrentMasterKeyStateDuration.add(value); + } + + public void addStoreAMRMProxyAppContextEntryDuration(long value) { + storeAMRMProxyAppContextEntryStateDuration.add(value); + } + + public void addStoreAMRMProxyNextMasterKeyDuration(long value) { + storeAMRMProxyNextMasterKeyStateDuration.add(value); + } + + public void addRemoveAMRMProxyAppContextEntryDuration(long value) { + removeAMRMProxyAppContextEntryStateDuration.add(value); + } + + public void addRemoveAMRMProxyAppContextDuration(long value) { + removeAMRMProxyAppContextStateDuration.add(value); + } + + public void addStoreAssignedResourcesDuration(long value) { + storeAssignedResourcesStateDuration.add(value); + } + + public void addRemoveContainerTokenDuration(long value) { + removeContainerTokenStateDuration.add(value); + } + + public void addRemoveContainerQueuedDuration(long value) { + removeContainerQueuedStateDuration.add(value); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index b18f8e06fd62e..0ad1ff1f346f0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -54,7 +54,9 @@ import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.LeveldbIterator; +import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.util.SystemClock; import org.fusesource.leveldbjni.JniDBFactory; import org.fusesource.leveldbjni.internal.NativeDB; import org.iq80.leveldb.DB; @@ -175,6 +177,10 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { private boolean isHealthy; private Timer compactionTimer; + private volatile Clock clock = SystemClock.getInstance(); + @VisibleForTesting + protected NMLeveldbStateStoreOpDurations opDurations; + /** * Map of containerID vs List of unknown key suffixes. */ @@ -183,6 +189,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { public NMLeveldbStateStoreService() { super(NMLeveldbStateStoreService.class.getName()); + opDurations = NMLeveldbStateStoreOpDurations.getInstance(); } @Override @@ -446,6 +453,7 @@ private RecoveredContainerState loadContainerState(LeveldbIterator iter, @Override public void storeContainer(ContainerId containerId, int containerVersion, long startTime, StartContainerRequest startRequest) throws IOException { + long start = clock.getTime(); String idStr = containerId.toString(); LOG.debug("storeContainer: containerId= {}, startRequest= {}", idStr, startRequest); @@ -473,6 +481,7 @@ public void storeContainer(ContainerId containerId, int containerVersion, markStoreUnHealthy(e); throw new IOException(e); } + opDurations.addStoreContainerStateDuration(clock.getTime() - start); } @VisibleForTesting @@ -486,6 +495,7 @@ private String getContainerKey(String containerId, String suffix) { @Override public void storeContainerQueued(ContainerId containerId) throws IOException { + long start = clock.getTime(); LOG.debug("storeContainerQueued: containerId={}", containerId); String key = CONTAINERS_KEY_PREFIX + containerId.toString() @@ -496,10 +506,12 @@ public void storeContainerQueued(ContainerId containerId) throws IOException { markStoreUnHealthy(e); throw new IOException(e); } + opDurations.addStoreContainerQueuedStateDuration(clock.getTime() - start); } private void removeContainerQueued(ContainerId containerId) throws IOException { + long start = clock.getTime(); LOG.debug("removeContainerQueued: containerId={}", containerId); String key = CONTAINERS_KEY_PREFIX + containerId.toString() @@ -510,10 +522,12 @@ private void removeContainerQueued(ContainerId containerId) markStoreUnHealthy(e); throw new IOException(e); } + opDurations.addRemoveContainerQueuedDuration(clock.getTime() - start); } @Override public void storeContainerPaused(ContainerId containerId) throws IOException { + long start = clock.getTime(); LOG.debug("storeContainerPaused: containerId={}", containerId); String key = CONTAINERS_KEY_PREFIX + containerId.toString() @@ -524,11 +538,14 @@ public void storeContainerPaused(ContainerId containerId) throws IOException { markStoreUnHealthy(e); throw new IOException(e); } + opDurations.addStoreContainerPausedStateDuration( + clock.getTime() - start); } @Override public void removeContainerPaused(ContainerId containerId) throws IOException { + long start = clock.getTime(); LOG.debug("removeContainerPaused: containerId={}", containerId); String key = CONTAINERS_KEY_PREFIX + containerId.toString() @@ -539,11 +556,13 @@ public void removeContainerPaused(ContainerId containerId) markStoreUnHealthy(e); throw new IOException(e); } + opDurations.addRemoveContainerPausedStateDuration(clock.getTime() - start); } @Override public void storeContainerDiagnostics(ContainerId containerId, StringBuilder diagnostics) throws IOException { + long start = clock.getTime(); LOG.debug("storeContainerDiagnostics: containerId={}, diagnostics=", containerId, diagnostics); @@ -555,11 +574,14 @@ public void storeContainerDiagnostics(ContainerId containerId, markStoreUnHealthy(e); throw new IOException(e); } + opDurations.addStoreContainerDiagnosticsStateDuration( + clock.getTime() - start); } @Override public void storeContainerLaunched(ContainerId containerId) throws IOException { + long start = clock.getTime(); LOG.debug("storeContainerLaunched: containerId={}", containerId); // Removing the container if queued for backward compatibility reasons @@ -572,11 +594,13 @@ public void storeContainerLaunched(ContainerId containerId) markStoreUnHealthy(e); throw new IOException(e); } + opDurations.addStoreContainerLaunchedStateDuration(clock.getTime() - start); } @Override public void storeContainerUpdateToken(ContainerId containerId, ContainerTokenIdentifier containerTokenIdentifier) throws IOException { + long start = clock.getTime(); LOG.debug("storeContainerUpdateToken: containerId={}", containerId); String keyUpdateToken = CONTAINERS_KEY_PREFIX + containerId.toString() @@ -600,11 +624,14 @@ public void storeContainerUpdateToken(ContainerId containerId, markStoreUnHealthy(e); throw new IOException(e); } + opDurations.addStoreContainerUpdateTokenStateDuration( + clock.getTime() - start); } @Override public void storeContainerKilled(ContainerId containerId) throws IOException { + long start = clock.getTime(); LOG.debug("storeContainerKilled: containerId={}", containerId); String key = CONTAINERS_KEY_PREFIX + containerId.toString() @@ -615,11 +642,13 @@ public void storeContainerKilled(ContainerId containerId) markStoreUnHealthy(e); throw new IOException(e); } + opDurations.addStoreContainerKilledStateDuration(clock.getTime() - start); } @Override public void storeContainerCompleted(ContainerId containerId, int exitCode) throws IOException { + long start = clock.getTime(); LOG.debug("storeContainerCompleted: containerId={}", containerId); String key = CONTAINERS_KEY_PREFIX + containerId.toString() @@ -630,11 +659,14 @@ public void storeContainerCompleted(ContainerId containerId, markStoreUnHealthy(e); throw new IOException(e); } + opDurations.addStoreContainerCompletedStateDuration( + clock.getTime() - start); } @Override public void storeContainerRemainingRetryAttempts(ContainerId containerId, int remainingRetryAttempts) throws IOException { + long start = clock.getTime(); String key = CONTAINERS_KEY_PREFIX + containerId.toString() + CONTAINER_REMAIN_RETRIES_KEY_SUFFIX; try { @@ -643,11 +675,14 @@ public void storeContainerRemainingRetryAttempts(ContainerId containerId, markStoreUnHealthy(e); throw new IOException(e); } + opDurations.addStoreContainerRemainingRetryAttemptsDuration( + clock.getTime() - start); } @Override public void storeContainerRestartTimes(ContainerId containerId, List restartTimes) throws IOException { + long start = clock.getTime(); String key = CONTAINERS_KEY_PREFIX + containerId.toString() + CONTAINER_RESTART_TIMES_SUFFIX; try { @@ -655,11 +690,13 @@ public void storeContainerRestartTimes(ContainerId containerId, } catch (DBException e) { throw new IOException(e); } + opDurations.addStoreContainerRestartTimesDuration(clock.getTime() - start); } @Override public void storeContainerWorkDir(ContainerId containerId, String workDir) throws IOException { + long start = clock.getTime(); String key = CONTAINERS_KEY_PREFIX + containerId.toString() + CONTAINER_WORK_DIR_KEY_SUFFIX; try { @@ -668,11 +705,13 @@ public void storeContainerWorkDir(ContainerId containerId, markStoreUnHealthy(e); throw new IOException(e); } + opDurations.addStoreContainerWorkDirStateDuration(clock.getTime() - start); } @Override public void storeContainerLogDir(ContainerId containerId, String logDir) throws IOException { + long start = clock.getTime(); String key = CONTAINERS_KEY_PREFIX + containerId.toString() + CONTAINER_LOG_DIR_KEY_SUFFIX; try { @@ -681,11 +720,13 @@ public void storeContainerLogDir(ContainerId containerId, markStoreUnHealthy(e); throw new IOException(e); } + opDurations.addStoreContainerLogDirStateDuration(clock.getTime() - start); } @Override public void removeContainer(ContainerId containerId) throws IOException { + long start = clock.getTime(); LOG.debug("removeContainer: containerId={}", containerId); String keyPrefix = CONTAINERS_KEY_PREFIX + containerId.toString(); @@ -719,6 +760,7 @@ public void removeContainer(ContainerId containerId) markStoreUnHealthy(e); throw new IOException(e); } + opDurations.addRemoveContainerStateDuration(clock.getTime() - start); } @@ -758,15 +800,18 @@ private ContainerManagerApplicationProto getNextRecoveredApplication( @Override public RecoveredApplicationsState loadApplicationsState() throws IOException { + long start = clock.getTime(); RecoveredApplicationsState state = new RecoveredApplicationsState(); state.it = new ApplicationStateIterator(); cleanupDeprecatedFinishedApps(); + opDurations.addLoadApplicationsStateDuration(clock.getTime() - start); return state; } @Override public void storeApplication(ApplicationId appId, ContainerManagerApplicationProto p) throws IOException { + long start = clock.getTime(); LOG.debug("storeApplication: appId={}, proto={}", appId, p); String key = APPLICATIONS_KEY_PREFIX + appId; @@ -776,13 +821,14 @@ public void storeApplication(ApplicationId appId, markStoreUnHealthy(e); throw new IOException(e); } + opDurations.addStoreApplicationStateDuration(clock.getTime() - start); } @Override public void removeApplication(ApplicationId appId) throws IOException { + long start = clock.getTime(); LOG.debug("removeApplication: appId={}", appId); - try { WriteBatch batch = db.createWriteBatch(); try { @@ -796,6 +842,7 @@ public void removeApplication(ApplicationId appId) markStoreUnHealthy(e); throw new IOException(e); } + opDurations.addRemoveApplicationStateDuration(clock.getTime() - start); } @@ -845,10 +892,12 @@ private Entry getNextRecoveredPrivateLocalizatio @Override public RecoveredLocalizationState loadLocalizationState() throws IOException { + long start = clock.getTime(); RecoveredLocalizationState state = new RecoveredLocalizationState(); state.publicTrackerState = loadResourceTrackerState( LOCALIZATION_PUBLIC_KEY_PREFIX); state.it = new UserResourcesIterator(); + opDurations.addLoadLocalizationStateDuration(clock.getTime() - start); return state; } @@ -996,6 +1045,7 @@ private RecoveredUserResources loadUserLocalizedResources( @Override public void startResourceLocalization(String user, ApplicationId appId, LocalResourceProto proto, Path localPath) throws IOException { + long start = clock.getTime(); String key = getResourceStartedKey(user, appId, localPath.toString()); try { db.put(bytes(key), proto.toByteArray()); @@ -1003,11 +1053,14 @@ public void startResourceLocalization(String user, ApplicationId appId, markStoreUnHealthy(e); throw new IOException(e); } + opDurations.addStoreStartResourceLocalizationDuration( + clock.getTime() - start); } @Override public void finishResourceLocalization(String user, ApplicationId appId, LocalizedResourceProto proto) throws IOException { + long start = clock.getTime(); String localPath = proto.getLocalPath(); String startedKey = getResourceStartedKey(user, appId, localPath); String completedKey = getResourceCompletedKey(user, appId, localPath); @@ -1025,11 +1078,14 @@ public void finishResourceLocalization(String user, ApplicationId appId, markStoreUnHealthy(e); throw new IOException(e); } + opDurations.addStoreFinishResourceLocalizationDuration( + clock.getTime() - start); } @Override public void removeLocalizedResource(String user, ApplicationId appId, Path localPath) throws IOException { + long start = clock.getTime(); String localPathStr = localPath.toString(); String startedKey = getResourceStartedKey(user, appId, localPathStr); String completedKey = getResourceCompletedKey(user, appId, localPathStr); @@ -1047,6 +1103,8 @@ public void removeLocalizedResource(String user, ApplicationId appId, markStoreUnHealthy(e); throw new IOException(e); } + opDurations.addRemoveLocalizedResourceStateDuration( + clock.getTime() - start); } private String getResourceStartedKey(String user, ApplicationId appId, @@ -1110,14 +1168,18 @@ private DeletionServiceDeleteTaskProto getNextRecoveredDeletionService( @Override public RecoveredDeletionServiceState loadDeletionServiceState() throws IOException { + long start = clock.getTime(); RecoveredDeletionServiceState state = new RecoveredDeletionServiceState(); state.it = new DeletionStateIterator(); + opDurations.addLoadDeletionServiceStateDuration( + clock.getTime() - start); return state; } @Override public void storeDeletionTask(int taskId, DeletionServiceDeleteTaskProto taskProto) throws IOException { + long start = clock.getTime(); String key = DELETION_TASK_KEY_PREFIX + taskId; try { db.put(bytes(key), taskProto.toByteArray()); @@ -1125,10 +1187,12 @@ public void storeDeletionTask(int taskId, markStoreUnHealthy(e); throw new IOException(e); } + opDurations.addsStoreDeletionTaskStateDuration(clock.getTime() - start); } @Override public void removeDeletionTask(int taskId) throws IOException { + long start = clock.getTime(); String key = DELETION_TASK_KEY_PREFIX + taskId; try { db.delete(bytes(key)); @@ -1136,6 +1200,7 @@ public void removeDeletionTask(int taskId) throws IOException { markStoreUnHealthy(e); throw new IOException(e); } + opDurations.addRemoveDeletionTaskStateDuration(clock.getTime() - start); } private MasterKey getMasterKey(String dbKey) throws IOException { @@ -1196,36 +1261,48 @@ private Entry getNextMasterKeyEntry( @Override public RecoveredNMTokensState loadNMTokensState() throws IOException { + long start = clock.getTime(); RecoveredNMTokensState state = new RecoveredNMTokensState(); state.currentMasterKey = getMasterKey(NM_TOKENS_KEY_PREFIX + CURRENT_MASTER_KEY_SUFFIX); state.previousMasterKey = getMasterKey(NM_TOKENS_KEY_PREFIX + PREV_MASTER_KEY_SUFFIX); state.it = new NMTokensStateIterator(); + opDurations.addLoadNMTokensStateDuration(clock.getTime() - start); return state; } @Override public void storeNMTokenCurrentMasterKey(MasterKey key) throws IOException { + long start = clock.getTime(); storeMasterKey(NM_TOKENS_CURRENT_MASTER_KEY, key); + opDurations.addStoreNMTokenCurrentMasterKeyDuration( + clock.getTime() - start); } @Override public void storeNMTokenPreviousMasterKey(MasterKey key) throws IOException { + long start = clock.getTime(); storeMasterKey(NM_TOKENS_PREV_MASTER_KEY, key); + opDurations.addStoreNMTokenPreviousMasterKeyDuration( + clock.getTime() - start); } @Override public void storeNMTokenApplicationMasterKey( ApplicationAttemptId attempt, MasterKey key) throws IOException { + long start = clock.getTime(); storeMasterKey(NM_TOKENS_KEY_PREFIX + attempt, key); + opDurations.addStoreNMTokenApplicationMasterKeyDuration( + clock.getTime() - start); } @Override public void removeNMTokenApplicationMasterKey( ApplicationAttemptId attempt) throws IOException { + long start = clock.getTime(); String key = NM_TOKENS_KEY_PREFIX + attempt; try { db.delete(bytes(key)); @@ -1233,6 +1310,8 @@ public void removeNMTokenApplicationMasterKey( markStoreUnHealthy(e); throw new IOException(e); } + opDurations.addRemoveNMTokenApplicationMasterKeyDuration( + clock.getTime() - start); } private MasterKey parseMasterKey(byte[] keyData) throws IOException { @@ -1303,30 +1382,39 @@ private static Entry loadContainerToken(String key, @Override public RecoveredContainerTokensState loadContainerTokensState() throws IOException { + long start = clock.getTime(); RecoveredContainerTokensState state = new RecoveredContainerTokensState(); state.currentMasterKey = getMasterKey(CONTAINER_TOKENS_KEY_PREFIX + CURRENT_MASTER_KEY_SUFFIX); state.previousMasterKey = getMasterKey(CONTAINER_TOKENS_KEY_PREFIX + PREV_MASTER_KEY_SUFFIX); state.it = new ContainerTokensStateIterator(); + opDurations.addLoadContainerTokensStateDuration(clock.getTime() - start); return state; } @Override public void storeContainerTokenCurrentMasterKey(MasterKey key) throws IOException { + long start = clock.getTime(); storeMasterKey(CONTAINER_TOKEN_SECRETMANAGER_CURRENT_MASTER_KEY, key); + opDurations.addStoreContainerTokenCurrentMasterKeyDuration( + clock.getTime() - start); } @Override public void storeContainerTokenPreviousMasterKey(MasterKey key) throws IOException { + long start = clock.getTime(); storeMasterKey(CONTAINER_TOKEN_SECRETMANAGER_PREV_MASTER_KEY, key); + opDurations.addStoreContainerTokenPreviousMasterKeyDuration( + clock.getTime() - start); } @Override public void storeContainerToken(ContainerId containerId, Long expTime) throws IOException { + long start = clock.getTime(); String key = CONTAINER_TOKENS_KEY_PREFIX + containerId; try { db.put(bytes(key), bytes(expTime.toString())); @@ -1334,11 +1422,13 @@ public void storeContainerToken(ContainerId containerId, Long expTime) markStoreUnHealthy(e); throw new IOException(e); } + opDurations.addStoreContainerTokenStateDuration(clock.getTime() - start); } @Override public void removeContainerToken(ContainerId containerId) throws IOException { + long start = clock.getTime(); String key = CONTAINER_TOKENS_KEY_PREFIX + containerId; try { db.delete(bytes(key)); @@ -1346,11 +1436,13 @@ public void removeContainerToken(ContainerId containerId) markStoreUnHealthy(e); throw new IOException(e); } + opDurations.addRemoveContainerTokenDuration(clock.getTime() - start); } @Override public RecoveredLogDeleterState loadLogDeleterState() throws IOException { + long start = clock.getTime(); RecoveredLogDeleterState state = new RecoveredLogDeleterState(); state.logDeleterMap = new HashMap(); LeveldbIterator iter = null; @@ -1384,12 +1476,14 @@ public RecoveredLogDeleterState loadLogDeleterState() throws IOException { iter.close(); } } + opDurations.addLoadLogDeleterStateDuration(clock.getTime() - start); return state; } @Override public void storeLogDeleter(ApplicationId appId, LogDeleterProto proto) throws IOException { + long start = clock.getTime(); String key = getLogDeleterKey(appId); try { db.put(bytes(key), proto.toByteArray()); @@ -1397,10 +1491,12 @@ public void storeLogDeleter(ApplicationId appId, LogDeleterProto proto) markStoreUnHealthy(e); throw new IOException(e); } + opDurations.addStoreLogDeleterStateDuration(clock.getTime() - start); } @Override public void removeLogDeleter(ApplicationId appId) throws IOException { + long start = clock.getTime(); String key = getLogDeleterKey(appId); try { db.delete(bytes(key)); @@ -1408,12 +1504,14 @@ public void removeLogDeleter(ApplicationId appId) throws IOException { markStoreUnHealthy(e); throw new IOException(e); } + opDurations.addRemoveLogDeleterStateDuration(clock.getTime() - start); } @Override public void storeAssignedResources(Container container, String resourceType, List assignedResources) throws IOException { + long start = clock.getTime(); if (LOG.isDebugEnabled()) { LOG.debug( "storeAssignedResources: containerId=" + container.getContainerId() @@ -1437,7 +1535,7 @@ public void storeAssignedResources(Container container, markStoreUnHealthy(e); throw new IOException(e); } - + opDurations.addStoreAssignedResourcesDuration(clock.getTime() - start); // update container resource mapping. updateContainerResourceMapping(container, resourceType, assignedResources); } @@ -1492,6 +1590,7 @@ private String getLogDeleterKey(ApplicationId appId) { @Override public RecoveredAMRMProxyState loadAMRMProxyState() throws IOException { + long start = clock.getTime(); RecoveredAMRMProxyState result = new RecoveredAMRMProxyState(); Set unknownKeys = new HashSet<>(); LeveldbIterator iter = null; @@ -1562,7 +1661,7 @@ public RecoveredAMRMProxyState loadAMRMProxyState() throws IOException { } catch (DBException e) { throw new IOException(e); } - + opDurations.addLoadAMRMProxyStateDuration(clock.getTime() - start); return result; } @@ -1585,11 +1684,15 @@ private Map loadAMRMProxyAppContextMap(LeveldbIterator iter, @Override public void storeAMRMProxyCurrentMasterKey(MasterKey key) throws IOException { + long start = clock.getTime(); storeMasterKey(AMRMPROXY_KEY_PREFIX + CURRENT_MASTER_KEY_SUFFIX, key); + opDurations.addStoreAMRMProxyCurrentMasterKeyDuration( + clock.getTime() - start); } @Override public void storeAMRMProxyNextMasterKey(MasterKey key) throws IOException { + long start = clock.getTime(); String dbkey = AMRMPROXY_KEY_PREFIX + NEXT_MASTER_KEY_SUFFIX; if (key == null) { // When key is null, delete the entry instead @@ -1602,11 +1705,14 @@ public void storeAMRMProxyNextMasterKey(MasterKey key) throws IOException { return; } storeMasterKey(dbkey, key); + opDurations.addStoreAMRMProxyNextMasterKeyDuration( + clock.getTime() - start); } @Override public void storeAMRMProxyAppContextEntry(ApplicationAttemptId attempt, String key, byte[] data) throws IOException { + long start = clock.getTime(); String fullkey = AMRMPROXY_KEY_PREFIX + attempt + "/" + key; try { db.put(bytes(fullkey), data); @@ -1614,11 +1720,14 @@ public void storeAMRMProxyAppContextEntry(ApplicationAttemptId attempt, markStoreUnHealthy(e); throw new IOException(e); } + opDurations.addStoreAMRMProxyAppContextEntryDuration( + clock.getTime() - start); } @Override public void removeAMRMProxyAppContextEntry(ApplicationAttemptId attempt, String key) throws IOException { + long start = clock.getTime(); String fullkey = AMRMPROXY_KEY_PREFIX + attempt + "/" + key; try { db.delete(bytes(fullkey)); @@ -1626,11 +1735,14 @@ public void removeAMRMProxyAppContextEntry(ApplicationAttemptId attempt, markStoreUnHealthy(e); throw new IOException(e); } + opDurations.addRemoveAMRMProxyAppContextEntryDuration( + clock.getTime() - start); } @Override public void removeAMRMProxyAppContext(ApplicationAttemptId attempt) throws IOException { + long start = clock.getTime(); Set candidates = new HashSet<>(); String keyPrefix = AMRMPROXY_KEY_PREFIX + attempt + "/"; LeveldbIterator iter = null; @@ -1664,6 +1776,8 @@ public void removeAMRMProxyAppContext(ApplicationAttemptId attempt) markStoreUnHealthy(e); throw new IOException(e); } + opDurations.addRemoveAMRMProxyAppContextDuration( + clock.getTime() - start); } @Override