diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
index e6aee0de6a6..2afd3c53db3 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
@@ -64,6 +64,7 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
   private final PlacementPolicy containerPlacement;
   private final long currentContainerSize;
   private final ReplicationManager replicationManager;
+  private final ReplicationManagerMetrics metrics;
 
   ECUnderReplicationHandler(final PlacementPolicy containerPlacement,
       final ConfigurationSource conf, ReplicationManager replicationManager) {
@@ -72,6 +73,7 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
         .getStorageSize(ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
             ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
     this.replicationManager = replicationManager;
+    this.metrics = replicationManager.getMetrics();
   }
 
   private boolean validatePlacement(List<DatanodeDetails> replicaNodes,
@@ -281,6 +283,7 @@ private int processMissingIndexes(
         (ECReplicationConfig)container.getReplicationConfig();
     List<Integer> missingIndexes = replicaCount.unavailableIndexes(true);
     final int expectedTargetCount = missingIndexes.size();
+    boolean recoveryIsCritical = expectedTargetCount == repConfig.getParity();
     if (expectedTargetCount == 0) {
       return 0;
     }
@@ -307,7 +310,7 @@ private int processMissingIndexes(
         // selection allows partial recovery
         0 < targetCount && targetCount < expectedTargetCount &&
         // recovery is not yet critical
-        expectedTargetCount < repConfig.getParity()) {
+        !recoveryIsCritical) {
       // check if placement exists when overloaded nodes are not excluded
       final List<DatanodeDetails> targetsMaybeOverloaded = getTargetDatanodes(
@@ -319,7 +322,7 @@ private int processMissingIndexes(
             + " target nodes to be fully reconstructed, but {} selected"
             + " nodes are currently overloaded.",
             container.getContainerID(), expectedTargetCount, overloadedCount);
-
+        metrics.incrECPartialReconstructionSkippedTotal();
         throw new InsufficientDatanodesException(expectedTargetCount,
             targetCount);
       }
@@ -369,6 +372,11 @@ private int processMissingIndexes(
       LOG.debug("Insufficient nodes were returned from the placement policy"
           + " to fully reconstruct container {}. Requested {} received {}",
           container.getContainerID(), expectedTargetCount, targetCount);
+      if (hasOverloaded && recoveryIsCritical) {
+        metrics.incrECPartialReconstructionCriticalTotal();
+      } else {
+        metrics.incrEcPartialReconstructionNoneOverloadedTotal();
+      }
       throw new InsufficientDatanodesException(expectedTargetCount,
           targetCount);
     }
@@ -454,6 +462,7 @@ private int processDecommissioningIndexes(
           " to fully replicate the decommission indexes for container {}."
               + " Requested {} received {}", container.getContainerID(),
           decomIndexes.size(), selectedDatanodes.size());
+      metrics.incrEcPartialReplicationForOutOfServiceReplicasTotal();
       throw new InsufficientDatanodesException(decomIndexes.size(),
           selectedDatanodes.size());
     }
@@ -538,6 +547,7 @@ private int processMaintenanceOnlyIndexes(
           " to fully replicate the maintenance indexes for container {}."
+ " Requested {} received {}", container.getContainerID(), maintIndexes.size(), targets.size()); + metrics.incrEcPartialReplicationForOutOfServiceReplicasTotal(); throw new InsufficientDatanodesException(maintIndexes.size(), targets.size()); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java index d41f80d6041..6dcec13c505 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java @@ -56,6 +56,7 @@ public abstract class MisReplicationHandler implements private final PlacementPolicy containerPlacement; private final long currentContainerSize; private final ReplicationManager replicationManager; + private final ReplicationManagerMetrics metrics; public MisReplicationHandler( final PlacementPolicy containerPlacement, @@ -65,6 +66,7 @@ public MisReplicationHandler( ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); this.replicationManager = replicationManager; + this.metrics = replicationManager.getMetrics(); } protected ReplicationManager getReplicationManager() { @@ -164,6 +166,11 @@ public int processAndSendCommands( int found = targetDatanodes.size(); if (found < requiredNodes) { + if (container.getReplicationType() == HddsProtos.ReplicationType.EC) { + metrics.incrEcPartialReplicationForMisReplicationTotal(); + } else { + metrics.incrPartialReplicationForMisReplicationTotal(); + } LOG.warn("Placement Policy {} found only {} nodes for Container: {}," + " number of required nodes: {}, usedNodes : {}", containerPlacement.getClass(), found, diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java index ae14c20f477..0e4c4b1d428 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java @@ -55,6 +55,7 @@ public class RatisUnderReplicationHandler private final PlacementPolicy placementPolicy; private final long currentContainerSize; private final ReplicationManager replicationManager; + private final ReplicationManagerMetrics metrics; public RatisUnderReplicationHandler(final PlacementPolicy placementPolicy, final ConfigurationSource conf, @@ -64,6 +65,7 @@ public RatisUnderReplicationHandler(final PlacementPolicy placementPolicy, .getStorageSize(ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); this.replicationManager = replicationManager; + this.metrics = replicationManager.getMetrics(); } /** @@ -128,6 +130,7 @@ public int processAndSendCommands( "additional replicas needed: {}", containerInfo, targetDatanodes.size(), replicaCount.additionalReplicaNeeded()); + metrics.incrPartialReplicationTotal(); throw new InsufficientDatanodesException( replicaCount.additionalReplicaNeeded(), targetDatanodes.size()); } diff --git 
index 219f74ccf05..5c3ee4e29ae 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerMetrics.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerMetrics.java
@@ -155,6 +155,10 @@ public final class ReplicationManagerMetrics implements MetricsSource {
       " due to the configured limit.")
   private MutableCounterLong inflightDeletionSkippedTotal;
 
+  @Metric("Number of times under replication processing has paused due to" +
+      " reaching the cluster inflight replication limit.")
+  private MutableCounterLong pendingReplicationLimitReachedTotal;
+
   private MetricsRegistry registry;
 
   private final ReplicationManager replicationManager;
@@ -183,6 +187,34 @@ public final class ReplicationManagerMetrics implements MetricsSource {
   @Metric("Number of EC replicas scheduled for delete which timed out.")
   private MutableCounterLong ecReplicaDeleteTimeoutTotal;
 
+  @Metric("Number of times partial EC reconstruction was needed due to " +
+      "overloaded nodes, but skipped as there was still sufficient redundancy.")
+  private MutableCounterLong ecPartialReconstructionSkippedTotal;
+
+  @Metric("Number of times partial EC reconstruction was used due to " +
+      "insufficient nodes available and reconstruction was critical.")
+  private MutableCounterLong ecPartialReconstructionCriticalTotal;
+
+  @Metric("Number of times partial EC reconstruction was used due to " +
+      "insufficient nodes available and with no overloaded nodes.")
+  private MutableCounterLong ecPartialReconstructionNoneOverloadedTotal;
+
+  @Metric("Number of times EC decommissioning or entering maintenance mode " +
+      "replicas were not all replicated due to insufficient nodes available.")
+  private MutableCounterLong ecPartialReplicationForOutOfServiceReplicasTotal;
+
+  @Metric("Number of times partial Ratis replication occurred due to " +
+      "insufficient nodes available.")
+  private MutableCounterLong partialReplicationTotal;
+
+  @Metric("Number of times partial replication occurred to fix a " +
+      "mis-replicated ratis container due to insufficient nodes available.")
+  private MutableCounterLong partialReplicationForMisReplicationTotal;
+
+  @Metric("Number of times partial replication occurred to fix a " +
+      "mis-replicated EC container due to insufficient nodes available.")
+  private MutableCounterLong ecPartialReplicationForMisReplicationTotal;
+
   @Metric("NUmber of Reconstruct EC Container commands that could not be sent "
       + "due to the pending commands on the target datanode")
   private MutableCounterLong ecReconstructionCmdsDeferredTotal;
@@ -272,6 +304,14 @@ public void getMetrics(MetricsCollector collector, boolean all) {
     ecReconstructionCmdsDeferredTotal.snapshot(builder, all);
     deleteContainerCmdsDeferredTotal.snapshot(builder, all);
     replicateContainerCmdsDeferredTotal.snapshot(builder, all);
+    pendingReplicationLimitReachedTotal.snapshot(builder, all);
+    ecPartialReconstructionSkippedTotal.snapshot(builder, all);
+    ecPartialReconstructionCriticalTotal.snapshot(builder, all);
+    ecPartialReconstructionNoneOverloadedTotal.snapshot(builder, all);
+    ecPartialReplicationForOutOfServiceReplicasTotal.snapshot(builder, all);
+    partialReplicationTotal.snapshot(builder, all);
+    ecPartialReplicationForMisReplicationTotal.snapshot(builder, all);
+    partialReplicationForMisReplicationTotal.snapshot(builder, all);
   }
 
   public void unRegister() {
@@ -505,4 +545,68 @@ public long getReplicateContainerCmdsDeferredTotal() {
     return replicateContainerCmdsDeferredTotal.value();
   }
 
+  public void incrPendingReplicationLimitReachedTotal() {
+    this.pendingReplicationLimitReachedTotal.incr();
+  }
+
+  public long getPendingReplicationLimitReachedTotal() {
+    return pendingReplicationLimitReachedTotal.value();
+  }
+
+  public long getECPartialReconstructionSkippedTotal() {
+    return ecPartialReconstructionSkippedTotal.value();
+  }
+
+  public void incrECPartialReconstructionSkippedTotal() {
+    this.ecPartialReconstructionSkippedTotal.incr();
+  }
+
+  public long getECPartialReconstructionCriticalTotal() {
+    return ecPartialReconstructionCriticalTotal.value();
+  }
+
+  public void incrECPartialReconstructionCriticalTotal() {
+    this.ecPartialReconstructionCriticalTotal.incr();
+  }
+
+  public long getEcPartialReconstructionNoneOverloadedTotal() {
+    return ecPartialReconstructionNoneOverloadedTotal.value();
+  }
+
+  public void incrEcPartialReconstructionNoneOverloadedTotal() {
+    this.ecPartialReconstructionNoneOverloadedTotal.incr();
+  }
+
+  public long getEcPartialReplicationForOutOfServiceReplicasTotal() {
+    return ecPartialReplicationForOutOfServiceReplicasTotal.value();
+  }
+
+  public void incrEcPartialReplicationForOutOfServiceReplicasTotal() {
+    this.ecPartialReplicationForOutOfServiceReplicasTotal.incr();
+  }
+
+  public long getPartialReplicationTotal() {
+    return partialReplicationTotal.value();
+  }
+
+  public void incrPartialReplicationTotal() {
+    this.partialReplicationTotal.incr();
+  }
+
+  public void incrEcPartialReplicationForMisReplicationTotal() {
+    this.ecPartialReplicationForMisReplicationTotal.incr();
+  }
+
+  public long getEcPartialReplicationForMisReplicationTotal() {
+    return this.ecPartialReplicationForMisReplicationTotal.value();
+  }
+
+  public void incrPartialReplicationForMisReplicationTotal() {
+    this.partialReplicationForMisReplicationTotal.incr();
+  }
+
+  public long getPartialReplicationForMisReplicationTotal() {
+    return this.partialReplicationForMisReplicationTotal.value();
+  }
+
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java
index 1f04edd2ee8..13ed8f87e30 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java
@@ -103,6 +103,8 @@ public void processAll(ReplicationQueue queue) {
           inflightOperationLimitReached(replicationManager, inflightLimit)) {
         LOG.info("The maximum number of pending replicas ({}) are scheduled. "
" + "Ending the iteration.", inflightLimit); + replicationManager + .getMetrics().incrPendingReplicationLimitReachedTotal(); break; } HealthResult healthResult = diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECMisReplicationHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECMisReplicationHandler.java index c68b1cb8c53..5022d9ca5cc 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECMisReplicationHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECMisReplicationHandler.java @@ -45,6 +45,7 @@ import static java.util.Collections.singletonList; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; @@ -216,6 +217,8 @@ public void commandsForFewerThanRequiredNodes() throws IOException { assertThrows(InsufficientDatanodesException.class, () -> testMisReplication(availableReplicas, Collections.emptyList(), 0, 2, 1)); + assertEquals(1, + getMetrics().getEcPartialReplicationForMisReplicationTotal()); } @Override diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java index 6fc6b1c0eb5..be95bcee605 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java @@ -110,6 +110,7 @@ public class TestECUnderReplicationHandler { private ContainerInfo container; private NodeManager nodeManager; private ReplicationManager replicationManager; + private ReplicationManagerMetrics metrics; private OzoneConfiguration conf; private PlacementPolicy policy; private static final int DATA = 3; @@ -137,6 +138,8 @@ public NodeStatus getNodeStatus(DatanodeDetails dd) { new ReplicationManager.ReplicationManagerConfiguration(); when(replicationManager.getConfig()) .thenReturn(rmConf); + metrics = ReplicationManagerMetrics.create(replicationManager); + when(replicationManager.getMetrics()).thenReturn(metrics); when(replicationManager.getNodeStatus(any(DatanodeDetails.class))) .thenAnswer(invocation -> { @@ -200,6 +203,7 @@ void defersNonCriticalPartialReconstruction(String rep) throws IOException { assertEquals(e.getRequiredNodes() - excluded.size(), e.getAvailableNodes()); verify(replicationManager, never()) .sendThrottledReconstructionCommand(any(), any()); + assertEquals(1, metrics.getECPartialReconstructionSkippedTotal()); } private static UnderReplicatedHealthResult mockUnderReplicated( @@ -258,6 +262,7 @@ void performsCriticalPartialReconstruction(String rep) throws IOException { assertEquals(e.getRequiredNodes() - excluded.size(), e.getAvailableNodes()); verify(replicationManager, times(1)) .sendThrottledReconstructionCommand(any(), any()); + assertEquals(1, metrics.getECPartialReconstructionCriticalTotal()); } @Test @@ -711,6 +716,7 @@ public void 
     ReconstructECContainersCommand cmd = (ReconstructECContainersCommand)
         commandsSent.iterator().next().getValue();
     assertEquals(1, cmd.getTargetDatanodes().size());
+    assertEquals(1, metrics.getEcPartialReconstructionNoneOverloadedTotal());
   }
 
   @Test
@@ -759,6 +765,8 @@ public void testPartialDecommissionIfNotEnoughNodes() {
     SCMCommand<?> cmd = commandsSent.iterator().next().getValue();
     assertEquals(
         SCMCommandProto.Type.replicateContainerCommand, cmd.getType());
+    assertEquals(1,
+        metrics.getEcPartialReplicationForOutOfServiceReplicasTotal());
   }
 
   @Test
@@ -782,6 +790,11 @@ public void testPartialDecommissionOverloadedNodes() {
     SCMCommand<?> cmd = commandsSent.iterator().next().getValue();
     assertEquals(
         SCMCommandProto.Type.replicateContainerCommand, cmd.getType());
+    // The partial recovery here is due to overloaded nodes, not insufficient
+    // nodes. The "deferred" metric should be updated when all sources are
+    // overloaded.
+    assertEquals(0,
+        metrics.getEcPartialReplicationForOutOfServiceReplicasTotal());
   }
 
   @Test
@@ -807,6 +820,8 @@ public void testPartialMaintenanceIfNotEnoughNodes() {
     SCMCommand<?> cmd = commandsSent.iterator().next().getValue();
     assertEquals(
         SCMCommandProto.Type.replicateContainerCommand, cmd.getType());
+    assertEquals(1,
+        metrics.getEcPartialReplicationForOutOfServiceReplicasTotal());
   }
 
   @Test
@@ -831,6 +846,11 @@ public void testPartialMaintenanceOverloadedNodes() {
     SCMCommand<?> cmd = commandsSent.iterator().next().getValue();
     assertEquals(
         SCMCommandProto.Type.replicateContainerCommand, cmd.getType());
+    // The partial recovery here is due to overloaded nodes, not insufficient
+    // nodes. The "deferred" metric should be updated when all sources are
+    // overloaded.
+    assertEquals(0,
+        metrics.getEcPartialReplicationForOutOfServiceReplicasTotal());
   }
 
   @Test
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java
index eb5f068e46c..9c2874740f0 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestMisReplicationHandler.java
@@ -72,6 +72,7 @@ public abstract class TestMisReplicationHandler {
   private Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent;
   private final AtomicBoolean throwThrottledException =
       new AtomicBoolean(false);
+  private ReplicationManagerMetrics metrics;
 
   protected void setup(ReplicationConfig repConfig)
       throws NodeNotFoundException, CommandTargetOverloadedException,
@@ -89,6 +90,8 @@ protected void setup(ReplicationConfig repConfig)
         conf.getObject(ReplicationManagerConfiguration.class);
     Mockito.when(replicationManager.getConfig())
         .thenReturn(rmConf);
+    metrics = ReplicationManagerMetrics.create(replicationManager);
+    Mockito.when(replicationManager.getMetrics()).thenReturn(metrics);
 
     commandsSent = new HashSet<>();
     ReplicationTestUtil.mockRMSendDatanodeCommand(
@@ -107,6 +110,10 @@ protected ReplicationManager getReplicationManager() {
     return replicationManager;
   }
 
+  protected ReplicationManagerMetrics getMetrics() {
+    return metrics;
+  }
+
   protected void setThrowThrottledException(boolean showThrow) {
     throwThrottledException.set(showThrow);
   }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
index fe0caf88106..e2cdea09ccf 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
@@ -76,6 +76,7 @@ public class TestRatisUnderReplicationHandler {
   private PlacementPolicy policy;
   private ReplicationManager replicationManager;
   private Set<Pair<DatanodeDetails, SCMCommand<?>>> commandsSent;
+  private ReplicationManagerMetrics metrics;
 
   @Before
   public void setup() throws NodeNotFoundException,
@@ -93,6 +94,8 @@ public void setup() throws NodeNotFoundException,
     Mockito.when(replicationManager.getConfig())
         .thenReturn(ozoneConfiguration.getObject(
             ReplicationManagerConfiguration.class));
+    metrics = ReplicationManagerMetrics.create(replicationManager);
+    Mockito.when(replicationManager.getMetrics()).thenReturn(metrics);
 
     /*
       Return NodeStatus with NodeOperationalState as specified in
@@ -220,6 +223,7 @@ public void testNoTargetsFoundBecauseOfPlacementPolicy() {
     Assert.assertThrows(IOException.class,
         () -> handler.processAndSendCommands(replicas, Collections.emptyList(),
             getUnderReplicatedHealthResult(), 2));
+    Assert.assertEquals(0, metrics.getPartialReplicationTotal());
   }
 
   @Test
@@ -239,6 +243,7 @@ public void testInsufficientTargetsFoundBecauseOfPlacementPolicy() {
     // One command should be sent to the replication manager as we could only
     // fine one node rather than two.
     Assert.assertEquals(1, commandsSent.size());
+    Assert.assertEquals(1, metrics.getPartialReplicationTotal());
   }
 
   @Test
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java
index b604ae3e2be..1b01482be49 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java
@@ -44,6 +44,7 @@ public class TestUnderReplicatedProcessor {
   private ECReplicationConfig repConfig;
   private UnderReplicatedProcessor underReplicatedProcessor;
   private ReplicationQueue queue;
+  private ReplicationManagerMetrics rmMetrics;
 
   @Before
   public void setup() {
@@ -57,8 +58,7 @@ public void setup() {
     repConfig = new ECReplicationConfig(3, 2);
     Mockito.when(replicationManager.shouldRun()).thenReturn(true);
     Mockito.when(replicationManager.getConfig()).thenReturn(rmConf);
-    ReplicationManagerMetrics rmMetrics =
-        ReplicationManagerMetrics.create(replicationManager);
+    rmMetrics = ReplicationManagerMetrics.create(replicationManager);
     Mockito.when(replicationManager.getMetrics())
         .thenReturn(rmMetrics);
     Mockito.when(replicationManager.getReplicationInFlightLimit())
@@ -132,6 +132,7 @@ public void testMessageNotProcessedIfGlobalLimitReached() throws IOException {
     // We should have processed the message now
     Mockito.verify(replicationManager, Mockito.times(1))
         .processUnderReplicatedContainer(any());
+    assertEquals(1, rmMetrics.getPendingReplicationLimitReachedTotal());
   }
 }