Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,8 @@ public void notifyLeaderChanged(RaftGroupMemberId groupMemberId,
Preconditions.checkArgument(
deletedBlockLog instanceof DeletedBlockLogImplV2);
((DeletedBlockLogImplV2) deletedBlockLog).onBecomeLeader();

scm.getScmDecommissionManager().onBecomeLeader();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
import org.apache.hadoop.hdds.scm.container.ReplicationManager;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdfs.DFSConfigKeys;
Expand Down Expand Up @@ -52,6 +53,7 @@ public class NodeDecommissionManager {

private NodeManager nodeManager;
//private ContainerManagerV2 containerManager;
private SCMContext scmContext;
private EventPublisher eventQueue;
private ReplicationManager replicationManager;
private OzoneConfiguration conf;
Expand Down Expand Up @@ -171,11 +173,12 @@ private boolean validateDNPortMatch(int port, DatanodeDetails dn) {
}

public NodeDecommissionManager(OzoneConfiguration config, NodeManager nm,
ContainerManagerV2 containerManager,
ContainerManagerV2 containerManager, SCMContext scmContext,
EventPublisher eventQueue, ReplicationManager rm) {
this.nodeManager = nm;
conf = config;
//this.containerManager = containerManager;
this.scmContext = scmContext;
this.eventQueue = eventQueue;
this.replicationManager = rm;

Expand Down Expand Up @@ -248,10 +251,15 @@ public synchronized List<DatanodeAdminError> decommissionNodes(
*/
public synchronized void continueAdminForNode(DatanodeDetails dn)
throws NodeNotFoundException {
if (!scmContext.isLeader()) {
LOG.info("follower SCM ignored continue admin for datanode {}", dn);
return;
}
NodeOperationalState opState = getNodeStatus(dn).getOperationalState();
if (opState == NodeOperationalState.DECOMMISSIONING
|| opState == NodeOperationalState.ENTERING_MAINTENANCE
|| opState == NodeOperationalState.IN_MAINTENANCE) {
LOG.info("Continue admin for datanode {}", dn);
monitor.startMonitoring(dn);
}
}
Expand Down Expand Up @@ -375,4 +383,20 @@ private NodeStatus getNodeStatus(DatanodeDetails dn)
return nodeManager.getNodeStatus(dn);
}

/**
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably add a test for this - ie ensure that nodes are added to the decommission workflow when the onBecomeLeader() event is fired.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

* Called in SCMStateMachine#notifyLeaderChanged when current SCM becomes
* leader.
*/
public void onBecomeLeader() {
nodeManager.getAllNodes().forEach(datanodeDetails -> {
try {
continueAdminForNode(datanodeDetails);
} catch (NodeNotFoundException e) {
// Should not happen, as the node has just registered to call this event
// handler.
LOG.warn("NodeNotFound when adding the node to the decommissionManager",
e);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -413,38 +413,56 @@ boolean opStateDiffers(DatanodeDetails dnDetails, NodeStatus nodeStatus) {
}

/**
* If the operational state or expiry reported in the datanode heartbeat do
* not match those store in SCM, queue a command to update the state persisted
* on the datanode. Additionally, ensure the datanodeDetails stored in SCM
* match those reported in the heartbeat.
* This method should only be called when processing the
* heartbeat, and for a registered node, the information stored in SCM is the
* source of truth.
* This method should only be called when processing the heartbeat.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add a test for this new logic - Command fired if leader, but not fired if follower?

I checked in TestSCMNodeManager, and there is a test "testSetNodeOpStateAndCommandFired" which I earlier set to ignore as it became invalid when I developed decommission. I then forgot to go back and fix it.

You could use that tests as a starting point, and change it to call processHeartBeat with the correct DatanodeDetails to trigger a command.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the hint. testSetNodeOpStateAndCommandFired is a good place to test the logic.

*
* On leader SCM, for a registered node, the information stored in SCM is
* the source of truth. If the operational state or expiry reported in the
* datanode heartbeat do not match those store in SCM, queue a command to
* update the state persisted on the datanode. Additionally, ensure the
* datanodeDetails stored in SCM match those reported in the heartbeat.
*
* On follower SCM, datanode notifies follower SCM its latest operational
* state or expiry via heartbeat. If the operational state or expiry
* reported in the datanode heartbeat do not match those stored in SCM,
* just update the state in follower SCM accordingly.
*
* @param reportedDn The DatanodeDetails taken from the node heartbeat.
* @throws NodeNotFoundException
*/
protected void updateDatanodeOpState(DatanodeDetails reportedDn)
throws NodeNotFoundException {
NodeStatus scmStatus = getNodeStatus(reportedDn);
if (opStateDiffers(reportedDn, scmStatus)) {
LOG.info("Scheduling a command to update the operationalState " +
"persisted on {} as the reported value does not " +
"match the value stored in SCM ({}, {})",
reportedDn,
scmStatus.getOperationalState(),
scmStatus.getOpStateExpiryEpochSeconds());
if (scmContext.isLeader()) {
LOG.info("Scheduling a command to update the operationalState " +
"persisted on {} as the reported value does not " +
"match the value stored in SCM ({}, {})",
reportedDn,
scmStatus.getOperationalState(),
scmStatus.getOpStateExpiryEpochSeconds());

try {
SCMCommand<?> command = new SetNodeOperationalStateCommand(
Time.monotonicNow(),
try {
SCMCommand<?> command = new SetNodeOperationalStateCommand(
Time.monotonicNow(),
scmStatus.getOperationalState(),
scmStatus.getOpStateExpiryEpochSeconds());
command.setTerm(scmContext.getTermOfLeader());
addDatanodeCommand(reportedDn.getUuid(), command);
} catch (NotLeaderException nle) {
LOG.warn("Skip sending SetNodeOperationalStateCommand,"
+ " since current SCM is not leader.", nle);
return;
}
} else {
LOG.info("Update the operationalState saved in follower SCM " +
"for {} as the reported value does not " +
"match the value stored in SCM ({}, {})",
reportedDn,
scmStatus.getOperationalState(),
scmStatus.getOpStateExpiryEpochSeconds());
command.setTerm(scmContext.getTermOfLeader());
addDatanodeCommand(reportedDn.getUuid(), command);
} catch (NotLeaderException nle) {
LOG.warn("Skip sending SetNodeOperationalStateCommand,"
+ " since current SCM is not leader.", nle);
return;

setNodeOperationalState(reportedDn, reportedDn.getPersistedOpState(),
reportedDn.getPersistedOpStateExpiryEpochSec());
}
}
DatanodeDetails scmDnd = nodeStateManager.getNode(reportedDn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ private void initializeSystemManagers(OzoneConfiguration conf,
pipelineManager, eventQueue, serviceManager, scmContext);
}
scmDecommissionManager = new NodeDecommissionManager(conf, scmNodeManager,
containerManager, eventQueue, replicationManager);
containerManager, scmContext, eventQueue, replicationManager);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.DatanodeAdminError;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Before;
Expand Down Expand Up @@ -58,8 +60,8 @@ public void setup() throws Exception {
TestDeadNodeHandler.class.getSimpleName() + UUID.randomUUID());
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, storageDir);
nodeManager = createNodeManager(conf);
decom = new NodeDecommissionManager(
conf, nodeManager, null, null, null);
decom = new NodeDecommissionManager(conf, nodeManager, null,
SCMContext.emptyContext(), new EventQueue(), null);
}

@Test
Expand Down Expand Up @@ -250,6 +252,34 @@ public void testNodesCannotTransitionFromDecomToMaint() throws Exception {
nodeManager.getNodeStatus(dns.get(2)).getOperationalState());
}

@Test
public void testNodeDecommissionManagerOnBecomeLeader() throws Exception {
List<DatanodeDetails> dns = generateDatanodes();

long maintenanceEnd =
(System.currentTimeMillis() / 1000L) + (100 * 60L * 60L);

// Put 1 node into entering_maintenance, 1 node into decommissioning
// and 1 node into in_maintenance.
nodeManager.setNodeOperationalState(dns.get(1),
HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE, maintenanceEnd);
nodeManager.setNodeOperationalState(dns.get(2),
HddsProtos.NodeOperationalState.DECOMMISSIONING, 0);
nodeManager.setNodeOperationalState(dns.get(3),
HddsProtos.NodeOperationalState.IN_MAINTENANCE, maintenanceEnd);

// trackedNodes should be empty now.
assertEquals(decom.getMonitor().getTrackedNodes().size(), 0);

// all nodes with decommissioning, entering_maintenance and in_maintenance
// should be added to trackedNodes
decom.onBecomeLeader();
decom.getMonitor().run();

// so size of trackedNodes will be 3.
assertEquals(decom.getMonitor().getTrackedNodes().size(), 3);
}

private SCMNodeManager createNodeManager(OzoneConfiguration config)
throws IOException, AuthenticationException {
scm = TestUtils.getScm(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,12 +279,13 @@ public void testScmSanityOfUserConfig2()
}

/**
* Ensure that a change to the operationalState of a node fires a datanode
* event of type SetNodeOperationalStateCommand.
* For leader SCM, ensure that a change to the operationalState of a node
* fires a SCMCommand of type SetNodeOperationalStateCommand.
*
* For follower SCM, no SetNodeOperationalStateCommand should be fired, yet
* operationalState of the node will be updated according to the heartbeat.
*/
@Test
@Ignore // TODO - this test is no longer valid as the heartbeat processing
// now generates the command message.
public void testSetNodeOpStateAndCommandFired()
throws IOException, NodeNotFoundException, AuthenticationException {
final int interval = 100;
Expand All @@ -299,11 +300,27 @@ public void testSetNodeOpStateAndCommandFired()
long expiry = System.currentTimeMillis() / 1000 + 1000;
nodeManager.setNodeOperationalState(dn,
HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE, expiry);
List<SCMCommand> commands = nodeManager.getCommandQueue(dn.getUuid());

// If found mismatch, leader SCM fires a SetNodeOperationalStateCommand
// to update the opState persisted in Datanode.
scm.getScmContext().updateLeaderAndTerm(true, 1);
List<SCMCommand> commands = nodeManager.processHeartbeat(dn);

Assert.assertTrue(commands.get(0).getClass().equals(
SetNodeOperationalStateCommand.class));
assertEquals(1, commands.size());

// If found mismatch, follower SCM update its own opState according
// to the heartbeat, and no SCMCommand will be fired.
scm.getScmContext().updateLeaderAndTerm(false, 2);
commands = nodeManager.processHeartbeat(dn);

assertEquals(0, commands.size());

NodeStatus scmStatus = nodeManager.getNodeStatus(dn);
assertTrue(scmStatus.getOperationalState() == dn.getPersistedOpState()
&& scmStatus.getOpStateExpiryEpochSeconds()
== dn.getPersistedOpStateExpiryEpochSec());
}
}

Expand Down