diff --git a/example/pipe-count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java b/example/pipe-count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java index 331c5e3d997c4..feeabce018931 100644 --- a/example/pipe-count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java +++ b/example/pipe-count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java @@ -38,12 +38,12 @@ public class CountPointProcessor implements PipeProcessor { private static final String AGGREGATE_SERIES_KEY = "aggregate-series"; - private static AtomicLong writePointCount = new AtomicLong(0); + private static final AtomicLong writePointCount = new AtomicLong(0); private PartialPath aggregateSeries; @Override - public void validate(PipeParameterValidator validator) throws Exception { + public void validate(PipeParameterValidator validator) { validator.validateRequiredAttribute(AGGREGATE_SERIES_KEY); } @@ -54,12 +54,9 @@ public void customize(PipeParameters parameters, PipeProcessorRuntimeConfigurati } @Override - public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector) - throws Exception { + public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector) { tabletInsertionEvent.processTablet( - (tablet, rowCollector) -> { - writePointCount.addAndGet(tablet.rowSize); - }); + (tablet, rowCollector) -> writePointCount.addAndGet(tablet.rowSize)); } @Override @@ -79,5 +76,7 @@ public void process(Event event, EventCollector eventCollector) throws Exception } @Override - public void close() throws Exception {} + public void close() { + // Do nothing + } } diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java index c112aeb51b17e..6a8d2fc3d17da 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java @@ -43,9 +43,9 @@ import java.util.HashMap; import java.util.Map; +/** Test pipe's basic functionalities under multiple cluster and consensus protocol settings. */ @RunWith(IoTDBTestRunner.class) @Category({MultiClusterIT2.class}) -/** Test pipe's basic functionalities under multiple cluster and consensus protocol settings. */ public class IoTDBPipeProtocolIT { private BaseEnv senderEnv; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java index f3b6cd9c69c8c..7e35f16d107cd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java @@ -50,6 +50,7 @@ import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALByteBufReader; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileStatus; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileUtils; +import org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.AbstractResultListener; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.AbstractResultListener.Status; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALFlushListener; import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; @@ -233,7 +234,9 @@ private class DeleteOutdatedFileTask implements Runnable { private int recursionTime = 0; - public DeleteOutdatedFileTask() {} + public DeleteOutdatedFileTask() { + // Do nothing + } private void init() { this.firstValidVersionId = initFirstValidWALVersionId(); @@ -928,7 +931,7 @@ public boolean isAllWALEntriesConsumed() { public void rollWALFile() { WALEntry rollWALFileSignal = new WALSignalEntry(WALEntryType.ROLL_WAL_LOG_WRITER_SIGNAL, true); WALFlushListener walFlushListener = log(rollWALFileSignal); - if (walFlushListener.waitForResult() == WALFlushListener.Status.FAILURE) { + if (walFlushListener.waitForResult() == AbstractResultListener.Status.FAILURE) { logger.error( "Fail to trigger rolling wal node-{}'s wal file log writer.", identifier, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java index 3979c49bffc6c..4ef1aa9ce6bb8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java @@ -47,6 +47,8 @@ public class WALEntryPosition { // cache for wal entry private WALInsertNodeCache cache = null; + private static final String ENTRY_NOT_READY_MESSAGE = "This entry isn't ready for read."; + public WALEntryPosition() {} public WALEntryPosition(String identifier, long walFileVersionId, long position, int size) { @@ -71,7 +73,7 @@ public Pair readByteBufferOrInsertNodeViaCacheDirectly() */ public InsertNode readInsertNodeViaCacheAfterCanRead() throws IOException { if (!canRead()) { - throw new IOException("This entry isn't ready for read."); + throw new IOException(ENTRY_NOT_READY_MESSAGE); } return cache.getInsertNode(this); } @@ -83,7 +85,7 @@ public InsertNode readInsertNodeViaCacheAfterCanRead() throws IOException { */ public ByteBuffer readByteBufferViaCacheAfterCanRead() throws IOException { if (!canRead()) { - throw new IOException("This entry isn't ready for read."); + throw new IOException(ENTRY_NOT_READY_MESSAGE); } return cache.getByteBuffer(this); } @@ -144,7 +146,7 @@ public boolean canRead() { /** Return true only when this wal file is sealed. */ public boolean isInSealedFile() { if (walNode == null || !canRead()) { - throw new RuntimeException("This entry isn't ready for read."); + throw new RuntimeException(ENTRY_NOT_READY_MESSAGE); } return walFileVersionId < walNode.getCurrentWALFileVersion(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/listener/WALFlushListener.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/listener/WALFlushListener.java index 9c7ba908d1e3c..8c84a0cb0fe54 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/listener/WALFlushListener.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/listener/WALFlushListener.java @@ -24,7 +24,7 @@ /** This class helps judge whether wal is flushed to the storage device. */ public class WALFlushListener extends AbstractResultListener { - // handler for pipeline, only exists then value is InsertNode + // handler for pipeline, only exists when value is InsertNode private final WALEntryHandler walEntryHandler; public WALFlushListener(boolean wait, WALEntryValue value) { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALEntryHandlerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALEntryHandlerTest.java index 45f6535734797..48d2f60ff67ee 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALEntryHandlerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALEntryHandlerTest.java @@ -42,6 +42,7 @@ import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.awaitility.Awaitility; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -234,9 +235,7 @@ public void getFlushedValue() throws Exception { handler.pinMemTable(); walNode1.onMemTableFlushed(memTable); // wait until wal flushed - while (!walNode1.isAllWALEntriesConsumed()) { - Thread.sleep(50); - } + Awaitility.await().until(() -> walNode1.isAllWALEntriesConsumed()); assertEquals(node1, handler.getInsertNode()); } @@ -269,9 +268,7 @@ public void testConcurrentGetValue() throws Exception { } // wait until wal flushed - while (!walNode1.isAllWALEntriesConsumed() && !walNode2.isAllWALEntriesConsumed()) { - Thread.sleep(50); - } + Awaitility.await().until(walNode::isAllWALEntriesConsumed); walFlushListeners.get(0).getWalEntryHandler().pinMemTable(); walNode.onMemTableFlushed(memTable);