New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ozone:SCM: Add support for registerNode in datanode #151
Conversation
} | ||
} | ||
return Optional.absent(); | ||
} | ||
|
||
/** | ||
* Returns a hostname from the hostname:port or hostname. | ||
* @param value | ||
* @return |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Javadoc for the pos parameter is missing. Suggest document pos (0/1) usage or use a Enum like HostComponentType for safer usage as we don't have the validation of pos inside getHostComponent().
pos(0) -> host
pos(1) -> port
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed. Thanks for the comment. I have replaced the parsing code with HostAndPort class. So this code has becomes simpler and I have updated the Javadocs. So we don't need the pos parameter any longer.
} | ||
|
||
public static Optional<String> getHostName(String value) { | ||
final int hostIndex = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be a Enum as commented above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Replaced this logic with HostAndPort class.
return Optional.of(Integer.parseInt(splits[1])); | ||
} | ||
} | ||
return getHostPort(value); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How do we handle multiple keys but only the last key has a valid value? The new logic will return Optional.absent() in the first loop instead of the last valid value.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanx fixed. The intention is to return the first key with valid hostport, now it checks for the isPresent and returns the value if it is present or loops till the end of the key space.
@@ -205,23 +247,11 @@ public static InetSocketAddress getScmDataNodeBindAddress( | |||
* @throws IllegalArgumentException if any values are not in the 'host' | |||
* or host:port format. | |||
*/ | |||
static Optional<Integer> getPortNumberFromConfigKeys( | |||
public static Optional<Integer> getPortNumberFromConfigKeys( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add complete java docs for the parameters after make this public?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed.
* @return - Rpc timeout in Milliseconds. | ||
*/ | ||
public static int getScmRpcTimeOutInMilliseconds(Configuration conf) { | ||
return conf.getInt(OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_MS, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would suggest we use Configuration#getTimeDuration() instead of Configuration#getInt() for durations. This will reduce the documentation cost and chance of mis-configuration in deployment. An sample usage can be found in datanode for DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
Runtime.getRuntime().availableProcessors() * 2; | ||
|
||
public static final String OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_MS = | ||
"ozone.scm.heartbeat.rpc-timeout.ms"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggest using time duration property with suffix and remove the .ms from the configuration key.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
10; | ||
|
||
public static final String OZONE_CONTAINER_TASK_WAIT = | ||
"ozone.container.task.wait.seconds"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggest using time duration property with suffix and remove the .seconds from the configuration key.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This key has been removed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You mean remove the .seconds from the key name.
|
||
|
||
// ozone.scm.names key is a set of DNS | DNS:PORT | IP Address | IP:PORT. | ||
// Written as a comma separated string. e.g. scm1, scm2:8020, 7.7.7.7:7777 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we have unit test for ozone.scm.names like "scm1, scm2:8020, 7.7.7.7:7777"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will post another patch this negative test cases . I have filed HDFS-11137 to track this issue.
public static final String OZONE_SCM_NAMES = "ozone.scm.names"; | ||
|
||
public static final int OZONE_SCM_DEFAULT_PORT = 9862; | ||
// Full path the location where datanode ID is to written to. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NIT: Maybe we should say "file name" instead of "Full path" here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
new ThreadFactoryBuilder().setDaemon(true) | ||
.setNameFormat("Container State Machine Thread - %d").build()); | ||
connectionManager = new SCMConnectionManager( | ||
OzoneClientUtils.getScmRpcTimeOutInMilliseconds(conf), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OzoneClientUtils.getScmRpcTimeOutInMilliseconds(conf) can be encapsulated inside SCMConnectionManager constructor as we have passed the conf parameter anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed.
} | ||
} catch (InterruptedException e) { | ||
executorService.shutdownNow(); | ||
Thread.currentThread().interrupt(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add a Log.error() after the Thread.currentThread.interrupt() for troubleshooting of shutdown issues in the future?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
static final Logger | ||
LOG = LoggerFactory.getLogger(EndpointStateMachine.class); | ||
private final StorageContainerDatanodeProtocolClientSideTranslatorPB endPoint; | ||
private final AtomicInteger missedCount; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AtomicLong instead of AtomicInteger?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed.
} | ||
|
||
/** | ||
* Returns the next logical state that endPoint should move to. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you document the same assumption that the state is transited by value + 1.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed.
private final Configuration conf; | ||
|
||
|
||
public SCMConnectionManager(int rpcTimeout, Configuration conf) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rpcTimeout parameter can be removed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
EndpointStateMachine endPoint = | ||
new EndpointStateMachine(address, rpcClient, conf); | ||
scmMachines.put(address, endPoint); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NIT: extra blank line.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed.
* if you need to execute a task without any concurrent execution, please | ||
* return a single task in this list. | ||
* | ||
* @return List of Callables |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we return a List here. Can you clarify here as well as "Returns a lit of tasks that..;"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed.
|
||
Container has the following states. | ||
|
||
Start - > getVersion -> Register -> Running +-> Upgrade -> Shutdown |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we update the states document here without upgrade/decommission state since they are not implemented yet.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
import java.util.concurrent.TimeoutException; | ||
|
||
/** | ||
* Init Container Task is the task that gets run when we are in Init State. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
javadoc needs update? init container task-> init datanode state
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
OzoneConfigKeys.OZONE_SCM_DATANODE_ID); | ||
|
||
// This is an unrecoverable error. | ||
this.context.setState(DatanodeStateMachine.DatanodeStates.SHUTDOWN); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we return null after mark DatanodeStates.SHUTDOWN instead of continue attempt to read the container ID?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed.
try { | ||
nodeID = readPersistedDatanodeID(Paths.get(dataNodeIDPath)); | ||
if (nodeID != null) { | ||
return nodeID; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a LOG.trace() for containerID read from file successfully.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed.
// info to SCM. | ||
try { | ||
nodeID = createNewContainerID(Paths.get(dataNodeIDPath)); | ||
return nodeID; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a trace for ContainerID created?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed.
* Computes the next state the container state machine must move to by looking | ||
* at all the state of endpoints. | ||
* <p> | ||
* if any endpoint state has moved to Register state, then the Container State |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"if any endpoint state has moved to Register state,...", this seems not being enforced in the code below.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed.
return new VersionEndpointTask(endpoint, conf); | ||
case REGISTER: | ||
RegisterEndpointTask task = new RegisterEndpointTask(endpoint, conf); | ||
task.setContainerNodeIDProto(getContainerNodeID()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NIT: maybe use a builder pattern to avoid creating RegisterEndpointTask without ContainerNodeID.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed.
* | ||
* @return ContainerNodeIDProto | ||
*/ | ||
public ContainerNodeIDProto getContainerNodeIDProto() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Builder pattern as suggested earlier.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
* @return ContainerNodeIDProto | ||
*/ | ||
public ContainerNodeIDProto getContainerNodeIDProto() { | ||
return containerNodeIDProto; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Builder pattern.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed.
if (getContainerNodeIDProto() == null) { | ||
LOG.error("Container ID proto cannot be null in RegisterEndpoint task, " + | ||
"shutting down the endpoint."); | ||
rpcEndPoint.setState(EndpointStateMachine.EndPointStates.SHUTDOWN); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NIT: Consider returning the new state from rpcEndPoint.setState so that we can consolidate logic here?
rpcEndpoint.zeroMissedCount(); | ||
} catch (IOException ex) { | ||
rpcEndpoint.logIfNeeded(ex, | ||
OzoneClientUtils.getScmHeartbeatInterval(this.conf)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The second parameter can be removed as ScmHeartbeatInterval is available within rpcEndpoint.logIfNeeded() with RpcEndpoint#conf.
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageContainerDatanodeProtocolService; | ||
|
||
/** | ||
* Protocol used from an HDFS node to StorageContainerManager. This extends the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you elaborate the comment on "HDFS node"?
datanodeID); | ||
nodes.put(newDatanodeID.getDatanodeUuid(), newDatanodeID); | ||
|
||
nodes.put(datanodeID.getDatanodeUuid(), datanodeID); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How do we handle the case when two datanodes attempt to register with the same datanodeID? I don't think we should allow both succeed without warning or error.
No description provided.