From 0e459a1aab867effe88d1bb06284b39e4d0f9368 Mon Sep 17 00:00:00 2001 From: Caideyipi <1336190078@qq.com> Date: Mon, 13 Nov 2023 14:22:15 +0800 Subject: [PATCH 01/10] Sonar fixing --- .../org/apache/iotdb/CountPointProcessor.java | 15 +++++++-------- .../apache/iotdb/pipe/it/IoTDBPipeClusterIT.java | 15 +++++---------- .../dataregion/wal/utils/WALEntryPosition.java | 8 +++++--- .../dataregion/wal/node/WALEntryHandlerTest.java | 11 +++++------ 4 files changed, 22 insertions(+), 27 deletions(-) diff --git a/example/pipe-count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java b/example/pipe-count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java index 331c5e3d997c4..feeabce018931 100644 --- a/example/pipe-count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java +++ b/example/pipe-count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java @@ -38,12 +38,12 @@ public class CountPointProcessor implements PipeProcessor { private static final String AGGREGATE_SERIES_KEY = "aggregate-series"; - private static AtomicLong writePointCount = new AtomicLong(0); + private static final AtomicLong writePointCount = new AtomicLong(0); private PartialPath aggregateSeries; @Override - public void validate(PipeParameterValidator validator) throws Exception { + public void validate(PipeParameterValidator validator) { validator.validateRequiredAttribute(AGGREGATE_SERIES_KEY); } @@ -54,12 +54,9 @@ public void customize(PipeParameters parameters, PipeProcessorRuntimeConfigurati } @Override - public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector) - throws Exception { + public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector) { tabletInsertionEvent.processTablet( - (tablet, rowCollector) -> { - writePointCount.addAndGet(tablet.rowSize); - }); + (tablet, rowCollector) -> writePointCount.addAndGet(tablet.rowSize)); } @Override @@ -79,5 +76,7 @@ public void process(Event event, EventCollector eventCollector) throws Exception } @Override - public void close() throws Exception {} + public void close() { + // Do nothing + } } diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java index 5792ea3598c3d..5181dee00935e 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java @@ -68,24 +68,19 @@ public void setUp() throws Exception { senderEnv = MultiEnvFactory.getEnv(0); receiverEnv = MultiEnvFactory.getEnv(1); + // Avoid hard coding the nodes num and consensus to enable traversal of configurations senderEnv .getConfig() .getCommonConfig() - .setAutoCreateSchemaEnabled(true) - .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) - .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) - .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS); + .setAutoCreateSchemaEnabled(true); receiverEnv .getConfig() .getCommonConfig() - .setAutoCreateSchemaEnabled(true) - .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) - .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) - .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS); + .setAutoCreateSchemaEnabled(true); - senderEnv.initClusterEnvironment(3, 3, 180); - receiverEnv.initClusterEnvironment(3, 3, 180); + senderEnv.initClusterEnvironment(); + receiverEnv.initClusterEnvironment(); } @After diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java index 3979c49bffc6c..4ef1aa9ce6bb8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java @@ -47,6 +47,8 @@ public class WALEntryPosition { // cache for wal entry private WALInsertNodeCache cache = null; + private static final String ENTRY_NOT_READY_MESSAGE = "This entry isn't ready for read."; + public WALEntryPosition() {} public WALEntryPosition(String identifier, long walFileVersionId, long position, int size) { @@ -71,7 +73,7 @@ public Pair readByteBufferOrInsertNodeViaCacheDirectly() */ public InsertNode readInsertNodeViaCacheAfterCanRead() throws IOException { if (!canRead()) { - throw new IOException("This entry isn't ready for read."); + throw new IOException(ENTRY_NOT_READY_MESSAGE); } return cache.getInsertNode(this); } @@ -83,7 +85,7 @@ public InsertNode readInsertNodeViaCacheAfterCanRead() throws IOException { */ public ByteBuffer readByteBufferViaCacheAfterCanRead() throws IOException { if (!canRead()) { - throw new IOException("This entry isn't ready for read."); + throw new IOException(ENTRY_NOT_READY_MESSAGE); } return cache.getByteBuffer(this); } @@ -144,7 +146,7 @@ public boolean canRead() { /** Return true only when this wal file is sealed. */ public boolean isInSealedFile() { if (walNode == null || !canRead()) { - throw new RuntimeException("This entry isn't ready for read."); + throw new RuntimeException(ENTRY_NOT_READY_MESSAGE); } return walFileVersionId < walNode.getCurrentWALFileVersion(); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALEntryHandlerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALEntryHandlerTest.java index 45f6535734797..5c8e71cf17215 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALEntryHandlerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALEntryHandlerTest.java @@ -42,6 +42,7 @@ import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.awaitility.Awaitility; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -234,9 +235,7 @@ public void getFlushedValue() throws Exception { handler.pinMemTable(); walNode1.onMemTableFlushed(memTable); // wait until wal flushed - while (!walNode1.isAllWALEntriesConsumed()) { - Thread.sleep(50); - } + Awaitility.await().until(() -> walNode1.isAllWALEntriesConsumed()); assertEquals(node1, handler.getInsertNode()); } @@ -269,9 +268,9 @@ public void testConcurrentGetValue() throws Exception { } // wait until wal flushed - while (!walNode1.isAllWALEntriesConsumed() && !walNode2.isAllWALEntriesConsumed()) { - Thread.sleep(50); - } + Awaitility.await() + .until( + () -> walNode1.isAllWALEntriesConsumed() && walNode2.isAllWALEntriesConsumed()); walFlushListeners.get(0).getWalEntryHandler().pinMemTable(); walNode.onMemTableFlushed(memTable); From cad781146cca126de2c53dbcf581952096df07a4 Mon Sep 17 00:00:00 2001 From: Caideyipi <1336190078@qq.com> Date: Mon, 13 Nov 2023 14:23:06 +0800 Subject: [PATCH 02/10] Update IoTDBPipeProtocolIT.java --- .../test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java index c112aeb51b17e..6a8d2fc3d17da 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java @@ -43,9 +43,9 @@ import java.util.HashMap; import java.util.Map; +/** Test pipe's basic functionalities under multiple cluster and consensus protocol settings. */ @RunWith(IoTDBTestRunner.class) @Category({MultiClusterIT2.class}) -/** Test pipe's basic functionalities under multiple cluster and consensus protocol settings. */ public class IoTDBPipeProtocolIT { private BaseEnv senderEnv; From db221a229909e55a47a1a03514fa9932efeef291 Mon Sep 17 00:00:00 2001 From: Caideyipi <1336190078@qq.com> Date: Mon, 13 Nov 2023 14:35:02 +0800 Subject: [PATCH 03/10] Update WALEntryHandlerTest.java --- .../storageengine/dataregion/wal/node/WALEntryHandlerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALEntryHandlerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALEntryHandlerTest.java index 5c8e71cf17215..54771507916f3 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALEntryHandlerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALEntryHandlerTest.java @@ -270,7 +270,7 @@ public void testConcurrentGetValue() throws Exception { // wait until wal flushed Awaitility.await() .until( - () -> walNode1.isAllWALEntriesConsumed() && walNode2.isAllWALEntriesConsumed()); + walNode::isAllWALEntriesConsumed); walFlushListeners.get(0).getWalEntryHandler().pinMemTable(); walNode.onMemTableFlushed(memTable); From 6c288e44d6c258f3a29cd3cec5fe518528a7f505 Mon Sep 17 00:00:00 2001 From: Caideyipi <1336190078@qq.com> Date: Mon, 13 Nov 2023 14:36:12 +0800 Subject: [PATCH 04/10] Update WALEntryHandlerTest.java --- .../dataregion/wal/node/WALEntryHandlerTest.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALEntryHandlerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALEntryHandlerTest.java index 54771507916f3..48d2f60ff67ee 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALEntryHandlerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALEntryHandlerTest.java @@ -268,9 +268,7 @@ public void testConcurrentGetValue() throws Exception { } // wait until wal flushed - Awaitility.await() - .until( - walNode::isAllWALEntriesConsumed); + Awaitility.await().until(walNode::isAllWALEntriesConsumed); walFlushListeners.get(0).getWalEntryHandler().pinMemTable(); walNode.onMemTableFlushed(memTable); From a835d6e2790d9316c2a3dc6f2cd63ba46aaefe28 Mon Sep 17 00:00:00 2001 From: Caideyipi <1336190078@qq.com> Date: Mon, 13 Nov 2023 14:45:00 +0800 Subject: [PATCH 05/10] Update WALFlushListener.java --- .../dataregion/wal/utils/listener/WALFlushListener.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/listener/WALFlushListener.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/listener/WALFlushListener.java index 9c7ba908d1e3c..8c84a0cb0fe54 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/listener/WALFlushListener.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/listener/WALFlushListener.java @@ -24,7 +24,7 @@ /** This class helps judge whether wal is flushed to the storage device. */ public class WALFlushListener extends AbstractResultListener { - // handler for pipeline, only exists then value is InsertNode + // handler for pipeline, only exists when value is InsertNode private final WALEntryHandler walEntryHandler; public WALFlushListener(boolean wait, WALEntryValue value) { From aa06fa7130c3b57b5fe2a284aa814206e71352c8 Mon Sep 17 00:00:00 2001 From: Caideyipi <1336190078@qq.com> Date: Mon, 13 Nov 2023 14:52:11 +0800 Subject: [PATCH 06/10] Update WALNode.java --- .../db/storageengine/dataregion/wal/node/WALNode.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java index f3b6cd9c69c8c..7e35f16d107cd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java @@ -50,6 +50,7 @@ import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALByteBufReader; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileStatus; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileUtils; +import org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.AbstractResultListener; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.AbstractResultListener.Status; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALFlushListener; import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; @@ -233,7 +234,9 @@ private class DeleteOutdatedFileTask implements Runnable { private int recursionTime = 0; - public DeleteOutdatedFileTask() {} + public DeleteOutdatedFileTask() { + // Do nothing + } private void init() { this.firstValidVersionId = initFirstValidWALVersionId(); @@ -928,7 +931,7 @@ public boolean isAllWALEntriesConsumed() { public void rollWALFile() { WALEntry rollWALFileSignal = new WALSignalEntry(WALEntryType.ROLL_WAL_LOG_WRITER_SIGNAL, true); WALFlushListener walFlushListener = log(rollWALFileSignal); - if (walFlushListener.waitForResult() == WALFlushListener.Status.FAILURE) { + if (walFlushListener.waitForResult() == AbstractResultListener.Status.FAILURE) { logger.error( "Fail to trigger rolling wal node-{}'s wal file log writer.", identifier, From c9ba8fbbb8a7ec4f5447f264beac1272fed97e9b Mon Sep 17 00:00:00 2001 From: Caideyipi <1336190078@qq.com> Date: Mon, 13 Nov 2023 14:58:40 +0800 Subject: [PATCH 07/10] Update PipeHistoricalDataRegionTsFileExtractor.java --- .../PipeHistoricalDataRegionTsFileExtractor.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java index 9c9fb01ff2567..a184cfd852549 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java @@ -267,7 +267,7 @@ && isTsFileGeneratedAfterExtractionTimeLowerBound(resource)) .collect(Collectors.toList()); resourceList.addAll(sequenceTsFileResources); - final Collection unsequenceTsFileResources = + final Collection unSequenceTsFileResources = tsFileManager.getTsFileList(false).stream() .filter( resource -> @@ -278,7 +278,7 @@ && isTsFileGeneratedAfterExtractionTimeLowerBound(resource)) && isTsFileResourceOverlappedWithTimeRange(resource) && isTsFileGeneratedAfterExtractionTimeLowerBound(resource)) .collect(Collectors.toList()); - resourceList.addAll(unsequenceTsFileResources); + resourceList.addAll(unSequenceTsFileResources); resourceList.forEach( resource -> { @@ -297,10 +297,10 @@ && isTsFileGeneratedAfterExtractionTimeLowerBound(resource)) LOGGER.info( "Pipe: start to extract historical TsFile, data region {}, " - + "sequence file count {}, unsequence file count {}, historical extraction time {} ms", + + "sequence file count {}, unSequence file count {}, historical extraction time {} ms", dataRegionId, sequenceTsFileResources.size(), - unsequenceTsFileResources.size(), + unSequenceTsFileResources.size(), System.currentTimeMillis() - startHistoricalExtractionTime); } finally { tsFileManager.readUnlock(); From 3817fa86a3deb33bb13195b5ebb85b99d30dba21 Mon Sep 17 00:00:00 2001 From: Caideyipi <1336190078@qq.com> Date: Mon, 13 Nov 2023 15:47:13 +0800 Subject: [PATCH 08/10] Update IoTDBPipeClusterIT.java --- .../apache/iotdb/pipe/it/IoTDBPipeClusterIT.java | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java index 5181dee00935e..66d42732cb6d8 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java @@ -27,7 +27,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq; import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq; import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp; -import org.apache.iotdb.consensus.ConsensusFactory; import org.apache.iotdb.db.it.utils.TestUtils; import org.apache.iotdb.it.env.MultiEnvFactory; import org.apache.iotdb.it.env.cluster.env.AbstractEnv; @@ -69,15 +68,9 @@ public void setUp() throws Exception { receiverEnv = MultiEnvFactory.getEnv(1); // Avoid hard coding the nodes num and consensus to enable traversal of configurations - senderEnv - .getConfig() - .getCommonConfig() - .setAutoCreateSchemaEnabled(true); - - receiverEnv - .getConfig() - .getCommonConfig() - .setAutoCreateSchemaEnabled(true); + senderEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true); + + receiverEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true); senderEnv.initClusterEnvironment(); receiverEnv.initClusterEnvironment(); From dac3b9aa0e3e9355775c2b073c6add6007b4956f Mon Sep 17 00:00:00 2001 From: Caideyipi <1336190078@qq.com> Date: Mon, 13 Nov 2023 22:00:54 +0800 Subject: [PATCH 09/10] Reverted change of "unsequence" --- .../PipeHistoricalDataRegionTsFileExtractor.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java index a184cfd852549..9c9fb01ff2567 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java @@ -267,7 +267,7 @@ && isTsFileGeneratedAfterExtractionTimeLowerBound(resource)) .collect(Collectors.toList()); resourceList.addAll(sequenceTsFileResources); - final Collection unSequenceTsFileResources = + final Collection unsequenceTsFileResources = tsFileManager.getTsFileList(false).stream() .filter( resource -> @@ -278,7 +278,7 @@ && isTsFileGeneratedAfterExtractionTimeLowerBound(resource)) && isTsFileResourceOverlappedWithTimeRange(resource) && isTsFileGeneratedAfterExtractionTimeLowerBound(resource)) .collect(Collectors.toList()); - resourceList.addAll(unSequenceTsFileResources); + resourceList.addAll(unsequenceTsFileResources); resourceList.forEach( resource -> { @@ -297,10 +297,10 @@ && isTsFileGeneratedAfterExtractionTimeLowerBound(resource)) LOGGER.info( "Pipe: start to extract historical TsFile, data region {}, " - + "sequence file count {}, unSequence file count {}, historical extraction time {} ms", + + "sequence file count {}, unsequence file count {}, historical extraction time {} ms", dataRegionId, sequenceTsFileResources.size(), - unSequenceTsFileResources.size(), + unsequenceTsFileResources.size(), System.currentTimeMillis() - startHistoricalExtractionTime); } finally { tsFileManager.readUnlock(); From dbef12b76806fd27760ec8151e506a3cbc5bdec7 Mon Sep 17 00:00:00 2001 From: Caideyipi <1336190078@qq.com> Date: Tue, 14 Nov 2023 15:04:07 +0800 Subject: [PATCH 10/10] Revert IT changes --- .../iotdb/pipe/it/IoTDBPipeClusterIT.java | 26 ++++++++++++++----- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java index 66d42732cb6d8..5792ea3598c3d 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java @@ -27,6 +27,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq; import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq; import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp; +import org.apache.iotdb.consensus.ConsensusFactory; import org.apache.iotdb.db.it.utils.TestUtils; import org.apache.iotdb.it.env.MultiEnvFactory; import org.apache.iotdb.it.env.cluster.env.AbstractEnv; @@ -67,13 +68,24 @@ public void setUp() throws Exception { senderEnv = MultiEnvFactory.getEnv(0); receiverEnv = MultiEnvFactory.getEnv(1); - // Avoid hard coding the nodes num and consensus to enable traversal of configurations - senderEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true); - - receiverEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true); - - senderEnv.initClusterEnvironment(); - receiverEnv.initClusterEnvironment(); + senderEnv + .getConfig() + .getCommonConfig() + .setAutoCreateSchemaEnabled(true) + .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS); + + receiverEnv + .getConfig() + .getCommonConfig() + .setAutoCreateSchemaEnabled(true) + .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS); + + senderEnv.initClusterEnvironment(3, 3, 180); + receiverEnv.initClusterEnvironment(3, 3, 180); } @After