From 3136ed5c82592310d06e6d75d7470f6beed59ea6 Mon Sep 17 00:00:00 2001 From: Potato Date: Thu, 21 Nov 2024 18:58:12 +0800 Subject: [PATCH] Fixed bug where Ratis could not write large requests and could not be configured (#14160) --- .../iotdb/consensus/config/RatisConfig.java | 28 ------------------- .../iotdb/consensus/ratis/utils/Utils.java | 23 +++++++++------ 2 files changed, 15 insertions(+), 36 deletions(-) diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java index 220514a5a093e..3bb4e64e4900f 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java @@ -541,8 +541,6 @@ public ThreadPool.Builder setClientSize(int clientSize) { public static class Log { private final boolean useMemory; - private final int queueElementLimit; - private final SizeInBytes queueByteLimit; private final int purgeGap; private final boolean purgeUptoSnapshotIndex; private final long preserveNumsWhenPurge; @@ -555,8 +553,6 @@ public static class Log { private Log( boolean useMemory, - int queueElementLimit, - SizeInBytes queueByteLimit, int purgeGap, boolean purgeUptoSnapshotIndex, long preserveNumsWhenPurge, @@ -567,8 +563,6 @@ private Log( int forceSyncNum, boolean unsafeFlushEnabled) { this.useMemory = useMemory; - this.queueElementLimit = queueElementLimit; - this.queueByteLimit = queueByteLimit; this.purgeGap = purgeGap; this.purgeUptoSnapshotIndex = purgeUptoSnapshotIndex; this.preserveNumsWhenPurge = preserveNumsWhenPurge; @@ -584,14 +578,6 @@ public boolean isUseMemory() { return useMemory; } - public int getQueueElementLimit() { - return queueElementLimit; - } - - public SizeInBytes getQueueByteLimit() { - return queueByteLimit; - } - public int getPurgeGap() { return purgeGap; } @@ -635,8 +621,6 @@ public static Log.Builder newBuilder() { public static class Builder { private boolean useMemory = false; - private int queueElementLimit = 4096; - private SizeInBytes queueByteLimit = SizeInBytes.valueOf("64MB"); private int purgeGap = 1024; private boolean purgeUptoSnapshotIndex = true; private long preserveNumsWhenPurge = 1000; @@ -650,8 +634,6 @@ public static class Builder { public Log build() { return new Log( useMemory, - queueElementLimit, - queueByteLimit, purgeGap, purgeUptoSnapshotIndex, preserveNumsWhenPurge, @@ -668,16 +650,6 @@ public Log.Builder setUseMemory(boolean useMemory) { return this; } - public Log.Builder setQueueElementLimit(int queueElementLimit) { - this.queueElementLimit = queueElementLimit; - return this; - } - - public Log.Builder setQueueByteLimit(SizeInBytes queueByteLimit) { - this.queueByteLimit = queueByteLimit; - return this; - } - public Log.Builder setPurgeGap(int purgeGap) { this.purgeGap = purgeGap; return this; diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java index 9f25e83792479..367b330ac3ecb 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java @@ -56,6 +56,7 @@ import java.util.stream.Collectors; public class Utils { + private static final int TEMP_BUFFER_SIZE = 1024; private static final byte PADDING_MAGIC = 0x47; private static final String DATA_REGION_GROUP = "group-0001"; @@ -288,9 +289,6 @@ public static void initRatisConfig(RaftProperties properties, RatisConfig config properties, config.getThreadPool().getServerSize()); RaftServerConfigKeys.Log.setUseMemory(properties, config.getLog().isUseMemory()); - RaftServerConfigKeys.Log.setQueueElementLimit( - properties, config.getLog().getQueueElementLimit()); - RaftServerConfigKeys.Log.setQueueByteLimit(properties, config.getLog().getQueueByteLimit()); RaftServerConfigKeys.Log.setPurgeGap(properties, config.getLog().getPurgeGap()); RaftServerConfigKeys.Log.setPurgeUptoSnapshotIndex( properties, config.getLog().isPurgeUptoSnapshotIndex()); @@ -302,15 +300,20 @@ public static void initRatisConfig(RaftProperties properties, RatisConfig config RaftServerConfigKeys.Log.setSegmentCacheSizeMax( properties, config.getLog().getSegmentCacheSizeMax()); RaftServerConfigKeys.Log.setPreallocatedSize(properties, config.getLog().getPreallocatedSize()); - final SizeInBytes writeBufferSize = - SizeInBytes.valueOf(config.getLeaderLogAppender().getBufferByteLimit().getSizeInt() + 8L); - RaftServerConfigKeys.Log.setWriteBufferSize(properties, writeBufferSize); + RaftServerConfigKeys.Log.setWriteBufferSize( + properties, + SizeInBytes.valueOf(config.getLeaderLogAppender().getBufferByteLimit().getSizeInt() + 8L)); RaftServerConfigKeys.Log.setForceSyncNum(properties, config.getLog().getForceSyncNum()); RaftServerConfigKeys.Log.setUnsafeFlushEnabled( properties, config.getLog().isUnsafeFlushEnabled()); RaftServerConfigKeys.Log.setCorruptionPolicy( properties, RaftServerConfigKeys.Log.CorruptionPolicy.WARN_AND_RETURN); + RaftServerConfigKeys.Write.setByteLimit( + properties, config.getLeaderLogAppender().getBufferByteLimit()); + + RaftServerConfigKeys.Log.setQueueByteLimit( + properties, config.getLeaderLogAppender().getBufferByteLimit()); RaftServerConfigKeys.Log.Appender.setBufferByteLimit( properties, config.getLeaderLogAppender().getBufferByteLimit()); RaftServerConfigKeys.Log.Appender.setSnapshotChunkSizeMax( @@ -331,8 +334,12 @@ public static void initRatisConfig(RaftProperties properties, RatisConfig config RaftServerConfigKeys.Read.setTimeout(properties, config.getRead().getReadTimeout()); RaftServerConfigKeys.setSleepDeviationThreshold( - properties, config.getUtils().getSleepDeviationThresholdMs()); - RaftServerConfigKeys.setCloseThreshold(properties, config.getUtils().getCloseThresholdMs()); + properties, + TimeDuration.valueOf( + config.getUtils().getSleepDeviationThresholdMs(), TimeUnit.MILLISECONDS)); + RaftServerConfigKeys.setCloseThreshold( + properties, + TimeDuration.valueOf(config.getUtils().getCloseThresholdMs(), TimeUnit.MILLISECONDS)); final TimeDuration clientMaxRetryGap = getMaxRetrySleepTime(config.getClient()); RaftServerConfigKeys.RetryCache.setExpiryTime(properties, clientMaxRetryGap);