Skip to content

Commit

Permalink
HDDS-1603. Handle Ratis Append Failure in Container State Machine. Co…
Browse files Browse the repository at this point in the history
…ntributed by Supratim Deka
  • Loading branch information
sdeka committed Jul 2, 2019
1 parent e966edd commit 227a1b0
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,11 @@ public void notifyNotLeader(Collection<TransactionContext> pendingEntries)
evictStateMachineCache();
}

@Override
public void notifyLogFailed(Throwable t, LogEntryProto failedEntry) {
ratisServer.handleNodeLogFailure(gid, t);
}

@Override
public CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(
RoleInfoProto roleInfoProto, TermIndex firstTermIndexInLog) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -545,18 +545,28 @@ private void handlePipelineFailure(RaftGroupId groupId,
+ roleInfoProto.getRole());
}

triggerPipelineClose(groupId, msg,
ClosePipelineInfo.Reason.PIPELINE_FAILED,false);
}

private void triggerPipelineClose(RaftGroupId groupId, String detail,
ClosePipelineInfo.Reason reasonCode, boolean triggerHB) {
PipelineID pipelineID = PipelineID.valueOf(groupId.getUuid());
ClosePipelineInfo.Builder closePipelineInfo =
ClosePipelineInfo.newBuilder()
.setPipelineID(pipelineID.getProtobuf())
.setReason(ClosePipelineInfo.Reason.PIPELINE_FAILED)
.setDetailedReason(msg);
.setReason(reasonCode)
.setDetailedReason(detail);

PipelineAction action = PipelineAction.newBuilder()
.setClosePipeline(closePipelineInfo)
.setAction(PipelineAction.Action.CLOSE)
.build();
context.addPipelineActionIfAbsent(action);
// wait for the next HB timeout or right away?
if (triggerHB) {
context.getParent().triggerHeartbeat();
}
LOG.debug(
"pipeline Action " + action.getAction() + " on pipeline " + pipelineID
+ ".Reason : " + action.getClosePipeline().getDetailedReason());
Expand Down Expand Up @@ -628,4 +638,20 @@ void handleInstallSnapshotFromLeader(RaftGroupId groupId,
firstTermIndexInLog, groupId);
handlePipelineFailure(groupId, roleInfoProto);
}

/**
* Notify the Datanode Ratis endpoint of Ratis log failure.
* Expected to be invoked from the Container StateMachine
* @param groupId the Ratis group/pipeline for which log has failed
* @param t exception encountered at the time of the failure
*
*/
@VisibleForTesting
public void handleNodeLogFailure(RaftGroupId groupId, Throwable t) {
String msg = (t == null) ? "Unspecified failure reported in Ratis log"
: t.getMessage();

triggerPipelineClose(groupId, msg,
ClosePipelineInfo.Reason.PIPELINE_LOG_FAILED,true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ message PipelineActionsProto {
message ClosePipelineInfo {
enum Reason {
PIPELINE_FAILED = 1;
PIPELINE_LOG_FAILED = 2;
}
required PipelineID pipelineID = 1;
optional Reason reason = 3;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.TestUtils;
Expand All @@ -29,20 +30,27 @@
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineActionsFromDatanode;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.ratis.protocol.RaftGroupId;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

Expand Down Expand Up @@ -180,4 +188,77 @@ public void testPipelineCloseWithPipelineAction() throws Exception {
} catch (PipelineNotFoundException e) {
}
}

@Test
public void testPipelineCloseWithLogFailure() throws IOException {

EventQueue eventQ = (EventQueue) scm.getEventQueue();
PipelineActionHandler pipelineActionTest =
Mockito.mock(PipelineActionHandler.class);
eventQ.addHandler(SCMEvents.PIPELINE_ACTIONS, pipelineActionTest);
ArgumentCaptor<PipelineActionsFromDatanode> actionCaptor =
ArgumentCaptor.forClass(PipelineActionsFromDatanode.class);

ContainerInfo containerInfo = containerManager
.allocateContainer(RATIS, THREE, "testOwner");
ContainerWithPipeline containerWithPipeline =
new ContainerWithPipeline(containerInfo,
pipelineManager.getPipeline(containerInfo.getPipelineID()));
Pipeline openPipeline = containerWithPipeline.getPipeline();
RaftGroupId groupId = RaftGroupId.valueOf(openPipeline.getId().getId());

try {
pipelineManager.getPipeline(openPipeline.getId());
} catch (PipelineNotFoundException e) {
Assert.assertTrue("pipeline should exist", false);
}

DatanodeDetails datanodeDetails = openPipeline.getNodes().get(0);
int index = cluster.getHddsDatanodeIndex(datanodeDetails);

XceiverServerRatis xceiverRatis =
(XceiverServerRatis) cluster.getHddsDatanodes().get(index)
.getDatanodeStateMachine().getContainer().getWriteChannel();

/**
* Notify Datanode Ratis Server endpoint of a Ratis log failure.
* This is expected to trigger an immediate pipeline actions report to SCM
*/
xceiverRatis.handleNodeLogFailure(groupId, null);

// verify SCM receives a pipeline action report "immediately"
Mockito.verify(pipelineActionTest, Mockito.timeout(100))
.onMessage(
actionCaptor.capture(),
Mockito.any(EventPublisher.class));

PipelineActionsFromDatanode actionsFromDatanode =
actionCaptor.getValue();

// match the pipeline id
verifyCloseForPipeline(openPipeline, actionsFromDatanode);
}

private boolean verifyCloseForPipeline(Pipeline pipeline,
PipelineActionsFromDatanode report) {
UUID uuidToFind = pipeline.getId().getId();

boolean found = false;
for (StorageContainerDatanodeProtocolProtos.PipelineAction action :
report.getReport().getPipelineActionsList()) {
if (action.getAction() ==
StorageContainerDatanodeProtocolProtos.PipelineAction.Action.CLOSE) {
PipelineID closedPipelineId = PipelineID.
getFromProtobuf(action.getClosePipeline().getPipelineID());

if (closedPipelineId.getId().equals(uuidToFind)) {
found = true;
}
}
}

Assert.assertTrue("SCM did not receive a Close action for the Pipeline",
found);
return found;
}
}

0 comments on commit 227a1b0

Please sign in to comment.