Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,24 +72,20 @@ RegisteredCommand register(DatanodeDetails datanodeDetails,
* TODO: Cleanup and update tests, HDDS-9642.
*
* @param datanodeDetails - Datanode ID.
* @param layoutVersionInfo - Layout Version Proto.
* @return Commands to be sent to the datanode.
*/
default List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails,
LayoutVersionProto layoutVersionInfo) {
return processHeartbeat(datanodeDetails, layoutVersionInfo, null);
default List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails) {
return processHeartbeat(datanodeDetails, null);
};

/**
* Send heartbeat to indicate the datanode is alive and doing well.
* @param datanodeDetails - Datanode ID.
* @param layoutVersionInfo - Layout Version Proto.
* @param queueReport - The CommandQueueReportProto report from the
* heartbeating datanode.
* @return Commands to be sent to the datanode.
*/
List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails,
LayoutVersionProto layoutVersionInfo,
CommandQueueReportProto queueReport);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,19 +511,15 @@ private boolean updateDnsToUuidMap(
* Send heartbeat to indicate the datanode is alive and doing well.
*
* @param datanodeDetails - DatanodeDetailsProto.
* @param layoutInfo - Layout Version Proto.
* @return SCMheartbeat response.
*/
@Override
public List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails,
LayoutVersionProto layoutInfo,
CommandQueueReportProto queueReport) {
CommandQueueReportProto queueReport) {
Preconditions.checkNotNull(datanodeDetails, "Heartbeat is missing " +
"DatanodeDetails.");
try {
nodeStateManager.updateLastHeartbeatTime(datanodeDetails);
nodeStateManager.updateLastKnownLayoutVersion(datanodeDetails,
layoutInfo);
metrics.incNumHBProcessed();
updateDatanodeOpState(datanodeDetails);
} catch (NodeNotFoundException e) {
Expand Down Expand Up @@ -686,6 +682,15 @@ public void processLayoutVersionReport(DatanodeDetails datanodeDetails,
layoutVersionReport.toString().replaceAll("\n", "\\\\n"));
}

try {
nodeStateManager.updateLastKnownLayoutVersion(datanodeDetails,
layoutVersionReport);
} catch (NodeNotFoundException e) {
LOG.error("SCM trying to process Layout Version from an " +
"unregistered node {}.", datanodeDetails);
return;
}

// Software layout version is hardcoded to the SCM.
int scmSlv = scmLayoutVersionManager.getSoftwareLayoutVersion();
int dnSlv = layoutVersionReport.getSoftwareLayoutVersion();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,7 @@ public List<SCMCommand> dispatch(SCMHeartbeatRequestProto heartbeat) {
commandQueueReport = heartbeat.getCommandQueueReport();
}
// should we dispatch heartbeat through eventPublisher?
commands = nodeManager.processHeartbeat(datanodeDetails,
layoutVersion, commandQueueReport);
commands = nodeManager.processHeartbeat(datanodeDetails, commandQueueReport);
if (heartbeat.hasNodeReport()) {
LOG.debug("Dispatching Node Report.");
eventPublisher.fireEvent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -772,13 +772,11 @@ private synchronized void addEntryTodnsToUuidMap(
* Send heartbeat to indicate the datanode is alive and doing well.
*
* @param datanodeDetails - Datanode ID.
* @param layoutInfo - DataNode Layout info
* @param commandQueueReportProto - Command Queue Report Proto
* @return SCMheartbeat response list
*/
@Override
public List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails,
LayoutVersionProto layoutInfo,
CommandQueueReportProto commandQueueReportProto) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,6 @@ public RegisteredCommand register(DatanodeDetails datanodeDetails,

@Override
public List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails,
LayoutVersionProto layoutInfo,
CommandQueueReportProto commandQueueReportProto) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.HddsConfigKeys;
Expand All @@ -35,7 +34,6 @@
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.LayoutVersionProto;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.HddsTestUtils;
Expand Down Expand Up @@ -67,7 +65,6 @@
import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.ozone.upgrade.LayoutVersionManager;
import org.apache.hadoop.test.PathUtils;
import org.apache.commons.io.IOUtils;
import org.apache.ozone.test.GenericTestUtils;
Expand All @@ -79,7 +76,6 @@
import static org.apache.hadoop.hdds.scm.net.NetConstants.LEAF_SCHEMA;
import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA;
import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA;
import static org.apache.hadoop.ozone.container.upgrade.UpgradeUtils.toLayoutVersionProto;
import static org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager.maxLayoutVersion;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -182,7 +178,7 @@ ContainerManager createContainerManager()
*/
@Test
public void testContainerPlacementCapacity() throws IOException,
InterruptedException, TimeoutException {
InterruptedException {
final int nodeCount = 4;
final long capacity = 10L * OzoneConsts.GB;
final long used = 2L * OzoneConsts.GB;
Expand All @@ -201,11 +197,6 @@ public void testContainerPlacementCapacity() throws IOException,
List<DatanodeDetails> datanodes = HddsTestUtils
.getListOfRegisteredDatanodeDetails(scmNodeManager, nodeCount);
XceiverClientManager xceiverClientManager = null;
LayoutVersionManager versionManager =
scmNodeManager.getLayoutVersionManager();
LayoutVersionProto layoutInfo =
toLayoutVersionProto(versionManager.getMetadataLayoutVersion(),
versionManager.getSoftwareLayoutVersion());
try {
for (DatanodeDetails datanodeDetails : datanodes) {
UUID dnId = datanodeDetails.getUuid();
Expand All @@ -221,7 +212,7 @@ public void testContainerPlacementCapacity() throws IOException,
Arrays.asList(report), emptyList());
datanodeInfo.updateStorageReports(
nodeReportProto.getStorageReportList());
scmNodeManager.processHeartbeat(datanodeDetails, layoutInfo);
scmNodeManager.processHeartbeat(datanodeDetails);
}

//TODO: wait for heartbeat to be processed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import java.util.ArrayList;

import static java.util.Collections.singletonList;
import static org.apache.hadoop.ozone.container.upgrade.UpgradeUtils.defaultLayoutVersionProto;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
Expand Down Expand Up @@ -165,7 +164,7 @@ public void testNodesCanBeDecommissionedAndRecommissioned()

// Attempt to decommission on dn(9) which has another instance at
// dn(11) with identical ports.
nodeManager.processHeartbeat(dns.get(9), defaultLayoutVersionProto());
nodeManager.processHeartbeat(dns.get(9));
DatanodeDetails duplicatePorts = dns.get(9);
decom.decommissionNodes(singletonList(duplicatePorts.getIpAddress()));
assertEquals(HddsProtos.NodeOperationalState.DECOMMISSIONING,
Expand Down Expand Up @@ -237,7 +236,7 @@ public void testNodesCanBeDecommissionedAndRecommissionedMixedPorts()

// Now decommission one of the DNs with the duplicate port
DatanodeDetails expectedDN = dns.get(9);
nodeManager.processHeartbeat(expectedDN, defaultLayoutVersionProto());
nodeManager.processHeartbeat(expectedDN);

decom.decommissionNodes(singletonList(
expectedDN.getIpAddress() + ":" + ratisPort));
Expand Down Expand Up @@ -287,7 +286,7 @@ public void testNodesCanBePutIntoMaintenanceAndRecommissioned()

// Attempt to enable maintenance on dn(9) which has another instance at
// dn(11) with identical ports.
nodeManager.processHeartbeat(dns.get(9), defaultLayoutVersionProto());
nodeManager.processHeartbeat(dns.get(9));
DatanodeDetails duplicatePorts = dns.get(9);
decom.startMaintenanceNodes(singletonList(duplicatePorts.getIpAddress()),
100);
Expand Down
Loading