HDDS-212. Introduce NodeStateManager to manage the state of Datanodes in SCM. Contributed by Nanda kumar.
nandakumar131 committed Jul 4, 2018
1 parent 3b63715 commit 71df8c2
Showing 31 changed files with 1,288 additions and 918 deletions.
@@ -37,7 +37,6 @@
 import org.slf4j.LoggerFactory;

 import java.io.IOException;
-import java.util.EnumSet;
 import java.util.List;
 import java.util.UUID;

@@ -234,14 +233,14 @@ public ContainerWithPipeline createContainer(HddsProtos.ReplicationType type,
   /**
    * Returns a set of Nodes that meet a query criteria.
    *
-   * @param nodeStatuses - A set of criteria that we want the node to have.
+   * @param nodeStatuses - Criteria that we want the node to have.
    * @param queryScope - Query scope - Cluster or pool.
    * @param poolName - if it is pool, a pool name is required.
    * @return A set of nodes that meet the requested criteria.
    * @throws IOException
    */
   @Override
-  public HddsProtos.NodePool queryNode(EnumSet<HddsProtos.NodeState>
+  public List<HddsProtos.Node> queryNode(HddsProtos.NodeState
       nodeStatuses, HddsProtos.QueryScope queryScope, String poolName)
       throws IOException {
     return storageContainerLocationClient.queryNode(nodeStatuses, queryScope,
@@ -458,7 +457,8 @@ public void closeContainer(long containerId)
    */
   @Override
   public long getContainerSize(long containerID) throws IOException {
-    // TODO : Fix this, it currently returns the capacity but not the current usage.
+    // TODO : Fix this, it currently returns the capacity
+    // but not the current usage.
     long size = getContainerSizeB();
     if (size == -1) {
       throw new IOException("Container size unknown!");
@@ -35,7 +35,7 @@
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public final class DatanodeDetails implements Comparable<DatanodeDetails> {
+public class DatanodeDetails implements Comparable<DatanodeDetails> {

   /**
    * DataNode's unique identifier in the cluster.
@@ -63,6 +63,13 @@ private DatanodeDetails(String uuid, String ipAddress, String hostName,
     this.ports = ports;
   }

+  protected DatanodeDetails(DatanodeDetails datanodeDetails) {
+    this.uuid = datanodeDetails.uuid;
+    this.ipAddress = datanodeDetails.ipAddress;
+    this.hostName = datanodeDetails.hostName;
+    this.ports = datanodeDetails.ports;
+  }
+
   /**
    * Returns the DataNode UUID.
    *
@@ -238,7 +245,7 @@ public static Builder newBuilder() {
   /**
    * Builder class for building DatanodeDetails.
    */
-  public static class Builder {
+  public static final class Builder {
     private String id;
     private String ipAddress;
     private String hostName;
@@ -324,7 +331,7 @@ public static Port newPort(Port.Name name, Integer value) {
   /**
    * Container to hold DataNode Port details.
    */
-  public static class Port {
+  public static final class Port {

     /**
      * Ports that are supported in DataNode.
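Note on the DatanodeDetails changes above: the class is no longer final and gains a protected copy constructor, which lets SCM-side code layer mutable node state on top of the immutable identity fields. A minimal sketch of such a subclass follows; the class name DatanodeInfo and its field are illustrative assumptions, not taken from this excerpt.

import org.apache.hadoop.hdds.protocol.DatanodeDetails;

// Illustrative subclass; name and field are assumptions, not part of the patch.
public class DatanodeInfo extends DatanodeDetails {
  private long lastHeartbeatTime;

  public DatanodeInfo(DatanodeDetails datanodeDetails) {
    super(datanodeDetails);  // uses the new protected copy constructor
    this.lastHeartbeatTime = System.currentTimeMillis();
  }

  // Refresh the liveness timestamp when a heartbeat is processed.
  public void updateLastHeartbeatTime() {
    this.lastHeartbeatTime = System.currentTimeMillis();
  }

  public long getLastHeartbeatTime() {
    return lastHeartbeatTime;
  }
}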
@@ -165,10 +165,6 @@ public final class ScmConfigKeys {
   public static final String OZONE_SCM_DEADNODE_INTERVAL_DEFAULT =
       "10m";

-  public static final String OZONE_SCM_MAX_HB_COUNT_TO_PROCESS =
-      "ozone.scm.max.hb.count.to.process";
-  public static final int OZONE_SCM_MAX_HB_COUNT_TO_PROCESS_DEFAULT = 5000;
-
   public static final String OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL =
       "ozone.scm.heartbeat.thread.interval";
   public static final String OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_DEFAULT =
@@ -26,7 +26,6 @@
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;

 import java.io.IOException;
-import java.util.EnumSet;
 import java.util.List;

 /**
@@ -150,13 +149,13 @@ ContainerWithPipeline createContainer(HddsProtos.ReplicationType type,

   /**
    * Returns a set of Nodes that meet a query criteria.
-   * @param nodeStatuses - A set of criteria that we want the node to have.
+   * @param nodeStatuses - Criteria that we want the node to have.
    * @param queryScope - Query scope - Cluster or pool.
    * @param poolName - if it is pool, a pool name is required.
    * @return A set of nodes that meet the requested criteria.
    * @throws IOException
    */
-  HddsProtos.NodePool queryNode(EnumSet<HddsProtos.NodeState> nodeStatuses,
+  List<HddsProtos.Node> queryNode(HddsProtos.NodeState nodeStatuses,
       HddsProtos.QueryScope queryScope, String poolName) throws IOException;

   /**
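The ScmClient change above narrows the node query from an EnumSet of states to a single NodeState and returns a plain list instead of a NodePool. A minimal caller-side sketch under the new signature; the wrapper class, the CLUSTER scope value, and the import paths are assumptions based on the usual HDDS layout, not part of this patch.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.client.ScmClient;

// Illustrative helper; only queryNode and its parameter types come from the patch.
public final class QueryHealthyNodes {

  // Counts the datanodes that SCM currently reports as HEALTHY. The ScmClient
  // instance is assumed to be constructed elsewhere; the pool name is unused
  // for a cluster-wide query.
  static int countHealthyNodes(ScmClient client) throws IOException {
    List<HddsProtos.Node> healthy = client.queryNode(
        HddsProtos.NodeState.HEALTHY, HddsProtos.QueryScope.CLUSTER, "");
    return healthy.size();
  }
}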
@@ -26,7 +26,6 @@
     .StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto;

 import java.io.IOException;
-import java.util.EnumSet;
 import java.util.List;

 /**
@@ -94,10 +93,10 @@ List<ContainerInfo> listContainer(long startContainerID, int count)

   /**
    * Queries a list of Node Statuses.
-   * @param nodeStatuses
+   * @param state
    * @return List of Datanodes.
    */
-  HddsProtos.NodePool queryNode(EnumSet<HddsProtos.NodeState> nodeStatuses,
+  List<HddsProtos.Node> queryNode(HddsProtos.NodeState state,
       HddsProtos.QueryScope queryScope, String poolName) throws IOException;

   /**
@@ -59,7 +59,6 @@
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.EnumSet;
 import java.util.List;

 /**
@@ -215,20 +214,19 @@ public void deleteContainer(long containerID)
    * @return List of Datanodes.
    */
   @Override
-  public HddsProtos.NodePool queryNode(EnumSet<HddsProtos.NodeState>
+  public List<HddsProtos.Node> queryNode(HddsProtos.NodeState
       nodeStatuses, HddsProtos.QueryScope queryScope, String poolName)
       throws IOException {
     // TODO : We support only cluster wide query right now. So ignoring checking
     // queryScope and poolName
     Preconditions.checkNotNull(nodeStatuses);
-    Preconditions.checkState(nodeStatuses.size() > 0);
     NodeQueryRequestProto request = NodeQueryRequestProto.newBuilder()
-        .addAllQuery(nodeStatuses)
+        .setState(nodeStatuses)
         .setScope(queryScope).setPoolName(poolName).build();
     try {
       NodeQueryResponseProto response =
           rpcProxy.queryNode(NULL_RPC_CONTROLLER, request);
-      return response.getDatanodes();
+      return response.getDatanodesList();
     } catch (ServiceException e) {
       throw ProtobufHelper.getRemoteException(e);
     }
@@ -57,7 +57,6 @@
     .StorageContainerLocationProtocolProtos.SCMListContainerResponseProto;

 import java.io.IOException;
-import java.util.EnumSet;
 import java.util.List;

 /**
@@ -171,13 +170,12 @@ public SCMDeleteContainerResponseProto deleteContainer(
       StorageContainerLocationProtocolProtos.NodeQueryRequestProto request)
       throws ServiceException {
     try {
-      EnumSet<HddsProtos.NodeState> nodeStateEnumSet = EnumSet.copyOf(request
-          .getQueryList());
-      HddsProtos.NodePool datanodes = impl.queryNode(nodeStateEnumSet,
+      HddsProtos.NodeState nodeState = request.getState();
+      List<HddsProtos.Node> datanodes = impl.queryNode(nodeState,
           request.getScope(), request.getPoolName());
       return StorageContainerLocationProtocolProtos
           .NodeQueryResponseProto.newBuilder()
-          .setDatanodes(datanodes)
+          .addAllDatanodes(datanodes)
           .build();
     } catch (Exception e) {
       throw new ServiceException(e);
@@ -118,26 +118,13 @@ message ObjectStageChangeResponseProto {
   match the NodeState that we are requesting.
 */
 message NodeQueryRequestProto {
-
-  // Repeated, So we can specify more than one status type.
-  // These NodeState types are additive for now, in the sense that
-  // if you specify HEALTHY and FREE_NODE members --
-  // Then you get all healthy node which are not raft members.
-  //
-  // if you specify all healthy and dead nodes, you will get nothing
-  // back. Server is not going to dictate what combinations make sense,
-  // it is entirely up to the caller.
-  // TODO: Support operators like OR and NOT. Currently it is always an
-  // implied AND.
-
-  repeated NodeState query = 1;
+  required NodeState state = 1;
   required QueryScope scope = 2;
   optional string poolName = 3; // if scope is pool, then pool name is needed.
 }

 message NodeQueryResponseProto {
-  required NodePool datanodes = 1;
+  repeated Node datanodes = 1;
 }

 /**
@@ -194,7 +181,7 @@ service StorageContainerLocationProtocolService {
   /**
    * Returns a set of Nodes that meet a criteria.
    */
   rpc queryNode(NodeQueryRequestProto) returns (NodeQueryResponseProto);

   /**
    * Notify from client when begin or finish container or pipeline operations on datanodes.
13 changes: 5 additions & 8 deletions hadoop-hdds/common/src/main/proto/hdds.proto
@@ -69,14 +69,11 @@ enum NodeType {
  * and getNodeCount.
  */
 enum NodeState {
     HEALTHY = 1;
     STALE = 2;
     DEAD = 3;
     DECOMMISSIONING = 4;
     DECOMMISSIONED = 5;
-    RAFT_MEMBER = 6;
-    FREE_NODE = 7; // Not a member in raft.
-    INVALID = 8;
 }

 enum QueryScope {
11 changes: 0 additions & 11 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -773,17 +773,6 @@
       The keytab file for Kerberos authentication in SCM.
     </description>
   </property>
-  <property>
-    <name>ozone.scm.max.hb.count.to.process</name>
-    <value>5000</value>
-    <tag>OZONE, MANAGEMENT, PERFORMANCE</tag>
-    <description>
-      The maximum number of heartbeat to process per loop of the
-      heartbeat process thread. Please see
-      ozone.scm.heartbeat.thread.interval
-      for more info.
-    </description>
-  </property>
   <property>
     <name>ozone.scm.names</name>
     <value/>
@@ -258,17 +258,6 @@ public static long getDeadNodeInterval(Configuration conf) {
     return deadNodeIntervalMs;
   }

-  /**
-   * Returns the maximum number of heartbeat to process per loop of the process
-   * thread.
-   * @param conf Configuration
-   * @return - int -- Number of HBs to process
-   */
-  public static int getMaxHBToProcessPerLoop(Configuration conf) {
-    return conf.getInt(ScmConfigKeys.OZONE_SCM_MAX_HB_COUNT_TO_PROCESS,
-        ScmConfigKeys.OZONE_SCM_MAX_HB_COUNT_TO_PROCESS_DEFAULT);
-  }
-
   /**
    * Timeout value for the RPC from Datanode to SCM, primarily used for
    * Heartbeats and container reports.
@@ -59,10 +59,8 @@ RegisteredCommand register(DatanodeDetails datanodeDetails,
   /**
    * Send heartbeat to indicate the datanode is alive and doing well.
    * @param datanodeDetails - Datanode ID.
-   * @param nodeReport - node report.
    * @return SCMheartbeat response list
    */
-  List<SCMCommand> sendHeartbeat(DatanodeDetails datanodeDetails,
-      NodeReportProto nodeReport);
+  List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails);

 }
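With the rename above, the SCM node-protocol heartbeat no longer carries a node report: the call only marks the datanode as alive and returns whatever commands SCM has queued for it. A minimal caller-side sketch; the wrapper class and the import paths are assumptions, and only processHeartbeat, DatanodeDetails, and SCMCommand come from the patch.

import java.util.List;

import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;

// Illustrative helper showing the narrowed heartbeat contract.
public final class HeartbeatCaller {

  // Records the heartbeat for the given datanode and returns the commands
  // SCM wants it to execute; the node report is handled elsewhere now.
  static List<SCMCommand> heartbeat(StorageContainerNodeProtocol nodeManager,
      DatanodeDetails datanodeDetails) {
    return nodeManager.processHeartbeat(datanodeDetails);
  }
}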
