Fd disk probes (#3538)
* Fd disk probes, design doc

* Fd disk probes: handle BatchProcessor failures

* Fd disk probes: fix errors on the "configuration" steps. Fix FailureDetectorService, FileSystemAgent

* Fd disk probes: fix an issue when failure detector can't make progress during healing. Also restart batch processor during reset operation

* Fd disk probes: fix BatchProcessor executor NPE

* Fd disk probes: review comments part 1

* Fd disk probes: rearranging fields in FileSystemStatsMsg

* Fd disk probes: address review requests

* Fd disk probes: fix protobuf compilation error

* Fd disk probes: node rank, add javadoc

* Fd disk probes: fix BatchProcessor#restart method, restart process() only if batch processor stopped
xnull committed Mar 10, 2023
1 parent cf365ac commit c6580cb
Showing 36 changed files with 1,284 additions and 524 deletions.
82 changes: 78 additions & 4 deletions docs/failure-detector/filesystem-failures.md
@@ -27,7 +27,7 @@ JVM failures:
System Failures:
- No free RAM

### Detecting Failures in the Cluster

Each node runs its own instance of a [Failure Detector](failure-detector.md).
Since each node will provide all needed information in its NodeState, a decision maker can detect any possible failure
@@ -46,11 +46,85 @@
For instance, there are multiple possible scenarios of failures that could happen during
simultaneous updates of the cluster layout. Using the current "decision maker" approach, only one node can be added to
the unresponsive list at a time.

### Disk Failure Detection on a Local Node
- Data corruption exceptions:
  - StreamLogFiles#initializeLogMetadata(): Corfu fails and cannot recover if the data on disk is corrupted
  - StreamLogFiles#initStreamLogDirectory(): during the creation of the LogUnitServer, the node can go down if the disk is read-only or the log directory is not writable
- Batch processor failures: occur in case of an exception in the BatchProcessor#process() method
- FileSystemAgent: checks that the partition has been mounted (a sketch aggregating these probe signals follows this list)
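
A minimal sketch, assuming hypothetical class and method names (the real agent reports these flags through `FileSystemStats`), of how the probe signals above could be aggregated on the local node:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical aggregator: names are illustrative, not the actual CorfuDB API.
public final class DiskProbeState {
    // Set when StreamLogFiles#initializeLogMetadata() hits corrupted data.
    private final AtomicBoolean dataCorruption = new AtomicBoolean(false);
    // Set when the log directory is read-only or not writable.
    private final AtomicBoolean readOnlyFileSystem = new AtomicBoolean(false);
    // Set when BatchProcessor#process() throws an exception.
    private final AtomicBoolean batchProcessorError = new AtomicBoolean(false);
    // Cleared when the data partition is found to be unmounted.
    private final AtomicBoolean partitionMounted = new AtomicBoolean(true);

    public void reportDataCorruption() { dataCorruption.set(true); }
    public void reportReadOnlyFileSystem() { readOnlyFileSystem.set(true); }
    public void reportBatchProcessorError() { batchProcessorError.set(true); }
    public void reportPartitionUnmounted() { partitionMounted.set(false); }

    // The node is considered failed if any probe reports a problem.
    public boolean hasFailure() {
        return dataCorruption.get() || readOnlyFileSystem.get()
                || batchProcessorError.get() || !partitionMounted.get();
    }
}
```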

#### Disk failure handling
- `FileSystemAgent` will collect information about disk failures (see above)

#### An example of a layout with detected failures
- `org.corfudb.infrastructure.management.failuredetector.DecisionMakerAgent` needs to decide:
  - whether a node has failed, based on the information provided by `FileSystemAgent`
  - whether the node needs to be added to the unresponsive list

- The DecisionMakerAgent has been designed to proactively monitor the health of the system.
To achieve this, it systematically collects detailed statistics from the FileSystemAgent.
If the analysis of these statistics reveals any failures, such as a DataCorruptionException,
the DecisionMakerAgent applies a mitigation strategy that excludes the affected node
from participating in the current cycle of failure detection in the cluster.

- The NodeState, which serves as a representation of the status of the node,
includes the FileSystem statistics (roughly the shape sketched below).
These statistics, collected systematically by the FileSystemAgent,
provide all the information the Failure Detector needs to decide whether the node has failed.
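
Roughly, the exchanged state has the following shape (field names here are illustrative; the real representation is the FileSystemStatsMsg protobuf carried inside NodeState):

```java
// Illustrative sketch only: the real payload is a protobuf message.
public final class NodeStateSketch {
    public final String endpoint;
    public final boolean readOnlyFileSystem;   // disk probe result
    public final boolean batchProcessorError;  // BatchProcessor status
    public final boolean partitionMounted;     // FileSystemAgent mount check

    public NodeStateSketch(String endpoint, boolean readOnlyFileSystem,
                           boolean batchProcessorError, boolean partitionMounted) {
        this.endpoint = endpoint;
        this.readOnlyFileSystem = readOnlyFileSystem;
        this.batchProcessorError = batchProcessorError;
        this.partitionMounted = partitionMounted;
    }
}
```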

- Other failure detectors will be able to collect the stats from the node and decide
which node to exclude from the cluster according to the information in FileSystemStats and the poll report.

- Nodes collect file system stats from all the other nodes in the cluster.
Whichever node becomes the decision maker in the cluster on the current iteration can see whether another node has problems
with its file system and whether that node needs to be excluded from the cluster.

- `FailuresAgent#detectAndHandleFailure()` is in charge of finding a failed node and
figuring out whether the local node is the decision maker.
If both conditions are met, the agent triggers a layout update that excludes the failed node from the cluster (sketched below).
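
A hedged sketch of that flow; the method names and the "first healthy endpoint by rank" rule are assumptions made for illustration (see FailuresAgent and NodeRank for the real logic):

```java
import java.util.Collection;
import java.util.Optional;

public final class FailureHandlingSketch {

    /** Per-node view assembled from the collected NodeState messages. */
    public static final class NodeView {
        final String endpoint;
        final boolean fileSystemFailure;

        NodeView(String endpoint, boolean fileSystemFailure) {
            this.endpoint = endpoint;
            this.fileSystemFailure = fileSystemFailure;
        }
    }

    static Optional<String> findFailedNode(Collection<NodeView> cluster) {
        return cluster.stream()
                .filter(view -> view.fileSystemFailure)
                .map(view -> view.endpoint)
                .findFirst();
    }

    /** Assumption: the decision maker is the first healthy endpoint by rank. */
    static boolean isDecisionMaker(String localEndpoint, Collection<NodeView> cluster) {
        return cluster.stream()
                .filter(view -> !view.fileSystemFailure)
                .map(view -> view.endpoint)
                .sorted()
                .findFirst()
                .map(localEndpoint::equals)
                .orElse(false);
    }

    static void detectAndHandleFailure(String localEndpoint, Collection<NodeView> cluster) {
        Optional<String> failed = findFailedNode(cluster);
        if (failed.isPresent() && isDecisionMaker(localEndpoint, cluster)) {
            // Trigger a layout update that moves the failed node to the
            // unresponsive list (layout plumbing omitted in this sketch).
            System.out.println("Excluding " + failed.get() + " from the cluster");
        }
    }
}
```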

- The current design of the Failure Detector (with the Disk ReadOnly failure) allows enriching
the current FileSystemStats with new types of failures (additional disk probes) to handle more disk issues effectively.

**Changes in `LogUnitServer`:**
- During its creation, LogUnitServer catches DataCorruptionException and IllegalStateException
and sends the information to the FileSystemAgent (sketched below)

- In case of disk issues, LogUnitServer changes `AbstractServer#ServerState`,
which indicates whether the node is ready to handle queries
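
A sketch of that startup flow, under assumed interface names; the real wiring lives in LogUnitServer and FileSystemAgent:

```java
public final class LogUnitStartupSketch {

    public interface StreamLogInit {
        // May throw e.g. a DataCorruptionException or IllegalStateException.
        void initializeLogMetadata() throws Exception;
    }

    public interface FailureReporter {
        void reportFailure(Exception cause); // recorded in FileSystemStats
    }

    public enum ServerState { READY, NOT_READY }

    public ServerState start(StreamLogInit streamLog, FailureReporter agent) {
        try {
            streamLog.initializeLogMetadata();
            return ServerState.READY;
        } catch (Exception e) {
            agent.reportFailure(e);
            // The node refuses to handle queries until it is healed.
            return ServerState.NOT_READY;
        }
    }
}
```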

- The Failure Detector resolves file system issues with the following steps:
  - detecting the failure
  - healing the node by executing `HealNodeWorkflow`,
    which resets `StreamLogFiles` and triggers a data transfer
    that replaces the corrupted data with consistent data collected from the cluster

- As long as the node has a read-only file system or an unmounted disk partition, the node stays in the unresponsiveList in the layout (the heal path is sketched below)
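
A compact sketch of that heal path, with `Runnable` parameters standing in for the real `StreamLogFiles` reset and cluster data transfer steps:

```java
public final class HealSketch {

    public void healIfClean(boolean probesClean, Runnable resetStreamLog, Runnable stateTransfer) {
        if (!probesClean) {
            // The node stays in the unresponsive list while probes still fail.
            return;
        }
        // HealNodeWorkflow: drop the corrupted local state, then re-replicate.
        resetStreamLog.run();  // reset StreamLogFiles
        stateTransfer.run();   // pull consistent data from cluster peers
    }
}
```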

**Changes in `BatchProcessor`:**
- BatchProcessor changes its state to ERROR in case of an exception in the `BatchProcessor#process()` method
- FileSystemAgent collects the status of the BatchProcessor
- DecisionMakerAgent detects whether a node is in a failed state the same way it detects whether a disk is in read-only mode.
If so, the node is added to the unresponsive list. (A usage sketch of the new status tracking follows; the full implementation is in the diff below.)
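
A usage sketch for the `BatchProcessorContext` introduced in this change; how the FileSystemAgent actually polls the status is an assumption here:

```java
// Assumed to sit in the same package as BatchProcessor for brevity.
import org.corfudb.runtime.proto.FileSystemStats.BatchProcessorStatus;

final class BatchProcessorStatusCheck {

    boolean hasBatchProcessorFailed(BatchProcessor.BatchProcessorContext context) {
        // BP_STATUS_ERROR is set by BatchProcessor#process() on an exception
        // and cleared again by BatchProcessor#restart().
        return context.getStatus() == BatchProcessorStatus.BP_STATUS_ERROR;
    }
}
```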


**Testing**

We will be able to add the above scenarios to our test suite. Since FileSystemStats contains just boolean flags about the failure
(e.g., whether a DataCorruptionException happened), we can emulate the entire cycle of the additional failure detection scenarios,
such as:
- BatchProcessorTest#testRestart(): verifies that the batch processor reports the proper status in case of exceptions in its operations (sketched after this list)
- FileSystemAdvisorTest: covers the algorithm of finding failed/healed nodes
- NodeRankTest: checks the proper ordering of nodes in the list according to their state
- DecisionMakerAgentTest: checks that the correct decision maker was chosen
- FailureDetectorServiceTest: tests the overall FD mechanism
- FailureAgentTest: checks the algorithm of finding failed nodes
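
A sketch of the restart-test idea (not the real BatchProcessorTest; in the real suite the error status is induced by making a batch operation throw inside process()):

```java
// Assumes this test sits in the same package as BatchProcessor.
import static org.junit.Assert.assertEquals;

import org.corfudb.runtime.proto.FileSystemStats.BatchProcessorStatus;
import org.junit.Test;

public class BatchProcessorContextSketchTest {

    @Test
    public void statusStartsOkAndResetsToOk() {
        BatchProcessor.BatchProcessorContext context =
                new BatchProcessor.BatchProcessorContext();

        // A freshly created context reports BP_STATUS_OK.
        assertEquals(BatchProcessorStatus.BP_STATUS_OK, context.getStatus());

        // restart() calls setOkStatus() after an error; calling it directly
        // must leave the context in the OK state as well.
        context.setOkStatus();
        assertEquals(BatchProcessorStatus.BP_STATUS_OK, context.getStatus());
    }
}
```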


### Local Node Failure Detection Sequence Diagram

![Local Node Failure Detection Visualization](http://www.plantuml.com/plantuml/proxy?src=https://raw.githubusercontent.com/CorfuDB/CorfuDB/master/docs/failure-detector/file-system-failure-detection.puml)


### An example of a layout with detected failures and the corresponding failure statistics

```json
{
  ...
}
```
BatchProcessor.java
@@ -17,11 +17,13 @@
import org.corfudb.runtime.exceptions.QuotaExceededException;
import org.corfudb.runtime.exceptions.WrongEpochException;
import org.corfudb.runtime.exceptions.unrecoverable.UnrecoverableCorfuInterruptedError;
import org.corfudb.runtime.proto.FileSystemStats.BatchProcessorStatus;
import org.corfudb.runtime.proto.service.CorfuMessage.RequestMsg;
import org.corfudb.runtime.proto.service.CorfuMessage.RequestPayloadMsg;
import org.corfudb.runtime.view.Layout;

import javax.annotation.Nonnull;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@@ -30,7 +32,9 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static org.corfudb.protocols.CorfuProtocolLogData.getLogData;
@@ -46,7 +50,7 @@ public class BatchProcessor implements AutoCloseable {
private final int BATCH_SIZE;
private final boolean sync;
private final StreamLog streamLog;
private final BlockingQueue<BatchWriterOperation> operationsQueue;
private final BlockingQueue<BatchWriterOperation<?>> operationsQueue;
private final ExecutorService processorService;

/**
@@ -57,32 +61,46 @@ public class BatchProcessor implements AutoCloseable {
*/
private long sealEpoch;

private final BatchProcessorContext context;

/**
* Returns a new BatchProcessor for a stream log.
*
* @param streamLog stream log for writes (can be in memory or file)
* @param context batch processor context that tracks the processor status
* @param sealEpoch all operations stamped with an epoch less than sealEpoch are discarded
* @param sync if true, the batch writer will sync writes to secondary storage
*/
public BatchProcessor(StreamLog streamLog, long sealEpoch, boolean sync) {
public BatchProcessor(StreamLog streamLog, BatchProcessorContext context, long sealEpoch, boolean sync) {
this(new LinkedBlockingQueue<>(), streamLog, context, sealEpoch, sync);
}

public BatchProcessor(BlockingQueue<BatchWriterOperation<?>> operationsQueue, StreamLog streamLog,
BatchProcessorContext context, long sealEpoch, boolean sync) {
this.sealEpoch = sealEpoch;
this.sync = sync;
this.streamLog = streamLog;
this.context = context;

BATCH_SIZE = 50;
operationsQueue = new LinkedBlockingQueue<>();
processorService = Executors
.newSingleThreadExecutor(new ThreadFactoryBuilder()
.setDaemon(false)
.setNameFormat("LogUnit-BatchProcessor-%d")
.build());
this.operationsQueue = operationsQueue;

processorService = newExecutorService();
processorService.submit(this::process);

if (sealEpoch != Layout.INVALID_EPOCH) {
HealthMonitor.resolveIssue(Issue.createInitIssue(Component.LOG_UNIT));
}
}

private ExecutorService newExecutorService() {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setDaemon(false)
.setNameFormat("LogUnit-BatchProcessor-%d")
.build();

return Executors.newSingleThreadExecutor(threadFactory);
}

private void recordRunnable(Runnable runnable, Optional<Timer> timer) {
if (timer.isPresent()) {
timer.get().record(runnable);
@@ -230,6 +248,7 @@ private void process() {
}
} catch (Exception e) {
log.error("Caught exception in the write processor ", e);
context.setErrorStatus();
}
}

@@ -244,4 +263,29 @@ public void close() {
throw new UnrecoverableCorfuInterruptedError("BatchProcessor close interrupted.", e);
}
}

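/**
 * Clears any queued operations and resumes the process() loop, but only
 * if the processor previously stopped with an error status.
 */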
public void restart() {
operationsQueue.clear();

if (context.getStatus() == BatchProcessorStatus.BP_STATUS_ERROR) {
context.setOkStatus();
processorService.submit(this::process);
}
}

public static class BatchProcessorContext {
private final AtomicReference<BatchProcessorStatus> status = new AtomicReference<>(BatchProcessorStatus.BP_STATUS_OK);

private void setErrorStatus() {
status.set(BatchProcessorStatus.BP_STATUS_ERROR);
}

public void setOkStatus() {
status.set(BatchProcessorStatus.BP_STATUS_OK);
}

public BatchProcessorStatus getStatus() {
return status.get();
}
}
}
