Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1223,6 +1223,25 @@ private void handleUnderReplicatedContainer(final ContainerInfo container,
} else {
LOG.warn("Cannot replicate container {}, no healthy replica found.",
container.containerID());
if (rmConf.isContainerUnrecoverableCleanupEnabled()) {
replicas.stream().filter(rp -> rp.getState().equals(State.UNHEALTHY))
.forEach(rp -> {
sendDeleteCommand(container, rp.getDatanodeDetails(), true);
LOG.info("Send deleteCmd to force delete the unhealthy " +
"replica {}", rp);
});
// manually reset these as the container is unhealthy and won't update
// through {@link updateRatisContainerStats}.
container.setNumberOfKeys(0);
container.setUsedBytes(0);
try {
containerManager.updateContainerState(container.containerID(),
HddsProtos.LifeCycleEvent.DELETE);
} catch (InvalidStateTransitionException | TimeoutException ex) {
LOG.error("Exception while updating container {} state to DELETE," +
" current state is {}", container, container.getState());
}
}
}
} catch (IOException | IllegalStateException ex) {
LOG.warn("Exception while replicating container {}.",
Expand Down Expand Up @@ -1764,6 +1783,19 @@ public void setEventTimeout(Duration timeout) {
)
private int containerInflightDeletionLimit = 0;

@Config(key = "container.unrecoverable.cleanup.enabled",
type = ConfigType.BOOLEAN,
defaultValue = "false",
tags = {SCM, OZONE},
description = "This property is used to clean up " +
"closed unrecoverable container."
)
private boolean containerUnrecoverableCleanupEnabled = false;

public void setContainerUnrecoverableCleanupEnabled(boolean enabled) {
this.containerUnrecoverableCleanupEnabled = enabled;
}

public void setContainerInflightReplicationLimit(int replicationLimit) {
this.containerInflightReplicationLimit = replicationLimit;
}
Expand All @@ -1784,6 +1816,10 @@ public int getContainerInflightDeletionLimit() {
return containerInflightDeletionLimit;
}

public boolean isContainerUnrecoverableCleanupEnabled() {
return containerUnrecoverableCleanupEnabled;
}

public long getInterval() {
return interval;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE;
import static org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED;
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.deleteContainerCommand;
import static org.apache.hadoop.hdds.scm.HddsTestUtils.CONTAINER_NUM_KEYS_DEFAULT;
import static org.apache.hadoop.hdds.scm.HddsTestUtils.CONTAINER_USED_BYTES_DEFAULT;
import static org.apache.hadoop.hdds.scm.HddsTestUtils.getContainer;
Expand Down Expand Up @@ -138,7 +139,7 @@ int getInflightCount(InflightType type) {

@BeforeEach
public void setup()
throws IOException, InterruptedException,
throws IOException, InterruptedException, TimeoutException,
NodeNotFoundException, InvalidStateTransitionException {
OzoneConfiguration conf = new OzoneConfiguration();
conf.setTimeDuration(
Expand Down Expand Up @@ -193,6 +194,16 @@ public void setup()
.getContainerReplicas(((ContainerID)invocation
.getArguments()[0])));

Mockito.doAnswer(invocation -> {
containerStateManager
.updateContainerState(((ContainerID)invocation
.getArguments()[0]).getProtobuf(),
(HddsProtos.LifeCycleEvent)invocation.getArguments()[1]);
return null;
}).when(containerManager).updateContainerState(
Mockito.any(ContainerID.class),
Mockito.any(HddsProtos.LifeCycleEvent.class));

containerPlacementPolicy = Mockito.mock(PlacementPolicy.class);

Mockito.when(containerPlacementPolicy.chooseDatanodes(
Expand Down Expand Up @@ -241,6 +252,8 @@ void createReplicationManager(ReplicationManagerConfiguration rmConf,
LegacyReplicationManager.ReplicationManagerConfiguration lrmConf)
throws InterruptedException, IOException {
OzoneConfiguration config = new OzoneConfiguration();
config.setBoolean(
"hdds.scm.replication.container.unrecoverable.cleanup.enabled", true);
testDir = GenericTestUtils
.getTestDir(TestContainerManagerImpl.class.getSimpleName());
config.set(HddsConfigKeys.OZONE_METADATA_DIRS,
Expand Down Expand Up @@ -472,7 +485,7 @@ public void testQuasiClosedContainerWithUnhealthyReplica()
id, replicaThree);

final int currentDeleteCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
.getInvocationCount(deleteContainerCommand);
final int currentReplicateCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);

Expand All @@ -493,10 +506,9 @@ public void testQuasiClosedContainerWithUnhealthyReplica()
eventQueue.processAll(1000);
Assertions.assertEquals(currentDeleteCommandCount + 1,
datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
.getInvocationCount(deleteContainerCommand));
Assertions.assertTrue(datanodeCommandHandler.received(
SCMCommandProto.Type.deleteContainerCommand,
replicaOne.getDatanodeDetails()));
deleteContainerCommand, replicaOne.getDatanodeDetails()));
Assertions.assertEquals(currentDeleteCommandCount + 1,
replicationManager.getMetrics().getNumDeletionCmdsSent());

Expand Down Expand Up @@ -590,13 +602,12 @@ public void testOverReplicatedQuasiClosedContainer()
containerStateManager.updateContainerReplica(id, replicaFour);

final int currentDeleteCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
.getInvocationCount(deleteContainerCommand);

replicationManager.processAll();
eventQueue.processAll(1000);
Assertions.assertEquals(currentDeleteCommandCount + 1,
datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
datanodeCommandHandler.getInvocationCount(deleteContainerCommand));
Assertions.assertEquals(currentDeleteCommandCount + 1,
replicationManager.getMetrics().getNumDeletionCmdsSent());
Assertions.assertEquals(1, getInflightCount(InflightType.DELETION));
Expand Down Expand Up @@ -679,16 +690,15 @@ public void testOverReplicatedQuasiClosedContainerWithUnhealthyReplica()
containerStateManager.updateContainerReplica(id, replicaFour);

final int currentDeleteCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
.getInvocationCount(deleteContainerCommand);

replicationManager.processAll();
eventQueue.processAll(1000);
Assertions.assertEquals(currentDeleteCommandCount + 1,
datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
.getInvocationCount(deleteContainerCommand));
Assertions.assertTrue(datanodeCommandHandler.received(
SCMCommandProto.Type.deleteContainerCommand,
replicaOne.getDatanodeDetails()));
deleteContainerCommand, replicaOne.getDatanodeDetails()));
Assertions.assertEquals(currentDeleteCommandCount + 1,
replicationManager.getMetrics().getNumDeletionCmdsSent());
Assertions.assertEquals(1, getInflightCount(InflightType.DELETION));
Expand Down Expand Up @@ -840,7 +850,7 @@ public void testUnderReplicatedQuasiClosedContainerWithUnhealthyReplica()
final int currentReplicateCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
final int currentDeleteCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
.getInvocationCount(deleteContainerCommand);
final long currentBytesToDelete = replicationManager.getMetrics()
.getNumDeletionBytesTotal();

Expand Down Expand Up @@ -881,12 +891,10 @@ public void testUnderReplicatedQuasiClosedContainerWithUnhealthyReplica()
replicationManager.processAll();
eventQueue.processAll(1000);
Assertions.assertEquals(currentDeleteCommandCount + 1,
datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
datanodeCommandHandler.getInvocationCount(deleteContainerCommand));
// ReplicaTwo should be deleted, that is the unhealthy one
Assertions.assertTrue(datanodeCommandHandler.received(
SCMCommandProto.Type.deleteContainerCommand,
replicaTwo.getDatanodeDetails()));
deleteContainerCommand, replicaTwo.getDatanodeDetails()));
Assertions.assertEquals(currentDeleteCommandCount + 1,
replicationManager.getMetrics().getNumDeletionCmdsSent());
Assertions.assertEquals(currentBytesToDelete + 99,
Expand Down Expand Up @@ -1043,6 +1051,49 @@ public void testUnhealthyOpenContainer()
ReplicationManagerReport.HealthState.OPEN_UNHEALTHY));
}

/**
* ReplicationManager should delete all unhealthy replicas if
* HDDS_CONTAINER_UNHEALTHY_CLEANUP_ENABLED.
*/
@Test
public void testContainerWithAllUnhealthyReplicas()
throws IOException, TimeoutException {

final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
final ContainerID id = container.containerID();
final Set<ContainerReplica> replicas = getReplicas(id, State.UNHEALTHY,
randomDatanodeDetails(),
randomDatanodeDetails(),
randomDatanodeDetails());
containerStateManager.addContainer(container.getProtobuf());
for (ContainerReplica replica : replicas) {
containerStateManager.updateContainerReplica(id, replica);
}

// For closed container with three unhealthy replicas, RM will delete
// one replica and try to recover it.
replicationManager.processAll();
eventQueue.processAll(1000);
Assertions.assertEquals(LifeCycleState.CLOSED, containerStateManager.
getContainer(container.containerID()).getState());
Assertions.assertEquals(1, datanodeCommandHandler.
getInvocationCount(deleteContainerCommand));

// mock the response of DN to remove one replica
containerStateManager.removeContainerReplica(id,
replicas.stream().findFirst().get());

// For closed container with less than three unhealthy replicas (still
// unrecoverable), if enables HDDS_CONTAINER_UNHEALTHY_CLEANUP_ENABLED,
// RM will delete the rest replicas and update container state.
replicationManager.processAll();
eventQueue.processAll(1000);
Assertions.assertEquals(LifeCycleState.DELETING,
containerStateManager.getContainer(container.containerID()).getState());
Assertions.assertEquals(3, datanodeCommandHandler.
getInvocationCount(deleteContainerCommand));
}

/**
* ReplicationManager should skip send close command to unhealthy replica.
*/
Expand Down Expand Up @@ -1197,22 +1248,20 @@ public void overReplicatedButRemovingMakesMisReplicated()
invocation -> new ContainerPlacementStatusDefault(1, 2, 3));

int currentDeleteCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
.getInvocationCount(deleteContainerCommand);

replicationManager.processAll();
eventQueue.processAll(1000);
// The unhealthy replica should be removed, but not the other replica
// as each time we test with 3 replicas, Mockito ensures it returns
// mis-replicated
Assertions.assertEquals(currentDeleteCommandCount + 1,
datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
datanodeCommandHandler.getInvocationCount(deleteContainerCommand));
Assertions.assertEquals(currentDeleteCommandCount + 1,
replicationManager.getMetrics().getNumDeletionCmdsSent());

Assertions.assertTrue(datanodeCommandHandler.received(
SCMCommandProto.Type.deleteContainerCommand,
replicaFive.getDatanodeDetails()));
deleteContainerCommand, replicaFive.getDatanodeDetails()));
Assertions.assertEquals(1, getInflightCount(InflightType.DELETION));
Assertions.assertEquals(1, replicationManager.getMetrics()
.getInflightDeletion());
Expand Down Expand Up @@ -1248,13 +1297,12 @@ public void testOverReplicatedAndPolicySatisfied()
invocation -> new ContainerPlacementStatusDefault(2, 2, 3));

final int currentDeleteCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
.getInvocationCount(deleteContainerCommand);

replicationManager.processAll();
eventQueue.processAll(1000);
Assertions.assertEquals(currentDeleteCommandCount + 1,
datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
datanodeCommandHandler.getInvocationCount(deleteContainerCommand));
Assertions.assertEquals(currentDeleteCommandCount + 1,
replicationManager.getMetrics().getNumDeletionCmdsSent());
Assertions.assertEquals(1, getInflightCount(InflightType.DELETION));
Expand Down Expand Up @@ -1296,13 +1344,13 @@ public void testOverReplicatedAndPolicyUnSatisfiedAndDeleted()
invocation -> new ContainerPlacementStatusDefault(1, 2, 3));

final int currentDeleteCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
.getInvocationCount(deleteContainerCommand);

replicationManager.processAll();
eventQueue.processAll(1000);
Assertions.assertEquals(currentDeleteCommandCount + 2,
datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
.getInvocationCount(deleteContainerCommand));
Assertions.assertEquals(currentDeleteCommandCount + 2,
replicationManager.getMetrics().getNumDeletionCmdsSent());
Assertions.assertEquals(1, getInflightCount(InflightType.DELETION));
Expand Down Expand Up @@ -1523,13 +1571,12 @@ public void testOverReplicatedClosedContainerWithDecomAndMaint()
addReplica(container, NodeStatus.inServiceHealthy(), CLOSED);

final int currentDeleteCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
.getInvocationCount(deleteContainerCommand);

replicationManager.processAll();
eventQueue.processAll(1000);
Assertions.assertEquals(currentDeleteCommandCount + 2,
datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
datanodeCommandHandler.getInvocationCount(deleteContainerCommand));
Assertions.assertEquals(currentDeleteCommandCount + 2,
replicationManager.getMetrics().getNumDeletionCmdsSent());
Assertions.assertEquals(1, getInflightCount(InflightType.DELETION));
Expand All @@ -1545,8 +1592,7 @@ public void testOverReplicatedClosedContainerWithDecomAndMaint()
.collect(Collectors.toSet());
for (ContainerReplica r : decom) {
Assertions.assertFalse(datanodeCommandHandler.received(
SCMCommandProto.Type.deleteContainerCommand,
r.getDatanodeDetails()));
deleteContainerCommand, r.getDatanodeDetails()));
}
assertOverReplicatedCount(1);
}
Expand Down Expand Up @@ -1601,9 +1647,9 @@ public void testMove() throws IOException, NodeNotFoundException,
eventQueue.processAll(1000);

Assertions.assertTrue(datanodeCommandHandler.received(
SCMCommandProto.Type.deleteContainerCommand, dn1.getDatanodeDetails()));
deleteContainerCommand, dn1.getDatanodeDetails()));
Assertions.assertEquals(1, datanodeCommandHandler.getInvocationCount(
SCMCommandProto.Type.deleteContainerCommand));
deleteContainerCommand));
containerStateManager.removeContainerReplica(id, dn1);

replicationManager.processAll();
Expand Down Expand Up @@ -1653,7 +1699,7 @@ public void testMoveCrashAndRestart() throws IOException,
// now, the container is not over-replicated,
// so no deleteContainerCommand will be sent
Assertions.assertFalse(datanodeCommandHandler.received(
SCMCommandProto.Type.deleteContainerCommand, dn1.getDatanodeDetails()));
deleteContainerCommand, dn1.getDatanodeDetails()));
//replica does not exist in target datanode, so a replicateContainerCommand
//will be sent again at notifyStatusChanged#onLeaderReadyAndOutOfSafeMode
Assertions.assertEquals(2, datanodeCommandHandler.getInvocationCount(
Expand All @@ -1667,7 +1713,7 @@ public void testMoveCrashAndRestart() throws IOException,

//deleteContainerCommand is sent, but the src replica is not deleted now
Assertions.assertEquals(1, datanodeCommandHandler.getInvocationCount(
SCMCommandProto.Type.deleteContainerCommand));
deleteContainerCommand));

//crash happens, restart scm.
//clear current inflight actions and reload inflightMove from DBStore.
Expand All @@ -1685,7 +1731,7 @@ public void testMoveCrashAndRestart() throws IOException,
//after restart and the container is over-replicated now,
//deleteContainerCommand will be sent again
Assertions.assertEquals(2, datanodeCommandHandler.getInvocationCount(
SCMCommandProto.Type.deleteContainerCommand));
deleteContainerCommand));
containerStateManager.removeContainerReplica(id, dn1);

//replica in src datanode is deleted now
Expand Down Expand Up @@ -1741,7 +1787,7 @@ public void testMoveNotDeleteSrcIfPolicyNotSatisfied()
eventQueue.processAll(1000);

Assertions.assertFalse(datanodeCommandHandler.received(
SCMCommandProto.Type.deleteContainerCommand, dn1.getDatanodeDetails()));
deleteContainerCommand, dn1.getDatanodeDetails()));

Assertions.assertTrue(cf.isDone() &&
cf.get() == MoveResult.DELETE_FAIL_POLICY);
Expand Down Expand Up @@ -2092,13 +2138,13 @@ private void assertReplicaScheduled(int delta) {

private void assertDeleteScheduled(int delta) {
final int currentDeleteCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
.getInvocationCount(deleteContainerCommand);

replicationManager.processAll();
eventQueue.processAll(1000);
Assertions.assertEquals(currentDeleteCommandCount + delta,
datanodeCommandHandler.getInvocationCount(
SCMCommandProto.Type.deleteContainerCommand));
deleteContainerCommand));
Assertions.assertEquals(currentDeleteCommandCount + delta,
replicationManager.getMetrics().getNumDeletionCmdsSent());
}
Expand Down