Skip to content

Commit

Permalink
HDDS-88. Create separate message structure to represent ports in Data…
Browse files Browse the repository at this point in the history
…nodeDetails.

Contributed by Nanda Kumar.
  • Loading branch information
anuengineer committed May 30, 2018
1 parent b24098b commit 3b34148
Show file tree
Hide file tree
Showing 30 changed files with 260 additions and 153 deletions.
Expand Up @@ -93,7 +93,7 @@ public void connect() throws Exception {


// read port from the data node, on failure use default configured // read port from the data node, on failure use default configured
// port. // port.
int port = leader.getContainerPort(); int port = leader.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue();
if (port == 0) { if (port == 0) {
port = config.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, port = config.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT); OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
Expand Down
Expand Up @@ -80,7 +80,7 @@ public void connect() throws Exception {


// read port from the data node, on failure use default configured // read port from the data node, on failure use default configured
// port. // port.
int port = leader.getContainerPort(); int port = leader.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue();
if (port == 0) { if (port == 0) {
port = config.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, port = config.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT); OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
Expand Down
Expand Up @@ -23,6 +23,8 @@
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos;


import java.util.ArrayList;
import java.util.List;
import java.util.UUID; import java.util.UUID;


/** /**
Expand All @@ -42,9 +44,7 @@ public final class DatanodeDetails implements Comparable<DatanodeDetails> {


private String ipAddress; private String ipAddress;
private String hostName; private String hostName;
private Integer containerPort; private List<Port> ports;
private Integer ratisPort;
private Integer ozoneRestPort;




/** /**
Expand All @@ -53,18 +53,14 @@ public final class DatanodeDetails implements Comparable<DatanodeDetails> {
* @param uuid DataNode's UUID * @param uuid DataNode's UUID
* @param ipAddress IP Address of this DataNode * @param ipAddress IP Address of this DataNode
* @param hostName DataNode's hostname * @param hostName DataNode's hostname
* @param containerPort Container Port * @param ports Ports used by the DataNode
* @param ratisPort Ratis Port
* @param ozoneRestPort Rest Port
*/ */
private DatanodeDetails(String uuid, String ipAddress, String hostName, private DatanodeDetails(String uuid, String ipAddress, String hostName,
Integer containerPort, Integer ratisPort, Integer ozoneRestPort) { List<Port> ports) {
this.uuid = UUID.fromString(uuid); this.uuid = UUID.fromString(uuid);
this.ipAddress = ipAddress; this.ipAddress = ipAddress;
this.hostName = hostName; this.hostName = hostName;
this.containerPort = containerPort; this.ports = ports;
this.ratisPort = ratisPort;
this.ozoneRestPort = ozoneRestPort;
} }


/** /**
Expand Down Expand Up @@ -122,54 +118,40 @@ public String getHostName() {
} }


/** /**
* Sets the Container Port. * Sets a DataNode Port.
* @param port ContainerPort
*/
public void setContainerPort(int port) {
containerPort = port;
}

/**
* Returns standalone container Port.
* *
* @return Container Port * @param port DataNode port
*/ */
public int getContainerPort() { public void setPort(Port port) {
return containerPort; // If the port is already in the list remove it first and add the
// new/updated port value.
ports.remove(port);
ports.add(port);
} }


/** /**
* Sets Ratis Port. * Returns all the Ports used by DataNode.
* @param port RatisPort *
*/ * @return DataNode Ports
public void setRatisPort(int port) {
ratisPort = port;
}


/**
* Returns Ratis Port.
* @return Ratis Port
*/
public int getRatisPort() {
return ratisPort;
}


/**
* Sets OzoneRestPort.
* @param port OzoneRestPort
*/ */
public void setOzoneRestPort(int port) { public List<Port> getPorts() {
ozoneRestPort = port; return ports;
} }


/** /**
* Returns Ozone Rest Port. * Given the name returns port number, null if the asked port is not found.
* @return OzoneRestPort *
* @param name Name of the port
*
* @return Port
*/ */
public int getOzoneRestPort() { public Port getPort(Port.Name name) {
return ozoneRestPort; for (Port port : ports) {
if (port.getName().equals(name)) {
return port;
}
}
return null;
} }


/** /**
Expand All @@ -188,14 +170,9 @@ public static DatanodeDetails getFromProtoBuf(
if (datanodeDetailsProto.hasHostName()) { if (datanodeDetailsProto.hasHostName()) {
builder.setHostName(datanodeDetailsProto.getHostName()); builder.setHostName(datanodeDetailsProto.getHostName());
} }
if (datanodeDetailsProto.hasContainerPort()) { for (HddsProtos.Port port : datanodeDetailsProto.getPortsList()) {
builder.setContainerPort(datanodeDetailsProto.getContainerPort()); builder.addPort(newPort(
} Port.Name.valueOf(port.getName().toUpperCase()), port.getValue()));
if (datanodeDetailsProto.hasRatisPort()) {
builder.setRatisPort(datanodeDetailsProto.getRatisPort());
}
if (datanodeDetailsProto.hasOzoneRestPort()) {
builder.setOzoneRestPort(datanodeDetailsProto.getOzoneRestPort());
} }
return builder.build(); return builder.build();
} }
Expand All @@ -214,14 +191,11 @@ public HddsProtos.DatanodeDetailsProto getProtoBufMessage() {
if (hostName != null) { if (hostName != null) {
builder.setHostName(hostName); builder.setHostName(hostName);
} }
if (containerPort != null) { for (Port port : ports) {
builder.setContainerPort(containerPort); builder.addPorts(HddsProtos.Port.newBuilder()
} .setName(port.getName().toString())
if (ratisPort != null) { .setValue(port.getValue())
builder.setRatisPort(ratisPort); .build());
}
if (ozoneRestPort != null) {
builder.setOzoneRestPort(ozoneRestPort);
} }
return builder.build(); return builder.build();
} }
Expand Down Expand Up @@ -268,9 +242,15 @@ public static class Builder {
private String id; private String id;
private String ipAddress; private String ipAddress;
private String hostName; private String hostName;
private Integer containerPort; private List<Port> ports;
private Integer ratisPort;
private Integer ozoneRestPort; /**
* Default private constructor. To create Builder instance use
* DatanodeDetails#newBuilder.
*/
private Builder() {
ports = new ArrayList<>();
}


/** /**
* Sets the DatanodeUuid. * Sets the DatanodeUuid.
Expand Down Expand Up @@ -304,50 +284,111 @@ public Builder setHostName(String host) {
this.hostName = host; this.hostName = host;
return this; return this;
} }

/** /**
* Sets the ContainerPort. * Adds a DataNode Port.
*
* @param port DataNode port
* *
* @param port ContainerPort
* @return DatanodeDetails.Builder * @return DatanodeDetails.Builder
*/ */
public Builder setContainerPort(Integer port) { public Builder addPort(Port port) {
this.containerPort = port; this.ports.add(port);
return this; return this;
} }


/** /**
* Sets the RatisPort. * Builds and returns DatanodeDetails instance.
* *
* @param port RatisPort * @return DatanodeDetails
* @return DatanodeDetails.Builder
*/ */
public Builder setRatisPort(Integer port) { public DatanodeDetails build() {
this.ratisPort = port; Preconditions.checkNotNull(id);
return this; return new DatanodeDetails(id, ipAddress, hostName, ports);
} }


}

/**
* Constructs a new Port with name and value.
*
* @param name Name of the port
* @param value Port number
*
* @return {@code Port} instance
*/
public static Port newPort(Port.Name name, Integer value) {
return new Port(name, value);
}

/**
* Container to hold DataNode Port details.
*/
public static class Port {

/**
* Ports that are supported in DataNode.
*/
public enum Name {
STANDALONE, RATIS, REST
}

private Name name;
private Integer value;

/** /**
* Sets the OzoneRestPort. * Private constructor for constructing Port object. Use
* DatanodeDetails#newPort to create a new Port object.
* *
* @param port OzoneRestPort * @param name
* @return DatanodeDetails.Builder * @param value
*/ */
public Builder setOzoneRestPort(Integer port) { private Port(Name name, Integer value) {
this.ozoneRestPort = port; this.name = name;
return this; this.value = value;
} }


/** /**
* Builds and returns DatanodeDetails instance. * Returns the name of the port.
* *
* @return DatanodeDetails * @return Port name
*/ */
public DatanodeDetails build() { public Name getName() {
Preconditions.checkNotNull(id); return name;
return new DatanodeDetails(id, ipAddress, hostName, containerPort, }
ratisPort, ozoneRestPort);
/**
* Returns the port number.
*
* @return Port number
*/
public Integer getValue() {
return value;
}

@Override
public int hashCode() {
return name.hashCode();
} }


/**
* Ports are considered equal if they have the same name.
*
* @param anObject
* The object to compare this {@code Port} against
* @return {@code true} if the given object represents a {@code Port}
and has the same name, {@code false} otherwise
*/
@Override
public boolean equals(Object anObject) {
if (this == anObject) {
return true;
}
if (anObject instanceof Port) {
return name.equals(((Port) anObject).name);
}
return false;
}
} }


} }
Expand Up @@ -48,11 +48,13 @@ public interface RatisHelper {
Logger LOG = LoggerFactory.getLogger(RatisHelper.class); Logger LOG = LoggerFactory.getLogger(RatisHelper.class);


static String toRaftPeerIdString(DatanodeDetails id) { static String toRaftPeerIdString(DatanodeDetails id) {
return id.getUuidString() + "_" + id.getRatisPort(); return id.getUuidString() + "_" +
id.getPort(DatanodeDetails.Port.Name.RATIS);
} }


static String toRaftPeerAddressString(DatanodeDetails id) { static String toRaftPeerAddressString(DatanodeDetails id) {
return id.getIpAddress() + ":" + id.getRatisPort(); return id.getIpAddress() + ":" +
id.getPort(DatanodeDetails.Port.Name.RATIS);
} }


static RaftPeerId toRaftPeerId(DatanodeDetails id) { static RaftPeerId toRaftPeerId(DatanodeDetails id) {
Expand Down
10 changes: 6 additions & 4 deletions hadoop-hdds/common/src/main/proto/hdds.proto
Expand Up @@ -29,13 +29,15 @@ option java_generate_equals_and_hash = true;
package hadoop.hdds; package hadoop.hdds;


message DatanodeDetailsProto { message DatanodeDetailsProto {
// TODO: make the port as a seperate proto message and use it here
required string uuid = 1; // UUID assigned to the Datanode. required string uuid = 1; // UUID assigned to the Datanode.
required string ipAddress = 2; // IP address required string ipAddress = 2; // IP address
required string hostName = 3; // hostname required string hostName = 3; // hostname
optional uint32 containerPort = 4 [default = 0]; // Ozone stand_alone protocol repeated Port ports = 4;
optional uint32 ratisPort = 5 [default = 0]; //Ozone ratis port }
optional uint32 ozoneRestPort = 6 [default = 0];
message Port {
required string name = 1;
required uint32 value = 2;
} }


message PipelineChannel { message PipelineChannel {
Expand Down
Expand Up @@ -80,7 +80,8 @@ public XceiverServer(DatanodeDetails datanodeDetails, Configuration conf,
+ "fallback to use default port {}", this.port, e); + "fallback to use default port {}", this.port, e);
} }
} }
datanodeDetails.setContainerPort(port); datanodeDetails.setPort(
DatanodeDetails.newPort(DatanodeDetails.Port.Name.STANDALONE, port));
this.storageContainer = dispatcher; this.storageContainer = dispatcher;
} }


Expand Down
Expand Up @@ -71,7 +71,8 @@ public XceiverServerGrpc(DatanodeDetails datanodeDetails, Configuration conf,
+ "fallback to use default port {}", this.port, e); + "fallback to use default port {}", this.port, e);
} }
} }
datanodeDetails.setContainerPort(port); datanodeDetails.setPort(
DatanodeDetails.newPort(DatanodeDetails.Port.Name.STANDALONE, port));
server = ((NettyServerBuilder) ServerBuilder.forPort(port)) server = ((NettyServerBuilder) ServerBuilder.forPort(port))
.maxMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE) .maxMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE)
.addService(new GrpcXceiverService(dispatcher)) .addService(new GrpcXceiverService(dispatcher))
Expand Down
Expand Up @@ -203,7 +203,8 @@ public static XceiverServerRatis newXceiverServerRatis(
+ "fallback to use default port {}", localPort, e); + "fallback to use default port {}", localPort, e);
} }
} }
datanodeDetails.setRatisPort(localPort); datanodeDetails.setPort(
DatanodeDetails.newPort(DatanodeDetails.Port.Name.RATIS, localPort));
return new XceiverServerRatis(datanodeDetails, localPort, storageDir, return new XceiverServerRatis(datanodeDetails, localPort, storageDir,
dispatcher, ozoneConf); dispatcher, ozoneConf);
} }
Expand Down

0 comments on commit 3b34148

Please sign in to comment.