HDDS-325. Add event watcher for delete blocks command. Contributed by Lokesh Jain.
nandakumar131 committed Oct 1, 2018
1 parent fd6be58 commit f7ff8c0
Showing 34 changed files with 503 additions and 223 deletions.
@@ -171,6 +171,10 @@ public void resetPipeline() {
public Map<String, DatanodeDetails> getDatanodes() {
return datanodes;
}

public boolean isEmpty() {
return datanodes.isEmpty();
}
/**
* Returns the leader host.
*
@@ -82,6 +82,6 @@ protected CommandStatusReportsProto getReport() {
map.remove(key);
}
});
return builder.build();
return builder.getCmdStatusCount() > 0 ? builder.build() : null;
}
}
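The getReport() change above returns null when the builder holds no command-status entries; together with the null check added to StateContext#addReport further below, empty reports never reach the heartbeat report list. A minimal plain-Java sketch of that contract, using stand-in types rather than the Ozone classes:

import java.util.ArrayList;
import java.util.List;

// Plain-Java stand-in sketch: the collector drops null reports, so a
// publisher that has nothing to report can simply return null.
class NullSafeReportList<T> {
  private final List<T> reports = new ArrayList<>();

  // Mirrors the null check added to StateContext#addReport.
  void addReport(T report) {
    if (report != null) {
      synchronized (reports) {
        reports.add(report);
      }
    }
  }

  int size() {
    synchronized (reports) {
      return reports.size();
    }
  }

  public static void main(String[] args) {
    NullSafeReportList<String> list = new NullSafeReportList<>();
    list.addReport(null);                // publisher had zero command statuses
    list.addReport("cmdStatusReport#1");
    System.out.println(list.size());     // prints 1
  }
}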
@@ -21,6 +21,8 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineAction;
import org.apache.hadoop.hdds.protocol.proto
@@ -35,8 +37,11 @@
import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
import org.apache.hadoop.ozone.protocol.commands.CommandStatus
.CommandStatusBuilder;
import org.apache.hadoop.ozone.protocol.commands
.DeleteBlockCommandStatus.DeleteBlockCommandStatusBuilder;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;

import static java.lang.Math.min;
import static org.apache.hadoop.hdds.scm.HddsServerUtil.getScmHeartbeatInterval;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,6 +57,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT;

@@ -67,7 +73,7 @@ public class StateContext {
private final DatanodeStateMachine parent;
private final AtomicLong stateExecutionCount;
private final Configuration conf;
private final Queue<GeneratedMessage> reports;
private final List<GeneratedMessage> reports;
private final Queue<ContainerAction> containerActions;
private final Queue<PipelineAction> pipelineActions;
private DatanodeStateMachine.DatanodeStates state;
@@ -174,19 +180,23 @@ public void setState(DatanodeStateMachine.DatanodeStates state) {
* @param report report to be added
*/
public void addReport(GeneratedMessage report) {
synchronized (reports) {
reports.add(report);
if (report != null) {
synchronized (reports) {
reports.add(report);
}
}
}

/**
* Returns the next report, or null if the report queue is empty.
* Adds the reports which could not be sent by heartbeat back to the
* reports list.
*
* @return report
* @param reportsToPutBack list of reports which failed to be sent by
* heartbeat.
*/
public GeneratedMessage getNextReport() {
public void putBackReports(List<GeneratedMessage> reportsToPutBack) {
synchronized (reports) {
return reports.poll();
reports.addAll(0, reportsToPutBack);
}
}

@@ -207,19 +217,14 @@ public List<GeneratedMessage> getAllAvailableReports() {
* @return List<reports>
*/
public List<GeneratedMessage> getReports(int maxLimit) {
List<GeneratedMessage> reportList = new ArrayList<>();
List<GeneratedMessage> reportsToReturn = new LinkedList<>();
synchronized (reports) {
if (!reports.isEmpty()) {
int size = reports.size();
int limit = size > maxLimit ? maxLimit : size;
for (int count = 0; count < limit; count++) {
GeneratedMessage report = reports.poll();
Preconditions.checkNotNull(report);
reportList.add(report);
}
}
return reportList;
List<GeneratedMessage> tempList = reports.subList(
0, min(reports.size(), maxLimit));
reportsToReturn.addAll(tempList);
tempList.clear();
}
return reportsToReturn;
}
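getReports above relies on List.subList returning a view backed by the reports list, so clearing the view removes exactly the elements that were handed out. A self-contained sketch of that idiom with plain strings in place of GeneratedMessage:

import static java.lang.Math.min;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;

public class SubListDrainExample {
  public static void main(String[] args) {
    List<String> reports = new ArrayList<>(
        Arrays.asList("nodeReport", "containerReport", "cmdStatusReport"));
    int maxLimit = 2;

    List<String> reportsToReturn = new LinkedList<>();
    // subList is a view of the backing list; clearing it removes the
    // returned elements from reports as well.
    List<String> tempList = reports.subList(0, min(reports.size(), maxLimit));
    reportsToReturn.addAll(tempList);
    tempList.clear();

    System.out.println(reportsToReturn); // [nodeReport, containerReport]
    System.out.println(reports);         // [cmdStatusReport]
  }
}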


@@ -442,9 +447,14 @@ public void addCmdStatus(Long key, CommandStatus status) {
* @param cmd - {@link SCMCommand}.
*/
public void addCmdStatus(SCMCommand cmd) {
CommandStatusBuilder statusBuilder;
if (cmd.getType() == Type.deleteBlocksCommand) {
statusBuilder = new DeleteBlockCommandStatusBuilder();
} else {
statusBuilder = CommandStatusBuilder.newBuilder();
}
this.addCmdStatus(cmd.getId(),
CommandStatusBuilder.newBuilder()
.setCmdId(cmd.getId())
statusBuilder.setCmdId(cmd.getId())
.setStatus(Status.PENDING)
.setType(cmd.getType())
.build());
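The builder selection above means a delete-blocks command is tracked with a DeleteBlockCommandStatus rather than a plain CommandStatus, so the block-deletion ACK can be attached to it later (see the DeleteBlocksCommandHandler change below). A sketch of that registration path using only the calls shown in this diff; the exact import path of the Status enum is an assumption:

import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
import org.apache.hadoop.ozone.protocol.commands.CommandStatus.CommandStatusBuilder;
import org.apache.hadoop.ozone.protocol.commands
    .DeleteBlockCommandStatus.DeleteBlockCommandStatusBuilder;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;

// Sketch: pick the status builder by command type, as addCmdStatus now does.
final class CommandStatusRegistration {
  static CommandStatus pendingStatusFor(SCMCommand cmd) {
    CommandStatusBuilder statusBuilder;
    if (cmd.getType() == Type.deleteBlocksCommand) {
      // The richer status can later carry the block-deletion ACK
      // (set in DeleteBlocksCommandHandler below).
      statusBuilder = new DeleteBlockCommandStatusBuilder();
    } else {
      statusBuilder = CommandStatusBuilder.newBuilder();
    }
    return statusBuilder.setCmdId(cmd.getId())
        .setStatus(Status.PENDING)
        .setType(cmd.getType())
        .build();
  }
}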
@@ -469,13 +479,13 @@ public void removeCommandStatus(Long cmdId) {
/**
* Updates status of a pending status command.
* @param cmdId command id
* @param cmdExecuted SCMCommand
* @param cmdStatusUpdater Consumer to update command status.
* @return true if command status updated successfully else false.
*/
public boolean updateCommandStatus(Long cmdId, boolean cmdExecuted) {
public boolean updateCommandStatus(Long cmdId,
Consumer<CommandStatus> cmdStatusUpdater) {
if(cmdStatusMap.containsKey(cmdId)) {
cmdStatusMap.get(cmdId)
.setStatus(cmdExecuted ? Status.EXECUTED : Status.FAILED);
cmdStatusUpdater.accept(cmdStatusMap.get(cmdId));
return true;
}
return false;
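Handlers now pass a Consumer<CommandStatus> instead of a boolean, so a single update can set several fields of the tracked status; the delete-blocks handler uses this to attach its ACK. A plain-Java sketch of the map-plus-consumer pattern, with a hypothetical MutableStatus standing in for CommandStatus:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Plain-Java sketch of the new update contract: the caller supplies a
// Consumer that mutates the tracked status in place.
class CommandStatusTracker {
  static final class MutableStatus {
    boolean executed;
    String ack; // stand-in for the block-deletion ACK payload
  }

  private final Map<Long, MutableStatus> cmdStatusMap =
      new ConcurrentHashMap<>();

  void addCmdStatus(Long cmdId) {
    cmdStatusMap.put(cmdId, new MutableStatus());
  }

  // Mirrors StateContext#updateCommandStatus after this change.
  boolean updateCommandStatus(Long cmdId,
      Consumer<MutableStatus> cmdStatusUpdater) {
    if (cmdStatusMap.containsKey(cmdId)) {
      cmdStatusUpdater.accept(cmdStatusMap.get(cmdId));
      return true;
    }
    return false;
  }

  public static void main(String[] args) {
    CommandStatusTracker tracker = new CommandStatusTracker();
    tracker.addCmdStatus(1L);
    // A handler can now update more than one field in a single callback:
    tracker.updateCommandStatus(1L, status -> {
      status.executed = true;
      status.ack = "deletionAck";
    });
  }
}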
@@ -117,7 +117,8 @@ public void handle(SCMCommand command, OzoneContainer container,
cmdExecuted = false;
}
} finally {
updateCommandStatus(context, command, cmdExecuted, LOG);
updateCommandStatus(context, command,
(cmdStatus) -> cmdStatus.setStatus(cmdExecuted), LOG);
long endTime = Time.monotonicNow();
totalTime += endTime - startTime;
}
@@ -23,9 +23,12 @@
.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.slf4j.Logger;

import java.util.function.Consumer;

/**
* Generic interface for handlers.
*/
@@ -63,8 +66,8 @@ void handle(SCMCommand command, OzoneContainer container,
* Default implementation for updating command status.
*/
default void updateCommandStatus(StateContext context, SCMCommand command,
boolean cmdExecuted, Logger log) {
if (!context.updateCommandStatus(command.getId(), cmdExecuted)) {
Consumer<CommandStatus> cmdStatusUpdater, Logger log) {
if (!context.updateCommandStatus(command.getId(), cmdStatusUpdater)) {
log.debug("{} with Id:{} not found.", command.getType(),
command.getId());
}
@@ -38,12 +38,12 @@
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.statemachine
.EndpointStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine
.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
import org.apache.hadoop.ozone.protocol.commands.DeleteBlockCommandStatus;
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.util.Time;
@@ -54,6 +54,7 @@

import java.io.IOException;
import java.util.List;
import java.util.function.Consumer;

import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.CONTAINER_NOT_FOUND;
@@ -63,7 +64,7 @@
*/
public class DeleteBlocksCommandHandler implements CommandHandler {

private static final Logger LOG =
public static final Logger LOG =
LoggerFactory.getLogger(DeleteBlocksCommandHandler.class);

private final ContainerSet containerSet;
@@ -83,6 +84,7 @@ public void handle(SCMCommand command, OzoneContainer container,
StateContext context, SCMConnectionManager connectionManager) {
cmdExecuted = false;
long startTime = Time.monotonicNow();
ContainerBlocksDeletionACKProto blockDeletionACK = null;
try {
if (command.getType() != SCMCommandProto.Type.deleteBlocksCommand) {
LOG.warn("Skipping handling command, expected command "
@@ -144,31 +146,28 @@ public void handle(SCMCommand command, OzoneContainer container,
.setDnId(context.getParent().getDatanodeDetails()
.getUuid().toString());
});
ContainerBlocksDeletionACKProto blockDeletionACK = resultBuilder.build();
blockDeletionACK = resultBuilder.build();

// Send ACK back to SCM as long as meta updated
// TODO Or we should wait until the blocks are actually deleted?
if (!containerBlocks.isEmpty()) {
for (EndpointStateMachine endPoint : connectionManager.getValues()) {
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Sending following block deletion ACK to SCM");
for (DeleteBlockTransactionResult result :
blockDeletionACK.getResultsList()) {
LOG.debug(result.getTxID() + " : " + result.getSuccess());
}
}
endPoint.getEndPoint()
.sendContainerBlocksDeletionACK(blockDeletionACK);
} catch (IOException e) {
LOG.error("Unable to send block deletion ACK to SCM {}",
endPoint.getAddress().toString(), e);
if (LOG.isDebugEnabled()) {
LOG.debug("Sending following block deletion ACK to SCM");
for (DeleteBlockTransactionResult result : blockDeletionACK
.getResultsList()) {
LOG.debug(result.getTxID() + " : " + result.getSuccess());
}
}
}
cmdExecuted = true;
} finally {
updateCommandStatus(context, command, cmdExecuted, LOG);
final ContainerBlocksDeletionACKProto deleteAck =
blockDeletionACK;
Consumer<CommandStatus> statusUpdater = (cmdStatus) -> {
cmdStatus.setStatus(cmdExecuted);
((DeleteBlockCommandStatus) cmdStatus).setBlocksDeletionAck(deleteAck);
};
updateCommandStatus(context, command, statusUpdater, LOG);
long endTime = Time.monotonicNow();
totalTime += endTime - startTime;
}
@@ -238,9 +237,9 @@ private void deleteKeyValueContainerBlocks(
}
}

containerDB.put(DFSUtil.string2Bytes(
OzoneConsts.DELETE_TRANSACTION_KEY_PREFIX + delTX.getContainerID()),
Longs.toByteArray(delTX.getTxID()));
containerDB
.put(DFSUtil.string2Bytes(OzoneConsts.DELETE_TRANSACTION_KEY_PREFIX),
Longs.toByteArray(delTX.getTxID()));
containerData
.updateDeleteTransactionId(delTX.getTxID());
// update pending deletion blocks count in in-memory container status
Expand Up @@ -77,7 +77,8 @@ public void handle(SCMCommand command, OzoneContainer container,
supervisor.addTask(replicationTask);

} finally {
updateCommandStatus(context, command, true, LOG);
updateCommandStatus(context, command,
(cmdStatus) -> cmdStatus.setStatus(true), LOG);
}
}

@@ -54,6 +54,7 @@

import java.io.IOException;
import java.time.ZonedDateTime;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;

@@ -124,12 +125,12 @@ public void setDatanodeDetailsProto(DatanodeDetailsProto
@Override
public EndpointStateMachine.EndPointStates call() throws Exception {
rpcEndpoint.lock();
SCMHeartbeatRequestProto.Builder requestBuilder = null;
try {
Preconditions.checkState(this.datanodeDetailsProto != null);

SCMHeartbeatRequestProto.Builder requestBuilder =
SCMHeartbeatRequestProto.newBuilder()
.setDatanodeDetails(datanodeDetailsProto);
requestBuilder = SCMHeartbeatRequestProto.newBuilder()
.setDatanodeDetails(datanodeDetailsProto);
addReports(requestBuilder);
addContainerActions(requestBuilder);
addPipelineActions(requestBuilder);
@@ -139,13 +140,33 @@ public EndpointStateMachine.EndPointStates call() throws Exception {
rpcEndpoint.setLastSuccessfulHeartbeat(ZonedDateTime.now());
rpcEndpoint.zeroMissedCount();
} catch (IOException ex) {
// put back the reports which failed to be sent
putBackReports(requestBuilder);
rpcEndpoint.logIfNeeded(ex);
} finally {
rpcEndpoint.unlock();
}
return rpcEndpoint.getState();
}

// TODO: Make it generic.
private void putBackReports(SCMHeartbeatRequestProto.Builder requestBuilder) {
List<GeneratedMessage> reports = new LinkedList<>();
if (requestBuilder.hasContainerReport()) {
reports.add(requestBuilder.getContainerReport());
}
if (requestBuilder.hasNodeReport()) {
reports.add(requestBuilder.getNodeReport());
}
if (requestBuilder.getCommandStatusReportsCount() != 0) {
for (GeneratedMessage msg : requestBuilder
.getCommandStatusReportsList()) {
reports.add(msg);
}
}
context.putBackReports(reports);
}

/**
* Adds all the available reports to heartbeat.
*
@@ -158,7 +179,11 @@ private void addReports(SCMHeartbeatRequestProto.Builder requestBuilder) {
SCMHeartbeatRequestProto.getDescriptor().getFields()) {
String heartbeatFieldName = descriptor.getMessageType().getFullName();
if (heartbeatFieldName.equals(reportName)) {
requestBuilder.setField(descriptor, report);
if (descriptor.isRepeated()) {
requestBuilder.addRepeatedField(descriptor, report);
} else {
requestBuilder.setField(descriptor, report);
}
}
}
}
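Command-status reports map to a repeated field in SCMHeartbeatRequestProto, and protobuf's reflective setField expects a whole List for a repeated field, so a single report has to be appended with addRepeatedField. A generic sketch of that descriptor-based dispatch; the helper name and the message-type guard are illustrative additions:

import com.google.protobuf.Descriptors.FieldDescriptor;
import com.google.protobuf.Message;

// Generic sketch of the dispatch used by addReports(): find the builder
// field whose message type matches the report, then append repeated fields
// with addRepeatedField and set singular fields with setField.
final class ReportFieldSetter {
  static void addReport(Message.Builder requestBuilder, Message report) {
    String reportName = report.getDescriptorForType().getFullName();
    for (FieldDescriptor descriptor :
        requestBuilder.getDescriptorForType().getFields()) {
      if (descriptor.getJavaType() == FieldDescriptor.JavaType.MESSAGE
          && descriptor.getMessageType().getFullName().equals(reportName)) {
        if (descriptor.isRepeated()) {
          // e.g. the repeated commandStatusReports field
          requestBuilder.addRepeatedField(descriptor, report);
        } else {
          // e.g. the singular nodeReport or containerReport fields
          requestBuilder.setField(descriptor, report);
        }
      }
    }
  }
}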
