RATIS-1545. RaftLogOutputStream support async flush.#616
RATIS-1545. RaftLogOutputStream support async flush.#616szetszwo merged 3 commits intoapache:masterfrom
Conversation
szetszwo
left a comment
There was a problem hiding this comment.
@SincereXIA , thanks for working on this. Let's change flush the async but not other logics.
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index 9e5eb7e1..e010c495 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -51,6 +51,9 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
@@ -144,6 +147,7 @@ class SegmentedRaftLogWorker {
private final WriteLogTasks writeTasks = new WriteLogTasks();
private volatile boolean running = true;
private final Thread workerThread;
+ private final ExecutorService flushExecutor;
private final RaftStorage storage;
private volatile SegmentedRaftLogOutputStream out;
@@ -219,6 +223,7 @@ class SegmentedRaftLogWorker {
this.writeBuffer = ByteBuffer.allocateDirect(bufferSize);
this.lastFlush = Timestamp.currentTime();
this.flushIntervalMin = RaftServerConfigKeys.Log.flushIntervalMin(properties);
+ this.flushExecutor = Executors.newSingleThreadExecutor(ConcurrentUtils.newThreadFactory(name + "-flush"));
}
void start(long latestIndex, long evictIndex, File openSegmentFile) throws IOException {
@@ -236,6 +241,7 @@ class SegmentedRaftLogWorker {
void close() {
this.running = false;
workerThread.interrupt();
+ flushExecutor.shutdown();
try {
workerThread.join(3000);
} catch (InterruptedException ignored) {
@@ -371,10 +377,8 @@ class SegmentedRaftLogWorker {
if (stateMachineDataPolicy.isSync()) {
stateMachineDataPolicy.getFromFuture(f, () -> this + "-flushStateMachineData");
}
- final Timer.Context logSyncTimerContext = raftLogSyncTimer.time();
flushBatchSize = (int)(lastWrittenIndex - flushIndex.get());
- out.flush();
- logSyncTimerContext.stop();
+ CompletableFuture.supplyAsync(this::flushOutStream, flushExecutor);
if (!stateMachineDataPolicy.isSync()) {
IOUtils.getFromFuture(f, () -> this + "-flushStateMachineData");
}
@@ -386,6 +390,18 @@ class SegmentedRaftLogWorker {
}
}
+ private Void flushOutStream() {
+ final Timer.Context logSyncTimerContext = raftLogSyncTimer.time();
+ try {
+ out.flush();
+ } catch (IOException e) {
+ throw new CompletionException("Failed to flush", e);
+ } finally {
+ logSyncTimerContext.stop();
+ }
+ return null;
+ }
+
private void updateFlushedIndexIncreasingly() {
final long i = lastWrittenIndex;
flushIndex.updateIncreasingly(i, traceIndexChange);
...s-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
Outdated
Show resolved
Hide resolved
...s-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
Outdated
Show resolved
Hide resolved
...s-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
Show resolved
Hide resolved
| static TimeDuration flushIntervalMin(RaftProperties properties) { | ||
| return getTimeDuration(properties.getTimeDuration(FLUSH_INTERVAL_MIN_DEFAULT.getUnit()), | ||
| FLUSH_INTERVAL_MIN_KEY, FLUSH_INTERVAL_MIN_DEFAULT, getDefaultLog()); | ||
| String ASYNC_FLUSH_ENABLED_KEY = PREFIX + ".async.flush.enabled"; |
There was a problem hiding this comment.
@SincereXIA , if the performance can be improved by async, let's just change the code to async and don't need this conf.
I guess you are testing the change. Will wait for your results. Thanks a lot!
...s-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
Outdated
Show resolved
Hide resolved
|
@szetszwo Thanks for your attention to this issue, Our tests show that using the thread pool scheme to asynchronous flush has obvious performance improvement over synchronous flush, but a little worse performance than using a separate thread: Do you think this is ok? |
|
@SincereXIA , could you try also async flush with 3s flush interval? It seems that ExecutorService and single thread are supposed to have the same performance. Thanks. |
Hi @szetszwo , I tried increasing the interval between async flushes like this: SincereXIA@118b2e5 and I repeated the test, but I couldn't find a difference in performance by use 3s flush interval. The average write time is almost the same as the last column in the table. I think maybe the time it takes to create the async task is causing the slight drop in performance. By the way, is it a good idea to add a configurable flush interval? If yes, I can add it. |
@SincereXIA , It seems the sleep in SincereXIA@118b2e5 is the problem. Could you try https://issues.apache.org/jira/secure/attachment/13040724/616_asyncFlush.patch (applying to master) ? |
@SincereXIA , We already have raft.server.log.flush.interval.min . Do you mean adding another flush interval? |
Hi @szetszwo , This commit: SincereXIA@118b2e5 is only in my test branch, and is not included in this pr. So I think the changes in this pr is same as this patch: https://issues.apache.org/jira/secure/attachment/13040724/616_asyncFlush.patch , and the test result is here: |
| } else if (pendingFlushNum >= forceSyncNum) { | ||
| return true; | ||
| } | ||
| return pendingFlushNum > 0 && queue.isEmpty() && lastFlush.elapsedTime().compareTo(flushIntervalMin) > 0; |
There was a problem hiding this comment.
@SincereXIA , We already have raft.server.log.flush.interval.min . Do you mean adding another flush interval?
In my test, I found that add flush interval in this way has bug, It will greatly reduce the submission speed of Raft log. So I had to remove it.
So affter use this pr, I can't increase the flush interval.
I figured out a way to increase the flush interval: SincereXIA@118b2e5 It add a sleep in async flush call task, and it will not block the main thread. But after I increased the flush interval by this method, the performance did not improve. So I didn't add this commit to this pr. I haven't been able to think of a better way to do it.
szetszwo
left a comment
There was a problem hiding this comment.
@SincereXIA , just found that we should not updateFlushedIndexIncreasingly() before flush has completed. We need to think about how to fix it.
...s-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
Outdated
Show resolved
Hide resolved
| out.flush(); | ||
| logSyncTimerContext.stop(); | ||
| if (flushExecutorWorkQueue.isEmpty()) { | ||
| CompletableFuture.supplyAsync(this::flushOutStream, flushExecutor); |
There was a problem hiding this comment.
Since it is async, it may call updateFlushedIndexIncreasingly() before flush actually has completed. It is incorrect.
|
@SincereXIA , we need to make sure that the flushIndex is updated only after flush has completed. Here is the idea https://issues.apache.org/jira/secure/attachment/13040750/616_FlushWorker.patch . Please see if you agree with the approach. Thanks. |
Hi @szetszwo, Thank you. We tested this patch, but we found that calling Committing the raft log before the flush is fully completed may indeed be a security issue. But I I want to know if we can add a switch to allow users to choose whether they can accept sacrificing a little security in exchange for higher performance. Thanks. |
|
@SincereXIA , thanks a lot for testing it.
Yes, that's a good idea, especially for the groups with 3 or more nodes. |
+1 For make this configurable. After all, in most cases, we have a three replication. Async flush not only optimizes streaming, but also improves async write performance by more than 35% by we tested. |
|
@SincereXIA , if we want to support unsafe-flush (i.e. increase flush index without waiting flush to complete), we don't need to use FlushWorker anymore since it is for safe-flush. We should answer the following two questions:
Could you test https://issues.apache.org/jira/secure/attachment/13040828/616_unsafeFlush.patch ? |
It's not a bug, but this value needs to be set carefully. |
|
Hi @szetszwo, Thanks for your opinion. I made a small change and tested your patch, and the result is here: When unsafe flush + interval 3s is enabled, the average write performance of Ozone streaming is improved by 25%
In the first version I implemented, I pulled
For this question, @kaijchen is right. Exactly, it's not a bug. In the last pr, I didn't consider that extending the refresh interval will cause the log submission time to be extended. The performance did not meet our expectations. In the current version, I use flushIntervalMin to control the flush interval of unsafe flush, which will not prolong the time that the log is finally committed. |
| SegmentedRaftLogFormat.applyHeaderTo(CheckedConsumer.asCheckedFunction(out::write)); | ||
| out.flush(); | ||
| synchronized (out) { | ||
| out.flush(); |
There was a problem hiding this comment.
We found errors like this during testing:
2022-03-08 11:15:23,856 [97cb3f28-f298-4ed3-93c7-3be44bfe8b7b@group-892C7CF01088-SegmentedRaftLogWorker] ERROR org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogWorker: 97cb3f28-f298-4ed3-93c7-3be44bfe8b7b@group-892C7CF01088-SegmentedRaftLogWorker hit exception
java.lang.IllegalArgumentException
at java.nio.Buffer.position(Buffer.java:244)
at java.nio.DirectByteBuffer.put(DirectByteBuffer.java:377)
at org.apache.ratis.server.raftlog.segmented.BufferedWriteChannel.write(BufferedWriteChannel.java:61)
at org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogOutputStream.write(SegmentedRaftLogOutputStream.java:100)
at org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogWorker$WriteLog.execute(SegmentedRaftLogWorker.java:610)
at org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogWorker.run(SegmentedRaftLogWorker.java:404)
at java.lang.Thread.run(Thread.java:748)
2022-03-08 11:15:23,858 [97cb3f28-f298-4ed3-93c7-3be44bfe8b7b@group-892C7CF01088-SegmentedRaftLogWorker] INFO org.apache.ratis.server.RaftServer$Division: 97cb3f28-f298-4ed3-93c7-3be44bfe8b7b@group-892C7CF01088: shutdown
I think the cause of this error is that class BufferedWriteChannel is not thread safe, but an asynchronous flush may operate on this object at the same time as the main thread.
/**
* Provides a buffering layer in front of a FileChannel for writing.
*
* This class is NOT threadsafe.
*/
class BufferedWriteChannel implements Closeable {
After we make it synchronized, the error no longer occurs.
@SincereXIA , do you think that the first version is better? We should use it if it is the case. |
@szetszwo , I think this version now is better, In the first version, there will always be a thread that repeatedly does meaningless flush. And the performance of the previous version is tested without locking. It is not safe. After add lock at the |
|
@szetszwo @SincereXIA, Actually, there is a bug in current master. The following patch demonstrates the fix on current master (a4ffad2). diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index 9e5eb7e1..b6e10a4c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -177,6 +177,7 @@ class SegmentedRaftLogWorker {
private int flushBatchSize;
private Timestamp lastFlush;
+ private Timestamp firstArrival;
private final TimeDuration flushIntervalMin;
private final StateMachineDataPolicy stateMachineDataPolicy;
@@ -218,6 +219,7 @@ class SegmentedRaftLogWorker {
final int bufferSize = RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt();
this.writeBuffer = ByteBuffer.allocateDirect(bufferSize);
this.lastFlush = Timestamp.currentTime();
+ this.firstArrival = Timestamp.currentTime();
this.flushIntervalMin = RaftServerConfigKeys.Log.flushIntervalMin(properties);
}
@@ -355,7 +357,10 @@ class SegmentedRaftLogWorker {
} else if (pendingFlushNum >= forceSyncNum) {
return true;
}
- return pendingFlushNum > 0 && queue.isEmpty() && lastFlush.elapsedTime().compareTo(flushIntervalMin) > 0;
+ if (lastFlush.compareTo(firstArrival) >= 0) {
+ firstArrival = Timestamp.currentTime();
+ }
+ return pendingFlushNum > 0 && queue.isEmpty() && firstArrival.elapsedTime().compareTo(flushIntervalMin) > 0;
}
@SuppressFBWarnings("NP_NULL_PARAM_DEREF")
--
2.27.0 |
|
@SincereXIA , thanks for the update.
Since FileChannel is thread safe, let's call FileChannel.force(..) asynchronously so that we don't have to synchronize all the output stream calls; see https://issues.apache.org/jira/secure/attachment/13040845/616_asyncFileChannelForce.patch |
@szetszwo , Thanks for your opinion. We tested the performance of this patch: Direct call But we find a issue, sleep at here is useless. Because this will block the main thread and will not reduce the total number of flushes. So I removed |
szetszwo
left a comment
There was a problem hiding this comment.
+1 the change looks good.
@SincereXIA , let's merge this version? Or, would you like to test it more?
@szetszwo, I have tested it, it looks ok. I think this version is ready for merge. Thank you. |






What changes were proposed in this pull request?
We previously tried to reduce disk IO by introducing a minimum interval between flushes, the relevant pr is here: #611
However, after subsequent tests, we found that the pr #611 has a bug, make the raft performance degraded, because
flushIfNecessary()function does more operations than just flushing the stream.Since we enforced the minimum time for each flush, the raft log to can't be committed in flush intervals, which reduces the performance of raft.
So we want to separate
out.flush()fromflushIfNecessary()and execute it asynchronously, so that the process of writing to disk will not block the operation of other threadsWhat is the link to the Apache JIRA
https://issues.apache.org/jira/projects/RATIS/issues/RATIS-1545
How was this patch tested?
We use the ozone freon ockg tool to detect the effect of the changes.
test command:
We tested the speed of writing 32MB and 128MB objects after enabe async flush: