HDDS-297. Add pipeline actions in Ozone. Contributed by Mukul Kumar Singh and Shashikant Banerjee
szetszwo committed Sep 6, 2018
1 parent fa2945e commit b3161c4
Showing 38 changed files with 815 additions and 163 deletions.
(file path not shown)
@@ -186,15 +186,17 @@ public ContainerProtos.ContainerCommandResponseProto sendCommand(

   /**
    * Create a pipeline.
-   *
-   * @param ignored - pipeline to be created.
    */
   @Override
-  public void createPipeline(Pipeline ignored)
+  public void createPipeline()
       throws IOException {
     // For stand alone pipeline, there is no notion called setup pipeline.
   }
+
+  public void destroyPipeline() {
+    // For stand alone pipeline, there is no notion called destroy pipeline.
+  }

   /**
    * Returns pipeline Type.
    *

(file path not shown)
@@ -216,15 +216,16 @@ private void reconnect() throws IOException {

   /**
    * Create a pipeline.
-   *
-   * @param ignored - pipeline to be created.
    */
   @Override
-  public void createPipeline(Pipeline ignored)
-      throws IOException {
+  public void createPipeline() {
     // For stand alone pipeline, there is no notion called setup pipeline.
   }
+
+  public void destroyPipeline() {
+    // For stand alone pipeline, there is no notion called destroy pipeline.
+  }

   /**
    * Returns pipeline Type.
    *

(file path not shown)
@@ -88,13 +88,27 @@ private XceiverClientRatis(Pipeline pipeline, RpcType rpcType,
   /**
    * {@inheritDoc}
    */
-  public void createPipeline(Pipeline pipeline)
+  public void createPipeline()
       throws IOException {
     RaftGroupId groupId = pipeline.getId().getRaftGroupID();
     RaftGroup group = RatisHelper.newRaftGroup(groupId, pipeline.getMachines());
     LOG.debug("initializing pipeline:{} with nodes:{}",
         pipeline.getId(), group.getPeers());
-    reinitialize(pipeline.getMachines(), group);
+    reinitialize(pipeline.getMachines(), RatisHelper.emptyRaftGroup(), group);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public void destroyPipeline()
+      throws IOException {
+    RaftGroupId groupId = pipeline.getId().getRaftGroupID();
+    RaftGroup currentGroup =
+        RatisHelper.newRaftGroup(groupId, pipeline.getMachines());
+    LOG.debug("destroying pipeline:{} with nodes:{}",
+        pipeline.getId(), currentGroup.getPeers());
+    reinitialize(pipeline.getMachines(), currentGroup,
+        RatisHelper.emptyRaftGroup());
   }

   /**
@@ -107,16 +121,16 @@ public HddsProtos.ReplicationType getPipelineType() {
     return HddsProtos.ReplicationType.RATIS;
   }

-  private void reinitialize(List<DatanodeDetails> datanodes, RaftGroup group)
-      throws IOException {
+  private void reinitialize(List<DatanodeDetails> datanodes, RaftGroup oldGroup,
+      RaftGroup newGroup) throws IOException {
     if (datanodes.isEmpty()) {
       return;
     }

     IOException exception = null;
     for (DatanodeDetails d : datanodes) {
       try {
-        reinitialize(d, group);
+        reinitialize(d, oldGroup, newGroup);
       } catch (IOException ioe) {
         if (exception == null) {
           exception = new IOException(
@@ -135,14 +149,18 @@ private void reinitialize(List<DatanodeDetails> datanodes, RaftGroup group)
    * Adds a new peers to the Ratis Ring.
    *
    * @param datanode - new datanode
-   * @param group - Raft group
+   * @param oldGroup - previous Raft group
+   * @param newGroup - new Raft group
    * @throws IOException - on Failure.
    */
-  private void reinitialize(DatanodeDetails datanode, RaftGroup group)
+  private void reinitialize(DatanodeDetails datanode, RaftGroup oldGroup,
+      RaftGroup newGroup)
       throws IOException {
     final RaftPeer p = RatisHelper.toRaftPeer(datanode);
-    try (RaftClient client = RatisHelper.newRaftClient(rpcType, p)) {
-      client.reinitialize(group, p.getId());
+    try (RaftClient client = oldGroup == RatisHelper.emptyRaftGroup() ?
+        RatisHelper.newRaftClient(rpcType, p) :
+        RatisHelper.newRaftClient(rpcType, p, oldGroup)) {
+      client.reinitialize(newGroup, p.getId());
     } catch (IOException ioe) {
       LOG.error("Failed to reinitialize RaftPeer:{} datanode: {} ",
           p, datanode, ioe);

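Taken together, these hunks make pipeline creation and teardown symmetric: both funnel into the same three-argument reinitialize, with RatisHelper.emptyRaftGroup() as each peer's old group on create and as its new group on destroy, and the old group also decides whether the RaftClient is built with or without a group. A minimal, self-contained sketch of that symmetry (plain-Java stand-ins only; PipelineLifecycleSketch and the List<String> groups are illustrative, not the Ratis API):

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class PipelineLifecycleSketch {

  // Stand-in for RatisHelper.emptyRaftGroup().
  static final List<String> EMPTY_GROUP = Collections.emptyList();

  // Mirrors reinitialize(datanodes, oldGroup, newGroup) in the patch:
  // every peer is moved from its old group membership to the new one.
  static void reinitialize(List<String> datanodes, List<String> oldGroup,
      List<String> newGroup) {
    for (String d : datanodes) {
      System.out.printf("peer %s: %s -> %s%n", d, oldGroup, newGroup);
    }
  }

  public static void main(String[] args) {
    List<String> machines = Arrays.asList("dn1", "dn2", "dn3");
    reinitialize(machines, EMPTY_GROUP, machines); // createPipeline()
    reinitialize(machines, machines, EMPTY_GROUP); // destroyPipeline()
  }
}
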
(file path not shown)
@@ -180,7 +180,7 @@ private void createPipeline(XceiverClientSpi client, Pipeline pipeline)
     // ObjectStageChangeRequestProto.Op.create,
     // ObjectStageChangeRequestProto.Stage.begin);

-    client.createPipeline(pipeline);
+    client.createPipeline();

     //storageContainerLocationClient.notifyObjectStageChange(
     // ObjectStageChangeRequestProto.Type.pipeline,

(file path not shown)
@@ -56,6 +56,11 @@ private HddsConfigKeys() {
   public static final int HDDS_CONTAINER_ACTION_MAX_LIMIT_DEFAULT =
       20;

+  public static final String HDDS_PIPELINE_ACTION_MAX_LIMIT =
+      "hdds.pipeline.action.max.limit";
+  public static final int HDDS_PIPELINE_ACTION_MAX_LIMIT_DEFAULT =
+      20;
+
   // Configuration to allow volume choosing policy.
   public static final String HDDS_DATANODE_VOLUME_CHOOSING_POLICY =
       "hdds.datanode.volume.choosing.policy";

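The new key caps how many pipeline actions a datanode packs into a single heartbeat, mirroring the container-action limit just above it. A hedged sketch of how such a key is conventionally consumed (the patch itself only defines the constants; the reader class here is hypothetical):

import org.apache.hadoop.conf.Configuration;

public class PipelineActionLimitSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Key and default taken from the hunk above.
    int maxLimit = conf.getInt("hdds.pipeline.action.max.limit", 20);
    System.out.println("pipeline actions per heartbeat <= " + maxLimit);
  }
}
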
(file path not shown)
@@ -19,6 +19,7 @@

 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
 import org.apache.ratis.util.TimeDuration;

 import java.util.concurrent.TimeUnit;
@@ -57,6 +58,10 @@ public final class ScmConfigKeys {
       = "dfs.container.ratis.num.write.chunk.threads";
   public static final int DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_DEFAULT
       = 60;
+  public static final String DFS_CONTAINER_RATIS_REPLICATION_LEVEL_KEY
+      = "dfs.container.ratis.replication.level";
+  public static final ReplicationLevel
+      DFS_CONTAINER_RATIS_REPLICATION_LEVEL_DEFAULT = ReplicationLevel.MAJORITY;
   public static final String DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY =
       "dfs.container.ratis.segment.size";
   public static final int DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT =
@@ -76,6 +81,12 @@ public final class ScmConfigKeys {
       DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_DEFAULT =
       TimeDuration.valueOf(3000, TimeUnit.MILLISECONDS);

+  public static final String DFS_RATIS_SERVER_FAILURE_DURATION_KEY =
+      "dfs.ratis.server.failure.duration";
+  public static final TimeDuration
+      DFS_RATIS_SERVER_FAILURE_DURATION_DEFAULT =
+      TimeDuration.valueOf(120, TimeUnit.SECONDS);
+
   // TODO : this is copied from OzoneConsts, may need to move to a better place
   public static final String OZONE_SCM_CHUNK_SIZE_KEY = "ozone.scm.chunk.size";
   // 16 MB by default

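Two knobs appear here: the Ratis replication level used when the datanode submits container commands, and a server-failure timeout that feeds pipeline failure detection. Since RaftProtos.ReplicationLevel is a protobuf enum (an ordinary Java enum), a consumer can plausibly resolve the configured name as below; the reader class is an assumption, not code from the patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;

public class ReplicationLevelSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Falls back to the MAJORITY default defined above.
    ReplicationLevel level = ReplicationLevel.valueOf(
        conf.get("dfs.container.ratis.replication.level",
            ReplicationLevel.MAJORITY.name()));
    System.out.println("requested replication level: " + level);
  }
}
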
(file path not shown)
@@ -111,10 +111,14 @@ public abstract ContainerCommandResponseProto sendCommand(

   /**
    * Create a pipeline.
-   *
-   * @param pipeline - pipeline to be created.
    */
-  public abstract void createPipeline(Pipeline pipeline) throws IOException;
+  public abstract void createPipeline() throws IOException;
+
+  /**
+   * Destroy a pipeline.
+   * @throws IOException
+   */
+  public abstract void destroyPipeline() throws IOException;

   /**
    * Returns pipeline Type.

(file path not shown)
@@ -24,6 +24,7 @@
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;

+import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
 import org.apache.ratis.util.TimeDuration;

 /**
@@ -214,6 +215,11 @@ public final class OzoneConfigKeys {
       = ScmConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_KEY;
   public static final int DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_DEFAULT
       = ScmConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_DEFAULT;
+  public static final String DFS_CONTAINER_RATIS_REPLICATION_LEVEL_KEY
+      = ScmConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_KEY;
+  public static final ReplicationLevel
+      DFS_CONTAINER_RATIS_REPLICATION_LEVEL_DEFAULT
+      = ScmConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_DEFAULT;
   public static final String DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY
       = ScmConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY;
   public static final int DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT
@@ -237,6 +243,12 @@ public final class OzoneConfigKeys {
       DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_DEFAULT =
       ScmConfigKeys.DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_DEFAULT;

+  public static final String DFS_RATIS_SERVER_FAILURE_DURATION_KEY =
+      ScmConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_KEY;
+  public static final TimeDuration
+      DFS_RATIS_SERVER_FAILURE_DURATION_DEFAULT =
+      ScmConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_DEFAULT;
+
   public static final String OZONE_SCM_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL =
       "ozone.web.authentication.kerberos.principal";

hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java (20 additions, 2 deletions)
@@ -30,6 +30,7 @@
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.shaded.proto.RaftProtos;
 import org.apache.ratis.util.SizeInBytes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,6 +40,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.UUID;
 import java.util.stream.Collectors;

 /**
@@ -48,8 +50,19 @@ public interface RatisHelper {
   Logger LOG = LoggerFactory.getLogger(RatisHelper.class);

   static String toRaftPeerIdString(DatanodeDetails id) {
-    return id.getUuidString() + "_" +
-        id.getPort(DatanodeDetails.Port.Name.RATIS).getValue();
+    return id.getUuidString();
+  }
+
+  static UUID toDatanodeId(String peerIdString) {
+    return UUID.fromString(peerIdString);
+  }
+
+  static UUID toDatanodeId(RaftPeerId peerId) {
+    return toDatanodeId(peerId.toString());
+  }
+
+  static UUID toDatanodeId(RaftProtos.RaftPeerProto peerId) {
+    return toDatanodeId(RaftPeerId.valueOf(peerId.getId()));
   }

   static String toRaftPeerAddressString(DatanodeDetails id) {
@@ -117,6 +130,11 @@ static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader) {
         newRaftGroup(new ArrayList<>(Arrays.asList(leader))));
   }

+  static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader,
+      RaftGroup group) {
+    return newRaftClient(rpcType, leader.getId(), group);
+  }
+
   static RaftClient newRaftClient(
       RpcType rpcType, RaftPeerId leader, RaftGroup group) {
     LOG.trace("newRaftClient: {}, leader={}, group={}", rpcType, leader, group);

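Dropping the _<ratis-port> suffix makes a RaftPeerId exactly the datanode UUID string, which is what lets the new toDatanodeId overloads map a Raft peer back to its datanode. The round trip needs nothing beyond the standard library (a sketch; getUuidString() is assumed to be the UUID's canonical string form):

import java.util.UUID;

public class PeerIdRoundTripSketch {
  public static void main(String[] args) {
    UUID datanodeId = UUID.randomUUID();
    String raftPeerId = datanodeId.toString();   // toRaftPeerIdString: UUID only, no port
    UUID decoded = UUID.fromString(raftPeerId);  // toDatanodeId(String)
    System.out.println(decoded.equals(datanodeId)); // true
  }
}
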
hadoop-hdds/common/src/main/resources/ozone-default.xml (27 additions, 0 deletions)
@@ -126,6 +126,15 @@
       will use for writing chunks (60 by default).
     </description>
   </property>
+  <property>
+    <name>dfs.container.ratis.replication.level</name>
+    <value>MAJORITY</value>
+    <tag>OZONE, RATIS</tag>
+    <description>Replication level to be used by datanode for submitting a
+      container command to ratis. Available replication levels are ALL and
+      MAJORITY; MAJORITY is used as the default replication level.
+    </description>
+  </property>
   <property>
     <name>dfs.container.ratis.segment.size</name>
     <value>1073741824</value>
@@ -154,6 +163,15 @@
     <tag>OZONE, RATIS, MANAGEMENT</tag>
     <description>The timeout duration for ratis server request.</description>
   </property>
+  <property>
+    <name>dfs.ratis.server.failure.duration</name>
+    <value>120s</value>
+    <tag>OZONE, RATIS, MANAGEMENT</tag>
+    <description>The timeout duration for ratis server failure detection;
+      once the threshold has been reached, the ratis state machine is informed
+      of the failure in the ratis ring.
+    </description>
+  </property>
   <property>
     <name>hdds.node.report.interval</name>
     <value>60000ms</value>
@@ -1104,6 +1122,15 @@
     </description>
   </property>

+  <property>
+    <name>hdds.pipeline.action.max.limit</name>
+    <value>20</value>
+    <tag>DATANODE</tag>
+    <description>
+      Maximum number of pipeline actions sent by the datanode to SCM in a
+      single heartbeat.
+    </description>
+  </property>
   <property>
     <name>hdds.scm.watcher.timeout</name>
     <value>10m</value>

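The 120s value above is a Hadoop time-duration string. A hedged sketch of reading it normalized to milliseconds (the long-default overload of Configuration.getTimeDuration parses suffixed values such as 120s; the reader class is illustrative):

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;

public class FailureDurationSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // 120_000 ms matches the 120s default in the XML above.
    long failureMs = conf.getTimeDuration(
        "dfs.ratis.server.failure.duration", 120_000L, TimeUnit.MILLISECONDS);
    System.out.println("ratis failure threshold = " + failureMs + " ms");
  }
}
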
(file path not shown)
@@ -20,6 +20,8 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.PipelineAction;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerAction;
 import org.apache.hadoop.hdds.protocol.proto
@@ -66,6 +68,7 @@ public class StateContext {
   private final Configuration conf;
   private final Queue<GeneratedMessage> reports;
   private final Queue<ContainerAction> containerActions;
+  private final Queue<PipelineAction> pipelineActions;
   private DatanodeStateMachine.DatanodeStates state;

   /**
@@ -91,6 +94,7 @@ public StateContext(Configuration conf, DatanodeStateMachine.DatanodeStates
     cmdStatusMap = new ConcurrentHashMap<>();
     reports = new LinkedList<>();
     containerActions = new LinkedList<>();
+    pipelineActions = new LinkedList<>();
     lock = new ReentrantLock();
     stateExecutionCount = new AtomicLong(0);
   }
@@ -256,6 +260,47 @@ public List<ContainerAction> getPendingContainerAction(int maxLimit) {
     }
   }

+  /**
+   * Add PipelineAction to the PipelineAction queue if it's not present.
+   *
+   * @param pipelineAction PipelineAction to be added
+   */
+  public void addPipelineActionIfAbsent(PipelineAction pipelineAction) {
+    synchronized (pipelineActions) {
+      /**
+       * If the pipelineAction queue already contains an entry for the
+       * pipeline id with the same action, we should just return.
+       * Note: We should not use pipelineActions.contains(pipelineAction) here
+       * because pipelineAction carries a msg string, so two actions on the
+       * same pipeline whose msgs differ would otherwise be added twice.
+       */
+      for (PipelineAction pipelineActionIter : pipelineActions) {
+        if (pipelineActionIter.getAction() == pipelineAction.getAction()
+            && pipelineActionIter.hasClosePipeline() && pipelineAction
+            .hasClosePipeline()
+            && pipelineActionIter.getClosePipeline().getPipelineID()
+            == pipelineAction.getClosePipeline().getPipelineID()) {
+          return;
+        }
+      }
+      pipelineActions.add(pipelineAction);
+    }
+  }
+
+  /**
+   * Returns pending PipelineActions from the PipelineAction queue with a
+   * max limit on list size, or an empty list if the queue is empty.
+   *
+   * @return List<PipelineAction>
+   */
+  public List<PipelineAction> getPendingPipelineAction(int maxLimit) {
+    synchronized (pipelineActions) {
+      return pipelineActions.parallelStream().limit(maxLimit)
+          .collect(Collectors.toList());
+    }
+  }
+
   /**
    * Returns the next task to get executed by the datanode state machine.
    * @return A callable that will be executed by the

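The queue above deduplicates on the (action, pipeline id) pair rather than on whole messages, because two PipelineActions can differ only in their free-form detail string. A simplified, self-contained sketch of the same pattern with plain types (ActionQueueSketch and its fields are stand-ins for the protobuf PipelineAction, not patch code):

import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.stream.Collectors;

public class ActionQueueSketch {

  static final class Action {
    final String kind;       // e.g. "CLOSE"
    final String pipelineId;
    final String msg;        // free-form detail, deliberately ignored for dedup
    Action(String kind, String pipelineId, String msg) {
      this.kind = kind;
      this.pipelineId = pipelineId;
      this.msg = msg;
    }
  }

  private final Queue<Action> actions = new ArrayDeque<>();

  // Add only if no queued action has the same kind and pipeline id.
  synchronized void addIfAbsent(Action a) {
    for (Action q : actions) {
      if (q.kind.equals(a.kind) && q.pipelineId.equals(a.pipelineId)) {
        return;
      }
    }
    actions.add(a);
  }

  // Peek at up to maxLimit pending actions without removing them,
  // mirroring getPendingPipelineAction(maxLimit).
  synchronized List<Action> getPending(int maxLimit) {
    return actions.stream().limit(maxLimit).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    ActionQueueSketch q = new ActionQueueSketch();
    q.addIfAbsent(new Action("CLOSE", "pipeline-1", "leader lost"));
    q.addIfAbsent(new Action("CLOSE", "pipeline-1", "follower slow")); // duplicate, dropped
    System.out.println(q.getPending(20).size()); // 1
  }
}
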
(The remaining changed files in this commit were not captured in this view.)